TC Fix the suspend behaviour.
[brdnet.git] / TC.pas
blobbce838ef3d7ac729c8448d960a6dd2b794a1bdee
1 unit TC;
2 {Transmission control over UDP
3 some datagrams are sent larger than the current size;
4 if they pass, the payload size is set to that
6 useful for file transfer, voip should only consult the current rate
7 and detect congestion based on latency
9 Used by UploadManager. 1 TC per peer.
11 Suspend: return from CanSend without calling Send
12 Resume: call Start
14 opcodes:
15 data=4
16 mark:1;payload:XX
17 data-no-report=8
18 data-imm-ack=6
19 cont=5
20 mark:1;rate:Word4 (received rate in B/s, shr 6)
21 ack=7
22 mark:1;len:Word2}
24 INTERFACE
25 uses MemStream,NetAddr,ServerLoop;
type
 {One set of sender tuning values.
  An increase is applied as: new := old + old*IF.}
 tTCSSe=record
  Rate:Real;              {sending rate}
  Size:word;              {datagram size}
  RateIF:single;          {rate increase fraction}
  SizeIF:single;          {size increase fraction}
 end;
type
 {Sender part of the transmission control.
  The public members are declared in the order in which they are meant
  to be set/called. Done is public so that the owning unit can release
  the message handlers and timers again.}
 tTCS=object
  procedure Init(const iremote:tNetAddr); {set defaults for vars}
  public
  remote:tNetAddr;     {peer the datagrams are sent to}
  Mark:byte;           {mark written into opcode 4 datagrams and matched against cont reports}
  MarkStart:tDateTime; {when the mark was started}
  MarkData:LongWord;   {how much data sent under the current mark}
  txLastSize:Word;     {length of the last datagram given to Send; is zero if suspend}
  siMark:byte;         {mark written into opcode 6 datagrams and matched against acks; 0 = no size probe pending}
  siNow:boolean;       {the next datagram is a size-increase probe}
  siWait:boolean;      {a size probe was started and no ack of sufficient length arrived yet}
  isTimeout:word;      {number of timeouts since the last ack; 0 = normal operation}
  Cur:tTCSSe;          {current values}
  Limit:tTCSSe;        {maximum allowed}
  Initial:tTCSSe;      {after start/timeout}
  minRateIF:single;    {used after rate decrease}
  CanSend: procedure of object; {called when transmit possible}
  procedure Start; {start the transmission}
  function MaxSize(req:word):word;
  procedure WriteHeaders(var s:tMemoryStream); {add headers before the data}
  procedure Send(var s:tMemoryStream);
  procedure Done; {unregister all callbacks}
  private
  {timer callbacks}
  procedure TransmitDelay;
  procedure Timeout;
  {message handlers for opcodes 5 (cont) and 7 (ack)}
  procedure OnCont(msg:ServerLoop.tSMsg);
  procedure OnAck(msg:ServerLoop.tSMsg);
 end;
65 IMPLEMENTATION
66 uses SysUtils;
{Set defaults for all variables and register the message handlers for
 the given peer. CanSend must be assigned by the caller afterwards,
 before Start is called.
 iremote: address of the peer this sender talks to.}
procedure tTCS.Init(const iremote:tNetAddr);
 begin
 remote:=iremote;
 {tuning values}
 Limit.Rate:=2*1024*1024*1024; {2GB}
 Limit.Size:=4096;
 Limit.RateIF:=1;
 Limit.SizeIF:=2;
 Initial.Rate:={20*}1024;
 Initial.Size:=32+5;
 Initial.RateIF:=0.5;
 Initial.SizeIF:=2;
 minRateIF:=0.01;
 Cur:=Initial;
 CanSend:=nil;
 {Transmission state. An object is not zero-filled automatically and the
  handlers registered below read these fields, so every one of them gets
  a defined value here rather than only in Start.}
 txLastSize:=0;
 Mark:=0;
 MarkStart:=0;
 MarkData:=0;
 siMark:=0;
 siNow:=false;
 siWait:=false;
 isTimeout:=0;
 {register the handlers last, once the state they read is valid}
 SetMsgHandler(5,remote,@OnCont);
 SetMsgHandler(7,remote,@OnAck);
 end;
{Start (or resume) the transmission.
 Requires CanSend to be assigned, a non-nil remote and a sender that is
 not currently transmitting (txLastSize=0).}
