File Sharing download/upload units impl
[brdnet.git] / Upload.pas
blob19ee6efda349069b045785f11f55619fb01b6c2a
1 UNIT Upload;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1;
7 IMPLEMENTATION
8 uses UploadThread;
10 type
11 tAggr_ptr=^tAggr;
12 tPrv_ptr=^tPrv;
13 tPrv=object
14 chan:byte;
15 aggr:tAggr_ptr;
16 uc:UploadThread.tChannel;
17 ch:^tChat;
18 isOpen,Active:boolean;
19 procedure Init(var nchat:tChat);
20 procedure OnMSG(msg:tSMsg;data:boolean);
21 procedure NotifyDOne;
22 procedure DoOPEN(const fid:tfid);
23 procedure DoLSEG(count:byte; base:array of LongWord; limit:array of LongWord);
24 procedure DoWEIGHT(nweight:word);
25 procedure Stop;
26 procedure Start;
27 procedure Close;
28 procedure Close(tell:boolean); overload; inline;
29 procedure ChatTimeout(willwait:LongWord);
30 end;
31 tAggr=object
32 thr:tUploadThr;
33 remote:tNetAddr;
34 refc:byte;
35 chan:array[0..11] of ^tPrv;
36 acks:Word; {ack counter, for timeouts}
37 timeout:word;
38 rateIF,sizeIF,
39 limRate,limSize:Single;
40 next,prev: tAggr_ptr;
41 procedure Free(ac:byte);{called by closing prv}
42 procedure Done;
43 procedure Start(ac:byte);
44 procedure Stop(ac:byte);
45 procedure Init(const source:tNetAddr);
46 procedure CalcRates(rxRate:Single);
47 procedure Periodic;
48 procedure OnCont(msg:tSMsg);
49 procedure OnAck(msg:tSMsg);
50 procedure ResetMark;
51 end;
53 var Peers:^tAggr;
54 procedure SendError(var ch:tChat;e1,e2:byte); forward;
55 function FindAggr({const} addr:tNetAddr): tAggr_ptr; forward;
57 { PROTOCOL }
58 { CLIENT SERVER(us)}{
59 init upFileServer <channel> -> ACK
60 upOPEN <id> -> upINFO <length> <final>
61 upOPEN <id> <b,l> -> upINFO <length> <final> <avl-bytes>
62 -> upFAIL <code> <code2> <details>
63 upLSEG [b,l] -> upSEGOK <available-bytes>
64 -> upUNAVL <next-avl>
65 -> upFAIL upErrIO
66 upSTOP -> ACK
67 upCLOSE -> ACK
68 upWEIGHT <weight> -> ACK
70 special server messages:
71 upEPROTO <code> <details> (protocol violation)
72 upCLOSE (close by server, usualy timeout)
73 error conditions:
74 upErrHiChan (channel number too high or too many connections)
75 upErrChanInUse
76 upErrNotFound (file was not found)
77 upErrIO (other error while opening/reading/seeking)
78 upEPROTO upErrNotOpen (LSEG without OPEN or afer STOP)
79 upEPROTO upErrTroll (trolling)
80 notes:
81 OPEN message can be merged with init, saving a round-trip
84 procedure tPrv.DoOPEN(const fid:tfid);
85 var err:tmemorystream;
86 begin
87 writeln('Upload: ',string(ch^.remote),'/',chan,' OPEN');
88 Stop;
89 if isOpen then uc.oi.Close;
90 isOpen:=false;
91 ch^.Ack;
92 uc.oi.Open(fid);
93 {if not oinfo.final then begin
94 oinfo.rc:=200;
95 Close(oinfo.hnd);
96 end;}
97 if uc.oi.rc>0 then begin
98 ch^.StreamInit(err,3);
99 err.WriteByte(upFAIL);
100 if uc.oi.rc=1 then err.WriteByte(upErrNotFound)
101 else begin err.WriteByte(upErrIO); err.WriteByte(uc.oi.rc) end;
102 ch^.Send(err);
103 end else begin
104 isopen:=true;
105 ch^.StreamInit(err,10);
106 err.WriteByte(upINFO);
107 err.WriteWord(uc.oi.length,4);
108 if uc.oi.final then err.WriteByte(1) else err.WriteByte(0);
109 err.WriteWord(0,4);
110 ch^.Send(err);
111 end;
112 end;
114 procedure tPrv.DoLSEG(count:byte; base,limit: array of LongWord);
115 var err:tmemorystream;
116 var i:byte;
117 var l,fb:LongWord;
118 var tbytes:LongWOrd;
119 begin
120 writeln('Upload: ',string(ch^.remote),'/',chan,' LSEG');
121 if not isOpen then begin
122 ch^.StreamInit(err,3);
123 err.WriteByte(upEPROTO);
124 err.WriteByte(upErrNotOpen);
125 ch^.send(err);
126 writeln('notOpen');
127 exit end;
128 if count=0 then begin
129 ch^.StreamInit(err,3);
130 err.WriteByte(upEPROTO);
131 err.WriteByte(100);
132 ch^.send(err);
133 writeln('ZeroCount');
134 exit end;
135 stop;
136 uc.seg:=0;
137 tbytes:=0;
138 for i:=1 to count do begin
139 if limit[i-1]=0 then begin
140 ch^.StreamInit(err,3);
141 err.WriteByte(upEPROTO);
142 err.WriteByte(101);
143 ch^.send(err);
144 writeln('ZeroLimit');
145 exit end;
146 l:=uc.oi.SegmentLength(base[i-1]);
147 if l>0 then begin
148 inc(uc.seg);
149 uc.s[uc.seg].base:=base[i-1];
150 if l>limit[i-1] then l:=limit[i-1];
151 uc.s[uc.seg].len:=l;
152 inc(tbytes,l);
153 end else if i=1 then begin
154 {first failed, try find some seg}
155 uc.oi.GetSegAfter(base[0],fb,l);
156 ch^.StreamInit(err,5);
157 if l=0 then fb:=0;
158 err.WriteByte(upUNAVL);
159 err.WriteWord(fb,4);
160 ch^.Send(err);
161 exit;
162 end;
163 end;
164 ch^.StreamInit(err,6);
165 err.WriteByte(upSEGOK);
166 err.WriteWord(tbytes,4);
167 err.WriteByte(uc.seg);
168 ch^.Send(err);
169 Start;
170 end;
172 procedure tPrv.DoWEIGHT(nweight:word);
173 begin
174 if nweight<50 then nweight:=50;
175 uc.Weight:=nweight;
176 ch^.Ack;
177 end;
179 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
180 var ag:^tAggr;
181 var pr:^tPrv;
182 var chan:byte;
183 begin
184 writeln('Upload: ChatHandler');
185 msg.stream.skip({the initcode}1);
186 if msg.stream.RdBufLen<2 then begin SendError(nchat,upErrMalformed,0); exit end;
187 chan:=msg.stream.ReadByte;
188 writeln(chan);
189 if chan>high(tAggr.chan) then begin Senderror(nchat,upErrHiChan,chan); exit end;
190 ag:=FindAggr(msg.source^);
191 if not assigned(ag) then begin
192 New(ag);
193 ag^.init(msg.source^);
194 end else if assigned(ag^.chan[chan]) then begin SendError(nchat,upErrChanInUse,0); exit end;
195 New(pr);
196 pr^.aggr:=ag;
197 pr^.chan:=chan;
198 ag^.chan[chan]:=pr;
199 inc(ag^.refc);
200 pr^.Init(nchat);
201 if msg.stream.RdBufLen>0 {the request may be empty}
202 then pr^.OnMSG(msg,true);
203 end;
204 procedure tPrv.OnMSG(msg:tSMsg;data:boolean);
205 var op:byte;
206 var hash:tfid;
207 var base:LongWord;
208 var limit:LongWord;
209 var err:tmemorystream;
210 var count:byte;
211 var lbas:array [0..23] of LongWOrd;
212 var llim:array [0..23] of LongWOrd;
213 label malformed;
214 begin
215 if not data then exit; //todo
216 if msg.stream.RdBufLen<1 then goto malformed;
217 op:=msg.stream.ReadByte;
218 writeln('Upload: ',string(ch^.remote),' opcode=',op,' sz=',msg.stream.RdBufLen);
219 case op of
220 upClose: begin
221 ch^.Ack;
222 Close(false);
223 end;
224 upOPEN: begin
225 if msg.stream.RdBufLen<20 then goto malformed;
226 msg.stream.Read(hash,20);
227 DoOPEN(hash);
228 end;
229 upLSEG: begin
230 count:=0;
231 while (msg.stream.RdBufLen>0)and(count<=high(lbas)) do begin
232 if msg.stream.RdBufLen<8 then goto malformed;
233 lbas[count]:=msg.stream.ReadWord(4);
234 llim[count]:=msg.stream.ReadWord(4);
235 inc(count);
236 end;
237 DoLSEG(count,lbas,llim);
238 end;
239 upWEIGHT: begin
240 if msg.stream.RdBufLen<>2 then goto malformed;
241 base:=msg.stream.ReadWord(2);
242 DoWEIGHT(base);
243 end;
244 else goto malformed;
245 end;
246 exit; malformed:
247 ch^.StreamInit(err,3);
248 err.WriteByte(upEPROTO);
249 err.WriteByte(upErrMalformed);
250 ch^.Send(err);
251 writeln('Upload: malformed request stage=1');
252 end;
254 procedure tPrv.Init(var nchat:tChat);
255 begin
256 ch:=@nchat;
257 ch^.Callback:=@OnMsg;
258 ch^.TMHook:=@ChatTimeout;
259 uc.weight:=100;
260 isOpen:=false; Active:=false;
261 Shedule(5000,@Close);
262 writeln('Upload: prv for ',string(ch^.remote),'/',chan,' init');
263 end;
265 procedure tPrv.NotifyDone;
266 var err:tmemorystream;
267 begin
268 Stop;
269 ch^.StreamInit(err,2);
270 err.WriteByte(upDONE);
271 ch^.Send(err);
272 end;
274 procedure tPrv.Stop;
275 begin
276 if active then begin
277 active:=False;
278 Shedule(20000,@Close);
279 aggr^.Stop(chan);
280 end;
281 writeln('Upload: prv for ',string(ch^.remote),'/',chan,' stop');
282 end;
283 procedure tPrv.Start;
284 begin
285 assert(isOpen);
286 if not active then UnShedule(@Close);
287 active:=true;
288 aggr^.Start(chan);
289 writeln('Upload: prv for ',string(ch^.remote),'/',chan,' start');
290 end;
292 procedure tPrv.Close(tell:boolean);
293 var err:tMemoryStream;
294 begin
295 assert(assigned(ch));
296 writeln('Upload: prv for ',string(ch^.remote),'/',chan,' close');
297 if tell then begin
298 ch^.StreamInit(err,1);
299 err.WriteByte(upClose);
300 ch^.Send(err);
301 end;
302 Stop;
303 if isOpen then uc.oi.Close;
304 isOpen:=false;
305 ch^.Close;
306 ch:=nil;
307 UnShedule(@Close);
308 Aggr^.Free(chan);
309 FreeMem(@self,sizeof(self));
310 end;
311 procedure tPrv.Close;
312 begin
313 Close(true);
314 end;
316 procedure tPrv.ChatTimeout(willwait:LongWord);
317 begin
318 if WillWait<8000 then exit;
319 writeln('Upload: prv for ',string(ch^.remote),'/',chan,' ChatTimeout');
320 Close;
321 end;
323 {***AGGREGATOR***}
325 procedure tAggr.Init(const source:tNetAddr);
326 begin
327 next:=Peers;
328 prev:=nil;
329 if assigned(Peers) then Peers^.prev:=@self;
330 Peers:=@self;
331 refc:=0;
332 acks:=0;
333 timeout:=0;
334 rateIF:=1;
335 sizeIF:=1;
336 limRate:=2000*1024*1024;
337 limSize:=4096;
338 remote:=source;
339 writeln('Upload: aggr for ',string(remote),' init');
340 thr.Init(source);
341 CalcRates(2048);
342 SetMsgHandler(opcode.tccont,remote,@OnCont);
343 SetMsgHandler(opcode.tceack,remote,@OnAck);
344 end;
346 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
347 begin
348 result:=Peers;
349 while assigned(result) do begin
350 if assigned(result^.next) then assert(result^.next^.prev=result);
351 if result^.remote=addr then exit;
352 result:=result^.next;
353 end;
354 end;
356 procedure SendError(var ch:tChat;e1,e2:byte);
357 var s:tMemoryStream;
358 begin
359 ch.StreamInit(s,3);
360 s.WriteByte(upFAIL);
361 s.WriteByte(e1);
362 s.WriteByte(e2);
363 ch.Send(s);
364 ch.Close;
365 end;
367 procedure tAggr.Free(ac:byte);
368 begin
369 assert(assigned(chan[ac]));
370 chan[ac]:=nil;
371 dec(refc);
372 if refc=0 then Done;
373 end;
374 procedure tAggr.Done;
375 begin
376 write('Upload: aggr for ',string(remote),' done');
377 thr.Done; writeln(' thrdone');
378 UnShedule(@Periodic);
379 if assigned(prev) then prev^.next:=next else Peers:=next;
380 if assigned(next) then next^.prev:=prev;
381 SetMsgHandler(opcode.tccont,remote,nil);
382 SetMsgHandler(opcode.tceack,remote,nil);
383 FreeMem(@Self,sizeof(self));
384 end;
386 procedure tAggr.Start(ac:byte);
387 begin
388 writeln('Upload: aggr for ',string(remote),' start chan ',ac);
389 assert(assigned(chan[ac]));
390 EnterCriticalSection(thr.crit);
391 assert(not assigned(thr.chans[ac]));
392 thr.chans[ac]:=@chan[ac]^.uc;
393 chan[ac]^.uc.wcur:=chan[ac]^.uc.weight;
394 UnShedule(@Periodic);
395 Shedule(700,@Periodic);
396 if thr.stop or thr.wait then ResetMark else {do not reset if running};
397 thr.Start; {wake up, or start if not running}
398 LeaveCriticalSection(thr.crit);
399 end;
401 procedure tAggr.Stop(ac:byte);
402 begin
403 writeln('Upload: aggr for ',string(remote),' stop chan ',ac);
404 assert(assigned(chan[ac]));
405 EnterCriticalSection(thr.crit);
406 assert(assigned(thr.chans[ac]));
407 thr.chans[ac]:=nil;
408 LeaveCriticalSection(thr.crit);
409 end;
411 procedure tAggr.Periodic;
412 var i:byte;
413 var e:boolean;
414 begin
415 if (thr.stop)or(thr.wait) then begin
416 for i:=0 to high(chan) do if assigned(chan[i]) then with chan[i]^ do begin
417 if not active then continue;
418 EnterCriticalSection(thr.crit);
419 e:=uc.Seg=0;
420 LeaveCriticalSection(thr.crit);
421 if e then NotifyDone;
422 end;
423 exit end;
424 if acks=0 then begin
425 inc(Timeout);
426 if timeout>=10 then begin
427 refc:=255;
428 for i:=0 to high(chan) do if assigned(chan[i]) then chan[i]^.Close;
429 Done;exit;end;
430 if timeout=4 then CalcRates(512);
431 end else timeout:=0;
432 acks:=0;
433 Shedule(700,@Periodic);
434 end;
436 {$I UploadTC.pas}
438 BEGIN
439 Peers:=nil;
440 SetChatHandler(opcode.upFileServer,@ChatHandler);
441 END.