4 uses MemStream
,NetAddr
,UnixType
;
13 stream
: tMemoryStream
;
16 type tMessageHandler
=procedure(msg
:tSMsg
);
17 procedure SetMsgHandler(OpCode
:byte; handler
:tMessageHandler
);
18 procedure SetHiMsgHandler(handler
:tMessageHandler
);
20 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
);
21 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
22 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
; channel
:word );
24 {#Sheduling and watching#}
25 type tFDEventHandler
=procedure(ev
:Word) of object;
26 type tOnTimer
=procedure of object;
27 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
);
28 procedure Shedule(timeout
{ms}: LongWord
; h
:tOnTimer
);
29 procedure UnShedule(h
:tOnTimer
);
30 {note unshed will fail when called from OnTimer proc}
32 type tObjMessageHandler
=procedure(msg
:tSMsg
) of object;
33 {deliver message from peer to the object}
34 procedure SetMsgHandler(OpCode
:byte; from
:tNetAddr
; handler
:tObjMessageHandler
);
35 function IsMsgHandled(OpCode
:byte; from
:tNetAddr
):boolean;
37 type tTimeVal
=UnixType
.timeval
;
42 USES SysUtils
,Sockets
,BaseUnix
46 {aim for most simple implementation, since could be extended anytime}
51 var pollArr
: packed array [tPollTop
] of tPollFd
;
53 cb
: tFDEventHandler
; {proc+object}
55 var pollHnd
: array [tPollTop
] of tFdHndDsc
;
56 var pollTop
: tPollTop
;
58 var hnd
: array [1..36] of tMessageHandler
;
59 var HiHnd
: tMessageHandler
;
61 type tSheduled_ptr
=^tSheduled
; tSheduled
=record
66 var ShedTop
: ^tSheduled
;
67 var ShedUU
: ^tSheduled
;
68 var LastShed
: UnixType
.timeval
;
69 var PollTimeout
:LongInt;
71 procedure SC(fn
:pointer; retval
:cint
);
73 if retval
< 0 then begin
74 raise eXception
.Create(Format('Socket error %d operation %P',[SocketError
,fn
]));
78 procedure s_SetupInet
;
79 var bind_addr
:tInetSockAddr
;
82 with bind_addr
do begin
84 sin_port
:=htons(3511);
85 sin_addr
.s_addr
:=0; {any}
86 s_inet
:=fpSocket(sin_family
,SOCK_DGRAM
,IPPROTO_UDP
);
88 turnon
:=IP_PMTUDISC_DO
;
89 SC(@fpsetsockopt
,fpsetsockopt(s_inet
, IPPROTO_IP
, IP_MTU_DISCOVER
, @turnon
, sizeof(turnon
)));
91 SC(@fpBind
,fpBind(s_inet
,@bind_addr
,sizeof(bind_addr
)));
92 with PollArr
[0] do begin
99 var Terminated
:boolean=false;
101 procedure SendMessage(const data
; len
:word; const rcpt
:tSockAddrL
);
103 {SC(@fpsendto,}fpsendto(s_inet
,@data
,len
,0,@rcpt
,sizeof(sockaddr_in
)){)};
105 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
);
109 SendMessage(data
,len
,sa
);
111 procedure SendMessage(const data
; len
:word; const rcpt
:tNetAddr
; channel
:word );
113 SendMessage(data
,len
,rcpt
);
114 {todo: optimization??}
117 procedure SignalHandler(sig
:cint
);CDecl;
120 if terminated
then raise eControlC
.Create('CtrlC DoubleTap') ;
122 writeln('Shutdown requested');
125 {index=iphash+opcode}
126 type tPeerTableBucket
=record
129 handler
:tObjMessageHandler
;
131 var PT
:array [0..255] of ^tPeerTableBucket
;
132 var PT_opcodes
: set of 1..36;
134 function FindPT(opcode
:byte; const addr
:tNetAddr
):Word; { $FFFF=fail}
137 i
:=(addr
.hash
+opcode
) mod high(PT
); {0..63}
138 for o
:=0 to high(PT
) do begin
139 result
:=(i
+o
) mod high(PT
);
140 if not assigned(PT
[result
]) then break
;
141 if (PT
[result
]^.opcode
=opcode
) and (PT
[result
]^.remote
=addr
) then exit
;
146 function IsMsgHandled(OpCode
:byte; from
:tNetAddr
):boolean;
147 begin result
:=FindPT(opcode
,from
)<>$FFFF end;
149 procedure UnSetMsgHandler(const from
:tNetAddr
; opcode
:byte);
152 h
:=FindPT(opcode
,from
);
153 if h
=$FFFF then exit
;
156 {go reverse exit on null, hash them, match: move to H and stop}
158 while (i
<>h
)and assigned(PT
[i
]) do begin
159 if (PT
[i
]^.remote
.hash
+PT
[i
]^.opcode
)=h
then begin
164 if i
=0 then i
:=high(PT
) else dec(i
);
168 procedure SetMsgHandler(OpCode
:byte; from
:tNetAddr
; handler
:tObjMessageHandler
);
171 UnSetMsgHandler(from
,opcode
);
172 if handler
=nil then exit
;
173 h
:=(from
.hash
+opcode
) mod high(PT
);
174 for o
:=0 to high(PT
) do begin
175 i
:=(h
+o
) mod high(PT
);
176 if not assigned(PT
[i
]) then break
;
179 PT
[i
]^.opcode
:=OpCode
;
181 PT
[i
]^.handler
:=handler
;
182 Include(PT_opcodes
,opcode
);
185 {do not waste stack on statics}
186 var EventsCount
:integer;
187 var Buffer
:array [1..1024] of byte;
189 var From
:tSockAddrL
; {use larger struct so everything fits}
190 var FromLen
:LongWord
;
191 var curhnd
:tMessageHandler
;
192 var curhndo
:tObjMessageHandler
;
196 procedure DoSock(var p
:tPollFD
);
202 if (p
.revents
and pollIN
)=0 then exit
;
203 FromLen
:=sizeof(From
);
204 pkLen
:=fprecvfrom(p
.FD
,@Buffer
,sizeof(Buffer
),0,@from
,@fromlen
);
205 SC(@fprecvfrom
,pkLen
);
207 FromG
.FromSocket(from
);
208 Msg
.Source
:=@FromG
; {!thread}
210 Msg
.Data
:=@Buffer
; {!thread}
211 Msg
.stream
.Init(@Buffer
,pkLen
,sizeof(Buffer
));
212 Msg
.channel
:=0; {!multisocket}
213 if Buffer
[1]>=128 then curhnd
:=HiHnd
else if Buffer
[1]<=high(hnd
) then curhnd
:=hnd
[Buffer
[1]];
214 if Buffer
[1] in PT_opcodes
then ptidx
:=FindPT(Buffer
[1],FromG
) else ptidx
:=0;
215 if ptidx
>0 then curhndo
:=PT
[ptidx
]^.handler
;
221 var now
:UnixType
.timeval
absolute iNow
;
223 var olTop
:^tSheduled
;
229 ShedTop
:=nil; {unlink the current shed list}
230 fpgettimeofday(@Now
,nil);
231 delta
:=(Now
.tv_sec
-LastShed
.tv_sec
);
232 if delta
>6 then delta
:=5000 else delta
:=(delta
*1000)+((Now
.tv_usec
-LastShed
.tv_usec
) div 1000);
234 //writeln('DeltaTime: ',delta);
235 while assigned(cur
) do begin
236 if (cur
^.left
<=delta
)or(cur
^.left
=0) then begin
243 DEC(cur
^.left
,delta
);
244 //writeln('Left: ',cur^.left);
245 if pollTimeOut
>cur
^.left
then PollTimeOut
:=cur
^.left
;
250 pcur
^:=ShedTop
; {append newly added tasks to end of untriggererd list}
251 ShedTop
:=olTop
; {link in the untriggered tasks}
253 while assigned(cur
) do begin
254 if cur
^.left
<PollTimeout
then PollTimeout
:=cur
^.left
;
257 if pollTimeout
=0 then pollTimeOut
:=1;
263 while not terminated
do begin
266 EventsCount
:=fpPoll(@PollArr
[0],PollTop
,PollTimeout
);
268 if (eventscount
=-1)and terminated
then break
;
269 if eventscount
=-1 then break
; {fixme: print error}
270 if eventscount
=0 then continue
else begin
273 if assigned(curhndo
) then curhndo(msg
)
274 else if assigned(curhnd
) then curhnd(msg
)
275 else raise eXception
.Create('No handler for opcode '+IntToStr(Buffer
[1]));
278 for tp
:=1 to pollTop
do if PollArr
[tp
].revents
>0 then begin
279 PollHnd
[tp
].CB(PollArr
[tp
].rEvents
);
280 PollArr
[tp
].revents
:=0;
284 write('Loop broken [');
289 procedure SetMsgHandler(OpCode
:byte; handler
:tMessageHandler
);
290 begin assert(hnd
[OpCode
]=nil); hnd
[OpCode
]:=handler
; end;
291 procedure SetHiMsgHandler(handler
:tMessageHandler
);
292 begin Hihnd
:=handler
; end;
294 procedure WatchFD(fd
:tHandle
; h
:tFDEventHandler
);
297 if assigned(h
) then begin
298 PollHnd
[pollTop
].CB
:=h
;
299 PollArr
[pollTop
].fd
:=fd
;
300 PollArr
[pollTop
].events
:=POLLERR
or POLLHUP
or POLLIN
or POLLPRI
or
301 POLLRDBAND
or POLLRDNORM
;
302 PollArr
[pollTop
].revents
:=0;
303 //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
305 end else for opt
:=0 to high(opt
) do if PollArr
[opt
].fd
=fd
then begin
306 if (pollTop
-1)>opt
then begin
307 PollArr
[opt
]:=PollArr
[pollTop
-1];
308 PollHnd
[opt
]:=PollHnd
[pollTop
-1];
311 PollArr
[pollTop
].fd
:=-1;
312 PollArr
[pollTop
].events
:=0;
313 PollArr
[pollTop
].revents
:=0;
318 procedure Shedule(timeout
{ms}: LongWord
; h
:tOnTimer
);
322 if Assigned(ShedUU
) then begin
324 ShedUU
:=ShedUU
^.next
;
325 end else New(ShedTop
);
326 ShedTop
^.Left
:=timeout
;
331 procedure UnShedule(h
:tOnTimer
);
337 while assigned(cur
) do begin
338 if cur
^.cb
=h
then begin
339 pcur
^:=cur
^.next
; {unlink from main list}
340 cur
^.next
:=ShedUU
; ShedUU
:=cur
; {link to unused}
352 fpSignal(SigInt
,@SignalHandler
);
353 fpSignal(SigTerm
,@SignalHandler
);
354 for i
:=1 to high(hnd
) do hnd
[i
]:=nil;
355 for i
:=1 to high(PT
) do PT
[i
]:=nil;
357 pollTop
:=1; {1 for basic listen}
359 ShedUU
:=nil; {todo: allocate a few to improve paging}
360 fpgettimeofday(@LastShed
,nil);