procedure tTCS.Start;
 begin
 Assert(assigned(CanSend));
 Assert(not remote.isnil);
 Assert(txLastSize=0);
 {begin a fresh measurement mark with nothing counted yet}
 mark:=Random(256);
 MarkData:=0;
 {no size probe pending, not in timeout mode}
 siMark:=0;
 isTimeout:=0;
 {first transmit after 80, timeout check after 3000 (units as taken by Shedule)}
 Shedule(80,@TransmitDelay);
 Shedule(3000,@Timeout);
 end;
{Return the largest payload that may go into the next datagram.
 req: size available to the caller, including the 2 header bytes.
 Result: payload size without the 2 header bytes, never more than req-2.
 Returns 0 when req or the datagram size leaves no room for payload.}
function tTCS.MaxSize(req:word):word;
 var dgram:int64;
 begin
 result:=0;
 {req-2 would wrap around in a word for req<2}
 if req<=2 then exit;
 if siNow
  then dgram:=round(cur.Size*(1+cur.SizeIF)) {size-increase probe}
  else dgram:=cur.Size;
 {the probe must not grow past the configured maximum}
 if dgram>Limit.Size then dgram:=Limit.Size;
 if dgram<=2 then exit;
 dec(dgram,2); {headers}
 dec(req,2);   {headers}
 if dgram>req then result:=req else result:=dgram;
 end;
{Add the two header bytes (opcode, mark) before the data.
 A size probe and every datagram sent while in timeout mode use
 opcode 6 with siMark; all other datagrams use opcode 4 with mark.}
procedure tTCS.WriteHeaders(var s:tMemoryStream);
 var op,tag:byte;
 begin
 if siNow or (isTimeout<>0) then begin
  op:=6;
  tag:=siMark;
 end else begin
  op:=4;
  tag:=mark;
 end;
 s.WriteByte(op);{opcode}
 s.WriteByte(tag);
 end;
{Send the prepared datagram to the remote and account for it.
 The first datagram of a mark starts the mark timer and counts as 1;
 every later one adds its length to MarkData.}
procedure tTCS.Send(var s:tMemoryStream);
 begin
 ServerLoop.SendMessage(s.base^,s.length,remote);
 if MarkData<>0 then
  MarkData:=MarkData+s.length
 else begin
  MarkStart:=Now;
  MarkData:=1;
 end;
 {non-zero txLastSize tells TransmitDelay and Timeout that something was sent}
 txLastSize:=s.length;
 {a pending size probe, if any, has now been sent}
 siNow:=false;
 end;
{Handler for the cont message (opcode 5): mark:1, rate:4.
 Compares the rate reported by the receiver with the rate actually sent
 under the current mark and adjusts cur.Rate / cur.RateIF accordingly,
 then starts a new mark. Reports for any other mark are ignored.}
procedure tTCS.OnCont(msg:ServerLoop.tSMsg);
 const cMinRate=64; {smallest non-zero rate a report can carry (rate is sent shr 6)}
 var rnow:tDateTime;
 var elapsed:real;
 var RateFill:single;
 var txRate:real;
 var rxRate:real;
 var rmark:byte;
 var rrate:longword;
 var opcode:byte;
 begin
 opcode:=msg.stream.ReadByte; {skip opcode}
 rmark:=msg.stream.ReadByte;
 assert(opcode=5);
 rrate:=msg.stream.ReadWord(4);
 if rmark<>Mark then exit;
 rnow:=Now;
 elapsed:=(rnow-MarkStart)*SecsPerDay;
 {Nothing was sent under this mark yet, or no time has passed: the rates
  below would divide by zero, so the report is ignored.}
 if (MarkData=0) or (elapsed<=0) then exit;
 rxRate:=rrate*64.0; {B/s}
 txRate:=MarkData/elapsed;
 RateFill:=rxRate/txRate;
 write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), ');
 {the peer is alive: restart the timeout}
 UnShedule(@Timeout);
 Shedule(2000,@Timeout);
 if RateFill<0.85 then begin
  {receiver got clearly less than was sent: fall back to the received rate}
  write('limit, ');
  {never let the rate reach zero, it is used as a divisor elsewhere}
  if rxRate<cMinRate then cur.Rate:=cMinRate else cur.Rate:=rxRate;
  cur.RateIF:=minRateIF;
 end else
 if (txRate/cur.Rate)<0.7 then begin
  {sender itself did not reach the allowed rate: leave it unchanged}
  write('3hard, ');
 end else begin
  write('pass, ');
  cur.Rate:=txRate*(cur.RateIF+1);
  if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate
  else cur.RateIF:=cur.RateIF*2;
  if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
 end;
 {new mark, different from the one just reported}
 repeat mark:=Random(256) until (mark<>rMark);
 MarkData:=0;
 writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4);
 {a size probe still unanswered at this point halves the size increase}
 if siWait then begin
  cur.SizeIF:=cur.SizeIF/2;
 end;
 siMark:=0;
 end;
{Handler for the ack message (opcode 7): mark:1, len:2.
 Acks whose mark differs from siMark are ignored. In timeout mode an ack
 resumes the transmission; otherwise an acknowledged length above
 cur.Size raises the size increase fraction and possibly cur.Size.}
