2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2007-2008, Dwayne M. Hubbard
6 * Dwayne M. Hubbard <dhubbard@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
20 * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22 * \author Dwayne Hubbard <dhubbard@digium.com>
27 ASTERISK_FILE_VERSION(__FILE__
, "$Revision$")
32 #include "asterisk/_private.h"
33 #include "asterisk/module.h"
34 #include "asterisk/time.h"
35 #include "asterisk/astobj2.h"
36 #include "asterisk/cli.h"
37 #include "asterisk/taskprocessor.h"
40 /*! \brief tps_task structure is queued to a taskprocessor
42 * tps_tasks are processed in FIFO order and freed by the taskprocessing
43 * thread after the task handler returns. The callback function that is assigned
44 * to the execute() function pointer is responsible for releasing datap resources if necessary. */
46 /*! \brief The execute() task callback function pointer */
47 int (*execute
)(void *datap
);
48 /*! \brief The data pointer for the task execute() function */
50 /*! \brief AST_LIST_ENTRY overhead */
51 AST_LIST_ENTRY(tps_task
) list
;
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
struct tps_taskprocessor_stats {
	/*! \brief This is the maximum number of tasks queued at any one time */
	unsigned long max_qsize;
	/*! \brief This is the current number of tasks processed */
	unsigned long _tasks_processed_count;
};
62 /*! \brief A ast_taskprocessor structure is a singleton by name */
63 struct ast_taskprocessor
{
64 /*! \brief Friendly name of the taskprocessor */
66 /*! \brief Thread poll condition */
68 /*! \brief Taskprocessor thread */
69 pthread_t poll_thread
;
70 /*! \brief Taskprocessor lock */
71 ast_mutex_t taskprocessor_lock
;
72 /*! \brief Taskprocesor thread run flag */
73 unsigned char poll_thread_run
;
74 /*! \brief Taskprocessor statistics */
75 struct tps_taskprocessor_stats
*stats
;
76 /*! \brief Taskprocessor current queue size */
78 /*! \brief Taskprocessor queue */
79 AST_LIST_HEAD_NOLOCK(tps_queue
, tps_task
) tps_queue
;
80 /*! \brief Taskprocessor singleton list entry */
81 AST_LIST_ENTRY(ast_taskprocessor
) list
;
83 #define TPS_MAX_BUCKETS 7
84 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
85 static struct ao2_container
*tps_singletons
;
87 /*! \brief CLI 'taskprocessor ping <blah>' operation requires a ping condition */
88 static ast_cond_t cli_ping_cond
;
90 /*! \brief CLI 'taskprocessor ping <blah>' operation requires a ping condition lock */
91 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock
);
/*! \brief The astobj2 hash callback for taskprocessors */
static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);

/*! \brief The task processing function executed by a taskprocessor */
static void *tps_processing_function(void *data);

/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy(void *tps);

/*! \brief CLI 'taskprocessor ping <blah>' handler function */
static int tps_ping_handler(void *datap);

/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);

/*! \brief Return the size of the taskprocessor queue */
static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);

