From bdeaedd0ad601effc062a5ef15a1a8d1dd9d7958 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Tue, 22 Sep 2015 16:38:13 +0200 Subject: [PATCH] Fix Chat and PT in ServerLoop so it works. --- Chat.pas | 42 +++++++++++++++++++++++++++--------------- ServerLoop.pas | 24 ++++++++++++++---------- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/Chat.pas b/Chat.pas index e57338b..96f6420 100644 --- a/Chat.pas +++ b/Chat.pas @@ -15,6 +15,7 @@ type tChat=object RTT:LongWord;{in ms} callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats} TMhook : procedure(willwait:LongWord ) of object; + DisposeHook: procedure of object; {called instead of freeing self} procedure Init(const iremote:tNetAddr); procedure AddHeaders(var s:tMemoryStream); procedure Send(s:tMemoryStream); @@ -59,7 +60,9 @@ procedure tChat.Init(const iremote:tNetAddr); end; procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte); begin + remote:=iremote; opcode:=iopcode; + SetMsgHandler(opcode,remote,@OnReply); txSeq:=0; rxSeq:=0; rxAcked:=true; {to not ack pk 0} @@ -68,7 +71,8 @@ procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte); txLen:=0; callback:=nil; TMhook:=nil; - RTT:=500; {a default for timeouts} + DisposeHook:=nil; + RTT:=200; {a default for timeouts} txTime:=0; end; {struct @@ -84,7 +88,7 @@ procedure tCHat.AddHeaders(var s:tMemoryStream); procedure tChat.Send(s:tMemoryStream); begin assert(txLen=0); - assert(assigned(callback)); + //assert(assigned(callback)); Inc(txSeq); s.Seek(0); s.WriteByte(opcode); @@ -117,39 +121,44 @@ end; procedure tChat.Close; begin assert(not closed); + Ack; closed:=true; if txLen=0 {no packets in flight} then begin Shedule(3000{todo},@Done); {wait for something lost} callback:=nil; {avoid calling} + tmhook:=nil; end; end; procedure tChat.Done; begin {called from sheduler, Done is unsheduled, Resend is not sheduled since ack was received when Done was sheduled} - FreeMem(txPk,txLen); + if txLen>0 then FreeMem(txPk,txLen); SetMsgHandler(opcode,remote,nil); - FreeMem(@self,sizeof(self)); + if assigned(DisposeHook) then DisposeHook + else FreeMem(@self,sizeof(self)); end; procedure tChat.Resend; {timeout waiting for ack} begin {resend and reshedule} + Assert(txLen>0); txTime:=0; RTT:=RTT*2; if assigned(TMhook) and (not closed) then begin - TMhook(RTT*2); + TMhook(RTT); if closed then begin Done; {if hook decided to close then abort} exit; end; end; - if (RTT>32000) and closed - then Done {give up} - else if txLen>0 then begin + if (RTT>16000) and closed then begin + Done {give up} + end else begin + {finally resend the msg} ServerLoop.SendMessage(txPk^,txLen,remote); - ServerLoop.Shedule(RTT*2,@Resend); + ServerLoop.Shedule(RTT,@Resend); end; end; @@ -173,17 +182,18 @@ procedure tChat.OnReply(msg:tSMsg); {some useful data!} rxSeq:=seq; rxAcked:=false; - callback(msg,true); + if assigned(callback) then callback(msg,true); end; end; if aseq>0 then {ack of our msg} begin - if (aseq=rxSeq)and(txLen>0) {it is current} then begin + if (aseq=txSeq)and(txLen>0) {it is current} then begin if txTime>0 then RTT:=Round((Now-txTime)*MsecsPerDay); FreeMem(txPk,txLen); TxLen:=0; txPk:=nil; - callback(msg,false); - end else {it is ack of old data, do nothing}; + if assigned(callback) then callback(msg,false); + ServerLoop.UnShedule(@Resend); + end else {write(' old-ack')it is ack of old data, do nothing}; end; end; @@ -207,14 +217,16 @@ procedure OnHiMsg(msg:tSMsg); assert(not IsMsgHandled(opcode,msg.source^)); seq:=msg.stream.ReadWord(2); aseq:=msg.stream.ReadWord(2); - if (seq<>1)and(aseq>0) then exit; {invalid initial state} + if (seq<>1)or(aseq>0) then exit; {invalid initial state} ix:=msg.stream.ReadByte; if (ix<1)or(ix>high(ChatHandlers)) then exit; hnd:=ChatHandlers[ix]; if not assigned(hnd) then raise eXception.Create('No handler for initcode '+IntToStr(ix)); msg.stream.seek(msg.stream.position-1);{unskip the initcode} - New(nchat); + nchat:=GetMem(sizeof(tChat)); nchat^.InitFrom(msg.Source^,opcode); + nchat^.rxacked:=false; + nchat^.rxSeq:=1; hnd(nchat^,msg); end; diff --git a/ServerLoop.pas b/ServerLoop.pas index d04ee35..bcde2bf 100644 --- a/ServerLoop.pas +++ b/ServerLoop.pas @@ -138,9 +138,9 @@ type tPeerTableBucket=record handler:tObjMessageHandler; end; var PT:array [0..255] of ^tPeerTableBucket; -var PT_opcodes: set of 1..36; +var PT_opcodes: set of 1..high(hnd); -function FindPT(opcode:byte; const addr:tNetAddr):Word; { $FFFF=fail} +function FindPT(opcode:byte; addr:tNetAddr):Word; { $FFFF=fail} var i,o:word; begin i:=(addr.hash+opcode) mod high(PT); {0..63} @@ -188,7 +188,7 @@ procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); PT[i]^.opcode:=OpCode; PT[i]^.remote:=from; PT[i]^.handler:=handler; - Include(PT_opcodes,opcode); + if opcode<=high(hnd) then Include(PT_opcodes,opcode); end; {do not waste stack on statics} @@ -197,18 +197,20 @@ end; var pkLen:LongWord; var From:tSockAddrL; {use larger struct so everything fits} var FromLen:LongWord; + var FromG:tNetAddr; var curhnd:tMessageHandler; var curhndo:tObjMessageHandler; var Msg:tSMsg; var tp:tPollTop; -procedure DoSock(var p:tPollFD); - var FromG:tNetAddr; - var ptidx:byte; +function DoSock(var p:tPollFD):boolean; + var ptidx:word; begin curhnd:=nil; curhndo:=nil; - if (p.revents and pollIN)=0 then exit; + result:=false; + ptidx:=$FFFF; + if (p.revents and pollIN)=0 then exit else result:=true; FromLen:=sizeof(From); pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen); SC(@fprecvfrom,pkLen); @@ -220,8 +222,10 @@ procedure DoSock(var p:tPollFD); Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer)); Msg.channel:=0; {!multisocket} if Buffer[1]>=128 then curhnd:=HiHnd else if Buffer[1]<=high(hnd) then curhnd:=hnd[Buffer[1]]; - if Buffer[1] in PT_opcodes then ptidx:=FindPT(Buffer[1],FromG) else ptidx:=0; - if ptidx>0 then curhndo:=PT[ptidx]^.handler; + if (Buffer[1]>high(hnd))or(Buffer[1] in PT_opcodes) then begin + ptidx:=FindPT(Buffer[1],FromG); + if ptidx<$FFFF then curhndo:=PT[ptidx]^.handler; + end; end; procedure ShedRun; @@ -278,7 +282,7 @@ procedure Main; if eventscount=-1 then break; {fixme: print error} if eventscount=0 then continue else begin {INET socket} - DoSock(PollArr[0]); + if DoSock(PollArr[0]) then if assigned(curhndo) then curhndo(msg) else if assigned(curhnd) then curhnd(msg) else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1])); -- 2.11.4.GIT