Minor fixes in ServerLoop. Main program uses update.
[brdnet.git] / ServerLoop.pas
blob9b7e56d3c555a3c661cf9e6dddd7448e3e8bf7da
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType,Sockets;
6 procedure Main;
8 {#Message handling#}
{Received datagram as passed to message handlers. DoSock fills the single
 shared instance before a handler is called.}
9 type tSMsg=object
10 Source: ^tNetAddr; {sender address; DoSock points this at a shared static}
11 Length: {Long}Word; {number of bytes received}
12 Data: pointer; {raw packet bytes; DoSock points this at the shared receive buffer}
13 stream: tMemoryStream; {stream initialised by DoSock over the same receive buffer}
14 channel: word; {DoSock currently always stores 0 here}
15 end;
16 type tMessageHandler=procedure(msg:tSMsg);
17 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
18 procedure SetHiMsgHandler(handler:tMessageHandler);
20 function GetSocket(const rcpt:tNetAddr):tSocket;
21 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
22 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
23 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
25 {#Scheduling and watching#}
26 type tFDEventHandler=procedure(ev:Word) of object;
27 type tOnTimer=procedure of object;
28 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
29 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
30 procedure UnShedule(h:tOnTimer);
31 {note: UnShedule will fail when called from within an OnTimer procedure}
33 type tObjMessageHandler=procedure(msg:tSMsg) of object;
34 {deliver message from peer to the object}
35 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); overload;
36 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
38 function OptIndex(o:string):word;
39 function OptParamCount(o:word):word;
41 type tTimeVal=UnixType.timeval;
42 type tMTime=DWORD;
43 var iNow:tTimeVal;
44 var mNow:tMTime; { miliseconds since start }
45 {a 32-bit millisecond counter overflows after about 1193 hours (~49.7 days)}
47 IMPLEMENTATION
49 USES SysUtils,BaseUnix
50 ,Unix
53 {aim for the simplest implementation, since it can be extended at any time}
{Module state: the UDP socket, the poll table, the opcode handler tables and
 the timer lists used by Shedule/UnShedule/ShedRun.}
55 var s_inet:tSocket; {UDP socket created and bound by s_SetupInet}
57 type tPollTop=0..7;
58 var pollArr: packed array [tPollTop] of tPollFd; {entry 0 is s_inet, later entries come from WatchFD}
59 type tFdHndDsc=record
60 cb: tFDEventHandler; {proc+object}
61 end;
62 var pollHnd: array [tPollTop] of tFdHndDsc; {callback for the pollArr entry with the same index}
63 var pollTop: tPollTop; {number of pollArr entries in use; set to 1 at unit initialization}
65 var hnd: array [1..36] of tMessageHandler; {plain handlers indexed by the first packet byte}
66 var HiHnd: tMessageHandler; {handler chosen by DoSock when the first packet byte is >=128}
68 type tSheduled_ptr=^tSheduled; tSheduled=record
69 left:LongWord; {milliseconds remaining; ShedRun counts this down}
70 cb:tOnTimer; {method called when left reaches 0}
71 next:tSheduled_ptr; {next node in whichever list holds this node}
72 end;
73 var ShedTop: ^tSheduled; {head of the list of pending timers}
74 var ShedUU: ^tSheduled; {head of the list of unused nodes kept for reuse}
75 var LastShed: UnixType.timeval; {wall-clock time stored by the previous ShedRun}
76 var PollTimeout:LongInt; {timeout in ms that Main passes to fpPoll}
77 var umNow:integer; {sub-millisecond remainder (microseconds) carried between ShedRun calls}
{Socket-call check: raises an exception when a socket call reported failure.
 fn     - address of the call that was made, only used in the message
 retval - value returned by that call; negative means failure}
procedure SC(fn:pointer; retval:cint);
begin
  if retval >= 0 then exit;
  raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
end;
{Creates the UDP socket, binds it to any local address and registers it as
 poll entry 0. The port is 3511 unless the command line has '-port <n>'.}
procedure s_SetupInet;
var bind_addr:tInetSockAddr;
    turnon:cint;
    oi:word;
begin
  bind_addr.sin_family:=AF_INET;
  oi:=OptIndex('-port');
  if oi<>0 then begin
    {-port must be followed by exactly one value}
    assert(OptParamCount(oi)=1);
    bind_addr.sin_port:=htons(StrToInt(paramstr(oi+1)));
  end
  else bind_addr.sin_port:=htons(3511);
  bind_addr.sin_addr.s_addr:=0; {any}

  s_inet:=fpSocket(bind_addr.sin_family,SOCK_DGRAM,IPPROTO_UDP);
  SC(@fpSocket,s_inet);

  {path MTU discovery mode is set before binding}
  turnon:=IP_PMTUDISC_DO;
  SC(@fpsetsockopt,fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @turnon, sizeof(turnon)));

  SC(@fpBind,fpBind(s_inet,@bind_addr,sizeof(bind_addr)));

  {the socket always occupies poll slot 0}
  PollArr[0].fd:=s_inet;
  PollArr[0].events:=pollIN;
  PollArr[0].revents:=0;
