WIP UpMgr.
[brdnet.git] / upmgr.pas
blob2ffd0f4580995bfb93133d06c689b0e77da5bb05
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
7 IMPLEMENTATION
8 USES ZidanStore;
10 type
11 tAggr_ptr=^tAggr;
12 tPrv_ptr=^tPrv;
{ tPrv: one upload provider, bound to a single chat. While Active it is a
  member of the circular ring hanging off its aggregator (tAggr.prv). }
tPrv=object
  aggr: tAggr_ptr;          { aggregator this provider belongs to (set in Init) }
  ch: ^tChat;               { chat used for control messages; nil once ChatTimeout closed it }
  chan: byte;               { first byte of the initial message, read in Init }
  next: tPrv_ptr;           { ring links, valid while Active }
  prev: tPrv_ptr;
  weight: Word;             { packets sent per turn before the ring advances }
  wcur: Word;               { packets left in the current turn }
  isOpen: boolean;          { datafile is open }
  Active: boolean;          { linked into the aggregator ring }
  seglen: LongWord;         { bytes of the current segment still to send }
  datafile: file of byte;
  procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
  procedure OnMsg(msg:tSMsg; data:boolean);
  procedure IdleTimeout;
  procedure ChatTimeout(willwait:LongWord);
  procedure Cont;
  procedure DoGET(const fid:tfid; base,limit:LongWord);
  procedure DoSEG(base,limit:LongWord);
  procedure Start;
  procedure Stop;
  procedure DoClose;
end;
{ tAggr: per-remote-address aggregator. Owns the transfer channel (tcs) shared
  by all providers of one peer and is itself a node of the global Peers list. }
tAggr=object
  tcs: tTCS;                { shared transfer channel; callbacks are wired in Init }
  rekt: boolean;            { set by Done: channel closed, unlinked from Peers }
  next: tAggr_ptr;          { links in the Peers list }
  prev: tAggr_ptr;
  prv: ^tPrv;               { provider whose turn it is; nil when no provider is Active }
  Cnt: Byte;                { number of tPrv instances holding a reference }
  procedure UnRef;
  procedure Cont;
  procedure Init(const source:tNetAddr);
  procedure Done;
  procedure TCTimeout;
end;
46 var Peers:^tAggr;
{Requests
 Close();
 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
 SEG(baseHi:word2; base:word4; limit:word4);
}{Responses
 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
 FAIL(code:byte;...);
 DONE();
}
{ Handle a GET request.
  Opens the stored object identified by fid. On success replies with
  INFO(sizeHi=0, size, final=1, seglen) and starts streaming; otherwise replies
  with FAIL(upErrNotFound) or FAIL(upErrIO, rc).
  fid   - identifier of the requested object
  base  - requested start offset; NOTE(review): still ignored, no Seek is done
  limit - maximum number of bytes the peer wants in this segment }
procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord);
  var reply:tMemoryStream;
  var info:tStoreObjectInfo;
  var notFinal:boolean;
  begin
  { A peer may send GET again on the same chat; release the previous transfer
    instead of aborting the daemon on an assertion. }
  if Active then Stop;
  if isOpen then begin
    Close(datafile);
    isOpen:=false;
  end;
  ch^.Ack;
  info.Open(fid);
  { Only look at info.final and close the handle when Open succeeded;
    after a failed Open the handle is not open and must not be closed. }
  notFinal:=false;
  if not(info.rc>0) then begin
    if not info.final then begin
      notFinal:=true;
      info.rc:=200;
      Close(info.hnd);
    end;
  end;
  if info.rc>0 then begin
    ch^.StreamInit(reply,3);
    reply.WriteByte(upFAIL);
    { incomplete objects are reported exactly like missing ones }
    if (info.rc=1) or notFinal then reply.WriteByte(upErrNotFound)
    else begin
      reply.WriteByte(upErrIO);
      reply.WriteByte(info.rc);
    end;
    ch^.Send(reply);
  end else begin
    ch^.StreamInit(reply,12);
    reply.WriteByte(upINFO);
    datafile:=info.hnd;
    isopen:=true;
    reply.WriteWord(0,2);             { sizeHi }
    reply.WriteWord(info.length,4);   { size }
    seglen:=limit;
    if info.length<seglen then seglen:=info.length;
    reply.WriteByte(1);               { final }
    reply.WriteWord(seglen,4);
    ch^.Send(reply);
    Start;
  end;
  end;
{ Handler for the SEG request. Not implemented: the body is empty and the only
  call site in OnMsg is commented out. }
