From 7c269bc98fe02e5488cba87628b46df320ce7567 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Wed, 21 Oct 2015 20:30:20 +0200 Subject: [PATCH] Download Aggr impl + test. --- Download.pas | 108 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- TestFS.pas | 12 ++++--- 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/Download.pas b/Download.pas index c49af4c..02e63a9 100644 --- a/Download.pas +++ b/Download.pas @@ -2,9 +2,8 @@ unit Download; {manage downloads} INTERFACE -{todo} - -IMPLEMENTATION +uses NetAddr, + ServerLoop,opcode,MemStream; { Same idea here as upmgr. Have an tAggr for each peer reporting speed and tJob for each file request saving to file and reqesting missed segments. @@ -15,7 +14,110 @@ IMPLEMENTATION not be forgotten. More advanced DM could consult CHK or Category the file was found. } +type + tJob_ptr=pointer;//^tJob; + tAggr=object + Rate:Real; + ByteCnt:LongWord; + DgrCnt:LongWord; + CurMark,PrvMark:byte; + StartT:tMTime; + Jobs: array [0..15] of tJob_ptr; + refs:byte; + ChanOfs:byte; + DgrCntCheck:LongWord; + remote:tNetAddr; + procedure Init(const src:tNetAddr); + procedure MsgDATA(sz:Word;mark:byte); + procedure MsgIMME(sz:Word;mark:byte); + procedure Recv(msg:tSMsg); + procedure Periodic; + procedure Done; + end; +IMPLEMENTATION + +procedure tAggr.Init(const src:tNetAddr); + begin + Rate:=0; + ByteCnt:=0; + DgrCnt:=0; + CurMark:=0;PrvMark:=0; + StartT:=mNow; + refs:=high(Jobs); while refs>0 do begin Jobs[refs]:=nil; dec(refs) end; + ChanOfs:=Random(255-high(Jobs)); + DgrCntCheck:=0; + Shedule(5000,@Periodic); + remote:=src; + SetMsgHandler(opcode.tcdata,src,@Recv); + SetMsgHandler(opcode.tcdataimm,src,@Recv); +end; + +procedure tAggr.Recv(msg:tSMsg); + var op:byte; + var mark:byte; + var chan:byte; + begin + op:=msg.stream.readbyte; + mark:=msg.stream.readbyte; + if op=opcode.tcdataimm then MsgIMME(msg.length,mark); + MsgDATA(msg.length,mark); + chan:=msg.stream.readbyte; + //delegate to others todo +end; + +procedure tAggr.MsgIMME(sz:Word; mark:byte); + var r:tMemoryStream; + var buf:array [1..4] of byte; + begin + r.Init(@buf,0,sizeof(buf)); + r.WriteByte(opcode.tceack); + r.WriteByte(mark); + r.WriteWord(sz,2); + SendMessage(r.base^,r.length,remote); +end; + +procedure tAggr.MsgDATA(sz:Word; mark:byte); + var r:tMemoryStream; + var rateb: DWord; {BytesPerSecond shr 6 (=64)} + var buf:array [1..6] of byte; + begin + if mark<>PrvMark then begin + if mark<>CurMark then begin + PrvMark:=CurMark; + CurMark:=mark; + StartT:=mNow; + ByteCnt:=1; + DgrCnt:=1; + end else begin Inc(ByteCnt,sz); Inc(DgrCnt) end; + inc(DgrCntCheck); + end; + if DgrCnt<8 then exit; + if (mnow-Startt)<400 then exit; + rate:=(ByteCnt/(mNow-StartT))*1000; + writeln('Rate: ',(rate/1024):7:1); + rateb:=round(rate/64); + StartT:=mNow; + ByteCnt:=1; + r.Init(@buf,0,sizeof(buf)); + r.WriteByte(opcode.tccont); + r.WriteByte(mark); + r.WriteWord(rateb,4); + SendMessage(r.base^,r.length,remote); +end; +procedure tAggr.Periodic; + begin + if DgrCntCheck>1 then begin + DgrCntCheck:=0; + Shedule(5000,@Periodic); + exit end; + writeln('Periodic check failed'); + //todo do +end; +procedure tAggr.Done; + begin + UnShedule(@Periodic); +end; END. \ No newline at end of file diff --git a/TestFS.pas b/TestFS.pas index 1fd846d..1674596 100644 --- a/TestFS.pas +++ b/TestFS.pas @@ -2,10 +2,11 @@ unit TestFS; INTERFACE IMPLEMENTATION -USES ServerLoop,Chat,SysUtils,MemStream,NetAddr,opcode; +USES ServerLoop,Chat,SysUtils,MemStream,NetAddr,opcode,Download; type t=object ch: tChat; + dw:Download.tAggr; //procedure UserInput procedure ST1(msg:tSMsg; data:boolean); procedure ST2(msg:tSMsg; data:boolean); @@ -31,6 +32,8 @@ procedure t.ST1(msg:tSMsg; data:boolean); r.skip(2); writeln('INFO size=',r.ReadWord(4),' final=',r.readbyte,' seg=',r.readword(4)); ch.Callback:=@ST2; + UnShedule(@HardTimeout); + halt(99); end else if op=upClose then writeln('CLOSE') else writeln('unknown'); end; @@ -61,9 +64,9 @@ procedure t.ST3(msg:tSMsg; data:boolean); write('TestFS: ST3 reply from FS: '); if data then writeln('unepected data') else begin writeln('ack'); - UnShedule(@HardTimeout); ch.DisposeHook:=@Rekt; ch.Close; + dw.Done; end; end; @@ -101,6 +104,7 @@ procedure init; new(o); with o^ do begin ch.Init(paramstr(oi+1)); ch.Callback:=@ST1; + dw.Init(ch.remote); Shedule(20000,@HardTimeout); ch.streaminit(s,33); s.WriteByte(opcode.upFileServer); @@ -111,8 +115,8 @@ procedure init; s.WriteWord(0,4); s.WriteWord($FFFFFFFF,4); ch.Send(s); - ServerLoop.SetMsgHandler(4,@IgnoreData); - ServerLoop.SetMsgHandler(6,@IgnoreData); + //ServerLoop.SetMsgHandler(4,@IgnoreData); + //ServerLoop.SetMsgHandler(6,@IgnoreData); end; end; end; -- 2.11.4.GIT