1 /* vim:tw=78:ts=8:sw=4:set ft=c: */
3 Copyright (C) 2006-2009 Ben Kibbey <bjk@luxsci.net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02110-1301 USA
24 #include "pwmd_error.h"
/*
 * State shared between send_status() and send_status_thread().
 * The member list is not visible in this listing; code elsewhere in this
 * file accesses ctx, status, line, fd[0]/fd[1] and tid through pointers of
 * this type.
 */
26 struct status_thread_s
{
/*
 * Thread cleanup handler. It is registered with pthread_cleanup_push() by
 * both send_status_thread() and send_status(), each time with the
 * struct status_thread_s of the request as its argument.
 *
 * arg: the struct status_thread_s pointer passed at registration.
 *
 * Only the cast of arg is visible in this listing; what the handler
 * releases cannot be stated from here.
 */
34 static void cleanup(void *arg
)
36 struct status_thread_s
*s
= arg
;
/*
 * Thread entry point started by send_status(). Writes one status line with
 * assuan_write_status() and reports the result code back to the creating
 * thread through the write end of the pipe in s->fd.
 *
 * arg: the struct status_thread_s describing the message (ctx, status,
 *      line) and carrying the result pipe (fd).
 *
 * The declaration of rc and the return statement are not visible in this
 * listing.
 */
43 static void *send_status_thread(void *arg
)
45 struct status_thread_s
*s
= arg
;
48 pthread_cleanup_push(cleanup
, s
);
/* Asynchronous cancellation is enabled only around the assuan write, so a
 * pthread_cancel() from send_status() can take effect while it is in
 * progress. */
49 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS
, NULL
);
50 rc
= assuan_write_status(s
->ctx
, s
->status
, s
->line
);
51 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED
, NULL
);
/* Hand the result to send_status(), which waits on s->fd[0].
 * NOTE(review): the return value of write() is not used in the visible
 * code. */
52 write(s
->fd
[1], &rc
, sizeof(gpg_error_t
));
/* 0: pop the handler without running it here. */
53 pthread_cleanup_pop(0);
/*
 * Send a status message of type `which' to the client behind `ctx'.
 *
 * The assuan write is not done in the calling thread: a detached
 * send_status_thread() is created and its result code is read back from a
 * pipe after waiting on it with select().
 *
 * ctx:   assuan context; its user pointer is fetched as struct client_s.
 * which: status message type (STATUS_KEEPALIVE, STATUS_DECOMPRESS,
 *        STATUS_COMPRESS and others not visible in this listing).
 * fmt:   printf-style format for the message text, followed by its
 *        arguments.
 *
 * Returns a gpg_error_t. The failure values visible here are
 * gpg_error_from_errno() results and GPG_ERR_ASS_WRITE_ERROR.
 *
 * Many lines of this function (declarations, the switch/if headers and
 * braces) are missing from this listing, so the comments below describe only
 * the statements that are visible.
 */
57 gpg_error_t
send_status(assuan_context_t ctx
, status_msg_t which
,
58 const gchar
*fmt
, ...)
60 const gchar
*line
= NULL
;
61 struct client_s
*client
= assuan_get_pointer(ctx
);
62 gchar buf
[ASSUAN_LINELENGTH
];
67 struct status_thread_s
*s
;
68 struct timeval tv
= {0, 0};
69 gint to
= get_key_file_integer("global", "keepalive");
/* Format fmt and its arguments into buf (any guarding condition is not
 * visible here). */
75 g_vsnprintf(buf
, sizeof(buf
), fmt
, ap
);
/* Message text: the value of cache_file_count(). The matching unlock is not
 * visible in this listing. */
82 CACHE_LOCK(client
->ctx
);
83 line
= print_fmt(buf
, sizeof(buf
), "%i", cache_file_count());
/* Message text: the length of cn_thread_list, read under cn_mutex. */
88 MUTEX_LOCK(&cn_mutex
);
89 line
= print_fmt(buf
, sizeof(buf
), "%i", g_slist_length(cn_thread_list
));
90 MUTEX_UNLOCK(&cn_mutex
);
96 case STATUS_KEEPALIVE
:
101 line
= N_("Waiting for lock");
109 case STATUS_DECOMPRESS
:
110 status
= "DECOMPRESS";
112 case STATUS_COMPRESS
:
118 log_write("%s %s", status
, line
);
/* Per-request state handed to the writer thread.
 * NOTE(review): per the GLib documentation g_malloc0() aborts the program on
 * allocation failure instead of returning NULL, so the ENOMEM path below may
 * be unreachable -- confirm (the guarding test is not visible here). */
122 s
= g_malloc0(sizeof(struct status_thread_s
));
125 log_write("%s(%i): %s", __FILE__
, __LINE__
, strerror(ENOMEM
));
126 return gpg_error_from_errno(ENOMEM
);
/* Pipe used by the writer thread to return its result code. */
134 if (pipe(s
->fd
) == -1) {
137 return gpg_error_from_errno(n
);
/* cleanup(s) runs at the pthread_cleanup_pop(1) at the end of the visible
 * code, or if this thread is cancelled before reaching it. */
140 pthread_cleanup_push(cleanup
, s
);
141 pthread_attr_init(&attr
);
142 pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_DETACHED
);
/* NOTE(review): no check of pthread_create()'s return value is visible in
 * this listing. */
