UpMgr work + Tests and temp store.
[brdnet.git] / upmgr.pas
blobc4480d355be9043064a3d2f9136fed9ec12ef429
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
7 IMPLEMENTATION
8 USES ZidanStore;
10 type
11 tAggr_ptr=^tAggr;
12 tPrv_ptr=^tPrv;
13 tPrv=object
14 aggr:tAggr_ptr;
15 ch: ^tChat;
16 chan: byte;
17 next,prev: tPrv_ptr;
18 weight,wcur:Word;
19 isOpen,Active:boolean;
20 seglen:LongWord;
21 oinfo:tStoreObjectInfo;
22 datafile:file of byte;
23 procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
24 procedure OnMsg(msg:tSMsg; data:boolean);
25 procedure IdleTimeout;
26 procedure ChatTimeout(willwait:LongWord);
27 procedure Cont;
28 procedure DoGET(const fid:tfid; base,limit:LongWord);
29 procedure DoSEG(base,limit:LongWord);
30 procedure Start;
31 procedure Stop;
32 procedure DoClose;
33 end;
34 tAggr=object
35 tcs: tTCS;
36 rekt:boolean;
37 next,prev: tAggr_ptr;
38 prv:^tPrv;
39 Cnt:Byte;
40 procedure UnRef;
41 procedure Cont;
42 procedure Init(const source:tNetAddr);
43 procedure Done;
44 procedure TCTimeout;
45 end;
47 var Peers:^tAggr;
49 {Requests
50 Close();
51 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
52 SEG(baseHi:word2; base:word4; limit:word4);
53 }{Responses
54 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
55 FAIL(code:byte;...);
56 DONE();
59 procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord);
60 var err:tmemorystream;
61 begin
62 if isOpen then oinfo.Close;
63 if Active then Stop; //opt
64 ch^.Ack;
65 oinfo.Open(fid);
66 {if not oinfo.final then begin
67 oinfo.rc:=200;
68 Close(oinfo.hnd);
69 end;}
70 if oinfo.rc>0 then begin
71 ch^.StreamInit(err,3);
72 err.WriteByte(upFAIL);
73 if oinfo.rc=1 then err.WriteByte(upErrNotFound)
74 else begin err.WriteByte(upErrIO); err.WriteByte(oinfo.rc) end;
75 ch^.Send(err);
76 end else begin
77 datafile:=oinfo.hnd;
78 isopen:=true;
79 DoSeg(base,limit);
80 end;
81 end;
83 procedure tPrv.DoSEG(base,limit:LongWord);
84 var err:tmemorystream;
85 begin
86 if isOpen then begin
87 ch^.StreamInit(err,12);
88 oinfo.SegSeek(base);
89 if oinfo.rc>0 then begin
90 err.WriteByte(upFAIL);
91 err.WriteByte(upErrIO);
92 err.WriteByte(oinfo.rc);
93 ch^.Send(err);
94 if Active then Stop;
95 end else begin
96 err.WriteByte(upINFO);
97 datafile:=oinfo.hnd;
98 err.WriteWord(0,2);
99 err.WriteWord(oinfo.length,4);
100 seglen:=limit;
101 if oinfo.seglen<seglen then seglen:=oinfo.seglen;
102 if oinfo.final
103 then err.WriteByte(1)
104 else err.WriteByte(0);
105 err.WriteWord(seglen,4);
106 ch^.Send(err);
107 if not Active then Start;
108 end end else begin
109 ch^.StreamInit(err,2);
110 err.WriteByte(upFAIL);
111 err.WriteByte(upErrSegNoGet);
112 ch^.Send(err);
113 end;
114 end;
116 procedure tPrv.Start;
117 begin
118 Assert(isOpen);
119 Assert(not Active);
120 Active:=true;
121 UnShedule(@IdleTimeout);
122 writeln('upmgr: Startig transfer');
123 if not assigned(aggr^.prv) then begin
124 next:=@self;
125 prev:=@self;
126 aggr^.prv:=@self;
127 aggr^.tcs.Start;
128 end else begin
129 next:=aggr^.prv^.next;
130 prev:=aggr^.prv;
131 prev^.next:=@self;
132 next^.prev:=@self;
133 end;
134 wcur:=weight;
135 end;
137 procedure tPrv.Stop;
138 begin
139 Assert(isOpen);
140 Assert(Active);
141 if prev<>@self then begin
142 prev^.next:=next;
143 next^.prev:=prev;
144 end else next:=nil;
145 if aggr^.prv=@self then aggr^.prv:=next;
146 active:=false;
147 Shedule(20000,@IdleTimeout);
148 writeln('upmgr: Stop');
149 end;
151 procedure tPrv.Cont;
152 var s:tMemoryStream;
153 var sz:LongWord;
154 var rs:LongWord;
155 var buf:array [1..2048] of byte;
156 begin
157 writeln('upmgr: CONT! ',chan);
158 Assert(Active and isOpen);
159 sz:=aggr^.tcs.MaxSize(sizeof(buf))-1;
160 if sz>SegLen then sz:=SegLen;
161 //s.Init(GetMem(sz),0,sz);
162 Assert((sz+1)<=sizeof(buf));
163 s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s);
164 s.WriteByte(Chan);
165 Assert(sz<=s.WrBufLen);
166 BlockRead(datafile,s.WrBuf^,sz,rs); s.WrEnd(rs);
167 Assert(RS=sz);//todo
168 aggr^.tcs.Send(s);
169 //FreeMem(s.base,s.size);
170 SegLen:=SegLen-sz;
171 dec(wcur);
172 if SegLen=0 then begin
173 ch^.StreamInit(s,2);
174 s.WriteByte(upDONE);
175 ch^.Send(s);
176 Stop;
177 end else
178 if (wcur=0) then begin
179 wcur:=weight;
180 aggr^.prv:=next;
181 end;
182 end;
184 procedure tPrv.DoClose;
185 begin
186 if Active then Stop;
187 if isOpen then oinfo.Close;
188 isOpen:=false;
189 UnShedule(@IdleTimeout);
190 ch^.Ack;
191 ch^.Close;
192 aggr^.UnRef;
193 FreeMem(@self,sizeof(self));
194 end;
196 procedure tPrv.OnMsg(msg:tSMsg; data:boolean);
197 var op:byte;
198 var hash:tfid;
199 var base:LongWord;
200 var limit:LongWord;
201 var err:tmemorystream;
202 label malformed;
203 begin
204 if not data then exit; //todo
205 Assert(not(aggr^.rekt and active));
206 if aggr^.rekt then exit;
207 if msg.stream.RdBufLen<1 then goto malformed;
208 op:=msg.stream.ReadByte;
209 writeln('upmgr: opcode=',op,' sz=',msg.stream.RdBufLen);
210 case op of
211 upClose: DoClose;
212 upGET: begin
213 if msg.stream.RdBufLen<>30 then goto malformed;
214 msg.stream.Read(hash,20);
215 if msg.stream.ReadWord(2)>0 then goto malformed;
216 base:=msg.stream.ReadWord(4);
217 limit:=msg.stream.ReadWord(4);
218 DoGet(hash,base,limit);
219 end;
220 upSEG: begin
221 if msg.stream.RdBufLen<10 then goto malformed;
222 if msg.stream.ReadWord(2)>0 then goto malformed;
223 base:=msg.stream.ReadWord(4);
224 limit:=msg.stream.ReadWord(4);
225 DoSEG(base, limit);
226 end;
227 else goto malformed;
228 end;
229 exit; malformed:
230 ch^.StreamInit(err,2);
231 err.WriteByte(upFAIL);
232 err.WriteByte(upErrMalformed);
233 ch^.Send(err);
234 writeln('upmgr: malformed request stage=1');
235 end;
237 procedure tPrv.ChatTimeout(willwait:LongWord);
238 var wasactive:boolean;
239 begin
240 if WillWait<8000 then exit;
241 writeln('upmgr: Chat timeout');
242 wasactive:=active;
243 if Active then Stop;
244 if isOpen then oinfo.Close;
245 isOpen:=false;
246 ch^.Close;
247 ch:=nil;
248 if wasactive then IdleTimeout {else it is sheduled};
249 end;
250 procedure tPrv.IdleTimeout;
251 var err:tMemoryStream;
252 begin
253 if assigned(ch) then begin {chat is still not rekt}
254 if not active then writeln('upmgr: Idle timeout');
255 ch^.StreamInit(err,1);
256 err.WriteByte(upClose);
257 try ch^.Send(err);
258 except end;
259 ch^.Close;
260 end;
261 {it is idle timeout, but may be called from aggr it tc tiomes out}
262 if Active then Stop;
263 if isOpen then oinfo.Close; {may still be open}
264 UnShedule(@IdleTimeout);
265 aggr^.UnRef;
266 FreeMem(@self,sizeof(self));
267 end;
269 procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
270 begin
271 ch:=@nchat;
272 ch^.Callback:=@OnMsg;
273 ch^.TMHook:=@ChatTimeout;
274 aggr:=ag;
275 next:=nil;
276 prev:=nil;
277 chan:=msg.stream.readbyte;
278 writeln('upmgr: prv init chan=',chan);
279 weight:=100;
280 wcur:=0;
281 isOpen:=false; Active:=false;
282 inc(aggr^.Cnt);
283 Shedule(5000,@IdleTimeout);
284 OnMsg(msg,true);
285 end;
287 procedure tAggr.Init(const source:tNetAddr);
288 begin
289 writeln('upmgr: init');
290 next:=Peers;
291 prev:=nil;
292 rekt:=false;
293 if assigned(Peers) then Peers^.prev:=@self;
294 Peers:=@self;
295 tcs.Init(source);
296 tcs.CanSend:=@Cont;
297 tcs.maxTimeout:=8;
298 tcs.OnTimeout:=@TCTimeout;
299 prv:=nil;
300 cnt:=0;
301 end;
303 procedure tAggr.TCTimeout;
304 var pprv:pointer;
305 begin
306 writeln('upmgr: TCTimeout');
307 while assigned(prv) do begin
308 assert(not rekt);
309 pprv:=prv;
310 prv^.IdleTimeout;
311 if rekt then exit;
312 Assert(pprv<>prv);
313 end;
314 Done;
315 end;
316 procedure tAggr.Cont;
317 begin
318 if not assigned(prv) then exit;
319 prv^.Cont;
320 end;
321 procedure tAggr.UnRef;
322 begin
323 Assert(cnt>0);
324 Dec(Cnt);
325 writeln('upmgr: aggr unrefd');
326 if cnt=0 then begin
327 Done;
328 FreeMem(@self,sizeof(self));
329 end;
330 end;
331 procedure tAggr.Done;
332 begin
333 assert(not rekt);
334 writeln('upmgr: aggr close');
335 rekt:=true;
336 tcs.Done;
337 if assigned(prev) then prev^.next:=next else Peers:=next;
338 if assigned(next) then next^.prev:=prev;
339 end;
341 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
342 begin
343 result:=Peers;
344 while assigned(result) do begin
345 if assigned(result^.next) then assert(result^.next^.prev=result);
346 if result^.tcs.remote=addr then exit;
347 result:=result^.next;
348 end;
349 end;
351 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
352 var ag:^tAggr;
353 var pr:^tPrv;
354 var s:tMemoryStream;
355 const cMax=16;
356 begin
357 writeln('upmgr: ChatHandler');
358 msg.stream.skip({the initcode}1);
359 if msg.stream.RdBufLen<2 then begin
360 writeln('upmgr: malformed init');
361 nchat.StreamInit(s,16);
362 s.WriteByte(upFAIL);
363 s.writebyte(upErrMalformed);
364 nchat.Send(s);
365 nchat.Close;
366 writeln('upmgr: malformed request stage=0');
367 exit end;
368 {first get the ag}
369 ag:=FindAggr(msg.source^);
370 if assigned(ag) then begin
371 {check}
372 if ag^.Cnt>=cMax then begin
373 nchat.StreamInit(s,16);
374 s.WriteByte(upFAIL);
375 s.WriteByte(upErrHiChan);
376 s.WriteByte(cMax);
377 s.WriteByte(ag^.Cnt);
378 nchat.Send(s);
379 nchat.Close;
380 exit end;
381 end else begin
382 New(ag);
383 ag^.init(msg.source^);
384 end;
385 New(pr);
386 pr^.Init(ag,nchat,msg);
387 end;
389 BEGIN
390 SetChatHandler(opcode.upFileServer,@ChatHandler);
391 END.