Writing and SegInfo to Store1.
[brdnet.git] / TC.pas
blob327adc43f5eb33e89380901927205c9225210b83
1 unit TC;
2 {TransmissionControll over UDP
3 some dg larger
4 if pass set payload to that
6 useful for file transfer, voip should only consult the current rate
7 and detect congestion based on latency
9 Used by UploadManager. 1 TC per peer.
11 Suspend: return from CanSend without sending :)
12 Resume: call start
14 opcodes:
15 data=4
16 mark:1;payload:XX
17 data-no-report=8
18 data-imm-ack=6
19 cont=5
20 mark:1;rate:Word4(shr 6)
21 ack=7
22 mark:1;len:Word2
24 INTERFACE
25 uses MemStream,NetAddr,ServerLoop,opcode;
27 type tTCSSe=record
28 Rate:Real; {sending rate}
29 Size:word; {datagram size}
30 RateIF:single; {rate increase fraction}
31 SizeIF:single; {size increase fraction}
32 {new:=old+old*IF}
33 end;
35 type tTCS=object {this is sender part}
36 {in order methods should be set/called}
37 procedure Init(const iremote:tNetAddr); {set defaults for vars}
38 public
39 remote:tNetAddr;
40 Mark:byte;
41 MarkStart:tMTime; {when the mark was started}
42 MarkData:LongWord; {how much data sent}
43 txLastSize:Word; {is zero if suspend}
44 siMark:byte;
45 siNow,siWait:boolean;
46 isTimeout,maxTimeout:word;
47 Cur:tTCSSe; {current values}
48 Limit:tTCSSe; {maximum alloved}
49 Initial:tTCSSe; {after start/timeout}
50 minRateIF:single; {used after rate decrease}
51 CanSend: procedure of object; {called when transmit possible}
52 OnTimeout: procedure of object;
53 procedure Start; {start the transmission}
54 function MaxSize(req:word):word;
55 procedure WriteHeaders(var s:tMemoryStream); {add headers before the data}
56 procedure Send(var s:tMemoryStream);
57 procedure Done; {unregister all callbacks}
58 private
59 {timer callbacks}
60 procedure TransmitDelay;
61 procedure Timeout;
62 procedure OnCont(msg:ServerLoop.tSMsg);
63 procedure OnAck(msg:ServerLoop.tSMsg);
64 end;
66 IMPLEMENTATION
68 procedure tTCS.Init(const iremote:tNetAddr);
69 begin
70 remote:=iremote;
71 SetMsgHandler(opcode.tccont,remote,@OnCont);
72 SetMsgHandler(opcode.tceack,remote,@OnAck);
73 Limit.Rate:=2*1024*1024*1024; {2GB}
74 Limit.Size:=4096;
75 Limit.RateIF:=4;
76 Limit.SizeIF:=2;
77 Initial.Rate:={20*}1024;
78 Initial.Size:=32+5;
79 Initial.RateIF:=0.5;
80 Initial.SizeIF:=2;
81 minRateIF:=0.01;
82 CanSend:=nil;
83 OnTimeout:=nil;
84 maxTimeout:=65535;
85 Cur:=Initial;
86 txLastSize:=0;
87 end;
89 procedure tTCS.Start; {start the transmission}
90 begin
91 Assert(assigned(CanSend) ); Assert(not remote.isnil);
92 assert(txLastSize=0);
93 mark:=Random(256); MarkData:=0;
94 siMark:=0;
95 isTimeout:=0;
96 Shedule(80,@TransmitDelay);
97 Shedule(3000,@Timeout);
98 end;
100 function tTCS.MaxSize(req:word):word;
101 begin
102 req:=req-2;{headers}
103 if siNow
104 then result:=round(cur.Size*(1+cur.SizeIF))
105 else result:=cur.Size;
106 dec(result,2);
107 if result>req then result:=req;
108 end;
110 procedure tTCS.WriteHeaders(var s:tMemoryStream);
111 begin
112 if siNow then begin
113 s.WriteByte(opcode.tcdataimm);{opcode}
114 s.WriteByte(siMark);
115 end else if isTimeout=0 then begin
116 s.WriteByte(opcode.tcdata);{opcode}
117 s.WriteByte(mark);
118 end else begin
119 s.WriteByte(opcode.tcdataimm);{opcode}
120 s.WriteByte(simark);
121 end;
122 end;
124 procedure tTCS.Send(var s:tMemoryStream);
125 begin
126 ServerLoop.SendMessage(s.base^,s.length,remote);
127 if MarkData=0 then begin
128 MarkStart:=mNow;
129 MarkData:=1;
130 end else MarkData:=MarkData+s.length;
131 txLastSize:=s.length;
132 siNow:=false;
133 end;
135 procedure tTCS.OnCont(msg:ServerLoop.tSMsg);
136 var RateFill:single;
137 var txRate:real;
138 var rxRate:real;
139 var rmark:byte;
140 var rrate:longword;
141 var opcode:byte;
142 begin
143 opcode:=msg.stream.ReadByte; {skip opcode}
144 rmark:=msg.stream.ReadByte;
145 assert(opcode=5);
146 rrate:=msg.stream.ReadWord(4);
147 if (rmark=Mark) then begin
148 rxRate:=(rrate*64); {B/s}
149 txRate:=MarkData/((mNow-MarkStart)/1000{*SecsPerDay});
150 RateFill:=rxRate/txRate;
151 write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), ');
152 UnShedule(@Timeout);
153 Shedule(2000,@Timeout);
154 if RateFill<0.85 then begin
155 write('limit, ');
156 cur.Rate:=rxrate;
157 cur.RateIF:=minRateIF;
158 end else
159 if (txRate/cur.Rate)<0.7 then begin
160 write('3hard, ');
161 end else begin
162 write('pass, ');
163 cur.Rate:=txRate*(cur.RateIF+1);
164 if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate
165 else cur.RateIF:=cur.RateIF*2;
166 if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
167 end;
168 repeat mark:=Random(256) until (mark<>rMark);
169 MarkData:=0;
170 writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4);
171 if siWait then begin
172 cur.SizeIF:=cur.SizeIF/2;
173 end;
174 siMark:=0;
175 end end;
177 procedure tTCS.OnAck(msg:ServerLoop.tSMsg);
178 var rmark:byte;
179 var rsize:word;
180 var opcode:byte;
181 begin
182 opcode:=msg.stream.ReadByte; {skip opcode}
183 rmark:=msg.stream.ReadByte;
184 assert(opcode=7);
185 rsize:=msg.stream.ReadWord(2);
186 if rmark<>simark then exit;
187 if isTimeout>0 then begin
188 Shedule(80,@TransmitDelay);
189 isTimeout:=0;
190 end else
191 if rsize>cur.size then begin
192 writeln('size inc to ',rsize);
193 cur.SizeIF:=((rSize/cur.Size)-1)*2;
194 if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
195 if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit}
196 end;
197 if rsize>=cur.size then siWait:=false;
198 end;
200 procedure tTCS.Timeout;
201 begin
202 if txLastSize=0 then exit; {suspend}
203 cur:=initial;
204 mark:=Random(256); MarkData:=0;
205 siMark:=0;
206 Inc(isTimeout);
207 if (isTimeout>maxTimeout)and assigned(OnTimeout) then OnTimeout;
208 Shedule(80,@TransmitDelay);
209 Shedule(3000,@Timeout);
210 end;
212 procedure tTCS.TransmitDelay;
213 var txwait:real;
214 var burst:word;
215 begin
216 txwait:=0;
217 burst:=0;
218 if (siMark=0)and(cur.Size<limit.Size){and(random(10)=0)}and(istimeout=0) then begin
219 siNow:=true;
220 siWait:=true;
221 siMark:=random(255)+1;
222 end;
223 repeat
224 txLastSize:=0;
225 CanSend;
226 if txLastSize=0 then exit; {pause}
227 if (isTimeout>0) then exit; {no burst, no shedule next}
228 //txwait:=txwait+(txLastSize/cur.rate);
229 txwait:=(MarkData/cur.Rate)-((mNow-MarkStart)/1000{*SecsPerDay});
230 inc(burst);
231 siNow:=false;
232 until (txwait>0.02)or(burst>200);
233 if txwait<0.02 then txwait:=0.01;
234 //writeln(txwait:1:3,burst);
235 Shedule(round(txwait*1000),@TransmitDelay);
236 end;
238 procedure tTCS.Done; {unregister all callbacks}
239 begin
240 UnShedule(@TransmitDelay);
241 UnShedule(@Timeout);
242 SetMsgHandler(5,remote,nil);
243 SetMsgHandler(7,remote,nil);
244 end;
246 BEGIN
247 END.