Few fixes in support units.
[brdnet.git] / upmgr.pas
blob784295197c958de43b278cf8d6da050d1863484f
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
7 IMPLEMENTATION
8 USES Store1;
10 type
11 tAggr_ptr=^tAggr;
12 tPrv_ptr=^tPrv;
13 tPrv=object
14 aggr:tAggr_ptr;
15 ch: ^tChat;
16 chan: byte;
17 next,prev: tPrv_ptr;
18 weight,wcur:Word;
19 isOpen,Active:boolean;
20 seglen:LongWord;
21 oinfo:tStoreObjectInfo;
22 procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
23 procedure OnMsg(msg:tSMsg; data:boolean);
24 procedure Cont;
25 procedure DoGET(const fid:tfid; base,limit:LongWord);
26 procedure DoSEG(base,limit:LongWord);
27 procedure DoClose;
28 procedure Start;
29 procedure Stop;
30 procedure ChatTimeout(willwait:LongWord);
31 procedure Close;
32 end;
33 tAggr=object
34 tcs: tTCS;
35 rekt:boolean;
36 next,prev: tAggr_ptr;
37 prv:^tPrv;
38 Cnt:Byte;
39 procedure UnRef;
40 procedure Cont;
41 procedure Init(const source:tNetAddr);
42 procedure TCTimeout;
43 procedure Done;
44 end;
46 var Peers:^tAggr;
48 {Requests
49 Close();
50 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
51 SEG(baseHi:word2; base:word4; limit:word4);
52 }{Responses
53 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
54 FAIL(code:byte;...);
55 DONE();
58 procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord);
59 var err:tmemorystream;
60 begin
61 if isOpen then oinfo.Close;
62 if Active then Stop; //opt
63 ch^.Ack;
64 oinfo.Open(fid);
65 {if not oinfo.final then begin
66 oinfo.rc:=200;
67 Close(oinfo.hnd);
68 end;}
69 if oinfo.rc>0 then begin
70 ch^.StreamInit(err,3);
71 err.WriteByte(upFAIL);
72 if oinfo.rc=1 then err.WriteByte(upErrNotFound)
73 else begin err.WriteByte(upErrIO); err.WriteByte(oinfo.rc) end;
74 ch^.Send(err);
75 end else begin
76 isopen:=true;
77 DoSeg(base,limit);
78 end;
79 end;
81 procedure tPrv.DoSEG(base,limit:LongWord);
82 var err:tmemorystream;
83 begin
84 if isOpen then begin
85 ch^.StreamInit(err,12);
86 oinfo.SegSeek(base);
87 if oinfo.rc>0 then begin
88 err.WriteByte(upFAIL);
89 err.WriteByte(upErrIO);
90 err.WriteByte(oinfo.rc);
91 ch^.Send(err);
92 if Active then Stop;
93 end else begin
94 err.WriteByte(upINFO);
95 err.WriteWord(0,2);
96 err.WriteWord(oinfo.length,4);
97 seglen:=limit;
98 if oinfo.seglen<seglen then seglen:=oinfo.seglen;
99 if oinfo.final
100 then err.WriteByte(1)
101 else err.WriteByte(0);
102 err.WriteWord(seglen,4);
103 ch^.Send(err);
104 if not Active then Start;
105 end end else begin
106 ch^.StreamInit(err,2);
107 err.WriteByte(upFAIL);
108 err.WriteByte(upErrSegNoGet);
109 ch^.Send(err);
110 end;
111 end;
113 procedure tPrv.Start;
114 begin
115 Assert(isOpen);
116 Assert(not Active);
117 Active:=true;
118 UnShedule(@Close);
119 writeln('upmgr: Startig transfer');
120 if not assigned(aggr^.prv) then begin
121 next:=@self;
122 prev:=@self;
123 aggr^.prv:=@self;
124 aggr^.tcs.Start;
125 end else begin
126 next:=aggr^.prv^.next;
127 prev:=aggr^.prv;
128 prev^.next:=@self;
129 next^.prev:=@self;
130 end;
131 wcur:=weight;
132 end;
134 procedure tPrv.Stop;
135 begin
136 Assert(isOpen);
137 Assert(Active);
138 if prev<>@self then begin
139 prev^.next:=next;
140 next^.prev:=prev;
141 end else next:=nil;
142 if aggr^.prv=@self then aggr^.prv:=next;
143 active:=false;
144 Shedule(20000,@Close);
145 writeln('upmgr: Stop');
146 end;
148 procedure tPrv.Cont;
149 var s:tMemoryStream;
150 var sz:LongWord;
151 var buf:array [1..2048] of byte;
152 begin
153 //writeln('upmgr: CONT! ',chan);
154 Assert(Active and isOpen);
155 sz:=aggr^.tcs.MaxSize(sizeof(buf))-1;
156 if sz>SegLen then sz:=SegLen;
157 //s.Init(GetMem(sz),0,sz);
158 Assert((sz+1)<=sizeof(buf));
159 s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s);
160 s.WriteByte(Chan);
161 Assert(sz<=s.WrBufLen);
162 oinfo.ReadAhead(sz,s.WrBuf); //todo
163 oinfo.WaitRead;
164 Assert(oinfo.rc=0); //todo
165 s.WrEnd(sz);
166 aggr^.tcs.Send(s);
167 //FreeMem(s.base,s.size);
168 SegLen:=SegLen-sz;
169 dec(wcur);
170 if SegLen=0 then begin
171 ch^.StreamInit(s,2);
172 s.WriteByte(upDONE);
173 ch^.Send(s);
174 Stop;
175 end else
176 if (wcur=0) then begin
177 wcur:=weight;
178 aggr^.prv:=next;
179 end;
180 end;
182 procedure tPrv.DoClose;
183 begin
184 ch^.Ack;
185 Close;
186 end;
188 procedure tPrv.OnMsg(msg:tSMsg; data:boolean);
189 var op:byte;
190 var hash:tfid;
191 var base:LongWord;
192 var limit:LongWord;
193 var err:tmemorystream;
194 label malformed;
195 begin
196 if not data then exit; //todo
197 Assert(not(aggr^.rekt and active));
198 if aggr^.rekt then exit;
199 if msg.stream.RdBufLen<1 then goto malformed;
200 op:=msg.stream.ReadByte;
201 writeln('upmgr: opcode=',op,' sz=',msg.stream.RdBufLen);
202 case op of
203 upClose: DoClose;
204 upGET: begin
205 if msg.stream.RdBufLen<>30 then goto malformed;
206 msg.stream.Read(hash,20);
207 if msg.stream.ReadWord(2)>0 then goto malformed;
208 base:=msg.stream.ReadWord(4);
209 limit:=msg.stream.ReadWord(4);
210 DoGet(hash,base,limit);
211 end;
212 upSEG: begin
213 if msg.stream.RdBufLen<10 then goto malformed;
214 if msg.stream.ReadWord(2)>0 then goto malformed;
215 base:=msg.stream.ReadWord(4);
216 limit:=msg.stream.ReadWord(4);
217 DoSEG(base, limit);
218 end;
219 else goto malformed;
220 end;
221 exit; malformed:
222 ch^.StreamInit(err,2);
223 err.WriteByte(upFAIL);
224 err.WriteByte(upErrMalformed);
225 ch^.Send(err);
226 writeln('upmgr: malformed request stage=1');
227 end;
229 {######Timeouts and Shit#######}
230 procedure tPrv.ChatTimeout(willwait:LongWord);
231 begin
232 if WillWait<8000 then exit;
233 writeln('upmgr: Chat timeout');
234 Close;
235 end;
236 procedure tPrv.Close;
237 var err:tMemoryStream;
238 begin
239 assert(assigned(ch));
240 ch^.StreamInit(err,1);
241 err.WriteByte(upClose);
242 try ch^.Send(err); except end;
243 if Active then Stop;
244 if isOpen then oinfo.Close;
245 isOpen:=false;
246 ch^.Close;
247 ch:=nil;
248 UnShedule(@Close);
249 aggr^.UnRef;
250 FreeMem(@self,sizeof(self));
251 end;
253 procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
254 begin
255 ch:=@nchat;
256 ch^.Callback:=@OnMsg;
257 ch^.TMHook:=@ChatTimeout;
258 aggr:=ag;
259 next:=nil;
260 prev:=nil;
261 chan:=msg.stream.readbyte;
262 writeln('upmgr: prv init chan=',chan);
263 weight:=100;
264 wcur:=0;
265 isOpen:=false; Active:=false;
266 inc(aggr^.Cnt);
267 Shedule(5000,@Close);
268 OnMsg(msg,true);
269 end;
271 procedure tAggr.Init(const source:tNetAddr);
272 begin
273 writeln('upmgr: init');
274 next:=Peers;
275 prev:=nil;
276 rekt:=false;
277 if assigned(Peers) then Peers^.prev:=@self;
278 Peers:=@self;
279 tcs.Init(source);
280 tcs.CanSend:=@Cont;
281 tcs.maxTimeout:=8;
282 tcs.OnTimeout:=@TCTimeout;
283 prv:=nil;
284 cnt:=0;
285 end;
287 procedure tAggr.TCTimeout;
288 var pprv:pointer;
289 begin
290 writeln('upmgr: TCTimeout');
291 while assigned(prv) do begin
292 assert(not rekt);
293 pprv:=prv;
294 prv^.Close;
295 if rekt then exit;
296 Assert(pprv<>prv);
297 end;
298 Done;
299 end;
300 procedure tAggr.Cont;
301 begin
302 if not assigned(prv) then exit;
303 prv^.Cont;
304 end;
305 procedure tAggr.UnRef;
306 begin
307 Assert(cnt>0);
308 Dec(Cnt);
309 writeln('upmgr: aggr unrefd');
310 if cnt=0 then begin
311 Done;
312 FreeMem(@self,sizeof(self));
313 end;
314 end;
315 procedure tAggr.Done;
316 begin
317 assert(not rekt);
318 writeln('upmgr: aggr close');
319 rekt:=true;
320 tcs.Done;
321 if assigned(prev) then prev^.next:=next else Peers:=next;
322 if assigned(next) then next^.prev:=prev;
323 end;
325 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
326 begin
327 result:=Peers;
328 while assigned(result) do begin
329 if assigned(result^.next) then assert(result^.next^.prev=result);
330 if result^.tcs.remote=addr then exit;
331 result:=result^.next;
332 end;
333 end;
335 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
336 var ag:^tAggr;
337 var pr:^tPrv;
338 var s:tMemoryStream;
339 const cMax=16;
340 begin
341 writeln('upmgr: ChatHandler');
342 msg.stream.skip({the initcode}1);
343 if msg.stream.RdBufLen<2 then begin
344 writeln('upmgr: malformed init');
345 nchat.StreamInit(s,16);
346 s.WriteByte(upFAIL);
347 s.writebyte(upErrMalformed);
348 nchat.Send(s);
349 nchat.Close;
350 writeln('upmgr: malformed request stage=0');
351 exit end;
352 {first get the ag}
353 ag:=FindAggr(msg.source^);
354 if assigned(ag) then begin
355 {check}
356 if ag^.Cnt>=cMax then begin
357 nchat.StreamInit(s,16);
358 s.WriteByte(upFAIL);
359 s.WriteByte(upErrHiChan);
360 s.WriteByte(cMax);
361 s.WriteByte(ag^.Cnt);
362 nchat.Send(s);
363 nchat.Close;
364 exit end;
365 end else begin
366 New(ag);
367 ag^.init(msg.source^);
368 end;
369 New(pr);
370 pr^.Init(ag,nchat,msg);
371 end;
373 BEGIN
374 SetChatHandler(opcode.upFileServer,@ChatHandler);
375 END.