Fix sheduler inaccurancy, call shed after poll too.
[brdnet.git] / ServerLoop.pas
blobecb1456169e1578b5ca012d5a1cbb335aa7a2de8
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType;
6 procedure Main;
8 {#Message handling#}
{One received datagram, as passed to message handlers.}
type
  tSMsg=object
    Source: ^tNetAddr;       {address the datagram was received from}
    Length: {Long}Word;      {number of received bytes}
    Data: pointer;           {start of the receive buffer}
    stream: tMemoryStream;   {stream initialised over the same buffer}
    channel: word;           {currently always set to 0 by the receive path}
  end;
16 type tMessageHandler=procedure(msg:tSMsg);
17 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
18 procedure SetHiMsgHandler(handler:tMessageHandler);
20 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
21 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
22 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
{#Scheduling and watching#}
25 type tFDEventHandler=procedure(ev:Word) of object;
26 type tOnTimer=procedure of object;
27 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
28 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
29 procedure UnShedule(h:tOnTimer);
{note: UnShedule has no effect on timers of the list being run when it is called from inside an OnTimer procedure}
32 type tObjMessageHandler=procedure(msg:tSMsg) of object;
33 {deliver message from peer to the object}
34 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
35 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
37 type tTimeVal=UnixType.timeval;
38 var iNow:tTimeVal;
40 IMPLEMENTATION
42 USES SysUtils,Sockets,BaseUnix
43 ,Unix
{aim for the simplest possible implementation, since it can be extended at any time}
{The UDP socket created by s_SetupInet.}
var s_inet:tSocket;

type
  {Index into the poll arrays.}
  tPollTop=0..7;

  {Callback attached to one watched descriptor.}
  tFdHndDsc=record
    cb: tFDEventHandler; {proc+object}
  end;

  {One pending timer; nodes are chained through next.}
  tSheduled_ptr=^tSheduled;
  tSheduled=record
    left:LongWord;       {milliseconds remaining until the callback fires}
    cb:tOnTimer;
    next:tSheduled_ptr;
  end;

var
  {Descriptors handed to fpPoll and, in parallel, their callbacks.}
  pollArr: packed array [tPollTop] of tPollFd;
  pollHnd: array [tPollTop] of tFdHndDsc;
  {Number of poll slots in use; also the next free slot.}
  pollTop: tPollTop;

  {Handlers by opcode, plus one handler for opcodes of 128 and above.}
  hnd: array [1..36] of tMessageHandler;
  HiHnd: tMessageHandler;

  {Head of the pending timer list and head of the list of recycled nodes.}
  ShedTop: ^tSheduled;
  ShedUU: ^tSheduled;
  {Time of the previous scheduler pass.}
  LastShed: UnixType.timeval;
  {Timeout in milliseconds passed to the next fpPoll call.}
  PollTimeout:LongInt;
{Check the result of a socket call.
 fn     - address of the called routine, only printed in the error message
 retval - value returned by the call; anything below zero is a failure
 Raises an exception carrying SocketError when retval is negative.}
procedure SC(fn:pointer; retval:cint);
begin
  if retval>=0 then exit;
  raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
end;
{Create the UDP socket, enable path-MTU discovery on it, bind it to
 port 3511 on any address and register it as poll slot 0.
 Every socket call is checked through SC, which raises on failure.}
procedure s_SetupInet;
var bind_addr:tInetSockAddr;
var turnon:cint;
begin
  bind_addr.sin_family:=AF_INET;
  bind_addr.sin_port:=htons(3511);
  bind_addr.sin_addr.s_addr:=0; {any}

  s_inet:=fpSocket(bind_addr.sin_family,SOCK_DGRAM,IPPROTO_UDP);
  SC(@fpSocket,s_inet);

  turnon:=IP_PMTUDISC_DO;
  SC(@fpsetsockopt,fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @turnon, sizeof(turnon)));

  SC(@fpBind,fpBind(s_inet,@bind_addr,sizeof(bind_addr)));

  {slot 0 of the poll array always belongs to this socket}
  PollArr[0].fd:=s_inet;
  PollArr[0].events:=pollIN;
  PollArr[0].revents:=0;
end;
{Set by SignalHandler; Main stops looping once this is true.}
var Terminated:boolean=false;

{Send len bytes starting at data to the socket address rcpt over s_inet.
 The result of fpsendto is intentionally not checked.}
procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
begin
  {SC(@fpsendto,}
  fpsendto(s_inet,@data,len,0,@rcpt,sizeof(sockaddr_in));
  {)}
end;
{Send len bytes starting at data to the peer rcpt.
 The address is converted to socket form and handed to the
 socket-address overload.}
procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
var sockAddr:tSockAddrL;
begin
  rcpt.ToSocket(sockAddr);
  SendMessage(data,len,sockAddr);
end;
{Send len bytes starting at data to the peer rcpt.
 channel is accepted but not used yet; the call is forwarded unchanged.}
procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
begin
  {todo: optimization??}
  SendMessage(data,len,rcpt);
end;
{Signal handler installed by the unit initialization.
 The first signal requests shutdown by setting Terminated;
 a further signal while Terminated is already set raises eControlC.}
procedure SignalHandler(sig:cint);CDecl;
begin
  writeln;
  if not terminated then begin
    Terminated:=true;
    writeln('Shutdown requested');
  end else
    raise eControlC.Create('CtrlC DoubleTap');
end;
{Per-peer handler table.
 The starting slot of an entry is derived from address hash plus opcode.}
type
  tPeerTableBucket=record
    opcode:byte;                   {opcode the handler is registered for}
    remote:tNetAddr;               {peer the handler is registered for}
    handler:tObjMessageHandler;
  end;

var
  PT:array [0..255] of ^tPeerTableBucket;
  {opcodes for which a peer handler has been registered}
  PT_opcodes: set of 1..36;
{Look up the peer-table slot registered for opcode and addr.
 Probing starts at the slot derived from hash plus opcode and walks
 forward; it stops at the first empty slot.
 Returns the slot index, or $FFFF when no entry matches.}
function FindPT(opcode:byte; const addr:tNetAddr):Word; { $FFFF=fail}
var home,probe,slot:word;
begin
  home:=(addr.hash+opcode) mod high(PT);
  for probe:=0 to high(PT) do begin
    slot:=(home+probe) mod high(PT);
    {an empty slot ends the probe chain: the entry is not present}
    if not assigned(PT[slot]) then break;
    if (PT[slot]^.opcode=opcode) and (PT[slot]^.remote=addr) then begin
      result:=slot;
      exit;
    end;
  end;
  result:=$FFFF;
end;
{True when a per-peer handler is registered for OpCode and from.}
function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
begin
  result:=not (FindPT(opcode,from)=$FFFF);
end;
{Remove the per-peer handler registered for opcode and from, if any.

 The table is probed linearly forward, see FindPT, so simply emptying a
 slot would cut the probe chain of every entry stored after it. After
 freeing the slot the entries that follow it are therefore walked
 forward, up to the next empty slot, and each one that may legally live
 in the hole is moved back into it. The vacated slot then becomes the
 new hole.

 The same modulus as in FindPT and SetMsgHandler is used so that all
 three agree on the home slot of an entry.}
procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
var hole,probe,home,step:word;
begin
  hole:=FindPT(opcode,from);
  if hole=$FFFF then exit;
  Dispose(PT[hole]);
  PT[hole]:=nil;
  probe:=hole;
  for step:=1 to high(PT)-1 do begin
    probe:=(probe+1) mod high(PT);
    {an empty slot ends the chain, nothing further depends on the hole}
    if not assigned(PT[probe]) then break;
    home:=(PT[probe]^.remote.hash+PT[probe]^.opcode) mod high(PT);
    {the entry may move only if the hole lies on its probe path,
     that is cyclically between its home slot and its current slot}
    if ((probe+high(PT)-home) mod high(PT)) >= ((probe+high(PT)-hole) mod high(PT)) then begin
      PT[hole]:=PT[probe];
      PT[probe]:=nil;
      hole:=probe;
    end;
  end;
end;
{Register handler for messages with OpCode coming from the peer from.
 Any previous registration for the same pair is removed first; passing
 a nil handler therefore only unregisters.
 Raises an exception when no free slot is left, instead of overwriting
 an occupied one.}
procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
var h,o,i:word;
var found:boolean;
begin
  UnSetMsgHandler(from,opcode);
  if handler=nil then exit;
  h:=(from.hash+opcode) mod high(PT);
  found:=false;
  {probe forward from the home slot until a free slot turns up}
  for o:=0 to high(PT)-1 do begin
    i:=(h+o) mod high(PT);
    if not assigned(PT[i]) then begin
      found:=true;
      break;
    end;
  end;
  if not found then raise eXception.Create('Peer handler table is full');
  New(PT[i]);
  PT[i]^.opcode:=OpCode;
  PT[i]^.remote:=from;
  PT[i]^.handler:=handler;
  Include(PT_opcodes,opcode);
end;
{Working state of the receive and dispatch path.
 Kept at unit level so that it does not take up stack space.}
var
  EventsCount:integer;             {result of the last fpPoll call}
  Buffer:array [1..1024] of byte;  {receive buffer for one datagram}
  pkLen:LongWord;                  {result of the last fprecvfrom call}
  From:tSockAddrL; {use larger struct so everything fits}
  FromLen:LongWord;
  curhnd:tMessageHandler;          {plain handler chosen for the current datagram}
  curhndo:tObjMessageHandler;      {per-peer handler chosen for the current datagram}
  Msg:tSMsg;                       {message record handed to the chosen handler}
  tp:tPollTop;                     {loop index over the watched descriptors}
{Sender of the datagram currently being dispatched.
 Declared at unit level because Msg.Source keeps pointing at it after
 DoSock has returned and the handler is called by Main.}
var MsgFrom:tNetAddr;

{Receive one datagram from the polled socket p and select its handlers.

 On return curhnd and curhndo hold the plain and the per-peer handler
 for the datagram, either of which may be nil, and Msg describes the
 datagram. When p reports no pollIN event both handlers are left nil
 and nothing is read.

 Raises an exception through SC when fprecvfrom fails.}
procedure DoSock(var p:tPollFD);
var ptidx:word; {must hold the $FFFF miss value returned by FindPT}
begin
  curhnd:=nil;
  curhndo:=nil;
  if (p.revents and pollIN)=0 then exit;
  FromLen:=sizeof(From);
  pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
  SC(@fprecvfrom,pkLen);
  p.revents:=0;
  MsgFrom.FromSocket(from);
  Msg.Source:=@MsgFrom; {!thread}
  Msg.Length:=pkLen;
  Msg.Data:=@Buffer; {!thread}
  Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
  Msg.channel:=0; {!multisocket}
  {the first byte is the opcode; 128 and above go to the high handler,
   values inside the bounds of hnd to the per-opcode handler}
  if Buffer[1]>=128 then curhnd:=HiHnd
  else if (Buffer[1]>=low(hnd)) and (Buffer[1]<=high(hnd)) then curhnd:=hnd[Buffer[1]];
  {per-peer handler: slot 0 is a valid slot, only $FFFF means no match}
  ptidx:=$FFFF;
  if Buffer[1] in PT_opcodes then ptidx:=FindPT(Buffer[1],MsgFrom);
  if ptidx<>$FFFF then curhndo:=PT[ptidx]^.handler;
end;
{Run one scheduler pass.

 Measures the time elapsed since the previous pass, fires every timer
 whose remaining time has run out, reduces the remaining time of the
 others and lowers PollTimeout to the smallest remaining time.

 While callbacks run, ShedTop is detached and empty, so timers added by
 a callback are collected in a fresh list and appended afterwards
 without being aged or fired in this pass.

 The elapsed time is computed signed: a clock that stepped backwards
 yields 0 ms, or the 5000 ms cap when whole seconds went backwards,
 instead of wrapping to a huge unsigned value.}
procedure ShedRun;
var cur:^tSheduled;
var pcur:^pointer;
var now:UnixType.timeval absolute iNow;
var delta:LongWord;
var elapsed:Int64;
var olTop:^tSheduled;
begin
  {Sheduling}
  olTop:=ShedTop;
  pcur:=@olTop;
  cur:=pcur^;
  ShedTop:=nil; {unlink the current shed list}
  fpgettimeofday(@Now,nil);
  elapsed:=Int64(Now.tv_sec)-Int64(LastShed.tv_sec);
  if (elapsed<0) or (elapsed>6) then delta:=5000
  else begin
    elapsed:=(elapsed*1000)+((Int64(Now.tv_usec)-Int64(LastShed.tv_usec)) div 1000);
    if elapsed<0 then elapsed:=0; {clock moved slightly backwards}
    delta:=elapsed;
  end;
  LastShed:=Now;
  //writeln('DeltaTime: ',delta);
  while assigned(cur) do begin
    if (cur^.left<=delta)or(cur^.left=0) then begin
      cur^.cb;
      {unlink the fired timer and put its node on the unused list}
      pcur^:=cur^.next;
      cur^.next:=ShedUU;
      ShedUU:=cur;
      cur:=pcur^;
    end else begin
      DEC(cur^.left,delta);
      //writeln('Left: ',cur^.left);
      if pollTimeOut>cur^.left then PollTimeOut:=cur^.left;
      pcur:=@cur^.next;
      cur:=cur^.next;
    end;
  end;
  pcur^:=ShedTop; {append newly added tasks to end of untriggered list}
  ShedTop:=olTop; {link in the untriggered tasks}
  {second pass also covers the timers added by callbacks}
  cur:=olTop;
  while assigned(cur) do begin
    if cur^.left<PollTimeout then PollTimeout:=cur^.left;
    cur:=cur^.next;
  end;
  if pollTimeout=0 then pollTimeOut:=1;
end;
{Main loop of the server.

 Sets up the UDP socket, then repeats until Terminated is set: run the
 scheduler, poll all watched descriptors with the timeout the scheduler
 chose, run the scheduler again, dispatch a received datagram and call
 the callbacks of the other descriptors that reported events.

 The datagram path runs only when the socket itself reports pollIN, so
 an event on another watched descriptor is no longer mistaken for a
 datagram without a handler.

 Raises an exception when a datagram arrives for which neither a
 per-peer nor a plain handler is registered.}
procedure Main;
begin
  s_setupInet;
  while not terminated do begin
    PollTimeout:=5000;
    ShedRun;
    EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
    ShedRun;
    if (eventscount=-1)and terminated then break;
    if eventscount=-1 then break; {fixme: print error}
    if eventscount=0 then continue;
    {INET socket}
    if (PollArr[0].revents and pollIN)<>0 then begin
      DoSock(PollArr[0]);
      if assigned(curhndo) then curhndo(msg)
      else if assigned(curhnd) then curhnd(msg)
      else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1]));
    end;
    {INET6...}
    {Generic: slots 1..pollTop-1 are the descriptors added by WatchFD}
    for tp:=1 to pollTop-1 do if PollArr[tp].revents>0 then begin
      PollHnd[tp].CB(PollArr[tp].rEvents);
      PollArr[tp].revents:=0;
    end;
  end;
  write('Loop broken [');
  CloseSocket(s_inet);
  writeln(']');
