Add DHT id option and random id option.
[brdnet.git] / ServerLoop.pas
blob174d86cc7539a626543dab88519a4f78bc7c13d4
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType,Sockets;
6 procedure Main;
8 {#Message handling#}
{ One received datagram as handed to message handlers; DoSock fills in every field. }
9 type tSMsg=object
10 Source: ^tNetAddr; { address of the sender }
11 Length: {Long}Word; { number of bytes received }
12 Data: pointer; { start of the receive buffer }
13 stream: tMemoryStream; { stream initialised over the same receive buffer }
14 channel: word; { DoSock always sets this to 0 }
15 end;
16 type tMessageHandler=procedure(msg:tSMsg);
17 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
18 procedure SetHiMsgHandler(handler:tMessageHandler);
20 function GetSocket(const rcpt:tNetAddr):tSocket;
21 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
22 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
23 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
25 {#Scheduling and watching#}
26 type tFDEventHandler=procedure(ev:Word) of object;
27 type tOnTimer=procedure of object;
28 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
29 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
30 procedure UnShedule(h:tOnTimer);
31 {note unshed will fail when called from OnTimer proc}
33 type tObjMessageHandler=procedure(msg:tSMsg) of object;
34 {deliver message from peer to the object}
35 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); overload;
36 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
38 function OptIndex(o:string):word;
39 function OptParamCount(o:word):word;
41 var OnTerminate:procedure;
43 type tTimeVal=UnixType.timeval;
44 type tMTime=DWORD;
45 var iNow:tTimeVal;
46 var mNow:tMTime; { miliseconds since start }
47 {overflows after about 1193 hours (2^32 ms, roughly 49.7 days) }
49 IMPLEMENTATION
51 USES SysUtils,BaseUnix
52 ,Unix
55 {aim for the simplest implementation, since it can be extended at any time}
{ The UDP socket created by s_SetupInet; it is also registered in pollArr[0]. }
57 var s_inet:tSocket;
{ Index range of the poll table. }
59 type tPollTop=0..7;
{ Descriptors handed to fpPoll; the first pollTop entries are in use. }
60 var pollArr: packed array [tPollTop] of tPollFd;
61 type tFdHndDsc=record
62 cb: tFDEventHandler; {proc+object}
63 end;
{ Callbacks stored at the same index as the matching pollArr entry (see WatchFD). }
64 var pollHnd: array [tPollTop] of tFdHndDsc;
{ Number of used pollArr entries; starts at 1 because slot 0 holds the UDP socket. }
65 var pollTop: tPollTop;
{ Plain per-opcode handlers, indexed by the first byte of a datagram. }
67 var hnd: array [1..36] of tMessageHandler;
{ Handler DoSock selects for datagrams whose first byte is 128 or above. }
68 var HiHnd: tMessageHandler;
{ Node of the singly linked timer list. }
70 type tSheduled_ptr=^tSheduled; tSheduled=record
71 left:LongWord; { milliseconds remaining; ShedRun counts this down }
72 cb:tOnTimer; { method invoked once left reaches 0 }
73 next:tSheduled_ptr;
74 end;
{ Head of the list of pending timers. }
75 var ShedTop: ^tSheduled;
{ Head of the list of unused nodes kept for reuse by Shedule. }
76 var ShedUU: ^tSheduled;
{ Wall-clock time of the previous ShedRun pass. }
77 var LastShed: UnixType.timeval;
{ Timeout in milliseconds handed to fpPoll; ShedRun lowers it to the nearest timer. }
78 var PollTimeout:LongInt;
{ Microsecond remainder carried between ShedRun passes for mNow. }
79 var umNow:integer;
{ Checks the result of a socket call.
  fn:     address of the routine that was called, used only in the message.
  retval: value returned by that routine; a negative value raises Exception. }
procedure SC(fn:pointer; retval:cint);
begin
  if retval>=0 then exit;
  raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
end;
{ Creates the UDP socket, binds it to every local address and registers it
  as poll slot 0. The port comes from the '-port <n>' command line option
  and defaults to 3511. Any failing socket call raises through SC. }
procedure s_SetupInet;
var
  addr: tInetSockAddr;
  pmtuMode: cint;
  optPos: word;
begin
  addr.sin_family:=AF_INET;
  optPos:=OptIndex('-port');
  if optPos>0 then begin
    assert(OptParamCount(optPos)=1);
    addr.sin_port:=htons(StrToInt(paramstr(optPos+1)));
  end
  else
    addr.sin_port:=htons(3511);
  addr.sin_addr.s_addr:=0; {any}

  s_inet:=fpSocket(addr.sin_family,SOCK_DGRAM,IPPROTO_UDP);
  SC(@fpSocket,s_inet);

  {request path-MTU discovery on this socket}
  pmtuMode:=IP_PMTUDISC_DO;
  SC(@fpsetsockopt,fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @pmtuMode, sizeof(pmtuMode)));

  SC(@fpBind,fpBind(s_inet,@addr,sizeof(addr)));

  {slot 0 of the poll table is reserved for this socket}
  PollArr[0].fd:=s_inet;
  PollArr[0].events:=pollIN;
  PollArr[0].revents:=0;
