From 1f7bfbeebdcc976f3f00ff3207553397b28ed7e9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Fri, 6 Nov 2015 12:37:10 +0100 Subject: [PATCH] Merge TC into Upload. fileserver not ported. --- TC.pas | 247 ---------------------------------------- Upload.pas | 265 +++++++++++++++++++++++++++++++++++++++++++ upmgr.pas | 378 ------------------------------------------------------------- 3 files changed, 265 insertions(+), 625 deletions(-) delete mode 100644 TC.pas create mode 100644 Upload.pas delete mode 100644 upmgr.pas diff --git a/TC.pas b/TC.pas deleted file mode 100644 index 327adc4..0000000 --- a/TC.pas +++ /dev/null @@ -1,247 +0,0 @@ -unit TC; -{TransmissionControll over UDP -some dg larger -if pass set payload to that - -useful for file transfer, voip should only consult the current rate -and detect congestion based on latency - -Used by UploadManager. 1 TC per peer. - -Suspend: return from CanSend without sending :) -Resume: call start - -opcodes: - data=4 - mark:1;payload:XX - data-no-report=8 - data-imm-ack=6 - cont=5 - mark:1;rate:Word4(shr 6) - ack=7 - mark:1;len:Word2 -} -INTERFACE -uses MemStream,NetAddr,ServerLoop,opcode; - -type tTCSSe=record - Rate:Real; {sending rate} - Size:word; {datagram size} - RateIF:single; {rate increase fraction} - SizeIF:single; {size increase fraction} - {new:=old+old*IF} - end; - -type tTCS=object {this is sender part} - {in order methods should be set/called} - procedure Init(const iremote:tNetAddr); {set defaults for vars} - public - remote:tNetAddr; - Mark:byte; - MarkStart:tMTime; {when the mark was started} - MarkData:LongWord; {how much data sent} - txLastSize:Word; {is zero if suspend} - siMark:byte; - siNow,siWait:boolean; - isTimeout,maxTimeout:word; - Cur:tTCSSe; {current values} - Limit:tTCSSe; {maximum alloved} - Initial:tTCSSe; {after start/timeout} - minRateIF:single; {used after rate decrease} - CanSend: procedure of object; {called when transmit possible} - OnTimeout: procedure of object; - procedure Start; {start the transmission} - function MaxSize(req:word):word; - procedure WriteHeaders(var s:tMemoryStream); {add headers before the data} - procedure Send(var s:tMemoryStream); - procedure Done; {unregister all callbacks} - private - {timer callbacks} - procedure TransmitDelay; - procedure Timeout; - procedure OnCont(msg:ServerLoop.tSMsg); - procedure OnAck(msg:ServerLoop.tSMsg); - end; - -IMPLEMENTATION - -procedure tTCS.Init(const iremote:tNetAddr); - begin - remote:=iremote; - SetMsgHandler(opcode.tccont,remote,@OnCont); - SetMsgHandler(opcode.tceack,remote,@OnAck); - Limit.Rate:=2*1024*1024*1024; {2GB} - Limit.Size:=4096; - Limit.RateIF:=4; - Limit.SizeIF:=2; - Initial.Rate:={20*}1024; - Initial.Size:=32+5; - Initial.RateIF:=0.5; - Initial.SizeIF:=2; - minRateIF:=0.01; - CanSend:=nil; - OnTimeout:=nil; - maxTimeout:=65535; - Cur:=Initial; - txLastSize:=0; -end; - -procedure tTCS.Start; {start the transmission} - begin - Assert(assigned(CanSend) ); Assert(not remote.isnil); - assert(txLastSize=0); - mark:=Random(256); MarkData:=0; - siMark:=0; - isTimeout:=0; - Shedule(80,@TransmitDelay); - Shedule(3000,@Timeout); -end; - -function tTCS.MaxSize(req:word):word; - begin - req:=req-2;{headers} - if siNow - then result:=round(cur.Size*(1+cur.SizeIF)) - else result:=cur.Size; - dec(result,2); - if result>req then result:=req; -end; - -procedure tTCS.WriteHeaders(var s:tMemoryStream); - begin - if siNow then begin - s.WriteByte(opcode.tcdataimm);{opcode} - s.WriteByte(siMark); - end else if isTimeout=0 then begin - s.WriteByte(opcode.tcdata);{opcode} - s.WriteByte(mark); - end else begin - s.WriteByte(opcode.tcdataimm);{opcode} - s.WriteByte(simark); - end; -end; - -procedure tTCS.Send(var s:tMemoryStream); - begin - ServerLoop.SendMessage(s.base^,s.length,remote); - if MarkData=0 then begin - MarkStart:=mNow; - MarkData:=1; - end else MarkData:=MarkData+s.length; - txLastSize:=s.length; - siNow:=false; -end; - -procedure tTCS.OnCont(msg:ServerLoop.tSMsg); - var RateFill:single; - var txRate:real; - var rxRate:real; - var rmark:byte; - var rrate:longword; - var opcode:byte; - begin - opcode:=msg.stream.ReadByte; {skip opcode} - rmark:=msg.stream.ReadByte; - assert(opcode=5); - rrate:=msg.stream.ReadWord(4); - if (rmark=Mark) then begin - rxRate:=(rrate*64); {B/s} - txRate:=MarkData/((mNow-MarkStart)/1000{*SecsPerDay}); - RateFill:=rxRate/txRate; - write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), '); - UnShedule(@Timeout); - Shedule(2000,@Timeout); - if RateFill<0.85 then begin - write('limit, '); - cur.Rate:=rxrate; - cur.RateIF:=minRateIF; - end else - if (txRate/cur.Rate)<0.7 then begin - write('3hard, '); - end else begin - write('pass, '); - cur.Rate:=txRate*(cur.RateIF+1); - if cur.Rate>limit.Rate then cur.Rate:=Limit.Rate - else cur.RateIF:=cur.RateIF*2; - if cur.RateIF>limit.RateIF then cur.RateIF:=Limit.RateIF; - end; - repeat mark:=Random(256) until (mark<>rMark); - MarkData:=0; - writeln('-> ',(Cur.Rate/1024):1:4,'kB/s if=',cur.RateIF:6:4); - if siWait then begin - cur.SizeIF:=cur.SizeIF/2; - end; - siMark:=0; -end end; - -procedure tTCS.OnAck(msg:ServerLoop.tSMsg); - var rmark:byte; - var rsize:word; - var opcode:byte; - begin - opcode:=msg.stream.ReadByte; {skip opcode} - rmark:=msg.stream.ReadByte; - assert(opcode=7); - rsize:=msg.stream.ReadWord(2); - if rmark<>simark then exit; - if isTimeout>0 then begin - Shedule(80,@TransmitDelay); - isTimeout:=0; - end else - if rsize>cur.size then begin - writeln('size inc to ',rsize); - cur.SizeIF:=((rSize/cur.Size)-1)*2; - if cur.SizeIF>Limit.SizeIF then Cur.SizeIF:=Limit.SizeIF; - if (rsize/cur.rate)<=0.3 then cur.size:=rSize; {use new size for all transmit} - end; - if rsize>=cur.size then siWait:=false; -end; - -procedure tTCS.Timeout; - begin - if txLastSize=0 then exit; {suspend} - cur:=initial; - mark:=Random(256); MarkData:=0; - siMark:=0; - Inc(isTimeout); - if (isTimeout>maxTimeout)and assigned(OnTimeout) then OnTimeout; - Shedule(80,@TransmitDelay); - Shedule(3000,@Timeout); -end; - -procedure tTCS.TransmitDelay; - var txwait:real; - var burst:word; - begin - txwait:=0; - burst:=0; - if (siMark=0)and(cur.Size0) then exit; {no burst, no shedule next} - //txwait:=txwait+(txLastSize/cur.rate); - txwait:=(MarkData/cur.Rate)-((mNow-MarkStart)/1000{*SecsPerDay}); - inc(burst); - siNow:=false; - until (txwait>0.02)or(burst>200); - if txwait<0.02 then txwait:=0.01; - //writeln(txwait:1:3,burst); - Shedule(round(txwait*1000),@TransmitDelay); -end; - -procedure tTCS.Done; {unregister all callbacks} - begin - UnShedule(@TransmitDelay); - UnShedule(@Timeout); - SetMsgHandler(5,remote,nil); - SetMsgHandler(7,remote,nil); -end; - -BEGIN -END. diff --git a/Upload.pas b/Upload.pas new file mode 100644 index 0000000..f6e008b --- /dev/null +++ b/Upload.pas @@ -0,0 +1,265 @@ +UNIT Upload; +{Upload Manager for brodnetd} + +INTERFACE +USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1; + +IMPLEMENTATION + +type +tAggr_ptr=^tAggr; +tPrv_ptr=^tPrv; +tPrv=object + aggr:tAggr_ptr; + ch: ^tChat; + chan: byte; + next,prev: tPrv_ptr; + weight,wcur:Word; + isOpen,Active:boolean; + seglen:LongWord; + oinfo:tStoreObjectInfo; + {procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); + procedure OnMsg(msg:tSMsg; data:boolean); + procedure Cont; + procedure DoGET(const fid:tfid; base,limit:LongWord); + procedure DoSEG(base,limit:LongWord); + procedure DoClose; + procedure Start; + procedure Stop; + procedure ChatTimeout(willwait:LongWord); + procedure Close;} +end; +tAggrThr=object + thrid:tThreadID; + crit:tRtlCriticalSection; + stop:ByteBool; + remote:tSockAddrL; + prv1:^tPrv; + size1,size2:Word; + mark1,mark2:Byte; + rate:Single; + burst:byte; + MarkData:LongWord; + MarkStart:tMTime; +end; +tAggr=object + thr:tAggrThr; + remote:tNetAddr; + refc:byte; + + acks:Word; {ack counter, for timeouts} + timeout:word; + rateIF,sizeIF, + limRate,limSize:Single; + + next,prev: tAggr_ptr; + procedure Start(sprv:tPrv_ptr); + procedure Stop(sprv:tPrv_ptr); + procedure Free; + procedure Init(const source:tNetAddr); + procedure CalcRates(rxRate:Single); + procedure Periodic; + procedure OnCont(msg:tSMsg); + procedure OnAck(msg:tSMsg); +end; + +var Peers:^tAggr; + +procedure tAggr.Init(const source:tNetAddr); + begin + writeln('upmgr: init'); + next:=Peers; + prev:=nil; + if assigned(Peers) then Peers^.prev:=@self; + Peers:=@self; + refc:=0; + acks:=0; + timeout:=0; + rateIF:=1; + sizeIF:=1; + limRate:=20*1024*1024; + limSize:=4096; + InitCriticalSection(thr.crit); + source.ToSocket(thr.remote); + remote:=source; + thr.prv1:=nil; +end; + +function ThrStart(p:pointer):LongInt; + begin with tAggrThr(p^) do begin + { ( read; ) send; wait; repeat} +end end; + +procedure tAggr.Start(sprv:tPrv_ptr); + begin + if assigned(thr.prv1) then begin + sprv^.next:=thr.prv1^.next; + sprv^.prev:=thr.prv1; + sprv^.prev^.next:=sprv; + sprv^.next^.prev:=sprv; + end else begin + sprv^.next:=sprv; + sprv^.prev:=sprv; + thr.prv1:=sprv; + thr.stop:=false; + thr.MarkData:=0; + thr.MarkStart:=mNow; + CalcRates(20480); + thr.thrid:=BeginThread(@ThrStart,@thr{,var,stack}); + end; + sprv^.wcur:=sprv^.weight; +end; + +procedure tAggr.CalcRates(rxRate:Single); + var txRate:Single; + var RateFill:Single; + var pMark:byte; + const limRateIF=3; + begin + EnterCriticalSection(thr.crit); + pMark:=thr.mark1; + if thr.MarkStart=0 + then begin txRate:=rxRate; thr.Rate:=rxRate end + else txRate:=thr.MarkData/((mNow-thr.MarkStart)/1000{*SecsPerDay}); + RateFill:=rxRate/txRate; + write('speed: ',(rxRate/1024):1:3,'kB/s (',(RateFill*100):3:1,'% of ',txRate/1024:1:3,'), '); + if RateFill<0.85 then begin + writeln('limit'); + if RateFill<0.5 then thr.size1:=128; + thr.Rate:=rxRate; + RateIF:=RateIF/2; + repeat thr.mark1:=Random(256) until (thr.mark1<>pMark); + thr.MarkData:=0; {include on-the-wire data if increasing} + end else + if (txRate/thr.Rate)>0.7 then begin + writeln('pass'); + thr.Rate:=1+txRate*(RateIF+1); + if thr.Rate>limRate then thr.Rate:=limRate + else RateIF:=RateIF*2; + if RateIF>limRateIF then RateIF:=LimRateIF; + end; + if (thr.Rate/thr.size1)<4 then thr.size1:=thr.Rate/5; + if thr.size1<120 then thr.size1:=128; + sizeIF:=sizeIF/2; + thr.size2:=round(thr.size1*(1+SizeIF)); + thr.mark2:=Random(256); + LeaveCriticalSection(thr.crit); +end; + +procedure tAggr.Periodic; + begin + Shedule(2000,@Periodic); + if acks=0 then begin + inc(Timeout); + CalcRates(512); + end; + acks:=0; + Shedule(2000,@Periodic); +end; + +procedure tAggr.OnCont(msg:tSMsg); + var op,rmark:byte; + var rRate:LongWord; + begin + op:=msg.stream.readbyte; + assert(op=opcode.tccont); + rmark:=msg.stream.readbyte; + if rmark=thr.mark1 then begin + inc(acks); + timeout:=0; + rrate:=msg.stream.readword(4); + CalcRates(rRate*64); + end; +end; + +procedure tAggr.OnAck(msg:tSMsg); + var op,rmark:byte; + var rSize:LongWord; + const limSizeIF=1; + begin + op:=msg.stream.readbyte; + assert(op=opcode.tceack); + rmark:=msg.stream.readbyte; + rsize:=msg.stream.readword(2); + if (rmark<>thr.mark2)and(rmark<>thr.mark1) then exit; + inc(acks); + Timeout:=0; + if rsizelimSizeIF then SizeIF:=limSizeIF; + EnterCriticalSection(thr.crit); + thr.size1:=rSize; + thr.size2:=round(thr.size1*(1+SizeIF)); + thr.mark2:=Random(256); + LeaveCriticalSection(thr.crit); +end; + +procedure tAggr.Stop(sprv:tPrv_ptr); + begin +end; + +procedure tAggr.Free; + begin + Assert(refc>0); + Dec(refc); + if refc=0 then begin + writeln('upmgr: aggr close'); + DoneCriticalSection(thr.crit); + if assigned(prev) then prev^.next:=next else Peers:=next; + if assigned(next) then next^.prev:=prev; + FreeMem(@self,sizeof(self)); + end else + writeln('upmgr: aggr unrefd'); +end; + +function FindAggr({const} addr:tNetAddr): tAggr_ptr; + begin + result:=Peers; + while assigned(result) do begin + if assigned(result^.next) then assert(result^.next^.prev=result); + if result^.remote=addr then exit; + result:=result^.next; + end; +end; + +procedure ChatHandler(var nchat:tChat; msg:tSMsg); + var ag:^tAggr; + var pr:^tPrv; + var s:tMemoryStream; + const cMax=16; + begin + writeln('upmgr: ChatHandler'); + msg.stream.skip({the initcode}1); + if msg.stream.RdBufLen<2 then begin + writeln('upmgr: malformed init'); + nchat.StreamInit(s,16); + s.WriteByte(upFAIL); + s.writebyte(upErrMalformed); + nchat.Send(s); + nchat.Close; + writeln('upmgr: malformed request stage=0'); + exit end; + {first get the ag} + ag:=FindAggr(msg.source^); + if assigned(ag) then begin + {check} + if ag^.refc>=cMax then begin + nchat.StreamInit(s,16); + s.WriteByte(upFAIL); + s.WriteByte(upErrHiChan); + s.WriteByte(cMax); + s.WriteByte(ag^.refc); + nchat.Send(s); + nchat.Close; + exit end; + end else begin + New(ag); + ag^.init(msg.source^); + end; + New(pr); + //pr^.Init(ag,nchat,msg); +end; + +BEGIN + SetChatHandler(opcode.upFileServer,@ChatHandler); +END. \ No newline at end of file diff --git a/upmgr.pas b/upmgr.pas deleted file mode 100644 index f4e13a1..0000000 --- a/upmgr.pas +++ /dev/null @@ -1,378 +0,0 @@ -UNIT UPMGR; -{Upload Manager for brodnetd} - -INTERFACE -USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr; - -IMPLEMENTATION -USES Store1; - -type -tAggr_ptr=^tAggr; -tPrv_ptr=^tPrv; -tPrv=object - aggr:tAggr_ptr; - ch: ^tChat; - chan: byte; - next,prev: tPrv_ptr; - weight,wcur:Word; - isOpen,Active:boolean; - seglen:LongWord; - oinfo:tStoreObjectInfo; - procedure Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); - procedure OnMsg(msg:tSMsg; data:boolean); - procedure Cont; - procedure DoGET(const fid:tfid; base,limit:LongWord); - procedure DoSEG(base,limit:LongWord); - procedure DoClose; - procedure Start; - procedure Stop; - procedure ChatTimeout(willwait:LongWord); - procedure Close; -end; -tAggr=object - tcs: tTCS; - rekt:boolean; - next,prev: tAggr_ptr; - prv:^tPrv; - Cnt:Byte; - procedure UnRef; - procedure Cont; - procedure Init(const source:tNetAddr); - procedure TCTimeout; - procedure Done; -end; - -var Peers:^tAggr; - -{Requests -Close(); -GET(filehash:20; baseHi:word2; base:word4; limit:word4); -SEG(baseHi:word2; base:word4; limit:word4); -}{Responses -INFO(sizeHi:word2; size:word4; final:byte; seglen:word4); -FAIL(code:byte;...); -DONE(); -} - -procedure tPrv.DoGET(const fid:tfid; base,limit:LongWord); - var err:tmemorystream; - begin - if isOpen then oinfo.Close; - if Active then Stop; //opt - ch^.Ack; - oinfo.Open(fid); - {if not oinfo.final then begin - oinfo.rc:=200; - Close(oinfo.hnd); - end;} - if oinfo.rc>0 then begin - ch^.StreamInit(err,3); - err.WriteByte(upFAIL); - if oinfo.rc=1 then err.WriteByte(upErrNotFound) - else begin err.WriteByte(upErrIO); err.WriteByte(oinfo.rc) end; - ch^.Send(err); - end else begin - isopen:=true; - DoSeg(base,limit); - end; -end; - -procedure tPrv.DoSEG(base,limit:LongWord); - var err:tmemorystream; - begin - if isOpen then begin - ch^.StreamInit(err,12); - oinfo.SegSeek(base); - if oinfo.rc>0 then begin - err.WriteByte(upFAIL); - err.WriteByte(upErrIO); - err.WriteByte(oinfo.rc); - ch^.Send(err); - if Active then Stop; - end else begin - err.WriteByte(upINFO); - err.WriteWord(0,2); - err.WriteWord(oinfo.length,4); - seglen:=limit; - if oinfo.seglen@self then begin - prev^.next:=next; - next^.prev:=prev; - end else next:=nil; - if aggr^.prv=@self then aggr^.prv:=next; - active:=false; - Shedule(20000,@Close); - writeln('upmgr: Stop'); -end; - -procedure tPrv.Cont; - var s:tMemoryStream; - var sz:LongWord; - var buf:array [1..4096] of byte; - begin - //writeln('upmgr: CONT! ',chan); - Assert(Active and isOpen); - sz:=aggr^.tcs.MaxSize(sizeof(buf))-5; {1mark+4base} - if sz>SegLen then sz:=SegLen; - //s.Init(GetMem(sz),0,sz); - Assert((sz+5)<=sizeof(buf)); - s.Init(@buf,0,sizeof(buf)); aggr^.tcs.WriteHeaders(s); - s.WriteByte(Chan); - s.WriteWord(DWORD(oinfo.Offset),4); - Assert(sz<=s.WrBufLen); - oinfo.ReadAhead(sz,s.WrBuf); //todo - oinfo.WaitRead; - Assert(oinfo.rc=0); //todo - s.WrEnd(sz); - aggr^.tcs.Send(s); - //FreeMem(s.base,s.size); - SegLen:=SegLen-sz; - dec(wcur); - //FIXME: wait for ack of previous message! - if SegLen=0 then begin - ch^.StreamInit(s,2); - s.WriteByte(upDONE); - ch^.Send(s); - Stop; - end else - if (wcur=0) then begin - wcur:=weight; - aggr^.prv:=next; - end; -end; - -procedure tPrv.DoClose; - begin - ch^.Ack; - Close; -end; - -procedure tPrv.OnMsg(msg:tSMsg; data:boolean); - var op:byte; - var hash:tfid; - var base:LongWord; - var limit:LongWord; - var err:tmemorystream; - label malformed; - begin - if not data then exit; //todo - Assert(not(aggr^.rekt and active)); - if aggr^.rekt then exit; - if msg.stream.RdBufLen<1 then goto malformed; - op:=msg.stream.ReadByte; - writeln('upmgr: opcode=',op,' sz=',msg.stream.RdBufLen); - case op of - upClose: DoClose; - upGET: begin - if msg.stream.RdBufLen<>30 then goto malformed; - msg.stream.Read(hash,20); - if msg.stream.ReadWord(2)>0 then goto malformed; - base:=msg.stream.ReadWord(4); - limit:=msg.stream.ReadWord(4); - DoGet(hash,base,limit); - end; - upSEG: begin - if msg.stream.RdBufLen<10 then goto malformed; - if msg.stream.ReadWord(2)>0 then goto malformed; - base:=msg.stream.ReadWord(4); - limit:=msg.stream.ReadWord(4); - DoSEG(base, limit); - end; - else goto malformed; - end; - exit; malformed: - ch^.StreamInit(err,2); - err.WriteByte(upFAIL); - err.WriteByte(upErrMalformed); - ch^.Send(err); - writeln('upmgr: malformed request stage=1'); -end; - -{######Timeouts and Shit#######} -procedure tPrv.ChatTimeout(willwait:LongWord); - begin - if WillWait<8000 then exit; - writeln('upmgr: Chat timeout'); - Close; -end; -procedure tPrv.Close; - var err:tMemoryStream; - begin - assert(assigned(ch)); - ch^.StreamInit(err,1); - err.WriteByte(upClose); - try ch^.Send(err); except end; - if Active then Stop; - if isOpen then oinfo.Close; - isOpen:=false; - ch^.Close; - ch:=nil; - UnShedule(@Close); - aggr^.UnRef; - FreeMem(@self,sizeof(self)); -end; - -procedure tPrv.Init(ag:tAggr_ptr; var nchat:tChat; msg: tSMsg); - begin - ch:=@nchat; - ch^.Callback:=@OnMsg; - ch^.TMHook:=@ChatTimeout; - aggr:=ag; - next:=nil; - prev:=nil; - chan:=msg.stream.readbyte; {todo: except} - writeln('upmgr: prv init ',string(msg.source^),' chan=',chan); - weight:=100; - wcur:=0; - isOpen:=false; Active:=false; - inc(aggr^.Cnt); - Shedule(5000,@Close); - OnMsg(msg,true); -end; - -procedure tAggr.Init(const source:tNetAddr); - begin - writeln('upmgr: init'); - next:=Peers; - prev:=nil; - rekt:=false; - if assigned(Peers) then Peers^.prev:=@self; - Peers:=@self; - tcs.Init(source); - tcs.CanSend:=@Cont; - tcs.maxTimeout:=8; - tcs.OnTimeout:=@TCTimeout; - tcs.Limit.Rate:=20*1024*1024; {20MB} - prv:=nil; - cnt:=0; -end; - -procedure tAggr.TCTimeout; - var pprv:pointer; - begin - writeln('upmgr: TCTimeout'); - while assigned(prv) do begin - assert(not rekt); - pprv:=prv; - prv^.Close; - if rekt then exit; - Assert(pprv<>prv); - end; - Done; -end; -procedure tAggr.Cont; - begin - if not assigned(prv) then exit; - prv^.Cont; -end; -procedure tAggr.UnRef; - begin - Assert(cnt>0); - Dec(Cnt); - writeln('upmgr: aggr unrefd'); - if cnt=0 then begin - Done; - FreeMem(@self,sizeof(self)); - end; -end; -procedure tAggr.Done; - begin - assert(not rekt); - writeln('upmgr: aggr close'); - rekt:=true; - tcs.Done; - if assigned(prev) then prev^.next:=next else Peers:=next; - if assigned(next) then next^.prev:=prev; -end; - -function FindAggr({const} addr:tNetAddr): tAggr_ptr; - begin - result:=Peers; - while assigned(result) do begin - if assigned(result^.next) then assert(result^.next^.prev=result); - if result^.tcs.remote=addr then exit; - result:=result^.next; - end; -end; - -procedure ChatHandler(var nchat:tChat; msg:tSMsg); - var ag:^tAggr; - var pr:^tPrv; - var s:tMemoryStream; - const cMax=16; - begin - writeln('upmgr: ChatHandler'); - msg.stream.skip({the initcode}1); - if msg.stream.RdBufLen<2 then begin - writeln('upmgr: malformed init'); - nchat.StreamInit(s,16); - s.WriteByte(upFAIL); - s.writebyte(upErrMalformed); - nchat.Send(s); - nchat.Close; - writeln('upmgr: malformed request stage=0'); - exit end; - {first get the ag} - ag:=FindAggr(msg.source^); - if assigned(ag) then begin - {check} - if ag^.Cnt>=cMax then begin - nchat.StreamInit(s,16); - s.WriteByte(upFAIL); - s.WriteByte(upErrHiChan); - s.WriteByte(cMax); - s.WriteByte(ag^.Cnt); - nchat.Send(s); - nchat.Close; - exit end; - end else begin - New(ag); - ag^.init(msg.source^); - end; - New(pr); - pr^.Init(ag,nchat,msg); -end; - -BEGIN - SetChatHandler(opcode.upFileServer,@ChatHandler); -END. \ No newline at end of file -- 2.11.4.GIT