UnShedule all duplicate entries.
[brdnet.git] / upmgr.pas
blob12359f4623f8c6b13627c23844f997dd07fa130b
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
7 IMPLEMENTATION
8 USES ZidanStore;
type
 tAggr_ptr = ^tAggr;
 tPrv_ptr  = ^tPrv;

 {Upload state bound to one chat (see tPrv.Init).}
 tPrv = object
  aggr     : tAggr_ptr;        {aggregate this provider holds a reference on}
  ch       : ^tChat;           {chat used for control messages; nil once ChatTimeout closed it}
  chan     : byte;             {first byte of the opening message; only stored in this unit}
  next     : tPrv_ptr;         {links in the aggregate's circular ring; valid while Active}
  prev     : tPrv_ptr;
  weight   : Word;             {number of Cont calls before the ring is rotated}
  wcur     : Word;             {Cont calls left in the current turn}
  isOpen   : boolean;          {set by DoGET after a successful oinfo.Open}
  Active   : boolean;          {true between Start and Stop, i.e. while linked in the ring}
  seglen   : LongWord;         {bytes of the requested segment still to be sent}
  oinfo    : tStoreObjectInfo; {store object being served}
  datafile : file of byte;     {copy of oinfo.hnd used by BlockRead in Cont}
  procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
  procedure OnMsg(msg:tSMsg; data:boolean);
  procedure IdleTimeout;
  procedure ChatTimeout(willwait:LongWord);
  procedure Cont;
  procedure DoGET(const fid:tfid; base,limit:LongWord);
  procedure DoSEG(base,limit:LongWord);
  procedure Start;
  procedure Stop;
  procedure DoClose;
 end;

 {State shared by all providers of one remote address.}
 tAggr = object
  tcs  : tTCS;       {sender used by tPrv.Cont for the data packets}
  rekt : boolean;    {set by Done; OnMsg ignores requests afterwards}
  next : tAggr_ptr;  {links in the global Peers list}
  prev : tAggr_ptr;
  prv  : ^tPrv;      {provider whose turn it is in the active ring, nil if none}
  Cnt  : Byte;       {number of tPrv objects referencing this aggregate}
  procedure UnRef;
  procedure Cont;
  procedure Init(const source:tNetAddr);
  procedure Done;
  procedure TCTimeout;
 end;

{Head of the doubly linked list of live aggregates.}
var Peers: ^tAggr;
{Requests
 Close();
 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
 SEG(baseHi:word2; base:word4; limit:word4);
}{Responses
 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
 FAIL(code:byte;...);
 DONE();
}
{Handle a GET request: open the object identified by fid and begin
 serving the segment [base, base+limit).
 Any transfer in progress is stopped and the previously opened object is
 closed first. On failure a FAIL reply is sent: upErrNotFound when
 oinfo.rc=1, otherwise upErrIO followed by the rc value.}
procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord);
 var err:tMemoryStream;
 begin
 {Stop asserts isOpen, so it has to run before the flag is cleared.}
 if Active then Stop;
 if isOpen then begin
  oinfo.Close;
  {Clear the flag: if the Open below fails, the provider must not be
   left looking open, or a later SEG would use the closed object and
   DoClose/IdleTimeout would close it a second time.}
  isOpen:=false;
 end;
 ch^.Ack;
 oinfo.Open(fid);
 {if not oinfo.final then begin
   oinfo.rc:=200;
   Close(oinfo.hnd);
 end;}
 if oinfo.rc>0 then begin
  ch^.StreamInit(err,3);
  err.WriteByte(upFAIL);
  if oinfo.rc=1 then err.WriteByte(upErrNotFound)
  else begin
   err.WriteByte(upErrIO);
   err.WriteByte(oinfo.rc);
  end;
  ch^.Send(err);
 end else begin
  datafile:=oinfo.hnd;
  isOpen:=true;
  DoSEG(base,limit);
 end;
 end;
{Handle a SEG request on the currently open object.
 Replies FAIL/upErrSegNoGet when nothing is open, FAIL/upErrIO with the rc
 value when the seek fails (stopping a running transfer), and otherwise
 sends INFO and makes sure the provider is active.}
