2 {Upload Manager for brodnetd}
5 USES Chat
,opcode
,ServerLoop
,MemStream
,NetAddr
,Store1
;
16 uc
:UploadThread
.tChannel
;
18 isOpen
,Active
:boolean;
19 procedure Init(var nchat
:tChat
);
20 procedure OnMSG(msg
:tSMsg
;data
:boolean);
22 procedure DoOPEN(const fid
:tfid
);
23 procedure DoLSEG(count
:byte; base
:array of LongWord
; limit
:array of LongWord
);
24 procedure DoWEIGHT(nweight
:word);
28 procedure Close(tell
:boolean); overload
; inline;
29 procedure ChatTimeout(willwait
:LongWord
);
35 chan
:array[0..11] of ^tPrv
;
36 acks
:Word; {ack counter, for timeouts}
39 limRate
,limSize
:Single;
41 procedure Free(ac
:byte);{called by closing prv}
43 procedure Start(ac
:byte);
44 procedure Stop(ac
:byte);
45 procedure Init(const source
:tNetAddr
);
46 procedure CalcRates(rxRate
:Single);
48 procedure OnCont(msg
:tSMsg
);
49 procedure OnAck(msg
:tSMsg
);
54 procedure SendError(var ch
:tChat
;e1
,e2
:byte); forward;
55 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
; forward;
59 init upFileServer <channel> -> ACK
60 upOPEN <id> -> upINFO <length> <final>
61 upOPEN <id> <b,l> -> upINFO <length> <final> <avl-bytes>
62 -> upFAIL <code> <code2> <details>
63 upLSEG [b,l] -> upSEGOK <available-bytes>
68 upWEIGHT <weight> -> ACK
70 special server messages:
71 upEPROTO <code> <details> (protocol violation)
72 upCLOSE (close by server, usually timeout)
74 upErrHiChan (channel number too high or too many connections)
76 upErrNotFound (file was not found)
77 upErrIO (other error while opening/reading/seeking)
78 upEPROTO upErrNotOpen (LSEG without OPEN or after STOP)
79 upEPROTO upErrTroll (trolling)
81 OPEN message can be merged with init, saving a round-trip
84 procedure tPrv
.DoOPEN(const fid
:tfid
);
85 var err
:tmemorystream
;
87 writeln('Upload: ',string(ch
^.remote
),'/',chan
,' OPEN');
89 if isOpen
then uc
.oi
.Close
;
93 {if not oinfo.final then begin
97 if uc
.oi
.rc
>0 then begin
98 ch
^.StreamInit(err
,3);
99 err
.WriteByte(upFAIL
);
100 if uc
.oi
.rc
=1 then err
.WriteByte(upErrNotFound
)
101 else begin err
.WriteByte(upErrIO
); err
.WriteByte(uc
.oi
.rc
) end;
105 ch
^.StreamInit(err
,10);
106 err
.WriteByte(upINFO
);
107 err
.WriteWord(uc
.oi
.length
,4);
108 if uc
.oi
.final
then err
.WriteByte(1) else err
.WriteByte(0);
114 procedure tPrv
.DoLSEG(count
:byte; base
,limit
: array of LongWord
);
115 var err
:tmemorystream
;
120 writeln('Upload: ',string(ch
^.remote
),'/',chan
,' LSEG');
121 if not isOpen
then begin
122 ch
^.StreamInit(err
,3);
123 err
.WriteByte(upEPROTO
);
124 err
.WriteByte(upErrNotOpen
);
128 if count
=0 then begin
129 ch
^.StreamInit(err
,3);
130 err
.WriteByte(upEPROTO
);
133 writeln('ZeroCount');
138 for i
:=1 to count
do begin
139 if limit
[i
-1]=0 then begin
140 ch
^.StreamInit(err
,3);
141 err
.WriteByte(upEPROTO
);
144 writeln('ZeroLimit');
146 l
:=uc
.oi
.SegmentLength(base
[i
-1]);
149 uc
.s
[uc
.seg
].base
:=base
[i
-1];
150 if l
>limit
[i
-1] then l
:=limit
[i
-1];
153 end else if i
=1 then begin
154 {first failed, try to find some seg}
155 uc
.oi
.GetSegAfter(base
[0],fb
,l
);
156 ch
^.StreamInit(err
,5);
158 err
.WriteByte(upUNAVL
);
164 ch
^.StreamInit(err
,6);
165 err
.WriteByte(upSEGOK
);
166 err
.WriteWord(tbytes
,4);
167 err
.WriteByte(uc
.seg
);
172 procedure tPrv
.DoWEIGHT(nweight
:word);
174 if nweight
<50 then nweight
:=50;
179 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
184 writeln('Upload: ChatHandler');
185 msg
.stream
.skip({the initcode}1);
186 if msg
.stream
.RdBufLen
<2 then begin SendError(nchat
,upErrMalformed
,0); exit
end;
187 chan
:=msg
.stream
.ReadByte
;
189 if chan
>high(tAggr
.chan
) then begin Senderror(nchat
,upErrHiChan
,chan
); exit
end;
190 ag
:=FindAggr(msg
.source
^);
191 if not assigned(ag
) then begin
193 ag
^.init(msg
.source
^);
194 end else if assigned(ag
^.chan
[chan
]) then begin SendError(nchat
,upErrChanInUse
,0); exit
end;
201 if msg
.stream
.RdBufLen
>0 {the request may be empty}
202 then pr
^.OnMSG(msg
,true);
204 procedure tPrv
.OnMSG(msg
:tSMsg
;data
:boolean);
209 var err
:tmemorystream
;
211 var lbas
:array [0..23] of LongWOrd
;
212 var llim
:array [0..23] of LongWOrd
;
215 if not data
then exit
; //todo
216 if msg
.stream
.RdBufLen
<1 then goto malformed
;
217 op
:=msg
.stream
.ReadByte
;
218 writeln('Upload: ',string(ch
^.remote
),' opcode=',op
,' sz=',msg
.stream
.RdBufLen
);
225 if msg
.stream
.RdBufLen
<20 then goto malformed
;
226 msg
.stream
.Read(hash
,20);
231 while (msg
.stream
.RdBufLen
>0)and(count
<=high(lbas
)) do begin
232 if msg
.stream
.RdBufLen
<8 then goto malformed
;
233 lbas
[count
]:=msg
.stream
.ReadWord(4);
234 llim
[count
]:=msg
.stream
.ReadWord(4);
237 DoLSEG(count
,lbas
,llim
);
240 if msg
.stream
.RdBufLen
<>2 then goto malformed
;
241 base
:=msg
.stream
.ReadWord(2);
247 ch
^.StreamInit(err
,3);
248 err
.WriteByte(upEPROTO
);
249 err
.WriteByte(upErrMalformed
);
251 writeln('Upload: malformed request stage=1');
254 procedure tPrv
.Init(var nchat
:tChat
);
257 ch
^.Callback
:=@OnMsg
;
258 ch
^.TMHook
:=@ChatTimeout
;
260 isOpen
:=false; Active
:=false;
261 Shedule(5000,@Close
);
262 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' init');
265 procedure tPrv
.NotifyDone
;
266 var err
:tmemorystream
;
269 ch
^.StreamInit(err
,2);
270 err
.WriteByte(upDONE
);
278 Shedule(20000,@Close
);
281 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' stop');
283 procedure tPrv
.Start
;
286 if not active
then UnShedule(@Close
);
289 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' start');
292 procedure tPrv
.Close(tell
:boolean);
293 var err
:tMemoryStream
;
295 assert(assigned(ch
));
296 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' close');
298 ch
^.StreamInit(err
,1);
299 err
.WriteByte(upClose
);
303 if isOpen
then uc
.oi
.Close
;
309 FreeMem(@self
,sizeof(self
));
311 procedure tPrv
.Close
;
316 procedure tPrv
.ChatTimeout(willwait
:LongWord
);
318 if WillWait
<8000 then exit
;
319 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' ChatTimeout');
325 procedure tAggr
.Init(const source
:tNetAddr
);
329 if assigned(Peers
) then Peers
^.prev
:=@self
;
336 limRate
:=2000*1024*1024;
339 writeln('Upload: aggr for ',string(remote
),' init');
342 SetMsgHandler(opcode
.tccont
,remote
,@OnCont
);
343 SetMsgHandler(opcode
.tceack
,remote
,@OnAck
);
346 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
349 while assigned(result
) do begin
350 if assigned(result
^.next
) then assert(result
^.next
^.prev
=result
);
351 if result
^.remote
=addr
then exit
;
352 result
:=result
^.next
;
356 procedure SendError(var ch
:tChat
;e1
,e2
:byte);
367 procedure tAggr
.Free(ac
:byte);
369 assert(assigned(chan
[ac
]));
374 procedure tAggr
.Done
;
376 write('Upload: aggr for ',string(remote
),' done');
377 thr
.Done
; writeln(' thrdone');
378 UnShedule(@Periodic
);
379 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
380 if assigned(next
) then next
^.prev
:=prev
;
381 SetMsgHandler(opcode
.tccont
,remote
,nil);
382 SetMsgHandler(opcode
.tceack
,remote
,nil);
383 FreeMem(@Self
,sizeof(self
));
386 procedure tAggr
.Start(ac
:byte);
388 writeln('Upload: aggr for ',string(remote
),' start chan ',ac
);
389 assert(assigned(chan
[ac
]));
390 EnterCriticalSection(thr
.crit
);
391 assert(not assigned(thr
.chans
[ac
]));
392 thr
.chans
[ac
]:=@chan
[ac
]^.uc
;
393 chan
[ac
]^.uc
.wcur
:=chan
[ac
]^.uc
.weight
;
394 UnShedule(@Periodic
);
395 Shedule(700,@Periodic
);
396 if thr
.stop
or thr
.wait
then ResetMark
else {do not reset if running};
397 thr
.Start
; {wake up, or start if not running}
398 LeaveCriticalSection(thr
.crit
);
401 procedure tAggr
.Stop(ac
:byte);
403 writeln('Upload: aggr for ',string(remote
),' stop chan ',ac
);
404 assert(assigned(chan
[ac
]));
405 EnterCriticalSection(thr
.crit
);
406 assert(assigned(thr
.chans
[ac
]));
408 LeaveCriticalSection(thr
.crit
);
411 procedure tAggr
.Periodic
;
415 if (thr
.stop
)or(thr
.wait
) then begin
416 for i
:=0 to high(chan
) do if assigned(chan
[i
]) then with chan
[i
]^ do begin
417 if not active
then continue
;
418 EnterCriticalSection(thr
.crit
);
420 LeaveCriticalSection(thr
.crit
);
421 if e
then NotifyDone
;
426 if timeout
>=10 then begin
428 for i
:=0 to high(chan
) do if assigned(chan
[i
]) then chan
[i
]^.Close
;
430 if timeout
=4 then CalcRates(512);
433 Shedule(700,@Periodic
);
440 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);