Network constants for file sharing
[brdnet.git] / Upload.pas
blobf6e008b97c9ed41cf26bc3988c216b6bad92a8d4
1 UNIT Upload;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1;
7 IMPLEMENTATION
9 type
10 tAggr_ptr=^tAggr;
11 tPrv_ptr=^tPrv;
12 tPrv=object
13 aggr:tAggr_ptr;
14 ch: ^tChat;
15 chan: byte;
16 next,prev: tPrv_ptr;
17 weight,wcur:Word;
18 isOpen,Active:boolean;
19 seglen:LongWord;
20 oinfo:tStoreObjectInfo;
21 {procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg);
22 procedure OnMsg(msg:tSMsg; data:boolean);
23 procedure Cont;
24 procedure DoGET(const fid:tfid; base,limit:LongWord);
25 procedure DoSEG(base,limit:LongWord);
26 procedure DoClose;
27 procedure Start;
28 procedure Stop;
29 procedure ChatTimeout(willwait:LongWord);
30 procedure Close;}
31 end;
32 tAggrThr=object
33 thrid:tThreadID;
34 crit:tRtlCriticalSection;
35 stop:ByteBool;
36 remote:tSockAddrL;
37 prv1:^tPrv;
38 size1,size2:Word;
39 mark1,mark2:Byte;
40 rate:Single;
41 burst:byte;
42 MarkData:LongWord;
43 MarkStart:tMTime;
44 end;
45 tAggr=object
46 thr:tAggrThr;
47 remote:tNetAddr;
48 refc:byte;
50 acks:Word; {ack counter, for timeouts}
51 timeout:word;
52 rateIF,sizeIF,
53 limRate,limSize:Single;
55 next,prev: tAggr_ptr;
56 procedure Start(sprv:tPrv_ptr);
57 procedure Stop(sprv:tPrv_ptr);
58 procedure Free;
59 procedure Init(const source:tNetAddr);
60 procedure CalcRates(rxRate:Single);
61 procedure Periodic;
62 procedure OnCont(msg:tSMsg);
63 procedure OnAck(msg:tSMsg);
64 end;
66 var Peers:^tAggr;
68 procedure tAggr.Init(const source:tNetAddr);
69 begin
70 writeln('upmgr: init');
71 next:=Peers;
72 prev:=nil;
73 if assigned(Peers) then Peers^.prev:=@self;
74 Peers:=@self;
75 refc:=0;
76 acks:=0;
77 timeout:=0;
78 rateIF:=1;
79 sizeIF:=1;
80 limRate:=20*1024*1024;
81 limSize:=4096;
82 InitCriticalSection(thr.crit);
83 source.ToSocket(thr.remote);
84 remote:=source;
85 thr.prv1:=nil;
86 end;
88 function ThrStart(p:pointer):LongInt;
89 begin with tAggrThr(p^) do begin
90 { ( read; ) send; wait; repeat}
91 end end;
93 procedure tAggr.Start(sprv:tPrv_ptr);
94 begin
95 if assigned(thr.prv1) then begin
96 sprv^.next:=thr.prv1^.next;
97 sprv^.prev:=thr.prv1;
98 sprv^.prev^.next:=sprv;
99 sprv^.next^.prev:=sprv;
100 end else begin
101 sprv^.next:=sprv;
102 sprv^.prev:=sprv;
103 thr.prv1:=sprv;
104 thr.stop:=false;
105 thr.MarkData:=0;
106 thr.MarkStart:=mNow;
107 CalcRates(20480);
108 thr.thrid:=BeginThread(@ThrStart,@thr{,var,stack});
109 end;
110 sprv^.wcur:=sprv^.weight;
111 end;
113 procedure tAggr.CalcRates(rxRate:Single);
114 var txRate:Single;
115 var RateFill:Single;
116 var pMark:byte;
117 const limRateIF=3;
118 begin
119 EnterCriticalSection(thr.crit);
120 pMark:=thr.mark1;
121 if thr.MarkStart=0
122 then begin txRate:=rxRate; thr.Rate:=rxRate end
123 else txRate:=thr.MarkData/((mNow-thr.MarkStart)/1000{*SecsPerDay});
124 RateFill:=rxRate/txRate;
125 write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), ');
126 if RateFill<0.85 then begin
127 writeln('limit');
128 if RateFill<0.5 then thr.size1:=128;
129 thr.Rate:=rxRate;
130 RateIF:=RateIF/2;
131 repeat thr.mark1:=Random(256) until (thr.mark1<>pMark);
132 thr.MarkData:=0; {include on-the-wire data if increasing}
133 end else
134 if (txRate/thr.Rate)>0.7 then begin
135 writeln('pass');
136 thr.Rate:=1+txRate*(RateIF+1);
137 if thr.Rate>limRate then thr.Rate:=limRate
138 else RateIF:=RateIF*2;
139 if RateIF>limRateIF then RateIF:=LimRateIF;
140 end;
141 if (thr.Rate/thr.size1)<4 then thr.size1:=thr.Rate/5;
142 if thr.size1<120 then thr.size1:=128;
143 sizeIF:=sizeIF/2;
144 thr.size2:=round(thr.size1*(1+SizeIF));
145 thr.mark2:=Random(256);
146 LeaveCriticalSection(thr.crit);
147 end;
149 procedure tAggr.Periodic;
150 begin
151 Shedule(2000,@Periodic);
152 if acks=0 then begin
153 inc(Timeout);
154 CalcRates(512);
155 end;
156 acks:=0;
157 Shedule(2000,@Periodic);
158 end;
160 procedure tAggr.OnCont(msg:tSMsg);
161 var op,rmark:byte;
162 var rRate:LongWord;
163 begin
164 op:=msg.stream.readbyte;
165 assert(op=opcode.tccont);
166 rmark:=msg.stream.readbyte;
167 if rmark=thr.mark1 then begin
168 inc(acks);
169 timeout:=0;
170 rrate:=msg.stream.readword(4);
171 CalcRates(rRate*64);
172 end;
173 end;
175 procedure tAggr.OnAck(msg:tSMsg);
176 var op,rmark:byte;
177 var rSize:LongWord;
178 const limSizeIF=1;
179 begin
180 op:=msg.stream.readbyte;
181 assert(op=opcode.tceack);
182 rmark:=msg.stream.readbyte;
183 rsize:=msg.stream.readword(2);
184 if (rmark<>thr.mark2)and(rmark<>thr.mark1) then exit;
185 inc(acks);
186 Timeout:=0;
187 if rsize<thr.size1 then exit;
188 SizeIF:=((rSize/thr.Size1)-1)*2;
189 if SizeIF>limSizeIF then SizeIF:=limSizeIF;
190 EnterCriticalSection(thr.crit);
191 thr.size1:=rSize;
192 thr.size2:=round(thr.size1*(1+SizeIF));
193 thr.mark2:=Random(256);
194 LeaveCriticalSection(thr.crit);
195 end;
197 procedure tAggr.Stop(sprv:tPrv_ptr);
198 begin
199 end;
201 procedure tAggr.Free;
202 begin
203 Assert(refc>0);
204 Dec(refc);
205 if refc=0 then begin
206 writeln('upmgr: aggr close');
207 DoneCriticalSection(thr.crit);
208 if assigned(prev) then prev^.next:=next else Peers:=next;
209 if assigned(next) then next^.prev:=prev;
210 FreeMem(@self,sizeof(self));
211 end else
212 writeln('upmgr: aggr unrefd');
213 end;
215 function FindAggr({const} addr:tNetAddr): tAggr_ptr;
216 begin
217 result:=Peers;
218 while assigned(result) do begin
219 if assigned(result^.next) then assert(result^.next^.prev=result);
220 if result^.remote=addr then exit;
221 result:=result^.next;
222 end;
223 end;
225 procedure ChatHandler(var nchat:tChat; msg:tSMsg);
226 var ag:^tAggr;
227 var pr:^tPrv;
228 var s:tMemoryStream;
229 const cMax=16;
230 begin
231 writeln('upmgr: ChatHandler');
232 msg.stream.skip({the initcode}1);
233 if msg.stream.RdBufLen<2 then begin
234 writeln('upmgr: malformed init');
235 nchat.StreamInit(s,16);
236 s.WriteByte(upFAIL);
237 s.writebyte(upErrMalformed);
238 nchat.Send(s);
239 nchat.Close;
240 writeln('upmgr: malformed request stage=0');
241 exit end;
242 {first get the ag}
243 ag:=FindAggr(msg.source^);
244 if assigned(ag) then begin
245 {check}
246 if ag^.refc>=cMax then begin
247 nchat.StreamInit(s,16);
248 s.WriteByte(upFAIL);
249 s.WriteByte(upErrHiChan);
250 s.WriteByte(cMax);
251 s.WriteByte(ag^.refc);
252 nchat.Send(s);
253 nchat.Close;
254 exit end;
255 end else begin
256 New(ag);
257 ag^.init(msg.source^);
258 end;
259 New(pr);
260 //pr^.Init(ag,nchat,msg);
261 end;
263 BEGIN
264 SetChatHandler(opcode.upFileServer,@ChatHandler);
265 END.