/*! \brief CLI handler for 'taskprocessor ping' */
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
/*! \brief CLI handler for 'taskprocessor show stats' */
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
116 static struct ast_cli_entry taskprocessor_clis
[] = {
117 AST_CLI_DEFINE(cli_tps_ping
, "Ping a named task processors"),
118 AST_CLI_DEFINE(cli_tps_report
, "List instantiated task processors and statistics"),
121 /* initialize the taskprocessor container and register CLI operations */
122 int ast_tps_init(void)
124 if (!(tps_singletons
= ao2_container_alloc(TPS_MAX_BUCKETS
, tps_hash_cb
, tps_cmp_cb
))) {
125 ast_log(LOG_ERROR
, "taskprocessor container failed to initialize!\n");
129 ast_cond_init(&cli_ping_cond
, NULL
);
131 ast_cli_register_multiple(taskprocessor_clis
, ARRAY_LEN(taskprocessor_clis
));
135 /* allocate resources for the task */
136 static struct tps_task
*tps_task_alloc(int (*task_exe
)(void *datap
), void *datap
)
139 if ((t
= ast_calloc(1, sizeof(*t
)))) {
140 t
->execute
= task_exe
;
146 /* release task resources */
147 static void *tps_task_free(struct tps_task
*task
)
155 /* taskprocessor tab completion */
156 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor
*p
, struct ast_cli_args
*a
)
161 struct ao2_iterator i
;
166 tklen
= strlen(a
->word
);
167 i
= ao2_iterator_init(tps_singletons
, 0);
168 while ((p
= ao2_iterator_next(&i
))) {
169 if (!strncasecmp(a
->word
, p
->name
, tklen
) && ++wordnum
> a
->n
) {
170 name
= ast_strdup(p
->name
);
179 /* ping task handling function */
180 static int tps_ping_handler(void *datap
)
182 ast_mutex_lock(&cli_ping_cond_lock
);
183 ast_cond_signal(&cli_ping_cond
);
184 ast_mutex_unlock(&cli_ping_cond_lock
);
188 /* ping the specified taskprocessor and display the ping time on the CLI */
189 static char *cli_tps_ping(struct ast_cli_entry
*e
, int cmd
, struct ast_cli_args
*a
)
191 struct timeval begin
, end
, delta
;
195 struct ast_taskprocessor
*tps
= NULL
;
199 e
->command
= "taskprocessor ping";
201 "Usage: taskprocessor ping <taskprocessor>\n"
202 " Displays the time required for a processor to deliver a task\n";
205 return tps_taskprocessor_tab_complete(tps
, a
);
209 return CLI_SHOWUSAGE
;
212 if (!(tps
= ast_taskprocessor_get(name
, TPS_REF_IF_EXISTS
))) {
213 ast_cli(a
->fd
, "\nping failed: %s not found\n\n", name
);
216 ast_cli(a
->fd
, "\npinging %s ...", name
);
217 tv
= ast_tvadd((begin
= ast_tvnow()), ast_samp2tv(1000, 1000));
218 ts
.tv_sec
= tv
.tv_sec
;
219 ts
.tv_nsec
= tv
.tv_usec
* 1000;
220 ast_mutex_lock(&cli_ping_cond_lock
);
221 if (ast_taskprocessor_push(tps
, tps_ping_handler
, 0) < 0) {
222 ast_cli(a
->fd
, "\nping failed: could not push task to %s\n\n", name
);
226 ast_cond_timedwait(&cli_ping_cond
, &cli_ping_cond_lock
, &ts
);
227 ast_mutex_unlock(&cli_ping_cond_lock
);
229 delta
= ast_tvsub(end
, begin
);
230 ast_cli(a
->fd
, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name
, (long)delta
.tv_sec
, (long int)delta
.tv_usec
);
235 /* TPS reports are cool */
236 static char *cli_tps_report(struct ast_cli_entry
*e
, int cmd
, struct ast_cli_args
*a
)
241 unsigned long maxqsize
;
242 unsigned long processed
;
243 struct ast_taskprocessor
*p
;
244 struct ao2_iterator i
;
248 e
->command
= "taskprocessor show stats";
250 "Usage: taskprocessor show stats\n"
251 " Shows a list of instantiated task processors and their statistics\n";
257 if (a
->argc
!= e
->args
)
258 return CLI_SHOWUSAGE
;
260 ast_cli(a
->fd
, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
261 i
= ao2_iterator_init(tps_singletons
, 0);
262 while ((p
= ao2_iterator_next(&i
))) {
263 ast_copy_string(name
, p
->name
, sizeof(name
));
264 qsize
= p
->tps_queue_size
;
265 maxqsize
= p
->stats
->max_qsize
;
266 processed
= p
->stats
->_tasks_processed_count
;
267 ast_cli(a
->fd
, "\n%24s %17ld %12ld %12ld", name
, processed
, qsize
, maxqsize
);
270 tcount
= ao2_container_count(tps_singletons
);
271 ast_cli(a
->fd
, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount
);
275 /* this is the task processing worker function */
276 static void *tps_processing_function(void *data
)
278 struct ast_taskprocessor
*i
= data
;
283 ast_log(LOG_ERROR
, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
287 while (i
->poll_thread_run
) {
288 ast_mutex_lock(&i
->taskprocessor_lock
);
289 if (!i
->poll_thread_run
) {
290 ast_mutex_unlock(&i
->taskprocessor_lock
);
293 if (!(size
= tps_taskprocessor_depth(i
))) {
294 ast_cond_wait(&i
->poll_cond
, &i
->taskprocessor_lock
);
295 if (!i
->poll_thread_run
) {
296 ast_mutex_unlock(&i
->taskprocessor_lock
);
300 ast_mutex_unlock(&i
->taskprocessor_lock
);
301 /* stuff is in the queue */
302 if (!(t
= tps_taskprocessor_pop(i
))) {
303 ast_log(LOG_ERROR
, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size
);
307 ast_log(LOG_WARNING
, "Task is missing a function to execute!\n");
311 t
->execute(t
->datap
);
313 ast_mutex_lock(&i
->taskprocessor_lock
);
315 i
->stats
->_tasks_processed_count
++;
316 if (size
> i
->stats
->max_qsize
) {
317 i
->stats
->max_qsize
= size
;
320 ast_mutex_unlock(&i
->taskprocessor_lock
);
324 while ((t
= tps_taskprocessor_pop(i
))) {
330 /* hash callback for astobj2 */
331 static int tps_hash_cb(const void *obj
, const int flags
)
333 const struct ast_taskprocessor
*tps
= obj
;
335 return ast_str_hash(tps
->name
);
338 /* compare callback for astobj2 */
339 static int tps_cmp_cb(void *obj
, void *arg
, int flags
)
341 struct ast_taskprocessor
*lhs
= obj
, *rhs
= arg
;
343 return !strcasecmp(lhs
->name
, rhs
->name
) ? CMP_MATCH
: 0;
346 /* destroy the taskprocessor */
347 static void tps_taskprocessor_destroy(void *tps
)
349 struct ast_taskprocessor
*t
= tps
;
352 ast_log(LOG_ERROR
, "missing taskprocessor\n");
355 ast_log(LOG_DEBUG
, "destroying taskprocessor '%s'\n", t
->name
);
357 ast_mutex_lock(&t
->taskprocessor_lock
);
358 t
->poll_thread_run
= 0;
359 ast_cond_signal(&t
->poll_cond
);
360 ast_mutex_unlock(&t
->taskprocessor_lock
);
361 pthread_join(t
->poll_thread
, NULL
);
362 t
->poll_thread
= AST_PTHREADT_NULL
;
363 ast_mutex_destroy(&t
->taskprocessor_lock
);
364 ast_cond_destroy(&t
->poll_cond
);
373 /* pop the front task and return it */
374 static struct tps_task
*tps_taskprocessor_pop(struct ast_taskprocessor
*tps
)
376 struct tps_task
*task
;
379 ast_log(LOG_ERROR
, "missing taskprocessor\n");
382 ast_mutex_lock(&tps
->taskprocessor_lock
);
383 if ((task
= AST_LIST_REMOVE_HEAD(&tps
->tps_queue
, list
))) {
384 tps
->tps_queue_size
--;
386 ast_mutex_unlock(&tps
->taskprocessor_lock
);
390 static int tps_taskprocessor_depth(struct ast_taskprocessor
*tps
)
392 return (tps
) ? tps
->tps_queue_size
: -1;
395 /* taskprocessor name accessor */
396 const char *ast_taskprocessor_name(struct ast_taskprocessor
*tps
)
399 ast_log(LOG_ERROR
, "no taskprocessor specified!\n");
405 /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
406 * create the taskprocessor if we were told via ast_tps_options to return a reference only
407 * if it already exists */
408 struct ast_taskprocessor
*ast_taskprocessor_get(char *name
, enum ast_tps_options create
)
410 struct ast_taskprocessor
*p
, tmp_tps
= {
414 if (ast_strlen_zero(name
)) {
415 ast_log(LOG_ERROR
, "requesting a nameless taskprocessor!!!\n");
418 ao2_lock(tps_singletons
);
419 p
= ao2_find(tps_singletons
, &tmp_tps
, OBJ_POINTER
);
421 ao2_unlock(tps_singletons
);
424 if (create
& TPS_REF_IF_EXISTS
) {
425 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
426 ao2_unlock(tps_singletons
);
429 /* create a new taskprocessor */
430 if (!(p
= ao2_alloc(sizeof(*p
), tps_taskprocessor_destroy
))) {
431 ao2_unlock(tps_singletons
);
432 ast_log(LOG_WARNING
, "failed to create taskprocessor '%s'\n", name
);
436 ast_cond_init(&p
->poll_cond
, NULL
);
437 ast_mutex_init(&p
->taskprocessor_lock
);
439 if (!(p
->stats
= ast_calloc(1, sizeof(*p
->stats
)))) {
440 ao2_unlock(tps_singletons
);
441 ast_log(LOG_WARNING
, "failed to create taskprocessor stats for '%s'\n", name
);
445 if (!(p
->name
= ast_strdup(name
))) {
446 ao2_unlock(tps_singletons
);
450 p
->poll_thread_run
= 1;
451 p
->poll_thread
= AST_PTHREADT_NULL
;
452 if (ast_pthread_create(&p
->poll_thread
, NULL
, tps_processing_function
, p
) < 0) {
453 ao2_unlock(tps_singletons
);
454 ast_log(LOG_ERROR
, "Taskprocessor '%s' failed to create the processing thread.\n", p
->name
);
458 if (!(ao2_link(tps_singletons
, p
))) {
459 ao2_unlock(tps_singletons
);
460 ast_log(LOG_ERROR
, "Failed to add taskprocessor '%s' to container\n", p
->name
);
464 ao2_unlock(tps_singletons
);
468 /* decrement the taskprocessor reference count and unlink from the container if necessary */
469 void *ast_taskprocessor_unreference(struct ast_taskprocessor
*tps
)
472 ao2_lock(tps_singletons
);
473 ao2_unlink(tps_singletons
, tps
);
474 if (ao2_ref(tps
, -1) > 1) {
475 ao2_link(tps_singletons
, tps
);
477 ao2_unlock(tps_singletons
);
482 /* push the task into the taskprocessor queue */
483 int ast_taskprocessor_push(struct ast_taskprocessor
*tps
, int (*task_exe
)(void *datap
), void *datap
)
487 if (!tps
|| !task_exe
) {
488 ast_log(LOG_ERROR
, "%s is missing!!\n", (tps
) ? "task callback" : "taskprocessor");
491 if (!(t
= tps_task_alloc(task_exe
, datap
))) {
492 ast_log(LOG_ERROR
, "failed to allocate task! Can't push to '%s'\n", tps
->name
);
495 ast_mutex_lock(&tps
->taskprocessor_lock
);
496 AST_LIST_INSERT_TAIL(&tps
->tps_queue
, t
, list
);
497 tps
->tps_queue_size
++;
498 ast_cond_signal(&tps
->poll_cond
);
499 ast_mutex_unlock(&tps
->taskprocessor_lock
);