UpMgr work + Tests and temp store.
[brdnet.git] / TC.pas
blob508059d89bb3febd659d9de787f9b391a949e1e8
1 unit TC;
{Transmission control over UDP.
 Some datagrams are sent larger than the current size;
 if they pass, the payload size is set to that.

 Useful for file transfer; VoIP should only consult the current rate
 and detect congestion based on latency.

 Used by UploadManager. One TC per peer.

 Suspend: return from CanSend without sending :)
 Resume: call Start

 opcodes:
 data=4
  mark:1;payload:XX
 data-no-report=8
 data-imm-ack=6
 cont=5
  mark:1;rate:Word4(shr 6)
 ack=7
  mark:1;len:Word2
}
24 INTERFACE
25 uses MemStream,NetAddr,ServerLoop,opcode;
type
 {One set of sender tunables. tTCS keeps three of these:
  the current values, the upper limits and the initial values.
  Growth is applied as: new := old + old*IF.}
 tTCSSe=record
  Rate   : Real;   {sending rate}
  Size   : word;   {datagram size}
  RateIF : single; {rate increase fraction}
  SizeIF : single; {size increase fraction}
 end;
type
 {Sender part of the transmission control.
  Members are listed in the order they should be set/called.}
 tTCS=object
  procedure Init(const iremote:tNetAddr); {set defaults for vars}
  public
  remote:tNetAddr;          {peer this sender talks to}
  Mark:byte;                {mark written into ordinary data datagrams}
  MarkStart:tDateTime;      {when the mark was started}
  MarkData:LongWord;        {how much data was sent under the mark}
  txLastSize:Word;          {size of the last datagram sent; zero if suspended}
  siMark:byte;              {mark written into size-probe / immediate-ack datagrams}
  siNow:boolean;            {the next datagram is a size probe}
  siWait:boolean;           {a size probe is still unanswered}
  isTimeout:word;           {number of timeouts in a row; zero in normal operation}
  maxTimeout:word;          {OnTimeout is called once isTimeout exceeds this}
  Cur:tTCSSe;               {current values}
  Limit:tTCSSe;             {maximum allowed}
  Initial:tTCSSe;           {values used after start/timeout}
  minRateIF:single;         {rate increase fraction used after a rate decrease}
  CanSend: procedure of object;   {called when transmit is possible}
  OnTimeout: procedure of object; {called when too many timeouts happened}
  procedure Start;                {start the transmission}
  function MaxSize(req:word):word;
  procedure WriteHeaders(var s:tMemoryStream); {add headers before the data}
  procedure Send(var s:tMemoryStream);
  procedure Done;                 {unregister all callbacks}
  private
  {timer callbacks}
  procedure TransmitDelay;
  procedure Timeout;
  {message handlers}
  procedure OnCont(msg:ServerLoop.tSMsg);
  procedure OnAck(msg:ServerLoop.tSMsg);
 end;
66 IMPLEMENTATION
67 uses SysUtils;
procedure tTCS.Init(const iremote:tNetAddr);
 {Set defaults for every field and register the cont/ack message
  handlers for the given peer. Must be called before Start.}
 begin
 remote:=iremote;
 SetMsgHandler(opcode.tccont,remote,@OnCont);
 SetMsgHandler(opcode.tceack,remote,@OnAck);
 {Real arithmetic, so the 2GB constant cannot overflow a 32-bit
  integer constant expression.}
 Limit.Rate:=2.0*1024*1024*1024; {2GB}
 Limit.Size:=4096;
 Limit.RateIF:=1;
 Limit.SizeIF:=2;
 Initial.Rate:=1024;
 Initial.Size:=32+5;
 Initial.RateIF:=0.5;
 Initial.SizeIF:=2;
 minRateIF:=0.01;
 CanSend:=nil;
 OnTimeout:=nil;
 maxTimeout:=65535;
 Cur:=Initial;
 txLastSize:=0;
 {The remaining state is read by MaxSize/WriteHeaders/TransmitDelay.
  An object is not zero-filled automatically, so give every field a
  defined value instead of relying on Start or the first timer tick.}
 Mark:=0;
 MarkStart:=0;
 MarkData:=0;
 siMark:=0;
 siNow:=false;
 siWait:=false;
 isTimeout:=0;
 end;
procedure tTCS.Start; {start the transmission}
 {Begins (or resumes) sending: picks a fresh mark, clears the probe and
  timeout state and arms the transmit and timeout timers.
  Requires CanSend to be assigned and the sender to be idle (txLastSize=0).}
 begin
 Assert(assigned(CanSend) ); Assert(not remote.isnil);
 assert(txLastSize=0);
 mark:=Random(256); MarkData:=0;
 siMark:=0;
 isTimeout:=0;
 {On resume a Timeout armed before the suspend may still be pending.
  Drop stale timers so that only one of each is ever armed.}
 UnShedule(@TransmitDelay);
 UnShedule(@Timeout);
 Shedule(80,@TransmitDelay);
 Shedule(3000,@Timeout);
 end;