procedure tPrv.DoSEG(base,limit:LongWord);
  begin
  end;
{ Make this provider Active: cancel the idle timer and link the provider into
  the aggregator's circular ring, right after the current ring head.
  Requires an open datafile and an inactive provider. }
procedure tPrv.Start;
  var head:tPrv_ptr;
  begin
  Assert(isOpen);
  Assert(not Active);
  Active:=true;
  UnShedule(@IdleTimeout);
  head:=aggr^.prv;
  if assigned(head) then begin
    { insert between head and head^.next }
    next:=head^.next;
    prev:=head;
    head^.next:=@self;
    next^.prev:=@self;
  end else begin
    { ring was empty: a single node points at itself }
    next:=@self;
    prev:=@self;
    aggr^.prv:=@self;
  end;
  wcur:=weight;
  end;
{ Take this provider out of the aggregator's ring and arm the 20000-unit idle
  timer. The datafile stays open. Requires an open, Active provider. }
procedure tPrv.Stop;
  var me:tPrv_ptr;
  begin
  Assert(isOpen);
  Assert(Active);
  me:=@self;
  if prev=me then
    next:=nil                   { sole member: the ring becomes empty }
  else begin
    prev^.next:=next;
    next^.prev:=prev;
  end;
  { if it was our turn, hand it to the successor (nil when the ring is empty) }
  if aggr^.prv=me then aggr^.prv:=next;
  active:=false;
  Shedule(20000,@IdleTimeout);
  end;
{ Send the next packet of the current segment.
  Reads up to one buffer of file data (bounded by SegLen, the local buffer and
  tcs.MaxSize), sends it through the aggregator's channel and updates the
  round-robin counters. When the segment is exhausted the provider stops
  itself; previously it stayed in the ring and kept sending empty packets. }
procedure tPrv.Cont;
  var s:tMemoryStream;
  var sz:LongWord;
  var rs:LongWord;
  var buf:array [1..4096] of byte;
  begin
  Assert(Active and isOpen);
  if SegLen>high(buf) then sz:=high(buf) else sz:=SegLen;
  sz:=aggr^.tcs.MaxSize(sz);
  //s.Init(GetMem(sz),0,sz);
  s.Init(@buf,0,sz);
  aggr^.tcs.WriteHeaders(s);
  Assert(s.WrBufLen=sz); //really?
  BlockRead(datafile,s.WrBuf^,s.WrBufLen,rs);
  s.WrEnd(rs);
  Assert(RS=s.WrBufLen);//todo
  aggr^.tcs.Send(s);
  //FreeMem(s.base,s.size);
  { never let the unsigned counter wrap around }
  if sz<SegLen then SegLen:=SegLen-sz else SegLen:=0;
  if SegLen=0 then begin
    { segment complete: leave the ring; Stop also arms the idle timer.
      TODO: notify the peer with DONE() once that response is implemented. }
    Stop;
    exit;
  end;
  dec(wcur);
  if wcur=0 then begin
    { turn used up: refill and pass the turn to the next provider }
    wcur:=weight;
    aggr^.prv:=next;
  end;
  end;
{ Handle the peer's Close request: stop any running transfer, close the file,
  cancel the idle timer, acknowledge and close the chat, drop the reference on
  the aggregator and free this object. The instance is invalid afterwards. }
procedure tPrv.DoClose;
  begin
  if Active then Stop;
  if isOpen then begin
    Close(datafile);
    isOpen:=false;
  end;
  { either Stop just armed the timer or it was still pending from before }
  UnShedule(@IdleTimeout);
  ch^.Ack;
  ch^.Close;
  aggr^.UnRef;
  FreeMem(@self,sizeof(self));
  end;
{ Chat callback: decode one request and dispatch it.
  msg  - received message; the opcode is the next unread byte of msg.stream
  data - only messages with data=true are processed
  Unknown opcodes and wrongly sized requests are answered with
  FAIL(upErrMalformed). Nothing is processed once the aggregator is rekt. }
