From a2372587f2b74d0bdd97b6f04545ecf25c2c5755 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Fri, 16 Oct 2015 15:03:54 +0200 Subject: [PATCH] UpMgr work + Tests and temp store. --- TestFS.pas | 52 ++++++++++++++++++++++++++++++++++++---------------- ZidanStore.pas | 19 +++++++++++++++++++ upmgr.pas | 51 ++++++++++++++++++++++++++++++++++----------------- 3 files changed, 89 insertions(+), 33 deletions(-) diff --git a/TestFS.pas b/TestFS.pas index 0f26327..d7c7f8b 100644 --- a/TestFS.pas +++ b/TestFS.pas @@ -15,34 +15,49 @@ type t=object end; procedure t.ST1(msg:tSMsg; data:boolean); + var r:tMemoryStream absolute msg.stream; var s:tMemoryStream; + var op:byte; begin + {reply from GET request} write('TestFS: ST1 reply from FS: '); if not data then begin writeln('ack'); - s.init(GetMem(56),0,56); - ch.AddHeaders(s); - s.WriteByte(99); - ch.Send(s); - ch.Callback:=@ST2; - halt(32); - end else writeln('unexpected data'); + end else begin + ch.Ack; + op:=msg.stream.ReadByte; + if op=upFAIL then writeln('FAIL ',r.readbyte{,'-',r.readbyte}) + else if op=upINFO then begin + r.skip(2); + writeln('INFO size=',r.ReadWord(4),' final=',r.readbyte,' seg=',r.readword(4)); + ch.Callback:=@ST2; + end else if op=upClose then writeln('CLOSE') + else writeln('unknown'); + end; end; procedure t.ST2(msg:tSMsg; data:boolean); + var r:tMemoryStream absolute msg.stream; var s:tMemoryStream; + var op:byte; begin + {Status Message} write('TestFS: ST2 reply from FS: '); if data then begin - writeln(msg.stream.ReadByte,'-',msg.stream.ReadByte); - s.init(GetMem(56),0,56); - ch.AddHeaders(s); - s.WriteByte(opcode.upClose); - ch.Send(s); - ch.Callback:=@ST3; - end else writeln('ack'); + ch.Ack; + op:=msg.stream.ReadByte; + if op=upCLOSE then writeln('CLOSE ') + else if op=upDONE then begin + writeln('DONE'); + ch.streaminit(s,1); + s.WriteByte(opcode.upClose); + ch.Send(s); + ch.Callback:=@ST3; + end; + end else writeln('ack (unexpected)'); end; procedure t.ST3(msg:tSMsg; data:boolean); begin + {ACK to Close} write('TestFS: ST3 reply from FS: '); if data then writeln('unepected data') else begin writeln('ack'); @@ -79,9 +94,14 @@ procedure init; ch.Init(paramstr(oi+1)); ch.Callback:=@ST1; Shedule(7000,@HardTimeout); - s.init(GetMem(56),0,56); - ch.AddHeaders(s); + ch.streaminit(s,33); s.WriteByte(opcode.upFileServer); + s.WriteByte({channel}99); + s.WriteByte(opcode.upGET); + s.Skip(20); + s.WriteWord(0,2); + s.WriteWord(0,4); + s.WriteWord($FFFFFFFF,4); ch.Send(s); end; end; diff --git a/ZidanStore.pas b/ZidanStore.pas index 68c3691..b698894 100644 --- a/ZidanStore.pas +++ b/ZidanStore.pas @@ -9,12 +9,31 @@ tStoreObjectInfo=object final:boolean; rc:Word; length:LongWord; + seglen:longword; + procedure Open(const fid:tfid); + procedure Close; + procedure SegSeek(ofs:LongWord); end; IMPLEMENTATION procedure tStoreObjectInfo.Open(const fid:tfid); begin + rc:=0; + final:=true; + length:=65000; + Assign(hnd,'/dev/urandom'); + Reset(hnd); +end; +procedure tStoreObjectInfo.Close; + begin +end; +procedure tStoreObjectInfo.SegSeek(ofs:longword); + begin + if ofs=0 then begin + rc:=0; + seglen:=65000; + end else rc:=7; end; END. diff --git a/upmgr.pas b/upmgr.pas index 12359f4..c4480d3 100644 --- a/upmgr.pas +++ b/upmgr.pas @@ -119,10 +119,12 @@ procedure tPrv.Start; Assert(not Active); Active:=true; UnShedule(@IdleTimeout); + writeln('upmgr: Startig transfer'); if not assigned(aggr^.prv) then begin next:=@self; prev:=@self; aggr^.prv:=@self; + aggr^.tcs.Start; end else begin next:=aggr^.prv^.next; prev:=aggr^.prv; @@ -143,6 +145,7 @@ procedure tPrv.Stop; if aggr^.prv=@self then aggr^.prv:=next; active:=false; Shedule(20000,@IdleTimeout); + writeln('upmgr: Stop'); end; procedure tPrv.Cont; @@ -151,17 +154,17 @@ procedure tPrv.Cont; var rs:LongWord; var buf:array [1..2048] of byte; begin + writeln('upmgr: CONT! ',chan); Assert(Active and isOpen); - sz:=SegLen; - if SegLen>high(buf) then sz:=high(buf) else sz:=SegLen; - sz:=aggr^.tcs.MaxSize(sz); + sz:=aggr^.tcs.MaxSize(sizeof(buf))-1; + if sz>SegLen then sz:=SegLen; //s.Init(GetMem(sz),0,sz); - s.Init(@buf,0,sz); - aggr^.tcs.WriteHeaders(s); - Assert(s.WrBufLen=sz); //really? - BlockRead(datafile,s.WrBuf^,s.WrBufLen,rs); - s.WrEnd(rs); - Assert(RS=s.WrBufLen);//todo + Assert((sz+1)<=sizeof(buf)); + s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s); + s.WriteByte(Chan); + Assert(sz<=s.WrBufLen); + BlockRead(datafile,s.WrBuf^,sz,rs); s.WrEnd(rs); + Assert(RS=sz);//todo aggr^.tcs.Send(s); //FreeMem(s.base,s.size); SegLen:=SegLen-sz; @@ -203,6 +206,7 @@ procedure tPrv.OnMsg(msg:tSMsg; data:boolean); if aggr^.rekt then exit; if msg.stream.RdBufLen<1 then goto malformed; op:=msg.stream.ReadByte; + writeln('upmgr: opcode=',op,' sz=',msg.stream.RdBufLen); case op of upClose: DoClose; upGET: begin @@ -227,12 +231,14 @@ procedure tPrv.OnMsg(msg:tSMsg; data:boolean); err.WriteByte(upFAIL); err.WriteByte(upErrMalformed); ch^.Send(err); + writeln('upmgr: malformed request stage=1'); end; procedure tPrv.ChatTimeout(willwait:LongWord); var wasactive:boolean; begin - if WillWait<30000 then exit; + if WillWait<8000 then exit; + writeln('upmgr: Chat timeout'); wasactive:=active; if Active then Stop; if isOpen then oinfo.Close; @@ -245,13 +251,17 @@ procedure tPrv.IdleTimeout; var err:tMemoryStream; begin if assigned(ch) then begin {chat is still not rekt} + if not active then writeln('upmgr: Idle timeout'); ch^.StreamInit(err,1); err.WriteByte(upClose); - ch^.Send(err); + try ch^.Send(err); + except end; ch^.Close; end; - Assert(not Active); {is idle} + {it is idle timeout, but may be called from aggr it tc tiomes out} + if Active then Stop; if isOpen then oinfo.Close; {may still be open} + UnShedule(@IdleTimeout); aggr^.UnRef; FreeMem(@self,sizeof(self)); end; @@ -265,6 +275,7 @@ procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); next:=nil; prev:=nil; chan:=msg.stream.readbyte; + writeln('upmgr: prv init chan=',chan); weight:=100; wcur:=0; isOpen:=false; Active:=false; @@ -292,23 +303,26 @@ end; procedure tAggr.TCTimeout; var pprv:pointer; begin - writeln('TCTimeout'); + writeln('upmgr: TCTimeout'); while assigned(prv) do begin + assert(not rekt); pprv:=prv; prv^.IdleTimeout; + if rekt then exit; Assert(pprv<>prv); end; Done; end; procedure tAggr.Cont; begin - assert(assigned(prv)); + if not assigned(prv) then exit; prv^.Cont; end; procedure tAggr.UnRef; begin Assert(cnt>0); Dec(Cnt); + writeln('upmgr: aggr unrefd'); if cnt=0 then begin Done; FreeMem(@self,sizeof(self)); @@ -316,8 +330,8 @@ procedure tAggr.UnRef; end; procedure tAggr.Done; begin - if rekt then exit; - writeln('upmgr: close'); + assert(not rekt); + writeln('upmgr: aggr close'); rekt:=true; tcs.Done; if assigned(prev) then prev^.next:=next else Peers:=next; @@ -328,8 +342,8 @@ function FindAggr({const} addr:tNetAddr): tAggr_ptr; begin result:=Peers; while assigned(result) do begin + if assigned(result^.next) then assert(result^.next^.prev=result); if result^.tcs.remote=addr then exit; - assert(result^.prev=result); result:=result^.next; end; end; @@ -340,6 +354,8 @@ procedure ChatHandler(var nchat:tChat; msg:tSMsg); var s:tMemoryStream; const cMax=16; begin + writeln('upmgr: ChatHandler'); + msg.stream.skip({the initcode}1); if msg.stream.RdBufLen<2 then begin writeln('upmgr: malformed init'); nchat.StreamInit(s,16); @@ -347,6 +363,7 @@ procedure ChatHandler(var nchat:tChat; msg:tSMsg); s.writebyte(upErrMalformed); nchat.Send(s); nchat.Close; + writeln('upmgr: malformed request stage=0'); exit end; {first get the ag} ag:=FindAggr(msg.source^); -- 2.11.4.GIT