Add measure sending rate and calculate using that.
[brdnet.git] / TC.pas
blobd7853f327baecda02b77426250ffb172f87eb256
1 unit TC;
2 {TransmissionControll over UDP
3 ack all packets
4 send at fixed rate
5 increase rate
6 decrease rate if ack rate does not increase
8 some dg larger
9 if pass set payload to that
11 trid: diff on each side, send with the other side id
12 pktypes:
13 data
14 trid:2;sec:2;payload:XX
15 cont (send next)
16 trid:2;sec:2;len:WordNetOrder
17 ctrl (feedback to sender)
18 trid; arbitrary data
19 sec: mark:1 unused:1
21 INTERFACE
22 uses MemStream,NetAddr;
24 type tTCSSe=record
25 Rate:Real; {sending rate}
26 Size:word; {datagram size}
27 RateIF:single; {rate increase fraction}
28 SizeIF:single; {size increase fraction}
29 {new:=old+old*IF}
30 end;
32 type tTCS=object {this is sender part}
33 {in order methods should be set/called}
34 procedure Init; {set defaults for vars}
35 public
36 rid,lid:Word;
37 remote:tNetAddr;
38 mark,markc:byte;
39 isTimeout:boolean;
40 isCanSend:boolean;
41 paused:boolean; {if this is true, do not call start, call send directly (size in cur.size)}
42 rxRecTime:tDateTime; {last recent}
43 rxRecSize:word;
44 rxCumTime:tTime; {cumulative for current mark}
45 rxCumSize:longword;
46 txLastSize:word; {last recent}
47 txLastTime:tDateTime; {last recent}
48 txCumTime:tTime; {cumulative for current mark}
49 txCumSize:longword;
50 trySize:Word; {experimental size}
51 isTrySize:boolean;
52 SizeIncScarcity:Word; {inverse probability of size experiment}
53 siMark:byte; {size increase mark}
54 Cur:tTCSSe; {current values}
55 Limit:tTCSSe; {maximum alloved}
56 Initial:tTCSSe; {after start/timeout}
57 minRateIF:single; {used after rate decrease}
58 minSizeIF:single; {used after rate increase}
59 {statistic todo}
60 OnCtrl: procedure(var s:tMemoryStream) of object;
61 OnGTimeout: procedure of object;
62 CanSend: procedure(msize:word) of object; {called when transmit possible}
63 procedure Start; {start the transmission}
64 procedure WriteHeaders(var s:tMemoryStream); {add headers before the data, return payload size}
65 {add ur own data to stream, but max plsize bytes}
66 procedure Send(var s:tMemoryStream);
67 private
68 {timer callbacks}
69 procedure TransmitDelay; {delay sending packets immediatlely}
70 procedure AdjustSpeed;
71 procedure TimeoutIncreaseSize;
72 procedure OnCont(rmark:byte;rsize:word); {cont packet recved}
73 procedure Done; {unregister all callbacks}
74 end;
76 procedure RegTxer(var t:tTCS);
77 procedure DelTxer(var t:tTCS);
79 IMPLEMENTATION
80 uses ServerLoop,SysUtils;
82 {register cont and ctrl opcodes and deliver them to senders}
83 var Txers:array [0..31] of ^tTCS;
85 procedure RegTxer(var t:tTCS);
86 var tn:byte;
87 begin
88 //for tn:=0 to high(TXERS) do if txers[tn]=nil then break;
89 tn:=t.lid and high(txers);
90 assert(not assigned(txers[tn]));
91 txers[tn]:=@t;
92 t.lid:=(t.lid and (not high(txers))) or tn; {mask and set}
93 end;
95 procedure DelTxer(var t:tTCS);
96 var tn:byte;
97 begin
98 t.Done;
99 tn:=t.lid and high(txers);
100 txers[tn]:=nil;
101 end;
103 type tTCSp=^tTCS;
104 function GetTxer(lid:word):tTCSp;
105 var tn:byte;
106 begin
107 tn:=lid and high(txers);
108 result:=txers[tn];
109 if assigned(result) and (result^.lid<>lid) then result:=nil; {drop mismatched}
110 {todo: check sender address match}
111 end;
113 procedure RecvCtrl(msg:ServerLoop.tSMsg);
114 var t:^tTCS;
115 var lid:word;
116 begin
117 msg.stream.skip(1); {skip opcode}
118 msg.stream.Read(lid,2); {dont reorder bytes, lid is no number}
119 t:=GetTxer(lid);
120 if not assigned(t) then exit;
121 t^.OnCtrl(msg.stream);
122 end;
124 procedure RecvCont(msg:ServerLoop.tSMsg);
125 var t:^tTCS;
126 var lid:word;
127 var rmark:byte;
128 var rsize:word;
129 begin
130 msg.stream.skip(1); {skip opcode}
131 msg.stream.Read(lid,2); {dont reorder bytes, lid is no number}
132 rmark:=msg.stream.ReadByte;
133 msg.stream.Skip(1); {skip unused sec}
134 rsize:=msg.stream.ReadWord(2);
135 t:=GetTxer(lid);
136 if not assigned(t) then exit;
137 t^.OnCont(rmark,rsize);
138 end;
140 procedure tTCS.Init;
141 begin
142 lid:=Random(65535);
143 rid:=65535;
144 remote.clear;
145 SizeIncScarcity:=20; {inverse probability of size experiment}
146 Limit.Rate:=2*1024*1024*1024; {2GB}
147 isTimeout:=false;
148 Limit.Size:=4096;
149 Limit.RateIF:=4;
150 Limit.SizeIF:=3;
151 Initial.Rate:=256;
152 Initial.Size:=32+5;
153 Initial.RateIF:=10;
154 Initial.SizeIF:=2;
155 minRateIF:=0.01;
156 minSizeIF:=0.05;
157 paused:=false;
158 {statistic todo}
159 OnCtrl:=nil;
160 OnGTimeout:=nil;
161 CanSend:=nil;
162 end;
163 procedure tTCS.Start; {start the transmission}
164 begin
165 assert(rid<655350);
166 Assert(assigned(CanSend) );
167 Assert(not remote.isnil);
168 Cur:=Initial;
169 markc:=0;
170 mark:=Random(256);
171 isTrySize:=false;
172 isCanSend:=false;
173 txLastSize:=0;
174 paused:=false;
175 Shedule(80,@TransmitDelay);
176 Shedule(2000,@AdjustSpeed);
177 end;
179 procedure tTCS.WriteHeaders(var s:tMemoryStream);
180 begin
181 s.WriteByte(6);
182 s.Write(rid,2);
183 if isTrySize then begin
184 s.writebyte(siMark);
185 end else begin
186 s.WriteByte(mark);
187 end;
188 s.WriteByte(0);
189 end;
191 procedure tTCS.Send(var s:tMemoryStream);
192 begin
193 if isTrySize then assert(s.length<=trySize) else assert(s.Length<=cur.size);
194 isTrySize:=false;
195 paused:=false;
196 isCanSend:=false;
197 ServerLoop.SendMessage(s.base^,s.length,remote);
198 if txLastSize=0 then begin
199 txCumTime:=0;
200 txCumSize:=0;
201 end else begin
202 txCumTime:=txCumTime+((Now-txLastTime)*SecsPerDay);
203 txCumSize:=txCumSize+txLastSize;
204 end;
205 txLastTime:=Now;
206 txLastSize:=s.length;
207 end;
209 procedure tTCS.OnCont(rmark:byte;rsize:word);
210 var rnow:tDateTime;
211 var delta:real;
212 begin
213 if (rmark=mark)or((trySize>0)and(rmark=simark)) then begin
214 rnow:=Now;
215 inc(markc);
216 if markc=1 then begin
217 rxCumTime:=0;
218 rxCumSize:=0; {ignore this size since no info how long it sending}
219 if isTimeout then begin
220 isTimeout:=false;
221 Shedule(80,@TransmitDelay);
222 writeln('TIMEOUT RECOVERY');
223 end;
224 end else begin
225 delta:=(rnow-rxRecTime)*SecsPerDay;
226 rxCumTime:=rxCumTime+delta;
227 rxCumSize:=rxCumSize+rsize;
228 //writeln('told size is ',rsize, 'delta ',round(delta*1000));
229 end;
230 rxRecTime:=rnow;
231 rxRecSize:=rsize;
232 if (markc>200)or(rxCumSize>640000) then begin
233 UnShedule(@AdjustSpeed); {do not wait}
234 AdjustSpeed; {adjust now!}
235 end;
236 end;
237 if rmark=simark then begin
238 isTrySize:=false;
239 TrySize:=0;
240 if rsize>cur.size then begin
241 cur.SizeIF:=((rSize/cur.Size)-1)*2;
242 if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
243 if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit}
244 //writeln('New size ',cur.Size);
245 UnShedule(@TimeoutIncreaseSize);
246 end;
247 end;
248 end;
250 procedure tTCS.AdjustSpeed;
251 var rxRate:real;
252 var RateFill:single;
253 var txRate:real;
254 begin
255 if isCanSend then begin paused:=true; exit end; {nothing to transmit, sleep forever}
256 if isTimeout then begin Start; exit end;
257 if markc>3 then begin {only proceed with enough data}
258 rxrate:=rxCumSize/rxCumTime;
259 if txCumTime>0.01 then txrate:=txCumSize/txCumTime
260 else txrate:=cur.Rate;
261 RateFill:=rxrate/txRate;
262 write('speed: ',(rxrate/1024):1:4,'kB/s @',(txRate/1024):1:4,'kB/s (',(RateFill*100):3:1,'%), ');
263 if RateFill<0.85 then begin
264 write('limit, ');
265 {we hit the limit}
266 cur.Rate:=rxrate;
267 cur.RateIF:=minRateIF;
268 {cur.Size:=round(cur.size-(cur.Size/4));}
269 //cur.SizeIF:=minSizeIF;
270 end else begin
271 write('pass, ');
272 {rates are ok}
273 cur.Rate:=txrate+(cur.Rate*cur.RateIF);
274 if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate;
275 cur.RateIF:=cur.RateIF*2;
276 if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
277 repeat mark:=Random(256) until mark<>siMark;
278 end;
279 end else begin
280 {this is timeout! reset to safe rates}
281 write('timeout, ');
282 Cur:=Initial;
283 isTimeout:=true;
284 end;
285 //writeln('txwait ',((cur.size/cur.rate)*1000):1:1);
286 markc:=0;
287 txLastSize:=0;
288 writeln('adjust to ',(Cur.Rate/1024):1:4,'kB/s mark', mark, ' size=',cur.Size);
289 (*txLastSize:=0;*)
290 Shedule(1600,@AdjustSpeed);
291 end;
293 procedure tTCS.TransmitDelay;
294 var txwait:real;
295 var burst:byte;
296 begin
297 isCanSend:=true;
298 if (not isTimeout)and(TrySize=0)and(Random(SizeIncScarcity)=0)and(cur.Size<Limit.Size) then begin
299 repeat siMark:=Random(256) until siMark<>Mark;
300 isTrySize:=true;
301 trySize:=round(cur.Size+(cur.Size*cur.SizeIF));
302 if trySize>Limit.Size then trySize:=Limit.Size;
303 //writeln('Try size ',trySize);
304 CanSend(trySize-5);
305 txwait:=((txLastSize/cur.rate)*1000);
306 Shedule(round(txwait),@TransmitDelay);
307 if not isCanSend then Shedule(2500,@TimeoutIncreaseSize)
308 end else begin
309 txwait:=0;
310 burst:=0;
311 repeat
312 CanSend(Cur.Size-5);
313 txwait:=txwait+((txLastSize/cur.rate)*1000);
314 if isTrySize then break;
315 if isTimeout then exit;
316 inc(burst);
317 until (txwait>20)or(burst>200);
318 //writeln('Burst ',burst);
319 Shedule(round(txwait),@TransmitDelay);
320 end;
321 end;
323 procedure tTCS.TimeoutIncreaseSize;
324 begin
325 isTrySize:=false;
326 //writeln('Size Inc timeout');
327 cur.SizeIF:=cur.SizeIF/8;
328 {make sure we increase at least by 2 bytes}
329 if (cur.SizeIF*cur.Size)<1 then cur.SizeIF:=1/cur.Size;
330 trySize:=0;
331 end;
333 procedure tTCS.Done; {unregister all callbacks}
334 begin
335 UnShedule(@AdjustSpeed);
336 UnShedule(@TransmitDelay);
337 UnShedule(@TimeoutIncreaseSize);
338 end;
340 BEGIN
341 FillByte(txers,sizeof(txers),0); {make'em nil}
342 SetMsgHandler(4,@RecvCont);
343 SetMsgHandler(5,@RecvCtrl);
344 END.