Writing and SegInfo to Store1.
[brdnet.git] / upmgr.pas
blobcdeb6271f96b6b2374f71b479285a52ab8b094e0
1 UNIT UPMGR;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr;
7 IMPLEMENTATION
8 USES Store1;
10 type
11 tAggr_ptr=^tAggr;
12 tPrv_ptr=^tPrv;
13 tPrv=object
14 aggr:tAggr_ptr;
15 ch: ^tChat;
16 chan: byte;
17 next,prev: tPrv_ptr;
18 weight,wcur:Word;
19 isOpen,Active:boolean;
20 seglen:LongWord;
21 oinfo:tStoreObjectInfo;
22 procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
23 procedure OnMsg(msg:tSMsg; data:boolean);
24 procedure Cont;
25 procedure DoGET(const fid:tfid; base,limit:LongWord);
26 procedure DoSEG(base,limit:LongWord);
27 procedure DoClose;
28 procedure Start;
29 procedure Stop;
30 procedure ChatTimeout(willwait:LongWord);
31 procedure Close;
32 end;
33 tAggr=object
34 tcs: tTCS;
35 rekt:boolean;
36 next,prev: tAggr_ptr;
37 prv:^tPrv;
38 Cnt:Byte;
39 procedure UnRef;
40 procedure Cont;
41 procedure Init(const source:tNetAddr);
42 procedure TCTimeout;
43 procedure Done;
44 end;
46 var Peers:^tAggr;
48 {Requests
49 Close();
50 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
51 SEG(baseHi:word2; base:word4; limit:word4);
52 }{Responses
53 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
54 FAIL(code:byte;...);
55 DONE();
58 procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord);
59 var err:tmemorystream;
60 begin
61 if isOpen then oinfo.Close;
62 if Active then Stop; //opt
63 ch^.Ack;
64 oinfo.Open(fid);
65 {if not oinfo.final then begin
66 oinfo.rc:=200;
67 Close(oinfo.hnd);
68 end;}
69 if oinfo.rc>0 then begin
70 ch^.StreamInit(err,3);
71 err.WriteByte(upFAIL);
72 if oinfo.rc=1 then err.WriteByte(upErrNotFound)
73 else begin err.WriteByte(upErrIO); err.WriteByte(oinfo.rc) end;
74 ch^.Send(err);
75 end else begin
76 isopen:=true;
77 DoSeg(base,limit);
78 end;
79 end;
81 procedure tPrv.DoSEG(base,limit:LongWord);
82 var err:tmemorystream;
83 begin
84 if isOpen then begin
85 ch^.StreamInit(err,12);
86 oinfo.SegSeek(base);
87 if oinfo.rc>0 then begin
88 err.WriteByte(upFAIL);
89 err.WriteByte(upErrIO);
90 err.WriteByte(oinfo.rc);
91 ch^.Send(err);
92 if Active then Stop;
93 end else begin
94 err.WriteByte(upINFO);
95 err.WriteWord(0,2);
96 err.WriteWord(oinfo.length,4);
97 seglen:=limit;
98 if oinfo.seglen<seglen then seglen:=oinfo.seglen;
99 if oinfo.final
100 then err.WriteByte(1)
101 else err.WriteByte(0);
102 err.WriteWord(seglen,4);
103 ch^.Send(err);
104 if not Active then Start;
105 end end else begin
106 ch^.StreamInit(err,2);
107 err.WriteByte(upFAIL);
108 err.WriteByte(upErrSegNoGet);
109 ch^.Send(err);
110 end;
111 end;
113 procedure tPrv.Start;
114 begin
115 Assert(isOpen);
116 Assert(not Active);
117 Active:=true;
118 UnShedule(@Close);
119 writeln('upmgr: Startig transfer');
120 if not assigned(aggr^.prv) then begin
121 next:=@self;
122 prev:=@self;
123 aggr^.prv:=@self;
124 aggr^.tcs.Start;
125 end else begin
126 next:=aggr^.prv^.next;
127 prev:=aggr^.prv;
128 prev^.next:=@self;
129 next^.prev:=@self;
130 end;
131 wcur:=weight;
132 end;
134 procedure tPrv.Stop;
135 begin
136 Assert(isOpen);
137 Assert(Active);
138 if prev<>@self then begin
139 prev^.next:=next;
140 next^.prev:=prev;
141 end else next:=nil;
142 if aggr^.prv=@self then aggr^.prv:=next;
143 active:=false;
144 Shedule(20000,@Close);
145 writeln('upmgr: Stop');
146 end;
148 procedure tPrv.Cont;
149 var s:tMemoryStream;
150 var sz:LongWord;
151 var buf:array [1..4096] of byte;
152 begin
153 //writeln('upmgr: CONT! ',chan);
154 Assert(Active and isOpen);
155 sz:=aggr^.tcs.MaxSize(sizeof(buf))-1;
156 if sz>SegLen then sz:=SegLen;
157 //s.Init(GetMem(sz),0,sz);
158 Assert((sz+1)<=sizeof(buf));
159 s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s);
160 s.WriteByte(Chan);
161 Assert(sz<=s.WrBufLen);
162 oinfo.ReadAhead(sz,s.WrBuf); //todo
163 oinfo.WaitRead;
164 Assert(oinfo.rc=0); //todo
165 s.WrEnd(sz);
166 aggr^.tcs.Send(s);
167 //FreeMem(s.base,s.size);
168 SegLen:=SegLen-sz;
169 dec(wcur);
170 //FIXME: wait for ack of previous message!
171 if SegLen=0 then begin
172 ch^.StreamInit(s,2);
173 s.WriteByte(upDONE);
174 ch^.Send(s);
175 Stop;
176 end else
177 if (wcur=0) then begin
178 wcur:=weight;
179 aggr^.prv:=next;
180 end;
181 end;
183 procedure tPrv.DoClose;
184 begin
185 ch^.Ack;
186 Close;
187 end;
189 procedure tPrv.OnMsg(msg:tSMsg; data:boolean);
190 var op:byte;
191 var hash:tfid;
192 var base:LongWord;
193 var limit:LongWord;
194 var err:tmemorystream;
195 label malformed;
196 begin
197 if not data then exit; //todo
198 Assert(not(aggr^.rekt and active));
199 if aggr^.rekt then exit;
200 if msg.stream.RdBufLen<1 then goto malformed;
201 op:=msg.stream.ReadByte;
202 writeln('upmgr: opcode=',op,' sz=',msg.stream.RdBufLen);
203 case op of
204 upClose: DoClose;
205 upGET: begin
206 if msg.stream.RdBufLen<>30 then goto malformed;
207 msg.stream.Read(hash,20);
208 if msg.stream.ReadWord(2)>0 then goto malformed;
209 base:=msg.stream.ReadWord(4);
210 limit:=msg.stream.ReadWord(4);
211 DoGet(hash,base,limit);
212 end;
213 upSEG: begin
214 if msg.stream.RdBufLen<10 then goto malformed;
215 if msg.stream.ReadWord(2)>0 then goto malformed;
216 base:=msg.stream.ReadWord(4);
217 limit:=msg.stream.ReadWord(4);
218 DoSEG(base, limit);
219 end;
220 else goto malformed;
221 end;
222 exit; malformed:
223 ch^.StreamInit(err,2);
224 err.WriteByte(upFAIL);
225 err.WriteByte(upErrMalformed);
226 ch^.Send(err);
227 writeln('upmgr: malformed request stage=1');
228 end;
230 {######Timeouts and Shit#######}
231 procedure tPrv.ChatTimeout(willwait:LongWord);
232 begin
233 if WillWait<8000 then exit;
234 writeln('upmgr: Chat timeout');
235 Close;
236 end;
237 procedure tPrv.Close;
238 var err:tMemoryStream;
239 begin
240 assert(assigned(ch));
241 ch^.StreamInit(err,1);
242 err.WriteByte(upClose);
243 try ch^.Send(err); except end;
244 if Active then Stop;
245 if isOpen then oinfo.Close;
246 isOpen:=false;
247 ch^.Close;
248 ch:=nil;
249 UnShedule(@Close);
250 aggr^.UnRef;
251 FreeMem(@self,sizeof(self));
252 end;
254 procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
255 begin
256 ch:=@nchat;
257 ch^.Callback:=@OnMsg;
258 ch^.TMHook:=@ChatTimeout;
259 aggr:=ag;
260 next:=nil;
261 prev:=nil;
262 chan:=msg.stream.readbyte; {todo: except}
263 writeln('upmgr: prv init ',string(msg.source^),' chan=',chan);
264 weight:=100;
265 wcur:=0;
266 isOpen:=false; Active:=false;
267 inc(aggr^.Cnt);
268 Shedule(5000,@Close);
269 OnMsg(msg,true);
270 end;
272 procedure tAggr.Init(const source:tNetAddr);
273 begin
274 writeln('upmgr: init');
275 next:=Peers;
276 prev:=nil;
277 rekt:=false;
278 if assigned(Peers) then Peers^.prev:=@self;
279 Peers:=@self;
280 tcs.Init(source);
281 tcs.CanSend:=@Cont;
282 tcs.maxTimeout:=8;
283 tcs.OnTimeout:=@TCTimeout;
284 tcs.Limit.Rate:=20*1024*1024; {20MB}
285 prv:=nil;
286 cnt:=0;
287 end;
289 procedure tAggr.TCTimeout;
290 var pprv:pointer;
291 begin
292 writeln('upmgr: TCTimeout');
293 while assigned(prv) do begin
294 assert(not rekt);
295 pprv:=prv;
296 prv^.Close;
297 if rekt then exit;
298 Assert(pprv<>prv);
299 end;
300 Done;
301 end;
302 procedure tAggr.Cont;
303 begin
304 if not assigned(prv) then exit;
305 prv^.Cont;
306 end;
307 procedure tAggr.UnRef;
308 begin
309 Assert(cnt>0);
310 Dec(Cnt);
311 writeln('upmgr: aggr unrefd');
312 if cnt=0 then begin
313 Done;
314 FreeMem(@self,sizeof(self));
315 end;
316 end;
317 procedure tAggr.Done;
318 begin
319 assert(not rekt);
320 writeln('upmgr: aggr close');
321 rekt:=true;
322 tcs.Done;
323 if assigned(prev) then prev^.next:=next else Peers:=next;
324 if assigned(next) then next^.prev:=prev;
325 end;
327 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
328 begin
329 result:=Peers;
330 while assigned(result) do begin
331 if assigned(result^.next) then assert(result^.next^.prev=result);
332 if result^.tcs.remote=addr then exit;
333 result:=result^.next;
334 end;
335 end;
337 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
338 var ag:^tAggr;
339 var pr:^tPrv;
340 var s:tMemoryStream;
341 const cMax=16;
342 begin
343 writeln('upmgr: ChatHandler');
344 msg.stream.skip({the initcode}1);
345 if msg.stream.RdBufLen<2 then begin
346 writeln('upmgr: malformed init');
347 nchat.StreamInit(s,16);
348 s.WriteByte(upFAIL);
349 s.writebyte(upErrMalformed);
350 nchat.Send(s);
351 nchat.Close;
352 writeln('upmgr: malformed request stage=0');
353 exit end;
354 {first get the ag}
355 ag:=FindAggr(msg.source^);
356 if assigned(ag) then begin
357 {check}
358 if ag^.Cnt>=cMax then begin
359 nchat.StreamInit(s,16);
360 s.WriteByte(upFAIL);
361 s.WriteByte(upErrHiChan);
362 s.WriteByte(cMax);
363 s.WriteByte(ag^.Cnt);
364 nchat.Send(s);
365 nchat.Close;
366 exit end;
367 end else begin
368 New(ag);
369 ag^.init(msg.source^);
370 end;
371 New(pr);
372 pr^.Init(ag,nchat,msg);
373 end;
375 BEGIN
376 SetChatHandler(opcode.upFileServer,@ChatHandler);
377 END.