From c558153a1f7bab74e00a3d5d1dead81695b92477 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Sat, 19 Sep 2015 09:57:44 +0200 Subject: [PATCH] Add set handler for msg from peer, impl as hash table. --- ServerLoop.pas | 95 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 14 deletions(-) diff --git a/ServerLoop.pas b/ServerLoop.pas index d90fee7..0cca0f7 100644 --- a/ServerLoop.pas +++ b/ServerLoop.pas @@ -29,6 +29,10 @@ procedure Shedule(timeout{ms}: LongWord; h:tOnTimer); procedure UnShedule(h:tOnTimer); {note unshed will fail when called from OnTimer proc} +type tObjMessageHandler=procedure(msg:tSMsg) of object; +{deliver message from peer to the object} +procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); + type tTimeVal=UnixType.timeval; var iNow:tTimeVal; @@ -118,27 +122,94 @@ procedure SignalHandler(sig:cint);CDecl; writeln('Shutdown requested'); end; +{index=iphash+opcode} +type tPeerTableBucket=record + opcode:byte; + remote:tNetAddr; + handler:tObjMessageHandler; +end; +var PT:array [0..255] of ^tPeerTableBucket; +var PT_opcodes: set of 1..36; + +function FindPT(opcode:byte; const addr:tNetAddr):Word; { $FFFF=fail} + var i,o:word; + begin + i:=(addr.hash+opcode) mod high(PT); {0..63} + for o:=0 to high(PT) do begin + result:=(i+o) mod high(PT); + if not assigned(PT[result]) then break; + if (PT[result]^.opcode=opcode) and (PT[result]^.remote=addr) then exit; + end; + result:=$FFFF; +end; + +procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte); + var i,h:word; + begin + h:=FindPT(opcode,from); + if h=$FFFF then exit; + Dispose(PT[h]); + PT[h]:=nil; + {go reverse exit on null, hash them, match: move to H and stop} + i:=h-1; + while (i<>h)and assigned(PT[i]) do begin + if (PT[i]^.remote.hash+PT[i]^.opcode)=h then begin + PT[h]:=PT[i]; + PT[i]:=nil; + break; + end; + if i=0 then i:=high(PT) else dec(i); + end; +end; + +procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); + var h,o,i:word; + begin + UnSetMsgHandler(from,opcode); + if handler=nil then exit; + h:=(from.hash+opcode) mod high(PT); + for o:=0 to high(PT) do begin + i:=(h+o) mod high(PT); + if not assigned(PT[i]) then break; + end; + New(PT[i]); + PT[i]^.opcode:=OpCode; + PT[i]^.remote:=from; + PT[i]^.handler:=handler; + Include(PT_opcodes,opcode); +end; + {do not waste stack on statics} var EventsCount:integer; var Buffer:array [1..1024] of byte; var pkLen:LongWord; var From:tSockAddrL; {use larger struct so everything fits} var FromLen:LongWord; - var FromG:tNetAddr; var curhnd:tMessageHandler; + var curhndo:tObjMessageHandler; var Msg:tSMsg; var tp:tPollTop; -procedure PrepareHandler; +procedure DoSock(var p:tPollFD); + var FromG:tNetAddr; + var ptidx:byte; begin + curhnd:=nil; + curhndo:=nil; + if (p.revents and pollIN)=0 then exit; + FromLen:=sizeof(From); + pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen); + SC(@fprecvfrom,pkLen); + p.revents:=0; FromG.FromSocket(from); - if Buffer[1]>128 then curhnd:=HiHnd else curhnd:=hnd[Buffer[1]]; - if not assigned(curhnd) then raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1])); Msg.Source:=@FromG; {!thread} Msg.Length:=pkLen; Msg.Data:=@Buffer; {!thread} Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer)); Msg.channel:=0; {!multisocket} + if Buffer[1]>128 then curhnd:=HiHnd else if Buffer[1]<=high(hnd) then curhnd:=hnd[Buffer[1]]; + if Buffer[1] in PT_opcodes then ptidx:=FindPT(Buffer[1],FromG) else ptidx:=0; + if ptidx>0 then curhndo:=PT[ptidx]^.handler; end; procedure ShedRun; @@ -194,16 +265,10 @@ procedure Main; if eventscount=-1 then break; {fixme: print error} if eventscount=0 then continue else begin {INET socket} - with PollArr[0] do begin - if (revents and pollIN)>0 then begin - FromLen:=sizeof(From); - pkLen:=fprecvfrom(s_inet,@Buffer,sizeof(Buffer),0,@from,@fromlen); - SC(@fprecvfrom,pkLen); - PrepareHandler; - curhnd(Msg); - revents:=0; - end; - end; + DoSock(PollArr[0]); + if assigned(curhndo) then curhndo(msg) + else if assigned(curhnd) then curhnd(msg) + else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1])); {INET6...} {Generic} for tp:=1 to pollTop do if PollArr[tp].revents>0 then begin @@ -283,6 +348,8 @@ BEGIN fpSignal(SigInt,@SignalHandler); fpSignal(SigTerm,@SignalHandler); for i:=1 to high(hnd) do hnd[i]:=nil; + for i:=1 to high(PT) do PT[i]:=nil; + PT_opcodes:=[]; pollTop:=1; {1 for basic listen} ShedTop:=nil; ShedUU:=nil; {todo: allocate a few to improve paging} -- 2.11.4.GIT