Moved client_msg_thread() and client_msg_sender_thread() to status.c.
[pwmd.git] / src / status.c
blob416dfe26e49784e2fe84a9ba8113cfb07a23e920
1 /* vim:tw=78:ts=8:sw=4:set ft=c: */
2 /*
3 Copyright (C) 2006-2009 Ben Kibbey <bjk@luxsci.net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02110-1301 USA
19 #include <glib.h>
20 #include "common.h"
21 #include "lock.h"
22 #include "misc.h"
23 #include "status.h"
24 #include "pwmd_error.h"
26 struct status_thread_s {
27 assuan_context_t ctx;
28 gchar *status;
29 const gchar *line;
30 pthread_t tid;
31 gint fd[2];
34 static void cleanup(void *arg)
36 struct status_thread_s *s = arg;
38 close(s->fd[0]);
39 close(s->fd[1]);
40 g_free(s);
43 static void *send_status_thread(void *arg)
45 struct status_thread_s *s = arg;
46 gpg_error_t rc;
48 pthread_cleanup_push(cleanup, s);
49 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
50 rc = assuan_write_status(s->ctx, s->status, s->line);
51 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
52 write(s->fd[1], &rc, sizeof(gpg_error_t));
53 pthread_cleanup_pop(0);
54 return NULL;
57 gpg_error_t send_status(assuan_context_t ctx, status_msg_t which,
58 const gchar *fmt, ...)
60 const gchar *line = NULL;
61 struct client_s *client = assuan_get_pointer(ctx);
62 gchar buf[ASSUAN_LINELENGTH];
63 gchar *status = NULL;
64 va_list ap;
65 gint n;
66 gpg_error_t rc;
67 struct status_thread_s *s;
68 struct timeval tv = {0, 0};
69 gint to = get_key_file_integer("global", "keepalive");
70 fd_set rfds;
71 pthread_attr_t attr;
73 if (fmt) {
74 va_start(ap, fmt);
75 g_vsnprintf(buf, sizeof(buf), fmt, ap);
76 va_end(ap);
77 line = buf;
80 switch (which) {
81 case STATUS_CACHE:
82 CACHE_LOCK(client->ctx);
83 line = print_fmt(buf, sizeof(buf), "%i", cache_file_count());
84 CACHE_UNLOCK;
85 status = "CACHE";
86 break;
87 case STATUS_CLIENTS:
88 MUTEX_LOCK(&cn_mutex);
89 line = print_fmt(buf, sizeof(buf), "%i", g_slist_length(cn_thread_list));
90 MUTEX_UNLOCK(&cn_mutex);
91 status = "CLIENTS";
92 break;
93 case STATUS_CONFIG:
94 status = "CONFIG";
95 break;
96 case STATUS_KEEPALIVE:
97 status = "KEEPALIVE";
98 break;
99 case STATUS_LOCKED:
100 status = "LOCKED";
101 line = N_("Waiting for lock");
102 break;
103 case STATUS_ENCRYPT:
104 status = "ENCRYPT";
105 break;
106 case STATUS_DECRYPT:
107 status = "DECRYPT";
108 break;
109 case STATUS_DECOMPRESS:
110 status = "DECOMPRESS";
111 break;
112 case STATUS_COMPRESS:
113 status = "COMPRESS";
114 break;
117 if (!ctx) {
118 log_write("%s %s", status, line);
119 return 0;
122 s = g_malloc0(sizeof(struct status_thread_s));
124 if (!s) {
125 log_write("%s(%i): %s", __FILE__, __LINE__, strerror(ENOMEM));
126 return gpg_error_from_errno(ENOMEM);
129 s->ctx = ctx;
130 s->status = status;
131 s->line = line;
132 tv.tv_sec = to;
134 if (pipe(s->fd) == -1) {
135 n = errno;
136 g_free(s);
137 return gpg_error_from_errno(n);
140 pthread_cleanup_push(cleanup, s);
141 pthread_attr_init(&attr);
142 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
143 pthread_create(&s->tid, &attr, send_status_thread, s);
144 pthread_attr_destroy(&attr);
145 FD_ZERO(&rfds);
146 FD_SET(s->fd[0], &rfds);
147 n = s->fd[0] > s->fd[1] ? s->fd[0] : s->fd[1];
148 n = select(n+1, &rfds, NULL, NULL, &tv);
150 if (n <= 0) {
151 n = errno;
152 pthread_cancel(s->tid);
154 if (n) {
155 log_write("%s(%i): %s", __FILE__, __LINE__, strerror(n));
156 rc = gpg_error_from_errno(n);
158 else
159 rc = GPG_ERR_ASS_WRITE_ERROR;
161 return rc;
164 if (FD_ISSET(s->fd[0], &rfds)) {
165 size_t len = read(s->fd[0], &rc, sizeof(gpg_error_t));
167 if (len != sizeof(gpg_error_t)) {
168 log_write("%s(%i): %s", __FILE__, __LINE__, strerror(errno));
169 rc = GPG_ERR_ASS_WRITE_ERROR;
172 else
173 rc = GPG_ERR_ASS_WRITE_ERROR;
175 pthread_cleanup_pop(1);
176 return rc;
179 /* This is needed for FreeBSD systems (and maybe others). I'm guessing here,
180 * but I think it lets the scheduler let other threads do their work. If we
181 * were to do the write() in send_status_all() then write() would return
182 * before the signal was actually sent to client_msg_thread(). This is only
183 * called from send_status_all().
185 static void *write_status_msg(void *arg)
187 struct client_thread_s *cn =arg;
188 size_t len;
190 pthread_cond_signal(&cn->msg_cond);
191 len = write(cn->msg_fd[1], &cn->msg, sizeof(status_msg_t));
193 if (len != sizeof(status_msg_t))
194 log_write("%s(%i): %s", __FILE__, __LINE__, strerror(errno));
196 return NULL;
199 void send_status_all(status_msg_t which)
201 guint i, t;
203 MUTEX_LOCK(&cn_mutex);
205 for (t = g_slist_length(cn_thread_list), i = 0; i < t; i++) {
206 struct client_thread_s *cn = g_slist_nth_data(cn_thread_list, i);
207 pthread_t tid;
209 if (cn->quit)
210 continue;
212 cn->msg = which;
213 pthread_create(&tid, NULL, write_status_msg, cn);
214 pthread_join(tid, NULL);
217 MUTEX_UNLOCK(&cn_mutex);
220 /* The msg_sender_mutex should have been locked before calling this thread
221 * entry point. */
222 void *client_msg_sender_thread(void *arg)
224 struct client_thread_s *thd = arg;
226 for (;;) {
227 struct status_msg_s *msg;
228 gpg_error_t rc;
230 pthread_cond_wait(&thd->msg_sender_cond, &thd->msg_sender_mutex);
231 pthread_testcancel();
233 /* The messages may have been stacked while waiting for send_status()
234 * to return. Send what's in the queue. */
235 for (;;) {
236 msg = g_slist_nth_data(thd->msg_queue, 0);
238 if (!msg)
239 break;
241 /* Unlock to prevent blocking in client_msg_thread(). */
242 MUTEX_UNLOCK(&thd->msg_sender_mutex);
243 rc = send_status(thd->cl->ctx, msg->msg, NULL);
244 MUTEX_LOCK(&thd->msg_sender_mutex);
245 thd->msg_queue = g_slist_remove(thd->msg_queue, msg);
246 g_free(msg);
247 pthread_testcancel();
249 if (rc) {
250 log_write(N_("msg for %i failed: %s"), thd->fd,
251 pwmd_strerror(rc));
252 pthread_cancel(thd->tid);
253 break;
258 return NULL;
262 * This function waits for a signal from send_status_all() then appends a
263 * message read from a pipe to the clients message queue. The actual sending
264 * of the message is done in client_msg_sender_thread() which waits for a
265 * signal from this function. This prevents blocking in assuan_send_status()
266 * when sending to remote clients. This also avoids duplicate status messages.
267 * If an existing status message in the message queue has the same type as the
268 * current message the the current one will be skipped. This avoids flooding
269 * the client with old status messages.
271 void *client_msg_thread(void *arg)
273 struct client_thread_s *thd = arg;
274 pthread_mutex_t mu;
276 pthread_mutex_init(&mu, NULL);
277 pthread_mutex_lock(&mu);
279 for (;;) {
280 fd_set rfds;
281 int n;
282 status_msg_t m;
283 size_t len;
284 struct status_msg_s *msg;
286 pthread_cleanup_push(pthread_mutex_destroy, &mu);
287 pthread_cleanup_push(pthread_mutex_unlock, &mu);
288 pthread_cond_wait(&thd->msg_cond, &mu);
289 pthread_testcancel();
290 FD_ZERO(&rfds);
291 FD_SET(thd->msg_fd[0], &rfds);
292 n = select(thd->msg_fd[0]+1, &rfds, NULL, NULL, NULL);
293 pthread_testcancel();
295 if (n <= 0 || !FD_ISSET(thd->msg_fd[0], &rfds))
296 continue;
298 len = read(thd->msg_fd[0], &m, sizeof(status_msg_t));
299 pthread_testcancel();
301 if (len != sizeof(status_msg_t)) {
302 log_write("%s(%i): %s", __FILE__, __LINE__, strerror(errno));
303 continue;
306 MUTEX_LOCK(&thd->msg_sender_mutex);
308 for (n = 0; n < g_slist_length(thd->msg_queue); n++) {
309 msg = g_slist_nth_data(thd->msg_queue, n);
311 if (msg->msg == m)
312 goto done;
315 msg = g_malloc(sizeof(struct status_msg_s));
317 if (!msg) {
318 log_write("%s(%i): %s", __FILE__, __LINE__, strerror(ENOMEM));
319 MUTEX_UNLOCK(&thd->msg_sender_mutex);
320 continue;
323 msg->msg = m;
324 thd->msg_queue = g_slist_append(thd->msg_queue, msg);
325 done:
326 MUTEX_UNLOCK(&thd->msg_sender_mutex);
327 pthread_cond_signal(&thd->msg_sender_cond);
328 pthread_cleanup_pop(0);
329 pthread_cleanup_pop(0);
332 return NULL;