Upload Manager mission statement.
[brdnet.git] / ServerLoop.pas
blob9cfa23dd468a0d7546494b8a5831af37fc2589bf
1 UNIT ServerLoop;
3 INTERFACE
4 uses MemStream,NetAddr,UnixType;
{ Runs the server loop; returns once the loop has been broken. }
procedure Main;

{# Message handling #}

{ One received datagram, as handed to a message handler. }
type
  tSMsg = object
    Source:  ^tNetAddr;      { address of the sender }
    Length:  Word;           { number of bytes received (Word, not LongWord) }
    Data:    pointer;        { start of the received bytes }
    stream:  tMemoryStream;  { stream initialised over the same bytes }
    channel: word;           { receiving channel; the loop currently always sets 0 }
  end;
{ Plain (non-object) callback that receives one message. }
type
  tMessageHandler = procedure(msg: tSMsg);

{ Register the handler that is called for datagrams whose first byte is OpCode. }
procedure SetMsgHandler(OpCode: byte; handler: tMessageHandler);

{ Register the handler that is called for datagrams whose first byte is above 128. }
procedure SetHiMsgHandler(handler: tMessageHandler);

{ Send len bytes starting at data to rcpt. }
procedure SendMessage(const data; len: word; const rcpt: tNetAddr);
{procedure SendReply(const data; len:word; const rcpt:tSMsg );}

{ Same as above with an explicit channel argument. }
procedure SendMessage(const data; len: word; const rcpt: tNetAddr; channel: word);
{# Scheduling and watching #}

type
  { Called with the poll revents bits of a watched descriptor. }
  tFDEventHandler = procedure(ev: Word) of object;
  { Called when a scheduled timeout has elapsed. }
  tOnTimer = procedure of object;

{ Start watching fd with handler h; passing an unassigned h stops watching fd. }
procedure WatchFD(fd: tHandle; h: tFDEventHandler);

{ Call h once after roughly timeout milliseconds. }
procedure Shedule(timeout{ms}: LongWord; h: tOnTimer);

{ Cancel the first pending timer whose callback equals h.
  Note: unscheduling fails when called from inside an OnTimer procedure. }
procedure UnShedule(h: tOnTimer);
{ Object-method callback that receives one message. }
type
  tObjMessageHandler = procedure(msg: tSMsg) of object;

{ Deliver messages with the given opcode coming from peer `from` to the object
  method `handler`. }
procedure SetMsgHandler(OpCode: byte; from: tNetAddr; handler: tObjMessageHandler);

type
  tTimeVal = UnixType.timeval;

{ Time stamp taken by the loop at the start of each scheduler pass. }
var
  iNow: tTimeVal;
39 IMPLEMENTATION
41 USES SysUtils,Sockets,BaseUnix
42 ,Unix
45 {aim for most simple implementation, since could be extended anytime}
{ The UDP socket the loop listens on. }
var
  s_inet: tSocket;

{ Index range of the poll table. }
type
  tPollTop = 0..7;

{ Descriptor table passed to fpPoll; slot 0 holds s_inet. }
var
  pollArr: packed array [tPollTop] of tPollFd;

{ Callback stored for the poll slot with the same index. }
type
  tFdHndDsc = record
    cb: tFDEventHandler; { method pointer: procedure + object }
  end;

var
  pollHnd: array [tPollTop] of tFdHndDsc;
  { Number of slots in use; also the index of the next free slot. }
  pollTop: tPollTop;
var
  { Global handlers indexed by opcode (the first byte of a datagram). }
  hnd: array [1..36] of tMessageHandler;
  { Handler used for opcodes above 128. }
  HiHnd: tMessageHandler;
{ Node of the singly linked timer list. }
type
  tSheduled_ptr = ^tSheduled;
  tSheduled = record
    left: LongWord;       { milliseconds remaining until the callback fires }
    cb:   tOnTimer;       { callback to invoke }
    next: tSheduled_ptr;  { next node in the list }
  end;

var
  { Head of the list of pending timers. }
  ShedTop: ^tSheduled;
  { Head of the list of unused nodes kept for reuse. }
  ShedUU: ^tSheduled;
  { Time of the previous scheduler pass. }
  LastShed: UnixType.timeval;
  { Timeout in milliseconds handed to the next fpPoll call. }
  PollTimeout: LongInt;
{ Socket-call check.
  fn     - address of the routine that produced retval (only used in the message)
  retval - result of that call; any negative value raises an exception that
           carries SocketError and fn. }
procedure SC(fn:pointer; retval:cint);
begin
  if retval >= 0 then exit;
  raise eXception.Create(Format('Socket error %d operation %P',[SocketError,fn]));
end;
{ Creates the UDP socket s_inet, sets its IP_MTU_DISCOVER option to
  IP_PMTUDISC_DO, binds it to port 3511 on any local address and installs it
  in poll slot 0. Every system call result is checked through SC. }
procedure s_SetupInet;
var
  local: tInetSockAddr;
  pmtuMode: cint;
begin
  local.sin_family := AF_INET;
  local.sin_port := htons(3511);
  local.sin_addr.s_addr := 0; {any}

  s_inet := fpSocket(local.sin_family, SOCK_DGRAM, IPPROTO_UDP);
  SC(@fpSocket, s_inet);

  pmtuMode := IP_PMTUDISC_DO;
  SC(@fpsetsockopt, fpsetsockopt(s_inet, IPPROTO_IP, IP_MTU_DISCOVER, @pmtuMode, sizeof(pmtuMode)));

  SC(@fpBind, fpBind(s_inet, @local, sizeof(local)));

  PollArr[0].fd := s_inet;
  PollArr[0].events := pollIN;
  PollArr[0].revents := 0;
end;
{ Set by the signal handler to request shutdown; Main stops looping once it is true. }
var
  Terminated: boolean = false;
{ Sends len bytes starting at data through s_inet to the socket address rcpt.
  The address length passed is sizeof(sockaddr_in).
  The return value of fpsendto is not checked. }
procedure SendMessage(const data; len:word; const rcpt:tSockAddrL );
begin
  fpsendto(s_inet, @data, len, 0, @rcpt, sizeof(sockaddr_in));
end;

{ Converts rcpt to a socket address and sends the data to it. }
procedure SendMessage(const data; len:word; const rcpt:tNetAddr );
var
  target: tSockAddrL;
begin
  rcpt.ToSocket(target);
  SendMessage(data, len, target);
end;

{ Channel-aware variant; the channel argument is currently not used. }
procedure SendMessage(const data; len:word; const rcpt:tNetAddr; channel:word );
var
  target: tSockAddrL;
begin
  {todo: optimization??}
  rcpt.ToSocket(target);
  SendMessage(data, len, target);
end;
{ Signal handler (installed for SIGINT and SIGTERM by the unit initialization).
  The first signal sets Terminated; a signal arriving while Terminated is
  already set raises eControlC instead. }
procedure SignalHandler(sig:cint);CDecl;
begin
  writeln;
  if not terminated then begin
    Terminated:=true;
    writeln('Shutdown requested');
  end
  else raise eControlC.Create('CtrlC DoubleTap') ;
end;
{ Per-peer handler table: open addressing, start index derived from
  address hash + opcode. }
type
  tPeerTableBucket = record
    opcode:  byte;               { opcode this entry handles }
    remote:  tNetAddr;           { peer the entry is bound to }
    handler: tObjMessageHandler; { method that receives matching messages }
  end;

var
  { Bucket pointers; an unassigned pointer marks a free slot. }
  PT: array [0..255] of ^tPeerTableBucket;
  { Opcodes that have been registered in PT; consulted before searching the table. }
  PT_opcodes: set of 1..36;
{ Looks up the peer-table slot registered for (opcode, addr).
  Probing starts at (addr.hash+opcode) mod high(PT) and walks forward,
  wrapping with the same modulus, until a free slot or a match is met.
  Returns the slot index, or $FFFF when there is no such entry. }
function FindPT(opcode:byte; const addr:tNetAddr):Word; { $FFFF=fail}
var
  start, step: word;
begin
  start := (addr.hash+opcode) mod high(PT); {0..high(PT)-1}
  step := 0;
  while step <= high(PT) do begin
    result := (start+step) mod high(PT);
    if not assigned(PT[result]) then break; {free slot ends the probe chain}
    if (PT[result]^.opcode=opcode) and (PT[result]^.remote=addr) then exit;
    inc(step);
  end;
  result := $FFFF;
end;
{ Removes the peer-table entry registered for (opcode, from), if any.

  The table uses forward linear probing (see FindPT / SetMsgHandler), so
  simply clearing a slot could cut the probe chain of entries stored after it.
  After freeing the slot, the entries that follow it are therefore walked
  forward and every entry whose home slot does not lie cyclically between the
  hole and its current position is moved back into the hole.

  The previous implementation walked backwards starting at h-1 (which
  underflowed for h=0), compared the un-reduced hash with the slot index and
  moved at most one entry. }
procedure UnSetMsgHandler(const from:tNetAddr; opcode:byte);
var
  hole, probe, home: word;
  mayStay: boolean;
begin
  hole := FindPT(opcode, from);
  if hole = $FFFF then exit;
  Dispose(PT[hole]);
  PT[hole] := nil;
  probe := hole;
  while true do begin
    {same modulus as FindPT and SetMsgHandler, so the same slots are visited}
    probe := (probe+1) mod high(PT);
    {PT[hole] is nil, so the walk always terminates, at the latest on the hole}
    if not assigned(PT[probe]) then break;
    home := (PT[probe]^.remote.hash + PT[probe]^.opcode) mod high(PT);
    {the entry stays reachable iff its home slot is cyclically in (hole, probe]}
    if hole < probe
      then mayStay := (hole < home) and (home <= probe)
      else mayStay := (hole < home) or (home <= probe);
    if not mayStay then begin
      PT[hole] := PT[probe];
      PT[probe] := nil;
      hole := probe;
    end;
  end;
end;
{ Binds messages with the given opcode coming from peer `from` to `handler`.
  Any existing binding for the same (opcode, peer) pair is removed first;
  passing a nil handler therefore only removes the binding.
  Raises an exception when the table has no free slot left. Previously the
  probe loop fell through in that case and the last probed, still occupied,
  slot was overwritten. }
procedure SetMsgHandler(OpCode:byte; from:tNetAddr; handler:tObjMessageHandler);
var
  home, step, probe, slot: word;
begin
  UnSetMsgHandler(from,opcode);
  if handler=nil then exit;
  home := (from.hash+opcode) mod high(PT);
  slot := $FFFF;
  {slots 0..high(PT)-1 are reachable with this modulus; visit each once}
  for step := 0 to high(PT)-1 do begin
    probe := (home+step) mod high(PT);
    if not assigned(PT[probe]) then begin
      slot := probe;
      break;
    end;
  end;
  if slot = $FFFF then
    raise eXception.Create('Peer message handler table is full');
  New(PT[slot]);
  PT[slot]^.opcode := OpCode;
  PT[slot]^.remote := from;
  PT[slot]^.handler := handler;
  Include(PT_opcodes,opcode);
end;
{ Working variables of the receive path, kept at unit level so they do not
  take up stack space. }
var
  EventsCount: integer;               { result of the last fpPoll call }
  Buffer: array [1..1024] of byte;    { bytes of the datagram being processed }
  pkLen: LongWord;                    { result of the last fprecvfrom call }
  From: tSockAddrL;                   { sender address; larger struct so everything fits }
  FromLen: LongWord;                  { in/out address length for fprecvfrom }
  curhnd: tMessageHandler;            { global handler selected for the datagram }
  curhndo: tObjMessageHandler;        { per-peer handler selected for the datagram }
  Msg: tSMsg;                         { message record passed to the handler }
  tp: tPollTop;                       { loop index over the poll table }
{ Sender address of the datagram currently being dispatched.
  Msg.Source points at this variable and the handlers run after DoSock has
  returned, so it must not live on the stack of DoSock. }
var DoSockFrom:tNetAddr;

{ Receives one datagram from the polled socket p and selects its handlers.

  On return curhnd / curhndo hold the global and the per-peer handler for the
  datagram (nil when none applies) and Msg describes the received data.
  When p has no POLLIN event pending both handlers are nil and nothing is read.

  Fixes over the previous version:
   - Msg.Source pointed at a local variable that was gone when the handler ran;
   - the result of FindPT was stored in a byte, so $FFFF (not found) became 255
     and slot 0 was treated as "not found";
   - opcode 0 indexed hnd[] below its lower bound. }
procedure DoSock(var p:tPollFD);
var ptidx:word;
begin
  curhnd:=nil;
  curhndo:=nil;
  if (p.revents and pollIN)=0 then exit;
  FromLen:=sizeof(From);
  pkLen:=fprecvfrom(p.FD,@Buffer,sizeof(Buffer),0,@from,@fromlen);
  SC(@fprecvfrom,pkLen);
  p.revents:=0;
  DoSockFrom.FromSocket(from);
  Msg.Source:=@DoSockFrom; {!thread}
  Msg.Length:=pkLen;
  Msg.Data:=@Buffer; {!thread}
  Msg.stream.Init(@Buffer,pkLen,sizeof(Buffer));
  Msg.channel:=0; {!multisocket}
  {global handler: high opcodes share one handler, low ones use the table}
  if Buffer[1]>128 then curhnd:=HiHnd
  else if (Buffer[1]>=low(hnd)) and (Buffer[1]<=high(hnd)) then curhnd:=hnd[Buffer[1]];
  {per-peer handler: only search the table for opcodes that were ever registered}
  if Buffer[1] in PT_opcodes then ptidx:=FindPT(Buffer[1],DoSockFrom) else ptidx:=$FFFF;
  if ptidx<>$FFFF then curhndo:=PT[ptidx]^.handler;
end;
{ One scheduler pass: fires every timer whose remaining time has elapsed since
  the previous pass, decrements the others, and lowers PollTimeout to the
  smallest remaining time (never below 1 ms).

  The pending list is detached from ShedTop while callbacks run, so timers
  scheduled by a callback land on a fresh list; that list is appended to the
  not-yet-fired timers afterwards. Fired nodes are moved to the ShedUU reuse
  list.

  The elapsed time is computed in a signed 64-bit value and clamped at zero.
  Previously a wall clock that stepped backwards by less than a second yielded
  a negative value that wrapped around in the LongWord and made every pending
  timer fire at once. }
procedure ShedRun;
var cur:^tSheduled;
var pcur:^pointer;
var now:UnixType.timeval absolute iNow;
var delta:LongWord;
var elapsed:Int64;
var olTop:^tSheduled;
begin
  {Scheduling}
  olTop:=ShedTop;
  pcur:=@olTop;
  cur:=pcur^;
  ShedTop:=nil; {unlink the current shed list}
  fpgettimeofday(@Now,nil);
  elapsed:=Int64(Now.tv_sec)-Int64(LastShed.tv_sec);
  if elapsed>6 then elapsed:=5000 {large forward jump: count it as 5 s}
  else elapsed:=(elapsed*1000)+((Int64(Now.tv_usec)-Int64(LastShed.tv_usec)) div 1000);
  if elapsed<0 then elapsed:=0; {clock went backwards: treat as no time passed}
  delta:=elapsed;
  LastShed:=Now;
  //writeln('DeltaTime: ',delta);
  while assigned(cur) do begin
    if (cur^.left<=delta)or(cur^.left=0) then begin
      {due: run the callback, then move the node to the reuse list}
      cur^.cb;
      pcur^:=cur^.next;
      cur^.next:=ShedUU;
      ShedUU:=cur;
      cur:=pcur^;
    end else begin
      DEC(cur^.left,delta);
      //writeln('Left: ',cur^.left);
      if pollTimeOut>cur^.left then PollTimeOut:=cur^.left;
      pcur:=@cur^.next;
      cur:=cur^.next;
    end;
  end;
  pcur^:=ShedTop; {append newly added tasks to end of untriggered list}
  ShedTop:=olTop; {link in the untriggered tasks}
  {second pass also accounts for the timers added by callbacks}
  cur:=olTop;
  while assigned(cur) do begin
    if cur^.left<PollTimeout then PollTimeout:=cur^.left;
    cur:=cur^.next;
  end;
  if pollTimeout=0 then pollTimeOut:=1;
end;
{ Server main loop.
  Sets up the UDP socket, then until Terminated is set (or fpPoll fails):
  runs the scheduler, polls all descriptors with the timeout the scheduler
  chose, dispatches a received datagram to its handler and calls the
  callbacks of the other descriptors that reported events.
  Raises an exception when a datagram arrives for which no handler exists.

  Fixes over the previous version:
   - the datagram path only runs when the UDP socket itself reports POLLIN;
     before, an event on any other watched descriptor alone ended in the
     'No handler for opcode' exception;
   - the callback loop stops at the last used slot (pollTop-1) instead of
     also touching the first unused one. }
procedure Main;
begin
  s_setupInet;
  while not terminated do begin
    PollTimeout:=5000;
    ShedRun;
    EventsCount:=fpPoll(@PollArr[0],PollTop,PollTimeout);
    if eventscount=-1 then break; {fixme: print error}
    if eventscount=0 then continue;
    {INET socket}
    if (PollArr[0].revents and pollIN)<>0 then begin
      DoSock(PollArr[0]);
      if assigned(curhndo) then curhndo(msg)
      else if assigned(curhnd) then curhnd(msg)
      else raise eXception.Create('No handler for opcode '+IntToStr(Buffer[1]));
    end;
    {INET6...}
    {Generic}
    for tp:=1 to pollTop-1 do if PollArr[tp].revents>0 then begin
      PollHnd[tp].CB(PollArr[tp].rEvents);
      PollArr[tp].revents:=0;
    end;
  end;
  write('Loop broken [');
  CloseSocket(s_inet);
  writeln(']');
end;
{ Registers the global handler for OpCode.
  OpCode must lie within the bounds of the hnd table; anything else raises an
  exception (it used to write outside the array). Registering an opcode that
  already has a handler trips the assertion. }
procedure SetMsgHandler(OpCode:byte; handler:tMessageHandler);
begin
  if (OpCode<low(hnd)) or (OpCode>high(hnd)) then
    raise eXception.Create('SetMsgHandler: opcode '+IntToStr(OpCode)+' out of range');
  assert(hnd[OpCode]=nil);
  hnd[OpCode]:=handler;
end;
{ Stores the handler used for datagrams whose opcode is above 128,
  replacing whatever was stored before. }
procedure SetHiMsgHandler(handler:tMessageHandler);
begin
  Hihnd := handler;
end;
{ Adds or removes a watched descriptor.
  h assigned   - append fd to the poll table with handler h; raises an
                 exception when the table cannot take another entry
                 (pollTop could otherwise run past its subrange).
  h unassigned - remove the first watched entry whose descriptor equals fd;
                 the last used entry is moved into the freed slot.
  Removal only looks at the slots that are in use by watches (1..pollTop-1).
  It used to scan the whole table, which could match slot 0 (the UDP socket)
  or an unused slot whose fd field happened to equal fd. }
procedure WatchFD(fd:tHandle; h:tFDEventHandler);
var opt: tPollTop;
begin
  if assigned(h) then begin
    if pollTop>=high(tPollTop) then
      raise eXception.Create('WatchFD: no free poll slot');
    PollHnd[pollTop].CB:=h;
    PollArr[pollTop].fd:=fd;
    PollArr[pollTop].events:=POLLERR or POLLHUP or POLLIN or POLLPRI or
      POLLRDBAND or POLLRDNORM;
    PollArr[pollTop].revents:=0;
    //writeln('Add watch ',pollTop,' on ',fd,' to ',IntToHex(qword(@h),8));
    Inc(PollTop);
  end else for opt:=1 to pollTop-1 do if PollArr[opt].fd=fd then begin
    {keep the used slots contiguous: move the last one into the gap}
    if (pollTop-1)>opt then begin
      PollArr[opt]:=PollArr[pollTop-1];
      PollHnd[opt]:=PollHnd[pollTop-1];
    end;
    dec(pollTop);
    PollArr[pollTop].fd:=-1;
    PollArr[pollTop].events:=0;
    PollArr[pollTop].revents:=0;
    break;
  end;
end;
{ Schedules h to be called after about `timeout` milliseconds.
  A node is taken from the ShedUU reuse list when one is available, otherwise
  a new one is allocated; it is then pushed onto the front of ShedTop. }
procedure Shedule(timeout{ms}: LongWord; h:tOnTimer);
var
  node: ^tSheduled;
begin
  if Assigned(ShedUU) then begin
    node := ShedUU;
    ShedUU := node^.next;
  end
  else New(node);
  node^.Left := timeout;
  node^.CB := h;
  node^.Next := ShedTop;
  ShedTop := node;
end;
{ Cancels the first pending timer whose callback equals h.
  The node is unlinked from ShedTop and put on the ShedUU reuse list.
  Does nothing when no such timer is on the list. }
procedure UnShedule(h:tOnTimer);
var
  node: ^tSheduled;
  link: ^pointer;
begin
  link := @ShedTop;
  node := link^;
  while assigned(node) do begin
    if not (node^.cb=h) then begin
      {not the one: advance, remembering the link that points at the next node}
      link := @node^.next;
      node := link^;
      continue;
    end;
    link^ := node^.next;   {unlink from main list}
    node^.next := ShedUU;  {link to unused}
    ShedUU := node;
    break;
  end;
end;
var i:byte;
{ Unit initialization: installs the signal handlers and resets the handler
  tables, the poll table size and the scheduler lists.
  Both tables are cleared over their full index range; the peer table loop
  used to start at 1 and skipped the valid slot 0. }
BEGIN
  Randomize;
  fpSignal(SigInt,@SignalHandler);
  fpSignal(SigTerm,@SignalHandler);
  for i:=low(hnd) to high(hnd) do hnd[i]:=nil;
  for i:=low(PT) to high(PT) do PT[i]:=nil;
  PT_opcodes:=[];
  pollTop:=1; {1 for basic listen}
  ShedTop:=nil;
  ShedUU:=nil; {todo: allocate a few to improve paging}
  fpgettimeofday(@LastShed,nil);
END.