2 {Upload Manager for brodnetd}
5 USES Chat
,TC
,opcode
,ServerLoop
,MemStream
,NetAddr
;
// Members of the per-channel upload object.
// NOTE(review): the enclosing type header is not part of this listing; the
// methods declared here are implemented further down as tPrv methods.
// isOpen is tested before oinfo.Close, Active before Stop and Start.
19 isOpen
,Active
:boolean;
// Object info; the code below reads its rc, length and seglen members and
// calls its Close and ReadAhead methods.
21 oinfo
:tStoreObjectInfo
;
// Installs OnMsg and ChatTimeout as hooks on the chat and reads the channel
// byte from msg.
22 procedure Init(ag
:tAggr_ptr
; var nchat
:tChat
; msg
: tSMsg
);
// Chat callback: reads the request opcode and validates the request fields.
23 procedure OnMsg(msg
:tSMsg
; data
:boolean);
// Request handler invoked by OnMsg with the hash, base and limit of a GET.
25 procedure DoGET(const fid
:tfid
; base
,limit
:LongWord
);
// Request handler that builds the INFO or FAIL reply for a segment request.
26 procedure DoSEG(base
,limit
:LongWord
);
// Chat timeout hook; returns immediately when willwait is below 8000.
30 procedure ChatTimeout(willwait
:LongWord
);
// Declaration whose signature matches the tAggr.Init implementation below.
// NOTE(review): the enclosing type header is not visible in this listing.
41 procedure Init(const source
:tNetAddr
);
50 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
51 SEG(baseHi:word2; base:word4; limit:word4);
53 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
// Handles a GET request for the object identified by fid.
// NOTE(review): the original line numbers jump, so some source lines are
// missing from this listing; only the visible statements are described.
58 procedure tPrv
.DoGET(const fid
:tfid
; base
,limit
:LongWord
);
// Stream used to build the failure reply below.
59 var err
:tmemorystream
;
// Release whatever a previous request left open or running.
61 if isOpen
then oinfo
.Close
;
62 if Active
then Stop
; //opt
65 {if not oinfo.final then begin
// A positive oinfo.rc is reported as upFAIL followed by a reason code.
69 if oinfo
.rc
>0 then begin
// The 3 passed here equals the longest reply written below
// (upFAIL, upErrIO, rc) - presumably a size bound, TODO confirm.
70 ch
^.StreamInit(err
,3);
71 err
.WriteByte(upFAIL
);
// rc=1 maps to upErrNotFound; any other positive rc is sent as upErrIO
// followed by the rc value itself.
72 if oinfo
.rc
=1 then err
.WriteByte(upErrNotFound
)
73 else begin err
.WriteByte(upErrIO
); err
.WriteByte(oinfo
.rc
) end;
// Handles a segment request: builds either a FAIL reply or an INFO reply and
// starts the transfer when it is not already active.
// NOTE(review): the original line numbers jump, so several source lines
// (branch keywords, the send of err, the use of base and limit) are missing
// from this listing.
81 procedure tPrv
.DoSEG(base
,limit
:LongWord
);
// Stream used for the reply.
82 var err
:tmemorystream
;
// 12 equals the INFO layout size: opcode 1 + sizeHi 2 + size 4 + final 1 +
// seglen 4 - presumably a size bound, TODO confirm.
85 ch
^.StreamInit(err
,12);
// Positive rc: reply upFAIL, upErrIO and the rc value.
87 if oinfo
.rc
>0 then begin
88 err
.WriteByte(upFAIL
);
89 err
.WriteByte(upErrIO
);
90 err
.WriteByte(oinfo
.rc
);
// INFO reply. The write of the sizeHi word is not visible in this listing.
94 err
.WriteByte(upINFO
);
96 err
.WriteWord(oinfo
.length
,4);
// Clamp seglen down to what oinfo reports.
98 if oinfo
.seglen
<seglen
then seglen
:=oinfo
.seglen
;
// NOTE(review): the condition of this then/else is on a line missing from
// the listing; by position this byte is presumably the final flag of INFO.
100 then err
.WriteByte(1)
101 else err
.WriteByte(0);
102 err
.WriteWord(seglen
,4);
104 if not Active
then Start
;
// Alternative reply: upFAIL with upErrSegNoGet. Presumably sent when no
// object is open - the guarding condition is not visible, TODO confirm.
106 ch
^.StreamInit(err
,2);
107 err
.WriteByte(upFAIL
);
108 err
.WriteByte(upErrSegNoGet
);
// Begins a transfer for this channel.
// NOTE(review): many source lines are missing from this listing. The
// statements from original line 138 onward appear to belong to a separate
// Stop routine whose header is not visible - TODO confirm.
113 procedure tPrv
.Start
;
// NOTE(review): the log text below misspells Starting; fixing it changes a
// runtime string and so needs a code change.
119 writeln('upmgr: Startig transfer');
// Tests whether the aggregate has any entry in prv yet; the branch bodies
// are not visible. Presumably links this object into that list.
120 if not assigned(aggr
^.prv
) then begin
126 next
:=aggr
^.prv
^.next
;
// Only taken when prev does not point back at this object.
138 if prev
<>@self
then begin
// Move the aggregate pointer off this object when it currently points here.
142 if aggr
^.prv
=@self
then aggr
^.prv
:=next
;
// Hands Close to Shedule with the value 20000; presumably a delay before
// the idle channel is closed - unit not visible, TODO confirm.
144 Shedule(20000,@Close
);
145 writeln('upmgr: Stop');
// Builds one outgoing data chunk in a stack buffer.
// NOTE(review): the procedure header is not part of this listing; the
// commented-out log line suggests this is the Cont routine of the channel
// object - TODO confirm. The send and bookkeeping lines are also missing.
151 var buf
:array [1..4096] of byte;
153 //writeln('upmgr: CONT! ',chan);
// Must only run while a transfer is active and the object is open.
154 Assert(Active
and isOpen
);
// Payload size: what tcs allows for this buffer minus one byte, capped by
// the remaining SegLen. Presumably the spare byte carries a prefix such as
// the channel number - TODO confirm.
155 sz
:=aggr
^.tcs
.MaxSize(sizeof(buf
))-1;
156 if sz
>SegLen
then sz
:=SegLen
;
157 //s.Init(GetMem(sz),0,sz);
158 Assert((sz
+1)<=sizeof(buf
));
// Wrap the stack buffer in stream s and let tcs write its headers first.
159 s
.Init(@buf
,0,sizeof(buf
)); aggr
^.tcs
.WriteHeaders(s
);
161 Assert(sz
<=s
.WrBufLen
);
// Read sz bytes straight into the write area of s. A failed read is only
// caught by the Assert below.
162 oinfo
.ReadAhead(sz
,s
.WrBuf
); //todo
164 Assert(oinfo
.rc
=0); //todo
167 //FreeMem(s.base,s.size);
170 //FIXME: wait for ack of previous message!
// Branches for an exhausted segment and for wcur=0; bodies not visible.
171 if SegLen
=0 then begin
177 if (wcur
=0) then begin
183 procedure tPrv
.DoClose
;
// Chat callback: parses an incoming request and validates its fields.
// NOTE(review): the opcode dispatch lines and labels are missing from this
// listing, so the branch boundaries below are inferred from the statements.
189 procedure tPrv
.OnMsg(msg
:tSMsg
; data
:boolean);
// Stream used for the malformed-request reply.
194 var err
:tmemorystream
;
// Calls without data are ignored.
197 if not data
then exit
; //todo
// A transfer must not be active while the aggregate is flagged rekt;
// requests are dropped in that state.
198 Assert(not(aggr
^.rekt
and active
));
199 if aggr
^.rekt
then exit
;
// At least the opcode byte must be present.
200 if msg
.stream
.RdBufLen
<1 then goto malformed
;
201 op
:=msg
.stream
.ReadByte
;
202 writeln('upmgr: opcode=',op
,' sz=',msg
.stream
.RdBufLen
);
// GET branch: exactly 30 bytes must remain, 20 hash + 2 baseHi + 4 base +
// 4 limit. A nonzero baseHi is rejected as malformed.
206 if msg
.stream
.RdBufLen
<>30 then goto malformed
;
207 msg
.stream
.Read(hash
,20);
208 if msg
.stream
.ReadWord(2)>0 then goto malformed
;
209 base
:=msg
.stream
.ReadWord(4);
210 limit
:=msg
.stream
.ReadWord(4);
211 DoGet(hash
,base
,limit
);
// Second branch, presumably SEG: at least 10 bytes, 2 baseHi + 4 base +
// 4 limit. NOTE(review): this check accepts longer messages while the GET
// check demands an exact length - confirm which is intended.
214 if msg
.stream
.RdBufLen
<10 then goto malformed
;
215 if msg
.stream
.ReadWord(2)>0 then goto malformed
;
216 base
:=msg
.stream
.ReadWord(4);
217 limit
:=msg
.stream
.ReadWord(4);
// Malformed-request reply: upFAIL followed by upErrMalformed.
223 ch
^.StreamInit(err
,2);
224 err
.WriteByte(upFAIL
);
225 err
.WriteByte(upErrMalformed
);
227 writeln('upmgr: malformed request stage=1');
230 {######Timeouts and Shit#######}
// Chat timeout hook installed by tPrv.Init through ch^.TMHook.
// NOTE(review): statements after the log line are not in this listing.
231 procedure tPrv
.ChatTimeout(willwait
:LongWord
);
// Waits below 8000 are tolerated; anything longer is logged as a timeout.
233 if WillWait
<8000 then exit
;
234 writeln('upmgr: Chat timeout');
// Closes the channel: sends upClose to the peer, closes the object info if
// open, and frees the memory of this instance.
// NOTE(review): some source lines are missing from this listing.
237 procedure tPrv
.Close
;
238 var err
:tMemoryStream
;
240 assert(assigned(ch
));
241 ch
^.StreamInit(err
,1);
242 err
.WriteByte(upClose
);
// Best-effort send: any exception raised by Send is silently discarded.
243 try ch
^.Send(err
); except end;
245 if isOpen
then oinfo
.Close
;
// Releases the memory of the object itself, so no member may be touched
// after this statement.
251 FreeMem(@self
,sizeof(self
));
// Sets up a new channel object for an incoming chat.
// NOTE(review): the lines that store ag and nchat are missing from this
// listing; only the visible statements are described.
254 procedure tPrv
.Init(ag
:tAggr_ptr
; var nchat
:tChat
; msg
: tSMsg
);
// Route chat messages and chat timeouts to this object.
257 ch
^.Callback
:=@OnMsg
;
258 ch
^.TMHook
:=@ChatTimeout
;
// The channel number is the next byte of the init message. ChatHandler
// checks the remaining length before calling Init.
262 chan
:=msg
.stream
.readbyte
; {todo: except}
263 writeln('upmgr: prv init ',string(msg
.source
^),' chan=',chan
);
// Nothing is open and no transfer is running yet.
266 isOpen
:=false; Active
:=false;
// Hands Close to Shedule with the value 5000; presumably closes the channel
// when no request arrives in time - TODO confirm.
268 Shedule(5000,@Close
);
// Initializes the per-peer aggregate for the given source address.
// NOTE(review): most field initialization lines are missing from this
// listing.
272 procedure tAggr
.Init(const source
:tNetAddr
);
274 writeln('upmgr: init');
// The current head of the Peers list gets this object as its predecessor;
// presumably the object is inserted at the head - the other link updates
// are not visible.
278 if assigned(Peers
) then Peers
^.prev
:=@self
;
// Timeouts reported by tcs are handled by TCTimeout.
283 tcs
.OnTimeout
:=@TCTimeout
;
// Rate limit of 20*1024*1024; the unit is defined by tcs.
284 tcs
.Limit
.Rate
:=20*1024*1024; {20MB}
// Timeout handler installed on tcs by tAggr.Init.
// NOTE(review): the loop body is missing from this listing; the loop runs
// for as long as prv is assigned.
289 procedure tAggr
.TCTimeout
;
292 writeln('upmgr: TCTimeout');
293 while assigned(prv
) do begin
// Continuation entry point of the aggregate; does nothing when prv is not
// assigned. NOTE(review): the remaining statements are not in this listing.
302 procedure tAggr
.Cont
;
304 if not assigned(prv
) then exit
;
// Drops a reference to the aggregate.
// NOTE(review): the counter handling and the condition guarding the release
// are missing from this listing.
307 procedure tAggr
.UnRef
;
311 writeln('upmgr: aggr unrefd');
// Releases the memory of the object itself; it must not be used afterwards.
314 FreeMem(@self
,sizeof(self
));
// Shuts the aggregate down and unlinks it from the Peers list.
// NOTE(review): some source lines are missing from this listing.
317 procedure tAggr
.Done
;
320 writeln('upmgr: aggr close');
// Doubly linked list removal: the predecessor, or the list head Peers when
// there is none, is pointed at the successor.
323 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
// The successor, if any, is pointed back at the predecessor.
324 if assigned(next
) then next
^.prev
:=prev
;
// Walks a linked list of aggregates and returns the one whose tcs.remote
// equals addr. The loop ends with result unassigned when none matches.
// NOTE(review): the initial assignment of result is missing from this
// listing; presumably it starts at Peers.
327 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
330 while assigned(result
) do begin
// Sanity check of the back link of the following node.
331 if assigned(result
^.next
) then assert(result
^.next
^.prev
=result
);
332 if result
^.tcs
.remote
=addr
then exit
;
333 result
:=result
^.next
;
// Entry point for a new chat: validates the init message, finds or creates
// the aggregate for the sender, and initializes a channel object.
// NOTE(review): local declarations, allocations and reply sends are missing
// from this listing.
337 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
343 writeln('upmgr: ChatHandler');
// NOTE(review): the init code byte is skipped before any length check.
344 msg
.stream
.skip({the initcode}1);
// Fewer than 2 bytes left after the init code: reply with upErrMalformed.
345 if msg
.stream
.RdBufLen
<2 then begin
346 writeln('upmgr: malformed init');
347 nchat
.StreamInit(s
,16);
349 s
.writebyte(upErrMalformed
);
352 writeln('upmgr: malformed request stage=0');
// Look up an existing aggregate for the sender address.
355 ag
:=FindAggr(msg
.source
^);
356 if assigned(ag
) then begin
// Channel limit reached: reply with upErrHiChan and the current count.
358 if ag
^.Cnt
>=cMax
then begin
359 nchat
.StreamInit(s
,16);
361 s
.WriteByte(upErrHiChan
);
363 s
.WriteByte(ag
^.Cnt
);
// Presumably the path for a sender without an aggregate: a new one is
// initialized with the source address - allocation not visible.
369 ag
^.init(msg
.source
^);
// Hand the chat and the init message to the channel object.
372 pr
^.Init(ag
,nchat
,msg
);
// Registers ChatHandler for chats opened with the upFileServer opcode.
376 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);