end;
115 var Terminated:boolean=false;
{ Returns the socket used to reach rcpt. There is only one socket, so the
  recipient is not inspected. }
function GetSocket(const rcpt:tNetAddr):tSocket;
begin
  GetSocket:=s_inet;
end;
{ Sends len bytes starting at data to an already converted socket address.
  The result of fpsendto is deliberately not checked. }
procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
const
  AddrLen=sizeof(sockaddr_in);
begin
  fpsendto(s_inet,@data,len,0,@rcpt,AddrLen);
end;

{ Sends len bytes starting at data to rcpt, converting the address first. }
procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
var
  sockAddr:tSockAddrL;
begin
  rcpt.ToSocket(sockAddr);
  SendMessage(data,len,sockAddr);
end;

{ Channel-aware variant; the channel is currently ignored. }
procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
begin
  {todo: optimization??}
  SendMessage(data,len,rcpt);
end;
{ Signal handler: the first signal sets Terminated so the main loop can end;
  a further signal while Terminated is already set raises eControlC. }
procedure SignalHandler(sig:cint);CDecl;
begin
  writeln;
  if not terminated then begin
    Terminated:=true;
    exit;
  end;
  raise eControlC.Create('CtrlC DoubleTap') ;
end;
144 {index=iphash+opcode}
{ One per-peer handler registration: messages with this opcode from this
  remote address go to handler. }
145 type tPeerTableBucket=record
146 opcode:byte;
147 remote:tNetAddr;
148 handler:tObjMessageHandler;
149 end;
{ Table of registrations. FindPT and SetMsgHandler start at
  (hash+opcode) mod high(PT) and probe following slots until a nil entry. }
150 var PT:array [0..255] of ^tPeerTableBucket;
{ Opcodes within the range of hnd that were given a per-peer handler;
  DoSock consults PT for such opcodes. UnSetMsgHandler does not clear it. }
151 var PT_opcodes: set of 1..high(hnd);
{ Looks up the per-peer registration for (opcode, addr).
  Probing starts at (hash+opcode) mod high(PT) and walks consecutive slots,
  stopping at the first empty one.
  Returns the index into PT, or $FFFF when there is no such registration. }
function FindPT(opcode:byte; addr:tNetAddr):Word; { $FFFF=fail}
var
  home,step,slot:word;
begin
  FindPT:=$FFFF;
  home:=(addr.hash+opcode) mod high(PT);
  for step:=0 to high(PT) do begin
    slot:=(home+step) mod high(PT);
    {an empty slot ends the probe sequence: not registered}
    if PT[slot]=nil then exit;
    if (PT[slot]^.opcode=opcode) and (PT[slot]^.remote=addr) then begin
      FindPT:=slot;
      exit;
    end;
  end;
