Clean timing in Download.
[brdnet.git] / ServerLoop.pas
blob28b97e102316a181457411dc10c9485c6420a913
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType;
6 procedure Main;
8 {#Message handling#}
9 type tSMsg=object
10 Source: ^tNetAddr;
11 Length: {Long}Word;
12 Data: pointer;
13 stream: tMemoryStream;
14 channel: word;
15 end;
16 type tMessageHandler=procedure(msg:tSMsg);
17 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
18 procedure SetHiMsgHandler(handler:tMessageHandler);
20 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
21 {procedure SendReply(const data; len:word; const rcpt:tSMsg );}
22 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
24 {#Sheduling and watching#}
25 type tFDEventHandler=procedure(ev:Word) of object;
26 type tOnTimer=procedure of object;
27 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
28 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
29 procedure UnShedule(h:tOnTimer);
30 {note unshed will fail when called from OnTimer proc}
32 type tObjMessageHandler=procedure(msg:tSMsg) of object;
33 {deliver message from peer to the object}
34 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler); overload;
35 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
37 function OptIndex(o:string):word;
38 function OptParamCount(o:word):word;
40 type tTimeVal=UnixType.timeval;
41 type tMTime=DWORD;
42 var iNow:tTimeVal;
43 var mNow:tMTime; { miliseconds since start }
44 {overflows in hunderd hours }
46 IMPLEMENTATION
48 USES SysUtils,Sockets,BaseUnix
49 ,Unix
52 {aim for most simple implementation, since could be extended anytime}
54 var s_inet:tSocket;
56 type tPollTop=0..7;
57 var pollArr: packed array [tPollTop] of tPollFd;
58 type tFdHndDsc=record
59 cb: tFDEventHandler; {proc+object}
60 end;
61 var pollHnd: array [tPollTop] of tFdHndDsc;
62 var pollTop: tPollTop;
64 var hnd: array [1..36] of tMessageHandler;
65 var HiHnd: tMessageHandler;
67 type tSheduled_ptr=^tSheduled; tSheduled=record
68 left:LongWord;
69 cb:tOnTimer;
70 next:tSheduled_ptr;
71 end;
72 var ShedTop: ^tSheduled;
73 var ShedUU: ^tSheduled;
74 var LastShed: UnixType.timeval;
75 var PollTimeout:LongInt;
77 procedure SC(fn:pointer; retval:cint);
78 begin
79 if retval < 0 then begin
80 raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
81 end;
82 end;
84 procedure s_SetupInet;
85 var bind_addr:tInetSockAddr;
86 var turnon:cint;
87 var oi:word;
88 begin
89 with bind_addr do begin
90 sin_family:=AF_INET;
91 oi:=OptIndex('-port');
92 if oi=0 then sin_port:=htons(3511)
93 else begin
94 assert(OptParamCount(oi)=1);
95 sin_port:=htons(StrToInt(paramstr(oi+1)));
96 end;
97 sin_addr.s_addr:=0; {any}
98 s_inet:=fpSocket(sin_family,SOCK_DGRAM,IPPROTO_UDP);
99 SC(@fpSocket,s_inet);
100 turnon:=IP_PMTUDISC_DO;
101 SC(@fpsetsockopt,fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @turnon, sizeof(turnon)));
102 end;
103 SC(@fpBind,fpBind(s_inet,@bind_addr,sizeof(bind_addr)));
104 with PollArr[0] do begin
105 fd:=s_inet;
106 events:=pollIN;
107 revents:=0;
108 end;
109 end;
111 var Terminated:boolean=false;
113 procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
114 begin
115 {SC(@fpsendto,}fpsendto(s_inet,@data,len,0,@rcpt,sizeof(sockaddr_in)){)};
116 end;
117 procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
118 var sa:tSockAddrL;
119 begin
120 rcpt.ToSocket(sa);
121 SendMessage(data,len,sa);
122 end;
123 procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
124 begin
125 SendMessage(data,len,rcpt);
126 {todo: optimization??}
127 end;
129 procedure SignalHandler(sig:cint);CDecl;
130 begin
131 writeln;
132 if terminated then raise eControlC.Create('CtrlC DoubleTap') ;
133 Terminated:=true;
134 writeln('Shutdown requested');
135 end;
137 {index=iphash+opcode}
138 type tPeerTableBucket=record
139 opcode:byte;
140 remote:tNetAddr;
141 handler:tObjMessageHandler;
142 end;
143 var PT:array [0..255] of ^tPeerTableBucket;
144 var PT_opcodes: set of 1..high(hnd);
146 function FindPT(opcode:byte; addr:tNetAddr):Word; { $FFFF=fail}
147 var i,o:word;
148 begin
149 i:=(addr.hash+opcode) mod high(PT); {0..63}
150 for o:=0 to high(PT) do begin
151 result:=(i+o) mod high(PT);
152 if not assigned(PT[result]) then break;
153 if (PT[result]^.opcode=opcode) and (PT[result]^.remote=addr) then exit;
154 end;
155 result:=$FFFF;
156 end;
158 function IsMsgHandled(OpCode:byte; from:tNetAddr):boolean;
159 begin result:=FindPT(opcode,from)<>$FFFF end;
161 procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
162 var i,h:word;
163 begin
164 h:=FindPT(opcode,from);
165 if h=$FFFF then exit;
166 Dispose(PT[h]);
167 PT[h]:=nil;
168 {go reverse exit on null, hash them, match: move to H and stop}
169 if h=0 then i:=high(PT) else i:=h-1;
170 while (i<>h)and assigned(PT[i]) do begin
171 if (PT[i]^.remote.hash+PT[i]^.opcode)=h then begin
172 PT[h]:=PT[i];
173 PT[i]:=nil;
174 break;
175 end;
176 if i=0 then i:=high(PT) else dec(i);
177 end;
178 end;
180 procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
181 var h,o,i:word;
182 begin
183 UnSetMsgHandler(from,opcode);
184 if handler=nil then exit;
185 h:=(from.hash+opcode) mod high(PT);
186 for o:=0 to high(PT) do begin
187 i:=(h+o) mod high(PT);
188 if not assigned(PT[i]) then break;
189 end;
190 New(PT[i]);
191 PT[i]^.opcode:=OpCode;
192 PT[i]^.remote:=from;
193 PT[i]^.handler:=handler;
194 if opcode<=high(hnd) then Include(PT_opcodes,opcode);
195 end;
197 {do not waste stack on statics}
198 var EventsCount:integer;
199 var Buffer:array [1..4096] of byte;
200 var pkLen:LongWord;
201 var From:tSockAddrL; {use larger struct so everything fits}
202 var FromLen:LongWord;
203 var FromG:tNetAddr;
204 var curhnd:tMessageHandler;
205 var curhndo:tObjMessageHandler;
206 var Msg:tSMsg;
207 var tp:tPollTop;
209 function DoSock(var p:tPollFD):boolean;
210 var ptidx:word;
211 begin
212 curhnd:=nil;
213 curhndo:=nil;
214 result:=false;
215 ptidx:=$FFFF;
216 if (p.revents and pollIN)=0 then exit else result:=true;
217 FromLen:=sizeof(From);
218 pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
219 SC(@fprecvfrom,pkLen);
220 p.revents:=0;
221 FromG.FromSocket(from);
222 Msg.Source:=@FromG; {!thread}
223 Msg.Length:=pkLen;
224 Msg.Data:=@Buffer; {!thread}
225 Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
226 Msg.channel:=0; {!multisocket}
227 if Buffer[1]>=128 then curhnd:=HiHnd else if Buffer[1]<=high(hnd) then curhnd:=hnd[Buffer[1]];
228 if (Buffer[1]>high(hnd))or(Buffer[1] in PT_opcodes) then begin
229 ptidx:=FindPT(Buffer[1],FromG);
230 if ptidx<$FFFF then curhndo:=PT[ptidx]^.handler;
231 end;
232 end;
234 procedure ShedRun;
235 var cur:^tSheduled;
236 var pcur:^pointer;
237 var now:UnixType.timeval{ absolute iNow};
238 var delta:LongWord;
239 var tasks:word;
240 begin
241 {Sheduling}
242 {gmagic with delta-time, increment mNow, ...}
243 fpgettimeofday(@Now,nil);
244 delta:=(Now.tv_sec-LastShed.tv_sec);
245 if delta>6 then delta:=5000 else delta:=(delta*1000)+((Now.tv_usec-LastShed.tv_usec) div 1000);
246 LastShed:=Now;
247 mNow:=mNow+Delta;
248 //writeln('DeltaTime: ',delta);
249 {first tick all tasks}
250 tasks:=0;
251 cur:=ShedTop;
252 while assigned(cur) do begin
253 if cur^.left<=delta then cur^.left:=0 else begin
254 dec(cur^.left,delta);
255 {also set next wake time}
256 if cur^.left<PollTimeout then PollTimeout:=cur^.left;
257 end;
258 {count tasks here}
259 inc(tasks);
260 cur:=cur^.next;
261 end;
262 {correct floating-point glitch}
263 if pollTimeout=0 then pollTimeOut:=1;
264 {run first runnable task}
265 pcur:=@ShedTop;
266 cur:=pcur^;
267 while assigned(cur) do begin
268 if cur^.left=0 then begin
269 {unlink}
270 pcur^:=cur^.next;
271 {link to unused}
272 cur^.next:=ShedUU;
273 ShedUU:=cur;
274 {call}
275 cur^.cb;
276 {do rest later}
277 pollTimeout:=0;
278 break;
279 end;
280 pcur:=@cur^.next;
281 cur:=cur^.next;
282 end;
283 if delta >4990 then writeln('ServerLoop: tasks=',tasks);
284 end;
286 procedure Main;
287 begin
288 s_setupInet;
289 while not terminated do begin
290 PollTimeout:=5000;
291 ShedRun;
292 EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
293 ShedRun;
294 if (eventscount=-1)and terminated then break;
295 if eventscount=-1 then break; {fixme: print error}
296 if eventscount=0 then continue else begin
297 {INET socket}
298 if DoSock(PollArr[0]) then
299 if assigned(curhndo) then curhndo(msg)
300 else if assigned(curhnd) then curhnd(msg)
301 else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1]));
302 {INET6...}
303 {Generic}
304 for tp:=1 to pollTop do if PollArr[tp].revents>0 then begin
305 PollHnd[tp].CB(PollArr[tp].rEvents);
306 PollArr[tp].revents:=0;
307 end;
308 end;
309 end;
310 write('Loop broken [');
311 CloseSocket(s_inet);
312 writeln(']');
313 end;
315 procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
316 begin assert(hnd[OpCode]=nil); hnd[OpCode]:=handler; end;
317 procedure SetHiMsgHandler(handler:tMessageHandler);
318 begin Hihnd:=handler; end;
320 procedure WatchFD(fd:tHandle; h:tFDEventHandler);
321 var opt: tPollTop;
322 begin
323 if assigned(h) then begin
324 PollHnd[pollTop].CB:=h;
325 PollArr[pollTop].fd:=fd;
326 PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
327 POLLRDBAND or POLLRDNORM;
328 PollArr[pollTop].revents:=0;
329 //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
330 Inc(PollTop);
331 end else for opt:=0 to high(opt) do if PollArr[opt].fd=fd then begin
332 if (pollTop-1)>opt then begin
333 PollArr[opt]:=PollArr[pollTop-1];
334 PollHnd[opt]:=PollHnd[pollTop-1];
335 end;
336 dec(pollTop);
337 PollArr[pollTop].fd:=-1;
338 PollArr[pollTop].events:=0;
339 PollArr[pollTop].revents:=0;
340 break;
341 end;
342 end;
344 procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
345 var old:^tSheduled;
346 begin
347 old:=ShedTop;
348 if Assigned(ShedUU) then begin
349 ShedTop:=ShedUU;
350 ShedUU:=ShedUU^.next;
351 end else New(ShedTop);
352 ShedTop^.Left:=timeout;
353 ShedTop^.CB:=h;
354 ShedTop^.Next:=old;
355 end;
357 procedure UnShedule(h:tOnTimer);
358 var cur:^tSheduled;
359 var pcur:^pointer;
360 begin
361 //if ShedTop=nil then AbstractError;
362 pcur:=@ShedTop;
363 cur:=pcur^;
364 while assigned(cur) do begin
365 if 0=CompareByte(cur^.cb,h,sizeof(h)) then begin
366 pcur^:=cur^.next; {unlink from main list}
367 cur^.next:=ShedUU; ShedUU:=cur; {link to unused}
368 cur:=pcur^;
369 end else begin
370 pcur:=@cur^.next;
371 cur:=pcur^;
372 end;
373 end;
374 end;
376 function OptIndex(o:string):word;
377 begin
378 result:=paramcount;
379 while result>0 do begin
380 if o=system.paramstr(result) then break;
381 dec(result);
382 end;
383 end;
385 function OptParamCount(o:word):word;
386 var i:word;
387 begin
388 result:=0;
389 if o>0 then for i:=o+1 to paramcount do begin
390 if paramstr(i)[1]<>'-' then inc(result)
391 else break;
392 end;
393 end;
395 var i:byte;
396 BEGIN
397 mNow:=0;
398 Randomize;
399 fpSignal(SigInt,@SignalHandler);
400 fpSignal(SigTerm,@SignalHandler);
401 for i:=1 to high(hnd) do hnd[i]:=nil;
402 for i:=1 to high(PT) do PT[i]:=nil;
403 PT_opcodes:=[];
404 pollTop:=1; {1 for basic listen}
405 ShedTop:=nil;
406 ShedUU:=nil; {todo: allocate a few to improve paging}
407 fpgettimeofday(@LastShed,nil);
408 END.