Some doc path fixes from Anders
[pkg-k5-afs_openafs.git] / src / rx / rx_lwp.c
blobfa4ee6dc48080aa80d5229b818eab3b7640bb3ce
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
10 /* rx_user.c contains routines specific to the user space UNIX implementation of rx */
12 /* This controls the size of an fd_set; it must be defined early before
13 * the system headers define that type and the macros that operate on it.
14 * Its value should be as large as the maximum file descriptor limit we
15 * are likely to run into on any platform. Right now, that is 65536
16 * which is the default hard fd limit on Solaris 9 */
17 #ifndef _WIN32
18 #define FD_SETSIZE 65536
19 #endif
21 #include <afsconfig.h>
22 #include <afs/param.h>
24 #include <roken.h>
26 #ifdef HAVE_SYS_FILE_H
27 # include <sys/file.h>
28 #endif
30 #ifndef AFS_PTHREAD_ENV
32 #include <lwp.h>
34 #include "rx.h"
35 #include "rx_atomic.h"
36 #include "rx_globals.h"
37 #include "rx_internal.h"
38 #include "rx_stats.h"
39 #ifdef AFS_NT40_ENV
40 #include "rx_xmit_nt.h"
41 #endif
43 #define MAXTHREADNAMELENGTH 64
45 int debugSelectFailure; /* # of times select failed */
48 * Sleep on the unique wait channel provided.
50 void
51 rxi_Sleep(void *addr)
53 LWP_WaitProcess(addr);
57 * Wakeup any threads on the channel provided.
58 * They may be woken up spuriously, and must check any conditions.
60 void
61 rxi_Wakeup(void *addr)
63 LWP_NoYieldSignal(addr);
66 PROCESS rx_listenerPid = 0; /* LWP process id of socket listener process */
67 static void* rx_ListenerProc(void *dummy);
70 * Delay the current thread the specified number of seconds.
72 void
73 rxi_Delay(int sec)
75 IOMGR_Sleep(sec);
78 static int quitListening = 0;
80 /* This routine will kill the listener thread, if it exists. */
81 void
82 rxi_StopListener(void)
84 quitListening = 1;
85 rxi_ReScheduleEvents();
88 /* This routine will get called by the event package whenever a new,
89 earlier than others, event is posted. If the Listener process
90 is blocked in selects, this will unblock it. It also can be called
91 to force a new trip through the rxi_Listener select loop when the set
92 of file descriptors it should be listening to changes... */
93 void
94 rxi_ReScheduleEvents(void)
96 if (rx_listenerPid)
97 IOMGR_Cancel(rx_listenerPid);
100 void
101 rxi_InitializeThreadSupport(void)
103 PROCESS junk;
105 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
106 IOMGR_Initialize();
107 FD_ZERO(&rx_selectMask);
110 void
111 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
113 PROCESS scratchPid;
114 static int number = 0;
115 char name[32];
117 sprintf(name, "srv_%d", ++number);
118 LWP_CreateProcess(proc, stacksize, RX_PROCESS_PRIORITY, NULL,
119 "rx_ServerProc", &scratchPid);
120 if (registerProgram)
121 (*registerProgram) (scratchPid, name);
124 void
125 rxi_StartListener(void)
127 /* Priority of listener should be high, so it can keep conns alive */
128 #define RX_LIST_STACK 24000
129 LWP_CreateProcess(rx_ListenerProc, RX_LIST_STACK, LWP_MAX_PRIORITY,
130 NULL, "rx_Listener", &rx_listenerPid);
131 if (registerProgram)
132 (*registerProgram) (rx_listenerPid, "listener");
135 /* The main loop which listens to the net for datagrams, and handles timeouts
136 and retransmissions, etc. It also is responsible for scheduling the
137 execution of pending events (in conjunction with event.c).
139 Note interaction of nextPollTime and lastPollWorked. The idea is that if rx is not
140 keeping up with the incoming stream of packets (because there are threads that
141 are interfering with its running sufficiently often), rx does a polling select
142 before doing a real IOMGR_Select system call. Doing a real select means that
143 we don't have to let other processes run before processing more packets.
145 So, our algorithm is that if the last poll on the file descriptor found useful data, or
146 we're at the time nextPollTime (which is advanced so that it occurs every 3 or 4 seconds),
147 then we try the polling select before the IOMGR_Select. If we eventually catch up
148 (which we can tell by the polling select returning no input packets ready), then we
149 don't do a polling select again until several seconds later (via nextPollTime mechanism).
152 static void
153 rxi_ListenerProc(fd_set * rfds, int *tnop, struct rx_call **newcallp)
155 afs_uint32 host;
156 u_short port;
157 struct rx_packet *p = (struct rx_packet *)0;
158 osi_socket socket;
159 struct clock cv;
160 afs_int32 nextPollTime; /* time to next poll FD before sleeping */
161 int lastPollWorked, doingPoll; /* true iff last poll was useful */
162 struct timeval tv, *tvp;
163 int code;
164 #ifdef AFS_NT40_ENV
165 int i;
166 #endif
167 PROCESS pid;
168 char name[MAXTHREADNAMELENGTH] = "srv_0";
170 clock_NewTime();
171 lastPollWorked = 0;
172 nextPollTime = 0;
173 code = LWP_CurrentProcess(&pid);
174 if (code) {
175 osi_Panic("rxi_Listener: Can't get my pid.\n");
177 rx_listenerPid = pid;
178 if (swapNameProgram)
179 (*swapNameProgram) (pid, "listener", &name[0]);
181 for (;;) {
182 /* See if a check for additional packets was issued */
183 rx_CheckPackets();
185 /* Grab a new packet only if necessary (otherwise re-use the old one) */
186 if (p) {
187 rxi_RestoreDataBufs(p);
188 } else {
189 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)))
190 osi_Panic("rxi_ListenerProc: no packets!"); /* Shouldn't happen */
192 /* Wait for the next event time or a packet to arrive. */
193 /* event_RaiseEvents schedules any events whose time has come and
194 * then atomically computes the time to the next event, guaranteeing
195 * that this is positive. If there is no next event, it returns 0 */
196 clock_NewTime();
197 if (!rxevent_RaiseEvents(&cv))
198 tvp = NULL;
199 else {
200 /* It's important to copy cv to tv, because the 4.3 documentation
201 * for select threatens that *tv may be updated after a select, in
202 * future editions of the system, to indicate how much of the time
203 * period has elapsed. So we shouldn't rely on tv not being altered. */
204 tv.tv_sec = cv.sec; /* Time to next event */
205 tv.tv_usec = cv.usec;
206 tvp = &tv;
208 if (rx_stats_active)
209 rx_atomic_inc(&rx_stats.selects);
211 *rfds = rx_selectMask;
213 if (lastPollWorked || nextPollTime < clock_Sec()) {
214 /* we're catching up, or haven't tried to for a few seconds */
215 doingPoll = 1;
216 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no matter what */
217 tv.tv_sec = tv.tv_usec = 0; /* make sure we poll */
218 tvp = &tv;
219 code = select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
220 } else {
221 doingPoll = 0;
222 code = IOMGR_Select((int)(rx_maxSocketNumber + 1), rfds, 0, 0, tvp);
224 lastPollWorked = 0; /* default is that it didn't find anything */
226 if (quitListening) {
227 quitListening = 0;
228 LWP_DestroyProcess(pid);
231 switch (code) {
232 case 0:
233 /* Timer interrupt:
234 * If it was a timer interrupt then we can assume that
235 * the time has advanced by roughly the value of the
236 * previous timeout, and that there is now at least
237 * one pending event.
239 clock_NewTime();
240 break;
241 case -1:
242 /* select or IOMGR_Select returned failure */
243 debugSelectFailure++; /* update debugging counter */
244 clock_NewTime();
245 break;
246 case -2:
247 /* IOMGR_Cancel:
248 * IOMGR_Cancel is invoked whenever a new event is
249 * posted that is earlier than any existing events.
250 * So we re-evaluate the time, and then go back to
251 * reschedule events
253 clock_NewTime();
254 break;
256 default:
257 /* Packets have arrived, presumably:
258 * If it wasn't a timer interrupt, then no event should have
259 * timed out yet (well some event may have, but only just...), so
260 * we don't bother looking to see if any have timed out, but just
261 * go directly to reading the data packets
263 clock_NewTime();
264 if (doingPoll)
265 lastPollWorked = 1;
266 #ifdef AFS_NT40_ENV
267 for (i = 0; p && i < rfds->fd_count; i++) {
268 socket = rfds->fd_array[i];
269 if (rxi_ReadPacket(socket, p, &host, &port)) {
270 *newcallp = NULL;
271 p = rxi_ReceivePacket(p, socket, host, port, tnop,
272 newcallp);
273 if (newcallp && *newcallp) {
274 if (p) {
275 rxi_FreePacket(p);
277 if (swapNameProgram) {
278 (*swapNameProgram) (rx_listenerPid, name, 0);
279 rx_listenerPid = 0;
281 return;
285 #else
286 for (socket = rx_minSocketNumber;
287 p && socket <= rx_maxSocketNumber; socket++) {
288 if (!FD_ISSET(socket, rfds))
289 continue;
290 if (rxi_ReadPacket(socket, p, &host, &port)) {
291 p = rxi_ReceivePacket(p, socket, host, port, tnop,
292 newcallp);
293 if (newcallp && *newcallp) {
294 if (p) {
295 rxi_FreePacket(p);
297 if (swapNameProgram) {
298 (*swapNameProgram) (rx_listenerPid, name, 0);
299 rx_listenerPid = 0;
301 return;
305 #endif
306 break;
309 /* NOTREACHED */
312 /* This is the listener process request loop. The listener process loop
313 * becomes a server thread when rxi_ListenerProc returns, and stays
314 * server thread until rxi_ServerProc returns. */
315 static void *
316 rx_ListenerProc(void *dummy)
318 int threadID;
319 osi_socket sock;
320 struct rx_call *newcall;
321 fd_set *rfds;
323 if (!(rfds = IOMGR_AllocFDSet())) {
324 osi_Panic("rx_ListenerProc: no fd_sets!\n");
327 while (1) {
328 newcall = NULL;
329 threadID = -1;
330 rxi_ListenerProc(rfds, &threadID, &newcall);
331 /* osi_Assert(threadID != -1); */
332 /* osi_Assert(newcall != NULL); */
333 sock = OSI_NULLSOCKET;
334 rxi_ServerProc(threadID, newcall, &sock);
335 /* osi_Assert(sock != OSI_NULLSOCKET); */
337 /* not reached */
338 return NULL;
341 /* This is the server process request loop. The server process loop
342 * becomes a listener thread when rxi_ServerProc returns, and stays
343 * listener thread until rxi_ListenerProc returns. */
344 void *
345 rx_ServerProc(void * unused)
347 osi_socket sock;
348 int threadID;
349 struct rx_call *newcall = NULL;
350 fd_set *rfds;
352 if (!(rfds = IOMGR_AllocFDSet())) {
353 osi_Panic("rxi_ListenerProc: no fd_sets!\n");
356 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
357 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
358 /* threadID is used for making decisions in GetCall. Get it by bumping
359 * number of threads handling incoming calls */
360 threadID = rxi_availProcs++;
362 while (1) {
363 sock = OSI_NULLSOCKET;
364 rxi_ServerProc(threadID, newcall, &sock);
365 /* osi_Assert(sock != OSI_NULLSOCKET); */
366 newcall = NULL;
367 rxi_ListenerProc(rfds, &threadID, &newcall);
368 /* osi_Assert(threadID != -1); */
369 /* osi_Assert(newcall != NULL); */
371 /* not reached */
372 return NULL;
376 * Called from GetUDPSocket.
377 * Called from a single thread at startup.
378 * Returns 0 on success; -1 on failure.
381 rxi_Listen(osi_socket sock)
383 #ifndef AFS_NT40_ENV
385 * Put the socket into non-blocking mode so that rx_Listener
386 * can do a polling read before entering select
388 if (fcntl(sock, F_SETFL, FNDELAY) == -1) {
389 perror("fcntl");
390 (osi_Msg "rxi_Listen: unable to set non-blocking mode on socket\n");
391 return -1;
394 if (sock > FD_SETSIZE - 1) {
395 (osi_Msg "rxi_Listen: socket descriptor > (FD_SETSIZE-1) = %d\n",
396 FD_SETSIZE - 1);
397 return -1;
399 #endif
401 FD_SET(sock, &rx_selectMask);
402 if (sock > rx_maxSocketNumber)
403 rx_maxSocketNumber = sock;
404 if (sock < rx_minSocketNumber)
405 rx_minSocketNumber = sock;
406 return 0;
410 * Recvmsg
413 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
415 int code;
416 code = recvmsg(socket, msg_p, flags);
418 #ifdef AFS_RXERRQ_ENV
419 if (code < 0) {
420 while((rxi_HandleSocketError(socket)) > 0)
423 #endif
425 return code;
429 * Simulate a blocking sendmsg on the non-blocking socket.
430 * It's non blocking because it was set that way for recvmsg.
433 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
435 fd_set *sfds = (fd_set *) 0;
436 while (sendmsg(socket, msg_p, flags) == -1) {
437 int err;
439 #ifdef AFS_NT40_ENV
440 err = WSAGetLastError();
441 #else
442 err = errno;
443 #endif
445 if (rx_stats_active)
446 rx_atomic_inc(&rx_stats.sendSelects);
448 if (!sfds) {
449 if (!(sfds = IOMGR_AllocFDSet())) {
450 (osi_Msg "rx failed to alloc fd_set: ");
451 perror("rx_sendmsg");
452 return -1;
454 FD_SET(socket, sfds);
456 #ifdef AFS_RXERRQ_ENV
457 while((rxi_HandleSocketError(socket)) > 0)
459 #endif
460 #ifdef AFS_NT40_ENV
461 if (err)
462 #elif defined(AFS_LINUX22_ENV)
463 /* linux unfortunately returns ECONNREFUSED if the target port
464 * is no longer in use */
465 /* and EAGAIN if a UDP checksum is incorrect */
466 if (err != EWOULDBLOCK && err != ENOBUFS && err != ECONNREFUSED
467 && err != EAGAIN)
468 #else
469 if (err != EWOULDBLOCK && err != ENOBUFS)
470 #endif
472 (osi_Msg "rx failed to send packet: ");
473 perror("rx_sendmsg");
474 if (err > 0)
475 return -err;
476 return -1;
478 while ((err = select(
479 #ifdef AFS_NT40_ENV
481 #else
482 socket + 1,
483 #endif
484 0, sfds, 0, 0)) != 1) {
485 if (err >= 0 || errno != EINTR)
486 osi_Panic("rxi_sendmsg: select error %d.%d", err, errno);
487 FD_ZERO(sfds);
488 FD_SET(socket, sfds);
491 if (sfds)
492 IOMGR_FreeFDSet(sfds);
493 return 0;
495 #endif