git/debian.git: compat/simple-ipc/ipc-unix-socket.c

#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has a S_IFSOCK type inode
	 * at `path`, doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so fast that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server.  If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

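/*
 * Illustrative note (not part of the original source): with the
 * values above, connect_to_server() makes at most
 * MY_CONNECTION_TIMEOUT_MS / WAIT_STEP_MS = 1000 / 50 = 20
 * connection attempts before reporting IPC_STATE__NOT_LISTENING.
 */
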
enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, strlen(message),
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection, message, answer);

	ipc_client_close_connection(connection);

	return ret;
}

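/*
 * Illustrative sketch (not part of the original source): a minimal
 * client round-trip using the functions above, assuming a server is
 * already listening at `path`; "my-command" is a placeholder message:
 *
 *	struct ipc_client_connect_options options =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	options.wait_if_busy = 1;
 *
 *	if (!ipc_client_send_command(path, &options, "my-command", &answer))
 *		printf("reply: %s\n", answer.buf);
 *	strbuf_release(&answer);
 */
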
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */

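/*
 * Illustrative sketch (not part of the original source): a caller
 * might drive this thread-pool API roughly as follows, where `app_cb`
 * and `app_data` are placeholders for the application's callback and
 * instance data:
 *
 *	struct ipc_server_data *server = NULL;
 *	struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *	if (!ipc_server_run_async(&server, path, &opts, app_cb, app_data)) {
 *		... serve clients until app_cb returns SIMPLE_IPC_QUIT
 *		    or some thread calls ipc_server_stop_async(server) ...
 *		ipc_server_await(server);
 *		ipc_server_free(server);
 *	}
 */
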
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO.  The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}

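/*
 * Illustrative note (not part of the original source): because "empty"
 * is encoded as back_pos == front_pos, one slot always stays unused.
 * For example, with queue_size == 4, enqueueing fds A, B, C leaves
 * back_pos == 3 and front_pos == 0; a fourth enqueue would wrap
 * next_back_pos to 0 == front_pos, so that fd is dropped.  The queue
 * therefore holds at most queue_size - 1 connections.
 */
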
/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call).  The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client through us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd.  Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	}
	else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message.  Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread.  It handles a series of
 * connections from clients.  It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shut down.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket.  This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread".  This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread.  This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		}
		else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather the client not
 * time out unnecessarily if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

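/*
 * Illustrative note (not part of the original source): with
 * nr_threads = 4, the queue is sized 4 * FIFO_SCALE = 400 slots and,
 * given the empty-slot convention described earlier, can hold up to
 * 399 pending connections.
 */
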
/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking.  This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}

/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit.  (This does not abort in-process conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}