4 uses MemStream
,NetAddr
,UnixType
,Sockets
;
7 procedure RequestTerminate(c
:byte);
14 stream
: tMemoryStream
;
17 type tMessageHandler
=procedure(msg
:tSMsg
);
18 procedure SetMsgHandler(OpCode
:byte; handler
:tMessageHandler
);
19 procedure SetHiMsgHandler(handler
:tMessageHandler
);
21 function GetSocket(const rcpt
:tNetAddr
):tSocket
;
22 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
);
23 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
24 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
; channel
:word );
26 {#Sheduling and watching#}
27 type tFDEventHandler
=procedure(ev
:Word) of object;
28 type tOnTimer
=procedure of object;
29 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
);
30 procedure WatchFDRW(fd
:tHandle
; h
:tFDEventHandler
);
31 procedure Shedule(timeout
{ms}: LongWord
; h
:tOnTimer
);
32 procedure UnShedule(h
:tOnTimer
);
33 {note unshed will fail when called from OnTimer proc}
35 type tObjMessageHandler
=procedure(msg
:tSMsg
) of object;
36 {deliver message from peer to the object}
37 procedure SetMsgHandler(OpCode
:byte; from
:tNetAddr
; handler
:tObjMessageHandler
); overload
;
38 function IsMsgHandled(OpCode
:byte; from
:tNetAddr
):boolean;
40 function OptIndex(o
:string):word;
41 function OptParamCount(o
:word):word;
43 var OnTerminate
:procedure;
44 var VersionString
:string[63];
45 const VersionBrand
='BrodNetD';
47 type tTimeVal
=UnixType
.timeval
;
50 var mNow
:tMTime
; { miliseconds since start }
51 {overflows in hunderd hours }
52 function GetMTime
:tMTime
;
53 procedure SetThreadName(name
:pchar
);
54 procedure SC(fn
:pointer; retval
:cint
);
58 USES SysUtils
,BaseUnix
63 {aim for most simple implementation, since could be extended anytime}
68 var pollArr
: packed array [tPollTop
] of tPollFd
;
70 cb
: tFDEventHandler
; {proc+object}
72 var pollHnd
: array [tPollTop
] of tFdHndDsc
;
73 var pollTop
: tPollTop
;
75 var hnd
: array [1..36] of tMessageHandler
;
76 var HiHnd
: tMessageHandler
;
78 type tSheduled_ptr
=^tSheduled
; tSheduled
=record
83 var ShedTop
: ^tSheduled
;
84 var ShedUU
: ^tSheduled
;
86 var PollTimeout
:LongInt;
88 procedure SC(fn
:pointer; retval
:cint
);
90 if retval
< 0 then begin
91 raise eXception
.Create(Format('Socket error %d operation %P',[SocketError
,fn
]));
95 procedure s_SetupInet
;
96 var bind_addr
:tInetSockAddr
;
100 with bind_addr
do begin
102 oi
:=OptIndex('-port');
103 if oi
=0 then sin_port
:=htons(3511)
105 assert(OptParamCount(oi
)=1);
106 sin_port
:=htons(StrToInt(paramstr(oi
+1)));
108 sin_addr
.s_addr
:=0; {any}
109 s_inet
:=fpSocket(sin_family
,SOCK_DGRAM
,IPPROTO_UDP
);
110 SC(@fpSocket
,s_inet
);
111 turnon
:=IP_PMTUDISC_DO
;
112 SC(@fpsetsockopt
,fpsetsockopt(s_inet
, IPPROTO_IP
, IP_MTU_DISCOVER
, @turnon
, sizeof(turnon
)));
114 SC(@fpBind
,fpBind(s_inet
,@bind_addr
,sizeof(bind_addr
)));
115 with PollArr
[0] do begin
122 var Terminated
:boolean=false;
124 function GetSocket(const rcpt
:tNetAddr
):tSocket
;
128 procedure SendMessage(const data
; len
:word; const rcpt
:tSockAddrL
);
130 {SC(@fpsendto,}fpsendto(s_inet
,@data
,len
,0,@rcpt
,sizeof(sockaddr_in
)){)};
132 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
);
136 SendMessage(data
,len
,sa
);
138 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
; channel
:word );
140 SendMessage(data
,len
,rcpt
);
141 {todo: optimization??}
144 procedure SignalHandler(sig
:cint
);CDecl;
147 if terminated
then raise eControlC
.Create('CtrlC DoubleTap') ;
150 procedure FatalSignalHandler(sig
:cint
);CDecl;
152 raise eExternal
.Create('Unexpected Signal '+IntToStr(sig
)) ;
156 {index=iphash+opcode}
157 type tPeerTableBucket
=record
160 handler
:tObjMessageHandler
;
162 var PT
:array [0..255] of ^tPeerTableBucket
;
163 var PT_opcodes
: set of 1..high(hnd
);
165 function FindPT(opcode
:byte; addr
:tNetAddr
):Word; { $FFFF=fail}
168 i
:=(addr
.hash
+opcode
) mod high(PT
); {0..63}
169 for o
:=0 to high(PT
) do begin
170 result
:=(i
+o
) mod high(PT
);
171 if not assigned(PT
[result
]) then break
;
172 if (PT
[result
]^.opcode
=opcode
) and (PT
[result
]^.remote
=addr
) then exit
;
177 function IsMsgHandled(OpCode
:byte; from
:tNetAddr
):boolean;
178 begin result
:=FindPT(opcode
,from
)<>$FFFF end;
180 procedure UnSetMsgHandler(const from
:tNetAddr
; opcode
:byte);
183 h
:=FindPT(opcode
,from
);
184 if h
=$FFFF then exit
;
187 {go reverse exit on null, hash them, match: move to H and stop}
188 if h
=0 then i
:=high(PT
) else i
:=h
-1;
189 while (i
<>h
)and assigned(PT
[i
]) do begin
190 if (PT
[i
]^.remote
.hash
+PT
[i
]^.opcode
)=h
then begin
195 if i
=0 then i
:=high(PT
) else dec(i
);
199 procedure SetMsgHandler(OpCode
:byte; from
:tNetAddr
; handler
:tObjMessageHandler
);
202 UnSetMsgHandler(from
,opcode
);
203 if handler
=nil then exit
;
204 h
:=(from
.hash
+opcode
) mod high(PT
);
205 for o
:=0 to high(PT
) do begin
206 i
:=(h
+o
) mod high(PT
);
207 if not assigned(PT
[i
]) then break
;
210 PT
[i
]^.opcode
:=OpCode
;
212 PT
[i
]^.handler
:=handler
;
213 if opcode
<=high(hnd
) then Include(PT_opcodes
,opcode
);
216 {do not waste stack on statics}
217 var EventsCount
:integer;
218 var Buffer
:array [1..4096] of byte;
220 var From
:tSockAddrL
; {use larger struct so everything fits}
221 var FromLen
:LongWord
;
223 var curhnd
:tMessageHandler
;
224 var curhndo
:tObjMessageHandler
;
228 function DoSock(var p
:tPollFD
):boolean;
235 if (p
.revents
and pollIN
)=0 then exit
else result
:=true;
236 FromLen
:=sizeof(From
);
237 pkLen
:=fprecvfrom(p
.FD
,@Buffer
,sizeof(Buffer
),0,@from
,@fromlen
);
238 SC(@fprecvfrom
,pkLen
);
240 FromG
.FromSocket(from
);
241 Msg
.Source
:=@FromG
; {!thread}
243 Msg
.Data
:=@Buffer
; {!thread}
244 Msg
.stream
.Init(@Buffer
,pkLen
,sizeof(Buffer
));
245 Msg
.channel
:=0; {!multisocket}
246 if Buffer
[1]>=128 then curhnd
:=HiHnd
else if Buffer
[1]<=high(hnd
) then curhnd
:=hnd
[Buffer
[1]];
247 if (Buffer
[1]>high(hnd
))or(Buffer
[1] in PT_opcodes
) then begin
248 ptidx
:=FindPT(Buffer
[1],FromG
);
249 if ptidx
<$FFFF then curhndo
:=PT
[ptidx
]^.handler
;
253 var GetMTimeOffsetSec
:DWORD
=0;
254 function GetMTime
:tMTime
;
256 var time
:UnixType
.timespec
;
259 assert(clock_gettime(CLOCK_MONOTONIC
,@time
)=0);
260 trans
:=((time
.tv_sec
-GetMTimeOffsetSec
)*1000)+(time
.tv_nsec
div 1000000);
261 GetMTime
:=trans
and $FFFFFFFF;
262 {$ELSE}{$ERROR Not Implemented on non unix}
265 procedure InitMTime
; {$IFDEF UNIX}
266 var time
:UnixType
.timespec
;
268 assert(clock_gettime(CLOCK_MONOTONIC
,@time
)=0);
269 GetMTimeOffsetSec
:=time
.tv_sec
;
270 {$ELSE}{$ERROR Not Implemented on non unix}
275 function prctl( option
:cint
; arg2
,arg3
,arg4
,arg5
:culong
):cint
; cdecl; external;
276 const PR_SET_NAME
=15;
278 procedure SetThreadName(name
:pchar
);
279 {$IFDEF Linux} begin prctl(PR_SET_NAME
,culong(pchar(name
)),0,0,0)
280 {$ELSE}begin{$NOTE Custom thread mames not supported}
290 {gmagic with delta-time, increment mNow, ...}
292 delta
:=mNow
-LastShed
;
294 //writeln('DeltaTime: ',delta);
295 {first tick all tasks}
298 while assigned(cur
) do begin
299 if cur
^.left
<=delta
then cur
^.left
:=0 else begin
300 dec(cur
^.left
,delta
);
301 {also set next wake time}
302 if cur
^.left
<PollTimeout
then PollTimeout
:=cur
^.left
;
308 {correct floating-point glitch}
309 if pollTimeout
=0 then pollTimeOut
:=1;
310 {run first runnable task}
313 while assigned(cur
) do begin
314 if cur
^.left
=0 then begin
331 var ReExec
:boolean=false;
335 while not terminated
do begin
338 EventsCount
:=fpPoll(@PollArr
[0],PollTop
,PollTimeout
);
340 if (eventscount
=-1)and terminated
then break
;
341 if eventscount
=-1 then break
; {fixme: print error}
342 if eventscount
=0 then continue
else begin
344 if DoSock(PollArr
[0]) then
345 if assigned(curhndo
) then curhndo(msg
)
346 else if assigned(curhnd
) then curhnd(msg
)
347 else {raise eXception.Create('}writeln('ServerLoop: No handler for opcode '+IntToStr(Buffer
[1]));
350 for tp
:=1 to pollTop
do if PollArr
[tp
].revents
>0 then begin
351 PollHnd
[tp
].CB(PollArr
[tp
].rEvents
);
352 PollArr
[tp
].revents
:=0;
356 if assigned(onTerminate
) then onTerminate
;
358 if ReExec
then fpExecv(paramstr(0),argv
);
361 procedure SetMsgHandler(OpCode
:byte; handler
:tMessageHandler
);
362 begin assert(hnd
[OpCode
]=nil); hnd
[OpCode
]:=handler
; end;
363 procedure SetHiMsgHandler(handler
:tMessageHandler
);
364 begin Hihnd
:=handler
; end;
366 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
; e
:LongWord
);
369 if assigned(h
) then begin
370 PollHnd
[pollTop
].CB
:=h
;
371 PollArr
[pollTop
].fd
:=fd
;
372 PollArr
[pollTop
].events
:=e
;
373 PollArr
[pollTop
].revents
:=0;
374 //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
376 end else for opt
:=0 to high(opt
) do if PollArr
[opt
].fd
=fd
then begin
377 if (pollTop
-1)>opt
then begin
378 PollArr
[opt
]:=PollArr
[pollTop
-1];
379 PollHnd
[opt
]:=PollHnd
[pollTop
-1];
382 PollArr
[pollTop
].fd
:=-1;
383 PollArr
[pollTop
].events
:=0;
384 PollArr
[pollTop
].revents
:=0;
388 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
);
390 WatchFD(fd
,h
,POLLERR
or POLLHUP
or POLLIN
or POLLPRI
or
391 POLLRDBAND
or POLLRDNORM
);
393 procedure WatchFDRW(fd
:tHandle
; h
:tFDEventHandler
);
395 WatchFD(fd
,h
,POLLERR
or POLLHUP
or POLLIN
or POLLPRI
or
396 POLLRDBAND
or POLLRDNORM
or POLLOUT
);
399 procedure Shedule(timeout
{ms}: LongWord
; h
:tOnTimer
);
403 if Assigned(ShedUU
) then begin
405 ShedUU
:=ShedUU
^.next
;
406 end else New(ShedTop
);
407 ShedTop
^.Left
:=timeout
;
412 procedure UnShedule(h
:tOnTimer
);
416 //if ShedTop=nil then AbstractError;
419 while assigned(cur
) do begin
420 if 0=CompareByte(cur
^.cb
,h
,sizeof(h
)) then begin
421 pcur
^:=cur
^.next
; {unlink from main list}
422 cur
^.next
:=ShedUU
; ShedUU
:=cur
; {link to unused}
431 var DoShowOpts
:boolean=false;
432 function OptIndex(o
:string):word;
434 if DoShowOpts
then writeln('Option: ',o
);
436 while result
>0 do begin
437 if o
=system
.paramstr(result
) then break
;
442 function OptParamCount(o
:word):word;
446 if o
>0 then for i
:=o
+1 to paramcount
do begin
447 if paramstr(i
)[1]<>'-' then inc(result
)
451 procedure RequestTerminate(c
:byte);
452 begin Terminated
:=true;
453 if c
=9 then ReExec
:=true;
457 var nb
:array [0..0] of byte;
460 VersionString
:=GIT_VERSION
+'-'+IntToStr(BUILD_VERSION
);
461 writeln('ServerLoop: ',VersionBrand
,' ',VersionString
);
464 fpSignal(SigInt
,@SignalHandler
);
465 fpSignal(SigTerm
,@SignalHandler
);
466 fpSignal(SigPipe
,baseunix
.signalhandler(SIG_IGN
));
467 for i
:=1 to high(hnd
) do hnd
[i
]:=nil;
468 for i
:=1 to high(PT
) do PT
[i
]:=nil;
470 pollTop
:=1; {1 for basic listen}
472 ShedUU
:=nil; {todo: allocate a few to improve paging}
475 if OptIndex('-h')>0 then DoShowOpts
:=true;
478 SetTextBuf(OUTPUT
,nb
);