GUI putis.
[brdnet.git] / ServerLoop.pas
blob13fe045c4f2d4b391cc81f79a9d04fd81dc4f090
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType;
6 procedure Main;
8 {#Message handling#}
{ One received datagram as it is handed to message handlers.
  The fields are filled in by DoSock in the implementation part. }
9 type tSMsg=object
10 Source: ^tNetAddr; { sender address; DoSock points this at a unit-level variable }
11 Length: {Long}Word; { number of bytes received }
12 Data: pointer; { start of the unit-level receive buffer }
13 stream: tMemoryStream; { initialised by DoSock over the same buffer and length }
14 channel: word; { DoSock always stores 0 here }
15 end;
{ Plain-procedure handler; receives the message by value. }
16 type tMessageHandler=procedure(msg:tSMsg);
{ Stores handler in the per-opcode table (the table has entries 1..36). }
17 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
{ Stores the handler used for packets whose first byte is 128 or greater. }
18 procedure SetHiMsgHandler(handler:tMessageHandler);
{ Sends len bytes starting at data to rcpt through the unit's UDP socket.
  The result of the send call is not checked. }
20 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
21 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
{ Same as above; the channel argument is currently ignored by the implementation. }
22 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
24 {#Scheduling and watching#}
{ Callback for a watched descriptor; ev receives the revents value reported by poll. }
25 type tFDEventHandler=procedure(ev:Word) of object;
{ Timer callback, takes no arguments. }
26 type tOnTimer=procedure of object;
{ With an assigned h: start watching fd and call h on events.
  With an unassigned h: stop watching fd. }
27 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
{ Queues h to be called once, after roughly timeout milliseconds. }
28 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
{ Removes every queued timer whose callback equals h. }
29 procedure UnShedule(h:tOnTimer);
30 {note unshed will fail when called from OnTimer proc}
{ Method-pointer handler; receives the message by value. }
32 type tObjMessageHandler=procedure(msg:tSMsg) of object;
33 {deliver message from peer to the object}
{ Registers handler for the pair (OpCode, from); any previous handler for that
  pair is removed first, and passing nil only removes. }
34 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); overload;
{ True when a per-peer handler is registered for (OpCode, from). }
35 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
{ Index of the last command-line parameter equal to o, or 0 when there is none. }
37 function OptIndex(o:string):word;
{ Number of consecutive command-line parameters after index o that do not
  start with '-'; 0 when o is 0. }
38 function OptParamCount(o:word):word;
40 type tTimeVal=UnixType.timeval;
41 type tMTime=DWORD;
42 var iNow:tTimeVal;
43 var mNow:tMTime; { miliseconds since start }
44 {overflows after about 1193 hours (2^32 ms, roughly 49.7 days) }
46 IMPLEMENTATION
48 USES SysUtils,Sockets,BaseUnix
49 ,Unix
52 {aim for most simple implementation, since could be extended anytime}
{ The UDP socket created and bound by s_SetupInet. }
54 var s_inet:tSocket;
{ Index range of the poll table (eight slots). }
56 type tPollTop=0..7;
{ Descriptor table passed to fpPoll; slot 0 is filled with s_inet by s_SetupInet. }
57 var pollArr: packed array [tPollTop] of tPollFd;
58 type tFdHndDsc=record
59 cb: tFDEventHandler; {proc+object}
60 end;
{ Callbacks parallel to pollArr: pollHnd[i] belongs to pollArr[i]. }
61 var pollHnd: array [tPollTop] of tFdHndDsc;
{ Number of slots of pollArr in use; also the index of the next free slot. }
62 var pollTop: tPollTop;
{ Per-opcode plain handlers, indexed by the first byte of the packet. }
64 var hnd: array [1..36] of tMessageHandler;
{ Handler for packets whose first byte is 128 or greater. }
65 var HiHnd: tMessageHandler;
{ Singly linked timer node. }
67 type tSheduled_ptr=^tSheduled; tSheduled=record
68 left:LongWord; { milliseconds remaining; ShedRun counts it down to 0 }
69 cb:tOnTimer; { callback to invoke when left reaches 0 }
70 next:tSheduled_ptr; { next node, or nil }
71 end;
{ Head of the list of pending timers. }
72 var ShedTop: ^tSheduled;
{ Head of the list of recycled (unused) timer nodes. }
73 var ShedUU: ^tSheduled;
{ Wall-clock time at which ShedRun last ran. }
74 var LastShed: UnixType.timeval;
{ Timeout in milliseconds for the next fpPoll call; lowered by ShedRun. }
75 var PollTimeout:LongInt;
{ Sub-millisecond remainder (microseconds) carried between ShedRun calls. }
76 var umNow:integer;
{ Socket-call check.
  fn     - address of the wrapper that was called; only printed in the message
  retval - value the wrapper returned; any negative value counts as failure
  Raises an exception carrying SocketError and fn when retval is negative. }
procedure SC(fn:pointer; retval:cint);
 begin
 if retval >= 0 then exit;
 raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
end;
{ Creates the UDP/IPv4 socket, enables path-MTU discovery on it, binds it to
  the wildcard address and stores it in poll slot 0.
  The port is taken from the parameter following '-port' on the command line;
  without that option port 3511 is used.
  Any failing socket call raises through SC. }
procedure s_SetupInet;
 var bind_addr:tInetSockAddr;
 var pmtuMode:cint;
 var optPos:word;
 begin
 bind_addr.sin_family:=AF_INET;
 optPos:=OptIndex('-port');
 if optPos<>0 then begin
  assert(OptParamCount(optPos)=1);
  bind_addr.sin_port:=htons(StrToInt(paramstr(optPos+1)));
 end
 else bind_addr.sin_port:=htons(3511);
 bind_addr.sin_addr.s_addr:=0; {any}
 s_inet:=fpSocket(bind_addr.sin_family,SOCK_DGRAM,IPPROTO_UDP);
 SC(@fpSocket,s_inet);
 pmtuMode:=IP_PMTUDISC_DO;
 SC(@fpsetsockopt,fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @pmtuMode, sizeof(pmtuMode)));
 SC(@fpBind,fpBind(s_inet,@bind_addr,sizeof(bind_addr)));
 {slot 0 of the poll table is the listening socket}
 PollArr[0].fd:=s_inet;
 PollArr[0].events:=pollIN;
 PollArr[0].revents:=0;