end;
113 var Terminated:boolean=false;
{Returns the socket to use for the given recipient. There is only one socket,
 so rcpt is not consulted.}
function GetSocket(const rcpt:tNetAddr):tSocket;
begin
  GetSocket:=s_inet;
end;
{Sends len bytes of data to an already converted socket address.
 The result of fpsendto is not checked.}
procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
begin
  fpsendto(s_inet, @data, len, 0, @rcpt, sizeof(sockaddr_in));
end;
{Sends len bytes of data to rcpt after converting it to a socket address.}
procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
var sockaddr:tSockAddrL;
begin
  rcpt.ToSocket(sockaddr);
  SendMessage(data, len, sockaddr);
end;
{Channel-aware variant; the channel argument is currently not used and the
 call is forwarded to the plain overload.}
procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
begin
  {todo: optimization??}
  SendMessage(data, len, rcpt);
end;
{Signal handler: the first signal sets Terminated, a second one while
 Terminated is already set raises eControlC.}
procedure SignalHandler(sig:cint);CDecl;
begin
  writeln;
  if not Terminated then
    Terminated:=true
  else
    raise eControlC.Create('CtrlC DoubleTap');
end;
142 {index=iphash+opcode}
{Per-peer handler table: an open-addressing table keyed by sender address and
 opcode, probed linearly by FindPT and SetMsgHandler.}