procedure tPrv.DoSEG(base,limit:LongWord);
 var reply:tMemoryStream;
 begin
 {No preceding successful GET.}
 if not isOpen then begin
  ch^.StreamInit(reply,2);
  reply.WriteByte(upFAIL);
  reply.WriteByte(upErrSegNoGet);
  ch^.Send(reply);
  exit;
 end;

 ch^.StreamInit(reply,12);
 oinfo.SegSeek(base);

 {Seek failed.}
 if oinfo.rc>0 then begin
  reply.WriteByte(upFAIL);
  reply.WriteByte(upErrIO);
  reply.WriteByte(oinfo.rc);
  ch^.Send(reply);
  if Active then Stop;
  exit;
 end;

 {INFO(sizeHi:2; size:4; final:1; seglen:4)}
 reply.WriteByte(upINFO);
 datafile:=oinfo.hnd;
 reply.WriteWord(0,2);
 reply.WriteWord(oinfo.length,4);
 {Serve at most what the store reports for this segment.}
 if oinfo.seglen<limit
  then seglen:=oinfo.seglen
  else seglen:=limit;
 if oinfo.final
  then reply.WriteByte(1)
  else reply.WriteByte(0);
 reply.WriteWord(seglen,4);
 ch^.Send(reply);
 if not Active then Start;
 end;
{Mark the provider active: cancel its idle timer, link it into the
 aggregate's circular ring and give it a full turn (wcur:=weight).}
procedure tPrv.Start;
 var head:tPrv_ptr;
 begin
 Assert(isOpen);
 Assert(not Active);
 Active:=true;
 UnShedule(@IdleTimeout);
 head:=aggr^.prv;
 if assigned(head) then begin
  {Insert right after the current ring position.}
  prev:=head;
  next:=head^.next;
  head^.next:=@self;
  next^.prev:=@self;
 end else begin
  {Ring was empty: a single element points at itself.}
  next:=@self;
  prev:=@self;
  aggr^.prv:=@self;
 end;
 wcur:=weight;
 end;
{Mark the provider idle: unlink it from the aggregate's ring and arm the
 20000 idle timer.}
procedure tPrv.Stop;
 begin
 Assert(isOpen);
 Assert(Active);
 if prev=@self then
  {Only element: the ring becomes empty.}
  next:=nil
 else begin
  prev^.next:=next;
  next^.prev:=prev;
 end;
 {If it was this provider's turn, hand the turn to the successor (or nil).}
 if aggr^.prv=@self then aggr^.prv:=next;
 Active:=false;
 Shedule(20000,@IdleTimeout);
 end;
{Send the next piece of the current segment through the aggregate's tcs.
 Called from tAggr.Cont while this provider holds the turn. When the
 segment is finished DONE is sent on the chat and the provider stops;
 otherwise, after weight calls, the turn passes to the next ring member.}
procedure tPrv.Cont;
 var pkt:tMemoryStream;
 var chunk:LongWord;
 var got:LongWord;
 var buf:array [1..2048] of byte;
 begin
 Assert(Active and isOpen);
 {Piece size: the rest of the segment, capped by the local buffer and
  then by whatever tcs.MaxSize allows.}
 chunk:=SegLen;
 if chunk>high(buf) then chunk:=high(buf);
 chunk:=aggr^.tcs.MaxSize(chunk);
 //pkt.Init(GetMem(chunk),0,chunk);
 pkt.Init(@buf,0,chunk);
 aggr^.tcs.WriteHeaders(pkt);
 Assert(pkt.WrBufLen=chunk); //really?
 BlockRead(datafile,pkt.WrBuf^,pkt.WrBufLen,got);
 pkt.WrEnd(got);
 Assert(got=pkt.WrBufLen);//todo
 aggr^.tcs.Send(pkt);
 //FreeMem(pkt.base,pkt.size);
 {NOTE(review): the remaining length is reduced by chunk, not by the
  number of bytes actually read - confirm this is intended.}
 SegLen:=SegLen-chunk;
 dec(wcur);
 if SegLen=0 then begin
  ch^.StreamInit(pkt,2);
  pkt.WriteByte(upDONE);
  ch^.Send(pkt);
  Stop;
 end else if wcur=0 then begin
  {Turn used up: refill and let the next provider send.}
  wcur:=weight;
  aggr^.prv:=next;
 end;
 end;
{Handle a Close request: stop any transfer, close the object, cancel the
 idle timer, acknowledge and close the chat, drop the reference on the
 aggregate and free this provider. Self is invalid after the call.}
procedure tPrv.DoClose;
 begin
 {Stop re-arms the idle timer; it is cancelled again below.}
 if Active then
  Stop;
 if isOpen then
  oinfo.Close;
 isOpen:=false;
 UnShedule(@IdleTimeout);
 ch^.Ack;
 ch^.Close;
 aggr^.UnRef;
 FreeMem(@self,sizeof(self));
 end;
{Chat callback: decode one request and dispatch it.
 Messages with data=false are ignored, as is everything once the aggregate
 is rekt. An empty, unknown or wrongly sized request is answered with
 FAIL/upErrMalformed.}
procedure tPrv.OnMsg(msg:tSMsg; data:boolean);
 var op:byte;
 var hash:tfid;
 var base,limit:LongWord;
 var reply:tMemoryStream;
 var bad:boolean;
 begin
 if not data then exit; //todo
 Assert(not(aggr^.rekt and active));
 if aggr^.rekt then exit;

 bad:=msg.stream.RdBufLen<1;
 if not bad then begin
  op:=msg.stream.ReadByte;
  case op of
   upClose: begin
    {DoClose frees self; leave without touching any field.}
    DoClose;
    exit;
   end;
   upGET: begin
    {filehash:20 + baseHi:2 + base:4 + limit:4}
    bad:=msg.stream.RdBufLen<>30;
    if not bad then begin
     msg.stream.Read(hash,20);
     bad:=msg.stream.ReadWord(2)>0;
    end;
    if not bad then begin
     base:=msg.stream.ReadWord(4);
     limit:=msg.stream.ReadWord(4);
     DoGet(hash,base,limit);
    end;
   end;
   upSEG: begin
    {baseHi:2 + base:4 + limit:4}
    bad:=msg.stream.RdBufLen<10;
    if not bad then
     bad:=msg.stream.ReadWord(2)>0;
    if not bad then begin
     base:=msg.stream.ReadWord(4);
     limit:=msg.stream.ReadWord(4);
     DoSEG(base,limit);
    end;
   end;
   else bad:=true;
  end;
 end;
 if not bad then exit;

 ch^.StreamInit(reply,2);
 reply.WriteByte(upFAIL);
 reply.WriteByte(upErrMalformed);
 ch^.Send(reply);
 end;
{Chat timeout hook. Once the chat reports a wait of 30000 or more the
 provider gives up: the transfer is stopped, the object and the chat are
 closed and ch is cleared. If the provider was active it is destroyed at
 once through IdleTimeout; otherwise the idle timer that is already armed
 (by Init or by an earlier Stop) will destroy it.}
procedure tPrv.ChatTimeout(willwait:LongWord);
 var wasactive:boolean;
 begin
 if WillWait<30000 then exit;
 wasactive:=Active;
 if Active then Stop;
 if isOpen then oinfo.Close;
 isOpen:=false;
 ch^.Close;
 ch:=nil;
 if wasactive then begin
  {Stop has just armed the idle timer. IdleTimeout frees self, so the
   timer must be cancelled first or it would later fire on freed memory.}
  UnShedule(@IdleTimeout);
  IdleTimeout;
 end {else it is sheduled};
 end;
{Destroy an idle provider: if the chat is still there send Close on it
 and close it, close the object if it is still open, drop the reference
 on the aggregate and free self. Self is invalid after the call.}
procedure tPrv.IdleTimeout;
 var bye:tMemoryStream;
 begin
 if ch<>nil then begin {chat is still not rekt}
  ch^.StreamInit(bye,1);
  bye.WriteByte(upClose);
  ch^.Send(bye);
  ch^.Close;
 end;
 Assert(not Active); {is idle}
 if isOpen then
  oinfo.Close; {may still be open}
 aggr^.UnRef;
 FreeMem(@self,sizeof(self));
 end;
{Set up a new provider for chat nchat under aggregate ag, take a reference
 on the aggregate, arm the 5000 idle timer and process the remainder of
 the opening message as the first request.}
procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
 begin
 {Plain state first.}
 aggr:=ag;
 next:=nil;
 prev:=nil;
 weight:=100;
 wcur:=0;
 isOpen:=false;
 Active:=false;
 {Hook into the chat.}
 ch:=@nchat;
 ch^.Callback:=@OnMsg;
 ch^.TMHook:=@ChatTimeout;
 {The first byte of the opening message is kept in chan; the rest is
  handed to OnMsg below.}
 chan:=msg.stream.ReadByte;
 Inc(aggr^.Cnt);
 Shedule(5000,@IdleTimeout);
 {OnMsg may free self (Close request): nothing may follow this call.}
 OnMsg(msg,true);
 end;
{Initialise the aggregate for remote address source: clear the local
 state, push it on the front of the Peers list and set up the tcs sender
 with Cont and TCTimeout as its callbacks.}
procedure tAggr.Init(const source:tNetAddr);
 begin
 writeln('upmgr: init');
 {Local state.}
 rekt:=false;
 prv:=nil;
 Cnt:=0;
 {Become the new head of Peers.}
 prev:=nil;
 next:=Peers;
 if Peers<>nil then Peers^.prev:=@self;
 Peers:=@self;
 {Sender.}
 tcs.Init(source);
 tcs.CanSend:=@Cont;
 tcs.maxTimeout:=8;
 tcs.OnTimeout:=@TCTimeout;
 end;
{tcs timeout callback: the peer stopped responding.
 Every active provider is stopped and the aggregate is shut down with
 Done. The providers are not destroyed here: Stop arms each one's idle
 timer, and tPrv.IdleTimeout later closes the chat, drops the reference
 and frees it, so the aggregate stays valid for the rest of this call.}
procedure tAggr.TCTimeout;
 var pprv:pointer;
 begin
 writeln('TCTimeout');
 while assigned(prv) do begin
  pprv:=prv;
  {Ring members are always Active, so IdleTimeout (which asserts
   not Active and frees the object without unlinking it) must not be
   called on them directly. Stop unlinks the provider and moves prv to
   the next ring member, or to nil when the ring is empty.}
  prv^.Stop;
  Assert(pprv<>prv);
 end;
 Done;
 end;
{tcs.CanSend callback: let the provider that holds the turn send a piece.}
procedure tAggr.Cont;
 begin
 Assert(prv<>nil);
 prv^.Cont;
 end;
{Drop one provider reference. When the last one is gone the aggregate is
 shut down and freed, so self is invalid after the call.}
procedure tAggr.UnRef;
 begin
 Assert(Cnt>0);
 Dec(Cnt);
 if Cnt>0 then exit;
 Done;
 FreeMem(@self,sizeof(self));
 end;
{Shut the aggregate down once: mark it rekt, finish the tcs sender and
 unlink it from Peers. Further calls do nothing. Memory is not freed here.}
procedure tAggr.Done;
 begin
 if rekt then exit;
 writeln('upmgr: close');
 rekt:=true;
 tcs.Done;
 {Unlink from the doubly linked Peers list.}
 if prev=nil
  then Peers:=next
  else prev^.next:=next;
 if next<>nil then next^.prev:=prev;
 end;
{Look up the aggregate whose tcs.remote equals addr.
 Returns a pointer to it, or nil when no aggregate in Peers matches.}
function FindAggr({const} addr:tNetAddr): tAggr_ptr;
 begin
 result:=Peers;
 while assigned(result) do begin
  if result^.tcs.remote=addr then exit;
  {List consistency check: the successor must point back at this node.
   (A node's prev is never the node itself, so comparing prev with the
   node would fail on every non-matching entry.)}
  if assigned(result^.next) then
   Assert(result^.next^.prev=result);
  result:=result^.next;
 end;
 end;
{Handler for new upFileServer chats.
 Rejects opening messages shorter than 2 bytes (FAIL/upErrMalformed) and
 peers that already have cMax providers (FAIL/upErrHiChan, cMax, count).
 Otherwise finds or creates the aggregate for the sender and attaches a
 new provider to the chat.}
procedure ChatHandler(var nchat:tChat; msg:tSMsg);
 const cMax=16;
 var ag:^tAggr;
 var pr:^tPrv;
 var reply:tMemoryStream;
 begin
 if msg.stream.RdBufLen<2 then begin
  writeln('upmgr: malformed init');
  nchat.StreamInit(reply,16);
  reply.WriteByte(upFAIL);
  reply.WriteByte(upErrMalformed);
  nchat.Send(reply);
  nchat.Close;
  exit;
 end;

 {first get the ag}
 ag:=FindAggr(msg.source^);
 if not assigned(ag) then begin
  {First chat from this address.}
  New(ag);
  ag^.Init(msg.source^);
 end else if ag^.Cnt>=cMax then begin
  {Too many providers for this peer already.}
  nchat.StreamInit(reply,16);
  reply.WriteByte(upFAIL);
  reply.WriteByte(upErrHiChan);
  reply.WriteByte(cMax);
  reply.WriteByte(ag^.Cnt);
  nchat.Send(reply);
  nchat.Close;
  exit;
 end;

 New(pr);
 pr^.Init(ag,nchat,msg);
 end;
{Unit initialization: register ChatHandler for the upFileServer opcode.}
begin
 SetChatHandler(opcode.upFileServer, @ChatHandler);
end.