2 {Transmission Control over UDP
4 if pass set payload to that
6 Useful for file transfer; VoIP should only consult the current rate
7 and detect congestion based on latency.
15 mark:1;rate:Word4(shr 6)
20 uses MemStream
,NetAddr
;
23 Rate
:Real; {sending rate}
24 Size
:word; {datagram size}
25 RateIF
:single; {rate increase fraction}
26 SizeIF
:single; {size increase fraction}
30 type tTCS
=object {this is the sender part}
31 {members are listed in the order in which they should be set/called}
32 procedure Init
; {set default values for the variables}
36 MarkStart
:tDateTime
; {when the mark was started}
37 MarkData
:LongWord
; {how much data has been sent}
42 Cur
:tTCSSe
; {current values}
43 Limit
:tTCSSe
; {maximum allowed}
44 Initial
:tTCSSe
; {values used after start/timeout}
45 minRateIF
:single; {RateIF value applied after a rate decrease}
46 CanSend
: procedure of object; {called when transmitting is possible}
47 procedure Start
; {start the transmission}
48 function MaxSize(req
:word):word; {datagram size to use, never more than req}
49 procedure WriteHeaders(var s
:tMemoryStream
); {add headers before the data}
50 procedure Send(var s
:tMemoryStream
); {send the stream contents to remote}
53 procedure TransmitDelay
;
55 procedure OnCont(rmark
:byte;rrate
:real); {called by RecvCont for opcode 5 (periodic)}
56 procedure OnAck(rmark
:byte;rsize
:word); {called by RecvCont for opcode 7 (explicit)}
57 procedure Done
; {unregister all callbacks}
60 procedure RegTxer(var t
:tTCS
);
61 procedure DelTxer(var t
:tTCS
);
64 uses ServerLoop
,SysUtils
;
66 var Txers
:array [0..31] of ^tTCS
;
{ RegTxer: scans the Txers table for the first nil slot and asserts that
  the slot it stopped at is unassigned.
  NOTE(review): when the for loop runs to completion without break, Free
  Pascal leaves the loop variable undefined, so the assert may inspect an
  arbitrary index when the table is full. DelTxer uses a while loop with an
  explicit bound check instead - confirm and consider the same form here. }
68 procedure RegTxer(var t
:tTCS
);
71 for tn
:=0 to high(TXERS
) do if txers
[tn
]=nil then break
;
72 assert(not assigned(txers
[tn
]));
{ DelTxer: walks the Txers table until it finds the entry that points at t,
  then asserts that the search stopped inside the table (t was registered). }
76 procedure DelTxer(var t
:tTCS
);
79 tn
:=0; while tn
<=high(TXERS
) do if txers
[tn
]=@t
then break
else inc(tn
);
80 assert(tn
<=high(TXERS
));
{ GetTxer: returns the registered sender whose remote field equals the given
  address. The result is assigned only when a match is found in the visible
  lines; any default for the not-found case is outside this view.
  NOTE(review): txers[tn]^ is dereferenced without a nil test, while the
  table is zero-filled at startup and RegTxer treats nil as a free slot.
  An empty slot ahead of the match would be dereferenced - confirm and add
  an assigned() guard. }
86 function GetTxer(const cource
:tNetAddr
):tTCSp
;
90 tn
:=0; while tn
<=high(TXERS
) do if txers
[tn
]^.remote
=cource
then break
else inc(tn
);
91 if tn
<=high(TXERS
) then result
:=txers
[tn
];
{ RecvCont: message handler registered for opcodes 5 and 7. Looks up the
  sender for the message source, reads the opcode and mark bytes, then
  dispatches to OnCont (opcode 5) or OnAck (opcode 7). Messages from an
  unknown source are ignored. }
94 procedure RecvCont(msg
:ServerLoop
.tSMsg
);
{ rsize shares storage with rrate (absolute); only one of them is used per message }
98 var rsize
:word absolute rrate
;
101 t
:=GetTxer(msg
.source
^);
102 if not assigned(t
) then exit
;
103 opcode
:=msg
.stream
.ReadByte
; {first byte is the opcode; it selects the branch below}
104 rmark
:=msg
.stream
.ReadByte
;
105 if opcode
=5 {periodic} then begin
106 rrate
:=msg
.stream
.ReadWord(4);
107 t
^.OnCont(rmark
,rrate
);
108 end else if opcode
=7 {explicit} then begin
109 rsize
:=msg
.stream
.ReadWord(2);
110 t
^.OnAck(rmark
,rsize
);
117 //SizeIncScarcity:=20; {inverse probability of size experiment}
118 Limit
.Rate
:=2*1024*1024*1024; {2GB}
122 Initial
.Rate
:={20*}1024;
{ Start: CanSend and remote must already be set (both are asserted).
  Picks a random mark, clears the per-mark byte counter and schedules
  TransmitDelay and Timeout. The Shedule delays are presumably
  milliseconds (TransmitDelay passes seconds*1000) - confirm. }
130 procedure tTCS
.Start
; {start the transmission}
132 Assert(assigned(CanSend
) ); Assert(not remote
.isnil
);
134 mark
:=Random(256); MarkData
:=0;
137 Shedule(80,@TransmitDelay
);
138 Shedule(2000,@Timeout
);
{ MaxSize: yields either cur.Size or cur.Size enlarged by the fraction
  cur.SizeIF (the condition choosing between them is outside the visible
  lines), and never more than the requested size req. }
141 function tTCS
.MaxSize(req
:word):word;
144 then result
:=round(cur
.Size
*(1+cur
.SizeIF
))
145 else result
:=cur
.Size
;
{ cap at what the caller asked for }
147 if result
>req
then result
:=req
;
{ WriteHeaders: writes the opcode byte at the start of the header. In the
  visible lines the opcode is 4 in the branch guarded by isTimeout=0 and 6
  in the branches before and after it; the remaining header fields and the
  first condition are outside this view. }
150 procedure tTCS
.WriteHeaders(var s
:tMemoryStream
);
153 s
.WriteByte(6);{opcode}
155 end else if isTimeout
=0 then begin
156 s
.WriteByte(4);{opcode}
159 s
.WriteByte(6);{opcode}
{ Send: hands the stream contents to ServerLoop.SendMessage for remote,
  then updates the accounting: MarkData grows by the stream length unless
  it was 0 (that case is handled by lines outside this view), and
  txLastSize records the length of this datagram. }
164 procedure tTCS
.Send(var s
:tMemoryStream
);
166 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
167 if MarkData
=0 then begin
170 end else MarkData
:=MarkData
+s
.length
;
171 txLastSize
:=s
.length
;
{ OnCont: handles a rate report (opcode 5). When the reported mark matches
  the current one it compares the reported receive rate with the locally
  measured send rate, re-arms the Timeout, adjusts cur.Rate and cur.RateIF
  and picks a new mark.
  NOTE(review): txRate divides by the elapsed time and RateFill divides by
  txRate; either divisor is zero when no time has elapsed or MarkData is 0 -
  confirm that callers cannot reach this state. }
175 procedure tTCS
.OnCont(rmark
:byte;rrate
:real);
181 if (rmark
=Mark
) then begin
{ the file header describes the wire rate as shifted right by 6; *64 undoes that }
183 rxRate
:=(rrate
*64); {B/s}
{ tDateTime differences are in days, hence the SecsPerDay factor }
184 txRate
:=MarkData
/((rnow
-MarkStart
)*SecsPerDay
);
185 RateFill
:=rxRate
/txRate
;
186 write('speed: ',(rxRate
/1024):1:3,'kB/s (',(RateFill
*100):3:1,'% of ',txRate
/1024:1:3,'), ');
188 Shedule(2000,@Timeout
);
{ receiver got less than 85% of what was sent }
189 if RateFill
<0.85 then begin
192 cur
.RateIF
:=minRateIF
;
{ actual send rate is below 70% of the configured rate }
194 if (txRate
/cur
.Rate
)<0.7 then begin
198 cur
.Rate
:=txRate
*(cur
.RateIF
+1);
{ clamp to the limit; the increase fraction doubles only when the limit was not hit }
199 if cur
.Rate
>limit
.Rate
then cur
.Rate
:=Limit
.Rate
200 else cur
.RateIF
:=cur
.RateIF
*2;
201 if cur
.RateIF
>limit
.RateIF
then cur
.RateIF
:=Limit
.RateIF
;
{ choose a new mark that differs from the one just reported }
203 repeat mark
:=Random(256) until (mark
<>rMark
);
205 writeln('-> ',(Cur
.Rate
/1024):1:4,'kB/s if=',cur
.RateIF
:6:4);
207 cur
.SizeIF
:=cur
.SizeIF
/2;
{ OnAck: handles an explicit acknowledgement (opcode 7). Acks whose mark
  differs from siMark are ignored. An acknowledged size larger than cur.Size
  raises cur.SizeIF (capped at Limit.SizeIF) and may become the new
  cur.Size. }
212 procedure tTCS
.OnAck(rmark
:byte;rsize
:word);
214 if rmark
<>simark
then exit
;
215 if isTimeout
>0 then begin
216 Shedule(80,@TransmitDelay
);
219 if rsize
>cur
.size
then begin
220 writeln('size inc to ',rsize
);
{ new increase fraction is twice the relative growth just acknowledged }
221 cur
.SizeIF
:=((rSize
/cur
.Size
)-1)*2;
222 if cur
.SizeIF
>Limit
.SizeIF
then Cur
.SizeIF
:=Limit
.SizeIF
;
{ size divided by rate: adopt the size only if that ratio is at most 0.3 }
223 if (rsize
/cur
.rate
)<=0.3 then cur
.size
:=rSize
; {use the new size for all transmissions}
225 if rsize
>=cur
.size
then siWait
:=false;
{ Timeout: scheduled by Start and OnCont, and re-armed here with a longer
  delay (5000 instead of 2000). Picks a fresh random mark, clears the
  per-mark byte counter and schedules TransmitDelay again. }
228 procedure tTCS
.Timeout
;
231 mark
:=Random(256); MarkData
:=0;
234 Shedule(80,@TransmitDelay
);
235 Shedule(5000,@Timeout
);
{ TransmitDelay: pacing callback. May start a size experiment by choosing a
  siMark, then computes how far the sent data is ahead of the configured
  rate and reschedules itself after that wait. }
238 procedure tTCS
.TransmitDelay
;
{ siMark=0 means no size experiment is running; the random(10) gate is commented out }
245 if (siMark
=0)and(cur
.Size
<limit
.Size
){and(random(10)=0)}and(istimeout
=0) then begin
{ 1..255, so a running experiment never has the value 0 }
248 siMark
:=random(255)+1;
252 if txLastSize
=0 then exit
;{pause}
253 if (isTimeout
>0) then exit
;
254 //txwait:=txwait+(txLastSize/cur.rate);
{ seconds the data sent so far should have taken, minus seconds actually elapsed }
255 txwait
:=(MarkData
/cur
.Rate
)-((Now
-MarkStart
)*SecsPerDay
);
258 until (txwait
>0.02)or(burst
>200);
{ the loop can also end on the burst limit; then wait 0.01 s }
259 if txwait
<0.02 then txwait
:=0.01;
260 //writeln(txwait:1:3,burst);
{ txwait is in seconds, Shedule receives it multiplied by 1000 }
261 Shedule(round(txwait
*1000),@TransmitDelay
);
{ Done: cancels the scheduled TransmitDelay callback; any further cleanup
  is outside the visible lines. }
264 procedure tTCS
.Done
; {unregister all callbacks}
266 UnShedule(@TransmitDelay
);
{ Unit start-up: clear the sender table and route opcodes 5 and 7 to RecvCont. }
271 FillByte(txers
,sizeof(txers
),0); {make them nil}
272 SetMsgHandler(5,@RecvCont
);
273 SetMsgHandler(7,@RecvCont
);