end;
112 var Terminated:boolean=false;
{ Lowest-level send: hands len bytes at data to fpsendto on s_inet.
  The address length passed is always sizeof(sockaddr_in).
  Best effort: the result of fpsendto is discarded (the SC check is
  intentionally left disabled). }
procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
 begin
 fpsendto(
  s_inet,
  @data, len,
  0,
  @rcpt, sizeof(sockaddr_in)
 );
end;

{ Converts rcpt to a socket address and sends through the overload above. }
procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
 var rawAddr:tSockAddrL;
 begin
 rcpt.ToSocket(rawAddr);
 SendMessage(data,len,rawAddr);
end;

{ Channel-aware variant; channel is not used yet and the call is simply
  forwarded to the channel-less overload. }
procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
 begin
 {todo: optimization??}
 SendMessage(data,len,rcpt);
end;
{ Signal handler (installed by the unit initialization).
  First delivery: sets Terminated and prints a notice.
  A delivery while Terminated is already set raises eControlC. }
procedure SignalHandler(sig:cint);CDecl;
 begin
 writeln;
 if not Terminated then begin
  Terminated:=true;
  writeln('Shutdown requested');
 end
 else raise eControlC.Create('CtrlC DoubleTap') ;
end;
138 {index=iphash+opcode}
{ One entry of the per-peer handler table: the (opcode, remote) pair is the key. }
139 type tPeerTableBucket=record
140 opcode:byte; { first byte of the packets this entry handles }
141 remote:tNetAddr; { peer address the handler is bound to }
142 handler:tObjMessageHandler; { method called for matching packets }
143 end;
{ Open-addressed table of heap-allocated buckets; nil marks a free slot.
  FindPT/SetMsgHandler reduce indexes with "mod high(PT)". }