function tTCS.MaxSize(req:word):word;
 {Returns how many payload bytes the next datagram may carry.
  req    - the size the caller would like to use, headers included.
  result - the current datagram size (or the enlarged probe size when a
           size probe is due) minus the 2 header bytes, and never more
           than req minus the 2 header bytes. Zero when nothing fits.}
 const cHeader=2; {opcode + mark}
 var total:int64;
 var avail:word;
 begin
 if siNow then begin
  {probe: current size grown by the size increase fraction,
   but never beyond the configured maximum}
  total:=round(cur.Size*(1+cur.SizeIF));
  if total>Limit.Size then total:=Limit.Size;
 end else total:=cur.Size;
 {both subtractions are guarded: a plain word subtraction would wrap
  around to a huge value for inputs smaller than the header}
 if total>cHeader then total:=total-cHeader else total:=0;
 if req>cHeader then avail:=req-cHeader else avail:=0;
 if total>avail then total:=avail;
 result:=total;
 end;
procedure tTCS.WriteHeaders(var s:tMemoryStream);
 {Writes the two header bytes (opcode, mark) in front of the data.
  Size probes and datagrams sent while in timeout ask for an immediate
  ack and carry siMark; ordinary datagrams carry the current mark.}
 var op:byte;
 var tag:byte;
 begin
 if siNow or (isTimeout<>0) then begin
  op:=opcode.tcdataimm;
  tag:=siMark;
 end else begin
  op:=opcode.tcdata;
  tag:=mark;
 end;
 s.WriteByte(op);
 s.WriteByte(tag);
 end;
procedure tTCS.Send(var s:tMemoryStream);
 {Transmits the prepared datagram to the peer and updates the
  accounting for the current mark.}
 begin
 ServerLoop.SendMessage(s.base^,s.length,remote);
 if MarkData<>0 then
  MarkData:=MarkData+s.length
 else begin
  {first datagram under this mark: start the clock; the counter is set
   to 1 so this branch is not taken again for the same mark}
  MarkStart:=Now;
  MarkData:=1;
 end;
 siNow:=false;
 txLastSize:=s.length;
 end;
procedure tTCS.OnCont(msg:ServerLoop.tSMsg);
 {Handles a "cont" report (opcode 5): mark:1; rate:Word4 (rate shr 6).
  A report for the current mark re-arms the timeout, adapts the sending
  rate from the reported receive rate and starts a new mark.
  Reports carrying any other mark are ignored.}
 const cMinRate=64; {B/s; the wire value is rate shr 6, so 0 means "below 64"}
 var elapsed:real;
 var RateFill:single;
 var txRate:real;
 var rxRate:real;
 var rmark:byte;
 var rrate:longword;
 var opcode:byte;
 begin
 opcode:=msg.stream.ReadByte; {skip opcode}
 rmark:=msg.stream.ReadByte;
 assert(opcode=5);
 rrate:=msg.stream.ReadWord(4);
 if rmark<>Mark then exit;
 {the peer answered: push the timeout further away}
 UnShedule(@Timeout);
 Shedule(2000,@Timeout);
 {The rate can only be evaluated when something was sent under this mark
  and the clock has advanced; otherwise the divisions below would be
  divisions by zero. In that case only the mark is renewed.}
 elapsed:=0;
 if MarkData>0 then elapsed:=(Now-MarkStart)*SecsPerDay;
 if elapsed>0 then begin
  rxRate:=rrate*64.0; {B/s; real arithmetic, cannot overflow}
  txRate:=MarkData/elapsed;
  RateFill:=rxRate/txRate;
  write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), ');
  if RateFill<0.85 then begin
   {receiver got clearly less than was sent: fall back to what arrived}
   write('limit, ');
   cur.Rate:=rxRate;
   {never let the rate drop to zero, it is used as a divisor elsewhere}
   if cur.Rate<cMinRate then cur.Rate:=cMinRate;
   cur.RateIF:=minRateIF;
  end else
  if (txRate/cur.Rate)<0.7 then begin
   {the sender itself did not reach the allowed rate: leave it alone}
   write('3hard, ');
  end else begin
   write('pass, ');
   cur.Rate:=txRate*(cur.RateIF+1);
   if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate
   else cur.RateIF:=cur.RateIF*2;
   if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
  end;
 end;
 {start a new measurement under a mark different from the reported one}
 repeat mark:=Random(256) until (mark<>rMark);
 MarkData:=0;
 writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4);
 {a size probe that is still unanswered makes the next probe smaller}
 if siWait then begin
  cur.SizeIF:=cur.SizeIF/2;
 end;
 siMark:=0;
 end;
