2 * Copyright (c) 2003-2004, 2006 Sendmail, Inc. and its suppliers.
5 * By using this file, you agree to the terms and conditions set
6 * forth in the LICENSE file which can be found at the top level of
7 * the sendmail distribution.
9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10 * Jose-Marcio.Martins@ensmp.fr
14 SM_RCSID("@(#)$Id: worker.c,v 8.9 2006/12/18 18:26:51 ca Exp $")
16 #include "libmilter.h"
/*
**  taskmgr_T -- state of the worker pool controller.  This file keeps a
**  single static instance of it (Tskmgr).
*/
20 typedef struct taskmgr_S taskmgr_T
;
/*
**  Value stored in tm_signature once mi_pool_controller_init() has set the
**  controller up.  mi_start_session() asserts on it, and the controller
**  resets tm_signature to 0 when it shuts down.
*/
22 #define TM_SIGNATURE 0x23021957
26 long tm_signature
; /* has the controller been initialized */
27 sthread_t tm_tid
; /* thread id of controller */
28 smfi_hd_T tm_ctx_head
; /* head of the linked list of contexts */
30 int tm_nb_workers
; /* number of workers in the pool */
31 int tm_nb_idle
; /* number of workers waiting */
33 int tm_p
[2]; /* poll control pipe */
35 smutex_t tm_w_mutex
; /* linked list access mutex */
36 scond_t tm_w_cond
; /* */
/*
**  The one task manager instance.  It starts zeroed, so tm_signature does
**  not match TM_SIGNATURE until mi_pool_controller_init() has run.
*/
39 static taskmgr_T Tskmgr
= {0};
/* head of the linked list of session contexts known to the pool */
41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head
/*
**  Ends of the poll control pipe created with pipe(2) during controller
**  initialization: tm_p[0] is the read end, which the controller puts
**  first in its poll set and reads events from; tm_p[1] is the write end.
*/
43 #define RD_PIPE (Tskmgr.tm_p[0])
44 #define WR_PIPE (Tskmgr.tm_p[1])
46 #define PIPE_SEND_SIGNAL() \
51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
52 smi_log(SMI_LOG_ERR, \
53 "Error writing to event pipe: %s", \
54 sm_errstring(errno)); \
/*
**  Default USE_PIPE_WAKE_POLL to 1 unless the build already defined it.
**  NOTE(review): presumably selects waking the controller's poll() through
**  the event pipe; the code that tests it is not visible here -- confirm.
*/
57 #ifndef USE_PIPE_WAKE_POLL
58 # define USE_PIPE_WAKE_POLL 1
59 #endif /* USE_PIPE_WAKE_POLL */
61 /* poll check periodicity: timeout passed to poll() by the controller, in milliseconds (default 10000 - 10 s) */
62 #define POLL_TIMEOUT 10000
64 /* worker conditional wait timeout, passed to scond_timedwait() by TASKMGR_COND_WAIT() (default 10 s) */
65 #define COND_TIMEOUT 10
/* close a session and clean up its data structures */
68 static int mi_close_session
__P((SMFICTX_PTR
));
/* worker thread: executes tasks handed out by the controller or by mi_start_session() */
70 static void *mi_worker
__P((void *));
/* controller thread: manages the pool of workers */
71 static void *mi_pool_controller
__P((void *));
/* add a new session to the linked list of sessions */
73 static int mi_list_add_ctx
__P((SMFICTX_PTR
));
/* remove a finished session from the linked list of sessions */
74 static int mi_list_del_ctx
__P((SMFICTX_PTR
));
77 ** Periodicity of cleaning up old (timed out) sessions:
78 ** the session list is checked for old inactive
79 ** sessions every DT_CHECK_OLD_SESSIONS seconds.
/* seconds between two scans of the session list for timed out sessions */
82 #define DT_CHECK_OLD_SESSIONS 600
/*
**  Inactivity limit applied to a waiting session.  The default expands to
**  ctx->ctx_timeout, so a variable named ctx must be in scope wherever the
**  macro is used.  A definition supplied by the build takes precedence.
*/
84 #ifndef OLD_SESSION_TIMEOUT
85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout
86 #endif /* OLD_SESSION_TIMEOUT */
/*
**  Values of ctx_wstate.  Transitions visible in this file:
**	- mi_start_session() and the controller set READY_TO_RUN when an
**	  idle worker exists, and RUNNING otherwise;
**	- GET_TASK_READY_TO_RUN() turns READY_TO_RUN into RUNNING when a
**	  worker picks the session up;
**	- a worker sets CLOSING when mi_engine() does not return
**	  MI_CONTINUE, and READY_TO_WAIT otherwise;
**	- the controller turns READY_TO_WAIT into WAITING when it adds the
**	  session's socket to its poll set.
*/
88 /* session states - with respect to the pool of workers */
89 #define WKST_INIT 0 /* initial state */
90 #define WKST_READY_TO_RUN 1 /* command ready to be read */
91 #define WKST_RUNNING 2 /* session running on a worker */
92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
93 #define WKST_WAITING 4 /* waiting for new command */
94 #define WKST_CLOSING 5 /* session finished */
97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */
100 #define MIN_IDLE 1 /* minimum number of idle threads */
104 ** Macros for threads and mutex management
107 #define TASKMGR_LOCK() \
110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
114 #define TASKMGR_UNLOCK() \
117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
/*
**  Wait on the task manager condition variable, paired with tm_w_mutex,
**  for at most COND_TIMEOUT.  Evaluates to the result of
**  scond_timedwait().
**  NOTE(review): presumably the caller must already hold TASKMGR_LOCK()
**  -- confirm against the call sites.
*/
121 #define TASKMGR_COND_WAIT() \
122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
124 #define TASKMGR_COND_SIGNAL() \
127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
131 #define LAUNCH_WORKER(ctx) \
137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
143 # define POOL_LEV_DPRINTF(lev, x) \
145 if ((lev) < ctx->ctx_dbg) \
148 #else /* POOL_DEBUG */
149 # define POOL_LEV_DPRINTF(lev, x)
150 #endif /* POOL_DEBUG */
153 ** MI_START_SESSION -- Start a session in the pool of workers
156 ** ctx -- context structure
159 ** MI_SUCCESS/MI_FAILURE
163 mi_start_session(ctx
)
168 SM_ASSERT(Tskmgr
.tm_signature
== TM_SIGNATURE
);
169 SM_ASSERT(ctx
!= NULL
);
170 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE
, WR_PIPE
));
173 if (mi_list_add_ctx(ctx
) != MI_SUCCESS
)
181 /* if there is an idle worker, signal it, otherwise start new worker */
182 if (Tskmgr
.tm_nb_idle
> 0)
184 ctx
->ctx_wstate
= WKST_READY_TO_RUN
;
185 TASKMGR_COND_SIGNAL();
189 ctx
->ctx_wstate
= WKST_RUNNING
;
197 ** MI_CLOSE_SESSION -- Close a session and clean up data structures
200 ** ctx -- context structure
203 ** MI_SUCCESS/MI_FAILURE
207 mi_close_session(ctx
)
210 SM_ASSERT(ctx
!= NULL
);
212 (void) mi_list_del_ctx(ctx
);
213 if (ValidSocket(ctx
->ctx_sd
))
215 (void) closesocket(ctx
->ctx_sd
);
216 ctx
->ctx_sd
= INVALID_SOCKET
;
218 if (ctx
->ctx_reply
!= NULL
)
220 free(ctx
->ctx_reply
);
221 ctx
->ctx_reply
= NULL
;
223 if (ctx
->ctx_privdata
!= NULL
)
225 smi_log(SMI_LOG_WARN
, "%s: private data not NULL",
226 ctx
->ctx_smfi
->xxfi_name
);
228 mi_clr_macros(ctx
, 0);
235 ** MI_POOL_CONTROLLER_INIT -- Launch the worker pool controller
236 ** Must be called before starting sessions.
242 ** MI_SUCCESS/MI_FAILURE
246 mi_pool_controller_init()
251 if (Tskmgr
.tm_signature
== TM_SIGNATURE
)
254 SM_TAILQ_INIT(&WRK_CTX_HEAD
);
255 Tskmgr
.tm_tid
= (sthread_t
) -1;
256 Tskmgr
.tm_nb_workers
= 0;
257 Tskmgr
.tm_nb_idle
= 0;
259 if (pipe(Tskmgr
.tm_p
) != 0)
261 smi_log(SMI_LOG_ERR
, "can't create event pipe: %s",
266 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE
, WR_PIPE
));
268 (void) smutex_init(&Tskmgr
.tm_w_mutex
);
269 (void) scond_init(&Tskmgr
.tm_w_cond
);
271 /* Launch the pool controller */
272 if ((r
= thread_create(&tid
, mi_pool_controller
, (void *) NULL
)) != 0)
274 smi_log(SMI_LOG_ERR
, "can't create controller thread: %s",
279 Tskmgr
.tm_signature
= TM_SIGNATURE
;
281 /* Create the pool of workers */
282 for (i
= 0; i
< MIN_WORKERS
; i
++)
284 if ((r
= thread_create(&tid
, mi_worker
, (void *) NULL
)) != 0)
286 smi_log(SMI_LOG_ERR
, "can't create workers crew: %s",
296 ** MI_POOL_CONTROLLER -- manage the pool of workers
297 ** This thread must be running when listener begins
308 ** Look for timed out sessions
309 ** Select sessions to wait for sendmail command
310 ** Poll set of file descriptors
313 ** For each file descriptor ready
314 ** launch new thread if no worker available
316 ** signal waiting worker
319 /* Poll structure array (pollfd) size step */
/*
**  WAIT_FD(i) is the file descriptor in slot i of the poll set; it expands
**  to pfd[i].fd, so an array named pfd must be in scope at the use site.
*/
322 #define WAIT_FD(i) (pfd[i].fd)
/* name of the wait primitive, used in log and debug messages */
323 #define WAITFN "POLL"
326 mi_pool_controller(arg
)
329 struct pollfd
*pfd
= NULL
;
331 bool rebuild_set
= true;
332 int pcnt
= 0; /* error count for poll() failures */
334 Tskmgr
.tm_tid
= sthread_get_id();
335 if (pthread_detach(Tskmgr
.tm_tid
) != 0)
337 smi_log(SMI_LOG_ERR
, "Failed to detach pool controller thread");
341 pfd
= (struct pollfd
*) malloc(PFD_STEP
* sizeof(struct pollfd
));
344 smi_log(SMI_LOG_ERR
, "Failed to malloc pollfd array: %s",
345 sm_errstring(errno
));
357 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN
));
359 if (mi_stop() != MILTER_CONT
)
366 /* check for timed out sessions? */
367 if (lastcheck
+ DT_CHECK_OLD_SESSIONS
< now
)
369 SM_TAILQ_FOREACH(ctx
, &WRK_CTX_HEAD
, ctx_link
)
371 if (ctx
->ctx_wstate
== WKST_WAITING
)
373 if (ctx
->ctx_wait
== 0)
379 /* if session timed out, close it */
380 if (ctx
->ctx_wait
+ OLD_SESSION_TIMEOUT
383 sfsistat (*fi_close
) __P((SMFICTX
*));
386 ("Closing old connection: sd=%d id=%d",
390 if ((fi_close
= ctx
->ctx_smfi
->xxfi_close
) != NULL
)
391 (void) (*fi_close
)(ctx
);
393 mi_close_session(ctx
);
394 ctx
= SM_TAILQ_FIRST(&WRK_CTX_HEAD
);
405 ** Initialize poll set.
406 ** Insert into the poll set the file descriptors of
407 ** all sessions waiting for a command from sendmail.
412 /* begin with worker pipe */
413 pfd
[nfd
].fd
= RD_PIPE
;
414 pfd
[nfd
].events
= MI_POLL_RD_FLAGS
;
415 pfd
[nfd
].revents
= 0;
418 SM_TAILQ_FOREACH(ctx
, &WRK_CTX_HEAD
, ctx_link
)
421 ** update ctx_wait - start of wait moment -
425 if (ctx
->ctx_wstate
== WKST_READY_TO_WAIT
)
428 /* add the session to the pollfd array? */
429 if ((ctx
->ctx_wstate
== WKST_READY_TO_WAIT
) ||
430 (ctx
->ctx_wstate
== WKST_WAITING
))
433 ** Resize the pollfd array if it
434 ** isn't large enough.
442 new = (dim_pfd
+ PFD_STEP
) *
444 tpfd
= (struct pollfd
*)
454 "Failed to realloc pollfd array:%s",
455 sm_errstring(errno
));
459 /* add the session to pollfd array */
462 ctx
->ctx_wstate
= WKST_WAITING
;
463 pfd
[nfd
].fd
= ctx
->ctx_sd
;
464 pfd
[nfd
].events
= MI_POLL_RD_FLAGS
;
465 pfd
[nfd
].revents
= 0;
474 /* Everything is ready, let's wait for an event */
475 rfd
= poll(pfd
, nfd
, POLL_TIMEOUT
);
477 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
493 "%s() failed (%s), %s",
494 WAITFN
, sm_errstring(errno
),
495 pcnt
>= MAX_FAILS_S
? "abort" : "try again");
497 if (pcnt
>= MAX_FAILS_S
)
502 /* something happened */
503 for (i
= 0; i
< nfd
; i
++)
505 if (pfd
[i
].revents
== 0)
508 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
512 /* has a worker signaled an end of task ? */
513 if (WAIT_FD(i
) == RD_PIPE
)
519 ("PIPE WILL READ evt = %08X %08X",
520 pfd
[i
].events
, pfd
[i
].revents
));
522 if ((pfd
[i
].revents
& MI_POLL_RD_FLAGS
) != 0)
524 r
= read(RD_PIPE
, &evt
, sizeof(evt
));
525 if (r
== sizeof(evt
))
532 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
533 i
, RD_PIPE
, r
, evt
));
535 if ((pfd
[i
].revents
& ~MI_POLL_RD_FLAGS
) != 0)
537 /* Exception handling */
542 /* no ! sendmail wants to send a command */
543 SM_TAILQ_FOREACH(ctx
, &WRK_CTX_HEAD
, ctx_link
)
545 if (ctx
->ctx_wstate
!= WKST_WAITING
)
549 ("Checking context sd=%d - fd=%d ",
550 ctx
->ctx_sd
, WAIT_FD(i
)));
552 if (ctx
->ctx_sd
== pfd
[i
].fd
)
557 ("TASK: found %d for fd[%d]=%d",
558 ctx
->ctx_sid
, i
, WAIT_FD(i
)));
560 if (Tskmgr
.tm_nb_idle
> 0)
562 ctx
->ctx_wstate
= WKST_READY_TO_RUN
;
563 TASKMGR_COND_SIGNAL();
567 ctx
->ctx_wstate
= WKST_RUNNING
;
576 ("TASK %s FOUND - Checking PIPE for fd[%d]",
577 ctx
!= NULL
? "" : "NOT", WAIT_FD(i
)));
585 Tskmgr
.tm_signature
= 0;
590 ctx
= SM_TAILQ_FIRST(&WRK_CTX_HEAD
);
593 mi_close_session(ctx
);
596 (void) smutex_destroy(&Tskmgr
.tm_w_mutex
);
597 (void) scond_destroy(&Tskmgr
.tm_w_cond
);
603 ** Look for a task ready to run.
604 ** Value of ctx is NULL or a pointer to a task ready to run.
607 #define GET_TASK_READY_TO_RUN() \
608 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
610 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
612 ctx->ctx_wstate = WKST_RUNNING; \
618 ** MI_WORKER -- worker thread
619 ** executes tasks distributed by the mi_pool_controller
620 ** or by mi_start_session
623 ** arg -- pointer to context structure
638 ctx
= (SMFICTX_PTR
) arg
;
641 ctx
->ctx_wstate
= WKST_RUNNING
;
643 t_id
= sthread_get_id();
644 if (pthread_detach(t_id
) != 0)
646 smi_log(SMI_LOG_ERR
, "Failed to detach worker thread");
648 ctx
->ctx_wstate
= WKST_READY_TO_RUN
;
653 Tskmgr
.tm_nb_workers
++;
658 if (mi_stop() != MILTER_CONT
)
661 /* let's handle next task... */
667 ("worker %d: new task -> let's handle it",
669 res
= mi_engine(ctx
);
671 ("worker %d: mi_engine returned %d", t_id
, res
));
674 if (res
!= MI_CONTINUE
)
676 ctx
->ctx_wstate
= WKST_CLOSING
;
679 ** Delete context from linked list of
680 ** sessions and close session.
683 mi_close_session(ctx
);
687 ctx
->ctx_wstate
= WKST_READY_TO_WAIT
;
690 ("writing to event pipe..."));
693 ** Signal task controller to add new session
704 /* check if there is any task waiting to be served */
707 GET_TASK_READY_TO_RUN();
717 ** if not, let's check if there is enough idle workers
721 if (Tskmgr
.tm_nb_workers
> MIN_WORKERS
&&
722 Tskmgr
.tm_nb_idle
> MIN_IDLE
)
725 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id
,
726 Tskmgr
.tm_nb_workers
, Tskmgr
.tm_nb_idle
+ 1));
730 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id
));
731 Tskmgr
.tm_nb_workers
--;
737 ** if no task ready to run, wait for another one
744 /* look for a task */
745 GET_TASK_READY_TO_RUN();
753 ** MI_LIST_ADD_CTX -- add new session to linked list
756 ** ctx -- context structure
759 ** MI_FAILURE/MI_SUCCESS
766 SM_ASSERT(ctx
!= NULL
);
767 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD
, ctx
, ctx_link
);
772 ** MI_LIST_DEL_CTX -- remove session from linked list when finished
775 ** ctx -- context structure
778 ** MI_FAILURE/MI_SUCCESS
785 SM_ASSERT(ctx
!= NULL
);
786 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD
))
789 SM_TAILQ_REMOVE(&WRK_CTX_HEAD
, ctx
, ctx_link
);
792 #endif /* _FFR_WORKERS_POOL */