end;
{ True when a per-peer handler is registered for OpCode coming from `from`. }
function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
begin
  IsMsgHandled:=not (FindPT(opcode,from)=$FFFF);
end;
{ Removes the per-peer registration for (opcode, from), if there is one.

  FindPT stops probing at the first empty slot, so simply clearing a slot
  would hide every registration stored after it in the same run of occupied
  slots. After freeing the entry, each following entry of that run is
  therefore taken out and stored again at the first free slot of its own
  probe sequence, using the same hashing as FindPT and SetMsgHandler. }
procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
var
  hole,scan,home,step,slot:word;
  moved:^tPeerTableBucket;
begin
  hole:=FindPT(opcode,from);
  if hole=$FFFF then exit;
  Dispose(PT[hole]);
  PT[hole]:=nil;
  scan:=(hole+1) mod high(PT);
  {scan<>hole bounds the walk to one trip around the probed slots}
  while (scan<>hole) and assigned(PT[scan]) do begin
    moved:=PT[scan];
    PT[scan]:=nil;
    home:=(moved^.remote.hash+moved^.opcode) mod high(PT);
    {the slot just vacated is free, so this search always finds a place}
    for step:=0 to high(PT) do begin
      slot:=(home+step) mod high(PT);
      if not assigned(PT[slot]) then break;
    end;
    PT[slot]:=moved;
    scan:=(scan+1) mod high(PT);
  end;
end;
{ Registers handler for messages with OpCode coming from `from`, replacing
  any earlier registration for the same pair. Passing nil only removes the
  registration.
  Raises Exception when every probed slot of PT is occupied; previously an
  occupied slot was overwritten in that case, losing another registration
  and leaking its record. }
procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
var
  home,step,slot:word;
  haveSlot:boolean;
begin
  UnSetMsgHandler(from,opcode);
  if handler=nil then exit;
  home:=(from.hash+opcode) mod high(PT);
  haveSlot:=false;
  for step:=0 to high(PT) do begin
    slot:=(home+step) mod high(PT);
    if not assigned(PT[slot]) then begin
      haveSlot:=true;
      break;
    end;
  end;
  if not haveSlot then
    raise eXception.Create('ServerLoop: peer handler table is full');
  New(PT[slot]);
  PT[slot]^.opcode:=OpCode;
  PT[slot]^.remote:=from;
  PT[slot]^.handler:=handler;
  {remember low opcodes so DoSock knows to consult PT for them}
  if opcode<=high(hnd) then Include(PT_opcodes,opcode);
end;
204 {do not waste stack on statics}
{ Result of the last fpPoll call in Main. }
205 var EventsCount:integer;
{ Receive buffer; DoSock reads one datagram into it and dispatches on Buffer[1]. }
206 var Buffer:array [1..4096] of byte;
{ Value returned by fprecvfrom for the last datagram. }
207 var pkLen:LongWord;
208 var From:tSockAddrL; {use larger struct so everything fits}
209 var FromLen:LongWord;
{ Sender of the last datagram converted from From; Msg.Source points here. }
210 var FromG:tNetAddr;
{ Handlers selected by DoSock for the last datagram (nil when none applies). }
211 var curhnd:tMessageHandler;
212 var curhndo:tObjMessageHandler;
{ Message record filled by DoSock and passed to the selected handler. }
213 var Msg:tSMsg;
{ Loop index Main uses for the watched descriptors. }
214 var tp:tPollTop;
{ Receives one datagram from the polled socket p and selects its handlers.

  On return curhnd holds the plain opcode handler and curhndo the per-peer
  handler; either may be nil. Msg describes the datagram.
  Returns true when a datagram was received and Msg is valid. Returns false
  when p reported no input, or when the datagram was empty: an empty
  datagram has no opcode byte, and dispatching on Buffer[1] would use a
  byte left over from an earlier packet.
  Raises (through SC) when fprecvfrom fails. }
