Chat API update. Breaks everything.
[brdnet.git] / Upload.pas
blob34a00304d3428dff2bd5523d1577d0a213880681
1 UNIT Upload;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1;
7 IMPLEMENTATION
8 uses UploadThread;
type
  tAggr_ptr = ^tAggr;
  tPrv_ptr = ^tPrv;

  {State of one upload channel of a remote peer. Instances are created by
   ChatHandler and release themselves in Close(tell).}
  tPrv = object
    chan: byte;                 {slot of this channel in aggr^.chan}
    aggr: tAggr_ptr;            {aggregator of the peer owning this channel}
    uc: UploadThread.tChannel;  {channel record handed to the upload thread while active}
    ch: ^tChat;                 {chat used for requests/replies; set to nil by Close}
    isOpen: boolean;            {true after uc.oi.Open succeeded, until the file is closed}
    Active: boolean;            {true between Start and Stop}
    procedure Init(var nchat: tChat);
    procedure OnMSG(msg: tSMsg; data: boolean);
    procedure NotifyDOne;
    procedure DoOPEN(const fid: tfid);
    procedure DoLSEG(count: byte; base: array of LongWord; limit: array of LongWord);
    procedure DoWEIGHT(nweight: word);
    procedure Stop;
    procedure Start;
    procedure Close;
    procedure Close(tell: boolean); overload; inline;
    procedure ChatTimeout(willwait: LongWord);
  end;
{Per-peer aggregator: groups all channels (tPrv) that share one remote
 address and owns the upload thread serving them. Aggregators form a doubly
 linked list whose head is Peers.}
tAggr = object
  thr: tUploadThr;              {upload thread state; thr.crit guards thr.chans}
  remote: tNetAddr;             {address of the peer, key used by FindAggr}
  refc: byte;                   {number of channels referencing this aggregator}
  chan: array[0..11] of ^tPrv;  {channel slots, nil when unused}
  acks: Word;                   {ack counter, for timeouts}
  timeout: word;                {consecutive Periodic runs that saw acks=0}
  rateIF, sizeIF: Single;
  limRate, limSize: Single;
  next, prev: tAggr_ptr;        {links of the Peers list}
  procedure Free(ac: byte); {called by closing prv}
  procedure Done;
  procedure Start(ac: byte);
  procedure Stop(ac: byte);
  procedure Init(const source: tNetAddr);
  procedure CalcRates(rxRate: Single);
  procedure Periodic;
  procedure OnCont(msg: tSMsg);
  procedure OnAck(msg: tSMsg);
  procedure ResetMark;
end;
{Head of the doubly linked list of aggregators; nil while no peer is known.}
var
  Peers: ^tAggr;

