From 2d08f6a22b1f956946e35c3a5f886fbef9753757 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Tue, 13 Oct 2015 17:23:23 +0200 Subject: [PATCH] WIP UpMgr. --- ZidanStore.pas | 21 +++ opcode.pas | 2 + upmgr.pas | 563 +++++++++++++++++++++++++++++++++++---------------------- 3 files changed, 371 insertions(+), 215 deletions(-) create mode 100644 ZidanStore.pas rewrite upmgr.pas (68%) diff --git a/ZidanStore.pas b/ZidanStore.pas new file mode 100644 index 0000000..68c3691 --- /dev/null +++ b/ZidanStore.pas @@ -0,0 +1,21 @@ +unit ZidanStore; + +INTERFACE + +type +tfid=array [0..20] of byte; +tStoreObjectInfo=object + hnd:file of byte; + final:boolean; + rc:Word; + length:LongWord; + procedure Open(const fid:tfid); +end; + +IMPLEMENTATION +procedure tStoreObjectInfo.Open(const fid:tfid); + begin +end; + +END. + \ No newline at end of file diff --git a/opcode.pas b/opcode.pas index 690b56e..c229dba 100644 --- a/opcode.pas +++ b/opcode.pas @@ -21,6 +21,8 @@ const {FS opcodes} upErrMalformed=1; upErrHiChan=2; upErrChanInUse=3; + upErrNotFound=4; + upErrIO=5; IMPLEMENTATION END. \ No newline at end of file diff --git a/upmgr.pas b/upmgr.pas dissimilarity index 68% index e524c6d..2ffd0f4 100644 --- a/upmgr.pas +++ b/upmgr.pas @@ -1,215 +1,348 @@ -UNIT UPMGR; -{Upload Manager for brodnetd} - -{mission: - - read files - - add demux info - - add sequence/offset - - handle retransmit requests - - handle all requests - - deprioritize/cancel uploads - keep one TC connection per peer (+expire-delete) - => need 'chat' protocol -} - -INTERFACE -USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr; - -IMPLEMENTATION - -type -tAggr_ptr=^tAggr; -tPrv=object - u:byte; -end; -tPrv_ptr=^tPrv; -tAggr=object - idle: boolean; - tcs: tTCS; - ch:^tChat; - prv: array [0..15] of ^tPrv; - cprv:byte;{current} - next,prev: tAggr_ptr; - procedure OnCont; - procedure OnMsg(msg: tSMsg; data: boolean); - procedure Init(var nchat:tChat; msg:tSMsg); - procedure ForceClose; - procedure Done; - procedure IdleTimeout; procedure TCTimeout; procedure CHTimeout(willwait:LongWOrd); - procedure SendTestReply; - procedure ExpandPrv(last:byte); -end; - -var Peers:^tAggr; - -{Requests -Close(); -GET(channel:byte; filehash:20; baseHi:word2; base:word4; limit:word4); -SEG(channel:byte; baseHi:word2; base:word4; limit:word4); -FIN(channel:byte; avail:byte); -}{Responses -INFO(channel:byte; struct); -FAIL(channel:byte; code:byte); -DONE(channel:byte); -} - -procedure tAggr.OnMsg(msg: tSMsg; data: boolean); - var op:byte; - begin - if data then begin - op:=msg.stream.readbyte; - case op of - opcode.upClose: begin - ch^.Ack; - Done; - FreeMem(@self,sizeof(self)); - end; - opcode.upGET: ReqGET(msg); - 99: SendTestReply; - end{case}; - end{data}; -end; - -procedure tAggr.ReqGET(msg:tSMsg); - var chan:byte; - var filehash: array [1..20] of byte; - var basehi:word; - var base:LongWord; - var limit:LongWord; - begin - if msg.stream.RdBufLen<31 then begin - SendError(opcode.upErrMalformed); exit end; - chan:=msg.stream.ReadByte; - if chan>high(prv) then begin - SendError(opcode.upErrHiChan,chan); exit end; - if assigned(prv[chan]) then begin - SendError(opcode.upErrChanInUse,chan); exit end; - msg.stream.Read(FileHash,20); - basehi:=msg.stream.ReadWord(2); - base:=msg.stream.Read(4); - limit:=msg.stream.Read(4); - New(prv[chan]); - ch^.Ack; - with prv[chan]^ do begin - channel:=chan; - aggr:=@self; - Init(filehash,basehi,base,limit); - end; -end; - -procedure tAggr.OnCont; - var pprv:byte; - begin - pprv:=cprv; - repeat - repeat - if cprv>=length(prv) then cprv:=0 else inc(cprv); - if cprv=pprv then begin - idle:=true; - Shedule(15000,@IdleTimeout); - exit; - end; - until prv[cprv].u>0; - {} - until tcs.txLastSize>0; -end; - -procedure tAggr.SendTestReply; - var s:tMemoryStream; - begin - writeln('upmgr: test'); - s.Init(GetMem(56),0,56); - ch^.AddHeaders(s); - s.WriteByte(98); - s.WriteByte(42); - ch^.send(s); -end; - -procedure tAggr.Init(var nchat:tChat; msg:tSMsg); - var i:byte; - begin - writeln('upmgr: init'); - next:=Peers; - prev:=nil; - if assigned(Peers) then Peers^.prev:=@self; - Peers:=@self; - ch:=@nchat; - tcs.Init(msg.source^); - tcs.CanSend:=@OnCont; - tcs.maxTimeout:=8; - tcs.OnTimeout:=@TCTimeout; - for i:=0 to high(prv) do prv[i]:=nil; - cprv:=0; - ch^.Callback:=@OnMsg; - ch^.TMHook:=@CHTimeout; - writeln('upmgr: send ack to init'); - ch^.Ack; - idle:=true; - Shedule(15000,@IdleTimeout); -end; - -procedure tAggr.IdleTimeout; - begin if not idle then exit; - writeln('Idle Timeout'); - ForceClose end; -procedure tAggr.TCTimeout; - begin - writeln('TCTimeout'); - ForceClose end; -procedure tAggr.CHTimeout(willwait:LongWOrd); - begin if willwait<30000 then exit; - writeln('ChatTimeout'); - ForceClose end; - -procedure tAggr.ForceClose; - var s:tMemoryStream; - begin - writeln('upmgr: force close'); - s.Init(GetMem(56),0,56); - ch^.AddHeaders(s); - s.WriteByte(opcode.upClose); - s.WriteByte(22); - try - ch^.send(s); - except end; - Done; {fixme sheduler} - FreeMem(@self,sizeof(self)); -end; - -procedure tAggr.Done; - begin - writeln('upmgr: close'); - ch^.Close; - tcs.Done; - UnShedule(@IdleTimeout); - if assigned(prev) then prev^.next:=next else Peers:=next; - if assigned(next) then next^.prev:=prev; -end; - -function FindAggr({const} addr:tNetAddr): tAggr_ptr; - begin - result:=Peers; - while assigned(result) do begin - if result^.tcs.remote=addr then exit; - assert(result^.prev=result); - result:=result^.next; - end; -end; - -procedure ChatHandler(var nchat:tChat; msg:tSMsg); - var dup:^tAggr; - begin - {check dup} - dup:=FindAggr(msg.source^); - if assigned(dup) then begin - Dup^.ForceClose; - Dup^.Done; - end else begin - New(dup); - end; - Dup^.Init(nchat,msg); -end; - -BEGIN - SetChatHandler(opcode.upFileServer,@ChatHandler); -END. \ No newline at end of file +UNIT UPMGR; +{Upload Manager for brodnetd} + +INTERFACE +USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr; + +IMPLEMENTATION +USES ZidanStore; + +type +tAggr_ptr=^tAggr; +tPrv_ptr=^tPrv; +tPrv=object + aggr:tAggr_ptr; + ch: ^tChat; + chan: byte; + next,prev: tPrv_ptr; + weight,wcur:Word; + isOpen,Active:boolean; + seglen:LongWord; + datafile:file of byte; + procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); + procedure OnMsg(msg:tSMsg; data:boolean); + procedure IdleTimeout; + procedure ChatTimeout(willwait:LongWord); + procedure Cont; + procedure DoGET(const fid:tfid; base,limit:LongWord); + procedure DoSEG(base,limit:LongWord); + procedure Start; + procedure Stop; + procedure DoClose; +end; +tAggr=object + tcs: tTCS; + rekt:boolean; + next,prev: tAggr_ptr; + prv:^tPrv; + Cnt:Byte; + procedure UnRef; + procedure Cont; + procedure Init(const source:tNetAddr); + procedure Done; + procedure TCTimeout; +end; + +var Peers:^tAggr; + +{Requests +Close(); +GET(filehash:20; baseHi:word2; base:word4; limit:word4); +SEG(baseHi:word2; base:word4; limit:word4); +}{Responses +INFO(sizeHi:word2; size:word4; final:byte; seglen:word4); +FAIL(code:byte;...); +DONE(); +} + +procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord); + var err:tmemorystream; + var info:tStoreObjectInfo; + begin + Assert(not(active or isOpen)); //todo + ch^.Ack; + info.Open(fid); + if not info.final then begin + info.rc:=200; + Close(info.hnd); + end; + if info.rc>0 then begin + ch^.StreamInit(err,3); + err.WriteByte(upFAIL); + if (info.rc=1)or(not info.final) then err.WriteByte(upErrNotFound) + else begin err.WriteByte(upErrIO); err.WriteByte(info.rc) end; + ch^.Send(err); + end else begin + ch^.StreamInit(err,12); + err.WriteByte(upINFO); + datafile:=info.hnd; + isopen:=true; + err.WriteWord(0,2); + err.WriteWord(info.length,4); + seglen:=limit; + if info.length@self then begin + prev^.next:=next; + next^.prev:=prev; + end else next:=nil; + if aggr^.prv=@self then aggr^.prv:=next; + active:=false; + Shedule(20000,@IdleTimeout); +end; + +procedure tPrv.Cont; + var s:tMemoryStream; + var sz:LongWord; + var rs:LongWord; + var buf:array [1..4096] of byte; + begin + Assert(Active and isOpen); + sz:=SegLen; + if SegLen>high(buf) then sz:=high(buf) else sz:=SegLen; + sz:=aggr^.tcs.MaxSize(sz); + //s.Init(GetMem(sz),0,sz); + s.Init(@buf,0,sz); + aggr^.tcs.WriteHeaders(s); + Assert(s.WrBufLen=sz); //really? + BlockRead(datafile,s.WrBuf^,s.WrBufLen,rs); + s.WrEnd(rs); + Assert(RS=s.WrBufLen);//todo + aggr^.tcs.Send(s); + //FreeMem(s.base,s.size); + SegLen:=SegLen-sz; + dec(wcur); + if wcur=0 then begin + wcur:=weight; + aggr^.prv:=next; + end; +end; + +procedure tPrv.DoClose; + begin + if Active then Stop; + if isOpen then Close(datafile); + isOpen:=false; + UnShedule(@IdleTimeout); + ch^.Ack; + ch^.Close; + aggr^.UnRef; + FreeMem(@self,sizeof(self)); +end; + +procedure tPrv.OnMsg(msg:tSMsg; data:boolean); + var op:byte; + var hash:tfid; + var base:LongWord; + var limit:LongWord; + var err:tmemorystream; + label malformed; + begin + if not data then exit; //todo + Assert(not(aggr^.rekt and active)); + if aggr^.rekt then exit; + if msg.stream.RdBufLen<1 then goto malformed; + op:=msg.stream.ReadByte; + case op of + upClose: DoClose; + upGET: begin + if msg.stream.RdBufLen<>30 then goto malformed; + msg.stream.Read(hash,20); + if msg.stream.ReadWord(2)>0 then goto malformed; + base:=msg.stream.ReadWord(4); + limit:=msg.stream.ReadWord(4); + DoGet(hash,base,limit); + end; +{ upSEG: begin + if msg.stream.RdBufLen<10 then goto malformed; + if msg.stream.ReadWord(2)>0 then goto malformed; + base:=msg.stream.ReadWord(4); + limit:=msg.stream.ReadWord(4); + DoSEG(base, limit); + end; +} + else goto malformed; + end; + exit; malformed: + ch^.StreamInit(err,2); + err.WriteByte(upFAIL); + err.WriteByte(upErrMalformed); + ch^.Send(err); +end; + +procedure tPrv.ChatTimeout(willwait:LongWord); + var wasactive:boolean; + begin + if WillWait<30000 then exit; + wasactive:=active; + if Active then Stop; + if isOpen then Close(datafile); + isOpen:=false; + ch^.Close; + ch:=nil; + if wasactive then IdleTimeout {else it is sheduled}; +end; +procedure tPrv.IdleTimeout; + var err:tMemoryStream; + begin + if assigned(ch) then begin {chat is still not rekt} + ch^.StreamInit(err,1); + err.WriteByte(upClose); + ch^.Send(err); + ch^.Close; + end; + if Active then Stop; + if isOpen then Close(datafile); + aggr^.UnRef; + FreeMem(@self,sizeof(self)); +end; + +procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); + begin + ch:=@nchat; + ch^.Callback:=@OnMsg; + ch^.TMHook:=@ChatTimeout; + aggr:=ag; + next:=nil; + prev:=nil; + chan:=msg.stream.readbyte; + weight:=100; + wcur:=0; + isOpen:=false; Active:=false; + inc(aggr^.Cnt); + Shedule(15000,@IdleTimeout); + OnMsg(msg,true); +end; + +procedure tAggr.Init(const source:tNetAddr); + begin + writeln('upmgr: init'); + next:=Peers; + prev:=nil; + rekt:=false; + if assigned(Peers) then Peers^.prev:=@self; + Peers:=@self; + tcs.Init(source); + tcs.CanSend:=@Cont; + tcs.maxTimeout:=8; + tcs.OnTimeout:=@TCTimeout; + prv:=nil; + cnt:=0; +end; + +procedure tAggr.TCTimeout; + var pprv:pointer; + begin + writeln('TCTimeout'); + while assigned(prv) do begin + pprv:=prv; + prv^.IdleTimeout; + Assert(pprv<>prv); + end; + Done; +end; +procedure tAggr.Cont; + begin + assert(assigned(prv)); + prv^.Cont; +end; +procedure tAggr.UnRef; + begin + Assert(cnt>0); + Dec(Cnt); + if cnt=0 then begin + Done; + FreeMem(@self,sizeof(self)); + end; +end; +procedure tAggr.Done; + begin + if rekt then exit; + writeln('upmgr: close'); + rekt:=true; + tcs.Done; + if assigned(prev) then prev^.next:=next else Peers:=next; + if assigned(next) then next^.prev:=prev; +end; + +function FindAggr({const} addr:tNetAddr): tAggr_ptr; + begin + result:=Peers; + while assigned(result) do begin + if result^.tcs.remote=addr then exit; + assert(result^.prev=result); + result:=result^.next; + end; +end; + +procedure ChatHandler(var nchat:tChat; msg:tSMsg); + var ag:^tAggr; + var pr:^tPrv; + var s:tMemoryStream; + const cMax=16; + begin + if msg.stream.RdBufLen<2 then begin + writeln('upmgr: malformed init'); + nchat.StreamInit(s,16); + s.WriteByte(upFAIL); + s.writebyte(upErrMalformed); + nchat.Send(s); + nchat.Close; + exit end; + {first get the ag} + ag:=FindAggr(msg.source^); + if assigned(ag) then begin + {check} + if ag^.Cnt>=cMax then begin + nchat.StreamInit(s,16); + s.WriteByte(upFAIL); + s.WriteByte(upErrHiChan); + s.WriteByte(cMax); + s.WriteByte(ag^.Cnt); + nchat.Send(s); + nchat.Close; + exit end; + end else begin + New(ag); + ag^.init(msg.source^); + end; + New(pr); + pr^.Init(ag,nchat,msg); +end; + +BEGIN + SetChatHandler(opcode.upFileServer,@ChatHandler); +END. \ No newline at end of file -- 2.11.4.GIT