procedure tPrv.OnMsg(msg:tSMsg; data:boolean);
  var op:byte;
  var hash:tfid;
  var base:LongWord;
  var limit:LongWord;
  var err:tmemorystream;
  label malformed;
  begin
  if not data then exit; //todo
  Assert(not(aggr^.rekt and active));
  if aggr^.rekt then exit;
  if msg.stream.RdBufLen<1 then goto malformed;
  op:=msg.stream.ReadByte;
  case op of
    upClose: DoClose; { frees self; nothing below may touch fields after this }
    upGET: begin
      { hash:20 + baseHi:2 + base:4 + limit:4 }
      if msg.stream.RdBufLen<>30 then goto malformed;
      msg.stream.Read(hash,20);
      { offsets beyond 32 bits are not supported }
      if msg.stream.ReadWord(2)>0 then goto malformed;
      base:=msg.stream.ReadWord(4);
      limit:=msg.stream.ReadWord(4);
      DoGet(hash,base,limit);
    end;
    { upSEG: begin
      if msg.stream.RdBufLen<10 then goto malformed;
      if msg.stream.ReadWord(2)>0 then goto malformed;
      base:=msg.stream.ReadWord(4);
      limit:=msg.stream.ReadWord(4);
      DoSEG(base, limit);
    end;
    }
    else goto malformed;
  end;
  exit; malformed:
  ch^.StreamInit(err,2);
  err.WriteByte(upFAIL);
  err.WriteByte(upErrMalformed);
  ch^.Send(err);
  end;
{ Chat timeout hook. Once the chat would wait 30000 or more, give up on it:
  stop the transfer, close the file and the chat, and dispose of the provider.
  willwait - the wait reported by the chat layer }
procedure tPrv.ChatTimeout(willwait:LongWord);
  var wasactive:boolean;
  begin
  if WillWait<30000 then exit;
  wasactive:=active;
  if Active then Stop;
  if isOpen then Close(datafile);
  isOpen:=false;
  ch^.Close;
  ch:=nil;
  if wasactive then begin
    { Stop has just re-armed the idle timer. Cancel it before disposing of
      the object right now, otherwise the timer fires on freed memory. }
    UnShedule(@IdleTimeout);
    IdleTimeout;
  end {else it is sheduled};
  end;
{ Dispose of the provider: tell the peer to close (if the chat still exists),
  stop any transfer, close the file, drop the aggregator reference and free
  this object. Runs as the idle timer callback and is also called directly.
  The instance is invalid afterwards. }
procedure tPrv.IdleTimeout;
  var err:tMemoryStream;
  begin
  if assigned(ch) then begin {chat is still not rekt}
    ch^.StreamInit(err,1);
    err.WriteByte(upClose);
    ch^.Send(err);
    ch^.Close;
  end;
  if Active then begin
    Stop;
    { Stop arms the idle timer again; cancel it because self is freed below }
    UnShedule(@IdleTimeout);
  end;
  if isOpen then Close(datafile);
  aggr^.UnRef;
  FreeMem(@self,sizeof(self));
  end;
{ Set up a freshly allocated provider for a new chat.
  ag    - aggregator of the remote peer; its reference count is incremented
  nchat - the new chat; its Callback and TMHook are pointed at this provider
  msg   - initial message: one byte is consumed as chan, the rest is passed
          on to OnMsg as the first request
  Arms a 15000-unit idle timer before the first request is processed. }
procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
  begin
  { plain state first }
  aggr:=ag;
  next:=nil;
  prev:=nil;
  weight:=100;
  wcur:=0;
  isOpen:=false;
  Active:=false;
  { hook into the chat }
  ch:=@nchat;
  ch^.Callback:=@OnMsg;
  ch^.TMHook:=@ChatTimeout;
  chan:=msg.stream.readbyte;
  inc(aggr^.Cnt);
  Shedule(15000,@IdleTimeout);
  { must stay last: OnMsg may end up freeing this object (Close request) }
  OnMsg(msg,true);
  end;
{ Initialise a new aggregator for the peer at source: reset the bookkeeping,
  push the object onto the front of the global Peers list and set up the
  transfer channel with its callbacks. }