144 var PT:array [0..255] of ^tPeerTableBucket;
{ Opcodes within the range of hnd for which a per-peer handler has been
  registered; DoSock consults it before searching PT. }
145 var PT_opcodes: set of 1..high(hnd);
{ Looks up the per-peer handler slot for (opcode, addr).
  Probing starts at (addr.hash+opcode) mod high(PT) and walks forward, wrapping
  with the same modulus, until a matching entry or a free slot is met.
  Returns the slot index, or $FFFF when no entry matches. }
function FindPT(opcode:byte; addr:tNetAddr):Word; { $FFFF=fail}
 var home,step,slot:word;
 begin
 result:=$FFFF;
 home:=(addr.hash+opcode) mod high(PT);
 for step:=0 to high(PT) do begin
  slot:=(home+step) mod high(PT);
  {a free slot ends the probe chain: the key is not present}
  if PT[slot]=nil then exit;
  if (PT[slot]^.opcode=opcode) and (PT[slot]^.remote=addr) then begin
   result:=slot;
   exit;
  end;
 end;
end;
{ True when a per-peer handler is registered for (OpCode, from),
  i.e. when FindPT does not report its failure value $FFFF. }
function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
 begin
 IsMsgHandled:=not (FindPT(opcode,from)=$FFFF);
end;
{ Removes the per-peer handler registered for (opcode, from), if any.

  The table uses forward linear probing (see FindPT / SetMsgHandler), so simply
  emptying a slot would cut the probe chain of every entry that was pushed past
  it. After freeing the slot, the entries that follow it are therefore examined
  in probe order and moved back into the hole whenever the hole lies between
  their home slot and their current slot. The scan stops at the first free slot.

  PT_opcodes is left untouched: it may keep an opcode that no longer has any
  entry, which only costs DoSock an unsuccessful lookup. }
procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
 var hole,scan,home:word;
 var stays:boolean;
 begin
 hole:=FindPT(opcode,from);
 if hole=$FFFF then exit;
 Dispose(PT[hole]);
 PT[hole]:=nil;
 scan:=hole;
 repeat
  {same modulus as FindPT/SetMsgHandler so all three agree on slot order}
  scan:=(scan+1) mod high(PT);
  {PT[hole] is always nil, so this is reached after one full turn at the latest}
  if not assigned(PT[scan]) then break;
  home:=(PT[scan]^.remote.hash+PT[scan]^.opcode) mod high(PT);
  {the entry is still reachable if its home is cyclically inside (hole..scan]}
  if hole<=scan
   then stays:=(hole<home) and (home<=scan)
   else stays:=(hole<home) or (home<=scan);
  if not stays then begin
   PT[hole]:=PT[scan];
   PT[scan]:=nil;
   hole:=scan;
  end;
 until false;
end;
{ Registers handler for packets with the given OpCode coming from peer "from".
  Any existing registration for the same pair is removed first; passing a nil
  handler therefore just unregisters.
  The free slot is searched by forward linear probing from
  (from.hash+opcode) mod high(PT).
  Raises an exception when no free slot exists, instead of overwriting (and
  leaking) an occupied bucket. }
procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
 var h,o,i:word;
 var found:boolean;
 begin
 UnSetMsgHandler(from,opcode);
 if handler=nil then exit;
 h:=(from.hash+opcode) mod high(PT);
 found:=false;
 for o:=0 to high(PT) do begin
  i:=(h+o) mod high(PT);
  if not assigned(PT[i]) then begin
   found:=true;
   break;
  end;
 end;
 if not found then raise eXception.Create('Peer handler table is full');
 New(PT[i]);
 PT[i]^.opcode:=OpCode;
 PT[i]^.remote:=from;
 PT[i]^.handler:=handler;
 {remember the opcode so DoSock knows a table lookup is worthwhile}
 if opcode<=high(hnd) then Include(PT_opcodes,opcode);
