Transmission speed controll. (squash)
[brdnet.git] / TC.pas
blob39ac20ab8bd386a0d62d7a3fd899e850a6350f35
1 unit TC;
{Transmission control over UDP.
 Every packet is acknowledged.
 Packets are sent at a fixed rate.
 The rate is increased step by step
 and decreased if the ack rate does not increase with it.
 Some datagrams are sent larger than the current size;
 if such a datagram passes, the payload size is set to that.
 trid: differs on each side; packets are sent with the other side's id.
 Packet types:
  data
   trid:2;sec:2;payload:XX
  cont (send next)
   trid:2;sec:2;len:WordNetOrder
  ctrl (feedback to sender)
   trid; arbitrary data
 sec: mark:1 unused:1
}
21 INTERFACE
22 uses MemStream,NetAddr;
type
  { One set of transmission parameters. tTCS keeps three of these:
    the current values, the upper limits and the initial values. }
  tTCSSe = record
    Rate   : Real;   { sending rate }
    Size   : word;   { datagram size }
    RateIF : single; { rate increase fraction }
    SizeIF : single; { size increase fraction }
    { an increase is computed as: new := old + old*IF }
  end;
type
  { Sender part of the transmission control. }
  tTCS = object
    { The methods are listed in the order they should be set/called. }
    procedure Init; { set defaults for the variables }
  public
    rid : Word;           { id written into every data header by WriteHeaders }
    lid : Word;           { local id; its low bits select the slot in the sender table }
    remote : tNetAddr;    { address the datagrams are sent to }
    mark : byte;          { mark byte carried by regular data packets }
    markc : byte;         { number of cont packets counted since the last adjustment }
    isTimeout : boolean;  { set by AdjustSpeed when too few cont packets arrived }
    isCanSend : boolean;  { set before CanSend is called, cleared by Send }
    paused : boolean;     { if this is true, do not call Start, call Send directly (size in Cur.Size) }
    rxRecTime : tDateTime; { time of the most recent cont packet }
    rxRecSize : word;      { size reported by the most recent cont packet }
    rxCumTime : tTime;     { cumulative time for the current mark }
    rxCumSize : longword;  { cumulative reported size for the current mark }
    txLastSize : word;     { size of the most recently sent datagram }
    txLastTime : tDateTime; { time of the most recent send }
    txCumTime : tTime;     { cumulative for the current mark }
    txCumSize : longword;
    trySize : Word;        { experimental size; 0 while no experiment is pending }
    isTrySize : boolean;   { the next packet to be sent is the size experiment }
    SizeIncScarcity : Word; { inverse probability of a size experiment }
    siMark : byte;         { size increase mark }
    Cur : tTCSSe;          { current values }
    Limit : tTCSSe;        { maximum allowed }
    Initial : tTCSSe;      { values used after start/timeout }
    minRateIF : single;    { used after a rate decrease }
    minSizeIF : single;    { meant for use after a rate increase }
    { statistics: todo }
    OnCtrl : procedure(var s:tMemoryStream) of object; { receives ctrl packets }
    OnGTimeout : procedure of object;
    CanSend : procedure(msize:word) of object; { called when a transmit is possible }
    procedure Start; { start the transmission }
    procedure WriteHeaders(var s:tMemoryStream); { write the packet headers in front of the data }
    { after WriteHeaders add your own data to the stream, at most the
      msize bytes announced through CanSend, then call Send }
    procedure Send(var s:tMemoryStream);
  private
    { timer callbacks }
    procedure TransmitDelay; { paces the sending instead of sending immediately }
    procedure AdjustSpeed;
    procedure TimeoutIncreaseSize;
    procedure OnCont(rmark:byte;rsize:word); { a cont packet was received }
    procedure Done; { unregister all callbacks }
  end;
{ Enter t into the table of senders that receive cont and ctrl packets. }
procedure RegTxer(var t:tTCS);
{ Stop the timers of t and take it out of that table again. }
procedure DelTxer(var t:tTCS);
81 IMPLEMENTATION
82 uses ServerLoop,SysUtils;
{ Table of registered senders. The cont and ctrl opcodes are registered
  in the unit initialization and delivered to the sender found here;
  the low bits of a sender's lid select its slot. }
var
  Txers : array [0..31] of ^tTCS;
{ Register sender t so that incoming cont/ctrl packets can reach it.

  The preferred slot is given by the low bits of t.lid. If that slot is
  already taken, the following slots are probed (wrapping around) and
  the low bits of t.lid are replaced by the number of the slot actually
  used, so t.lid has to be read only after this call.

  If every slot is occupied the assertion fails; when assertions are
  compiled out t is left unregistered instead of overwriting another
  sender's slot. }
procedure RegTxer(var t:tTCS);
var
  probe, slot, found : integer;
begin
  found := -1;
  for probe := 0 to high(txers) do begin
    slot := (t.lid + probe) and high(txers);
    if not assigned(txers[slot]) then begin
      found := slot;
      break;
    end;
  end;
  assert(found >= 0); {table full}
  if found < 0 then exit;
  txers[found] := @t;
  { mask and set: swap the slot bits of lid for the slot that was taken;
    written as a subtraction so no bit mask of uncertain width is needed }
  t.lid := word(t.lid - (t.lid and high(txers)) + found);
end;
{ Unregister sender t: cancel its timer callbacks and free its slot.
  The slot is only cleared when it really points to t, so deleting a
  sender that is not (or no longer) registered cannot remove another
  sender whose lid happens to share the same low bits. }
procedure DelTxer(var t:tTCS);
var
  slot : integer;
begin
  t.Done;
  slot := t.lid and high(txers);
  if txers[slot] = @t then txers[slot] := nil;
end;
type
  tTCSp = ^tTCS;

{ Return the registered sender whose lid equals the given one,
  or nil if the slot is empty or holds a sender with another lid. }
function GetTxer(lid:word):tTCSp;
begin
  result := txers[lid and high(txers)];
  if not assigned(result) then exit;
  if result^.lid = lid then exit;
  result := nil; {drop mismatched}
  {todo: check sender address match}
end;
{ Message handler for ctrl packets (feedback to the sender).
  Reads the id that follows the opcode, looks up the sender and hands
  the remaining stream to its OnCtrl callback. Packets for unknown
  senders are dropped, and so are packets for senders that have no
  OnCtrl handler installed: Init leaves OnCtrl nil and nothing forces
  the user to set it, so calling it unchecked would jump through nil. }
procedure RecvCtrl(msg:ServerLoop.tSMsg);
var
  sender : tTCSp;
  id : word;
begin
  msg.stream.skip(1); {skip opcode}
  msg.stream.Read(id,2); {do not reorder bytes, the id is not a number}
  sender := GetTxer(id);
  if not assigned(sender) then exit;
  if not assigned(sender^.OnCtrl) then exit;
  sender^.OnCtrl(msg.stream);
end;
{ Message handler for cont packets.
  Parses id, mark and reported size from the packet and forwards
  mark and size to the OnCont method of the matching sender;
  packets for unknown senders are ignored. }
procedure RecvCont(msg:ServerLoop.tSMsg);
var
  sender : tTCSp;
  id, ackedSize : word;
  ackedMark : byte;
begin
  msg.stream.skip(1); {skip opcode}
  msg.stream.Read(id,2); {do not reorder bytes, the id is not a number}
  ackedMark := msg.stream.ReadByte;
  msg.stream.Skip(1); {skip unused sec}
  ackedSize := msg.stream.ReadWord(2);
  sender := GetTxer(id);
  if assigned(sender) then sender^.OnCont(ackedMark,ackedSize);
end;
{ Set every field of the sender to a defined default.
  Besides the tunables this also clears the running state (size
  experiment, marks, counters and accumulators). Start does not reset
  trySize, siMark or txLastSize, yet TransmitDelay and OnCont read them,
  so without this an object that does not live in zeroed memory would
  start from garbage. }
procedure tTCS.Init;
begin
  lid:=Random(65535);
  rid:=65535; {value meaning: not set yet}
  remote.clear;
  SizeIncScarcity:=20; {inverse probability of size experiment}
  isTimeout:=false;
  paused:=false;
  isCanSend:=false;

  Limit.Rate:=2*1024*1024*1024; {2GB}
  Limit.Size:=4096;
  Limit.RateIF:=4;
  Limit.SizeIF:=3;

  Initial.Rate:=256;
  Initial.Size:=32+5;
  Initial.RateIF:=10;
  Initial.SizeIF:=2;

  minRateIF:=0.01;
  minSizeIF:=0.05;

  Cur:=Initial; {Start assigns this again}

  { no size experiment pending }
  trySize:=0;
  isTrySize:=false;
  siMark:=0;
  mark:=0;
  markc:=0;

  { receive/transmit bookkeeping }
  rxRecTime:=0;
  rxRecSize:=0;
  rxCumTime:=0;
  rxCumSize:=0;
  txLastSize:=0;
  txLastTime:=0;
  txCumTime:=0;
  txCumSize:=0;

  {statistic todo}
  OnCtrl:=nil;
  OnGTimeout:=nil;
  CanSend:=nil;
end;
{ Start (or, from AdjustSpeed after a timeout, restart) the transmission.
  Requires rid, CanSend and remote to be set. Resets the current
  parameters to Initial, draws a fresh mark and schedules the
  TransmitDelay and AdjustSpeed timers. }
procedure tTCS.Start; {start the transmission}
begin
  { rid is a Word and Init sets it to 65535 as the not-set value, so the
    bound has to be 65535; a larger bound can never fail }
  assert(rid<65535);
  Assert(assigned(CanSend));
  Assert(not remote.isnil);
  Cur:=Initial;
  markc:=0;
  { keep the mark distinct from the size increase mark, as AdjustSpeed
    and TransmitDelay do, so OnCont can tell the two apart }
  repeat mark:=Random(256) until mark<>siMark;
  isTrySize:=false;
  isCanSend:=false;
  (*txLastSize:=0;*)
  paused:=false;
  Shedule(80,@TransmitDelay);
  Shedule(2000,@AdjustSpeed);
end;
{ Write the 5 header bytes of a data packet to s: opcode 6, the 2 bytes
  of rid, the mark byte and a zero byte. The size increase mark is used
  instead of the regular mark while a size experiment is to be sent. }
procedure tTCS.WriteHeaders(var s:tMemoryStream);
var
  markByte : byte;
begin
  if isTrySize
    then markByte := siMark
    else markByte := mark;
  s.WriteByte(6);
  s.Write(rid,2);
  s.WriteByte(markByte);
  s.WriteByte(0);
end;
{ Send the datagram held in s to the remote address.
  s must not be longer than trySize when this is the size experiment,
  and not longer than Cur.Size otherwise. Clears isTrySize, paused and
  isCanSend and remembers the sent length in txLastSize, which
  TransmitDelay uses to compute the wait before the next packet. }
procedure tTCS.Send(var s:tMemoryStream);
begin
  if isTrySize
    then assert(s.length<=trySize)
    else assert(s.Length<=cur.size);
  isTrySize:=false;
  paused:=false;
  isCanSend:=false;
  ServerLoop.SendMessage(s.base^,s.length,remote);
  (* transmit statistics, disabled; the comment must be closed before
     the txLastSize assignment, which is live code
  if txLastSize=0 then txCumTime:=0 else begin
    txCumTime:=txCumTime+((Now-txLastTime)*SecsPerDay);
    inc(txCumSize,txLastSize);
  end;
  txLastTime:=Now;
  *)
  txLastSize:=s.length;
end;
{ Handle a cont packet: rmark is the mark it carries, rsize the size it
  reports.

  Part 1, rate statistics: packets with the current mark (or the size
  increase mark while an experiment is pending) are counted in markc.
  The first one resets the accumulators and, after a timeout, restarts
  the transmit timer; later ones add the elapsed time and the reported
  size. Enough data triggers AdjustSpeed at once.

  Part 2, size experiment: a packet with the size increase mark ends the
  experiment; a reported size above Cur.Size is adopted.

  rsize comes from the peer, so it is capped at Limit.Size before it can
  become the new datagram size. markc is a byte and is not reset when
  AdjustSpeed leaves early, so it is incremented with saturation. }
procedure tTCS.OnCont(rmark:byte;rsize:word);
var
  rnow : tDateTime;
  delta : real;
  newSize : word;
begin
  if (rmark=mark) or ((trySize>0) and (rmark=simark)) then begin
    rnow:=Now;
    if markc<255 then inc(markc);
    if markc=1 then begin
      rxCumTime:=0;
      rxCumSize:=0; {ignore this size, there is no info how long it took to send}
      if isTimeout then begin
        isTimeout:=false;
        Shedule(80,@TransmitDelay);
        writeln('TIMEOUT RECOVERY');
      end;
    end else begin
      delta:=(rnow-rxRecTime)*SecsPerDay;
      rxCumTime:=rxCumTime+delta;
      rxCumSize:=rxCumSize+rsize;
      //writeln('told size is ',rsize, 'delta ',round(delta*1000));
    end;
    rxRecTime:=rnow;
    rxRecSize:=rsize;
    if (markc>200) or (rxCumSize>640000) then begin
      UnShedule(@AdjustSpeed); {do not wait}
      AdjustSpeed; {adjust now!}
    end;
  end;

  if rmark=simark then begin
    isTrySize:=false;
    TrySize:=0;
    newSize:=rsize;
    if newSize>Limit.Size then newSize:=Limit.Size; {never exceed the limit}
    if newSize>cur.size then begin
      cur.SizeIF:=((newSize/cur.Size)-1)*2;
      if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
      if (newSize/cur.rate)<=0.3 then cur.size:=newSize; {use new size for all transmit}
      //writeln('New size ',cur.Size);
      UnShedule(@TimeoutIncreaseSize);
    end;
  end;
end;
{ Timer callback (also called directly by OnCont): compare the rate at
  which cont packets came back with the current sending rate and adjust.

  - nothing was sent since the last CanSend: set paused and stop
  - in timeout state: restart through Start
  - more than 3 cont packets counted: below 85% of the sending rate the
    measured rate becomes the new rate ("limit"), otherwise the rate is
    raised by RateIF and a new mark is drawn ("pass")
  - otherwise: timeout, fall back to the Initial values

  Two divisions are guarded here. rxCumTime is zero when all counted
  packets arrived within one clock tick; no rate can be measured then
  and the current rate is assumed to be met. A measured rate of zero
  (the peer reported only zero sizes) must not become the sending rate,
  because TransmitDelay divides by it; Initial.Rate is used instead. }
procedure tTCS.AdjustSpeed;
var
  rxRate : real;
  RateFill : single;
  (*var txRate:real;*)
begin
  if isCanSend then begin paused:=true; exit end; {nothing to transmit, sleep forever}
  if isTimeout then begin Start; exit end;
  if markc>3 then begin {only proceed with enough data}
    if rxCumTime>0
      then rxrate:=rxCumSize/rxCumTime
      else rxrate:=cur.rate; {no measurable interval}
    RateFill:=rxrate/cur.rate;
    (*txrate:=txCumSize/txCumTime;
    write('speed: ',(rxrate/1024):1:4,'kB/s ',(txrate/1024):1:4,'kB/s (',((rxrate/cur.rate)*100):3:1,'%), ');*)
    write('speed: ',(rxrate/1024):1:4,'kB/s (',(RateFill*100):3:1,'%), ');
    if RateFill<0.85 then begin
      write('limit, ');
      {we hit the limit}
      cur.Rate:=rxrate;
      if cur.Rate<=0 then cur.Rate:=Initial.Rate; {keep the rate usable as a divisor}
      cur.RateIF:=minRateIF;
      (*cur.Size:=round(cur.size-(cur.Size/4));*)
      //cur.SizeIF:=minSizeIF;
    end else begin
      write('pass, ');
      {rates are ok}
      (*if RateFill>1.05 then cur.Rate:=rxRate else*)
      cur.Rate:=cur.Rate+(cur.Rate*cur.RateIF);
      if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate;
      cur.RateIF:=cur.RateIF*2;
      if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
      repeat mark:=Random(256) until mark<>siMark;
    end;
  end else begin
    {this is timeout! reset to safe rates}
    write('timeout, ');
    Cur:=Initial;
    isTimeout:=true;
  end;
  //writeln('txwait ',((cur.size/cur.rate)*1000):1:1);
  markc:=0;
  writeln('adjust to ',(Cur.Rate/1024):1:4,'kB/s mark', mark, ' size=',cur.Size);
  (*txLastSize:=0;*)
  Shedule(1600,@AdjustSpeed);
end;
{ Timer callback that paces the sending.
  Sets isCanSend and asks the user for data through CanSend, then
  reschedules itself after the time the sent bytes take at the current
  rate (txLastSize/Cur.Rate, in milliseconds).

  With probability 1/SizeIncScarcity, when not in timeout, no experiment
  is pending and the size limit is not reached, a single larger packet
  is requested as a size experiment; if it was sent, a timer for
  TimeoutIncreaseSize is started. Otherwise packets are requested in a
  burst until more than 20 ms of wait have accumulated or more than 200
  packets were requested. In timeout state only one packet is requested
  and the timer is not rescheduled.

  txwait is cleared before either branch: the experiment branch adds to
  it and previously did so without it ever having been assigned. }
procedure tTCS.TransmitDelay;
var
  txwait : real;
  burst : byte;
begin
  isCanSend:=true;
  txwait:=0;
  if (not isTimeout) and (TrySize=0) and (Random(SizeIncScarcity)=0) and (cur.Size<Limit.Size) then begin
    repeat siMark:=Random(256) until siMark<>Mark;
    isTrySize:=true;
    trySize:=round(cur.Size+(cur.Size*cur.SizeIF));
    if trySize>Limit.Size then trySize:=Limit.Size;
    //writeln('Try size ',trySize);
    CanSend(trySize-5); {5 bytes are taken by the headers}
    txwait:=txwait+((txLastSize/cur.rate)*1000);
    Shedule(round(txwait),@TransmitDelay);
    if not isCanSend then Shedule(2500,@TimeoutIncreaseSize);
  end else begin
    burst:=0;
    repeat
      CanSend(Cur.Size-5); {5 bytes are taken by the headers}
      txwait:=txwait+((txLastSize/cur.rate)*1000);
      if isTrySize then break;
      if isTimeout then exit;
      inc(burst);
    until (txwait>20) or (burst>200);
    //writeln('Burst ',burst);
    Shedule(round(txwait),@TransmitDelay);
  end;
end;
{ Timer callback: the size experiment was not confirmed in time.
  Ends the experiment and cuts the size increase fraction to an eighth. }
procedure tTCS.TimeoutIncreaseSize;
begin
  trySize:=0;
  isTrySize:=false;
  //writeln('Size Inc timeout');
  cur.SizeIF:=cur.SizeIF/8;
  { do not let the step vanish: when SizeIF*Size falls below 1,
    SizeIF is raised to 1/Size }
  if (cur.SizeIF*cur.Size)<1 then cur.SizeIF:=1/cur.Size;
end;
{ Cancel the three timer callbacks of this sender
  (speed adjustment, transmit pacing, size experiment timeout). }
procedure tTCS.Done;
begin
  UnShedule(@AdjustSpeed);
  UnShedule(@TransmitDelay);
  UnShedule(@TimeoutIncreaseSize);
end;
{ Unit initialization: start with an empty sender table and register
  the handlers for the two opcodes delivered to senders. }
BEGIN
  FillByte(txers,sizeof(txers),0); {every slot nil}
  SetMsgHandler(4,@RecvCont); {opcode 4: cont}
  SetMsgHandler(5,@RecvCtrl); {opcode 5: ctrl}
END.