Few fixes in support units.
[brdnet.git] / ServerLoop.pas
blobc27d882d61a967cc9907c51355a6ebec2e4b5977
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType;
6 procedure Main;
8 {#Message handling#}
9 type tSMsg=object
10 Source: ^tNetAddr;
11 Length: {Long}Word;
12 Data: pointer;
13 stream: tMemoryStream;
14 channel: word;
15 end;
16 type tMessageHandler=procedure(msg:tSMsg);
17 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
18 procedure SetHiMsgHandler(handler:tMessageHandler);
20 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
21 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
22 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
24 {#Sheduling and watching#}
25 type tFDEventHandler=procedure(ev:Word) of object;
26 type tOnTimer=procedure of object;
27 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
28 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
29 procedure UnShedule(h:tOnTimer);
30 {note unshed will fail when called from OnTimer proc}
32 type tObjMessageHandler=procedure(msg:tSMsg) of object;
33 {deliver message from peer to the object}
34 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); overload;
35 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
37 function OptIndex(o:string):word;
38 function OptParamCount(o:word):word;
40 type tTimeVal=UnixType.timeval;
41 var iNow:tTimeVal;
43 IMPLEMENTATION
45 USES SysUtils,Sockets,BaseUnix
46 ,Unix
49 {aim for most simple implementation, since could be extended anytime}
51 var s_inet:tSocket;
53 type tPollTop=0..7;
54 var pollArr: packed array [tPollTop] of tPollFd;
55 type tFdHndDsc=record
56 cb: tFDEventHandler; {proc+object}
57 end;
58 var pollHnd: array [tPollTop] of tFdHndDsc;
59 var pollTop: tPollTop;
61 var hnd: array [1..36] of tMessageHandler;
62 var HiHnd: tMessageHandler;
64 type tSheduled_ptr=^tSheduled; tSheduled=record
65 left:LongWord;
66 cb:tOnTimer;
67 next:tSheduled_ptr;
68 end;
69 var ShedTop: ^tSheduled;
70 var ShedUU: ^tSheduled;
71 var LastShed: UnixType.timeval;
72 var PollTimeout:LongInt;
74 procedure SC(fn:pointer; retval:cint);
75 begin
76 if retval < 0 then begin
77 raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
78 end;
79 end;
81 procedure s_SetupInet;
82 var bind_addr:tInetSockAddr;
83 var turnon:cint;
84 var oi:word;
85 begin
86 with bind_addr do begin
87 sin_family:=AF_INET;
88 oi:=OptIndex('-port');
89 if oi=0 then sin_port:=htons(3511)
90 else begin
91 assert(OptParamCount(oi)=1);
92 sin_port:=htons(StrToInt(paramstr(oi+1)));
93 end;
94 sin_addr.s_addr:=0; {any}
95 s_inet:=fpSocket(sin_family,SOCK_DGRAM,IPPROTO_UDP);
96 SC(@fpSocket,s_inet);
97 turnon:=IP_PMTUDISC_DO;
98 SC(@fpsetsockopt,fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @turnon, sizeof(turnon)));
99 end;
100 SC(@fpBind,fpBind(s_inet,@bind_addr,sizeof(bind_addr)));
101 with PollArr[0] do begin
102 fd:=s_inet;
103 events:=pollIN;
104 revents:=0;
105 end;
106 end;
108 var Terminated:boolean=false;
110 procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
111 begin
112 {SC(@fpsendto,}fpsendto(s_inet,@data,len,0,@rcpt,sizeof(sockaddr_in)){)};
113 end;
114 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
115 var sa:tSockAddrL;
116 begin
117 rcpt.ToSocket(sa);
118 SendMessage(data,len,sa);
119 end;
120 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
121 begin
122 SendMessage(data,len,rcpt);
123 {todo: optimization??}
124 end;
126 procedure SignalHandler(sig:cint);CDecl;
127 begin
128 writeln;
129 if terminated then raise eControlC.Create('CtrlC DoubleTap') ;
130 Terminated:=true;
131 writeln('Shutdown requested');
132 end;
134 {index=iphash+opcode}
135 type tPeerTableBucket=record
136 opcode:byte;
137 remote:tNetAddr;
138 handler:tObjMessageHandler;
139 end;
140 var PT:array [0..255] of ^tPeerTableBucket;
141 var PT_opcodes: set of 1..high(hnd);
143 function FindPT(opcode:byte; addr:tNetAddr):Word; { $FFFF=fail}
144 var i,o:word;
145 begin
146 i:=(addr.hash+opcode) mod high(PT); {0..63}
147 for o:=0 to high(PT) do begin
148 result:=(i+o) mod high(PT);
149 if not assigned(PT[result]) then break;
150 if (PT[result]^.opcode=opcode) and (PT[result]^.remote=addr) then exit;
151 end;
152 result:=$FFFF;
153 end;
155 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
156 begin result:=FindPT(opcode,from)<>$FFFF end;
158 procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
159 var i,h:word;
160 begin
161 h:=FindPT(opcode,from);
162 if h=$FFFF then exit;
163 Dispose(PT[h]);
164 PT[h]:=nil;
165 {go reverse exit on null, hash them, match: move to H and stop}
166 if h=0 then i:=high(PT) else i:=h-1;
167 while (i<>h)and assigned(PT[i]) do begin
168 if (PT[i]^.remote.hash+PT[i]^.opcode)=h then begin
169 PT[h]:=PT[i];
170 PT[i]:=nil;
171 break;
172 end;
173 if i=0 then i:=high(PT) else dec(i);
174 end;
175 end;
177 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
178 var h,o,i:word;
179 begin
180 UnSetMsgHandler(from,opcode);
181 if handler=nil then exit;
182 h:=(from.hash+opcode) mod high(PT);
183 for o:=0 to high(PT) do begin
184 i:=(h+o) mod high(PT);
185 if not assigned(PT[i]) then break;
186 end;
187 New(PT[i]);
188 PT[i]^.opcode:=OpCode;
189 PT[i]^.remote:=from;
190 PT[i]^.handler:=handler;
191 if opcode<=high(hnd) then Include(PT_opcodes,opcode);
192 end;
194 {do not waste stack on statics}
195 var EventsCount:integer;
196 var Buffer:array [1..1024] of byte;
197 var pkLen:LongWord;
198 var From:tSockAddrL; {use larger struct so everything fits}
199 var FromLen:LongWord;
200 var FromG:tNetAddr;
201 var curhnd:tMessageHandler;
202 var curhndo:tObjMessageHandler;
203 var Msg:tSMsg;
204 var tp:tPollTop;
206 function DoSock(var p:tPollFD):boolean;
207 var ptidx:word;
208 begin
209 curhnd:=nil;
210 curhndo:=nil;
211 result:=false;
212 ptidx:=$FFFF;
213 if (p.revents and pollIN)=0 then exit else result:=true;
214 FromLen:=sizeof(From);
215 pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
216 SC(@fprecvfrom,pkLen);
217 p.revents:=0;
218 FromG.FromSocket(from);
219 Msg.Source:=@FromG; {!thread}
220 Msg.Length:=pkLen;
221 Msg.Data:=@Buffer; {!thread}
222 Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
223 Msg.channel:=0; {!multisocket}
224 if Buffer[1]>=128 then curhnd:=HiHnd else if Buffer[1]<=high(hnd) then curhnd:=hnd[Buffer[1]];
225 if (Buffer[1]>high(hnd))or(Buffer[1] in PT_opcodes) then begin
226 ptidx:=FindPT(Buffer[1],FromG);
227 if ptidx<$FFFF then curhndo:=PT[ptidx]^.handler;
228 end;
229 end;
231 procedure ShedRun;
232 var cur:^tSheduled;
233 var pcur:^pointer;
234 var now:UnixType.timeval absolute iNow;
235 var delta:LongWord;
236 var tasks:word;
237 begin
238 {Sheduling}
239 {fixme: proste niak aby to šlo vymazať z callbacku}
240 {prejdi ich od zaciatku,
241 po spusteni jednej, chod zas na zaciatok
243 pcur:=@ShedTop;
244 cur:=pcur^;
245 fpgettimeofday(@Now,nil);
246 delta:=(Now.tv_sec-LastShed.tv_sec);
247 if delta>6 then delta:=5000 else delta:=(delta*1000)+((Now.tv_usec-LastShed.tv_usec) div 1000);
248 LastShed:=Now;
249 //writeln('DeltaTime: ',delta);
250 while assigned(cur) do begin
251 if (cur^.left<=delta)or(cur^.left=0) then begin
252 {unlink}
253 pcur^:=cur^.next;
254 {link to unused}
255 cur^.next:=ShedUU;
256 ShedUU:=cur;
257 {call}
258 cur^.cb;
259 {go to beginning}
260 break;
261 pcur:=@ShedTop;
262 cur:=pcur^;
263 end else begin
264 DEC(cur^.left,delta);
265 //writeln('Left: ',cur^.left);
266 if pollTimeOut>cur^.left then PollTimeOut:=cur^.left;
267 pcur:=@cur^.next;
268 cur:=cur^.next;
269 end;
270 end;
271 cur:=ShedTop;
272 tasks:=0;
273 while assigned(cur) do begin
274 if cur^.left<PollTimeout then PollTimeout:=cur^.left;
275 cur:=cur^.next;
276 inc(tasks);
277 end;
278 if pollTimeout=0 then pollTimeOut:=1;
279 //if delta >4990 then writeln('ServerLoop: tasks=',tasks);
280 end;
282 procedure Main;
283 begin
284 s_setupInet;
285 while not terminated do begin
286 PollTimeout:=5000;
287 ShedRun;
288 EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
289 ShedRun;
290 if (eventscount=-1)and terminated then break;
291 if eventscount=-1 then break; {fixme: print error}
292 if eventscount=0 then continue else begin
293 {INET socket}
294 if DoSock(PollArr[0]) then
295 if assigned(curhndo) then curhndo(msg)
296 else if assigned(curhnd) then curhnd(msg)
297 else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1]));
298 {INET6...}
299 {Generic}
300 for tp:=1 to pollTop do if PollArr[tp].revents>0 then begin
301 PollHnd[tp].CB(PollArr[tp].rEvents);
302 PollArr[tp].revents:=0;
303 end;
304 end;
305 end;
306 write('Loop broken [');
307 CloseSocket(s_inet);
308 writeln(']');
309 end;
311 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
312 begin assert(hnd[OpCode]=nil); hnd[OpCode]:=handler; end;
313 procedure SetHiMsgHandler(handler:tMessageHandler);
314 begin Hihnd:=handler; end;
316 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
317 var opt: tPollTop;
318 begin
319 if assigned(h) then begin
320 PollHnd[pollTop].CB:=h;
321 PollArr[pollTop].fd:=fd;
322 PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
323 POLLRDBAND or POLLRDNORM;
324 PollArr[pollTop].revents:=0;
325 //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
326 Inc(PollTop);
327 end else for opt:=0 to high(opt) do if PollArr[opt].fd=fd then begin
328 if (pollTop-1)>opt then begin
329 PollArr[opt]:=PollArr[pollTop-1];
330 PollHnd[opt]:=PollHnd[pollTop-1];
331 end;
332 dec(pollTop);
333 PollArr[pollTop].fd:=-1;
334 PollArr[pollTop].events:=0;
335 PollArr[pollTop].revents:=0;
336 break;
337 end;
338 end;
340 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
341 var old:^tSheduled;
342 begin
343 old:=ShedTop;
344 if Assigned(ShedUU) then begin
345 ShedTop:=ShedUU;
346 ShedUU:=ShedUU^.next;
347 end else New(ShedTop);
348 ShedTop^.Left:=timeout;
349 ShedTop^.CB:=h;
350 ShedTop^.Next:=old;
351 end;
353 procedure UnShedule(h:tOnTimer);
354 var cur:^tSheduled;
355 var pcur:^pointer;
356 begin
357 //if ShedTop=nil then AbstractError;
358 pcur:=@ShedTop;
359 cur:=pcur^;
360 while assigned(cur) do begin
361 if 0=CompareByte(cur^.cb,h,sizeof(h)) then begin
362 pcur^:=cur^.next; {unlink from main list}
363 cur^.next:=ShedUU; ShedUU:=cur; {link to unused}
364 cur:=pcur^;
365 end else begin
366 pcur:=@cur^.next;
367 cur:=pcur^;
368 end;
369 end;
370 end;
372 function OptIndex(o:string):word;
373 begin
374 result:=paramcount;
375 while result>0 do begin
376 if o=system.paramstr(result) then break;
377 dec(result);
378 end;
379 end;
381 function OptParamCount(o:word):word;
382 var i:word;
383 begin
384 result:=0;
385 if o>0 then for i:=o+1 to paramcount do begin
386 if paramstr(i)[1]<>'-' then inc(result)
387 else break;
388 end;
389 end;
391 var i:byte;
392 BEGIN
393 Randomize;
394 fpSignal(SigInt,@SignalHandler);
395 fpSignal(SigTerm,@SignalHandler);
396 for i:=1 to high(hnd) do hnd[i]:=nil;
397 for i:=1 to high(PT) do PT[i]:=nil;
398 PT_opcodes:=[];
399 pollTop:=1; {1 for basic listen}
400 ShedTop:=nil;
401 ShedUU:=nil; {todo: allocate a few to improve paging}
402 fpgettimeofday(@LastShed,nil);
403 END.