end;
198 {do not waste stack on statics}
{ Result of the last fpPoll call in Main. }
199 var EventsCount:integer;
{ Receive buffer; Buffer[1] is treated as the opcode of the packet. }
200 var Buffer:array [1..4096] of byte;
{ Value returned by the last fprecvfrom call in DoSock. }
201 var pkLen:LongWord;
202 var From:tSockAddrL; {use larger struct so everything fits}
{ In/out address length passed to fprecvfrom. }
203 var FromLen:LongWord;
{ Sender address converted from From; Msg.Source points here. }
204 var FromG:tNetAddr;
{ Handlers selected by DoSock for the current packet (nil when none). }
205 var curhnd:tMessageHandler;
206 var curhndo:tObjMessageHandler;
{ The message record passed to the selected handler. }
207 var Msg:tSMsg;
{ Loop index over the poll table, used by Main. }
208 var tp:tPollTop;
{ Receives one datagram from the polled socket p and selects its handlers.

  On return curhnd / curhndo hold the plain and per-peer handler for the packet
  (nil when none) and Msg describes the packet; both Msg.Source and Msg.Data
  point at unit-level variables that are overwritten by the next call.

  Returns false when p has no pollIN event pending, or when the datagram was
  empty: an empty datagram carries no opcode byte, so Buffer[1] would still
  hold the opcode of an earlier packet. Returns true otherwise.
  A failing fprecvfrom raises through SC. }
function DoSock(var p:tPollFD):boolean;
 var ptidx:word;
 begin
 curhnd:=nil;
 curhndo:=nil;
 result:=false;
 ptidx:=$FFFF;
 if (p.revents and pollIN)=0 then exit;
 FromLen:=sizeof(From);
 pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
 SC(@fprecvfrom,pkLen);
 p.revents:=0;
 {nothing to dispatch on: drop empty datagrams}
 if pkLen=0 then exit;
 result:=true;
 FromG.FromSocket(from);
 Msg.Source:=@FromG; {!thread}
 Msg.Length:=pkLen;
 Msg.Data:=@Buffer; {!thread}
 Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
 Msg.channel:=0; {!multisocket}
 {plain handler: high opcodes share HiHnd, low ones index hnd}
 if Buffer[1]>=128 then curhnd:=HiHnd else if Buffer[1]<=high(hnd) then curhnd:=hnd[Buffer[1]];
 {per-peer handler: only search when the opcode can have one}
 if (Buffer[1]>high(hnd))or(Buffer[1] in PT_opcodes) then begin
  ptidx:=FindPT(Buffer[1],FromG);
  if ptidx<$FFFF then curhndo:=PT[ptidx]^.handler;
 end;
end;
{ Advances the millisecond clock and the timer list.

  1. Measures the time elapsed since the previous call, adds it to mNow and
     keeps the sub-millisecond remainder in umNow. Elapsed values above 6000 ms
     are replaced by 5000 ms.
  2. Subtracts the elapsed time from every pending timer and lowers PollTimeout
     to the smallest remaining time.
  3. Runs at most one expired timer: the node is moved to the unused list
     before its callback is invoked, and PollTimeout is set to 0 so the caller
     polls without waiting. }
