2 {Transmission Control over UDP
6 decrease rate if ack rate does not increase
9 if pass set payload to that
11 trid: diff on each side, send with the other side id
14 trid:2;sec:2;payload:XX
16 trid:2;sec:2;len:WordNetOrder
17 ctrl (feedback to sender)
22 uses MemStream
,NetAddr
;
25 Rate
:Real; {sending rate}
26 Size
:word; {datagram size}
27 RateIF
:single; {rate increase fraction}
28 SizeIF
:single; {size increase fraction}
{ tTCS: sender side of the transmission control. Holds receive/transmit
  bookkeeping, the current/limit/initial parameter sets, user callbacks and
  the timer-driven procedures that pace and adapt the transmission. }
32 type tTCS
=object {this is the sender part}
33 {listed in the order in which the methods should be set/called}
34 procedure Init
; {set defaults for vars}
41 paused
:boolean; {if this is true, do not call Start, call Send directly (size in cur.size)}
42 rxRecTime
:tDateTime
; {most recent}
44 rxCumTime
:tTime
; {cumulative for current mark}
46 txLastSize
:word; {most recent}
48 txLastTime:tDateTime; {most recent}
49 txCumTime:tTime; {cumulative for current mark}
52 trySize
:Word; {experimental size}
54 SizeIncScarcity
:Word; {inverse probability of size experiment}
55 siMark
:byte; {size increase mark}
56 Cur
:tTCSSe
; {current values}
57 Limit
:tTCSSe
; {maximum allowed}
58 Initial
:tTCSSe
; {after start/timeout}
59 minRateIF
:single; {used after rate decrease}
60 minSizeIF
:single; {used after rate increase}
62 OnCtrl
: procedure(var s
:tMemoryStream
) of object;
63 OnGTimeout
: procedure of object;
64 CanSend
: procedure(msize
:word) of object; {called when transmit is possible}
65 procedure Start
; {start the transmission}
66 procedure WriteHeaders(var s
:tMemoryStream
); {add headers before the data. NOTE(review): this is a procedure, so no payload size is actually returned}
67 {add your own data to the stream, but at most plsize bytes}
68 procedure Send(var s
:tMemoryStream
);
71 procedure TransmitDelay
; {delay sending packets immediately}
72 procedure AdjustSpeed
;
73 procedure TimeoutIncreaseSize
;
74 procedure OnCont(rmark
:byte;rsize
:word); {cont packet received}
75 procedure Done
; {unregister all callbacks}
78 procedure RegTxer(var t
:tTCS
);
79 procedure DelTxer(var t
:tTCS
);
82 uses ServerLoop
,SysUtils
;
84 {register cont and ctrl opcodes and deliver them to senders}
85 var Txers
:array [0..31] of ^tTCS
;
{ Registers sender t in the Txers table. The slot is selected by the low bits
  of t.lid, and that slot must not already be occupied. }
87 procedure RegTxer(var t
:tTCS
);
90 //for tn:=0 to high(TXERS) do if txers[tn]=nil then break;
{ slot index = lid masked with high(txers); the table is 0..31 so this keeps the low 5 bits }
91 tn
:=t
.lid
and high(txers
);
{ a second sender mapping to the same slot trips this assertion }
92 assert(not assigned(txers
[tn
]));
{ rewrite lid as its high bits combined with the slot index; as long as tn still
  holds the value computed above this leaves lid unchanged }
94 t
.lid
:=(t
.lid
and (not high(txers
))) or tn
; {mask and set}
97 procedure DelTxer(var t
:tTCS
);
101 tn
:=t
.lid
and high(txers
);
{ Finds the sender for a given lid. The slot index is lid masked with
  high(txers). A sender whose stored lid differs from the requested lid is
  treated as not found, so the result becomes nil. }
106 function GetTxer(lid
:word):tTCSp
;
109 tn
:=lid
and high(txers
);
{ NOTE(review): result is presumably loaded from txers[tn] before this check - confirm }
111 if assigned(result
) and (result
^.lid
<>lid
) then result
:=nil; {drop mismatched}
112 {todo: check sender address match}
{ Message handler for ctrl packets (installed for opcode 5 at unit start-up).
  Skips the opcode byte, reads the 2-byte lid and hands the rest of the stream
  to the OnCtrl callback of the matching sender. Packets with no matching
  sender are dropped silently. }
115 procedure RecvCtrl(msg
:ServerLoop
.tSMsg
);
119 msg
.stream
.skip(1); {skip opcode}
120 msg
.stream
.Read(lid
,2); {do not reorder bytes: lid is an opaque id, not a number}
{ NOTE(review): t is presumably the result of GetTxer(lid) - confirm }
122 if not assigned(t
) then exit
;
123 t
^.OnCtrl(msg
.stream
);
{ Message handler for cont packets (installed for opcode 4 at unit start-up).
  Parses opcode, lid, mark byte, one unused byte and the reported size, then
  forwards mark and size to OnCont of the matching sender. Packets with no
  matching sender are dropped silently. }
126 procedure RecvCont(msg
:ServerLoop
.tSMsg
);
132 msg
.stream
.skip(1); {skip opcode}
133 msg
.stream
.Read(lid
,2); {do not reorder bytes: lid is an opaque id, not a number}
134 rmark
:=msg
.stream
.ReadByte
;
135 msg
.stream
.Skip(1); {skip unused sec}
{ size reported by the receiver, read as a 2-byte word }
136 rsize
:=msg
.stream
.ReadWord(2);
{ NOTE(review): t is presumably the result of GetTxer(lid) - confirm }
138 if not assigned(t
) then exit
;
139 t
^.OnCont(rmark
,rsize
);
147 SizeIncScarcity
:=20; {inverse probability of size experiment}
148 Limit
.Rate
:=2*1024*1024*1024; {2GB}
{ Starts the transmission. Requires the CanSend callback to be assigned and the
  remote address to be set, then schedules the first TransmitDelay and
  AdjustSpeed timer calls. }
165 procedure tTCS
.Start
; {start the transmission}
168 Assert(assigned(CanSend
) );
169 Assert(not remote
.isnil
);
{ timer delays are presumably milliseconds - confirm against ServerLoop.Shedule }
177 Shedule(80,@TransmitDelay
);
178 Shedule(2000,@AdjustSpeed
);
181 procedure tTCS
.WriteHeaders(var s
:tMemoryStream
);
185 if isTrySize
then begin
{ Sends the datagram held in s to the remote address via ServerLoop.SendMessage.
  The stream length is asserted to be at most trySize while isTrySize holds,
  otherwise at most cur.size. The length of the datagram is then stored in
  txLastSize. }
193 procedure tTCS
.Send(var s
:tMemoryStream
);
195 if isTrySize
then assert(s
.length
<=trySize
) else assert(s
.Length
<=cur
.size
);
199 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
200 (*if txLastSize=0 then txCumTime:=0 else begin
201 txCumTime:=txCumTime+((Now-txLastTime)*SecsPerDay);
202 inc(txCumSize,txLastSize);
206 txLastSize
:=s
.length
;
{ Handles a cont (acknowledgement) packet from the receiver.
  rmark - mark byte echoed by the receiver
  rsize - size reported by the receiver
  Accumulates received size and elapsed time for the current mark, triggers an
  early AdjustSpeed once enough data has been collected, and completes a size
  experiment when the size-increase mark comes back. }
209 procedure tTCS
.OnCont(rmark
:byte;rsize
:word);
{ accept the current mark, or the size-experiment mark while an experiment is active }
213 if (rmark
=mark
)or((trySize
>0)and(rmark
=simark
)) then begin
216 if markc
=1 then begin
218 rxCumSize
:=0; {ignore this size: there is no information on how long it took to send}
219 if isTimeout
then begin
221 Shedule(80,@TransmitDelay
);
222 writeln('TIMEOUT RECOVERY');
{ seconds elapsed between rxRecTime and rnow (TDateTime difference in days times SecsPerDay) }
225 delta
:=(rnow
-rxRecTime
)*SecsPerDay
;
226 rxCumTime
:=rxCumTime
+delta
;
227 rxCumSize
:=rxCumSize
+rsize
;
228 //writeln('told size is ',rsize, 'delta ',round(delta*1000));
{ more than 200 packets or more than 640000 bytes accumulated: do not wait for the timer }
232 if (markc
>200)or(rxCumSize
>640000) then begin
233 UnShedule(@AdjustSpeed
); {do not wait}
234 AdjustSpeed
; {adjust now!}
{ the size experiment packet was acknowledged }
237 if rmark
=simark
then begin
240 if rsize
>cur
.size
then begin
{ next size increase fraction is twice the relative growth just confirmed, capped by Limit.SizeIF }
241 cur
.SizeIF
:=((rSize
/cur
.Size
)-1)*2;
242 if cur
.SizeIF
>Limit
.SizeIF
then Cur
.SizeIF
:=Limit
.SizeIF
;
{ adopt the larger size only when rsize/cur.rate is at most 0.3 (presumably seconds per datagram) }
243 if (rsize
/cur
.rate
)<=0.3 then cur
.size
:=rSize
; {use new size for all transmit}
244 //writeln('New size ',cur.Size);
245 UnShedule(@TimeoutIncreaseSize
);
{ Periodic rate adaptation. Pauses when isCanSend holds, restarts on timeout,
  otherwise compares the measured receive rate with the configured rate,
  updates cur.Rate and cur.RateIF, picks a new mark and reschedules itself. }
250 procedure tTCS
.AdjustSpeed
;
255 if isCanSend
then begin paused
:=true; exit
end; {nothing to transmit, sleep forever}
256 if isTimeout
then begin Start
; exit
end;
257 if markc
>3 then begin {only proceed with enough data}
{ measured receive rate for the current mark, and its ratio to the configured rate }
258 rxrate
:=rxCumSize
/rxCumTime
;
259 RateFill
:=rxrate
/cur
.rate
;
260 (*txrate:=txCumSize/txCumTime;
261 //write('speed: ',(rxrate/1024):1:4,'kB/s ',(txrate/1024):1:4,'kB/s (',((rxrate/cur.rate)*100):3:1,'%), ');*)
262 write('speed: ',(rxrate
/1024):1:4,'kB/s (',(RateFill
*100):3:1,'%), ');
{ receiver got less than 85 percent of the configured rate }
263 if RateFill
<0.85 then begin
267 cur
.RateIF
:=minRateIF
;
268 {cur.Size:=round(cur.size-(cur.Size/4));}
269 //cur.SizeIF:=minSizeIF;
273 {if RateFill>1.05 then cur.Rate:=rxRate
274 else }cur
.Rate
:=cur
.Rate
+(cur
.Rate
*cur
.RateIF
);
{ clamp the rate, then double the increase fraction and clamp it as well }
275 if cur
.Rate
>limit
.Rate
then cur
.Rate
:=Limit
.Rate
;
276 cur
.RateIF
:=cur
.RateIF
*2;
277 if cur
.RateIF
>limit
.RateIF
then cur
.RateIF
:=Limit
.RateIF
;
{ choose a new random mark that differs from the size-increase mark }
278 repeat mark
:=Random(256) until mark
<>siMark
;
281 {this is timeout! reset to safe rates}
286 //writeln('txwait ',((cur.size/cur.rate)*1000):1:1);
288 writeln('adjust to ',(Cur
.Rate
/1024):1:4,'kB/s mark', mark
, ' size=',cur
.Size
);
{ run again later; the delay is presumably in milliseconds }
290 Shedule(1600,@AdjustSpeed
);
{ Timer-driven pacing of outgoing packets. Occasionally starts a size
  experiment with a larger datagram; otherwise sends a burst and reschedules
  itself after the accumulated transmit wait time. }
293 procedure tTCS
.TransmitDelay
;
{ start a size experiment only when not in timeout, no experiment is running,
  a 1 in SizeIncScarcity random draw succeeds and the size is below the limit }
298 if (not isTimeout
)and(TrySize
=0)and(Random(SizeIncScarcity
)=0)and(cur
.Size
<Limit
.Size
) then begin
{ the experiment mark must differ from the regular mark }
299 repeat siMark
:=Random(256) until siMark
<>Mark
;
{ experimental size = current size grown by SizeIF, capped at Limit.Size }
301 trySize
:=round(cur
.Size
+(cur
.Size
*cur
.SizeIF
));
302 if trySize
>Limit
.Size
then trySize
:=Limit
.Size
;
303 //writeln('Try size ',trySize);
{ wait grows by the time the last datagram occupies at the current rate, times 1000 (presumably milliseconds) }
305 txwait
:=txwait
+((txLastSize
/cur
.rate
)*1000);
306 Shedule(round(txwait
),@TransmitDelay
);
307 if not isCanSend
then Shedule(2500,@TimeoutIncreaseSize
)
313 txwait
:=txwait
+((txLastSize
/cur
.rate
)*1000);
314 if isTrySize
then break
;
315 if isTimeout
then exit
;
{ the burst ends once more than 20 units of wait have accumulated or more than 200 iterations ran }
317 until (txwait
>20)or(burst
>200);
318 //writeln('Burst ',burst);
319 Shedule(round(txwait
),@TransmitDelay
);
{ Called when a size experiment was not acknowledged in time: shrinks the size
  increase fraction to one eighth, but never below the fraction that yields
  one byte of growth at the current size. }
323 procedure tTCS
.TimeoutIncreaseSize
;
326 //writeln('Size Inc timeout');
327 cur
.SizeIF
:=cur
.SizeIF
/8;
328 {make sure the next increase is at least 1 byte: SizeIF*Size is kept at 1 or more}
329 if (cur
.SizeIF
*cur
.Size
)<1 then cur
.SizeIF
:=1/cur
.Size
;
{ Shuts the sender down by cancelling the three timer callbacks it may have
  scheduled: AdjustSpeed, TransmitDelay and TimeoutIncreaseSize. }
333 procedure tTCS
.Done
; {unregister all callbacks}
335 UnShedule(@AdjustSpeed
);
336 UnShedule(@TransmitDelay
);
337 UnShedule(@TimeoutIncreaseSize
);
{ Unit start-up: clear the sender table and install the message handlers,
  opcode 4 for cont packets and opcode 5 for ctrl packets. }
341 FillByte(txers
,sizeof(txers
),0); {set every slot to nil}
342 SetMsgHandler(4,@RecvCont
);
343 SetMsgHandler(5,@RecvCtrl
);