From ac8415b76cdabd324d99cc5fd03472bb69aea966 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Tue, 15 Sep 2015 09:59:44 +0200 Subject: [PATCH] Convert TC to periodic ack. Change packet struct. No size probing and timeouts. --- RecvTC.pas | 56 ++++-- TC.pas | 568 ++++++++++++++++++++++++------------------------------------- TestTC.pas | 10 +- 3 files changed, 273 insertions(+), 361 deletions(-) rewrite TC.pas (68%) diff --git a/RecvTC.pas b/RecvTC.pas index 570f453..3aa7899 100644 --- a/RecvTC.pas +++ b/RecvTC.pas @@ -12,23 +12,55 @@ procedure SC(fn:pointer; retval:cint); end; end; +var CurMark:byte=0; +var PrvMark:byte=0; +var StartT:tDateTime; +var Total:LongWord=0; +var DGcnt:LongWord=0; +{opcode 4=data, 8=data-no-report, 6=data-immediate-ack} +{opcode 5=cont, 7=ack} + procedure HandleMSG(sock:tSocket; var s:tMemoryStream; const from: tSockAddr); var opcode:byte; - var trid:word; - var sec:word; + var mark:byte; var sendbuf:array [1..128] of byte; var r:tMemoryStream; + var rateR:real; + var rate:DWord; {BytesPerSecond shr 6 (=64)} begin - opcode:=s.readbyte; - assert(opcode=6); - s.Read(trid,2); - s.Read(sec,2); r.Init(@sendbuf,0,128); - r.WriteByte(4); - trid:=22; //intel :) - r.Write(trid,2); - r.Write(sec,2); - r.WriteWord(s.length,2); + opcode:=s.ReadByte; + mark:=s.ReadByte; + case opcode of + 4:begin + if mark<>PrvMark then begin + if mark<>CurMark then begin + PrvMark:=CurMark; + CurMark:=mark; + StartT:=now; + Total:=1; + DgCnt:=1; + end else begin Inc(Total,s.length); Inc(DgCnt) end; + end; + end; + 8:; + 6:begin + r.WriteByte(7); + r.WriteByte(mark); + r.WriteWord(r.length,2); + SC(@fpsendto,fpsendto(s_inet,r.base,r.length,0,@from,sizeof(sockaddr_in))); + end; + end; + if DgCnt<8 then exit; + if (now-Startt)<(0.4/SecsPerDay) then exit; + rateR:=Total/((now-Startt)*SecsPerDay); + writeln('Rate: ',(rateR/1024):7:1); + rate:=round(rateR/64); + StartT:=now; + Total:=1; + r.WriteByte(5); + r.WriteByte(mark); + r.WriteWord(rate,4); SC(@fpsendto,fpsendto(s_inet,r.base,r.length,0,@from,sizeof(sockaddr_in))); end; @@ -37,7 +69,7 @@ procedure s_SetupInet; begin with bind_addr do begin family:=AF_INET; - port:=htons(3511); + port:=htons(3519); addr:=0; {any} s_inet:=fpSocket(family,SOCK_DGRAM,IPPROTO_UDP); SC(@fpSocket,s_inet); diff --git a/TC.pas b/TC.pas dissimilarity index 68% index d7853f3..ba28473 100644 --- a/TC.pas +++ b/TC.pas @@ -1,344 +1,224 @@ -unit TC; -{TransmissionControll over UDP -ack all packets -send at fixed rate -increase rate -decrease rate if ack rate does not increase - -some dg larger -if pass set payload to that - -trid: diff on each side, send with the other side id -pktypes: - data - trid:2;sec:2;payload:XX - cont (send next) - trid:2;sec:2;len:WordNetOrder - ctrl (feedback to sender) - trid; arbitrary data -sec: mark:1 unused:1 -} -INTERFACE -uses MemStream,NetAddr; - -type tTCSSe=record - Rate:Real; {sending rate} - Size:word; {datagram size} - RateIF:single; {rate increase fraction} - SizeIF:single; {size increase fraction} - {new:=old+old*IF} - end; - -type tTCS=object {this is sender part} - {in order methods should be set/called} - procedure Init; {set defaults for vars} - public - rid,lid:Word; - remote:tNetAddr; - mark,markc:byte; - isTimeout:boolean; - isCanSend:boolean; - paused:boolean; {if this is true, do not call start, call send directly (size in cur.size)} - rxRecTime:tDateTime; {last recent} - rxRecSize:word; - rxCumTime:tTime; {cumulative for current mark} - rxCumSize:longword; - txLastSize:word; {last recent} - txLastTime:tDateTime; {last recent} - txCumTime:tTime; {cumulative for current mark} - txCumSize:longword; - trySize:Word; {experimental size} - isTrySize:boolean; - SizeIncScarcity:Word; {inverse probability of size experiment} - siMark:byte; {size increase mark} - Cur:tTCSSe; {current values} - Limit:tTCSSe; {maximum alloved} - Initial:tTCSSe; {after start/timeout} - minRateIF:single; {used after rate decrease} - minSizeIF:single; {used after rate increase} - {statistic todo} - OnCtrl: procedure(var s:tMemoryStream) of object; - OnGTimeout: procedure of object; - CanSend: procedure(msize:word) of object; {called when transmit possible} - procedure Start; {start the transmission} - procedure WriteHeaders(var s:tMemoryStream); {add headers before the data, return payload size} - {add ur own data to stream, but max plsize bytes} - procedure Send(var s:tMemoryStream); - private - {timer callbacks} - procedure TransmitDelay; {delay sending packets immediatlely} - procedure AdjustSpeed; - procedure TimeoutIncreaseSize; - procedure OnCont(rmark:byte;rsize:word); {cont packet recved} - procedure Done; {unregister all callbacks} - end; - -procedure RegTxer(var t:tTCS); -procedure DelTxer(var t:tTCS); - -IMPLEMENTATION -uses ServerLoop,SysUtils; - -{register cont and ctrl opcodes and deliver them to senders} -var Txers:array [0..31] of ^tTCS; - -procedure RegTxer(var t:tTCS); - var tn:byte; - begin - //for tn:=0 to high(TXERS) do if txers[tn]=nil then break; - tn:=t.lid and high(txers); - assert(not assigned(txers[tn])); - txers[tn]:=@t; - t.lid:=(t.lid and (not high(txers))) or tn; {mask and set} -end; - -procedure DelTxer(var t:tTCS); - var tn:byte; - begin - t.Done; - tn:=t.lid and high(txers); - txers[tn]:=nil; -end; - -type tTCSp=^tTCS; -function GetTxer(lid:word):tTCSp; - var tn:byte; - begin - tn:=lid and high(txers); - result:=txers[tn]; - if assigned(result) and (result^.lid<>lid) then result:=nil; {drop mismatched} - {todo: check sender address match} -end; - -procedure RecvCtrl(msg:ServerLoop.tSMsg); - var t:^tTCS; - var lid:word; - begin - msg.stream.skip(1); {skip opcode} - msg.stream.Read(lid,2); {dont reorder bytes, lid is no number} - t:=GetTxer(lid); - if not assigned(t) then exit; - t^.OnCtrl(msg.stream); -end; - -procedure RecvCont(msg:ServerLoop.tSMsg); - var t:^tTCS; - var lid:word; - var rmark:byte; - var rsize:word; - begin - msg.stream.skip(1); {skip opcode} - msg.stream.Read(lid,2); {dont reorder bytes, lid is no number} - rmark:=msg.stream.ReadByte; - msg.stream.Skip(1); {skip unused sec} - rsize:=msg.stream.ReadWord(2); - t:=GetTxer(lid); - if not assigned(t) then exit; - t^.OnCont(rmark,rsize); -end; - -procedure tTCS.Init; - begin - lid:=Random(65535); - rid:=65535; - remote.clear; - SizeIncScarcity:=20; {inverse probability of size experiment} - Limit.Rate:=2*1024*1024*1024; {2GB} - isTimeout:=false; - Limit.Size:=4096; - Limit.RateIF:=4; - Limit.SizeIF:=3; - Initial.Rate:=256; - Initial.Size:=32+5; - Initial.RateIF:=10; - Initial.SizeIF:=2; - minRateIF:=0.01; - minSizeIF:=0.05; - paused:=false; - {statistic todo} - OnCtrl:=nil; - OnGTimeout:=nil; - CanSend:=nil; -end; -procedure tTCS.Start; {start the transmission} - begin - assert(rid<655350); - Assert(assigned(CanSend) ); - Assert(not remote.isnil); - Cur:=Initial; - markc:=0; - mark:=Random(256); - isTrySize:=false; - isCanSend:=false; - txLastSize:=0; - paused:=false; - Shedule(80,@TransmitDelay); - Shedule(2000,@AdjustSpeed); -end; - -procedure tTCS.WriteHeaders(var s:tMemoryStream); - begin - s.WriteByte(6); - s.Write(rid,2); - if isTrySize then begin - s.writebyte(siMark); - end else begin - s.WriteByte(mark); - end; - s.WriteByte(0); -end; - -procedure tTCS.Send(var s:tMemoryStream); - begin - if isTrySize then assert(s.length<=trySize) else assert(s.Length<=cur.size); - isTrySize:=false; - paused:=false; - isCanSend:=false; - ServerLoop.SendMessage(s.base^,s.length,remote); - if txLastSize=0 then begin - txCumTime:=0; - txCumSize:=0; - end else begin - txCumTime:=txCumTime+((Now-txLastTime)*SecsPerDay); - txCumSize:=txCumSize+txLastSize; - end; - txLastTime:=Now; - txLastSize:=s.length; -end; - -procedure tTCS.OnCont(rmark:byte;rsize:word); - var rnow:tDateTime; - var delta:real; - begin - if (rmark=mark)or((trySize>0)and(rmark=simark)) then begin - rnow:=Now; - inc(markc); - if markc=1 then begin - rxCumTime:=0; - rxCumSize:=0; {ignore this size since no info how long it sending} - if isTimeout then begin - isTimeout:=false; - Shedule(80,@TransmitDelay); - writeln('TIMEOUT RECOVERY'); - end; - end else begin - delta:=(rnow-rxRecTime)*SecsPerDay; - rxCumTime:=rxCumTime+delta; - rxCumSize:=rxCumSize+rsize; - //writeln('told size is ',rsize, 'delta ',round(delta*1000)); - end; - rxRecTime:=rnow; - rxRecSize:=rsize; - if (markc>200)or(rxCumSize>640000) then begin - UnShedule(@AdjustSpeed); {do not wait} - AdjustSpeed; {adjust now!} - end; - end; - if rmark=simark then begin - isTrySize:=false; - TrySize:=0; - if rsize>cur.size then begin - cur.SizeIF:=((rSize/cur.Size)-1)*2; - if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF; - if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit} - //writeln('New size ',cur.Size); - UnShedule(@TimeoutIncreaseSize); - end; - end; -end; - -procedure tTCS.AdjustSpeed; - var rxRate:real; - var RateFill:single; - var txRate:real; - begin - if isCanSend then begin paused:=true; exit end; {nothing to transmit, sleep forever} - if isTimeout then begin Start; exit end; - if markc>3 then begin {only proceed with enough data} - rxrate:=rxCumSize/rxCumTime; - if txCumTime>0.01 then txrate:=txCumSize/txCumTime - else txrate:=cur.Rate; - RateFill:=rxrate/txRate; - write('speed: ',(rxrate/1024):1:4,'kB/s @',(txRate/1024):1:4,'kB/s (',(RateFill*100):3:1,'%), '); - if RateFill<0.85 then begin - write('limit, '); - {we hit the limit} - cur.Rate:=rxrate; - cur.RateIF:=minRateIF; - {cur.Size:=round(cur.size-(cur.Size/4));} - //cur.SizeIF:=minSizeIF; - end else begin - write('pass, '); - {rates are ok} - cur.Rate:=txrate+(cur.Rate*cur.RateIF); - if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate; - cur.RateIF:=cur.RateIF*2; - if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF; - repeat mark:=Random(256) until mark<>siMark; - end; - end else begin - {this is timeout! reset to safe rates} - write('timeout, '); - Cur:=Initial; - isTimeout:=true; - end; - //writeln('txwait ',((cur.size/cur.rate)*1000):1:1); - markc:=0; - txLastSize:=0; - writeln('adjust to ',(Cur.Rate/1024):1:4,'kB/s mark', mark, ' size=',cur.Size); - (*txLastSize:=0;*) - Shedule(1600,@AdjustSpeed); -end; - -procedure tTCS.TransmitDelay; - var txwait:real; - var burst:byte; - begin - isCanSend:=true; - if (not isTimeout)and(TrySize=0)and(Random(SizeIncScarcity)=0)and(cur.SizeMark; - isTrySize:=true; - trySize:=round(cur.Size+(cur.Size*cur.SizeIF)); - if trySize>Limit.Size then trySize:=Limit.Size; - //writeln('Try size ',trySize); - CanSend(trySize-5); - txwait:=((txLastSize/cur.rate)*1000); - Shedule(round(txwait),@TransmitDelay); - if not isCanSend then Shedule(2500,@TimeoutIncreaseSize) - end else begin - txwait:=0; - burst:=0; - repeat - CanSend(Cur.Size-5); - txwait:=txwait+((txLastSize/cur.rate)*1000); - if isTrySize then break; - if isTimeout then exit; - inc(burst); - until (txwait>20)or(burst>200); - //writeln('Burst ',burst); - Shedule(round(txwait),@TransmitDelay); - end; -end; - -procedure tTCS.TimeoutIncreaseSize; - begin - isTrySize:=false; - //writeln('Size Inc timeout'); - cur.SizeIF:=cur.SizeIF/8; - {make sure we increase at least by 2 bytes} - if (cur.SizeIF*cur.Size)<1 then cur.SizeIF:=1/cur.Size; - trySize:=0; -end; - -procedure tTCS.Done; {unregister all callbacks} - begin - UnShedule(@AdjustSpeed); - UnShedule(@TransmitDelay); - UnShedule(@TimeoutIncreaseSize); -end; - -BEGIN - FillByte(txers,sizeof(txers),0); {make'em nil} - SetMsgHandler(4,@RecvCont); - SetMsgHandler(5,@RecvCtrl); -END. +unit TC; +{TransmissionControll over UDP +some dg larger +if pass set payload to that + +useful for file transfer, voip should only consult the current rate +and detect congestion based on latency + +opcodes: + data=4 + mark:1;payload:XX + data-no-report=8 + data-imm-ack=6 + cont=5 + mark:1;rate:Word4(shr 6) + ack=7 + mark:1;len:Word2 +} +INTERFACE +uses MemStream,NetAddr; + +type tTCSSe=record + Rate:Real; {sending rate} + Size:word; {datagram size} + RateIF:single; {rate increase fraction} + SizeIF:single; {size increase fraction} + {new:=old+old*IF} + end; + +type tTCS=object {this is sender part} + {in order methods should be set/called} + procedure Init; {set defaults for vars} + public + remote:tNetAddr; + Mark:byte; + MarkStart:tDateTime; {when the mark was started} + MarkData:LongWord; {how much data sent} + txLastSize:Word; + Cur:tTCSSe; {current values} + Limit:tTCSSe; {maximum alloved} + Initial:tTCSSe; {after start/timeout} + minRateIF:single; {used after rate decrease} + CanSend: procedure of object; {called when transmit possible} + procedure Start; {start the transmission} + function MaxSize(req:word):word; + procedure WriteHeaders(var s:tMemoryStream); {add headers before the data} + procedure Send(var s:tMemoryStream); + private + {timer callbacks} + procedure TransmitDelay; + //procedure TimeoutCont; + procedure OnCont(rmark:byte;rrate:real); + //procedure OnAck(rmark:byte;rsize:word); + procedure Done; {unregister all callbacks} + end; + +procedure RegTxer(var t:tTCS); +procedure DelTxer(var t:tTCS); + +IMPLEMENTATION +uses ServerLoop,SysUtils; + +var Txers:array [0..31] of ^tTCS; + +procedure RegTxer(var t:tTCS); + var tn:byte; + begin + for tn:=0 to high(TXERS) do if txers[tn]=nil then break; + assert(not assigned(txers[tn])); + txers[tn]:=@t; +end; + +procedure DelTxer(var t:tTCS); + var tn:byte; + begin + tn:=0; while tn<=high(TXERS) do if txers[tn]=@t then break else inc(tn); + assert(tn<=high(TXERS)); + t.Done; + txers[tn]:=nil; +end; + +type tTCSp=^tTCS; +function GetTxer(const cource:tNetAddr):tTCSp; + var tn:byte; + begin + result:=nil; + tn:=0; while tn<=high(TXERS) do if txers[tn]^.remote=cource then break else inc(tn); + if tn<=high(TXERS) then result:=txers[tn]; +end; + +procedure RecvCont(msg:ServerLoop.tSMsg); + var t:^tTCS; + var rmark:byte; + var rrate:longword; + begin + t:=GetTxer(msg.source^); + if not assigned(t) then exit; + msg.stream.skip(1); {skip opcode} + rmark:=msg.stream.ReadByte; + rrate:=msg.stream.ReadWord(4); + t^.OnCont(rmark,rrate); +end; + +procedure tTCS.Init; + begin + remote.clear; + //SizeIncScarcity:=20; {inverse probability of size experiment} + Limit.Rate:=2*1024*1024*1024; {2GB} + Limit.Size:=4096; + Limit.RateIF:=1; + Limit.SizeIF:=2; + Initial.Rate:=20*1024; + Initial.Size:={32+5}1024; + Initial.RateIF:=0.5; + Initial.SizeIF:=2; + minRateIF:=0.01; + CanSend:=nil; +end; + +procedure tTCS.Start; {start the transmission} + begin + Assert(assigned(CanSend) ); Assert(not remote.isnil); + Cur:=Initial; + mark:=Random(256); MarkData:=0; + Shedule(80,@TransmitDelay); +end; + +function tTCS.MaxSize(req:word):word; + begin + if req>cur.Size then MaxSize:=cur.Size else MaxSize:=req; +end; + +procedure tTCS.WriteHeaders(var s:tMemoryStream); + begin + {if isTrySize then begin + end else begin} + s.WriteByte(4);{opcode} + s.WriteByte(mark); + {end;} +end; + +procedure tTCS.Send(var s:tMemoryStream); + begin + ServerLoop.SendMessage(s.base^,s.length,remote); + if MarkData=0 then begin + MarkStart:=Now; + MarkData:=1; + end else MarkData:=MarkData+s.length; + txLastSize:=s.length; +end; + +procedure tTCS.OnCont(rmark:byte;rrate:real); + var rnow:tDateTime; + var RateFill:single; + var txRate:real; + var rxRate:real; + begin + if (rmark=Mark) then begin + rnow:=Now; + rxRate:=(rrate*64); {B/s} + txRate:=MarkData/((rnow-MarkStart)*SecsPerDay); + RateFill:=rxRate/txRate; + write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), '); + if RateFill<0.85 then begin + write('limit, '); + cur.Rate:=rxrate; + cur.RateIF:=minRateIF; + end else + if (txRate/cur.Rate)<0.7 then begin + write('3hard, '); + end else begin + write('pass, '); + cur.Rate:=txRate*(cur.RateIF+1); + if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate + else cur.RateIF:=cur.RateIF*2; + if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF; + end; + repeat mark:=Random(256) until (mark<>rMark); + MarkData:=0; + writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4); + end; + (* + if rmark=simark then begin + isTrySize:=false; + TrySize:=0; + if rsize>cur.size then begin + cur.SizeIF:=((rSize/cur.Size)-1)*2; + if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF; + if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit} + UnShedule(@TimeoutIncreaseSize); + end; + end; + *) +end; + +procedure tTCS.TransmitDelay; + var txwait:real; + var burst:word; + begin + txLastSize:=0; + txwait:=0; + burst:=0; + repeat + CanSend; + if txLastSize=0 then exit;{pause} + //txwait:=txwait+(txLastSize/cur.rate); + txwait:=(MarkData/cur.Rate)-((Now-MarkStart)*SecsPerDay); + inc(burst); + until (txwait>0.02)or(burst>200); + if txwait<0.02 then txwait:=0.01; + //writeln(txwait:1:3,burst); + Shedule(round(txwait*1000),@TransmitDelay); +end; + +procedure tTCS.Done; {unregister all callbacks} + begin + UnShedule(@TransmitDelay); +end; + +BEGIN + FillByte(txers,sizeof(txers),0); {make'em nil} + SetMsgHandler(5,@RecvCont); + //SetMsgHandler(7,@RecvCtrl); +END. diff --git a/TestTC.pas b/TestTC.pas index 641d2b1..7a12f87 100644 --- a/TestTC.pas +++ b/TestTC.pas @@ -9,14 +9,16 @@ type t=object tcs:TC.tTCS; cnt:byte; buf: array [1..4096] of char; - procedure CanSend(size:word); + procedure CanSend; procedure Init; end; -procedure t.CanSend(size:word); +procedure t.CanSend; var s:tMemoryStream; + var size:word; begin s.Init(@buf,0,4096); + size:=tcs.MaxSize(4096); tcs.WriteHeaders(s); if size>s.size then size:=s.size; s.Skip(size-1); @@ -28,9 +30,7 @@ procedure t.Init; begin cnt:=0; tcs.Init; - tcs.rid:=42; - tcs.lid:=22; - tcs.Remote.FromString('//ip4/192.168.1.47/3511'); + tcs.Remote.FromString('//ip4/192.168.1.49/3519'); tcs.CanSend:=@CanSend; TC.RegTXer(tcs); tcs.Start; -- 2.11.4.GIT