2 {Upload Manager for brodnetd}
5 USES Chat
,TC
,opcode
,ServerLoop
,MemStream
,NetAddr
;
19 isOpen
,Active
:boolean;
21 oinfo
:tStoreObjectInfo
;
22 datafile
:file of byte;
23 procedure Init(ag
:tAggr_ptr
; var nchat
:tChat
; msg
: tSMsg
);
24 procedure OnMsg(msg
:tSMsg
; data
:boolean);
25 procedure IdleTimeout
;
26 procedure ChatTimeout(willwait
:LongWord
);
28 procedure DoGET(const fid
:tfid
; base
,limit
:LongWord
);
29 procedure DoSEG(base
,limit
:LongWord
);
42 procedure Init(const source
:tNetAddr
);
51 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
52 SEG(baseHi:word2; base:word4; limit:word4);
54 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
59 procedure tPrv
.DoGET(const fid
:tfid
; base
,limit
:LongWord
);
60 var err
:tmemorystream
;
62 if isOpen
then oinfo
.Close
;
63 if Active
then Stop
; //opt
66 {if not oinfo.final then begin
70 if oinfo
.rc
>0 then begin
71 ch
^.StreamInit(err
,3);
72 err
.WriteByte(upFAIL
);
73 if oinfo
.rc
=1 then err
.WriteByte(upErrNotFound
)
74 else begin err
.WriteByte(upErrIO
); err
.WriteByte(oinfo
.rc
) end;
83 procedure tPrv
.DoSEG(base
,limit
:LongWord
);
84 var err
:tmemorystream
;
87 ch
^.StreamInit(err
,12);
89 if oinfo
.rc
>0 then begin
90 err
.WriteByte(upFAIL
);
91 err
.WriteByte(upErrIO
);
92 err
.WriteByte(oinfo
.rc
);
96 err
.WriteByte(upINFO
);
99 err
.WriteWord(oinfo
.length
,4);
101 if oinfo
.seglen
<seglen
then seglen
:=oinfo
.seglen
;
103 then err
.WriteByte(1)
104 else err
.WriteByte(0);
105 err
.WriteWord(seglen
,4);
107 if not Active
then Start
;
109 ch
^.StreamInit(err
,2);
110 err
.WriteByte(upFAIL
);
111 err
.WriteByte(upErrSegNoGet
);
116 procedure tPrv
.Start
;
121 UnShedule(@IdleTimeout
);
122 if not assigned(aggr
^.prv
) then begin
127 next
:=aggr
^.prv
^.next
;
139 if prev
<>@self
then begin
143 if aggr
^.prv
=@self
then aggr
^.prv
:=next
;
145 Shedule(20000,@IdleTimeout
);
152 var buf
:array [1..2048] of byte;
154 Assert(Active
and isOpen
);
156 if SegLen
>high(buf
) then sz
:=high(buf
) else sz
:=SegLen
;
157 sz
:=aggr
^.tcs
.MaxSize(sz
);
158 //s.Init(GetMem(sz),0,sz);
160 aggr
^.tcs
.WriteHeaders(s
);
161 Assert(s
.WrBufLen
=sz
); //really?
162 BlockRead(datafile
,s
.WrBuf
^,s
.WrBufLen
,rs
);
164 Assert(RS
=s
.WrBufLen
);//todo
166 //FreeMem(s.base,s.size);
169 if SegLen
=0 then begin
175 if (wcur
=0) then begin
181 procedure tPrv
.DoClose
;
184 if isOpen
then oinfo
.Close
;
186 UnShedule(@IdleTimeout
);
190 FreeMem(@self
,sizeof(self
));
193 procedure tPrv
.OnMsg(msg
:tSMsg
; data
:boolean);
198 var err
:tmemorystream
;
201 if not data
then exit
; //todo
202 Assert(not(aggr
^.rekt
and active
));
203 if aggr
^.rekt
then exit
;
204 if msg
.stream
.RdBufLen
<1 then goto malformed
;
205 op
:=msg
.stream
.ReadByte
;
209 if msg
.stream
.RdBufLen
<>30 then goto malformed
;
210 msg
.stream
.Read(hash
,20);
211 if msg
.stream
.ReadWord(2)>0 then goto malformed
;
212 base
:=msg
.stream
.ReadWord(4);
213 limit
:=msg
.stream
.ReadWord(4);
214 DoGet(hash
,base
,limit
);
217 if msg
.stream
.RdBufLen
<10 then goto malformed
;
218 if msg
.stream
.ReadWord(2)>0 then goto malformed
;
219 base
:=msg
.stream
.ReadWord(4);
220 limit
:=msg
.stream
.ReadWord(4);
226 ch
^.StreamInit(err
,2);
227 err
.WriteByte(upFAIL
);
228 err
.WriteByte(upErrMalformed
);
232 procedure tPrv
.ChatTimeout(willwait
:LongWord
);
233 var wasactive
:boolean;
235 if WillWait
<30000 then exit
;
238 if isOpen
then oinfo
.Close
;
242 if wasactive
then IdleTimeout
{else it is sheduled};
244 procedure tPrv
.IdleTimeout
;
245 var err
:tMemoryStream
;
247 if assigned(ch
) then begin {chat is still not rekt}
248 ch
^.StreamInit(err
,1);
249 err
.WriteByte(upClose
);
253 Assert(not Active
); {is idle}
254 if isOpen
then oinfo
.Close
; {may still be open}
256 FreeMem(@self
,sizeof(self
));
259 procedure tPrv
.Init(ag
:tAggr_ptr
; var nchat
:tChat
; msg
: tSMsg
);
262 ch
^.Callback
:=@OnMsg
;
263 ch
^.TMHook
:=@ChatTimeout
;
267 chan
:=msg
.stream
.readbyte
;
270 isOpen
:=false; Active
:=false;
272 Shedule(5000,@IdleTimeout
);
276 procedure tAggr
.Init(const source
:tNetAddr
);
278 writeln('upmgr: init');
282 if assigned(Peers
) then Peers
^.prev
:=@self
;
287 tcs
.OnTimeout
:=@TCTimeout
;
292 procedure tAggr
.TCTimeout
;
295 writeln('TCTimeout');
296 while assigned(prv
) do begin
303 procedure tAggr
.Cont
;
305 assert(assigned(prv
));
308 procedure tAggr
.UnRef
;
314 FreeMem(@self
,sizeof(self
));
317 procedure tAggr
.Done
;
320 writeln('upmgr: close');
323 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
324 if assigned(next
) then next
^.prev
:=prev
;
327 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
330 while assigned(result
) do begin
331 if result
^.tcs
.remote
=addr
then exit
;
332 assert(result
^.prev
=result
);
333 result
:=result
^.next
;
337 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
343 if msg
.stream
.RdBufLen
<2 then begin
344 writeln('upmgr: malformed init');
345 nchat
.StreamInit(s
,16);
347 s
.writebyte(upErrMalformed
);
352 ag
:=FindAggr(msg
.source
^);
353 if assigned(ag
) then begin
355 if ag
^.Cnt
>=cMax
then begin
356 nchat
.StreamInit(s
,16);
358 s
.WriteByte(upErrHiChan
);
360 s
.WriteByte(ag
^.Cnt
);
366 ag
^.init(msg
.source
^);
369 pr
^.Init(ag
,nchat
,msg
);
373 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);