Expose ether_input_oncpu()
[dragonfly.git] / contrib / sendmail-8.14 / libmilter / worker.c
blob04026783f32a0aaa3235060f1e1dcebbf542c8cf
1 /*
2 * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers.
3 * All rights reserved.
5 * By using this file, you agree to the terms and conditions set
6 * forth in the LICENSE file which can be found at the top level of
7 * the sendmail distribution.
9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10 * Jose-Marcio.Martins@ensmp.fr
13 #include <sm/gen.h>
14 SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $")
16 #include "libmilter.h"
18 #if _FFR_WORKERS_POOL
20 typedef struct taskmgr_S taskmgr_T;
22 #define TM_SIGNATURE 0x23021957
24 struct taskmgr_S
26 long tm_signature; /* has the controller been initialized */
27 sthread_t tm_tid; /* thread id of controller */
28 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
30 int tm_nb_workers; /* number of workers in the pool */
31 int tm_nb_idle; /* number of workers waiting */
33 int tm_p[2]; /* poll control pipe */
35 smutex_t tm_w_mutex; /* linked list access mutex */
36 scond_t tm_w_cond; /* */
39 static taskmgr_T Tskmgr = {0};
41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head
43 #define RD_PIPE (Tskmgr.tm_p[0])
44 #define WR_PIPE (Tskmgr.tm_p[1])
46 #define PIPE_SEND_SIGNAL() \
47 do \
48 { \
49 char evt = 0x5a; \
50 int fd = WR_PIPE; \
51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
52 smi_log(SMI_LOG_ERR, \
53 "Error writing to event pipe: %s", \
54 sm_errstring(errno)); \
55 } while (0)
57 #ifndef USE_PIPE_WAKE_POLL
58 # define USE_PIPE_WAKE_POLL 1
59 #endif /* USE_PIPE_WAKE_POLL */
61 /* poll check periodicity (default 10000 - 10 s) */
62 #define POLL_TIMEOUT 10000
64 /* worker conditional wait timeout (default 10 s) */
65 #define COND_TIMEOUT 10
67 /* functions */
68 static int mi_close_session __P((SMFICTX_PTR));
70 static void *mi_worker __P((void *));
71 static void *mi_pool_controller __P((void *));
73 static int mi_list_add_ctx __P((SMFICTX_PTR));
74 static int mi_list_del_ctx __P((SMFICTX_PTR));
77 ** periodicity of cleaning up old sessions (timedout)
78 ** sessions list will be checked to find old inactive
79 ** sessions each DT_CHECK_OLD_SESSIONS sec
82 #define DT_CHECK_OLD_SESSIONS 600
84 #ifndef OLD_SESSION_TIMEOUT
85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout
86 #endif /* OLD_SESSION_TIMEOUT */
88 /* session states - with respect to the pool of workers */
89 #define WKST_INIT 0 /* initial state */
90 #define WKST_READY_TO_RUN 1 /* command ready do be read */
91 #define WKST_RUNNING 2 /* session running on a worker */
92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
93 #define WKST_WAITING 4 /* waiting for new command */
94 #define WKST_CLOSING 5 /* session finished */
96 #ifndef MIN_WORKERS
97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */
98 #endif
100 #define MIN_IDLE 1 /* minimum number of idle threads */
104 ** Macros for threads and mutex management
107 #define TASKMGR_LOCK() \
108 do \
110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
112 } while (0)
114 #define TASKMGR_UNLOCK() \
115 do \
117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
119 } while (0)
121 #define TASKMGR_COND_WAIT() \
122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
124 #define TASKMGR_COND_SIGNAL() \
125 do \
127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129 } while (0)
131 #define LAUNCH_WORKER(ctx) \
132 do \
134 int r; \
135 sthread_t tid; \
137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139 sm_errstring(r)); \
140 } while (0)
142 #if POOL_DEBUG
143 # define POOL_LEV_DPRINTF(lev, x) \
144 do { \
145 if ((lev) < ctx->ctx_dbg) \
146 sm_dprintf x; \
147 } while (0)
148 #else /* POOL_DEBUG */
149 # define POOL_LEV_DPRINTF(lev, x)
150 #endif /* POOL_DEBUG */
153 ** MI_START_SESSION -- Start a session in the pool of workers
155 ** Parameters:
156 ** ctx -- context structure
158 ** Returns:
159 ** MI_SUCCESS/MI_FAILURE
163 mi_start_session(ctx)
164 SMFICTX_PTR ctx;
166 static long id = 0;
168 SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
169 SM_ASSERT(ctx != NULL);
170 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
171 TASKMGR_LOCK();
173 if (mi_list_add_ctx(ctx) != MI_SUCCESS)
175 TASKMGR_UNLOCK();
176 return MI_FAILURE;
179 ctx->ctx_sid = id++;
181 /* if there is an idle worker, signal it, otherwise start new worker */
182 if (Tskmgr.tm_nb_idle > 0)
184 ctx->ctx_wstate = WKST_READY_TO_RUN;
185 TASKMGR_COND_SIGNAL();
187 else
189 ctx->ctx_wstate = WKST_RUNNING;
190 LAUNCH_WORKER(ctx);
192 TASKMGR_UNLOCK();
193 return MI_SUCCESS;
197 ** MI_CLOSE_SESSION -- Close a session and clean up data structures
199 ** Parameters:
200 ** ctx -- context structure
202 ** Returns:
203 ** MI_SUCCESS/MI_FAILURE
206 static int
207 mi_close_session(ctx)
208 SMFICTX_PTR ctx;
210 SM_ASSERT(ctx != NULL);
212 (void) mi_list_del_ctx(ctx);
213 if (ValidSocket(ctx->ctx_sd))
215 (void) closesocket(ctx->ctx_sd);
216 ctx->ctx_sd = INVALID_SOCKET;
218 if (ctx->ctx_reply != NULL)
220 free(ctx->ctx_reply);
221 ctx->ctx_reply = NULL;
223 if (ctx->ctx_privdata != NULL)
225 smi_log(SMI_LOG_WARN, "%s: private data not NULL",
226 ctx->ctx_smfi->xxfi_name);
228 mi_clr_macros(ctx, 0);
229 free(ctx);
231 return MI_SUCCESS;
235 ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
236 ** Must be called before starting sessions.
238 ** Parameters:
239 ** none
241 ** Returns:
242 ** MI_SUCCESS/MI_FAILURE
246 mi_pool_controller_init()
248 sthread_t tid;
249 int r, i;
251 if (Tskmgr.tm_signature == TM_SIGNATURE)
252 return MI_SUCCESS;
254 SM_TAILQ_INIT(&WRK_CTX_HEAD);
255 Tskmgr.tm_tid = (sthread_t) -1;
256 Tskmgr.tm_nb_workers = 0;
257 Tskmgr.tm_nb_idle = 0;
259 if (pipe(Tskmgr.tm_p) != 0)
261 smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
262 sm_errstring(r));
263 return MI_FAILURE;
266 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
268 (void) smutex_init(&Tskmgr.tm_w_mutex);
269 (void) scond_init(&Tskmgr.tm_w_cond);
271 /* Launch the pool controller */
272 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
274 smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
275 sm_errstring(r));
276 return MI_FAILURE;
278 Tskmgr.tm_tid = tid;
279 Tskmgr.tm_signature = TM_SIGNATURE;
281 /* Create the pool of workers */
282 for (i = 0; i < MIN_WORKERS; i++)
284 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
286 smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
287 sm_errstring(r));
288 return MI_FAILURE;
292 return MI_SUCCESS;
296 ** MI_POOL_CONTROLLER -- manage the pool of workers
297 ** This thread must be running when listener begins
298 ** starting sessions
300 ** Parameters:
301 ** arg -- unused
303 ** Returns:
304 ** NULL
306 ** Control flow:
307 ** for (;;)
308 ** Look for timed out sessions
309 ** Select sessions to wait for sendmail command
310 ** Poll set of file descriptors
311 ** if timeout
312 ** continue
313 ** For each file descriptor ready
314 ** launch new thread if no worker available
315 ** else
316 ** signal waiting worker
319 /* Poll structure array (pollfd) size step */
320 #define PFD_STEP 256
322 #define WAIT_FD(i) (pfd[i].fd)
323 #define WAITFN "POLL"
325 static void *
326 mi_pool_controller(arg)
327 void *arg;
329 struct pollfd *pfd = NULL;
330 int dim_pfd = 0;
331 bool rebuild_set = true;
332 int pcnt = 0; /* error count for poll() failures */
334 Tskmgr.tm_tid = sthread_get_id();
335 if (pthread_detach(Tskmgr.tm_tid) != 0)
337 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
338 return NULL;
341 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
342 if (pfd == NULL)
344 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
345 sm_errstring(errno));
346 return NULL;
348 dim_pfd = PFD_STEP;
350 for (;;)
352 SMFICTX_PTR ctx;
353 int nfd, rfd, i;
354 time_t now;
355 time_t lastcheck;
357 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
359 if (mi_stop() != MILTER_CONT)
360 break;
362 TASKMGR_LOCK();
364 now = time(NULL);
366 /* check for timed out sessions? */
367 if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
369 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
371 if (ctx->ctx_wstate == WKST_WAITING)
373 if (ctx->ctx_wait == 0)
375 ctx->ctx_wait = now;
376 continue;
379 /* if session timed out, close it */
380 if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
381 < now)
383 sfsistat (*fi_close) __P((SMFICTX *));
385 POOL_LEV_DPRINTF(4,
386 ("Closing old connection: sd=%d id=%d",
387 ctx->ctx_sd,
388 ctx->ctx_sid));
390 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
391 (void) (*fi_close)(ctx);
393 mi_close_session(ctx);
394 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
395 continue;
399 lastcheck = now;
402 if (rebuild_set)
405 ** Initialize poll set.
406 ** Insert into the poll set the file descriptors of
407 ** all sessions waiting for a command from sendmail.
410 nfd = 0;
412 /* begin with worker pipe */
413 pfd[nfd].fd = RD_PIPE;
414 pfd[nfd].events = MI_POLL_RD_FLAGS;
415 pfd[nfd].revents = 0;
416 nfd++;
418 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
421 ** update ctx_wait - start of wait moment -
422 ** for timeout
425 if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
426 ctx->ctx_wait = now;
428 /* add the session to the pollfd array? */
429 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
430 (ctx->ctx_wstate == WKST_WAITING))
433 ** Resize the pollfd array if it
434 ** isn't large enough.
437 if (nfd >= dim_pfd)
439 struct pollfd *tpfd;
440 size_t new;
442 new = (dim_pfd + PFD_STEP) *
443 sizeof(*tpfd);
444 tpfd = (struct pollfd *)
445 realloc(pfd, new);
446 if (tpfd != NULL)
448 pfd = tpfd;
449 dim_pfd += PFD_STEP;
451 else
453 smi_log(SMI_LOG_ERR,
454 "Failed to realloc pollfd array:%s",
455 sm_errstring(errno));
459 /* add the session to pollfd array */
460 if (nfd < dim_pfd)
462 ctx->ctx_wstate = WKST_WAITING;
463 pfd[nfd].fd = ctx->ctx_sd;
464 pfd[nfd].events = MI_POLL_RD_FLAGS;
465 pfd[nfd].revents = 0;
466 nfd++;
472 TASKMGR_UNLOCK();
474 /* Everything is ready, let's wait for an event */
475 rfd = poll(pfd, nfd, POLL_TIMEOUT);
477 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
478 WAITFN, now, nfd));
480 /* timeout */
481 if (rfd == 0)
482 continue;
484 rebuild_set = true;
486 /* error */
487 if (rfd < 0)
489 if (errno == EINTR)
490 continue;
491 pcnt++;
492 smi_log(SMI_LOG_ERR,
493 "%s() failed (%s), %s",
494 WAITFN, sm_errstring(errno),
495 pcnt >= MAX_FAILS_S ? "abort" : "try again");
497 if (pcnt >= MAX_FAILS_S)
498 goto err;
500 pcnt = 0;
502 /* something happened */
503 for (i = 0; i < nfd; i++)
505 if (pfd[i].revents == 0)
506 continue;
508 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
509 WAITFN, i, nfd,
510 WAIT_FD(i)));
512 /* has a worker signaled an end of task ? */
513 if (WAIT_FD(i) == RD_PIPE)
515 char evt = 0;
516 int r = 0;
518 POOL_LEV_DPRINTF(4,
519 ("PIPE WILL READ evt = %08X %08X",
520 pfd[i].events, pfd[i].revents));
522 if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
524 r = read(RD_PIPE, &evt, sizeof(evt));
525 if (r == sizeof(evt))
527 /* Do nothing */
531 POOL_LEV_DPRINTF(4,
532 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
533 i, RD_PIPE, r, evt));
535 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
537 /* Exception handling */
539 continue;
542 /* no ! sendmail wants to send a command */
543 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
545 if (ctx->ctx_wstate != WKST_WAITING)
546 continue;
548 POOL_LEV_DPRINTF(4,
549 ("Checking context sd=%d - fd=%d ",
550 ctx->ctx_sd , WAIT_FD(i)));
552 if (ctx->ctx_sd == pfd[i].fd)
554 TASKMGR_LOCK();
556 POOL_LEV_DPRINTF(4,
557 ("TASK: found %d for fd[%d]=%d",
558 ctx->ctx_sid, i, WAIT_FD(i)));
560 if (Tskmgr.tm_nb_idle > 0)
562 ctx->ctx_wstate = WKST_READY_TO_RUN;
563 TASKMGR_COND_SIGNAL();
565 else
567 ctx->ctx_wstate = WKST_RUNNING;
568 LAUNCH_WORKER(ctx);
570 TASKMGR_UNLOCK();
571 break;
575 POOL_LEV_DPRINTF(4,
576 ("TASK %s FOUND - Checking PIPE for fd[%d]",
577 ctx != NULL ? "" : "NOT", WAIT_FD(i)));
581 err:
582 if (pfd != NULL)
583 free(pfd);
585 Tskmgr.tm_signature = 0;
586 for (;;)
588 SMFICTX_PTR ctx;
590 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
591 if (ctx == NULL)
592 break;
593 mi_close_session(ctx);
596 (void) smutex_destroy(&Tskmgr.tm_w_mutex);
597 (void) scond_destroy(&Tskmgr.tm_w_cond);
599 return NULL;
603 ** Look for a task ready to run.
604 ** Value of ctx is NULL or a pointer to a task ready to run.
607 #define GET_TASK_READY_TO_RUN() \
608 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
610 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
612 ctx->ctx_wstate = WKST_RUNNING; \
613 break; \
618 ** MI_WORKER -- worker thread
619 ** executes tasks distributed by the mi_pool_controller
620 ** or by mi_start_session
622 ** Parameters:
623 ** arg -- pointer to context structure
625 ** Returns:
626 ** NULL pointer
629 static void *
630 mi_worker(arg)
631 void *arg;
633 SMFICTX_PTR ctx;
634 bool done;
635 sthread_t t_id;
636 int r;
638 ctx = (SMFICTX_PTR) arg;
639 done = false;
640 if (ctx != NULL)
641 ctx->ctx_wstate = WKST_RUNNING;
643 t_id = sthread_get_id();
644 if (pthread_detach(t_id) != 0)
646 smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
647 if (ctx != NULL)
648 ctx->ctx_wstate = WKST_READY_TO_RUN;
649 return NULL;
652 TASKMGR_LOCK();
653 Tskmgr.tm_nb_workers++;
654 TASKMGR_UNLOCK();
656 while (!done)
658 if (mi_stop() != MILTER_CONT)
659 break;
661 /* let's handle next task... */
662 if (ctx != NULL)
664 int res;
666 POOL_LEV_DPRINTF(4,
667 ("worker %d: new task -> let's handle it",
668 t_id));
669 res = mi_engine(ctx);
670 POOL_LEV_DPRINTF(4,
671 ("worker %d: mi_engine returned %d", t_id, res));
673 TASKMGR_LOCK();
674 if (res != MI_CONTINUE)
676 ctx->ctx_wstate = WKST_CLOSING;
679 ** Delete context from linked list of
680 ** sessions and close session.
683 mi_close_session(ctx);
685 else
687 ctx->ctx_wstate = WKST_READY_TO_WAIT;
689 POOL_LEV_DPRINTF(4,
690 ("writing to event pipe..."));
693 ** Signal task controller to add new session
694 ** to poll set.
697 PIPE_SEND_SIGNAL();
699 TASKMGR_UNLOCK();
700 ctx = NULL;
704 /* check if there is any task waiting to be served */
705 TASKMGR_LOCK();
707 GET_TASK_READY_TO_RUN();
709 /* Got a task? */
710 if (ctx != NULL)
712 TASKMGR_UNLOCK();
713 continue;
717 ** if not, let's check if there is enough idle workers
718 ** if yes: quit
721 if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
722 Tskmgr.tm_nb_idle > MIN_IDLE)
723 done = true;
725 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
726 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
728 if (done)
730 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
731 Tskmgr.tm_nb_workers--;
732 TASKMGR_UNLOCK();
733 continue;
737 ** if no task ready to run, wait for another one
740 Tskmgr.tm_nb_idle++;
741 TASKMGR_COND_WAIT();
742 Tskmgr.tm_nb_idle--;
744 /* look for a task */
745 GET_TASK_READY_TO_RUN();
747 TASKMGR_UNLOCK();
749 return NULL;
753 ** MI_LIST_ADD_CTX -- add new session to linked list
755 ** Parameters:
756 ** ctx -- context structure
758 ** Returns:
759 ** MI_FAILURE/MI_SUCCESS
762 static int
763 mi_list_add_ctx(ctx)
764 SMFICTX_PTR ctx;
766 SM_ASSERT(ctx != NULL);
767 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
768 return MI_SUCCESS;
772 ** MI_LIST_DEL_CTX -- remove session from linked list when finished
774 ** Parameters:
775 ** ctx -- context structure
777 ** Returns:
778 ** MI_FAILURE/MI_SUCCESS
781 static int
782 mi_list_del_ctx(ctx)
783 SMFICTX_PTR ctx;
785 SM_ASSERT(ctx != NULL);
786 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
787 return MI_FAILURE;
789 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
790 return MI_SUCCESS;
792 #endif /* _FFR_WORKERS_POOL */