procedure ShedRun;
 var task:^tSheduled;
 var link:^pointer;
 var now:UnixType.timeval{ absolute iNow};
 var delta:LongWord;
 var delta_us:LongInt;
 var tasks:word;
 begin
 {elapsed time since the previous run}
 fpgettimeofday(@Now,nil);
 delta:=(Now.tv_sec-LastShed.tv_sec);
 delta_us:=Now.tv_usec-LastShed.tv_usec;
 delta:=(delta*1000)+(delta_us div 1000);
 umNow:=umNow+(delta_us mod 1000);
 if delta>6000 then delta:=5000;
 LastShed:=Now;
 mNow:=mNow+Delta;
 {carry whole milliseconds out of the microsecond remainder}
 if umNow>1000 then begin
  inc(mNow);
  dec(umNow,1000);
 end
 else if umNow<-1000 then begin
  dec(mNow);
  inc(umNow,1000);
 end;
 //writeln('DeltaTime: ',delta);
 {tick every pending task and find the next wake-up time}
 tasks:=0;
 task:=ShedTop;
 while task<>nil do begin
  if cur_expired(task^.left,delta) then task^.left:=0
  else begin
   dec(task^.left,delta);
   if task^.left<PollTimeout then PollTimeout:=task^.left;
  end;
  inc(tasks);
  task:=task^.next;
 end;
 {correct floating-point glitch}
 if pollTimeout=0 then pollTimeOut:=1;
 {locate the first runnable task, remembering the link that points at it}
 link:=@ShedTop;
 task:=link^;
 while assigned(task) and (task^.left<>0) do begin
  link:=@task^.next;
  task:=task^.next;
 end;
 if assigned(task) then begin
  {unlink, recycle, then call; the rest is done on the next run}
  link^:=task^.next;
  task^.next:=ShedUU;
  ShedUU:=task;
  task^.cb;
  pollTimeout:=0;
 end;
 if delta >4990 then writeln('ServerLoop: tasks=',tasks);
end;
{ The server loop: sets up the socket, then repeatedly runs timers, polls the
  descriptor table and dispatches events until Terminated is set or fpPoll
  fails.

  Per iteration:
   - ShedRun runs before and after fpPoll (it lowers PollTimeout to the next
     timer deadline and runs one expired timer);
   - slot 0 (the UDP socket) is handled by DoSock; the per-peer handler takes
     precedence over the plain one, and a packet with no handler raises;
   - slots 1..pollTop-1 are the descriptors registered through WatchFD. Slot
     pollTop itself is the first unused one and is not visited.
  The socket is closed when the loop ends. }
procedure Main;
 begin
 s_setupInet;
 while not terminated do begin
  PollTimeout:=5000;
  ShedRun;
  EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
  ShedRun;
  {poll failed (this includes interruption by the shutdown signal)}
  if eventscount=-1 then break; {fixme: print error}
  if eventscount=0 then continue;
  {INET socket}
  if DoSock(PollArr[0]) then begin
   if assigned(curhndo) then curhndo(msg)
   else if assigned(curhnd) then curhnd(msg)
   else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1]));
  end;
  {INET6...}
  {Generic}
  for tp:=1 to pollTop-1 do if PollArr[tp].revents>0 then begin
   PollHnd[tp].CB(PollArr[tp].rEvents);
   PollArr[tp].revents:=0;
  end;
 end;
 write('Loop broken [');
 CloseSocket(s_inet);
 writeln(']');
end;
{ Registers the plain handler for one opcode.
  OpCode must lie inside the bounds of the hnd table; anything else raises an
  exception instead of writing outside the array.
  The slot is expected to be empty (checked by assert). }
procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
 begin
 if (OpCode<low(hnd)) or (OpCode>high(hnd)) then
  raise eXception.Create('SetMsgHandler: opcode '+IntToStr(OpCode)+' is outside the handler table');
 assert(hnd[OpCode]=nil);
 hnd[OpCode]:=handler;
end;
{ Stores the handler that DoSock selects for packets whose first byte is 128
  or greater. A previous handler is replaced without any check. }
procedure SetHiMsgHandler(handler:tMessageHandler);
 begin
 HiHnd:=handler;
end;
{ Adds or removes a descriptor in the poll table.

  h assigned:   fd is appended at slot pollTop and watched for input, priority,
                error and hang-up events. Raises an exception when pollTop has
                reached its maximum, since the counter could not be advanced.
  h unassigned: the first registered slot holding fd is removed; the last used
                slot is moved into its place and the vacated slot is cleared.
                Only slots in use (0..pollTop-1) are searched, so an fd value
                that merely matches the content of an unused slot no longer
                removes a live entry. Unknown descriptors are ignored. }
