2 {Upload Manager for brodnetd}
5 USES Chat
,opcode
,ServerLoop
,MemStream
,NetAddr
,Store1
;
18 isOpen
,Active
:boolean;
20 oinfo
:tStoreObjectInfo
;
21 {procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
22 procedure OnMsg(msg:tSMsg; data:boolean);
24 procedure DoGET(const fid:tfid; base,limit:LongWord);
25 procedure DoSEG(base,limit:LongWord);
29 procedure ChatTimeout(willwait:LongWord);
34 crit
:tRtlCriticalSection
;
50 acks
:Word; {ack counter, for timeouts}
53 limRate
,limSize
:Single;
56 procedure Start(sprv
:tPrv_ptr
);
57 procedure Stop(sprv
:tPrv_ptr
);
59 procedure Init(const source
:tNetAddr
);
60 procedure CalcRates(rxRate
:Single);
62 procedure OnCont(msg
:tSMsg
);
63 procedure OnAck(msg
:tSMsg
);
68 procedure tAggr
.Init(const source
:tNetAddr
);
70 writeln('upmgr: init');
73 if assigned(Peers
) then Peers
^.prev
:=@self
;
80 limRate
:=20*1024*1024;
82 InitCriticalSection(thr
.crit
);
83 source
.ToSocket(thr
.remote
);
88 function ThrStart(p
:pointer):LongInt;
89 begin with tAggrThr(p
^) do begin
90 { ( read; ) send; wait; repeat}
93 procedure tAggr
.Start(sprv
:tPrv_ptr
);
95 if assigned(thr
.prv1
) then begin
96 sprv
^.next
:=thr
.prv1
^.next
;
98 sprv
^.prev
^.next
:=sprv
;
99 sprv
^.next
^.prev
:=sprv
;
108 thr
.thrid
:=BeginThread(@ThrStart
,@thr
{,var,stack});
110 sprv
^.wcur
:=sprv
^.weight
;
113 procedure tAggr
.CalcRates(rxRate
:Single);
119 EnterCriticalSection(thr
.crit
);
122 then begin txRate
:=rxRate
; thr
.Rate
:=rxRate
end
123 else txRate
:=thr
.MarkData
/((mNow
-thr
.MarkStart
)/1000{*SecsPerDay});
124 RateFill
:=rxRate
/txRate
;
125 write('speed: ',(rxRate
/1024):1:3,'kB/s (',(RateFill
*100):3:1,'% of ',txRate
/1024:1:3,'), ');
126 if RateFill
<0.85 then begin
128 if RateFill
<0.5 then thr
.size1
:=128;
131 repeat thr
.mark1
:=Random(256) until (thr
.mark1
<>pMark
);
132 thr
.MarkData
:=0; {include on-the-wire data if increasing}
134 if (txRate
/thr
.Rate
)>0.7 then begin
136 thr
.Rate
:=1+txRate
*(RateIF
+1);
137 if thr
.Rate
>limRate
then thr
.Rate
:=limRate
138 else RateIF
:=RateIF
*2;
139 if RateIF
>limRateIF
then RateIF
:=LimRateIF
;
141 if (thr
.Rate
/thr
.size1
)<4 then thr
.size1
:=thr
.Rate
/5;
142 if thr
.size1
<120 then thr
.size1
:=128;
144 thr
.size2
:=round(thr
.size1
*(1+SizeIF
));
145 thr
.mark2
:=Random(256);
146 LeaveCriticalSection(thr
.crit
);
149 procedure tAggr
.Periodic
;
151 Shedule(2000,@Periodic
);
157 Shedule(2000,@Periodic
);
160 procedure tAggr
.OnCont(msg
:tSMsg
);
164 op
:=msg
.stream
.readbyte
;
165 assert(op
=opcode
.tccont
);
166 rmark
:=msg
.stream
.readbyte
;
167 if rmark
=thr
.mark1
then begin
170 rrate
:=msg
.stream
.readword(4);
175 procedure tAggr
.OnAck(msg
:tSMsg
);
180 op
:=msg
.stream
.readbyte
;
181 assert(op
=opcode
.tceack
);
182 rmark
:=msg
.stream
.readbyte
;
183 rsize
:=msg
.stream
.readword(2);
184 if (rmark
<>thr
.mark2
)and(rmark
<>thr
.mark1
) then exit
;
187 if rsize
<thr
.size1
then exit
;
188 SizeIF
:=((rSize
/thr
.Size1
)-1)*2;
189 if SizeIF
>limSizeIF
then SizeIF
:=limSizeIF
;
190 EnterCriticalSection(thr
.crit
);
192 thr
.size2
:=round(thr
.size1
*(1+SizeIF
));
193 thr
.mark2
:=Random(256);
194 LeaveCriticalSection(thr
.crit
);
197 procedure tAggr
.Stop(sprv
:tPrv_ptr
);
201 procedure tAggr
.Free
;
206 writeln('upmgr: aggr close');
207 DoneCriticalSection(thr
.crit
);
208 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
209 if assigned(next
) then next
^.prev
:=prev
;
210 FreeMem(@self
,sizeof(self
));
212 writeln('upmgr: aggr unrefd');
215 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
218 while assigned(result
) do begin
219 if assigned(result
^.next
) then assert(result
^.next
^.prev
=result
);
220 if result
^.remote
=addr
then exit
;
221 result
:=result
^.next
;
225 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
231 writeln('upmgr: ChatHandler');
232 msg
.stream
.skip({the initcode}1);
233 if msg
.stream
.RdBufLen
<2 then begin
234 writeln('upmgr: malformed init');
235 nchat
.StreamInit(s
,16);
237 s
.writebyte(upErrMalformed
);
240 writeln('upmgr: malformed request stage=0');
243 ag
:=FindAggr(msg
.source
^);
244 if assigned(ag
) then begin
246 if ag
^.refc
>=cMax
then begin
247 nchat
.StreamInit(s
,16);
249 s
.WriteByte(upErrHiChan
);
251 s
.WriteByte(ag
^.refc
);
257 ag
^.init(msg
.source
^);
260 //pr^.Init(ag,nchat,msg);
264 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);