Convert TC to periodic ack. Change packet struct. No size probing and timeouts.
[brdnet.git] / TC.pas
blobba28473d5d25b31b9c5ba688346d55f843860874
1 unit TC;
2 {TransmissionControll over UDP
3 some dg larger
4 if pass set payload to that
6 useful for file transfer, voip should only consult the current rate
7 and detect congestion based on latency
9 opcodes:
10 data=4
11 mark:1;payload:XX
12 data-no-report=8
13 data-imm-ack=6
14 cont=5
15 mark:1;rate:Word4(shr 6)
16 ack=7
17 mark:1;len:Word2
19 INTERFACE
20 uses MemStream,NetAddr;
22 type tTCSSe=record
23 Rate:Real; {sending rate}
24 Size:word; {datagram size}
25 RateIF:single; {rate increase fraction}
26 SizeIF:single; {size increase fraction}
27 {new:=old+old*IF}
28 end;
30 type tTCS=object {this is sender part}
31 {in order methods should be set/called}
32 procedure Init; {set defaults for vars}
33 public
34 remote:tNetAddr;
35 Mark:byte;
36 MarkStart:tDateTime; {when the mark was started}
37 MarkData:LongWord; {how much data sent}
38 txLastSize:Word;
39 Cur:tTCSSe; {current values}
40 Limit:tTCSSe; {maximum alloved}
41 Initial:tTCSSe; {after start/timeout}
42 minRateIF:single; {used after rate decrease}
43 CanSend: procedure of object; {called when transmit possible}
44 procedure Start; {start the transmission}
45 function MaxSize(req:word):word;
46 procedure WriteHeaders(var s:tMemoryStream); {add headers before the data}
47 procedure Send(var s:tMemoryStream);
48 private
49 {timer callbacks}
50 procedure TransmitDelay;
51 //procedure TimeoutCont;
52 procedure OnCont(rmark:byte;rrate:real);
53 //procedure OnAck(rmark:byte;rsize:word);
54 procedure Done; {unregister all callbacks}
55 end;
57 procedure RegTxer(var t:tTCS);
58 procedure DelTxer(var t:tTCS);
60 IMPLEMENTATION
61 uses ServerLoop,SysUtils;
63 var Txers:array [0..31] of ^tTCS;
65 procedure RegTxer(var t:tTCS);
66 var tn:byte;
67 begin
68 for tn:=0 to high(TXERS) do if txers[tn]=nil then break;
69 assert(not assigned(txers[tn]));
70 txers[tn]:=@t;
71 end;
73 procedure DelTxer(var t:tTCS);
74 var tn:byte;
75 begin
76 tn:=0; while tn<=high(TXERS) do if txers[tn]=@t then break else inc(tn);
77 assert(tn<=high(TXERS));
78 t.Done;
79 txers[tn]:=nil;
80 end;
82 type tTCSp=^tTCS;
83 function GetTxer(const cource:tNetAddr):tTCSp;
84 var tn:byte;
85 begin
86 result:=nil;
87 tn:=0; while tn<=high(TXERS) do if txers[tn]^.remote=cource then break else inc(tn);
88 if tn<=high(TXERS) then result:=txers[tn];
89 end;
91 procedure RecvCont(msg:ServerLoop.tSMsg);
92 var t:^tTCS;
93 var rmark:byte;
94 var rrate:longword;
95 begin
96 t:=GetTxer(msg.source^);
97 if not assigned(t) then exit;
98 msg.stream.skip(1); {skip opcode}
99 rmark:=msg.stream.ReadByte;
100 rrate:=msg.stream.ReadWord(4);
101 t^.OnCont(rmark,rrate);
102 end;
104 procedure tTCS.Init;
105 begin
106 remote.clear;
107 //SizeIncScarcity:=20; {inverse probability of size experiment}
108 Limit.Rate:=2*1024*1024*1024; {2GB}
109 Limit.Size:=4096;
110 Limit.RateIF:=1;
111 Limit.SizeIF:=2;
112 Initial.Rate:=20*1024;
113 Initial.Size:={32+5}1024;
114 Initial.RateIF:=0.5;
115 Initial.SizeIF:=2;
116 minRateIF:=0.01;
117 CanSend:=nil;
118 end;
120 procedure tTCS.Start; {start the transmission}
121 begin
122 Assert(assigned(CanSend) ); Assert(not remote.isnil);
123 Cur:=Initial;
124 mark:=Random(256); MarkData:=0;
125 Shedule(80,@TransmitDelay);
126 end;
128 function tTCS.MaxSize(req:word):word;
129 begin
130 if req>cur.Size then MaxSize:=cur.Size else MaxSize:=req;
131 end;
133 procedure tTCS.WriteHeaders(var s:tMemoryStream);
134 begin
135 {if isTrySize then begin
136 end else begin}
137 s.WriteByte(4);{opcode}
138 s.WriteByte(mark);
139 {end;}
140 end;
142 procedure tTCS.Send(var s:tMemoryStream);
143 begin
144 ServerLoop.SendMessage(s.base^,s.length,remote);
145 if MarkData=0 then begin
146 MarkStart:=Now;
147 MarkData:=1;
148 end else MarkData:=MarkData+s.length;
149 txLastSize:=s.length;
150 end;
152 procedure tTCS.OnCont(rmark:byte;rrate:real);
153 var rnow:tDateTime;
154 var RateFill:single;
155 var txRate:real;
156 var rxRate:real;
157 begin
158 if (rmark=Mark) then begin
159 rnow:=Now;
160 rxRate:=(rrate*64); {B/s}
161 txRate:=MarkData/((rnow-MarkStart)*SecsPerDay);
162 RateFill:=rxRate/txRate;
163 write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), ');
164 if RateFill<0.85 then begin
165 write('limit, ');
166 cur.Rate:=rxrate;
167 cur.RateIF:=minRateIF;
168 end else
169 if (txRate/cur.Rate)<0.7 then begin
170 write('3hard, ');
171 end else begin
172 write('pass, ');
173 cur.Rate:=txRate*(cur.RateIF+1);
174 if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate
175 else cur.RateIF:=cur.RateIF*2;
176 if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF;
177 end;
178 repeat mark:=Random(256) until (mark<>rMark);
179 MarkData:=0;
180 writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4);
181 end;
183 if rmark=simark then begin
184 isTrySize:=false;
185 TrySize:=0;
186 if rsize>cur.size then begin
187 cur.SizeIF:=((rSize/cur.Size)-1)*2;
188 if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF;
189 if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit}
190 UnShedule(@TimeoutIncreaseSize);
191 end;
192 end;
194 end;
196 procedure tTCS.TransmitDelay;
197 var txwait:real;
198 var burst:word;
199 begin
200 txLastSize:=0;
201 txwait:=0;
202 burst:=0;
203 repeat
204 CanSend;
205 if txLastSize=0 then exit;{pause}
206 //txwait:=txwait+(txLastSize/cur.rate);
207 txwait:=(MarkData/cur.Rate)-((Now-MarkStart)*SecsPerDay);
208 inc(burst);
209 until (txwait>0.02)or(burst>200);
210 if txwait<0.02 then txwait:=0.01;
211 //writeln(txwait:1:3,burst);
212 Shedule(round(txwait*1000),@TransmitDelay);
213 end;
215 procedure tTCS.Done; {unregister all callbacks}
216 begin
217 UnShedule(@TransmitDelay);
218 end;
220 BEGIN
221 FillByte(txers,sizeof(txers),0); {make'em nil}
222 SetMsgHandler(5,@RecvCont);
223 //SetMsgHandler(7,@RecvCtrl);
224 END.