From 45acbdff25ea5959f92e61c05b218874c1f9af81 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Mon, 30 Nov 2015 11:58:00 +0100 Subject: [PATCH] File Sharing download/upload units impl --- Download.pas | 707 +++++++++++++++++++++++++++---------------------------- DownloadTC.pas | 158 +++++++++++++ Upload.pas | 706 +++++++++++++++++++++++++++++++++--------------------- UploadTC.pas | 100 ++++++++ UploadThread.pas | 148 ++++++++++++ brodnetd.pas | 2 + 6 files changed, 1197 insertions(+), 624 deletions(-) rewrite Download.pas (62%) create mode 100644 DownloadTC.pas rewrite Upload.pas (69%) create mode 100644 UploadTC.pas create mode 100644 UploadThread.pas diff --git a/Download.pas b/Download.pas dissimilarity index 62% index 721a824..b49d251 100644 --- a/Download.pas +++ b/Download.pas @@ -1,359 +1,348 @@ -unit Download; -{manage downloads} - -INTERFACE -uses NetAddr, - ServerLoop,opcode,MemStream - ,Store1 - ,Chat - ; -{ - Same idea here as upmgr. Have an tAggr for each peer reporting speed and - tJob for each file request saving to file and reqesting missed segments. - The aggr should have limit of paralele jobs. Jobs will first be linked in - tAggr queue and then started as slots become available. - - After node restart, notify requester with a No-Source error. But should - not be forgotten. More advanced DM could consult CHK or Category the file - was found. -} -type - pDownloadJob=^tDownloadJob; - tDownloadJob=object - total,done:LongWord; - missc:LongWord; - state:(stStop,stActive,stDone,stError); - error:byte; - error2:byte; - fid:tFID; - procedure Start; - procedure Abort; - end; -function GetJob(const fid:tFID):pDownloadJob; -function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob; - -IMPLEMENTATION -type - tJob=object(tDownloadJob) - aggr:pointer; - ix:byte; - so:tStoreObjectInfo; - ch:tChat; - rofs,rlen:LongWord;{b:l of to be requested block} - procedure Init(const source:tNetAddr; const ifid: tFID); - procedure MsgDATA(base,length:LongWord; data:pointer); - procedure Start; - procedure StartTransfer(preamble:boolean); - procedure Abort; - procedure ReplyGET(msg:tSMsg; data:boolean); - procedure ReplyClose(msg:tSMsg; data:boolean); - end; - tAggr_ptr=^tAggr; - tAggr=object - Rate:Real; - ByteCnt:LongWord; - DgrCnt:LongWord; - CurMark,PrvMark:byte; - StartT:tMTime; - Jobs: array [0..15] of ^tJob; - refs:byte; - ChanOfs:byte; - DgrCntCheck:LongWord; - remote:tNetAddr; - next:tAggr_ptr; - procedure Init(const src:tNetAddr); - procedure MsgDATA(sz:Word;mark:byte); - procedure MsgIMME(sz:Word;mark:byte); - procedure Recv(msg:tSMsg); - procedure Periodic; - procedure Done; - end; -var AggrChain:^tAggr; - -function GetAggr(const remote:tNetAddr):tAggr_ptr; - var a:^tAggr; - var p:^pointer; - begin - p:=@AggrChain; - a:=AggrChain; - while assigned(a) do begin - if a^.remote=remote then begin - GetAggr:=a; - p^:=a^.next; - a^.next:=AggrChain; - AggrChain:=a^.next; - exit; - end; - end; - GetAggr:=nil; -end; -function GetJob(const fid:tFID):pDownloadJob; - var a:^tAggr; - var i:byte; - var p:^pointer; - begin - p:=@AggrChain; - a:=AggrChain; - while assigned(a) do begin - for i:=0 to high(tAggr.Jobs) do begin - if CompareWord(a^.Jobs[i],fid,10)=0 then begin - GetJob:=a^.Jobs[i]; - assert(a^.Jobs[i]^.ix=i); - assert(a^.Jobs[i]^.aggr=a); - p^:=a^.next; - a^.next:=AggrChain; - AggrChain:=a^.next; - exit; - end; - end; - end; - GetJob:=nil; -end; - -function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob; - begin - result:=GetJob(fid); - if assigned(result) then exit; - result:=GetMem(sizeof(tJob)); - tJob(pointer(result)^).init(source,fid); -end; - -procedure tJob.Init(const source:tNetAddr; const ifid: tFID); - var dw:^tAggr; - begin - error:=0; - error2:=0; - fid:=ifid; - so.Open(fid); - done:=0; - missc:=0; - if so.final then begin - done:=total; - state:=stDone; - rlen:=0; - exit end; - state:=stStop{,stActive,stDone,stError)}; - if so.rc=0 then begin - writeln('Download: resuming'); - total:=so.Length; - so.GetMiss(rofs,rlen); - end else begin - writeln('Download: start from zero'); - total:=0; - rofs:=0; - rlen:=8192; - end; - so.EnableWrite(fid); - if so.rc<>0 then begin - state:=stError; - error:=253; - error2:=so.rc; - exit end; - dw:=GetAggr(source); - if not assigned(aggr) then begin - new(dw); - dw^.Init(source); - ix:=0; - end else begin - ix:=0; while (ix0))or((state=stDone)and(rlen=0)) ); -end; -procedure tJob.Start; - begin - assert( (state=stStop)and(rlen>0) ); - StartTransfer(true); - //Shedule(20000,@HardTimeout); - state:={stStop,}stActive{,stDone,stError}; -end; -procedure tJob.StartTransfer(preamble:boolean); - var s:tMemoryStream; - begin - assert( rlen>0 ); - ch.Callback:=@ReplyGET; - ch.streaminit(s,33); - if preamble then begin - {service}s.WriteByte(opcode.upFileServer); - {channel}s.WriteByte(ix); - {opcode }s.WriteByte(opcode.upGET); - {file }s.Write(fid,20); - end else - {opcode }s.WriteByte(opcode.upSEG); - {basehi }s.WriteWord(0,2); - {base }s.WriteWord(rofs,4); - {limit }s.WriteWord(rlen,4); - ch.Send(s); -end; -procedure tJob.ReplyGET(msg:tSMsg; data:boolean); - var r:tMemoryStream absolute msg.stream; - var op:byte; - var rsize,rseg:LongWord; - var rfinal:byte; - begin - {reply from GET request} - write('Download: ReplyGET: '); - if not data then begin - writeln('ack'); - end else begin - ch.Ack; - op:=msg.stream.ReadByte; - if op=upFAIL then begin - state:=stError; - try - error:=r.ReadByte; - error2:=r.ReadByte; - except end; - writeln('FAIL ',error,'-',error2); - end - else if op=upINFO then begin - {rsizehi}r.skip(2); - rsize :=r.ReadWord(4); - rfinal :=r.readbyte; - rseg :=r.readword(4); - writeln('INFO size=',rsize,' final=',rfinal,' seg=',rseg); - if (rsize<>so.length) then writeln('Download: length mismatch ',so.length,'->',rsize); - total:=rsize; - so.SetFLength(total); - //UnShedule(@HardTimeout); - end else if op=opcode.upDONE then begin - writeln('DONE'); - assert(so.Length>0); - so.GetMiss(rofs,rlen); - if rlen=0 then begin - state:=stDone; - writeln('Download: completed'); - end else StartTransfer(false); - end else begin - if op=upClose then writeln('CLOSE') else writeln('unknown'); - state:=stError; - error:=254; - error2:=op; - end; - end; -end; - -procedure tJob.Abort; - var s:tMemoryStream; - begin - assert(state=stActive); - ch.Callback:=@ReplyClose; - //Shedule(20000,@HardTimeout); - ch.streaminit(s,2); - {opcode }s.WriteByte(opcode.upClose); - ch.Send(s); - state:=stError; - error:=255; -end; -procedure tJob.ReplyClose(msg:tSMsg; data:boolean); - begin - writeln('Download: ReplyClose'); -end; - -procedure tJob.MsgDATA(base,length:LongWord; data:pointer); - begin - so.WriteSeg(base,length,data); - done:=done+length; -end; - -procedure tAggr.Init(const src:tNetAddr); - begin - Rate:=0; - ByteCnt:=0; - DgrCnt:=0; - CurMark:=0;PrvMark:=0; - StartT:=mNow; - refs:=high(Jobs); while refs>0 do begin Jobs[refs]:=nil; dec(refs) end; - ChanOfs:=Random(255-high(Jobs)); - DgrCntCheck:=0; - Shedule(5000,@Periodic); - remote:=src; - SetMsgHandler(opcode.tcdata,src,@Recv); - SetMsgHandler(opcode.tcdataimm,src,@Recv); -end; - -procedure tAggr.Recv(msg:tSMsg); - var op:byte; - var chan:byte; - var mark:byte; - var base:DWORD; - begin - op:=msg.stream.readbyte; - mark:=msg.stream.readbyte; - if op=opcode.tcdataimm then MsgIMME(msg.length,mark); - MsgDATA(msg.length,mark); - chan:=msg.stream.readbyte; - base:=msg.stream.ReadWord(4); - if (chan<=high(Jobs))and assigned(Jobs[chan]) then Jobs[chan]^.MsgDATA(base,msg.stream.RDBufLen,msg.stream.RDBuf); -end; - -procedure tAggr.MsgIMME(sz:Word; mark:byte); - var r:tMemoryStream; - var buf:array [1..4] of byte; - begin - r.Init(@buf,0,sizeof(buf)); - r.WriteByte(opcode.tceack); - r.WriteByte(mark); - r.WriteWord(sz,2); - SendMessage(r.base^,r.length,remote); -end; - -procedure tAggr.MsgDATA(sz:Word; mark:byte); - var r:tMemoryStream; - var rateb: DWord; {BytesPerSecond shr 6 (=64)} - var buf:array [1..6] of byte; - var delta:tMTime; - begin - if mark<>PrvMark then begin - if mark<>CurMark then begin - PrvMark:=CurMark; - CurMark:=mark; - StartT:=mNow; - ByteCnt:=1; - DgrCnt:=1; - end else begin Inc(ByteCnt,sz); Inc(DgrCnt) end; - inc(DgrCntCheck); - end; - if DgrCnt<8 then exit; - delta:=(mNow-StartT){*MSecsPerDay}; - if delta<400 then exit; - rate:=(ByteCnt/delta)*1000; - writeln('Download: rate ',(rate/1024):7:1, 'kB/s'); - rateb:=round((rate)/64); - StartT:=mNow; - ByteCnt:=1; - r.Init(@buf,0,sizeof(buf)); - r.WriteByte(opcode.tccont); - r.WriteByte(mark); - r.WriteWord(rateb,4); - SendMessage(r.base^,r.length,remote); -end; - -procedure tAggr.Periodic; - begin - if DgrCntCheck>1 then begin - DgrCntCheck:=0; - Shedule(5000,@Periodic); - exit end; - writeln('Download: Periodic check failed, unimplemented!'); - //todo do -end; - -procedure tAggr.Done; - begin - UnShedule(@Periodic); -end; - -procedure tDownloadJob.Start; - begin - tJob(pointer(@self)^).Start; -end; -procedure tDownloadJob.Abort; - begin - tJob(pointer(@self)^).Abort; -end; - -END. \ No newline at end of file +unit Download; +{manage downloads} + +INTERFACE +uses NetAddr, + ServerLoop,opcode,MemStream + ,Store1 + ,Chat + ,Sha1 + ; +{ + Same idea here as upmgr. Have an tAggr for each peer reporting speed and + tJob for each file request saving to file and reqesting missed segments. + The aggr should have limit of paralele jobs. Jobs will first be linked in + tAggr queue and then started as slots become available. + + After node restart, notify requester with a No-Source error. But should + not be forgotten. More advanced DM could consult CHK or Category the file + was found. +} +type + pDownloadJob=^tDownloadJob; + tDownloadJob=object + total,done:LongWord; + missc:LongWord; + state:(stStop,stActive,stDone,stError,stLocalError); + error:byte; + error2:byte; + fid:tFID; + procedure Start; + procedure Free; + procedure Abort; + protected + refc:byte; + end; +function GetJob(const fid:tFID):pDownloadJob; +function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob; + +IMPLEMENTATION +{TODO: cache for chats} +type + tAggr_ptr=^tAggr; + tJob=object(tDownloadJob) + aggr:tAggr_ptr; + ix:byte; + so:tStoreObjectInfo; + ch:^tChat; + active:boolean; + procedure Init(const source:tNetAddr; const ifid: tFID); + procedure MsgDATA(base,length:LongWord; data:pointer); + procedure Start; + procedure Free; + procedure Abort; + procedure Close; + private + HighestRequestBase, RemoteSkipTo :LongWord; + procedure ReplyOPEN(msg:tSMsg; data:boolean); + procedure ReplyLSEG(msg:tSMsg; data:boolean); + procedure HandleFAIL(r:tMemoryStream); + procedure HandleEPROTO(r:tMemoryStream); + procedure ReplyDONE(msg:tSMsg; data:boolean); + procedure MakeRequest; + end; +{$I DownloadTC.pas} + +function GetJob(const fid:tFID):pDownloadJob; + var a:^tAggr; + var i:byte; + var p:^pointer; + begin + p:=@AggrChain; + a:=AggrChain; + while assigned(a) do begin + for i:=0 to high(tAggr.Jobs) do if assigned(a^.Jobs[i]) then begin + if CompareWord(a^.Jobs[i],fid,10)=0 then begin + GetJob:=a^.Jobs[i]; + Inc(a^.refs); + assert(a^.Jobs[i]^.ix=i); + assert(a^.Jobs[i]^.aggr=a); + exit; + end else break{for}; + end; + p:=@a^.next; a:=p^; + end; + GetJob:=nil; +end; + +function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob; + begin + result:=GetJob(fid); + if assigned(result) then exit; + result:=GetMem(sizeof(tJob)); + tJob(pointer(result)^).init(source,fid); +end; + +procedure tJob.Init(const source:tNetAddr; const ifid: tFID); + var dw:^tAggr; + begin + refc:=1; + error:=0; + error2:=0; + fid:=ifid; + active:=false; + so.Open(fid); + done:=0; + missc:=0; + if so.final then begin + done:=total; + state:=stDone; + aggr:=nil; + so.Close; + exit end; + state:=stStop; + {todo: initialize Done} + so.EnableWrite(fid); + if so.rc<>0 then begin + state:=stLocalError; + error:=255; + error2:=so.rc; + exit end; + dw:=GetAggr(source); + if not assigned(aggr) then begin + new(dw); + dw^.Init(source); + ix:=0; + end else begin + ix:=0; while (ix0))or((state=stDone)and(rlen=0)) ); +end; + +procedure tJob.Start; + var s:tMemoryStream; + begin + writeln('Download: job start'); + assert( state=stStop ); + state:=stActive; + ch^.Callback:=@ReplyOPEN; + ch^.streaminit(s,33); + {service}s.WriteByte(opcode.upFileServer); + {channel}s.WriteByte(ix); + {opcode }s.WriteByte(opcode.upOPEN); + {file }s.Write(fid,20); + {todo: request first segment} + ch^.Send(s); +end; +procedure tJob.ReplyOPEN(msg:tSMsg; data:boolean); + var r:tMemoryStream absolute msg.stream; + var op:byte; + var rsize:LongWord; + var rfinal:byte; + begin + if not data then exit; + op:=msg.stream.ReadByte; + {valid responses: INFO FAIL EPROTO} + if op=upFAIL then HandleFAIL(r) + else if op=upINFO then begin + rsize :=r.ReadWord(4); + rfinal :=r.readbyte; + writeln('INFO size=',rsize,' final=',rfinal); + self.total:=rsize; + if so.length<>rsize then begin + if so.length>0 then writeln('Download: warning: size mismatch!'); + so.SetFLength(rsize); + end; + MakeRequest; + end + else HandleEPROTO(r); +end; +{Strategy for fixing holes without waiting for DONE message: + * stuff all small segments to LSEG + * put large one at end + * when datagrams from the large arrive, Repeat +} + +procedure tJob.MakeRequest; + var s:tMemoryStream; + var b,l,trl:LongWord; + var cnt:byte; + var mst:Pointer; + const clim=83886080; + begin + write('Download: job MakeRequest'); + mst:=nil; + trl:=0; + cnt:=0; + ch^.Callback:=@ReplyLSEG; + ch^.streaminit(s,180); + s.WriteByte(upLSEG); + repeat + {todo: skipto} + so.GetMiss(b,l,mst); + if l=0 then break; + if (trl+l)>clim then l:=clim-trl; + write(' ',b,'+',l); + s.WriteWord(b,4); + s.WriteWord(l,4); + inc(trl,l); + inc(cnt); + until (s.WrBufLen<8)or(trl>=clim); + writeln(' for ',trl,'B in ',cnt,' ',s.Length); + if trl=0 then begin + state:=stDone; + writeln('Verifu!!!!!'); + so.VerifyAndReset; + if not so.final then begin + state:=stLocalError; + error:=254; + writeln('Verifu Faialed!!!!!'); + end; + Aggr^.Stop(ix); + Close; + exit end; + ch^.Send(s); + if cnt>1 then HighestRequestBase:=b else HighestRequestBase:=$FFFFFFFF; +end; +procedure tJob.ReplyLSEG(msg:tSMsg; data:boolean); + var r:tMemoryStream absolute msg.stream; + var op:byte; + var avail:LongWord; + begin + if not data then exit; + op:=msg.stream.ReadByte; + {valid responses: SEGOK UNAVL FAIL} + if op=upFAIL then HandleFAIL(r) + else if op=upUNAVL then begin + avail:=msg.stream.ReadWord(4); + writeln('Download: job ReplyLSEG: UNAVL avail=',avail); + if avail=0 then begin + state:=stLocalError; + error:=253; + Close; + end else begin + RemoteSkipTo:=avail; + MakeRequest; + end; + end + else if op=upSEGOK then begin + avail:=msg.stream.ReadWord(4); + MissC:=avail; + writeln('Download: job ReplyLSEG: SEGOK avail=',avail); + aggr^.Start(ix); + ch^.Callback:=@ReplyDONE; + ch^.Ack; + end + else if op=upDONE then begin + end {ignore, done is sent async so it can hang in-flight a bit} + else HandleEPROTO(r); +end; +procedure tJob.ReplyDONE(msg:tSMsg; data:boolean); + var r:tMemoryStream absolute msg.stream; + var op:byte; + begin + if not data then exit; + op:=msg.stream.ReadByte; + {valid responses: DONE} + if op=upDONE then begin + writeln('Download: ReplyDONE: DONE, miss=',MissC); + MakeRequest; + end else HandleEPROTO(r); +end; +procedure tJob.HandleFAIL(r:tMemoryStream); + begin + writeln('Download: FAIL'); + state:=stError; + try + error:=r.readByte; + error2:=r.readByte; + except end; + Close; +end; +procedure tJob.HandleEPROTO(r:tMemoryStream); + begin + r.Seek(r.position-1); + try error2:=r.ReadByte; except end; + writeln('Download: EPROTO ',error2); + state:=stLocalError; + error:=252; + Close; +end; + +procedure tJob.Close; + var s:tMemoryStream; + begin + ch^.streaminit(s,2); + {opcode }s.WriteByte(opcode.upClose); + ch^.Send(s); + ch^.Close; + writeln('chat close'); + aggr^.Stop(ix); +end; +procedure tJob.Abort; + begin + assert(state=stActive); + state:=stLocalError; + error:=251; + Close; +end; + +procedure tJob.MsgDATA(base,length:LongWord; data:pointer); + begin + so.WriteSeg(base,length,data); + done:=done+length; + if MissC<=length + then MakeRequest + else dec(MissC,length); + if base>=HighestRequestBase then {TODO, last segment in list, MakeRequest}; +end; + +procedure tJob.Free; + begin + Dec(refc); + if refc=0 then begin + writeln('Download: job closing'); + if state=stStop then Close + else if state=stActive then Abort; + {tu ja EAV, nieco s pointermi} + if assigned(aggr) then begin + aggr^.Jobs[ix]:=nil; + dec(aggr^.refs); + if aggr^.refs=0 then aggr^.Done; + end; + if state<>stDone then so.Close; + FreeMem(@self,sizeof(tJob)); + end else writeln('not closing ',refc); +end; + +procedure tDownloadJob.Start; + begin + tJob(pointer(@self)^).Start; +end; +procedure tDownloadJob.Abort; + begin + tJob(pointer(@self)^).Abort; +end; +procedure tDownloadJob.Free; + begin + tJob(pointer(@self)^).Free; +end; +END. \ No newline at end of file diff --git a/DownloadTC.pas b/DownloadTC.pas new file mode 100644 index 0000000..01ac4c7 --- /dev/null +++ b/DownloadTC.pas @@ -0,0 +1,158 @@ +{Include file} + tAggr=object + Rate:Real; + ByteCnt:LongWord; + DgrCnt:LongWord; + CurMark,PrvMark:byte; + StartT:tMTime; + Jobs: array [0..15] of ^tJob; + refs,acnt:byte; + ChanOfs:byte; + DgrCntCheck:LongWord; + remote:tNetAddr; + next:tAggr_ptr; + procedure Init(const src:tNetAddr); + procedure MsgDATA(sz:Word;mark:byte); + procedure MsgIMME(sz:Word;mark:byte); + procedure Recv(msg:tSMsg); + procedure Periodic; + procedure Done; + procedure Start(ix:byte); + procedure Stop(ix:byte); + end; +var AggrChain:^tAggr; + +function GetAggr(const remote:tNetAddr):tAggr_ptr; + var a:^tAggr; + var p:^pointer; + begin + p:=@AggrChain; + a:=AggrChain; + while assigned(a) do begin + if a^.remote=remote then begin + GetAggr:=a; + p^:=a^.next; + a^.next:=AggrChain; + AggrChain:=a^.next; + exit; + end; + end; + GetAggr:=nil; +end; +procedure tAggr.Init(const src:tNetAddr); + begin + acnt:=0; + Rate:=0; + ByteCnt:=0; + DgrCnt:=0; + CurMark:=0;PrvMark:=0; + StartT:=mNow; + refs:=high(Jobs); while refs>0 do begin Jobs[refs]:=nil; dec(refs) end; + ChanOfs:=Random(255-high(Jobs)); + DgrCntCheck:=0; + remote:=src; + SetMsgHandler(opcode.tcdata,remote,@Recv); + SetMsgHandler(opcode.tcdataimm,remote,@Recv); +end; + +procedure tAggr.Recv(msg:tSMsg); + var op:byte; + var chan:byte; + var mark:byte; + var base:DWORD; + begin + op:=msg.stream.readbyte; + mark:=msg.stream.readbyte; + if op=opcode.tcdataimm then MsgIMME(msg.length,mark); + MsgDATA(msg.length,mark); + chan:=msg.stream.readbyte; + base:=msg.stream.ReadWord(4); + if (chan<=high(Jobs))and assigned(Jobs[chan]) then Jobs[chan]^.MsgDATA(base,msg.stream.RDBufLen,msg.stream.RDBuf); +end; + +procedure tAggr.MsgIMME(sz:Word; mark:byte); + var r:tMemoryStream; + var buf:array [1..4] of byte; + begin + r.Init(@buf,0,sizeof(buf)); + r.WriteByte(opcode.tceack); + r.WriteByte(mark); + r.WriteWord(sz,2); + SendMessage(r.base^,r.length,remote); +end; + +procedure tAggr.MsgDATA(sz:Word; mark:byte); + var r:tMemoryStream; + var rateb: DWord; {BytesPerSecond shr 6 (=64)} + var buf:array [1..6] of byte; + var delta:tMTime; + begin + if mark<>PrvMark then begin + if mark<>CurMark then begin + PrvMark:=CurMark; + CurMark:=mark; + StartT:=mNow; + ByteCnt:=1; + DgrCnt:=1; + end else begin Inc(ByteCnt,sz); Inc(DgrCnt); end; + inc(DgrCntCheck); + end; + //writeln('Download: got ',DgrCnt,'dg,',ByteCnt,'B in ',delta,'ms'); + if DgrCnt<8 then exit; + delta:=(mNow-StartT){*MSecsPerDay}; + if delta<400 then exit; + rate:=(ByteCnt/delta)*1000; + //writeln('Download: rate ',(rate/1024):7:1, 'kB/s'); + rateb:=round((rate)/64); + StartT:=mNow; + ByteCnt:=1; + DgrCnt:=0; + r.Init(@buf,0,sizeof(buf)); + r.WriteByte(opcode.tccont); + r.WriteByte(mark); + r.WriteWord(rateb,4); + SendMessage(r.base^,r.length,remote); +end; + +procedure tAggr.Periodic; + begin + if DgrCntCheck>1 then begin + DgrCntCheck:=0; + Shedule(5000,@Periodic); + exit end; + writeln('Download: Periodic check failed, unimplemented!'); + //todo do +end; + +procedure tAggr.Done; + var a:^tAggr; + var p:^pointer; + begin + p:=@AggrChain; + a:=AggrChain; + while assigned(a) do begin + if a=@self then begin + p^:=next; + break end; + end; + UnShedule(@Periodic); + SetMsgHandler(opcode.tcdata,remote,nil); + SetMsgHandler(opcode.tcdataimm,remote,nil); + FreeMem(@self,sizeof(self)); +end; + +procedure tAggr.Start(ix:byte); + begin + if Jobs[ix]^.active then exit; + if acnt=0 then Shedule(5000,@Periodic); + inc(acnt); + Jobs[ix]^.active:=true; +end; + +procedure tAggr.Stop(ix:byte); + begin + if not Jobs[ix]^.active then exit; + dec(acnt); + Jobs[ix]^.active:=false; + if acnt=0 then UnShedule(@Periodic); +end; diff --git a/Upload.pas b/Upload.pas dissimilarity index 69% index f6e008b..19ee6ef 100644 --- a/Upload.pas +++ b/Upload.pas @@ -1,265 +1,441 @@ -UNIT Upload; -{Upload Manager for brodnetd} - -INTERFACE -USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1; - -IMPLEMENTATION - -type -tAggr_ptr=^tAggr; -tPrv_ptr=^tPrv; -tPrv=object - aggr:tAggr_ptr; - ch: ^tChat; - chan: byte; - next,prev: tPrv_ptr; - weight,wcur:Word; - isOpen,Active:boolean; - seglen:LongWord; - oinfo:tStoreObjectInfo; - {procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); - procedure OnMsg(msg:tSMsg; data:boolean); - procedure Cont; - procedure DoGET(const fid:tfid; base,limit:LongWord); - procedure DoSEG(base,limit:LongWord); - procedure DoClose; - procedure Start; - procedure Stop; - procedure ChatTimeout(willwait:LongWord); - procedure Close;} -end; -tAggrThr=object - thrid:tThreadID; - crit:tRtlCriticalSection; - stop:ByteBool; - remote:tSockAddrL; - prv1:^tPrv; - size1,size2:Word; - mark1,mark2:Byte; - rate:Single; - burst:byte; - MarkData:LongWord; - MarkStart:tMTime; -end; -tAggr=object - thr:tAggrThr; - remote:tNetAddr; - refc:byte; - - acks:Word; {ack counter, for timeouts} - timeout:word; - rateIF,sizeIF, - limRate,limSize:Single; - - next,prev: tAggr_ptr; - procedure Start(sprv:tPrv_ptr); - procedure Stop(sprv:tPrv_ptr); - procedure Free; - procedure Init(const source:tNetAddr); - procedure CalcRates(rxRate:Single); - procedure Periodic; - procedure OnCont(msg:tSMsg); - procedure OnAck(msg:tSMsg); -end; - -var Peers:^tAggr; - -procedure tAggr.Init(const source:tNetAddr); - begin - writeln('upmgr: init'); - next:=Peers; - prev:=nil; - if assigned(Peers) then Peers^.prev:=@self; - Peers:=@self; - refc:=0; - acks:=0; - timeout:=0; - rateIF:=1; - sizeIF:=1; - limRate:=20*1024*1024; - limSize:=4096; - InitCriticalSection(thr.crit); - source.ToSocket(thr.remote); - remote:=source; - thr.prv1:=nil; -end; - -function ThrStart(p:pointer):LongInt; - begin with tAggrThr(p^) do begin - { ( read; ) send; wait; repeat} -end end; - -procedure tAggr.Start(sprv:tPrv_ptr); - begin - if assigned(thr.prv1) then begin - sprv^.next:=thr.prv1^.next; - sprv^.prev:=thr.prv1; - sprv^.prev^.next:=sprv; - sprv^.next^.prev:=sprv; - end else begin - sprv^.next:=sprv; - sprv^.prev:=sprv; - thr.prv1:=sprv; - thr.stop:=false; - thr.MarkData:=0; - thr.MarkStart:=mNow; - CalcRates(20480); - thr.thrid:=BeginThread(@ThrStart,@thr{,var,stack}); - end; - sprv^.wcur:=sprv^.weight; -end; - -procedure tAggr.CalcRates(rxRate:Single); - var txRate:Single; - var RateFill:Single; - var pMark:byte; - const limRateIF=3; - begin - EnterCriticalSection(thr.crit); - pMark:=thr.mark1; - if thr.MarkStart=0 - then begin txRate:=rxRate; thr.Rate:=rxRate end - else txRate:=thr.MarkData/((mNow-thr.MarkStart)/1000{*SecsPerDay}); - RateFill:=rxRate/txRate; - write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), '); - if RateFill<0.85 then begin - writeln('limit'); - if RateFill<0.5 then thr.size1:=128; - thr.Rate:=rxRate; - RateIF:=RateIF/2; - repeat thr.mark1:=Random(256) until (thr.mark1<>pMark); - thr.MarkData:=0; {include on-the-wire data if increasing} - end else - if (txRate/thr.Rate)>0.7 then begin - writeln('pass'); - thr.Rate:=1+txRate*(RateIF+1); - if thr.Rate>limRate then thr.Rate:=limRate - else RateIF:=RateIF*2; - if RateIF>limRateIF then RateIF:=LimRateIF; - end; - if (thr.Rate/thr.size1)<4 then thr.size1:=thr.Rate/5; - if thr.size1<120 then thr.size1:=128; - sizeIF:=sizeIF/2; - thr.size2:=round(thr.size1*(1+SizeIF)); - thr.mark2:=Random(256); - LeaveCriticalSection(thr.crit); -end; - -procedure tAggr.Periodic; - begin - Shedule(2000,@Periodic); - if acks=0 then begin - inc(Timeout); - CalcRates(512); - end; - acks:=0; - Shedule(2000,@Periodic); -end; - -procedure tAggr.OnCont(msg:tSMsg); - var op,rmark:byte; - var rRate:LongWord; - begin - op:=msg.stream.readbyte; - assert(op=opcode.tccont); - rmark:=msg.stream.readbyte; - if rmark=thr.mark1 then begin - inc(acks); - timeout:=0; - rrate:=msg.stream.readword(4); - CalcRates(rRate*64); - end; -end; - -procedure tAggr.OnAck(msg:tSMsg); - var op,rmark:byte; - var rSize:LongWord; - const limSizeIF=1; - begin - op:=msg.stream.readbyte; - assert(op=opcode.tceack); - rmark:=msg.stream.readbyte; - rsize:=msg.stream.readword(2); - if (rmark<>thr.mark2)and(rmark<>thr.mark1) then exit; - inc(acks); - Timeout:=0; - if rsizelimSizeIF then SizeIF:=limSizeIF; - EnterCriticalSection(thr.crit); - thr.size1:=rSize; - thr.size2:=round(thr.size1*(1+SizeIF)); - thr.mark2:=Random(256); - LeaveCriticalSection(thr.crit); -end; - -procedure tAggr.Stop(sprv:tPrv_ptr); - begin -end; - -procedure tAggr.Free; - begin - Assert(refc>0); - Dec(refc); - if refc=0 then begin - writeln('upmgr: aggr close'); - DoneCriticalSection(thr.crit); - if assigned(prev) then prev^.next:=next else Peers:=next; - if assigned(next) then next^.prev:=prev; - FreeMem(@self,sizeof(self)); - end else - writeln('upmgr: aggr unrefd'); -end; - -function FindAggr({const} addr:tNetAddr): tAggr_ptr; - begin - result:=Peers; - while assigned(result) do begin - if assigned(result^.next) then assert(result^.next^.prev=result); - if result^.remote=addr then exit; - result:=result^.next; - end; -end; - -procedure ChatHandler(var nchat:tChat; msg:tSMsg); - var ag:^tAggr; - var pr:^tPrv; - var s:tMemoryStream; - const cMax=16; - begin - writeln('upmgr: ChatHandler'); - msg.stream.skip({the initcode}1); - if msg.stream.RdBufLen<2 then begin - writeln('upmgr: malformed init'); - nchat.StreamInit(s,16); - s.WriteByte(upFAIL); - s.writebyte(upErrMalformed); - nchat.Send(s); - nchat.Close; - writeln('upmgr: malformed request stage=0'); - exit end; - {first get the ag} - ag:=FindAggr(msg.source^); - if assigned(ag) then begin - {check} - if ag^.refc>=cMax then begin - nchat.StreamInit(s,16); - s.WriteByte(upFAIL); - s.WriteByte(upErrHiChan); - s.WriteByte(cMax); - s.WriteByte(ag^.refc); - nchat.Send(s); - nchat.Close; - exit end; - end else begin - New(ag); - ag^.init(msg.source^); - end; - New(pr); - //pr^.Init(ag,nchat,msg); -end; - -BEGIN - SetChatHandler(opcode.upFileServer,@ChatHandler); -END. \ No newline at end of file +UNIT Upload; +{Upload Manager for brodnetd} + +INTERFACE +USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1; + +IMPLEMENTATION +uses UploadThread; + +type +tAggr_ptr=^tAggr; +tPrv_ptr=^tPrv; +tPrv=object + chan:byte; + aggr:tAggr_ptr; + uc:UploadThread.tChannel; + ch:^tChat; + isOpen,Active:boolean; + procedure Init(var nchat:tChat); + procedure OnMSG(msg:tSMsg;data:boolean); + procedure NotifyDOne; + procedure DoOPEN(const fid:tfid); + procedure DoLSEG(count:byte; base:array of LongWord; limit:array of LongWord); + procedure DoWEIGHT(nweight:word); + procedure Stop; + procedure Start; + procedure Close; + procedure Close(tell:boolean); overload; inline; + procedure ChatTimeout(willwait:LongWord); +end; +tAggr=object + thr:tUploadThr; + remote:tNetAddr; + refc:byte; + chan:array[0..11] of ^tPrv; + acks:Word; {ack counter, for timeouts} + timeout:word; + rateIF,sizeIF, + limRate,limSize:Single; + next,prev: tAggr_ptr; + procedure Free(ac:byte);{called by closing prv} + procedure Done; + procedure Start(ac:byte); + procedure Stop(ac:byte); + procedure Init(const source:tNetAddr); + procedure CalcRates(rxRate:Single); + procedure Periodic; + procedure OnCont(msg:tSMsg); + procedure OnAck(msg:tSMsg); + procedure ResetMark; +end; + +var Peers:^tAggr; +procedure SendError(var ch:tChat;e1,e2:byte); forward; +function FindAggr({const} addr:tNetAddr): tAggr_ptr; forward; + +{ PROTOCOL } +{ CLIENT SERVER(us)}{ +init upFileServer -> ACK +upOPEN -> upINFO +upOPEN -> upINFO + -> upFAIL
+upLSEG [b,l] -> upSEGOK + -> upUNAVL + -> upFAIL upErrIO +upSTOP -> ACK +upCLOSE -> ACK +upWEIGHT -> ACK + +special server messages: +upEPROTO
(protocol violation) +upCLOSE (close by server, usualy timeout) +error conditions: +upErrHiChan (channel number too high or too many connections) +upErrChanInUse +upErrNotFound (file was not found) +upErrIO (other error while opening/reading/seeking) +upEPROTO upErrNotOpen (LSEG without OPEN or afer STOP) +upEPROTO upErrTroll (trolling) +notes: +OPEN message can be merged with init, saving a round-trip +} + +procedure tPrv.DoOPEN(const fid:tfid); + var err:tmemorystream; + begin + writeln('Upload: ',string(ch^.remote),'/',chan,' OPEN'); + Stop; + if isOpen then uc.oi.Close; + isOpen:=false; + ch^.Ack; + uc.oi.Open(fid); + {if not oinfo.final then begin + oinfo.rc:=200; + Close(oinfo.hnd); + end;} + if uc.oi.rc>0 then begin + ch^.StreamInit(err,3); + err.WriteByte(upFAIL); + if uc.oi.rc=1 then err.WriteByte(upErrNotFound) + else begin err.WriteByte(upErrIO); err.WriteByte(uc.oi.rc) end; + ch^.Send(err); + end else begin + isopen:=true; + ch^.StreamInit(err,10); + err.WriteByte(upINFO); + err.WriteWord(uc.oi.length,4); + if uc.oi.final then err.WriteByte(1) else err.WriteByte(0); + err.WriteWord(0,4); + ch^.Send(err); + end; +end; + +procedure tPrv.DoLSEG(count:byte; base,limit: array of LongWord); + var err:tmemorystream; + var i:byte; + var l,fb:LongWord; + var tbytes:LongWOrd; + begin + writeln('Upload: ',string(ch^.remote),'/',chan,' LSEG'); + if not isOpen then begin + ch^.StreamInit(err,3); + err.WriteByte(upEPROTO); + err.WriteByte(upErrNotOpen); + ch^.send(err); + writeln('notOpen'); + exit end; + if count=0 then begin + ch^.StreamInit(err,3); + err.WriteByte(upEPROTO); + err.WriteByte(100); + ch^.send(err); + writeln('ZeroCount'); + exit end; + stop; + uc.seg:=0; + tbytes:=0; + for i:=1 to count do begin + if limit[i-1]=0 then begin + ch^.StreamInit(err,3); + err.WriteByte(upEPROTO); + err.WriteByte(101); + ch^.send(err); + writeln('ZeroLimit'); + exit end; + l:=uc.oi.SegmentLength(base[i-1]); + if l>0 then begin + inc(uc.seg); + uc.s[uc.seg].base:=base[i-1]; + if l>limit[i-1] then l:=limit[i-1]; + uc.s[uc.seg].len:=l; + inc(tbytes,l); + end else if i=1 then begin + {first failed, try find some seg} + uc.oi.GetSegAfter(base[0],fb,l); + ch^.StreamInit(err,5); + if l=0 then fb:=0; + err.WriteByte(upUNAVL); + err.WriteWord(fb,4); + ch^.Send(err); + exit; + end; + end; + ch^.StreamInit(err,6); + err.WriteByte(upSEGOK); + err.WriteWord(tbytes,4); + err.WriteByte(uc.seg); + ch^.Send(err); + Start; +end; + +procedure tPrv.DoWEIGHT(nweight:word); + begin + if nweight<50 then nweight:=50; + uc.Weight:=nweight; + ch^.Ack; +end; + +procedure ChatHandler(var nchat:tChat; msg:tSMsg); + var ag:^tAggr; + var pr:^tPrv; + var chan:byte; + begin + writeln('Upload: ChatHandler'); + msg.stream.skip({the initcode}1); + if msg.stream.RdBufLen<2 then begin SendError(nchat,upErrMalformed,0); exit end; + chan:=msg.stream.ReadByte; + writeln(chan); + if chan>high(tAggr.chan) then begin Senderror(nchat,upErrHiChan,chan); exit end; + ag:=FindAggr(msg.source^); + if not assigned(ag) then begin + New(ag); + ag^.init(msg.source^); + end else if assigned(ag^.chan[chan]) then begin SendError(nchat,upErrChanInUse,0); exit end; + New(pr); + pr^.aggr:=ag; + pr^.chan:=chan; + ag^.chan[chan]:=pr; + inc(ag^.refc); + pr^.Init(nchat); + if msg.stream.RdBufLen>0 {the request may be empty} + then pr^.OnMSG(msg,true); +end; +procedure tPrv.OnMSG(msg:tSMsg;data:boolean); + var op:byte; + var hash:tfid; + var base:LongWord; + var limit:LongWord; + var err:tmemorystream; + var count:byte; + var lbas:array [0..23] of LongWOrd; + var llim:array [0..23] of LongWOrd; + label malformed; + begin + if not data then exit; //todo + if msg.stream.RdBufLen<1 then goto malformed; + op:=msg.stream.ReadByte; + writeln('Upload: ',string(ch^.remote),' opcode=',op,' sz=',msg.stream.RdBufLen); + case op of + upClose: begin + ch^.Ack; + Close(false); + end; + upOPEN: begin + if msg.stream.RdBufLen<20 then goto malformed; + msg.stream.Read(hash,20); + DoOPEN(hash); + end; + upLSEG: begin + count:=0; + while (msg.stream.RdBufLen>0)and(count<=high(lbas)) do begin + if msg.stream.RdBufLen<8 then goto malformed; + lbas[count]:=msg.stream.ReadWord(4); + llim[count]:=msg.stream.ReadWord(4); + inc(count); + end; + DoLSEG(count,lbas,llim); + end; + upWEIGHT: begin + if msg.stream.RdBufLen<>2 then goto malformed; + base:=msg.stream.ReadWord(2); + DoWEIGHT(base); + end; + else goto malformed; + end; + exit; malformed: + ch^.StreamInit(err,3); + err.WriteByte(upEPROTO); + err.WriteByte(upErrMalformed); + ch^.Send(err); + writeln('Upload: malformed request stage=1'); +end; + +procedure tPrv.Init(var nchat:tChat); + begin + ch:=@nchat; + ch^.Callback:=@OnMsg; + ch^.TMHook:=@ChatTimeout; + uc.weight:=100; + isOpen:=false; Active:=false; + Shedule(5000,@Close); + writeln('Upload: prv for ',string(ch^.remote),'/',chan,' init'); +end; + +procedure tPrv.NotifyDone; + var err:tmemorystream; + begin + Stop; + ch^.StreamInit(err,2); + err.WriteByte(upDONE); + ch^.Send(err); +end; + +procedure tPrv.Stop; + begin + if active then begin + active:=False; + Shedule(20000,@Close); + aggr^.Stop(chan); + end; + writeln('Upload: prv for ',string(ch^.remote),'/',chan,' stop'); +end; +procedure tPrv.Start; + begin + assert(isOpen); + if not active then UnShedule(@Close); + active:=true; + aggr^.Start(chan); + writeln('Upload: prv for ',string(ch^.remote),'/',chan,' start'); +end; + +procedure tPrv.Close(tell:boolean); + var err:tMemoryStream; + begin + assert(assigned(ch)); + writeln('Upload: prv for ',string(ch^.remote),'/',chan,' close'); + if tell then begin + ch^.StreamInit(err,1); + err.WriteByte(upClose); + ch^.Send(err); + end; + Stop; + if isOpen then uc.oi.Close; + isOpen:=false; + ch^.Close; + ch:=nil; + UnShedule(@Close); + Aggr^.Free(chan); + FreeMem(@self,sizeof(self)); +end; +procedure tPrv.Close; + begin + Close(true); +end; + +procedure tPrv.ChatTimeout(willwait:LongWord); + begin + if WillWait<8000 then exit; + writeln('Upload: prv for ',string(ch^.remote),'/',chan,' ChatTimeout'); + Close; +end; + +{***AGGREGATOR***} + +procedure tAggr.Init(const source:tNetAddr); + begin + next:=Peers; + prev:=nil; + if assigned(Peers) then Peers^.prev:=@self; + Peers:=@self; + refc:=0; + acks:=0; + timeout:=0; + rateIF:=1; + sizeIF:=1; + limRate:=2000*1024*1024; + limSize:=4096; + remote:=source; + writeln('Upload: aggr for ',string(remote),' init'); + thr.Init(source); + CalcRates(2048); + SetMsgHandler(opcode.tccont,remote,@OnCont); + SetMsgHandler(opcode.tceack,remote,@OnAck); +end; + +function FindAggr({const} addr:tNetAddr): tAggr_ptr; + begin + result:=Peers; + while assigned(result) do begin + if assigned(result^.next) then assert(result^.next^.prev=result); + if result^.remote=addr then exit; + result:=result^.next; + end; +end; + +procedure SendError(var ch:tChat;e1,e2:byte); + var s:tMemoryStream; + begin + ch.StreamInit(s,3); + s.WriteByte(upFAIL); + s.WriteByte(e1); + s.WriteByte(e2); + ch.Send(s); + ch.Close; +end; + +procedure tAggr.Free(ac:byte); + begin + assert(assigned(chan[ac])); + chan[ac]:=nil; + dec(refc); + if refc=0 then Done; +end; +procedure tAggr.Done; + begin + write('Upload: aggr for ',string(remote),' done'); + thr.Done; writeln(' thrdone'); + UnShedule(@Periodic); + if assigned(prev) then prev^.next:=next else Peers:=next; + if assigned(next) then next^.prev:=prev; + SetMsgHandler(opcode.tccont,remote,nil); + SetMsgHandler(opcode.tceack,remote,nil); + FreeMem(@Self,sizeof(self)); +end; + +procedure tAggr.Start(ac:byte); + begin + writeln('Upload: aggr for ',string(remote),' start chan ',ac); + assert(assigned(chan[ac])); + EnterCriticalSection(thr.crit); + assert(not assigned(thr.chans[ac])); + thr.chans[ac]:=@chan[ac]^.uc; + chan[ac]^.uc.wcur:=chan[ac]^.uc.weight; + UnShedule(@Periodic); + Shedule(700,@Periodic); + if thr.stop or thr.wait then ResetMark else {do not reset if running}; + thr.Start; {wake up, or start if not running} + LeaveCriticalSection(thr.crit); +end; + +procedure tAggr.Stop(ac:byte); + begin + writeln('Upload: aggr for ',string(remote),' stop chan ',ac); + assert(assigned(chan[ac])); + EnterCriticalSection(thr.crit); + assert(assigned(thr.chans[ac])); + thr.chans[ac]:=nil; + LeaveCriticalSection(thr.crit); +end; + +procedure tAggr.Periodic; + var i:byte; + var e:boolean; + begin + if (thr.stop)or(thr.wait) then begin + for i:=0 to high(chan) do if assigned(chan[i]) then with chan[i]^ do begin + if not active then continue; + EnterCriticalSection(thr.crit); + e:=uc.Seg=0; + LeaveCriticalSection(thr.crit); + if e then NotifyDone; + end; + exit end; + if acks=0 then begin + inc(Timeout); + if timeout>=10 then begin + refc:=255; + for i:=0 to high(chan) do if assigned(chan[i]) then chan[i]^.Close; + Done;exit;end; + if timeout=4 then CalcRates(512); + end else timeout:=0; + acks:=0; + Shedule(700,@Periodic); +end; + +{$I UploadTC.pas} + +BEGIN + Peers:=nil; + SetChatHandler(opcode.upFileServer,@ChatHandler); +END. \ No newline at end of file diff --git a/UploadTC.pas b/UploadTC.pas new file mode 100644 index 0000000..b2cd915 --- /dev/null +++ b/UploadTC.pas @@ -0,0 +1,100 @@ +{include file} +procedure tAggr.ResetMark; + var pMark:byte; + begin + pMark:=thr.mark; + thr.MarkData:=0; thr.MarkTime:=0; {prevent timing screwup in thread} + repeat thr.mark:=Random(255)+1 until (thr.mark<>pMark); +end; + +procedure tAggr.CalcRates(rxRate:Single); + var txRate:Single; + var RateFill:Single; + const limRateIF=1; + begin + EnterCriticalSection(thr.crit); + if thr.MarkTime=0 then thr.MarkTime:=1; + if thr.MarkData=0 + then begin + writeln('RESET INITIAL'); + txRate:=rxRate; thr.Rate:=4096; + ResetMark; + end else txRate:=thr.MarkData/(thr.MarkTime/1000); + if rxRate=0 then rxRate:=1; + if txRate=0 then txRate:=1; + RateFill:=rxRate/txRate; + write('speed: ',(rxRate/1024):8:2,'kB/s (',(RateFill*100):3:0,'% of ',txRate/1024:8:2,'), '); + if RateFill<0.90 then begin + write('limit'); + if RateFill<0.5 then thr.size1:=round(thr.size1*0.75); + thr.Rate:=rxRate; + RateIF:=RateIF*0.1*RateFill; + end else + if (txRate/thr.Rate)>0.7 then begin + write('pass'); + thr.Rate:=1+txRate*(RateIF+1); + if thr.Rate>limRate then thr.Rate:=limRate + else RateIF:=RateIF+0.1; + if RateIF>limRateIF then RateIF:=LimRateIF; + end else write('3hard'); + ResetMark; {TODO: do not do this} + if thr.size1<120 then thr.size1:=128; + {no ack to size inc packet, back up} + sizeIF:=sizeIF/8; + {but at least 1 byte increase} + if (thr.size1*SizeIF)<1 then SizeIF:=1/thr.Size1; + {freq...} + if (thr.Size1/thr.Rate)>0.11 then begin thr.size1:=100; thr.rate:=4096; end; + write(', if=',RateIF:6:4); + write(', size=',thr.size1:5,'+',SizeIF:6:4); + {request ack, also triggers MTU discovery} + thr.size2:=thr.size1; + thr.size1:=thr.size2-1; {???} + writeln; + LeaveCriticalSection(thr.crit); +end; + +procedure tAggr.OnCont(msg:tSMsg); + var op,rmark:byte; + var rRate:LongWord; + begin + op:=msg.stream.readbyte; + assert(op=opcode.tccont); + rmark:=msg.stream.readbyte; + if rmark=thr.mark then begin + inc(acks); + timeout:=0; + rrate:=msg.stream.readword(4); + CalcRates(rRate*64); + end; +end; + +procedure tAggr.OnAck(msg:tSMsg); + var op,rmark:byte; + var rSize:LongWord; + const limSizeIF=1; + begin + op:=msg.stream.readbyte; + assert(op=opcode.tceack); + rmark:=msg.stream.readbyte; + rsize:=msg.stream.readword(2); + if (rmark<>thr.mark) then exit; + inc(acks); + Timeout:=0; + {do nothing if timeout recovery or not increase} + if (rsize<=thr.size1)or(timeout>0) then exit; + EnterCriticalSection(thr.crit); + {try to maintain frequency} + if (rSize/thr.Rate)<0.11 + then thr.size1:=rSize {use the new size as main size} + else sizeIF:=0; + {increase increase fastor} + SizeIF:=SizeIF*2; if SizeIF>limSizeIF then SizeIF:=limSizeIF; + assert(thr.size2=0); {wtf?} + {calc new packet size} + thr.size2:=round(thr.Size1*(1+SizeIF)); + {do nothing if they are equal} + if thr.size1=thr.size2 then thr.size2:=0 + {else writeln('Set size2: ',thr.size2,' ',thr.size1)}; + LeaveCriticalSection(thr.crit); +end; \ No newline at end of file diff --git a/UploadThread.pas b/UploadThread.pas new file mode 100644 index 0000000..8f9c6f2 --- /dev/null +++ b/UploadThread.pas @@ -0,0 +1,148 @@ +unit UploadThread; +{coprocessor to Upload unit. Move I/O out of main thread} +INTERFACE +uses Store1,Sockets,NetAddr; + +type tSegment=record + base,len:LongWord; +end; +type tChannel=record + s: array [1..24] of tSegment; + seg:byte; + weight:Word; + wcur:Word; + oi:tStoreObjectInfo; + end; +type tUploadThr=object + thrid:tThreadID; + crit:tRtlCriticalSection; + socket:tSocket; + remote:tSockAddrL; + size1,size2:Word; + mark:Byte; + rate:Single; + MarkTime:LongWord;{ms} + MarkData:LongWord; + chans:array [0..11] of ^tChannel; + curc:byte; + buffer:array [0..2047] of byte; + stop:boolean; {the therad is stopped or stopping} + wait:boolean; {the therad is waiting for data} + + procedure Main; + procedure Init(source:tNetAddr); + procedure Start; + procedure Done; +end; + +IMPLEMENTATION +uses MemStream,ServerLoop,SysUtils,opcode; + +procedure tUploadThr.Init(source:tNetAddr); + var i:integer; + begin + InitCriticalSection(crit); + source.ToSocket(remote); + socket:=GetSocket(source); + MarkData:=0; + MarkTime:=0; + stop:=true; + wait:=false; + for i:=0 to high(chans) do chans[i]:=nil; +end; + +procedure tUploadThr.Main; + var pch:byte; + var s:tMemoryStream; + var sz:Word; + var txwait,delta:single;//msec + var LastTime:tDateTime;//days + var chan:^tChannel; + var seg:^tSegment; + begin + txwait:=0; + delta:=0; + while not stop do begin + EnterCriticalSection(crit); + pch:=0; + {find usable channel} + while (chans[curc]=nil)or(chans[curc]^.wcur=0)or(chans[curc]^.seg=0) do begin + if assigned(chans[curc])and(chans[curc]^.WCur=0) then chans[curc]^.WCur:=chans[curc]^.weight; + inc(curc); + inc(pch); + if curc>high(chans) then curc:=0; + if pch>(high(chans)+1) then begin wait:=true; break; end; + end; + if wait then begin + LeaveCriticalSection(crit); + sleep(200); + continue; + end; + LastTime:=SysUtils.Now; + chan:=chans[curc]; + seg:=@chan^.s[chan^.seg]; + s.Init(@buffer,0,high(buffer)); + {prepare header} + if size2>s.size then size2:=0; + if size2=0 then begin + sz:=size1; if size1>s.size then sz:=s.size; + s.WriteByte(opcode.tcdata); + end else begin + sz:=size2; if sz>s.size then sz:=s.size; + s.WriteByte(opcode.tcdataimm); + size2:=0; + end; + Assert(seg^.len>0); + s.WriteByte(mark); + s.WriteByte(curc); + s.WriteWord(seg^.base,4); + dec(sz,s.length); + if sz>seg^.Len then sz:=seg^.Len; + assert(sz<=seg^.len); + chan^.oi.ReadSeg(s.WrBuf,seg^.base,sz); + Assert(chan^.oi.rc=0,'IO error reading segment'); + s.WrEnd(sz); + assert((Seg^.Len-sz)>=0); + Dec(Seg^.Len,sz); + Dec(chan^.WCur); + if Seg^.Len=0 then Dec(chan^.seg) + else Inc(Seg^.Base,sz); + LeaveCriticalSection(crit); + fpSendTo(socket,s.base,s.length,0,@remote,sizeof(remote)); + txwait:=((MarkData/Rate)*1000)-(MarkTime); + MarkData:=MarkData+s.length; + if txWait>1000 then begin writeln('!!! txwait=',round(txWait)); txWait:=1000;end; + if txWait>0 then Sleep(round(txWait)); + Delta:=Delta+((SysUtils.Now-LastTime)*MSecsPerDay); + if Delta>5000 then Delta:=3000; + if Delta<0 then Delta:=0; + MarkTime:=MarkTime+trunc(Delta); + Delta:=frac(Delta); + end; +end; + +function thrfunc(p:pointer):PtrInt; + begin + tUploadThr(p^).Main; + thrfunc:=9; +end; +procedure tUploadThr.Start; + begin + wait:=false; + if not stop then exit; + stop:=false; + MarkData:=0; + MarkTime:=0; + thrid:=BeginThread(@ThrFunc,@self); +end; + +procedure tUploadThr.Done; + begin + if stop then exit; + EnterCriticalSection(crit); + stop:=true; + LeaveCriticalSection(crit); + WaitForThreadterminate(thrid,999999); + DoneCriticalSection(crit); +end; +END. \ No newline at end of file diff --git a/brodnetd.pas b/brodnetd.pas index 757bd87..2517382 100644 --- a/brodnetd.pas +++ b/brodnetd.pas @@ -5,6 +5,8 @@ uses cthreads,ServerLoop ,TestWatch ,TestChat ,AsyncProcess + ,Upload + ,Download ; BEGIN -- 2.11.4.GIT