function DoSock(var p:tPollFD):boolean;
var
  ptidx:word;
begin
  curhnd:=nil;
  curhndo:=nil;
  DoSock:=false;
  if (p.revents and pollIN)=0 then exit;
  FromLen:=sizeof(From);
  pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
  SC(@fprecvfrom,pkLen);
  p.revents:=0;
  if pkLen<1 then exit; {empty datagram: nothing to dispatch}
  DoSock:=true;
  FromG.FromSocket(from);
  Msg.Source:=@FromG; {!thread}
  Msg.Length:=pkLen;
  Msg.Data:=@Buffer; {!thread}
  Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
  Msg.channel:=0; {!multisocket}
  {plain handler: high opcodes share HiHnd, low ones index hnd}
  if Buffer[1]>=128 then curhnd:=HiHnd
  else if Buffer[1]<=high(hnd) then curhnd:=hnd[Buffer[1]];
  {per-peer handler: consulted for opcodes outside hnd, or flagged in PT_opcodes}
  if (Buffer[1]>high(hnd)) or (Buffer[1] in PT_opcodes) then begin
    ptidx:=FindPT(Buffer[1],FromG);
    if ptidx<$FFFF then curhndo:=PT[ptidx]^.handler;
  end;
end;
{ One scheduler pass: advances mNow by the wall-clock time elapsed since the
  previous pass, counts every pending timer down by that amount, lowers
  PollTimeout to the nearest remaining timer and runs at most one expired
  timer callback. }
241 procedure ShedRun;
242 var cur:^tSheduled;
243 var pcur:^pointer;
244 var now:UnixType.timeval{ absolute iNow};
245 var delta:LongWord;
246 var delta_us:LongInt;
247 var tasks:word;
248 begin
249 {Scheduling}
250 {gmagic with delta-time, increment mNow, ...}
251 fpgettimeofday(@Now,nil);
{ whole seconds and microsecond difference since the last pass }
252 delta:=(Now.tv_sec-LastShed.tv_sec);
253 delta_us:=Now.tv_usec-LastShed.tv_usec;
{ delta becomes elapsed milliseconds; delta_us may be negative }
254 delta:=(delta*1000)+(delta_us div 1000);
{ keep the sub-millisecond remainder (microseconds) for later passes }
255 umNow:=umNow+(delta_us mod 1000);
{ an elapsed time above 6000 ms is treated as 5000 ms }
256 if delta>6000 then delta:=5000;
257 LastShed:=Now;
258 mNow:=mNow+Delta;
{ fold accumulated microseconds back into mNow }
259 if umNow>1000 then begin inc(mNow); dec(umNow,1000) end;
260 if umNow<-1000 then begin dec(mNow); inc(umNow,1000) end;
261 //writeln('DeltaTime: ',delta);
262 {first tick all tasks}
263 tasks:=0;
264 cur:=ShedTop;
265 while assigned(cur) do begin
266 if cur^.left<=delta then cur^.left:=0 else begin
267 dec(cur^.left,delta);
268 {also set next wake time}
269 if cur^.left<PollTimeout then PollTimeout:=cur^.left;
270 end;
271 {count tasks here; the count is not used further in this procedure}
272 inc(tasks);
273 cur:=cur^.next;
274 end;
275 {a timeout of 0 is replaced by 1 ms here}
276 if pollTimeout=0 then pollTimeOut:=1;
277 {run first runnable task}
{ pcur points at the link that refers to cur, so unlinking is one assignment }
278 pcur:=@ShedTop;
279 cur:=pcur^;
280 while assigned(cur) do begin
281 if cur^.left=0 then begin
282 {unlink}
283 pcur^:=cur^.next;
284 {link to unused; the node is on the reuse list before the callback runs}
285 cur^.next:=ShedUU;
286 ShedUU:=cur;
287 {call}
288 cur^.cb;
289 {do rest later: a zero timeout is left in PollTimeout for the caller}
290 pollTimeout:=0;
291 break;
292 end;
293 pcur:=@cur^.next;
294 cur:=cur^.next;
295 end;
296 end;
{ Server main loop: sets up the UDP socket, then alternates scheduler passes
  with fpPoll until Terminated is set or polling fails. Datagrams go to the
  handler chosen by DoSock (per-peer handler first, then the plain opcode
  handler); events on descriptors added through WatchFD go to their
  callbacks. On exit OnTerminate is called when assigned and the socket is
  closed.

  Changes over the earlier version:
  - the descriptor loop stops at pollTop-1, the last slot given to fpPoll,
    instead of also reading the unused slot pollTop;
  - a poll interrupted by a signal (EINTR) is retried unless Terminated is
    set, and any other poll error is reported before the loop ends. }
