From f5750bd6619ecb73babb6cb025b2548e8fa1dd03 Mon Sep 17 00:00:00 2001 From: Ben Kibbey Date: Sat, 7 Feb 2009 09:33:34 -0500 Subject: [PATCH] Moved client_msg_thread() and client_msg_sender_thread() to status.c. --- src/pwmd.c | 115 ---------------------------------------------------------- src/status.c | 116 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/status.h | 2 ++ 3 files changed, 118 insertions(+), 115 deletions(-) diff --git a/src/pwmd.c b/src/pwmd.c index b1a336a0..e2fb3e01 100644 --- a/src/pwmd.c +++ b/src/pwmd.c @@ -502,121 +502,6 @@ static void cleanup_cb(void *arg) send_status_all(STATUS_CLIENTS); } -/* The msg_sender_mutex should have been locked before calling this thread - * entry point. */ -static void *client_msg_sender_thread(void *arg) -{ - struct client_thread_s *thd = arg; - - for (;;) { - struct status_msg_s *msg; - gpg_error_t rc; - - pthread_cond_wait(&thd->msg_sender_cond, &thd->msg_sender_mutex); - pthread_testcancel(); - - /* The messages may have been stacked while waiting for send_status() - * to return. Send what's in the queue. */ - for (;;) { - msg = g_slist_nth_data(thd->msg_queue, 0); - - if (!msg) - break; - - /* Unlock to prevent blocking in client_msg_thread(). */ - MUTEX_UNLOCK(&thd->msg_sender_mutex); - rc = send_status(thd->cl->ctx, msg->msg, NULL); - MUTEX_LOCK(&thd->msg_sender_mutex); - thd->msg_queue = g_slist_remove(thd->msg_queue, msg); - g_free(msg); - pthread_testcancel(); - - if (rc) { - log_write(N_("msg for %i failed: %s"), thd->fd, - pwmd_strerror(rc)); - pthread_cancel(thd->tid); - break; - } - } - } - - return NULL; -} - -/* - * This function waits for a signal from send_status_all() then appends a - * message read from a pipe to the clients message queue. The actual sending - * of the message is done in client_msg_sender_thread() which waits for a - * signal from this function. This prevents blocking in assuan_send_status() - * when sending to remote clients. This also avoids duplicate status messages. - * If an existing status message in the message queue has the same type as the - * current message the the current one will be skipped. This avoids flooding - * the client with old status messages. - */ -static void *client_msg_thread(void *arg) -{ - struct client_thread_s *thd = arg; - pthread_mutex_t mu; - - pthread_mutex_init(&mu, NULL); - pthread_mutex_lock(&mu); - - for (;;) { - fd_set rfds; - int n; - status_msg_t m; - size_t len; - struct status_msg_s *msg; - - pthread_cleanup_push(pthread_mutex_destroy, &mu); - pthread_cleanup_push(pthread_mutex_unlock, &mu); - pthread_cond_wait(&thd->msg_cond, &mu); - pthread_testcancel(); - FD_ZERO(&rfds); - FD_SET(thd->msg_fd[0], &rfds); - n = select(thd->msg_fd[0]+1, &rfds, NULL, NULL, NULL); - pthread_testcancel(); - - if (n <= 0 || !FD_ISSET(thd->msg_fd[0], &rfds)) - continue; - - len = read(thd->msg_fd[0], &m, sizeof(status_msg_t)); - pthread_testcancel(); - - if (len != sizeof(status_msg_t)) { - log_write("%s(%i): %s", __FILE__, __LINE__, strerror(errno)); - continue; - } - - MUTEX_LOCK(&thd->msg_sender_mutex); - - for (n = 0; n < g_slist_length(thd->msg_queue); n++) { - msg = g_slist_nth_data(thd->msg_queue, n); - - if (msg->msg == m) - goto done; - } - - msg = g_malloc(sizeof(struct status_msg_s)); - - if (!msg) { - log_write("%s(%i): %s", __FILE__, __LINE__, strerror(ENOMEM)); - MUTEX_UNLOCK(&thd->msg_sender_mutex); - continue; - } - - msg->msg = m; - thd->msg_queue = g_slist_append(thd->msg_queue, msg); -done: - MUTEX_UNLOCK(&thd->msg_sender_mutex); - pthread_cond_signal(&thd->msg_sender_cond); - pthread_cleanup_pop(0); - pthread_cleanup_pop(0); - } - - return NULL; -} - /* * Called every time a connection is made from init_new_connection(). This is * the thread entry point. diff --git a/src/status.c b/src/status.c index 752298d9..416dfe26 100644 --- a/src/status.c +++ b/src/status.c @@ -21,6 +21,7 @@ #include "lock.h" #include "misc.h" #include "status.h" +#include "pwmd_error.h" struct status_thread_s { assuan_context_t ctx; @@ -215,3 +216,118 @@ void send_status_all(status_msg_t which) MUTEX_UNLOCK(&cn_mutex); } + +/* The msg_sender_mutex should have been locked before calling this thread + * entry point. */ +void *client_msg_sender_thread(void *arg) +{ + struct client_thread_s *thd = arg; + + for (;;) { + struct status_msg_s *msg; + gpg_error_t rc; + + pthread_cond_wait(&thd->msg_sender_cond, &thd->msg_sender_mutex); + pthread_testcancel(); + + /* The messages may have been stacked while waiting for send_status() + * to return. Send what's in the queue. */ + for (;;) { + msg = g_slist_nth_data(thd->msg_queue, 0); + + if (!msg) + break; + + /* Unlock to prevent blocking in client_msg_thread(). */ + MUTEX_UNLOCK(&thd->msg_sender_mutex); + rc = send_status(thd->cl->ctx, msg->msg, NULL); + MUTEX_LOCK(&thd->msg_sender_mutex); + thd->msg_queue = g_slist_remove(thd->msg_queue, msg); + g_free(msg); + pthread_testcancel(); + + if (rc) { + log_write(N_("msg for %i failed: %s"), thd->fd, + pwmd_strerror(rc)); + pthread_cancel(thd->tid); + break; + } + } + } + + return NULL; +} + +/* + * This function waits for a signal from send_status_all() then appends a + * message read from a pipe to the clients message queue. The actual sending + * of the message is done in client_msg_sender_thread() which waits for a + * signal from this function. This prevents blocking in assuan_send_status() + * when sending to remote clients. This also avoids duplicate status messages. + * If an existing status message in the message queue has the same type as the + * current message the the current one will be skipped. This avoids flooding + * the client with old status messages. + */ +void *client_msg_thread(void *arg) +{ + struct client_thread_s *thd = arg; + pthread_mutex_t mu; + + pthread_mutex_init(&mu, NULL); + pthread_mutex_lock(&mu); + + for (;;) { + fd_set rfds; + int n; + status_msg_t m; + size_t len; + struct status_msg_s *msg; + + pthread_cleanup_push(pthread_mutex_destroy, &mu); + pthread_cleanup_push(pthread_mutex_unlock, &mu); + pthread_cond_wait(&thd->msg_cond, &mu); + pthread_testcancel(); + FD_ZERO(&rfds); + FD_SET(thd->msg_fd[0], &rfds); + n = select(thd->msg_fd[0]+1, &rfds, NULL, NULL, NULL); + pthread_testcancel(); + + if (n <= 0 || !FD_ISSET(thd->msg_fd[0], &rfds)) + continue; + + len = read(thd->msg_fd[0], &m, sizeof(status_msg_t)); + pthread_testcancel(); + + if (len != sizeof(status_msg_t)) { + log_write("%s(%i): %s", __FILE__, __LINE__, strerror(errno)); + continue; + } + + MUTEX_LOCK(&thd->msg_sender_mutex); + + for (n = 0; n < g_slist_length(thd->msg_queue); n++) { + msg = g_slist_nth_data(thd->msg_queue, n); + + if (msg->msg == m) + goto done; + } + + msg = g_malloc(sizeof(struct status_msg_s)); + + if (!msg) { + log_write("%s(%i): %s", __FILE__, __LINE__, strerror(ENOMEM)); + MUTEX_UNLOCK(&thd->msg_sender_mutex); + continue; + } + + msg->msg = m; + thd->msg_queue = g_slist_append(thd->msg_queue, msg); +done: + MUTEX_UNLOCK(&thd->msg_sender_mutex); + pthread_cond_signal(&thd->msg_sender_cond); + pthread_cleanup_pop(0); + pthread_cleanup_pop(0); + } + + return NULL; +} diff --git a/src/status.h b/src/status.h index 3e6d95d1..e9724e0e 100644 --- a/src/status.h +++ b/src/status.h @@ -37,5 +37,7 @@ typedef enum { gpg_error_t send_status(assuan_context_t ctx, status_msg_t which, const gchar *fmt, ...); void send_status_all(status_msg_t which); +void *client_msg_thread(void *arg); +void *client_msg_sender_thread(void *arg); #endif -- 2.11.4.GIT