TC: Add timeout.
[brdnet.git] / TC.pas
blobe329785114b6096ec34f51c2caa4e99043e5e298
1 unit TC;
2 {TransmissionControll over UDP
3 some dg larger
4 if pass set payload to that
6 useful for file transfer, voip should only consult the current rate
7 and detect congestion based on latency
9 opcodes:
10 data=4
11 mark:1;payload:XX
12 data-no-report=8
13 data-imm-ack=6
14 cont=5
15 mark:1;rate:Word4(shr 6)
16 ack=7
17 mark:1;len:Word2
19 INTERFACE
20 uses MemStream,NetAddr;
22 type tTCSSe=record
23 Rate:Real; {sending rate}
24 Size:word; {datagram size}
25 RateIF:single; {rate increase fraction}
26 SizeIF:single; {size increase fraction}
27 {new:=old+old*IF}
28 end;
30 type tTCS=object {this is sender part}
31 {in order methods should be set/called}
32 procedure Init; {set defaults for vars}
33 public
34 remote:tNetAddr;
35 Mark:byte;
36 MarkStart:tDateTime; {when the mark was started}
37 MarkData:LongWord; {how much data sent}
38 txLastSize:Word;
39 siMark:byte;
40 siNow,siWait:boolean;
41 isTimeout:word;
42 Cur:tTCSSe; {current values}
43 Limit:tTCSSe; {maximum alloved}
44 Initial:tTCSSe; {after start/timeout}
45 minRateIF:single; {used after rate decrease}
46 CanSend: procedure of object; {called when transmit possible}
47 procedure Start; {start the transmission}
48 function MaxSize(req:word):word;
49 procedure WriteHeaders(var s:tMemoryStream); {add headers before the data}
50 procedure Send(var s:tMemoryStream);
51 private
52 {timer callbacks}
53 procedure TransmitDelay;
54 procedure Timeout;
55 procedure OnCont(rmark:byte;rrate:real);
56 procedure OnAck(rmark:byte;rsize:word);
57 procedure Done; {unregister all callbacks}
58 end;
60 procedure RegTxer(var t:tTCS);
61 procedure DelTxer(var t:tTCS);
63 IMPLEMENTATION
64 uses ServerLoop,SysUtils;
66 var Txers:array [0..31] of ^tTCS;
68 procedure RegTxer(var t:tTCS);
69 var tn:byte;
70 begin
71 for tn:=0 to high(TXERS) do if txers[tn]=nil then break;
72 assert(not assigned(txers[tn]));
73 txers[tn]:=@t;
74 end;
76 procedure DelTxer(var t:tTCS);
77 var tn:byte;
78 begin
79 tn:=0; while tn<=high(TXERS) do if txers[tn]=@t then break else inc(tn);
80 assert(tn<=high(TXERS));
81 t.Done;
82 txers[tn]:=nil;
83 end;
85 type tTCSp=^tTCS;
86 function GetTxer(const cource:tNetAddr):tTCSp;
87 var tn:byte;
88 begin
89 result:=nil;
90 tn:=0; while tn<=high(TXERS) do if txers[tn]^.remote=cource then break else inc(tn);
91 if tn<=high(TXERS) then result:=txers[tn];
92 end;
94 procedure RecvCont(msg:ServerLoop.tSMsg);
95 var t:^tTCS;
96 var rmark:byte;
97 var rrate:longword;
98 var rsize:word absolute rrate;
99 var opcode:byte;
100 begin
101 t:=GetTxer(msg.source^);
102 if not assigned(t) then exit;
103 opcode:=msg.stream.ReadByte; {skip opcode}
104 rmark:=msg.stream.ReadByte;
105 if opcode=5 {periodic} then begin
106 rrate:=msg.stream.ReadWord(4);
107 t^.OnCont(rmark,rrate);
108 end else if opcode=7 {explicit} then begin
109 rsize:=msg.stream.ReadWord(2);
110 t^.OnAck(rmark,rsize);
111 end;
112 end;
114 procedure tTCS.Init;
115 begin
116 remote.clear;
117 //SizeIncScarcity:=20; {inverse probability of size experiment}
118 Limit.Rate:=2*1024*1024*1024; {2GB}
119 Limit.Size:=4096;
120 Limit.RateIF:=1;
121 Limit.SizeIF:=2;
122 Initial.Rate:={20*}1024;
123 Initial.Size:=32+5;
124 Initial.RateIF:=0.5;
125 Initial.SizeIF:=2;
126 minRateIF:=0.01;
127 CanSend:=nil;
128 end;
130 procedure tTCS.Start; {start the transmission}
131 begin
132 Assert(assigned(CanSend) ); Assert(not remote.isnil);
133 Cur:=Initial;
134 mark:=Random(256); MarkData:=0;
135 siMark:=0;
136 isTimeout:=0;
137 Shedule(80,@TransmitDelay);
138 Shedule(2000,@Timeout);
139 end;
141 function tTCS.MaxSize(req:word):word;
142 begin
143 if siNow
144 then result:=round(cur.Size*(1+cur.SizeIF))
145 else result:=cur.Size;
146 dec(result,2);
147 if result>req then result:=req;
148 end;
150 procedure tTCS.WriteHeaders(var s:tMemoryStream);
151 begin
152 if siNow then begin
153 s.WriteByte(6);{opcode}
154 s.WriteByte(siMark);
155 end else if isTimeout=0 then begin
156 s.WriteByte(4);{opcode}
157 s.WriteByte(mark);
158 end else begin
159 s.WriteByte(6);{opcode}
160 s.WriteByte(simark);
161 end;
162 end;
164 procedure tTCS.Send(var s:tMemoryStream);
165 begin
166 ServerLoop.SendMessage(s.base^,s.length,remote);
167 if MarkData=0 then begin
168 MarkStart:=Now;
169 MarkData:=1;
170 end else MarkData:=MarkData+s.length;
171 txLastSize:=s.length;
172 siNow:=false;
173 end;
175 procedure tTCS.OnCont(rmark:byte;rrate:real);
176 var rnow:tDateTime;
177 var RateFill:single;
178 var txRate:real;
179 var rxRate:real;
180 begin
181 if (rmark=Mark) then begin
182 rnow:=Now;
183 rxRate:=(rrate*64); {B/s}
184 txRate:=MarkData/((rnow-MarkStart)*SecsPerDay);
185 RateFill:=rxRate/txRate;
186 write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), ');
187 UnShedule(@Timeout);
188 Shedule(2000,@Timeout);
189 if RateFill<0.85 then begin
190 write('limit, ');
191 cur.Rate:=rxrate;
192 cur.RateIF:=minRateIF;
193 end else
194 if (txRate/cur.Rate)<0.7 then begin
195 write('3hard, ');
196 end else begin
197 write('pass, ');
198 cur.Rate:=txRate*(cur.RateIF+1);
199 if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate
200 else cur.RateIF:=cur.RateIF*2;
201 if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
202 end;
203 repeat mark:=Random(256) until (mark<>rMark);
204 MarkData:=0;
205 writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4);
206 if siWait then begin
207 cur.SizeIF:=cur.SizeIF/2;
208 end;
209 siMark:=0;
210 end end;
212 procedure tTCS.OnAck(rmark:byte;rsize:word);
213 begin
214 if rmark<>simark then exit;
215 if isTimeout>0 then begin
216 Shedule(80,@TransmitDelay);
217 isTimeout:=0;
218 end else
219 if rsize>cur.size then begin
220 writeln('size inc to ',rsize);
221 cur.SizeIF:=((rSize/cur.Size)-1)*2;
222 if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
223 if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit}
224 end;
225 if rsize>=cur.size then siWait:=false;
226 end;
228 procedure tTCS.Timeout;
229 begin
230 cur:=initial;
231 mark:=Random(256); MarkData:=0;
232 siMark:=0;
233 Inc(isTimeout);
234 Shedule(80,@TransmitDelay);
235 Shedule(5000,@Timeout);
236 end;
238 procedure tTCS.TransmitDelay;
239 var txwait:real;
240 var burst:word;
241 begin
242 txLastSize:=0;
243 txwait:=0;
244 burst:=0;
245 if (siMark=0)and(cur.Size<limit.Size){and(random(10)=0)}and(istimeout=0) then begin
246 siNow:=true;
247 siWait:=true;
248 siMark:=random(255)+1;
249 end;
250 repeat
251 CanSend;
252 if txLastSize=0 then exit;{pause}
253 if (isTimeout>0) then exit;
254 //txwait:=txwait+(txLastSize/cur.rate);
255 txwait:=(MarkData/cur.Rate)-((Now-MarkStart)*SecsPerDay);
256 inc(burst);
257 siNow:=false;
258 until (txwait>0.02)or(burst>200);
259 if txwait<0.02 then txwait:=0.01;
260 //writeln(txwait:1:3,burst);
261 Shedule(round(txwait*1000),@TransmitDelay);
262 end;
264 procedure tTCS.Done; {unregister all callbacks}
265 begin
266 UnShedule(@TransmitDelay);
267 UnShedule(@Timeout);
268 end;
270 BEGIN
271 FillByte(txers,sizeof(txers),0); {make'em nil}
272 SetMsgHandler(5,@RecvCont);
273 SetMsgHandler(7,@RecvCont);
274 END.