procedure tAggr.Init(const source:tNetAddr);
  begin
  writeln('upmgr: init');
  { no providers yet }
  prv:=nil;
  cnt:=0;
  rekt:=false;
  { become the new head of Peers }
  prev:=nil;
  next:=Peers;
  if assigned(Peers) then Peers^.prev:=@self;
  Peers:=@self;
  { the channel is configured only after its own Init }
  tcs.Init(source);
  tcs.CanSend:=@Cont;
  tcs.maxTimeout:=8;
  tcs.OnTimeout:=@TCTimeout;
  end;
{ Transfer channel timeout callback: dispose of every Active provider and
  shut the aggregator down.
  Each prv^.IdleTimeout calls UnRef, and the last one could free this
  aggregator while the loop is still running. A temporary reference keeps the
  object alive until the loop and Done have finished. }
procedure tAggr.TCTimeout;
  var pprv:pointer;
  begin
  writeln('TCTimeout');
  inc(Cnt); { temporary self-reference, released by UnRef below }
  while assigned(prv) do begin
    pprv:=prv;
    prv^.IdleTimeout;
    Assert(pprv<>prv); { the ring must shrink on every iteration }
  end;
  Done;
  { may free self when no inactive providers are left; Done is idempotent }
  UnRef;
  end;
{ Channel "can send" callback: let the provider whose turn it is send one
  packet. The ring is empty whenever no provider is Active (for example after
  the last one was stopped), so there may simply be nothing to send. }
procedure tAggr.Cont;
  begin
  if assigned(prv) then prv^.Cont;
  end;
{ Drop one reference. When the last one goes, shut the aggregator down and
  free it; the instance is invalid afterwards. }
procedure tAggr.UnRef;
  begin
  Assert(cnt>0);
  Dec(Cnt);
  if cnt>0 then exit; { still referenced by other providers }
  Done;
  FreeMem(@self,sizeof(self));
  end;
{ Shut the aggregator down once: close the transfer channel and unlink the
  object from the global Peers list. Later calls return immediately. Memory is
  not released here. }
procedure tAggr.Done;
  begin
  if rekt then exit;
  writeln('upmgr: close');
  rekt:=true;
  tcs.Done;
  { unlink from the doubly linked Peers list }
  if assigned(next) then next^.prev:=prev;
  if assigned(prev) then
    prev^.next:=next
  else
    Peers:=next; { this was the list head }
  end;
{ Look up the aggregator serving the remote address addr.
  Returns a pointer to it, or nil when the Peers list has no such entry.
  The old sanity check (prev = self) could never hold, since the list head has
  prev = nil, so any lookup that missed on the first node aborted. The check
  now verifies the back link of the successor instead. }
function FindAggr({const} addr:tNetAddr): tAggr_ptr;
  begin
  result:=Peers;
  while assigned(result) do begin
    if result^.tcs.remote=addr then exit;
    Assert((result^.next=nil) or (result^.next^.prev=result));
    result:=result^.next;
  end;
  end;
{ Entry point for a new upload chat.
  Rejects init messages shorter than two bytes with FAIL(upErrMalformed).
  Finds the aggregator for the sender or creates one. A peer that already has
  cMax providers gets FAIL(upErrHiChan, cMax, current count). Otherwise a new
  provider is allocated and handed the chat and the message. }
procedure ChatHandler(var nchat:tChat; msg:tSMsg);
  const cMax=16;
  var peer:^tAggr;
  var prov:^tPrv;
  var reply:tMemoryStream;
  begin
  if msg.stream.RdBufLen<2 then begin
    writeln('upmgr: malformed init');
    nchat.StreamInit(reply,16);
    reply.WriteByte(upFAIL);
    reply.WriteByte(upErrMalformed);
    nchat.Send(reply);
    nchat.Close;
    exit;
  end;
  { one aggregator per remote address }
  peer:=FindAggr(msg.source^);
  if not assigned(peer) then begin
    New(peer);
    peer^.Init(msg.source^);
  end else if peer^.Cnt>=cMax then begin
    { too many providers for this peer already }
    nchat.StreamInit(reply,16);
    reply.WriteByte(upFAIL);
    reply.WriteByte(upErrHiChan);
    reply.WriteByte(cMax);
    reply.WriteByte(peer^.Cnt);
    nchat.Send(reply);
    nchat.Close;
    exit;
  end;
  New(prov);
  prov^.Init(peer,nchat,msg);
  end;
{ Unit start-up: register ChatHandler for the upFileServer opcode. }
BEGIN
  SetChatHandler(opcode.upFileServer, @ChatHandler);
END.