Work to Store.
[brdnet.git] / upmgr.pas
blob4d1af7409f356c9df753fcd5a2a77e86b3f6da7e
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
7 IMPLEMENTATION
8 USES Store1;
10 type
11 tAggr_ptr=^tAggr;
12 tPrv_ptr=^tPrv;
13 tPrv=object
14 aggr:tAggr_ptr;
15 ch: ^tChat;
16 chan: byte;
17 next,prev: tPrv_ptr;
18 weight,wcur:Word;
19 isOpen,Active:boolean;
20 seglen:LongWord;
21 oinfo:tStoreObjectInfo;
22 procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
23 procedure OnMsg(msg:tSMsg; data:boolean);
24 procedure IdleTimeout;
25 procedure ChatTimeout(willwait:LongWord);
26 procedure Cont;
27 procedure DoGET(const fid:tfid; base,limit:LongWord);
28 procedure DoSEG(base,limit:LongWord);
29 procedure Start;
30 procedure Stop;
31 procedure DoClose;
32 end;
33 tAggr=object
34 tcs: tTCS;
35 rekt:boolean;
36 next,prev: tAggr_ptr;
37 prv:^tPrv;
38 Cnt:Byte;
39 procedure UnRef;
40 procedure Cont;
41 procedure Init(const source:tNetAddr);
42 procedure Done;
43 procedure TCTimeout;
44 end;
46 var Peers:^tAggr;
48 {Requests
49 Close();
50 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
51 SEG(baseHi:word2; base:word4; limit:word4);
52 }{Responses
53 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
54 FAIL(code:byte;...);
55 DONE();
58 procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord);
59 var err:tmemorystream;
60 begin
61 if isOpen then oinfo.Close;
62 if Active then Stop; //opt
63 ch^.Ack;
64 oinfo.Open(fid);
65 {if not oinfo.final then begin
66 oinfo.rc:=200;
67 Close(oinfo.hnd);
68 end;}
69 if oinfo.rc>0 then begin
70 ch^.StreamInit(err,3);
71 err.WriteByte(upFAIL);
72 if oinfo.rc=1 then err.WriteByte(upErrNotFound)
73 else begin err.WriteByte(upErrIO); err.WriteByte(oinfo.rc) end;
74 ch^.Send(err);
75 end else begin
76 isopen:=true;
77 DoSeg(base,limit);
78 end;
79 end;
81 procedure tPrv.DoSEG(base,limit:LongWord);
82 var err:tmemorystream;
83 begin
84 if isOpen then begin
85 ch^.StreamInit(err,12);
86 oinfo.SegSeek(base);
87 if oinfo.rc>0 then begin
88 err.WriteByte(upFAIL);
89 err.WriteByte(upErrIO);
90 err.WriteByte(oinfo.rc);
91 ch^.Send(err);
92 if Active then Stop;
93 end else begin
94 err.WriteByte(upINFO);
95 err.WriteWord(0,2);
96 err.WriteWord(oinfo.length,4);
97 seglen:=limit;
98 if oinfo.seglen<seglen then seglen:=oinfo.seglen;
99 if oinfo.final
100 then err.WriteByte(1)
101 else err.WriteByte(0);
102 err.WriteWord(seglen,4);
103 ch^.Send(err);
104 if not Active then Start;
105 end end else begin
106 ch^.StreamInit(err,2);
107 err.WriteByte(upFAIL);
108 err.WriteByte(upErrSegNoGet);
109 ch^.Send(err);
110 end;
111 end;
113 procedure tPrv.Start;
114 begin
115 Assert(isOpen);
116 Assert(not Active);
117 Active:=true;
118 UnShedule(@IdleTimeout);
119 writeln('upmgr: Startig transfer');
120 if not assigned(aggr^.prv) then begin
121 next:=@self;
122 prev:=@self;
123 aggr^.prv:=@self;
124 aggr^.tcs.Start;
125 end else begin
126 next:=aggr^.prv^.next;
127 prev:=aggr^.prv;
128 prev^.next:=@self;
129 next^.prev:=@self;
130 end;
131 wcur:=weight;
132 end;
134 procedure tPrv.Stop;
135 begin
136 Assert(isOpen);
137 Assert(Active);
138 if prev<>@self then begin
139 prev^.next:=next;
140 next^.prev:=prev;
141 end else next:=nil;
142 if aggr^.prv=@self then aggr^.prv:=next;
143 active:=false;
144 Shedule(20000,@IdleTimeout);
145 writeln('upmgr: Stop');
146 end;
148 procedure tPrv.Cont;
149 var s:tMemoryStream;
150 var sz:LongWord;
151 var rs:LongWord;
152 var buf:array [1..2048] of byte;
153 begin
154 // writeln('upmgr: CONT! ',chan);
155 Assert(Active and isOpen);
156 sz:=aggr^.tcs.MaxSize(sizeof(buf))-1;
157 if sz>SegLen then sz:=SegLen;
158 //s.Init(GetMem(sz),0,sz);
159 Assert((sz+1)<=sizeof(buf));
160 s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s);
161 s.WriteByte(Chan);
162 Assert(sz<=s.WrBufLen);
163 oinfo.ReadAhead(sz,s.WrBuf); //todo
164 oinfo.WaitRead;
165 Assert(oinfo.rc=0); //todo
166 s.WrEnd(sz);
167 aggr^.tcs.Send(s);
168 //FreeMem(s.base,s.size);
169 SegLen:=SegLen-sz;
170 dec(wcur);
171 if SegLen=0 then begin
172 ch^.StreamInit(s,2);
173 s.WriteByte(upDONE);
174 ch^.Send(s);
175 Stop;
176 end else
177 if (wcur=0) then begin
178 wcur:=weight;
179 aggr^.prv:=next;
180 end;
181 end;
183 procedure tPrv.DoClose;
184 begin
185 if Active then Stop;
186 if isOpen then oinfo.Close;
187 isOpen:=false;
188 UnShedule(@IdleTimeout);
189 ch^.Ack;
190 ch^.Close;
191 aggr^.UnRef;
192 FreeMem(@self,sizeof(self));
193 end;
195 procedure tPrv.OnMsg(msg:tSMsg; data:boolean);
196 var op:byte;
197 var hash:tfid;
198 var base:LongWord;
199 var limit:LongWord;
200 var err:tmemorystream;
201 label malformed;
202 begin
203 if not data then exit; //todo
204 Assert(not(aggr^.rekt and active));
205 if aggr^.rekt then exit;
206 if msg.stream.RdBufLen<1 then goto malformed;
207 op:=msg.stream.ReadByte;
208 writeln('upmgr: opcode=',op,' sz=',msg.stream.RdBufLen);
209 case op of
210 upClose: DoClose;
211 upGET: begin
212 if msg.stream.RdBufLen<>30 then goto malformed;
213 msg.stream.Read(hash,20);
214 if msg.stream.ReadWord(2)>0 then goto malformed;
215 base:=msg.stream.ReadWord(4);
216 limit:=msg.stream.ReadWord(4);
217 DoGet(hash,base,limit);
218 end;
219 upSEG: begin
220 if msg.stream.RdBufLen<10 then goto malformed;
221 if msg.stream.ReadWord(2)>0 then goto malformed;
222 base:=msg.stream.ReadWord(4);
223 limit:=msg.stream.ReadWord(4);
224 DoSEG(base, limit);
225 end;
226 else goto malformed;
227 end;
228 exit; malformed:
229 ch^.StreamInit(err,2);
230 err.WriteByte(upFAIL);
231 err.WriteByte(upErrMalformed);
232 ch^.Send(err);
233 writeln('upmgr: malformed request stage=1');
234 end;
236 procedure tPrv.ChatTimeout(willwait:LongWord);
237 var wasactive:boolean;
238 begin
239 if WillWait<8000 then exit;
240 writeln('upmgr: Chat timeout');
241 wasactive:=active;
242 if Active then Stop;
243 if isOpen then oinfo.Close;
244 isOpen:=false;
245 ch^.Close;
246 ch:=nil;
247 if wasactive then IdleTimeout {else it is sheduled};
248 end;
249 procedure tPrv.IdleTimeout;
250 var err:tMemoryStream;
251 begin
252 if assigned(ch) then begin {chat is still not rekt}
253 if not active then writeln('upmgr: Idle timeout');
254 ch^.StreamInit(err,1);
255 err.WriteByte(upClose);
256 try ch^.Send(err);
257 except end;
258 ch^.Close;
259 end;
260 {it is idle timeout, but may be called from aggr it tc tiomes out}
261 if Active then Stop;
262 if isOpen then oinfo.Close; {may still be open}
263 UnShedule(@IdleTimeout);
264 aggr^.UnRef;
265 FreeMem(@self,sizeof(self));
266 end;
268 procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
269 begin
270 ch:=@nchat;
271 ch^.Callback:=@OnMsg;
272 ch^.TMHook:=@ChatTimeout;
273 aggr:=ag;
274 next:=nil;
275 prev:=nil;
276 chan:=msg.stream.readbyte;
277 writeln('upmgr: prv init chan=',chan);
278 weight:=100;
279 wcur:=0;
280 isOpen:=false; Active:=false;
281 inc(aggr^.Cnt);
282 Shedule(5000,@IdleTimeout);
283 OnMsg(msg,true);
284 end;
286 procedure tAggr.Init(const source:tNetAddr);
287 begin
288 writeln('upmgr: init');
289 next:=Peers;
290 prev:=nil;
291 rekt:=false;
292 if assigned(Peers) then Peers^.prev:=@self;
293 Peers:=@self;
294 tcs.Init(source);
295 tcs.CanSend:=@Cont;
296 tcs.maxTimeout:=8;
297 tcs.OnTimeout:=@TCTimeout;
298 prv:=nil;
299 cnt:=0;
300 end;
302 procedure tAggr.TCTimeout;
303 var pprv:pointer;
304 begin
305 writeln('upmgr: TCTimeout');
306 while assigned(prv) do begin
307 assert(not rekt);
308 pprv:=prv;
309 prv^.IdleTimeout;
310 if rekt then exit;
311 Assert(pprv<>prv);
312 end;
313 Done;
314 end;
315 procedure tAggr.Cont;
316 begin
317 if not assigned(prv) then exit;
318 prv^.Cont;
319 end;
320 procedure tAggr.UnRef;
321 begin
322 Assert(cnt>0);
323 Dec(Cnt);
324 writeln('upmgr: aggr unrefd');
325 if cnt=0 then begin
326 Done;
327 FreeMem(@self,sizeof(self));
328 end;
329 end;
330 procedure tAggr.Done;
331 begin
332 assert(not rekt);
333 writeln('upmgr: aggr close');
334 rekt:=true;
335 tcs.Done;
336 if assigned(prev) then prev^.next:=next else Peers:=next;
337 if assigned(next) then next^.prev:=prev;
338 end;
340 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
341 begin
342 result:=Peers;
343 while assigned(result) do begin
344 if assigned(result^.next) then assert(result^.next^.prev=result);
345 if result^.tcs.remote=addr then exit;
346 result:=result^.next;
347 end;
348 end;
350 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
351 var ag:^tAggr;
352 var pr:^tPrv;
353 var s:tMemoryStream;
354 const cMax=16;
355 begin
356 writeln('upmgr: ChatHandler');
357 msg.stream.skip({the initcode}1);
358 if msg.stream.RdBufLen<2 then begin
359 writeln('upmgr: malformed init');
360 nchat.StreamInit(s,16);
361 s.WriteByte(upFAIL);
362 s.writebyte(upErrMalformed);
363 nchat.Send(s);
364 nchat.Close;
365 writeln('upmgr: malformed request stage=0');
366 exit end;
367 {first get the ag}
368 ag:=FindAggr(msg.source^);
369 if assigned(ag) then begin
370 {check}
371 if ag^.Cnt>=cMax then begin
372 nchat.StreamInit(s,16);
373 s.WriteByte(upFAIL);
374 s.WriteByte(upErrHiChan);
375 s.WriteByte(cMax);
376 s.WriteByte(ag^.Cnt);
377 nchat.Send(s);
378 nchat.Close;
379 exit end;
380 end else begin
381 New(ag);
382 ag^.init(msg.source^);
383 end;
384 New(pr);
385 pr^.Init(ag,nchat,msg);
386 end;
388 BEGIN
389 SetChatHandler(opcode.upFileServer,@ChatHandler);
390 END.