143 type tPeerTableBucket=record
144 opcode:byte; {opcode this entry handles}
145 remote:tNetAddr; {sender the entry is bound to}
146 handler:tObjMessageHandler; {method called by Main for matching packets}
147 end;
148 var PT:array [0..255] of ^tPeerTableBucket; {nil marks an empty slot}
149 var PT_opcodes: set of 1..high(hnd); {opcodes within hnd's range that have had a per-peer handler registered}
{Looks up the peer-table slot registered for (opcode, addr).
 Probing starts at (hash+opcode) mod high(PT) and walks forward until the
 entry or an empty slot is met. Because the modulus is high(PT), only slots
 0..high(PT)-1 are ever visited.
 Returns the slot index, or $FFFF when there is no such entry.}
function FindPT(opcode:byte; addr:tNetAddr):Word; { $FFFF=fail}
var home,step,slot:word;
begin
  home:=(addr.hash+opcode) mod high(PT);
  step:=0;
  while step<=high(PT) do begin
    slot:=(home+step) mod high(PT);
    {an empty slot ends the probe chain}
    if not assigned(PT[slot]) then break;
    if (PT[slot]^.opcode=opcode) and (PT[slot]^.remote=addr) then begin
      result:=slot;
      exit;
    end;
    inc(step);
  end;
  result:=$FFFF;
end;
{True when a per-peer handler is registered for this opcode and sender.}
function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
begin
  IsMsgHandled:=not (FindPT(OpCode,from)=$FFFF);
end;
{Removes the per-peer handler registered for (from, opcode), if any.

 The table uses linear probing and FindPT stops at the first empty slot, so
 simply clearing a slot would hide every entry that was displaced past it.
 After freeing the slot, the rest of the probe chain is walked forward and
 each entry whose home position lies at or before the hole is moved back
 into it (the hole then moves to that entry's old slot). The walk stops at
 the first empty slot.

 The home position uses the same formula as FindPT and SetMsgHandler:
 (hash+opcode) mod high(PT).}
procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
var hole,probe,home,step:word;
    distHome,distHole:word;
begin
  hole:=FindPT(opcode,from);
  if hole=$FFFF then exit;
  Dispose(PT[hole]);
  PT[hole]:=nil;
  probe:=hole;
  for step:=1 to high(PT)-1 do begin
    probe:=(probe+1) mod high(PT);
    if not assigned(PT[probe]) then break;
    home:=(PT[probe]^.remote.hash+PT[probe]^.opcode) mod high(PT);
    {cyclic distances measured forward to the probed slot}
    distHome:=(probe+high(PT)-home) mod high(PT);
    distHole:=(probe+high(PT)-hole) mod high(PT);
    {the entry may fill the hole only if the hole is on its own probe path}
    if distHome>=distHole then begin
      PT[hole]:=PT[probe];
      PT[probe]:=nil;
      hole:=probe;
    end;
  end;
end;
{Registers handler for packets with the given opcode coming from the given
 peer. Any previous registration for the same pair is removed first; passing
 a nil handler therefore only removes.

 Raises Exception when no free slot is left. Previously the probe loop fell
 through in that case and an occupied slot was overwritten, leaking it.}
procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
var home,step,slot:word;
    free:boolean;
begin
  UnSetMsgHandler(from,opcode);
  if handler=nil then exit;
  {same home-slot formula as FindPT, so lookups follow the same probe path}
  home:=(from.hash+opcode) mod high(PT);
  free:=false;
  slot:=home;
  for step:=0 to high(PT)-1 do begin
    slot:=(home+step) mod high(PT);
    if not assigned(PT[slot]) then begin
      free:=true;
      break;
    end;
  end;
  if not free then
    raise eXception.Create('ServerLoop: peer handler table is full');
  New(PT[slot]);
  PT[slot]^.opcode:=OpCode;
  PT[slot]^.remote:=from;
  PT[slot]^.handler:=handler;
  {DoSock only consults the peer table for low opcodes listed here}
  if opcode<=high(hnd) then Include(PT_opcodes,opcode);
end;
202 {do not waste stack on statics}
{Working variables of Main and DoSock, kept as unit-level statics.}
203 var EventsCount:integer; {result of the last fpPoll call in Main}
204 var Buffer:array [1..4096] of byte; {receive buffer; Buffer[1] is used as the opcode}
205 var pkLen:LongWord; {value returned by the last fprecvfrom}
206 var From:tSockAddrL; {use larger struct so everything fits}
207 var FromLen:LongWord; {in/out length argument for fprecvfrom}
208 var FromG:tNetAddr; {sender address converted from From}
209 var curhnd:tMessageHandler; {plain handler selected by DoSock, or nil}
210 var curhndo:tObjMessageHandler; {per-peer handler selected by DoSock, or nil}
211 var Msg:tSMsg; {message record handed to the selected handler}
212 var tp:tPollTop; {loop index over poll entries in Main}
{Receives one datagram from the polled socket and selects its handlers.

 On return curhnd / curhndo hold the plain and per-peer handlers for the
 packet's first byte (either may be nil) and Msg describes the packet.
 Returns true when a packet was received and should be dispatched; false
 when the descriptor had no pollIN event or the datagram was empty.

 Changes against the previous version:
 - the fprecvfrom result is kept in a signed variable, so a failure is
   detected before it is stored in the unsigned pkLen;
 - an empty datagram is dropped instead of being dispatched on whatever
   byte the previous packet left in Buffer[1];
 - opcode 0 no longer reads hnd[0], which lies outside the array.

 Raises Exception (through SC) when fprecvfrom fails.}
function DoSock(var p:tPollFD):boolean;
var ptidx:word;
    got:PtrInt;
    opcode:byte;
begin
  curhnd:=nil;
  curhndo:=nil;
  result:=false;
  ptidx:=$FFFF;
  if (p.revents and pollIN)=0 then exit;
  FromLen:=sizeof(From);
  got:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
  if got<0 then SC(@fprecvfrom,-1);
  p.revents:=0;
  pkLen:=got;
  {nothing to dispatch: there is no opcode byte}
  if got=0 then exit;
  result:=true;
  FromG.FromSocket(from);
  Msg.Source:=@FromG; {!thread}
  Msg.Length:=pkLen;
  Msg.Data:=@Buffer; {!thread}
  Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
  Msg.channel:=0; {!multisocket}
  opcode:=Buffer[1];
  if opcode>=128 then curhnd:=HiHnd
  else if (opcode>=low(hnd)) and (opcode<=high(hnd)) then curhnd:=hnd[opcode];
  {per-peer handlers: always looked up above hnd's range, otherwise only
   for opcodes that had one registered}
  if (opcode>high(hnd)) or (opcode in PT_opcodes) then begin
    ptidx:=FindPT(opcode,FromG);
    if ptidx<$FFFF then curhndo:=PT[ptidx]^.handler;
  end;
end;
{Timer tick: measures the time elapsed since the previous call, advances
 mNow, counts down every pending timer, lowers PollTimeout to the nearest
 pending expiry and runs at most one expired timer callback.}
239 procedure ShedRun;
240 var cur:^tSheduled;
241 var pcur:^pointer;
242 var now:UnixType.timeval{ absolute iNow};
243 var delta:LongWord;
244 var delta_us:LongInt;
245 var tasks:word;
246 begin
247 {Scheduling}
248 {compute the elapsed time and advance mNow}
249 fpgettimeofday(@Now,nil);
250 delta:=(Now.tv_sec-LastShed.tv_sec);
251 delta_us:=Now.tv_usec-LastShed.tv_usec; {may be negative when the seconds field advanced}
252 delta:=(delta*1000)+(delta_us div 1000); {elapsed whole milliseconds}
253 umNow:=umNow+(delta_us mod 1000); {keep the sub-millisecond remainder}
254 if delta>6000 then delta:=5000; {any gap above 6000 ms is counted as 5000 ms}
255 LastShed:=Now;
256 mNow:=mNow+Delta;
257 if umNow>1000 then begin inc(mNow); dec(umNow,1000) end; {carry accumulated microseconds into mNow}
258 if umNow<-1000 then begin dec(mNow); inc(umNow,1000) end; {same carry for a negative remainder}
259 //writeln('DeltaTime: ',delta);
260 {first tick all tasks}
261 tasks:=0;
262 cur:=ShedTop;
263 while assigned(cur) do begin
264 if cur^.left<=delta then cur^.left:=0 else begin
265 dec(cur^.left,delta);
266 {also set next wake time}
267 if cur^.left<PollTimeout then PollTimeout:=cur^.left;
268 end;
269 {count tasks here; the count is not used anywhere else in this procedure}
270 inc(tasks);
271 cur:=cur^.next;
272 end;
273 {a zero PollTimeout is raised to 1 ms here.
 NOTE(review): no floating point is involved; the original comment called
 this a "floating-point glitch" - confirm the intent}
274 if pollTimeout=0 then pollTimeOut:=1;
275 {run first runnable task}
276 pcur:=@ShedTop; {pcur always addresses the link that points at cur}
277 cur:=pcur^;
278 while assigned(cur) do begin
279 if cur^.left=0 then begin
280 {unlink}
281 pcur^:=cur^.next;
282 {link to unused}
283 cur^.next:=ShedUU;
284 ShedUU:=cur;
285 {call; the node is already on the unused list at this point}
286 cur^.cb;
287 {do rest later: a zero timeout makes the next poll return at once}
288 pollTimeout:=0;
289 break;
290 end;
291 pcur:=@cur^.next;
292 cur:=cur^.next;
293 end;
294 end;
{Main loop: sets up the UDP socket, then repeatedly runs the scheduler,
 polls all registered descriptors and dispatches packets and FD events
 until Terminated is set or poll fails.

 Changes against the previous version:
 - the generic FD loop covers entries 1..pollTop-1; pollTop is a count, so
   entry pollTop itself is not in use;
 - errno is captured right after fpPoll, before ShedRun can overwrite it;
 - a poll interrupted by a signal (EINTR) is retried instead of ending the
   loop, and any other poll failure is reported before leaving.}
procedure Main;
var pollErr:longint;
begin
  s_setupInet;
  while not terminated do begin
    PollTimeout:=5000;
    ShedRun;
    EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
    if EventsCount<0 then pollErr:=fpgeterrno else pollErr:=0;
    ShedRun;
    if EventsCount<0 then begin
      if terminated then break;
      if pollErr=ESysEINTR then continue;
      writeln('ServerLoop: poll failed, error '+IntToStr(pollErr));
      break;
    end;
    if EventsCount=0 then continue;
    {INET socket}
    if DoSock(PollArr[0]) then begin
      if assigned(curhndo) then curhndo(msg)
      else if assigned(curhnd) then curhnd(msg)
      else {raise eXception.Create('}writeln('ServerLoop: No handler for opcode '+IntToStr(Buffer[1]));
    end;
    {INET6...}
    {Generic}
    for tp:=1 to pollTop-1 do if PollArr[tp].revents>0 then begin
      PollHnd[tp].CB(PollArr[tp].rEvents);
      PollArr[tp].revents:=0;
    end;
  end;
  CloseSocket(s_inet);
end;
{Registers the plain handler for an opcode.
 OpCode must lie inside the bounds of hnd; anything else used to write
 outside the array and now raises ERangeError.
 The slot is expected to be empty (checked by assert).}
procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
begin
  if (OpCode<low(hnd)) or (OpCode>high(hnd)) then
    raise eRangeError.Create(Format('ServerLoop: opcode %d is outside %d..%d',[OpCode,low(hnd),high(hnd)]));
  assert(hnd[OpCode]=nil);
  hnd[OpCode]:=handler;
end;
{Installs the single handler used for packets whose first byte is >=128.
 A previously installed handler is replaced.}
procedure SetHiMsgHandler(handler:tMessageHandler);
begin
  HiHnd:=handler;
end;
{Adds or removes a watched file descriptor.
 h assigned - fd is appended to the poll table and h is called from Main
              with the returned events.
 h nil      - the watch on fd is removed; the last active entry is moved
              into the freed slot.

 Changes against the previous version:
 - adding raises Exception when the table is full instead of pushing
   pollTop past its range. pollTop is a count of type tPollTop, so at most
   high(tPollTop) entries, including the UDP socket, can be active;
 - removal only searches the active watch entries 1..pollTop-1. Slot 0
   (the UDP socket) and unused slots, whose fd is still the zero default,
   can no longer match and cause the wrong entry to be dropped.}
procedure WatchFD(fd:tHandle; h:tFDEventHandler);
var slot: tPollTop;
begin
  if assigned(h) then begin
    if pollTop>=high(tPollTop) then
      raise eXception.Create('ServerLoop: too many watched descriptors');
    PollHnd[pollTop].CB:=h;
    PollArr[pollTop].fd:=fd;
    PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
      POLLRDBAND or POLLRDNORM;
    PollArr[pollTop].revents:=0;
    //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
    Inc(PollTop);
  end else begin
    for slot:=1 to pollTop-1 do if PollArr[slot].fd=fd then begin
      {keep the active entries contiguous}
      if (pollTop-1)>slot then begin
        PollArr[slot]:=PollArr[pollTop-1];
        PollHnd[slot]:=PollHnd[pollTop-1];
      end;
      dec(pollTop);
      PollArr[pollTop].fd:=-1;
      PollArr[pollTop].events:=0;
      PollArr[pollTop].revents:=0;
      break;
    end;
  end;
end;
{Schedules h to be called once after timeout milliseconds.
 A node is taken from the unused list when one is available, otherwise a
 new one is allocated; it is pushed at the head of the pending list.}
procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
var node:^tSheduled;
begin
  if Assigned(ShedUU) then begin
    node:=ShedUU;
    ShedUU:=node^.next;
  end
  else New(node);
  node^.Left:=timeout;
  node^.CB:=h;
  node^.Next:=ShedTop;
  ShedTop:=node;
end;
{Cancels every pending timer whose callback equals h. The method pointers
 are compared byte by byte. Removed nodes go to the unused list.}
procedure UnShedule(h:tOnTimer);
var node:^tSheduled;
    link:^pointer;
begin
  //if ShedTop=nil then AbstractError;
  link:=@ShedTop; {address of the pointer that leads to the current node}
  node:=link^;
  while assigned(node) do begin
    if CompareByte(node^.cb,h,sizeof(h))<>0 then
      link:=@node^.next {no match: step over this node}
    else begin
      link^:=node^.next; {unlink from main list}
      node^.next:=ShedUU; {link to unused}
      ShedUU:=node;
    end;
    node:=link^;
  end;
end;
{Returns the index of the last command-line argument equal to o,
 or 0 when no argument matches.}
function OptIndex(o:string):word;
var idx:word;
begin
  result:=0;
  for idx:=paramcount downto 1 do
    if system.paramstr(idx)=o then begin
      result:=idx;
      break;
    end;
end;
{Counts the values that follow the option at argument index o: every
 argument after it up to, but not including, the next one starting with '-'.
 Returns 0 when o is 0.
 An empty argument counts as a value; its first character is no longer
 read, which was an access past the end of the string.}
function OptParamCount(o:word):word;
var i:word;
    arg:string;
begin
  result:=0;
  if o=0 then exit;
  for i:=o+1 to paramcount do begin
    arg:=paramstr(i);
    if (length(arg)>0) and (arg[1]='-') then break;
    inc(result);
  end;
end;
{Unit initialization: resets the clocks, installs the signal handlers,
 clears both handler tables and the timer lists, and reserves poll slot 0
 for the UDP socket.
 The peer table is now cleared from its first slot; the loop used to start
 at 1 and left PT[0] to the default zero fill of global variables.}
var i:byte;
BEGIN
  mNow:=0;
  umNow:=0;
  Randomize;
  fpSignal(SigInt,@SignalHandler);
  fpSignal(SigTerm,@SignalHandler);
  for i:=low(hnd) to high(hnd) do hnd[i]:=nil;
  for i:=low(PT) to high(PT) do PT[i]:=nil;
  PT_opcodes:=[];
  pollTop:=1; {1 for basic listen}
  ShedTop:=nil;
  ShedUU:=nil; {todo: allocate a few to improve paging}
  fpgettimeofday(@LastShed,nil);
END.