procedure WatchFD(fd:tHandle; h:tFDEventHandler);
 var idx,last: integer;
 begin
 if assigned(h) then begin
  if pollTop>=high(tPollTop) then
   raise eXception.Create('WatchFD: descriptor table is full');
  PollHnd[pollTop].CB:=h;
  PollArr[pollTop].fd:=fd;
  PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
   POLLRDBAND or POLLRDNORM;
  PollArr[pollTop].revents:=0;
  //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
  Inc(PollTop);
 end else begin
  last:=pollTop-1;
  for idx:=0 to last do if PollArr[idx].fd=fd then begin
   {keep the used part of the table dense}
   if idx<last then begin
    PollArr[idx]:=PollArr[last];
    PollHnd[idx]:=PollHnd[last];
   end;
   dec(pollTop);
   PollArr[pollTop].fd:=-1;
   PollArr[pollTop].events:=0;
   PollArr[pollTop].revents:=0;
   break;
  end;
 end;
end;
{ Queues h to be called after timeout milliseconds.
  A node is taken from the unused list when one is available, otherwise a new
  one is allocated; it becomes the new head of the pending list. }
procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
 var node:^tSheduled;
 begin
 if ShedUU=nil then New(node)
 else begin
  node:=ShedUU;
  ShedUU:=node^.next;
 end;
 node^.Left:=timeout;
 node^.CB:=h;
 node^.Next:=ShedTop;
 ShedTop:=node;
end;
{ Cancels every pending timer whose callback is byte-for-byte equal to h.
  Matching nodes are unlinked from the pending list and pushed onto the unused
  list for reuse. }
procedure UnShedule(h:tOnTimer);
 var node:^tSheduled;
 var link:^pointer;
 begin
 //if ShedTop=nil then AbstractError;
 {link always addresses the pointer that leads to the node under inspection}
 link:=@ShedTop;
 while assigned(link^) do begin
  node:=link^;
  if CompareByte(node^.cb,h,sizeof(h))<>0 then link:=@node^.next
  else begin
   link^:=node^.next; {unlink from main list}
   node^.next:=ShedUU; {link to unused}
   ShedUU:=node;
  end;
 end;
end;
{ Searches the command line from the last parameter towards the first.
  Returns the index of the first parameter found that equals o,
  or 0 when no parameter matches. }
function OptIndex(o:string):word;
 var idx:word;
 begin
 for idx:=paramcount downto 1 do
  if system.paramstr(idx)=o then begin
   result:=idx;
   exit;
  end;
 result:=0;
end;
{ Counts the values that follow the option at command-line index o: the
  consecutive parameters after o up to, but not including, the next one that
  starts with '-'.
  Returns 0 when o is 0 (option not present).
  An empty parameter counts as a value; its first character is not inspected. }
function OptParamCount(o:word):word;
 var i:word;
 var arg:string;
 begin
 result:=0;
 if o=0 then exit;
 for i:=o+1 to paramcount do begin
  arg:=paramstr(i);
  if (length(arg)>0) and (arg[1]='-') then break;
  inc(result);
 end;
end;
{ Unit initialization: resets the clocks, installs the shutdown signal handler,
  clears both handler tables over their full index range (PT starts at 0),
  reserves poll slot 0 for the listening socket and starts with empty timer
  lists. }
var i:byte;
BEGIN
 mNow:=0;
 umNow:=0;
 Randomize;
 fpSignal(SigInt,@SignalHandler);
 fpSignal(SigTerm,@SignalHandler);
 for i:=low(hnd) to high(hnd) do hnd[i]:=nil;
 for i:=low(PT) to high(PT) do PT[i]:=nil;
 PT_opcodes:=[];
 pollTop:=1; {1 for basic listen}
 ShedTop:=nil;
 ShedUU:=nil; {todo: allocate a few to improve paging}
 fpgettimeofday(@LastShed,nil);
END.