procedure Main;
var
  pollErr:cint;
begin
  s_setupInet;
  while not terminated do begin
    PollTimeout:=5000;
    ShedRun;
    EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
    {read errno before ShedRun performs further system calls}
    pollErr:=0;
    if EventsCount=-1 then pollErr:=fpgeterrno;
    ShedRun;
    if EventsCount=-1 then begin
      if terminated then break;
      if pollErr=ESysEINTR then continue;
      writeln('ServerLoop: poll failed, errno '+IntToStr(pollErr));
      break;
    end;
    if EventsCount=0 then continue;
    {INET socket}
    if DoSock(PollArr[0]) then begin
      if assigned(curhndo) then curhndo(msg)
      else if assigned(curhnd) then curhnd(msg)
      else {raise eXception.Create('}writeln('ServerLoop: No handler for opcode '+IntToStr(Buffer[1]));
    end;
    {INET6...}
    {Generic: slots 1..pollTop-1 hold the descriptors added by WatchFD}
    for tp:=1 to pollTop-1 do if PollArr[tp].revents>0 then begin
      PollHnd[tp].CB(PollArr[tp].rEvents);
      PollArr[tp].revents:=0;
    end;
  end;
  if assigned(onTerminate) then onTerminate;
  CloseSocket(s_inet);
end;
{ Installs the plain handler for OpCode.
  OpCode must lie within the index range of hnd; anything else raises
  Exception instead of writing outside the table. The slot is expected to
  be unused (checked with assert). }
procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
begin
  if (OpCode<low(hnd)) or (OpCode>high(hnd)) then
    raise eXception.Create(Format('ServerLoop: opcode %d has no handler slot (valid %d..%d)',
      [OpCode,low(hnd),high(hnd)]));
  assert(hnd[OpCode]=nil);
  hnd[OpCode]:=handler;
end;
{ Installs the handler used for datagrams whose first byte is 128 or above,
  replacing any previous one. }
procedure SetHiMsgHandler(handler:tMessageHandler);
begin
  HiHnd:=handler;
end;
{ Starts or stops watching a descriptor in the main poll loop.

  With an assigned h, fd is appended to the poll table and h is called with
  the returned events whenever poll reports any. Raises Exception when the
  table has no room left; previously pollTop was incremented past its range.
  With h unassigned, the first watch on fd is removed. Only the slots added
  through WatchFD (1..pollTop-1) are searched, so neither the UDP socket in
  slot 0 nor an unused slot can be matched; a descriptor that is not watched
  is ignored. }
procedure WatchFD(fd:tHandle; h:tFDEventHandler);
var
  slot,last:tPollTop;
begin
  if assigned(h) then begin
    {pollTop itself must stay within tPollTop after the increment}
    if pollTop>=high(tPollTop) then
      raise eXception.Create('ServerLoop: too many watched descriptors');
    PollHnd[pollTop].CB:=h;
    PollArr[pollTop].fd:=fd;
    PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
                             POLLRDBAND or POLLRDNORM;
    PollArr[pollTop].revents:=0;
    Inc(PollTop);
  end
  else begin
    if pollTop<2 then exit; {nothing besides the UDP socket is watched}
    last:=pollTop-1;
    for slot:=1 to last do if PollArr[slot].fd=fd then begin
      {keep the used part contiguous: move the last entry into the gap}
      if slot<last then begin
        PollArr[slot]:=PollArr[last];
        PollHnd[slot]:=PollHnd[last];
      end;
      pollTop:=last;
      PollArr[last].fd:=-1;
      PollArr[last].events:=0;
      PollArr[last].revents:=0;
      break;
    end;
  end;
end;
{ Arms a one-shot timer: h is called by the scheduler once `timeout`
  milliseconds have been counted down. The node is taken from the list of
  unused nodes when one is available, otherwise it is allocated, and it is
  placed at the head of the pending list. }
procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
var
  node:^tSheduled;
begin
  if ShedUU=nil then
    New(node)
  else begin
    node:=ShedUU;
    ShedUU:=node^.next;
  end;
  node^.Left:=timeout;
  node^.CB:=h;
  node^.Next:=ShedTop;
  ShedTop:=node;
end;
{ Cancels every pending timer whose callback equals h (code and object
  pointer compared bytewise). Removed nodes go to the list of unused nodes. }
procedure UnShedule(h:tOnTimer);
var
  node:^tSheduled;
  link:^pointer;
begin
  {link always points at the pointer that refers to the node under test}
  link:=@ShedTop;
  while assigned(link^) do begin
    node:=link^;
    if CompareByte(node^.cb,h,sizeof(h))<>0 then
      link:=@node^.next
    else begin
      link^:=node^.next; {unlink from main list}
      node^.next:=ShedUU; {link to unused}
      ShedUU:=node;
    end;
  end;
end;
{ When set, every option name queried through OptIndex is printed. }
var DoShowOpts:boolean=false;

{ Returns the position of the last command line argument equal to o,
  or 0 when o is not on the command line. }
function OptIndex(o:string):word;
var
  idx:word;
begin
  if DoShowOpts then writeln('Option: ',o);
  OptIndex:=0;
  for idx:=paramcount downto 1 do
    if system.paramstr(idx)=o then begin
      OptIndex:=idx;
      exit;
    end;
end;
{ Counts the arguments that follow option number o up to, but not
  including, the next argument starting with '-'. Returns 0 for o=0.
  An empty argument counts as a parameter; its first character is no longer
  read, which was an access past the end of the string. }
function OptParamCount(o:word):word;
var
  idx,count:word;
  arg:string;
begin
  count:=0;
  if o>0 then for idx:=o+1 to paramcount do begin
    arg:=paramstr(idx);
    if (length(arg)>0) and (arg[1]='-') then break;
    inc(count);
  end;
  OptParamCount:=count;
end;
var i:byte;
{ Unit initialisation: resets the clocks, installs the SIGINT and SIGTERM
  handlers and clears the handler tables and scheduler lists. }
BEGIN
  mNow:=0;
  umNow:=0;
  Randomize;
  fpSignal(SigInt,@SignalHandler);
  fpSignal(SigTerm,@SignalHandler);
  for i:=low(hnd) to high(hnd) do hnd[i]:=nil;
  {clear the whole table; the loop used to start at 1 and skipped PT[0]}
  for i:=low(PT) to high(PT) do PT[i]:=nil;
  PT_opcodes:=[];
  pollTop:=1; {1 for basic listen}
  ShedTop:=nil;
  ShedUU:=nil; {todo: allocate a few to improve paging}
  fpgettimeofday(@LastShed,nil);
  if OptIndex('-h')>0 then DoShowOpts:=true;
  OnTerminate:=nil;
END.