end;
{Register the plain handler for OpCode.
 OpCode must lie inside the bounds of hnd; anything else raises an
 exception instead of writing outside the array.
 Registering an opcode twice trips the assertion.}
procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
begin
  if (OpCode<low(hnd)) or (OpCode>high(hnd)) then
    raise eXception.Create('SetMsgHandler: opcode '+IntToStr(OpCode)+' out of range');
  assert(hnd[OpCode]=nil);
  hnd[OpCode]:=handler;
end;
{Install the single handler stored in HiHnd, replacing any previous one.}
procedure SetHiMsgHandler(handler:tMessageHandler);
begin
  Hihnd:=handler;
end;
{Start or stop watching a file descriptor in the poll loop.

 fd - descriptor to watch
 h  - callback receiving the poll events; pass nil to stop watching fd

 Adding raises an exception when the next free slot is the last one,
 because pollTop could then not be advanced without leaving its 0..7
 range. Removal looks only at the slots in use after slot 0, which is
 reserved for the UDP socket; the last used slot is moved into the gap.}
procedure WatchFD(fd:tHandle; h:tFDEventHandler);
var opt: tPollTop;
begin
  if assigned(h) then begin
    if pollTop>=high(tPollTop) then
      raise eXception.Create('WatchFD: too many watched descriptors');
    PollHnd[pollTop].CB:=h;
    PollArr[pollTop].fd:=fd;
    PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
                             POLLRDBAND or POLLRDNORM;
    PollArr[pollTop].revents:=0;
    //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
    Inc(PollTop);
  end else for opt:=1 to pollTop-1 do if PollArr[opt].fd=fd then begin
    {keep the used slots contiguous}
    if (pollTop-1)>opt then begin
      PollArr[opt]:=PollArr[pollTop-1];
      PollHnd[opt]:=PollHnd[pollTop-1];
    end;
    dec(pollTop);
    PollArr[pollTop].fd:=-1;
    PollArr[pollTop].events:=0;
    PollArr[pollTop].revents:=0;
    break;
  end;
