From 53a38d7ba37945a7fddafe200f5cf38e36a03cb2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Thu, 5 Nov 2015 22:01:15 +0100 Subject: [PATCH] Fuck. --- Download.pas | 246 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- Store1.pas | 78 ++++++++++++++++--- TestFS.pas | 187 +++++++++++++++------------------------------ todo.txt | 10 +++ upmgr.pas | 5 +- 5 files changed, 381 insertions(+), 145 deletions(-) rewrite TestFS.pas (76%) diff --git a/Download.pas b/Download.pas index 319eefe..721a824 100644 --- a/Download.pas +++ b/Download.pas @@ -3,7 +3,10 @@ unit Download; INTERFACE uses NetAddr, - ServerLoop,opcode,MemStream; + ServerLoop,opcode,MemStream + ,Store1 + ,Chat + ; { Same idea here as upmgr. Have an tAggr for each peer reporting speed and tJob for each file request saving to file and reqesting missed segments. @@ -15,18 +18,49 @@ uses NetAddr, was found. } type - tJob_ptr=pointer;//^tJob; + pDownloadJob=^tDownloadJob; + tDownloadJob=object + total,done:LongWord; + missc:LongWord; + state:(stStop,stActive,stDone,stError); + error:byte; + error2:byte; + fid:tFID; + procedure Start; + procedure Abort; + end; +function GetJob(const fid:tFID):pDownloadJob; +function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob; + +IMPLEMENTATION +type + tJob=object(tDownloadJob) + aggr:pointer; + ix:byte; + so:tStoreObjectInfo; + ch:tChat; + rofs,rlen:LongWord;{b:l of to be requested block} + procedure Init(const source:tNetAddr; const ifid: tFID); + procedure MsgDATA(base,length:LongWord; data:pointer); + procedure Start; + procedure StartTransfer(preamble:boolean); + procedure Abort; + procedure ReplyGET(msg:tSMsg; data:boolean); + procedure ReplyClose(msg:tSMsg; data:boolean); + end; + tAggr_ptr=^tAggr; tAggr=object Rate:Real; ByteCnt:LongWord; DgrCnt:LongWord; CurMark,PrvMark:byte; StartT:tMTime; - Jobs: array [0..15] of tJob_ptr; + Jobs: array [0..15] of ^tJob; refs:byte; ChanOfs:byte; DgrCntCheck:LongWord; remote:tNetAddr; + next:tAggr_ptr; procedure Init(const src:tNetAddr); procedure MsgDATA(sz:Word;mark:byte); procedure MsgIMME(sz:Word;mark:byte); @@ -34,7 +68,196 @@ type procedure Periodic; procedure Done; end; -IMPLEMENTATION +var AggrChain:^tAggr; + +function GetAggr(const remote:tNetAddr):tAggr_ptr; + var a:^tAggr; + var p:^pointer; + begin + p:=@AggrChain; + a:=AggrChain; + while assigned(a) do begin + if a^.remote=remote then begin + GetAggr:=a; + p^:=a^.next; + a^.next:=AggrChain; + AggrChain:=a^.next; + exit; + end; + end; + GetAggr:=nil; +end; +function GetJob(const fid:tFID):pDownloadJob; + var a:^tAggr; + var i:byte; + var p:^pointer; + begin + p:=@AggrChain; + a:=AggrChain; + while assigned(a) do begin + for i:=0 to high(tAggr.Jobs) do begin + if CompareWord(a^.Jobs[i],fid,10)=0 then begin + GetJob:=a^.Jobs[i]; + assert(a^.Jobs[i]^.ix=i); + assert(a^.Jobs[i]^.aggr=a); + p^:=a^.next; + a^.next:=AggrChain; + AggrChain:=a^.next; + exit; + end; + end; + end; + GetJob:=nil; +end; + +function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob; + begin + result:=GetJob(fid); + if assigned(result) then exit; + result:=GetMem(sizeof(tJob)); + tJob(pointer(result)^).init(source,fid); +end; + +procedure tJob.Init(const source:tNetAddr; const ifid: tFID); + var dw:^tAggr; + begin + error:=0; + error2:=0; + fid:=ifid; + so.Open(fid); + done:=0; + missc:=0; + if so.final then begin + done:=total; + state:=stDone; + rlen:=0; + exit end; + state:=stStop{,stActive,stDone,stError)}; + if so.rc=0 then begin + writeln('Download: resuming'); + total:=so.Length; + so.GetMiss(rofs,rlen); + end else begin + writeln('Download: start from zero'); + total:=0; + rofs:=0; + rlen:=8192; + end; + so.EnableWrite(fid); + if so.rc<>0 then begin + state:=stError; + error:=253; + error2:=so.rc; + exit end; + dw:=GetAggr(source); + if not assigned(aggr) then begin + new(dw); + dw^.Init(source); + ix:=0; + end else begin + ix:=0; while (ix0))or((state=stDone)and(rlen=0)) ); +end; +procedure tJob.Start; + begin + assert( (state=stStop)and(rlen>0) ); + StartTransfer(true); + //Shedule(20000,@HardTimeout); + state:={stStop,}stActive{,stDone,stError}; +end; +procedure tJob.StartTransfer(preamble:boolean); + var s:tMemoryStream; + begin + assert( rlen>0 ); + ch.Callback:=@ReplyGET; + ch.streaminit(s,33); + if preamble then begin + {service}s.WriteByte(opcode.upFileServer); + {channel}s.WriteByte(ix); + {opcode }s.WriteByte(opcode.upGET); + {file }s.Write(fid,20); + end else + {opcode }s.WriteByte(opcode.upSEG); + {basehi }s.WriteWord(0,2); + {base }s.WriteWord(rofs,4); + {limit }s.WriteWord(rlen,4); + ch.Send(s); +end; +procedure tJob.ReplyGET(msg:tSMsg; data:boolean); + var r:tMemoryStream absolute msg.stream; + var op:byte; + var rsize,rseg:LongWord; + var rfinal:byte; + begin + {reply from GET request} + write('Download: ReplyGET: '); + if not data then begin + writeln('ack'); + end else begin + ch.Ack; + op:=msg.stream.ReadByte; + if op=upFAIL then begin + state:=stError; + try + error:=r.ReadByte; + error2:=r.ReadByte; + except end; + writeln('FAIL ',error,'-',error2); + end + else if op=upINFO then begin + {rsizehi}r.skip(2); + rsize :=r.ReadWord(4); + rfinal :=r.readbyte; + rseg :=r.readword(4); + writeln('INFO size=',rsize,' final=',rfinal,' seg=',rseg); + if (rsize<>so.length) then writeln('Download: length mismatch ',so.length,'->',rsize); + total:=rsize; + so.SetFLength(total); + //UnShedule(@HardTimeout); + end else if op=opcode.upDONE then begin + writeln('DONE'); + assert(so.Length>0); + so.GetMiss(rofs,rlen); + if rlen=0 then begin + state:=stDone; + writeln('Download: completed'); + end else StartTransfer(false); + end else begin + if op=upClose then writeln('CLOSE') else writeln('unknown'); + state:=stError; + error:=254; + error2:=op; + end; + end; +end; + +procedure tJob.Abort; + var s:tMemoryStream; + begin + assert(state=stActive); + ch.Callback:=@ReplyClose; + //Shedule(20000,@HardTimeout); + ch.streaminit(s,2); + {opcode }s.WriteByte(opcode.upClose); + ch.Send(s); + state:=stError; + error:=255; +end; +procedure tJob.ReplyClose(msg:tSMsg; data:boolean); + begin + writeln('Download: ReplyClose'); +end; + +procedure tJob.MsgDATA(base,length:LongWord; data:pointer); + begin + so.WriteSeg(base,length,data); + done:=done+length; +end; procedure tAggr.Init(const src:tNetAddr); begin @@ -54,15 +277,17 @@ end; procedure tAggr.Recv(msg:tSMsg); var op:byte; - var mark:byte; var chan:byte; + var mark:byte; + var base:DWORD; begin op:=msg.stream.readbyte; mark:=msg.stream.readbyte; if op=opcode.tcdataimm then MsgIMME(msg.length,mark); MsgDATA(msg.length,mark); chan:=msg.stream.readbyte; - //delegate to others todo + base:=msg.stream.ReadWord(4); + if (chan<=high(Jobs))and assigned(Jobs[chan]) then Jobs[chan]^.MsgDATA(base,msg.stream.RDBufLen,msg.stream.RDBuf); end; procedure tAggr.MsgIMME(sz:Word; mark:byte); @@ -122,4 +347,13 @@ procedure tAggr.Done; UnShedule(@Periodic); end; +procedure tDownloadJob.Start; + begin + tJob(pointer(@self)^).Start; +end; +procedure tDownloadJob.Abort; + begin + tJob(pointer(@self)^).Abort; +end; + END. \ No newline at end of file diff --git a/Store1.pas b/Store1.pas index 1b6fd08..0b81ee4 100644 --- a/Store1.pas +++ b/Store1.pas @@ -11,6 +11,7 @@ tStoreObjectInfo=object rc:Word; {0=no error 1=not found, other} length:LongWord; {the whole file} seglen:longword; {from cur to end of segment} + offset:LongWord; {only valid when reading} procedure Open(const fid:tfid); procedure Close; @@ -18,7 +19,7 @@ tStoreObjectInfo=object procedure ReadAhead(cnt:Word; into:pointer); procedure WaitRead; {wait for read to finish, rc} procedure EnableWrite(const fid:tFID); - procedure SetLength(len:LongWord); + procedure SetFLength(len:LongWord); procedure WriteSeg(ofs:LongWord;len:word;data:pointer); procedure GetMiss(out ofs:LongWord; out len:LongWord; var state:pointer); procedure GetMiss(out ofs:LongWord; out len:LongWord); @@ -112,6 +113,7 @@ procedure tStoreObjectInfo.Open(const fid:tfid); begin mkfilen(filename,'f',fid); segi:=nil; + Offset:=0; dh:=FileOpen(filename,fmOpenRead or fmShareDenyWrite); if dh<>-1 then begin rc:=0; @@ -137,10 +139,14 @@ end; procedure tStoreObjectInfo.EnableWrite(const fid:tFID); begin + writeln('Store1: enaling write'); assert((dh=-1)or(not final)); if dh=-1 then begin {file was close, create} dh:=FileCreate(filename); + if dh=-1 then begin + Writeln('Store1: create failed for file ',filename,', ioresult=',IOResult); + rc:=3; exit end; {init length and segments} length:=0; segi:=GetSegInfo(fid); @@ -152,9 +158,10 @@ procedure tStoreObjectInfo.EnableWrite(const fid:tFID); end; if dh=-1 then rc:=2 else rc:=0; end; -procedure tStoreObjectInfo.SetLength(len:LongWord); +procedure tStoreObjectInfo.SetFLength(len:LongWord); begin - assert( (length=0)and(not final)and(dh<>-1) ); + assert(not final); + writeln('Store1: SetFLength ',len); length:=len; {todo: errors!!!} FileSeek(dh,len,fsFromBeginning); @@ -164,12 +171,32 @@ procedure tSegInfo.SetSeg(ofs,len:LongWord; state:boolean); var cp:^tSeg; var pcp:^pointer; var after:LongWord; + var op:boolean; + procedure Dump(c:char); + begin + cp:=cache; + writeln('Store1: dumpCache ',c,' ',LongWord(@self)); + while assigned(cp) do begin + writeln(cp^.first,'-',cp^.after); + cp:=cp^.next; + end; + end; begin assert(state); after:=ofs+len; + //Dump('a'); pcp:=@cache; cp:=cache; + //writeln('Store1: Add: ',ofs,'-',after); while assigned(cp) do begin + op:=false; + if (ofs<=cp^.first)and(after>=cp^.after) then begin + {merge complete-encase} + pcp^:=cp^.next; + dispose(cp); + cp:=pcp^; + continue; + end; if cp^.after=ofs then begin {merge left-matching} pcp^:=cp^.next; @@ -186,15 +213,21 @@ procedure tSegInfo.SetSeg(ofs,len:LongWord; state:boolean); cp:=pcp^; continue; end; - pcp:=@cp^.next; + if (after>cp^.first)and(ofs<=cp^.first)and(after<=cp^.after) then begin writeln('k'); after:=cp^.first; end; + if (ofs=cp^.after)and(ofs>=cp^.first) then begin writeln('l'); ofs:=cp^.after;end; + if not op then pcp:=@cp^.next; cp:=pcp^; end; + //Dump('b'); {add the merged seg} + if ofs<>after then begin new(cp); cp^.first:=ofs; cp^.after:=after; cp^.next:=cache; cache:=cp; + end; + //Dump('c'); end; procedure tStoreObjectInfo.WriteSeg(ofs:LongWord;len:word;data:pointer); begin @@ -204,11 +237,30 @@ procedure tStoreObjectInfo.WriteSeg(ofs:LongWord;len:word;data:pointer); tSegInfo(segi^).SetSeg(ofs,len,true); end; procedure tStoreObjectInfo.GetMiss(out ofs:LongWord; out len:LongWord; var state:pointer); -begin -with tSegInfo(segi^) do begin - assert(false); -end; -end; + var cp,cp1,cp2:^tSeg; + begin with tSegInfo(segi^) do begin + assert(state=nil); + {find seg with lowest base, return 0..base-1} + cp1:=nil; cp2:=nil; + len:=0; + ofs:=0; + cp:=cache; while assigned(cp) do begin + if (cp1=nil)or(cp^.firstcp1^.first) then cp2:=cp; + cp:=cp^.next; end; + if assigned(cp2) then begin + ofs:=cp1^.after; + len:=cp2^.first-ofs; + end else begin + ofs:=cp1^.after; + len:=self.length-ofs; + end; + end else len:=self.length; + writeln('Store1: report miss ',ofs,'+',len); +end;end; procedure tStoreObjectInfo.GetMiss(out ofs:LongWord; out len:LongWord); var state:pointer; begin @@ -224,6 +276,7 @@ procedure tStoreObjectInfo.ReadAhead(cnt:Word; into:pointer); assert(seglen>=cnt); red:=FileRead(dh,into^,cnt); seglen:=seglen-red; + offset:=offset+red; if red=cnt then rc:=0 else begin //todo writeln('Store1: read ',red,' out of ',cnt,' requested bytes'); @@ -275,12 +328,15 @@ procedure tStoreObjectInfo.SegSeek(ofs:longword); if final then begin if ofs<=length then begin seglen:=length-ofs; - FileSeek(dh,ofs,fsFromBeginning); - rc:=0; + if FileSeek(dh,ofs,fsFromBeginning)=ofs then begin + offset:=ofs; + rc:=0; + end else rc:=3; end else rc:=5; end else if assigned(segi) then begin seglen:=tSegInfo(segi^).GetSegLen(ofs); if seglen=0 then rc:=4 else if FileSeek(dh,ofs,fsFromBeginning)<>ofs then rc:=3 else rc:=0; + offset:=ofs; end else rc:=7; end; diff --git a/TestFS.pas b/TestFS.pas dissimilarity index 76% index 72a6602..5a697c5 100644 --- a/TestFS.pas +++ b/TestFS.pas @@ -1,126 +1,61 @@ -unit TestFS; - -INTERFACE -IMPLEMENTATION -USES ServerLoop,Chat,SysUtils,MemStream,NetAddr,opcode,Download; - -type t=object - ch: tChat; - dw:Download.tAggr; - //procedure UserInput - procedure ST1(msg:tSMsg; data:boolean); - procedure ST2(msg:tSMsg; data:boolean); - procedure ST3(msg:tSMsg; data:boolean); - procedure Rekt; - procedure HardTimeout; -end; - -procedure t.ST1(msg:tSMsg; data:boolean); - var r:tMemoryStream absolute msg.stream; - var s:tMemoryStream; - var op:byte; - begin - {reply from GET request} - write('TestFS: ST1 reply from FS: '); - if not data then begin - writeln('ack'); - end else begin - ch.Ack; - op:=msg.stream.ReadByte; - if op=upFAIL then writeln('FAIL ',r.readbyte{,'-',r.readbyte}) - else if op=upINFO then begin - r.skip(2); - writeln('INFO size=',r.ReadWord(4),' final=',r.readbyte,' seg=',r.readword(4)); - ch.Callback:=@ST2; - UnShedule(@HardTimeout); - end else if op=upClose then writeln('CLOSE') - else writeln('unknown'); - end; -end; -procedure t.ST2(msg:tSMsg; data:boolean); - var r:tMemoryStream absolute msg.stream; - var s:tMemoryStream; - var op:byte; - begin - {Status Message} - write('TestFS: ST2 reply from FS: '); - if data then begin - ch.Ack; - op:=msg.stream.ReadByte; - if op=upCLOSE then writeln('CLOSE ') - else if op=upDONE then begin - writeln('DONE'); - halt(99); - ch.streaminit(s,1); - s.WriteByte(opcode.upClose); - ch.Send(s); - ch.Callback:=@ST3; - end; - end else writeln('ack (unexpected)'); -end; -procedure t.ST3(msg:tSMsg; data:boolean); - begin - {ACK to Close} - write('TestFS: ST3 reply from FS: '); - if data then writeln('unepected data') else begin - writeln('ack'); - ch.DisposeHook:=@Rekt; - ch.Close; - dw.Done; - end; -end; - -var cnt:LongWOrd=0; -procedure IgnoreData(msg:tSMsg); - begin - cnt:=cnt+msg.length; - //Writeln('Data: ',cnt); -end; - -procedure t.Rekt; - begin - writeln('TestFS: rekt'); - UnShedule(@HardTimeout); - FreeMem(@self,sizeof(self)); -end; - -procedure t.HardTimeout; - begin - writeln('TestFS: hardtimeout'); - ch.DisposeHook:=@Rekt; - ch.Close; -end; - -procedure init; - var o:^t; - var oi:word; - var s:tMemoryStream; - const opt='-test-fs'; - begin - oi:=OptIndex(opt); - if oi>0 then begin - assert(OptParamCount(oi)=1,opt+'(rcpt:tNetAddr)'); - writeln('TestFS: rcpt '+paramstr(oi+1)); - new(o); with o^ do begin - ch.Init(paramstr(oi+1)); - ch.Callback:=@ST1; - dw.Init(ch.remote); - Shedule(20000,@HardTimeout); - ch.streaminit(s,33); - s.WriteByte(opcode.upFileServer); - s.WriteByte({channel}99); - s.WriteByte(opcode.upGET); - s.Skip(20); - s.WriteWord(0,2); - s.WriteWord(0,4); - s.WriteWord($FFFFFFFF,4); - ch.Send(s); - //ServerLoop.SetMsgHandler(4,@IgnoreData); - //ServerLoop.SetMsgHandler(6,@IgnoreData); - end; - end; -end; - -BEGIN - init; -END. \ No newline at end of file +unit TestFS; + +INTERFACE +IMPLEMENTATION +USES ServerLoop,Chat,SysUtils,MemStream,NetAddr,opcode,Download,Store1; + +type t=object + job:^tDownloadJob; + //procedure UserInput + procedure Periodic; + procedure Rekt; + procedure HardTimeout; +end; + +procedure t.Periodic; + begin + write('TestFS: ',job^.state); + if job^.state=stError then write(job^.error,'-',job^.error2); + writeln(' total=',job^.total,' done=',job^.done); + Shedule(800,@Periodic); +end; + +procedure t.Rekt; + begin + writeln('TestFS: rekt'); + UnShedule(@HardTimeout); + UnShedule(@Periodic); + FreeMem(@self,sizeof(self)); +end; + +procedure t.HardTimeout; + begin + writeln('TestFS: hardtimeout'); + //ch.DisposeHook:=@Rekt; + //ch.Close; +end; + +procedure init; + var o:^t; + var oi:word; + var s:tMemoryStream; + const opt='-test-fs'; + var fid:tFID; + begin + oi:=OptIndex(opt); + if oi>0 then begin + assert(OptParamCount(oi)=1,opt+'(rcpt:tNetAddr)'); + writeln('TestFS: rcpt '+paramstr(oi+1)); + new(o); with o^ do begin + Shedule(20000,@HardTimeout); + Shedule(20,@Periodic); + FillChar(fid,sizeof(fid),0); + job:=NewJob(paramstr(oi+1),fid); + if job^.state=stStop then job^.Start; + end; + end; +end; + +BEGIN + init; +END. \ No newline at end of file diff --git a/todo.txt b/todo.txt index 621d336..3896a34 100644 --- a/todo.txt +++ b/todo.txt @@ -1 +1,11 @@ To-Do list +- merge TC and upmgr +- move file sending to therad +- move GetMiss to Download +- handle timeouts in Download +- skip unavailable segments + - upmgr: send avalable subsegment base +- Chat: send aborts retransmission +? Download request random subsegment + (distribution + exercise for recombinator) +- ... diff --git a/upmgr.pas b/upmgr.pas index cdeb627..f4e13a1 100644 --- a/upmgr.pas +++ b/upmgr.pas @@ -152,12 +152,13 @@ procedure tPrv.Cont; begin //writeln('upmgr: CONT! ',chan); Assert(Active and isOpen); - sz:=aggr^.tcs.MaxSize(sizeof(buf))-1; + sz:=aggr^.tcs.MaxSize(sizeof(buf))-5; {1mark+4base} if sz>SegLen then sz:=SegLen; //s.Init(GetMem(sz),0,sz); - Assert((sz+1)<=sizeof(buf)); + Assert((sz+5)<=sizeof(buf)); s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s); s.WriteByte(Chan); + s.WriteWord(DWORD(oinfo.Offset),4); Assert(sz<=s.WrBufLen); oinfo.ReadAhead(sz,s.WrBuf); //todo oinfo.WaitRead; -- 2.11.4.GIT