procedure tTCS.OnAck(msg:ServerLoop.tSMsg);
 {Handles an "ack" (opcode 7): mark:1; len:Word2.
  Only acks carrying siMark are processed. While in timeout the ack ends
  the timeout state and restarts transmission; otherwise an ack for a
  datagram larger than the current size confirms a size probe.}
 var rmark:byte;
 var rsize:word;
 var accepted:word;
 var opcode:byte;
 begin
 opcode:=msg.stream.ReadByte; {skip opcode}
 rmark:=msg.stream.ReadByte;
 assert(opcode=7);
 rsize:=msg.stream.ReadWord(2);
 if rmark<>simark then exit;
 {the size comes from the network: never adopt more than the configured maximum}
 accepted:=rsize;
 if accepted>Limit.Size then accepted:=Limit.Size;
 if isTimeout>0 then begin
  {peer is reachable again; make sure only one transmit timer is armed}
  UnShedule(@TransmitDelay);
  Shedule(80,@TransmitDelay);
  isTimeout:=0;
 end else
 if accepted>cur.size then begin
  writeln('size inc to ',accepted);
  cur.SizeIF:=((accepted/cur.Size)-1)*2;
  if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
  {use the new size for all transmit only if one datagram takes at most
   0.3 s at the current rate; the rate check also guards the division}
  if (cur.Rate>0)and((accepted/cur.Rate)<=0.3) then cur.size:=accepted;
 end;
 if rsize>=cur.size then siWait:=false;
 end;
procedure tTCS.Timeout;
 {Timer callback: no report arrived in time. Falls back to the initial
  parameters, counts the timeout, re-arms the timers and, once the
  count exceeds maxTimeout, notifies OnTimeout.
  Does nothing while the sender is suspended.}
 begin
 if txLastSize=0 then exit; {suspend}
 cur:=initial;
 mark:=Random(256); MarkData:=0;
 siMark:=0;
 {saturate instead of wrapping to 0, which would read as "no timeout"}
 if isTimeout<high(isTimeout) then Inc(isTimeout);
 {a transmit timer from normal operation may still be pending}
 UnShedule(@TransmitDelay);
 Shedule(80,@TransmitDelay);
 Shedule(3000,@Timeout);
 {Notify last: if the handler calls Done, the timers armed above are
  cancelled by it instead of being re-armed behind its back.}
 if (isTimeout>maxTimeout)and assigned(OnTimeout) then OnTimeout;
 end;
procedure tTCS.TransmitDelay;
 {Timer callback that paces the transmission. Calls CanSend in a burst
  until the sender is ahead of the current rate by more than 20 ms
  (or 200 datagrams were requested) and then schedules itself again.
  Returns without rescheduling when CanSend sends nothing (pause) or
  while in timeout (a single datagram is sent, the ack restarts pacing).}
 var txwait:real;
 var burst:word;
 begin
 txwait:=0;
 burst:=0;
 {start a size probe when none is pending, the size can still grow
  and the link is not in timeout}
 if (siMark=0)and(cur.Size<limit.Size)and(istimeout=0) then begin
  siNow:=true;
  siWait:=true;
  siMark:=random(255)+1; {1..255, zero means "no probe"}
 end;
 repeat
  {cleared before every call so that a pause in the middle of a burst
   is noticed at once; Send sets it when CanSend transmits}
  txLastSize:=0;
  CanSend;
  if txLastSize=0 then exit; {pause}
  if (isTimeout>0) then exit; {no burst, no shedule next}
  {time by which the data sent under this mark is ahead of the rate}
  txwait:=(MarkData/cur.Rate)-((Now-MarkStart)*SecsPerDay);
  inc(burst);
  siNow:=false;
 until (txwait>0.02)or(burst>200);
 if txwait<0.02 then txwait:=0.01;
 Shedule(round(txwait*1000),@TransmitDelay);
 end;
procedure tTCS.Done; {unregister all callbacks}
 {Cancels both timers and removes the message handlers that Init
  registered for this peer.}
 begin
 UnShedule(@TransmitDelay);
 UnShedule(@Timeout);
 {same opcode constants as in Init, so registration and removal
  cannot drift apart}
 SetMsgHandler(opcode.tccont,remote,nil);
 SetMsgHandler(opcode.tceack,remote,nil);
 end;
249 BEGIN
250 END.