2 * test-simple-ipc.c: verify that the Inter-Process Communication works.
8 #include "simple-ipc.h"
9 #include "parse-options.h"
10 #include "thread-utils.h"
12 #include "run-command.h"
14 #ifndef SUPPORTS_SIMPLE_IPC
/*
 * Entry point used when SUPPORTS_SIMPLE_IPC is not defined: there is
 * nothing to test, so just report that through die().
 */
int cmd__simple_ipc(int argc, const char **argv)
{
	die("simple IPC not available on this platform");
}
22 * The test daemon defines an "application callback" that supports a
23 * series of commands (see `test_app_cb()`).
25 * Unknown commands are caught here and we send an error message back
26 * to the client process.
28 static int app__unhandled_command(const char *command
,
29 ipc_server_reply_cb
*reply_cb
,
30 struct ipc_server_reply_data
*reply_data
)
32 struct strbuf buf
= STRBUF_INIT
;
35 strbuf_addf(&buf
, "unhandled command: %s", command
);
36 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
43 * Reply with a single very large buffer. This is to ensure that
44 * long responses are properly handled -- whether the chunking occurs
45 * in the kernel or in the (probably pkt-line) layer.
47 #define BIG_ROWS (10000)
48 static int app__big_command(ipc_server_reply_cb
*reply_cb
,
49 struct ipc_server_reply_data
*reply_data
)
51 struct strbuf buf
= STRBUF_INIT
;
55 for (row
= 0; row
< BIG_ROWS
; row
++)
56 strbuf_addf(&buf
, "big: %.75d\n", row
);
58 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
65 * Reply with a series of lines. This is to ensure that we can incrementally
66 * compute the response and chunk it to the client.
68 #define CHUNK_ROWS (10000)
69 static int app__chunk_command(ipc_server_reply_cb
*reply_cb
,
70 struct ipc_server_reply_data
*reply_data
)
72 struct strbuf buf
= STRBUF_INIT
;
76 for (row
= 0; row
< CHUNK_ROWS
; row
++) {
77 strbuf_setlen(&buf
, 0);
78 strbuf_addf(&buf
, "big: %.75d\n", row
);
79 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
88 * Slowly reply with a series of lines. This is to model an expensive to
89 * compute chunked response (which might happen if this callback is running
90 * in a thread and is fighting for a lock with other threads).
92 #define SLOW_ROWS (1000)
93 #define SLOW_DELAY_MS (10)
94 static int app__slow_command(ipc_server_reply_cb
*reply_cb
,
95 struct ipc_server_reply_data
*reply_data
)
97 struct strbuf buf
= STRBUF_INIT
;
101 for (row
= 0; row
< SLOW_ROWS
; row
++) {
102 strbuf_setlen(&buf
, 0);
103 strbuf_addf(&buf
, "big: %.75d\n", row
);
104 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
105 sleep_millisec(SLOW_DELAY_MS
);
108 strbuf_release(&buf
);
114 * The client sent a command followed by a (possibly very) large buffer.
116 static int app__sendbytes_command(const char *received
, size_t received_len
,
117 ipc_server_reply_cb
*reply_cb
,
118 struct ipc_server_reply_data
*reply_data
)
120 struct strbuf buf_resp
= STRBUF_INIT
;
128 * The test is set up to send:
129 * "sendbytes" SP <n * char>
131 if (received_len
< strlen("sendbytes "))
132 BUG("received_len is short in app__sendbytes_command");
134 if (skip_prefix(received
, "sendbytes ", &p
))
135 len_ballast
= strlen(p
);
138 * Verify that the ballast is n copies of a single letter.
139 * And that the multi-threaded IO layer didn't cross the streams.
141 for (k
= 1; k
< len_ballast
; k
++)
146 strbuf_addf(&buf_resp
, "errs:%d\n", errs
);
148 strbuf_addf(&buf_resp
, "rcvd:%c%08d\n", p
[0], len_ballast
);
150 ret
= reply_cb(reply_data
, buf_resp
.buf
, buf_resp
.len
);
152 strbuf_release(&buf_resp
);
158 * An arbitrary fixed address to verify that the application instance
159 * data is handled properly.
161 static int my_app_data
= 42;
163 static ipc_server_application_cb test_app_cb
;
166 * This is the "application callback" that sits on top of the
167 * "ipc-server". It completely defines the set of commands supported
168 * by this application.
170 static int test_app_cb(void *application_data
,
171 const char *command
, size_t command_len
,
172 ipc_server_reply_cb
*reply_cb
,
173 struct ipc_server_reply_data
*reply_data
)
176 * Verify that we received the application-data that we passed
177 * when we started the ipc-server. (We have several layers of
178 * callbacks calling callbacks and it's easy to get things mixed
179 * up (especially when some are "void*").)
181 if (application_data
!= (void*)&my_app_data
)
182 BUG("application_cb: application_data pointer wrong");
184 if (command_len
== 4 && !strncmp(command
, "quit", 4)) {
186 * The client sent a "quit" command. This is an async
187 * request for the server to shutdown.
189 * We DO NOT send the client a response message
190 * (because we have nothing to say and the other
191 * server threads have not yet stopped).
193 * Tell the ipc-server layer to start shutting down.
194 * This includes: stop listening for new connections
195 * on the socket/pipe and telling all worker threads
196 * to finish/drain their outgoing responses to other
199 * This DOES NOT force an immediate sync shutdown.
201 return SIMPLE_IPC_QUIT
;
204 if (command_len
== 4 && !strncmp(command
, "ping", 4)) {
205 const char *answer
= "pong";
206 return reply_cb(reply_data
, answer
, strlen(answer
));
209 if (command_len
== 3 && !strncmp(command
, "big", 3))
210 return app__big_command(reply_cb
, reply_data
);
212 if (command_len
== 5 && !strncmp(command
, "chunk", 5))
213 return app__chunk_command(reply_cb
, reply_data
);
215 if (command_len
== 4 && !strncmp(command
, "slow", 4))
216 return app__slow_command(reply_cb
, reply_data
);
218 if (command_len
>= 10 && starts_with(command
, "sendbytes "))
219 return app__sendbytes_command(command
, command_len
,
220 reply_cb
, reply_data
);
222 return app__unhandled_command(command
, reply_cb
, reply_data
);
227 const char *subcommand
;
239 static struct cl_args cl_args
= {
253 * This process will run as a simple-ipc server and listen for IPC commands
254 * from client processes.
256 static int daemon__run_server(void)
260 struct ipc_server_opts opts
= {
261 .nr_threads
= cl_args
.nr_threads
,
265 * Synchronously run the ipc-server. We don't need any application
266 * instance data, so pass an arbitrary pointer (that we'll later
267 * verify made the round trip).
269 ret
= ipc_server_run(cl_args
.path
, &opts
, test_app_cb
, (void*)&my_app_data
);
271 error("socket/pipe already in use: '%s'", cl_args
.path
);
273 error_errno("could not start server on: '%s'", cl_args
.path
);
278 static start_bg_wait_cb bg_wait_cb
;
280 static int bg_wait_cb(const struct child_process
*cp
, void *cb_data
)
282 int s
= ipc_get_active_state(cl_args
.path
);
285 case IPC_STATE__LISTENING
:
286 /* child is "ready" */
289 case IPC_STATE__NOT_LISTENING
:
290 case IPC_STATE__PATH_NOT_FOUND
:
291 /* give child more time */
295 case IPC_STATE__INVALID_PATH
:
296 case IPC_STATE__OTHER_ERROR
:
297 /* all the time in the world won't help */
302 static int daemon__start_server(void)
304 struct child_process cp
= CHILD_PROCESS_INIT
;
305 enum start_bg_result sbgr
;
307 strvec_push(&cp
.args
, "test-tool");
308 strvec_push(&cp
.args
, "simple-ipc");
309 strvec_push(&cp
.args
, "run-daemon");
310 strvec_pushf(&cp
.args
, "--name=%s", cl_args
.path
);
311 strvec_pushf(&cp
.args
, "--threads=%d", cl_args
.nr_threads
);
317 sbgr
= start_bg_command(&cp
, bg_wait_cb
, NULL
, cl_args
.max_wait_sec
);
326 return error("daemon failed to start");
329 return error("daemon not online yet");
332 return error("daemon terminated");
337 * This process will run a quick probe to see if a simple-ipc server
338 * is active on this path.
340 * Returns 0 if the server is alive.
342 static int client__probe_server(void)
344 enum ipc_active_state s
;
346 s
= ipc_get_active_state(cl_args
.path
);
348 case IPC_STATE__LISTENING
:
351 case IPC_STATE__NOT_LISTENING
:
352 return error("no server listening at '%s'", cl_args
.path
);
354 case IPC_STATE__PATH_NOT_FOUND
:
355 return error("path not found '%s'", cl_args
.path
);
357 case IPC_STATE__INVALID_PATH
:
358 return error("invalid pipe/socket name '%s'", cl_args
.path
);
360 case IPC_STATE__OTHER_ERROR
:
362 return error("other error for '%s'", cl_args
.path
);
367 * Send an IPC command token to an already-running server daemon and
368 * print the response.
370 * This is a simple 1 word command/token that `test_app_cb()` (in the
371 * daemon process) will understand.
373 static int client__send_ipc(void)
375 const char *command
= "(no-command)";
376 struct strbuf buf
= STRBUF_INIT
;
377 struct ipc_client_connect_options options
378 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
380 if (cl_args
.token
&& *cl_args
.token
)
381 command
= cl_args
.token
;
383 options
.wait_if_busy
= 1;
384 options
.wait_if_not_found
= 0;
386 if (!ipc_client_send_command(cl_args
.path
, &options
,
387 command
, strlen(command
),
390 printf("%s\n", buf
.buf
);
393 strbuf_release(&buf
);
398 return error("failed to send '%s' to '%s'", command
, cl_args
.path
);
402 * Send an IPC command to an already-running server and ask it to
403 * shut down. "send quit" is an async request and queues a shutdown
404 * event in the server, so we spin and wait here for it to actually
405 * shut down to make the unit tests a little easier to write.
407 static int client__stop_server(void)
410 time_t time_limit
, now
;
411 enum ipc_active_state s
;
414 time_limit
+= cl_args
.max_wait_sec
;
416 cl_args
.token
= "quit";
418 ret
= client__send_ipc();
425 s
= ipc_get_active_state(cl_args
.path
);
427 if (s
!= IPC_STATE__LISTENING
) {
429 * The socket/pipe is gone and/or has stopped
430 * responding. Let's assume that the daemon
431 * process has exited too.
437 if (now
> time_limit
)
438 return error("daemon has not shutdown yet");
443 * Send an IPC command followed by ballast to confirm that a large
444 * message can be sent and that the kernel or pkt-line layers will
445 * properly chunk it and that the daemon receives the entire message.
447 static int do_sendbytes(int bytecount
, char byte
, const char *path
,
448 const struct ipc_client_connect_options
*options
)
450 struct strbuf buf_send
= STRBUF_INIT
;
451 struct strbuf buf_resp
= STRBUF_INIT
;
453 strbuf_addstr(&buf_send
, "sendbytes ");
454 strbuf_addchars(&buf_send
, byte
, bytecount
);
456 if (!ipc_client_send_command(path
, options
,
457 buf_send
.buf
, buf_send
.len
,
459 strbuf_rtrim(&buf_resp
);
460 printf("sent:%c%08d %s\n", byte
, bytecount
, buf_resp
.buf
);
462 strbuf_release(&buf_send
);
463 strbuf_release(&buf_resp
);
468 return error("client failed to sendbytes(%d, '%c') to '%s'",
469 bytecount
, byte
, path
);
473 * Send an IPC command with ballast to an already-running server daemon.
475 static int client__sendbytes(void)
477 struct ipc_client_connect_options options
478 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
480 options
.wait_if_busy
= 1;
481 options
.wait_if_not_found
= 0;
482 options
.uds_disallow_chdir
= 0;
484 return do_sendbytes(cl_args
.bytecount
, cl_args
.bytevalue
, cl_args
.path
,
488 struct multiple_thread_data
{
489 pthread_t pthread_id
;
490 struct multiple_thread_data
*next
;
499 static void *multiple_thread_proc(void *_multiple_thread_data
)
501 struct multiple_thread_data
*d
= _multiple_thread_data
;
503 struct ipc_client_connect_options options
504 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
506 options
.wait_if_busy
= 1;
507 options
.wait_if_not_found
= 0;
509 * A multi-threaded client should not be randomly calling chdir().
510 * The test will pass without this restriction because the test is
511 * not otherwise accessing the filesystem, but it makes us honest.
513 options
.uds_disallow_chdir
= 1;
515 trace2_thread_start("multiple");
517 for (k
= 0; k
< d
->batchsize
; k
++) {
518 if (do_sendbytes(d
->bytecount
+ k
, d
->letter
, d
->path
, &options
))
524 trace2_thread_exit();
529 * Start a client-side thread pool. Each thread sends a series of
530 * IPC requests. Each request is on a new connection to the server.
532 static int client__multiple(void)
534 struct multiple_thread_data
*list
= NULL
;
536 int sum_join_errors
= 0;
537 int sum_thread_errors
= 0;
540 for (k
= 0; k
< cl_args
.nr_threads
; k
++) {
541 struct multiple_thread_data
*d
= xcalloc(1, sizeof(*d
));
543 d
->path
= cl_args
.path
;
544 d
->bytecount
= cl_args
.bytecount
+ cl_args
.batchsize
*(k
/26);
545 d
->batchsize
= cl_args
.batchsize
;
548 d
->letter
= 'A' + (k
% 26);
550 if (pthread_create(&d
->pthread_id
, NULL
, multiple_thread_proc
, d
)) {
551 warning("failed to create thread[%d] skipping remainder", k
);
560 struct multiple_thread_data
*d
= list
;
562 if (pthread_join(d
->pthread_id
, NULL
))
565 sum_thread_errors
+= d
->sum_errors
;
566 sum_good
+= d
->sum_good
;
572 printf("client (good %d) (join %d), (errors %d)\n",
573 sum_good
, sum_join_errors
, sum_thread_errors
);
575 return (sum_join_errors
+ sum_thread_errors
) ? 1 : 0;
578 int cmd__simple_ipc(int argc
, const char **argv
)
580 const char * const simple_ipc_usage
[] = {
581 N_("test-helper simple-ipc is-active [<name>] [<options>]"),
582 N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
583 N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
584 N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
585 N_("test-helper simple-ipc send [<name>] [<token>]"),
586 N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
587 N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
591 const char *bytevalue
= NULL
;
593 struct option options
[] = {
594 #ifndef GIT_WINDOWS_NATIVE
595 OPT_STRING(0, "name", &cl_args
.path
, N_("name"), N_("name or pathname of unix domain socket")),
597 OPT_STRING(0, "name", &cl_args
.path
, N_("name"), N_("named-pipe name")),
599 OPT_INTEGER(0, "threads", &cl_args
.nr_threads
, N_("number of threads in server thread pool")),
600 OPT_INTEGER(0, "max-wait", &cl_args
.max_wait_sec
, N_("seconds to wait for daemon to start or stop")),
602 OPT_INTEGER(0, "bytecount", &cl_args
.bytecount
, N_("number of bytes")),
603 OPT_INTEGER(0, "batchsize", &cl_args
.batchsize
, N_("number of requests per thread")),
605 OPT_STRING(0, "byte", &bytevalue
, N_("byte"), N_("ballast character")),
606 OPT_STRING(0, "token", &cl_args
.token
, N_("token"), N_("command token to send to the server")),
612 usage_with_options(simple_ipc_usage
, options
);
614 if (argc
== 2 && !strcmp(argv
[1], "-h"))
615 usage_with_options(simple_ipc_usage
, options
);
617 if (argc
== 2 && !strcmp(argv
[1], "SUPPORTS_SIMPLE_IPC"))
620 cl_args
.subcommand
= argv
[1];
625 argc
= parse_options(argc
, argv
, NULL
, options
, simple_ipc_usage
, 0);
627 if (cl_args
.nr_threads
< 1)
628 cl_args
.nr_threads
= 1;
629 if (cl_args
.max_wait_sec
< 0)
630 cl_args
.max_wait_sec
= 0;
631 if (cl_args
.bytecount
< 1)
632 cl_args
.bytecount
= 1;
633 if (cl_args
.batchsize
< 1)
634 cl_args
.batchsize
= 1;
636 if (bytevalue
&& *bytevalue
)
637 cl_args
.bytevalue
= bytevalue
[0];
640 * Use '!!' on all dispatch functions to map from `error()` style
641 * (returns -1) to `test_must_fail` style (expects 1). This
642 * makes shell error messages less confusing.
645 if (!strcmp(cl_args
.subcommand
, "is-active"))
646 return !!client__probe_server();
648 if (!strcmp(cl_args
.subcommand
, "run-daemon"))
649 return !!daemon__run_server();
651 if (!strcmp(cl_args
.subcommand
, "start-daemon"))
652 return !!daemon__start_server();
655 * Client commands follow. Ensure a server is running before
656 * sending any data. This might be overkill, but then again
657 * this is a test harness.
660 if (!strcmp(cl_args
.subcommand
, "stop-daemon")) {
661 if (client__probe_server())
663 return !!client__stop_server();
666 if (!strcmp(cl_args
.subcommand
, "send")) {
667 if (client__probe_server())
669 return !!client__send_ipc();
672 if (!strcmp(cl_args
.subcommand
, "sendbytes")) {
673 if (client__probe_server())
675 return !!client__sendbytes();
678 if (!strcmp(cl_args
.subcommand
, "multiple")) {
679 if (client__probe_server())
681 return !!client__multiple();
684 die("Unhandled subcommand: '%s'", cl_args
.subcommand
);