2 {TransmissionControll over UDP
4 if pass set payload to that
6 useful for file transfer, voip should only consult the current rate
7 and detect congestion based on latency
15 mark:1;rate:Word4(shr 6)
20 uses MemStream
,NetAddr
;
23 Rate
:Real; {sending rate}
24 Size
:word; {datagram size}
25 RateIF
:single; {rate increase fraction}
26 SizeIF
:single; {size increase fraction}
30 type tTCS
=object {this is sender part}
31 {in order methods should be set/called}
32 procedure Init
; {set defaults for vars}
36 MarkStart
:tDateTime
; {when the mark was started}
37 MarkData
:LongWord
; {how much data sent}
39 Cur
:tTCSSe
; {current values}
40 Limit
:tTCSSe
; {maximum alloved}
41 Initial
:tTCSSe
; {after start/timeout}
42 minRateIF
:single; {used after rate decrease}
43 CanSend
: procedure of object; {called when transmit possible}
44 procedure Start
; {start the transmission}
45 function MaxSize(req
:word):word;
46 procedure WriteHeaders(var s
:tMemoryStream
); {add headers before the data}
47 procedure Send(var s
:tMemoryStream
);
50 procedure TransmitDelay
;
51 //procedure TimeoutCont;
52 procedure OnCont(rmark
:byte;rrate
:real);
53 //procedure OnAck(rmark:byte;rsize:word);
54 procedure Done
; {unregister all callbacks}
57 procedure RegTxer(var t
:tTCS
);
58 procedure DelTxer(var t
:tTCS
);
61 uses ServerLoop
,SysUtils
;
63 var Txers
:array [0..31] of ^tTCS
;
65 procedure RegTxer(var t
:tTCS
);
68 for tn
:=0 to high(TXERS
) do if txers
[tn
]=nil then break
;
69 assert(not assigned(txers
[tn
]));
73 procedure DelTxer(var t
:tTCS
);
76 tn
:=0; while tn
<=high(TXERS
) do if txers
[tn
]=@t
then break
else inc(tn
);
77 assert(tn
<=high(TXERS
));
83 function GetTxer(const cource
:tNetAddr
):tTCSp
;
87 tn
:=0; while tn
<=high(TXERS
) do if txers
[tn
]^.remote
=cource
then break
else inc(tn
);
88 if tn
<=high(TXERS
) then result
:=txers
[tn
];
91 procedure RecvCont(msg
:ServerLoop
.tSMsg
);
96 t
:=GetTxer(msg
.source
^);
97 if not assigned(t
) then exit
;
98 msg
.stream
.skip(1); {skip opcode}
99 rmark
:=msg
.stream
.ReadByte
;
100 rrate
:=msg
.stream
.ReadWord(4);
101 t
^.OnCont(rmark
,rrate
);
107 //SizeIncScarcity:=20; {inverse probability of size experiment}
108 Limit
.Rate
:=2*1024*1024*1024; {2GB}
112 Initial
.Rate
:=20*1024;
113 Initial
.Size
:={32+5}1024;
120 procedure tTCS
.Start
; {start the transmission}
122 Assert(assigned(CanSend
) ); Assert(not remote
.isnil
);
124 mark
:=Random(256); MarkData
:=0;
125 Shedule(80,@TransmitDelay
);
128 function tTCS
.MaxSize(req
:word):word;
130 if req
>cur
.Size
then MaxSize
:=cur
.Size
else MaxSize
:=req
;
133 procedure tTCS
.WriteHeaders(var s
:tMemoryStream
);
135 {if isTrySize then begin
137 s
.WriteByte(4);{opcode}
142 procedure tTCS
.Send(var s
:tMemoryStream
);
144 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
145 if MarkData
=0 then begin
148 end else MarkData
:=MarkData
+s
.length
;
149 txLastSize
:=s
.length
;
152 procedure tTCS
.OnCont(rmark
:byte;rrate
:real);
158 if (rmark
=Mark
) then begin
160 rxRate
:=(rrate
*64); {B/s}
161 txRate
:=MarkData
/((rnow
-MarkStart
)*SecsPerDay
);
162 RateFill
:=rxRate
/txRate
;
163 write('speed: ',(rxRate
/1024):1:3,'kB/s (',(RateFill
*100):3:1,'% of ',txRate
/1024:1:3,'), ');
164 if RateFill
<0.85 then begin
167 cur
.RateIF
:=minRateIF
;
169 if (txRate
/cur
.Rate
)<0.7 then begin
173 cur
.Rate
:=txRate
*(cur
.RateIF
+1);
174 if cur
.Rate
>limit
.Rate
then cur
.Rate
:=Limit
.Rate
175 else cur
.RateIF
:=cur
.RateIF
*2;
176 if cur
.RateIF
>limit
.RateIF
then cur
.RateIF
:=Limit
.RateIF
;
178 repeat mark
:=Random(256) until (mark
<>rMark
);
180 writeln('-> ',(Cur
.Rate
/1024):1:4,'kB/s if=',cur
.RateIF
:6:4);
183 if rmark=simark then begin
186 if rsize>cur.size then begin
187 cur.SizeIF:=((rSize/cur.Size)-1)*2;
188 if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
189 if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit}
190 UnShedule(@TimeoutIncreaseSize);
196 procedure tTCS
.TransmitDelay
;
205 if txLastSize
=0 then exit
;{pause}
206 //txwait:=txwait+(txLastSize/cur.rate);
207 txwait
:=(MarkData
/cur
.Rate
)-((Now
-MarkStart
)*SecsPerDay
);
209 until (txwait
>0.02)or(burst
>200);
210 if txwait
<0.02 then txwait
:=0.01;
211 //writeln(txwait:1:3,burst);
212 Shedule(round(txwait
*1000),@TransmitDelay
);
215 procedure tTCS
.Done
; {unregister all callbacks}
217 UnShedule(@TransmitDelay
);
221 FillByte(txers
,sizeof(txers
),0); {make'em nil}
222 SetMsgHandler(5,@RecvCont
);
223 //SetMsgHandler(7,@RecvCtrl);