{Both routines are used by ChatHandler before their bodies appear.}
procedure SendError(var ch: tChat; e1, e2: byte); forward;
function FindAggr({const} addr: tNetAddr): tAggr_ptr; forward;
57 { PROTOCOL }
58 { CLIENT SERVER(us)}{
59 init upFileServer <channel> -> ACK
60 upOPEN <id> -> upINFO <length> <final>
61 upOPEN <id> <b,l> -> upINFO <length> <final> <avl-bytes>
62 -> upFAIL <code> <code2> <details>
63 upLSEG [b,l] -> upSEGOK <available-bytes>
64 -> upUNAVL <next-avl>
65 -> upFAIL upErrIO
66 upSTOP -> ACK
67 upCLOSE -> ACK
68 upWEIGHT <weight> -> ACK
70 special server messages:
71 upEPROTO <code> <details> (protocol violation)
72 upCLOSE (close by server, usually timeout)
73 error conditions:
74 upErrHiChan (channel number too high or too many connections)
75 upErrChanInUse
76 upErrNotFound (file was not found)
77 upErrIO (other error while opening/reading/seeking)
78 upEPROTO upErrNotOpen (LSEG without OPEN or after STOP)
79 upEPROTO upErrTroll (trolling)
80 notes:
81 OPEN message can be merged with init, saving a round-trip
{Handles upOPEN. Stops any running transfer, closes a previously opened
 file, acknowledges the request and opens the file identified by fid.
 Replies with upFAIL (upErrNotFound when rc=1, otherwise upErrIO followed by
 rc) if the open reports rc>0, else with upINFO: 4-byte length, final flag
 and a 4-byte zero.}
procedure tPrv.DoOPEN(const fid:tfid);
var
  reply: tMemoryStream;
begin
  writeln('Upload: ',string(ch^.remote),'/',chan,' OPEN');
  Stop;
  if isOpen then
    uc.oi.Close;
  isOpen := false;
  ch^.Ack;
  uc.oi.Open(fid);
  {if not oinfo.final then begin
  oinfo.rc:=200;
  Close(oinfo.hnd);
  end;}
  if uc.oi.rc > 0 then begin
    {open failed: report why and leave the channel without an open file}
    ch^.StreamInit(reply, 3);
    reply.WriteByte(upFAIL);
    if uc.oi.rc = 1 then
      reply.WriteByte(upErrNotFound)
    else begin
      reply.WriteByte(upErrIO);
      reply.WriteByte(uc.oi.rc);
    end;
    ch^.Send(reply);
    exit;
  end;
  isOpen := true;
  ch^.StreamInit(reply, 10);
  reply.WriteByte(upINFO);
  reply.WriteWord(uc.oi.length, 4);
  if uc.oi.final then
    reply.WriteByte(1)
  else
    reply.WriteByte(0);
  reply.WriteWord(0, 4);
  ch^.Send(reply);
end;
{Handles upLSEG: loads up to count (base,limit) pairs as the segment list of
 this channel and starts the transfer.
   count - number of valid entries in base/limit
   base  - segment start offsets
   limit - maximum number of bytes wanted from each segment
 Replies:
   upEPROTO upErrNotOpen - no file is open
   upEPROTO 100          - count is zero
   upEPROTO 101          - some limit is zero
   upUNAVL <next>        - the first segment is unavailable; <next> is the
                           base reported by GetSegAfter, or 0 if it found none
   upSEGOK <bytes> <n>   - n segments totalling <bytes> bytes were accepted
 Unavailable segments other than the first are skipped silently.
 Every exit path terminates the log line started by the initial write.}
procedure tPrv.DoLSEG(count:byte; base,limit: array of LongWord);
var
  reply: tMemoryStream;
  i: byte;
  segLen, nextBase: LongWord;
  total: LongWord;

  {Sends a upEPROTO reply carrying the given error code.}
  procedure ProtoError(code: byte);
  begin
    ch^.StreamInit(reply, 3);
    reply.WriteByte(upEPROTO);
    reply.WriteByte(code);
    ch^.send(reply);
  end;

begin
  write('Upload: ',string(ch^.remote),'/',chan,' LSEG ');
  if not isOpen then begin
    ProtoError(upErrNotOpen);
    writeln('notOpen');
    exit;
  end;
  if count = 0 then begin
    ProtoError(100);
    writeln('ZeroCount');
    exit;
  end;
  {detach from the upload thread before the segment list is rewritten}
  Stop;
  uc.seg := 0;
  total := 0;
  for i := 0 to count - 1 do begin
    if limit[i] = 0 then begin
      ProtoError(101);
      writeln('ZeroLimit');
      exit;
    end;
    segLen := uc.oi.SegmentLength(base[i]);
    if segLen > 0 then begin
      inc(uc.seg);
      uc.s[uc.seg].base := base[i];
      if segLen > limit[i] then
        segLen := limit[i];
      uc.s[uc.seg].len := segLen;
      inc(total, segLen);
    end else if i = 0 then begin
      {first failed, try find some seg}
      uc.oi.GetSegAfter(base[0], nextBase, segLen);
      ch^.StreamInit(reply, 5);
      if segLen = 0 then
        nextBase := 0;
      reply.WriteByte(upUNAVL);
      reply.WriteWord(nextBase, 4);
      ch^.Send(reply);
      {fix: this path used to leave the log line without a newline}
      writeln('unavl');
      exit;
    end;
  end;
  ch^.StreamInit(reply, 6);
  reply.WriteByte(upSEGOK);
  reply.WriteWord(total, 4);
  reply.WriteByte(uc.seg);
  ch^.Send(reply);
  writeln('x',uc.seg);
  Start;
end;
{Handles upWEIGHT: stores the requested weight for this channel, raising
 values below 50 to 50, and acknowledges the request.}
procedure tPrv.DoWEIGHT(nweight:word);
const
  MinWeight = 50;
begin
  if nweight >= MinWeight then
    uc.Weight := nweight
  else
    uc.Weight := MinWeight;
  ch^.Ack;
end;
{Entry point for a new upFileServer chat.
 The init message is <initcode> <channel> optionally followed by a first
 request (e.g. upOPEN merged with init).
   nchat - the freshly created chat
   msg   - the init message; msg.source^ identifies the peer
 Rejects (via SendError, which also closes the chat) a message without a
 channel byte, a channel number above the slot range, and a channel that is
 already in use. Otherwise it finds or creates the aggregator of the peer,
 creates the channel object and either forwards the merged request to it or,
 when the request is empty, acknowledges the init.}
procedure ChatHandler(var nchat:tChat; msg:tSMsg);
var
  ag: ^tAggr;
  pr: ^tPrv;
  chan: byte;
begin
  msg.stream.skip({the initcode}1);
  {fix: only the channel byte is mandatory. The former "<2" rejected a plain
   init and made the empty-request test at the end unreachable.}
  if msg.stream.RdBufLen < 1 then begin
    SendError(nchat, upErrMalformed, 0);
    exit;
  end;
  chan := msg.stream.ReadByte;
  if chan > high(tAggr.chan) then begin
    Senderror(nchat, upErrHiChan, chan);
    exit;
  end;
  ag := FindAggr(msg.source^);
  if not assigned(ag) then begin
    New(ag);
    ag^.init(msg.source^);
  end else if assigned(ag^.chan[chan]) then begin
    SendError(nchat, upErrChanInUse, 0);
    exit;
  end;
  New(pr);
  pr^.aggr := ag;
  pr^.chan := chan;
  ag^.chan[chan] := pr;
  inc(ag^.refc);
  pr^.Init(nchat);
  if msg.stream.RdBufLen > 0 {the request may be empty}
    then pr^.OnMSG(msg, true)
    {plain init: the protocol table lists ACK as the reply.
     NOTE(review): confirm the Chat layer does not already ack init messages.}
    else nchat.Ack;
end;
{Chat callback of a channel: decodes one request and dispatches it.
   msg  - received message; the opcode is the next unread byte of msg.stream
   data - false calls are ignored for now (see todo)
 Requests: upClose (ack and close without notifying the peer), upOPEN with a
 20-byte id, upLSEG with 8-byte (base,limit) pairs (at most 24 are read) and
 upWEIGHT with exactly 2 bytes. Anything else, or a truncated request, is
 answered with upEPROTO upErrMalformed.}
procedure tPrv.OnMSG(msg:tSMsg;data:boolean);
var
  opc: byte;
  fid: tfid;
  weight: LongWord;
  reply: tMemoryStream;
  nseg: byte;
  bases: array [0..23] of LongWord;
  limits: array [0..23] of LongWord;
  bad: boolean;
begin
  if not data then exit; //todo
  bad := msg.stream.RdBufLen < 1;
  if not bad then begin
    opc := msg.stream.ReadByte;
    case opc of
      upClose: begin
        ch^.Ack;
        {Close releases this object: no field may be touched afterwards}
        Close(false);
      end;
      upOPEN: begin
        if msg.stream.RdBufLen < 20 then
          bad := true
        else begin
          msg.stream.Read(fid, 20);
          DoOPEN(fid);
        end;
      end;
      upLSEG: begin
        nseg := 0;
        while (not bad) and (msg.stream.RdBufLen > 0) and (nseg <= high(bases)) do begin
          if msg.stream.RdBufLen < 8 then
            bad := true
          else begin
            bases[nseg] := msg.stream.ReadWord(4);
            limits[nseg] := msg.stream.ReadWord(4);
            inc(nseg);
          end;
        end;
        if not bad then
          DoLSEG(nseg, bases, limits);
      end;
      upWEIGHT: begin
        if msg.stream.RdBufLen <> 2 then
          bad := true
        else begin
          weight := msg.stream.ReadWord(2);
          DoWEIGHT(weight);
        end;
      end;
      else begin
        bad := true;
      end;
    end;
  end;
  if not bad then exit;
  ch^.StreamInit(reply, 3);
  reply.WriteByte(upEPROTO);
  reply.WriteByte(upErrMalformed);
  ch^.Send(reply);
end;
{Binds this channel to nchat: installs OnMsg and ChatTimeout as the chat's
 hooks, resets the state flags, sets the default weight of 100 and schedules
 Close with a delay of 5000 (the timer is removed again by Start).}
procedure tPrv.Init(var nchat:tChat);
begin
  isOpen := false;
  Active := false;
  uc.weight := 100;
  ch := @nchat;
  ch^.Callback := @OnMsg;
  ch^.TMHook := @ChatTimeout;
  Shedule(5000, @Close);
end;
{Stops the channel and tells the peer with a upDONE message. Called from
 tAggr.Periodic when an active channel has no segments left.}
procedure tPrv.NotifyDone;
var
  note: tMemoryStream;
begin
  Stop;
  ch^.StreamInit(note, 2);
  note.WriteByte(upDONE);
  ch^.Send(note);
end;
{Deactivates the channel if it is active: schedules Close with a delay of
 20000 and removes the channel from the aggregator's upload thread.
 Does nothing when the channel is already inactive.}
procedure tPrv.Stop;
begin
  if not active then exit;
  active := False;
  Shedule(20000, @Close);
  aggr^.Stop(chan);
end;
{Activates the channel: cancels the pending Close timer when the channel was
 inactive and registers the channel with the aggregator's upload thread.
 Requires an open file.}
procedure tPrv.Start;
begin
  assert(isOpen);
  if not active then begin
    UnShedule(@Close);
    active := true;
  end;
  aggr^.Start(chan);
end;
{Shuts the channel down and releases it.
   tell - when true, a upClose message is sent to the peer first
 Stops the transfer, closes the open file, closes the chat, cancels the
 Close timer, gives the slot back to the aggregator and finally frees this
 object. The object must not be used after this call.}
procedure tPrv.Close(tell:boolean);
var
  bye: tMemoryStream;
begin
  assert(assigned(ch));
  if tell then begin
    ch^.StreamInit(bye, 1);
    bye.WriteByte(upClose);
    ch^.Send(bye);
  end;
  Stop;
  if isOpen then begin
    uc.oi.Close;
    isOpen := false;
  end;
  ch^.Close;
  ch := nil;
  {Stop may just have scheduled Close again; drop every pending timer}
  UnShedule(@Close);
  Aggr^.Free(chan);
  FreeMem(@self, sizeof(self));
end;
{Parameterless variant, used as timer callback: closes the channel and
 notifies the peer.}
procedure tPrv.Close;
begin
  Close({tell} true);
end;
{Timeout hook of the chat. Once the announced wait reaches 8000 the channel
 is logged and closed without notifying the peer; shorter waits are ignored.}
procedure tPrv.ChatTimeout(willwait:LongWord);
begin
  if WillWait >= 8000 then begin
    writeln('Upload: prv for ',string(ch^.remote),'/',chan,' ChatTimeout');
    Close(false);
  end;
end;
316 {***AGGREGATOR***}
{Initialises a freshly allocated aggregator for the peer at source.
 Links it at the head of the Peers list, resets counters, limits and all
 channel slots, initialises the upload thread, computes the initial rates and
 registers OnCont/OnAck as handlers for tccont/tceack messages from the peer.}
procedure tAggr.Init(const source:tNetAddr);
var
  i: byte;
begin
  next := Peers;
  prev := nil;
  if assigned(Peers) then Peers^.prev := @self;
  Peers := @self;
  refc := 0;
  {fix: the slots were left uninitialised. ChatHandler allocates the
   aggregator with New and nothing else clears chan, while ChatHandler and
   Periodic test the slots with assigned().}
  for i := low(chan) to high(chan) do
    chan[i] := nil;
  acks := 0;
  timeout := 0;
  rateIF := 1;
  sizeIF := 1;
  limRate := 2000*1024*1024;
  limSize := 4096;
  remote := source;
  writeln('Upload: ',string(remote),' aggr init');
  thr.Init(source);
  CalcRates(2048);
  SetMsgHandler(opcode.tccont, remote, @OnCont);
  SetMsgHandler(opcode.tceack, remote, @OnAck);
end;
{Looks up the aggregator whose remote address equals addr.
 Returns nil when the Peers list holds no such entry. While walking the list
 the back link of every successor is checked by an assertion.}
function FindAggr({const} addr:tNetAddr): tAggr_ptr;
var
  cur: tAggr_ptr;
begin
  cur := Peers;
  while assigned(cur) do begin
    if assigned(cur^.next) then
      assert(cur^.next^.prev = cur);
    if cur^.remote = addr then
      break;
    cur := cur^.next;
  end;
  result := cur;
end;
{Rejects a chat: sends upFAIL followed by the two code bytes e1 and e2 and
 then closes the chat.}
procedure SendError(var ch:tChat;e1,e2:byte);
var
  reply: tMemoryStream;
begin
  ch.StreamInit(reply, 3);
  reply.WriteByte(upFAIL);
  reply.WriteByte(e1);
  reply.WriteByte(e2);
  ch.Send(reply);
  ch.Close;
end;
{Releases channel slot ac (called by a closing tPrv). When the last
 reference is gone the aggregator destroys itself through Done, so the
 caller must not use it afterwards.}
procedure tAggr.Free(ac:byte);
begin
  assert(assigned(chan[ac]));
  chan[ac] := nil;
  dec(refc);
  if refc <> 0 then exit;
  Done;
end;
{Destroys the aggregator: shuts the upload thread down, cancels the Periodic
 timer, unlinks the aggregator from the Peers list, removes the tccont and
 tceack handlers of the peer and frees the object's memory.}
procedure tAggr.Done;
begin
  write('Upload: ',string(remote),' aggr done');
  thr.Done;
  writeln(' thrdone');
  UnShedule(@Periodic);
  {unlink from the doubly linked list}
  if assigned(prev) then
    prev^.next := next
  else
    Peers := next;
  if assigned(next) then
    next^.prev := prev;
  SetMsgHandler(opcode.tccont, remote, nil);
  SetMsgHandler(opcode.tceack, remote, nil);
  FreeMem(@Self, sizeof(self));
end;
{Hands channel ac to the upload thread. Under thr.crit the channel record is
 published in thr.chans, its current weight is reloaded from its weight, the
 Periodic timer is restarted with a delay of 700 and the thread is started
 or woken. ResetMark is called only when the thread was stopped.}
procedure tAggr.Start(ac:byte);
var
  p: tPrv_ptr;
begin
  //writeln('Upload: aggr for ',string(remote),'/',ac,' start seg=',chan[ac]^.uc.seg);
  assert(assigned(chan[ac]));
  p := chan[ac];
  EnterCriticalSection(thr.crit);
  assert(not assigned(thr.chans[ac]));
  thr.chans[ac] := @p^.uc;
  p^.uc.wcur := p^.uc.weight;
  UnShedule(@Periodic);
  Shedule(700, @Periodic);
  {do not reset if resuming from wait, do not reset if running}
  if thr.stop {or thr.wait} then
    ResetMark;
  thr.Start; {wake up, or start if not running}
  LeaveCriticalSection(thr.crit);
end;
{Withdraws channel ac from the upload thread by clearing its entry in
 thr.chans under thr.crit. The channel must currently be registered.}
procedure tAggr.Stop(ac:byte);
begin
  //writeln('Upload: aggr for ',string(remote),' stop chan ',ac);
  assert(assigned(chan[ac]));
  EnterCriticalSection(thr.crit);
  assert(assigned(thr.chans[ac]));
  thr.chans[ac] := nil;
  LeaveCriticalSection(thr.crit);
end;
{Timer callback of the aggregator.
 If the upload thread is stopped or waiting: every active channel whose
 segment count is zero gets NotifyDone; the timer is not rearmed on this
 path.
 Otherwise the acks counter is inspected: a run without acks increments
 timeout; at 4 the rates are recalculated with 512, at 10 all channels are
 closed and the aggregator is destroyed. A run with acks resets timeout.
 Finally acks is cleared and the timer is rearmed with a delay of 700.}
procedure tAggr.Periodic;
var
  idx: byte;
  drained: boolean;
begin
  if thr.stop or thr.wait then begin
    for idx := 0 to high(chan) do begin
      if not assigned(chan[idx]) then continue;
      if not chan[idx]^.active then continue;
      EnterCriticalSection(thr.crit);
      //writeln('periodic ',thr.stop,thr.wait,uc.seg);
      drained := chan[idx]^.uc.Seg = 0;
      LeaveCriticalSection(thr.crit);
      if drained then
        chan[idx]^.NotifyDone;
    end;
    exit;
  end;
  if acks <> 0 then
    timeout := 0
  else begin
    inc(Timeout);
    if timeout >= 10 then begin
      {keep refc from reaching zero while the channels close, so that Done
       runs exactly once, below}
      refc := 255;
      for idx := 0 to high(chan) do
        if assigned(chan[idx]) then
          chan[idx]^.Close;
      Done;
      exit;
    end;
    if timeout = 4 then
      CalcRates(512);
  end;
  acks := 0;
  Shedule(700, @Periodic);
end;
432 {$I UploadTC.pas}
BEGIN
  {unit initialization: start with an empty peer list, then accept
   upFileServer chats through ChatHandler}
  Peers := nil;
  SetChatHandler(opcode.upFileServer, @ChatHandler);
END.