procedure tTCS.OnAck(msg:ServerLoop.tSMsg);
 var op,ackMark:byte;
 var ackLen:word;
 begin
 op:=msg.stream.ReadByte; {skip opcode}
 ackMark:=msg.stream.ReadByte;
 assert(op=7);
 ackLen:=msg.stream.ReadWord(2);
 if ackMark=siMark then begin
  if isTimeout>0 then begin
   {peer answered after a timeout: resume normal transmission}
   Shedule(80,@TransmitDelay);
   isTimeout:=0;
  end else if ackLen>cur.Size then begin
   writeln('size inc to ',ackLen);
   cur.SizeIF:=((ackLen/cur.Size)-1)*2;
   if cur.SizeIF>Limit.SizeIF then cur.SizeIF:=Limit.SizeIF;
   {use new size for all transmit}
   if (ackLen/cur.Rate)<=0.3 then cur.Size:=ackLen;
  end;
  if ackLen>=cur.Size then siWait:=false;
 end;
 end;
{Timer callback: no cont report arrived in time.
 Does nothing while suspended (txLastSize=0). Otherwise falls back to
 the initial values, starts a new mark, counts the timeout and
 schedules one more transmit and the next timeout check.}
procedure tTCS.Timeout;
 begin
 if txLastSize<>0 then begin
  cur:=initial;
  MarkData:=0;
  mark:=Random(256);
  siMark:=0;
  Inc(isTimeout);
  Shedule(80,@TransmitDelay);
  Shedule(3000,@Timeout);
 end;
 end;
{Timer callback that drives the transmission.
 Optionally arms a size probe, then calls CanSend repeatedly until the
 data sent under the current mark is more than 0.02 s ahead of cur.Rate
 or more than 200 datagrams went out, and reschedules itself.
 Leaves without rescheduling when CanSend sent nothing (pause) or when
 in timeout mode (single datagram only).}
procedure tTCS.TransmitDelay;
 var gap:real;
 var sent:word;
 var mayProbe:boolean;
 begin
 txLastSize:=0;
 gap:=0;
 sent:=0;
 {probe a larger size only if none is pending, there is room to grow
  and the sender is not in timeout mode}
 mayProbe:=(siMark=0) and (cur.Size<limit.Size) and (isTimeout=0);
 if mayProbe then begin
  siMark:=random(255)+1;
  siNow:=true;
  siWait:=true;
 end;
 repeat
  CanSend;
  {pause, or: no burst, no shedule next}
  if (txLastSize=0) or (isTimeout>0) then exit;
  {time by which the data sent is ahead of what cur.Rate allows}
  gap:=(MarkData/cur.Rate)-((Now-MarkStart)*SecsPerDay);
  inc(sent);
  siNow:=false;
 until (gap>0.02) or (sent>200);
 if gap<0.02 then gap:=0.01;
 Shedule(round(gap*1000),@TransmitDelay);
 end;
{Unregister all callbacks: both timers and both message handlers
 that were set up for this remote.}
procedure tTCS.Done;
 begin
 {timers}
 UnShedule(@TransmitDelay);
 UnShedule(@Timeout);
 {handlers for opcodes 5 (cont) and 7 (ack)}
 SetMsgHandler(5,remote,nil);
 SetMsgHandler(7,remote,nil);
 end;
begin
 {no unit initialisation}
end.