143 pthread_create(&s
->tid
, &attr
, send_status_thread
, s
);
144 pthread_attr_destroy(&attr
);
/* Wait for the writer thread's result on the read end of the pipe. Only
 * s->fd[0] is added to rfds; the larger of the two pipe descriptors is used
 * for the nfds argument. */
146 FD_SET(s
->fd
[0], &rfds
);
147 n
= s
->fd
[0] > s
->fd
[1] ? s
->fd
[0] : s
->fd
[1];
148 n
= select(n
+1, &rfds
, NULL
, NULL
, &tv
);
/* The condition under which the writer thread is cancelled is not visible
 * in this listing. */
152 pthread_cancel(s
->tid
);
155 log_write("%s(%i): %s", __FILE__
, __LINE__
, strerror(n
));
156 rc
= gpg_error_from_errno(n
);
159 rc
= GPG_ERR_ASS_WRITE_ERROR
;
164 if (FD_ISSET(s
->fd
[0], &rfds
)) {
/* Fetch the result code written by send_status_thread().
 * NOTE(review): read() returns ssize_t; storing it in a size_t turns -1 into
 * a large positive value. The inequality test below still catches it, but
 * the type should be confirmed. */
165 size_t len
= read(s
->fd
[0], &rc
, sizeof(gpg_error_t
));
167 if (len
!= sizeof(gpg_error_t
)) {
168 log_write("%s(%i): %s", __FILE__
, __LINE__
, strerror(errno
));
169 rc
= GPG_ERR_ASS_WRITE_ERROR
;
173 rc
= GPG_ERR_ASS_WRITE_ERROR
;
/* 1: pop the handler and run cleanup(s) now. */
175 pthread_cleanup_pop(1);
179 /* This is needed for FreeBSD systems (and maybe others). I'm guessing here,
180 * but I think it lets the scheduler let other threads do their work. If we
181 * were to do the write() in send_status_all() then write() would return
182 * before the signal was actually sent to client_msg_thread(). This is only
183 * called from send_status_all().
/*
 * Thread entry point used by send_status_all() to notify one client thread.
 * Signals cn->msg_cond and then writes cn->msg to the write end of the
 * client's message pipe (cn->msg_fd[1]). A short or failed write is logged.
 *
 * arg: the struct client_thread_s of the client to notify.
 *
 * The declaration of len and the return statement are not visible in this
 * listing.
 */
185 static void *write_status_msg(void *arg
)
187 struct client_thread_s
*cn
=arg
;
/* The condition is signalled before the message is written to the pipe. */
190 pthread_cond_signal(&cn
->msg_cond
);
191 len
= write(cn
->msg_fd
[1], &cn
->msg
, sizeof(status_msg_t
));
193 if (len
!= sizeof(status_msg_t
))
194 log_write("%s(%i): %s", __FILE__
, __LINE__
, strerror(errno
));
/*
 * Walk cn_thread_list under cn_mutex and, for each client thread entry,
 * run write_status_msg() in a new thread and join it before moving on to the
 * next entry.
 *
 * which: the status message type to deliver. How it is stored for each
 *        client is not visible in this listing (write_status_msg() sends
 *        cn->msg).
 *
 * NOTE(review): g_slist_nth_data() walks the list from the head on every
 * call, so this indexed loop is quadratic in the number of clients. Also, no
 * check of pthread_create()'s return value is visible before the join.
 */
199 void send_status_all(status_msg_t which
)
203 MUTEX_LOCK(&cn_mutex
);
205 for (t
= g_slist_length(cn_thread_list
), i
= 0; i
< t
; i
++) {
206 struct client_thread_s
*cn
= g_slist_nth_data(cn_thread_list
, i
);
/* Each notification is done in its own short-lived thread that is joined
 * immediately, so clients are notified one at a time. */
213 pthread_create(&tid
, NULL
, write_status_msg
, cn
);
214 pthread_join(tid
, NULL
);
217 MUTEX_UNLOCK(&cn_mutex
);
220 /* The msg_sender_mutex should have been locked before calling this thread
/*
 * Thread entry point that delivers queued status messages to one client.
 * It waits on thd->msg_sender_cond (with thd->msg_sender_mutex), then takes
 * the entry at the head of thd->msg_queue, sends it with send_status() and
 * removes it from the queue.
 *
 * arg: the struct client_thread_s of the client being served.
 *
 * The loop structure, the error test around the failure log and the return
 * statement are not visible in this listing.
 */
222 void *client_msg_sender_thread(void *arg
)
224 struct client_thread_s
*thd
= arg
;
227 struct status_msg_s
*msg
;
230 pthread_cond_wait(&thd
->msg_sender_cond
, &thd
->msg_sender_mutex
);
231 pthread_testcancel();
233 /* The messages may have been stacked while waiting for send_status()
234 * to return. Send what's in the queue. */
236 msg
= g_slist_nth_data(thd
->msg_queue
, 0);
241 /* Unlock to prevent blocking in client_msg_thread(). */
242 MUTEX_UNLOCK(&thd
->msg_sender_mutex
);
243 rc
= send_status(thd
->cl
->ctx
, msg
->msg
, NULL
);
/* Re-take the mutex before modifying the queue. */
244 MUTEX_LOCK(&thd
->msg_sender_mutex
);
245 thd
->msg_queue
= g_slist_remove(thd
->msg_queue
, msg
);
247 pthread_testcancel();
/* Failure path: the message is logged and the thread identified by thd->tid
 * is cancelled. The remaining log arguments and the guarding condition are
 * not visible here. */
250 log_write(N_("msg for %i failed: %s"), thd
->fd
,
252 pthread_cancel(thd
->tid
);
262 * This function waits for a signal from send_status_all() then appends a
263 * message read from a pipe to the client's message queue. The actual sending
264 * of the message is done in client_msg_sender_thread() which waits for a
265 * signal from this function. This prevents blocking in assuan_send_status()
266 * when sending to remote clients. This also avoids duplicate status messages.
267 * If an existing status message in the message queue has the same type as the
268 * current message then the current one will be skipped. This avoids flooding
269 * the client with old status messages.
/*
 * Thread entry point that receives status notifications for one client.
 * It waits on thd->msg_cond, reads a status_msg_t from the read end of the
 * client's message pipe (thd->msg_fd[0]), appends a struct status_msg_s to
 * thd->msg_queue under thd->msg_sender_mutex and then signals
 * thd->msg_sender_cond.
 *
 * arg: the struct client_thread_s of the client being served.
 *
 * The loop structure, several declarations (mu, rfds, n, len, m), the bodies
 * of the error branches and the return statement are not visible in this
 * listing.
 */
271 void *client_msg_thread(void *arg
)
273 struct client_thread_s
*thd
= arg
;
/* mu is used only as the mutex argument of the pthread_cond_wait() below. */
276 pthread_mutex_init(&mu
, NULL
);
277 pthread_mutex_lock(&mu
);
284 struct status_msg_s
*msg
;
/* NOTE(review): pthread_cleanup_push() takes a void (*)(void *) routine;
 * pthread_mutex_destroy() and pthread_mutex_unlock() have the type
 * int (*)(pthread_mutex_t *). Passing them directly is a function pointer
 * type mismatch -- confirm whether a wrapper is needed. */
286 pthread_cleanup_push(pthread_mutex_destroy
, &mu
);
287 pthread_cleanup_push(pthread_mutex_unlock
, &mu
);
288 pthread_cond_wait(&thd
->msg_cond
, &mu
);
289 pthread_testcancel();
/* Block (NULL timeout) until the message pipe becomes readable. */
291 FD_SET(thd
->msg_fd
[0], &rfds
);
292 n
= select(thd
->msg_fd
[0]+1, &rfds
, NULL
, NULL
, NULL
);
293 pthread_testcancel();
295 if (n
<= 0 || !FD_ISSET(thd
->msg_fd
[0], &rfds
))
298 len
= read(thd
->msg_fd
[0], &m
, sizeof(status_msg_t
));
299 pthread_testcancel();
301 if (len
!= sizeof(status_msg_t
)) {
302 log_write("%s(%i): %s", __FILE__
, __LINE__
, strerror(errno
));
306 MUTEX_LOCK(&thd
->msg_sender_mutex
);
/* Scan the existing queue entries; the test applied to each entry is not
 * visible in this listing.
 * NOTE(review): g_slist_length() and g_slist_nth_data() both walk the list,
 * so this loop is quadratic in the queue length. */
308 for (n
= 0; n
< g_slist_length(thd
->msg_queue
); n
++) {
309 msg
= g_slist_nth_data(thd
->msg_queue
, n
);
/* NOTE(review): per the GLib documentation g_malloc() aborts on allocation
 * failure instead of returning NULL, so the ENOMEM handling below may be
 * unreachable -- confirm (the guarding test is not visible here). */
315 msg
= g_malloc(sizeof(struct status_msg_s
));
318 log_write("%s(%i): %s", __FILE__
, __LINE__
, strerror(ENOMEM
));
319 MUTEX_UNLOCK(&thd
->msg_sender_mutex
);
324 thd
->msg_queue
= g_slist_append(thd
->msg_queue
, msg
);
326 MUTEX_UNLOCK(&thd
->msg_sender_mutex
);
/* Wake client_msg_sender_thread(), which waits on this condition. */
327 pthread_cond_signal(&thd
->msg_sender_cond
);
/* 0: pop both handlers without running them. */
328 pthread_cleanup_pop(0);
329 pthread_cleanup_pop(0);