2 {Upload Manager for brodnetd}
5 USES Chat
,TC
,opcode
,ServerLoop
,MemStream
,NetAddr
;
19 isOpen
,Active
:boolean;
21 datafile
:file of byte;
22 procedure Init(ag
:tAggr_ptr
; var nchat
:tChat
; msg
: tSMsg
);
23 procedure OnMsg(msg
:tSMsg
; data
:boolean);
24 procedure IdleTimeout
;
25 procedure ChatTimeout(willwait
:LongWord
);
27 procedure DoGET(const fid
:tfid
; base
,limit
:LongWord
);
28 procedure DoSEG(base
,limit
:LongWord
);
41 procedure Init(const source
:tNetAddr
);
50 GET(filehash:20; baseHi:word2; base:word4; limit:word4);
51 SEG(baseHi:word2; base:word4; limit:word4);
53 INFO(sizeHi:word2; size:word4; final:byte; seglen:word4);
58 procedure tPrv
.DoGET(const fid
:tfid
; base
,limit
:LongWord
);
59 var err
:tmemorystream
;
60 var info
:tStoreObjectInfo
;
62 Assert(not(active
or isOpen
)); //todo
65 if not info
.final
then begin
69 if info
.rc
>0 then begin
70 ch
^.StreamInit(err
,3);
71 err
.WriteByte(upFAIL
);
72 if (info
.rc
=1)or(not info
.final
) then err
.WriteByte(upErrNotFound
)
73 else begin err
.WriteByte(upErrIO
); err
.WriteByte(info
.rc
) end;
76 ch
^.StreamInit(err
,12);
77 err
.WriteByte(upINFO
);
81 err
.WriteWord(info
.length
,4);
83 if info
.length
<seglen
then seglen
:=info
.length
;
85 err
.WriteWord(seglen
,4);
91 procedure tPrv
.DoSEG(base
,limit
:LongWord
);
100 UnShedule(@IdleTimeout
);
101 if not assigned(aggr
^.prv
) then begin
106 next
:=aggr
^.prv
^.next
;
118 if prev
<>@self
then begin
122 if aggr
^.prv
=@self
then aggr
^.prv
:=next
;
124 Shedule(20000,@IdleTimeout
);
131 var buf
:array [1..4096] of byte;
133 Assert(Active
and isOpen
);
135 if SegLen
>high(buf
) then sz
:=high(buf
) else sz
:=SegLen
;
136 sz
:=aggr
^.tcs
.MaxSize(sz
);
137 //s.Init(GetMem(sz),0,sz);
139 aggr
^.tcs
.WriteHeaders(s
);
140 Assert(s
.WrBufLen
=sz
); //really?
141 BlockRead(datafile
,s
.WrBuf
^,s
.WrBufLen
,rs
);
143 Assert(RS
=s
.WrBufLen
);//todo
145 //FreeMem(s.base,s.size);
154 procedure tPrv
.DoClose
;
157 if isOpen
then Close(datafile
);
159 UnShedule(@IdleTimeout
);
163 FreeMem(@self
,sizeof(self
));
166 procedure tPrv
.OnMsg(msg
:tSMsg
; data
:boolean);
171 var err
:tmemorystream
;
174 if not data
then exit
; //todo
175 Assert(not(aggr
^.rekt
and active
));
176 if aggr
^.rekt
then exit
;
177 if msg
.stream
.RdBufLen
<1 then goto malformed
;
178 op
:=msg
.stream
.ReadByte
;
182 if msg
.stream
.RdBufLen
<>30 then goto malformed
;
183 msg
.stream
.Read(hash
,20);
184 if msg
.stream
.ReadWord(2)>0 then goto malformed
;
185 base
:=msg
.stream
.ReadWord(4);
186 limit
:=msg
.stream
.ReadWord(4);
187 DoGet(hash
,base
,limit
);
190 if msg.stream.RdBufLen<10 then goto malformed;
191 if msg.stream.ReadWord(2)>0 then goto malformed;
192 base:=msg.stream.ReadWord(4);
193 limit:=msg.stream.ReadWord(4);
200 ch
^.StreamInit(err
,2);
201 err
.WriteByte(upFAIL
);
202 err
.WriteByte(upErrMalformed
);
206 procedure tPrv
.ChatTimeout(willwait
:LongWord
);
207 var wasactive
:boolean;
209 if WillWait
<30000 then exit
;
212 if isOpen
then Close(datafile
);
216 if wasactive
then IdleTimeout
{else it is sheduled};
218 procedure tPrv
.IdleTimeout
;
219 var err
:tMemoryStream
;
221 if assigned(ch
) then begin {chat is still not rekt}
222 ch
^.StreamInit(err
,1);
223 err
.WriteByte(upClose
);
228 if isOpen
then Close(datafile
);
230 FreeMem(@self
,sizeof(self
));
233 procedure tPrv
.Init(ag
:tAggr_ptr
; var nchat
:tChat
; msg
: tSMsg
);
236 ch
^.Callback
:=@OnMsg
;
237 ch
^.TMHook
:=@ChatTimeout
;
241 chan
:=msg
.stream
.readbyte
;
244 isOpen
:=false; Active
:=false;
246 Shedule(15000,@IdleTimeout
);
250 procedure tAggr
.Init(const source
:tNetAddr
);
252 writeln('upmgr: init');
256 if assigned(Peers
) then Peers
^.prev
:=@self
;
261 tcs
.OnTimeout
:=@TCTimeout
;
266 procedure tAggr
.TCTimeout
;
269 writeln('TCTimeout');
270 while assigned(prv
) do begin
277 procedure tAggr
.Cont
;
279 assert(assigned(prv
));
282 procedure tAggr
.UnRef
;
288 FreeMem(@self
,sizeof(self
));
291 procedure tAggr
.Done
;
294 writeln('upmgr: close');
297 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
298 if assigned(next
) then next
^.prev
:=prev
;
301 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
304 while assigned(result
) do begin
305 if result
^.tcs
.remote
=addr
then exit
;
306 assert(result
^.prev
=result
);
307 result
:=result
^.next
;
311 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
317 if msg
.stream
.RdBufLen
<2 then begin
318 writeln('upmgr: malformed init');
319 nchat
.StreamInit(s
,16);
321 s
.writebyte(upErrMalformed
);
326 ag
:=FindAggr(msg
.source
^);
327 if assigned(ag
) then begin
329 if ag
^.Cnt
>=cMax
then begin
330 nchat
.StreamInit(s
,16);
332 s
.WriteByte(upErrHiChan
);
334 s
.WriteByte(ag
^.Cnt
);
340 ag
^.init(msg
.source
^);
343 pr
^.Init(ag
,nchat
,msg
);
347 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);