end;
{Arrange for h to be called once timeout milliseconds have elapsed.
 A node is taken from the unused list when one is available, otherwise
 a new one is allocated; it is pushed onto the front of the timer list.}
procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
var node:^tSheduled;
begin
  if Assigned(ShedUU) then begin
    node:=ShedUU;
    ShedUU:=node^.next;
  end else New(node);
  node^.Left:=timeout;
  node^.CB:=h;
  node^.Next:=ShedTop;
  ShedTop:=node;
end;
{Cancel the first pending timer whose callback equals h.
 The node is unlinked from the timer list and put on the unused list.
 Nothing happens when no such timer is found in ShedTop.}
procedure UnShedule(h:tOnTimer);
var node:^tSheduled;
var link:^pointer;
begin
  link:=@ShedTop;
  node:=link^;
  {advance until the matching node or the end of the list}
  while assigned(node) and not (node^.cb=h) do begin
    link:=@node^.next;
    node:=link^;
  end;
  if assigned(node) then begin
    link^:=node^.next; {unlink from main list}
    node^.next:=ShedUU; {link to unused}
    ShedUU:=node;
  end;
end;
{Unit initialization: install the signal handlers and reset the handler
 tables, the poll slot counter and the timer lists.
 The peer table is cleared from its first slot on; slot 0 was skipped
 before.}
var i:byte;
BEGIN
  Randomize;
  fpSignal(SigInt,@SignalHandler);
  fpSignal(SigTerm,@SignalHandler);
  for i:=low(hnd) to high(hnd) do hnd[i]:=nil;
  for i:=low(PT) to high(PT) do PT[i]:=nil;
  PT_opcodes:=[];
  pollTop:=1; {1 for basic listen}
  ShedTop:=nil;
  ShedUU:=nil; {todo: allocate a few to improve paging}
  fpgettimeofday(@LastShed,nil);
END.