From c047cec9c6332294342a9b96b47826438387ea42 Mon Sep 17 00:00:00 2001 From: Graham Cobb Date: Tue, 3 Feb 2009 19:48:24 +0000 Subject: [PATCH] Timeout merge #3: 6) Implement a mechanism to protect the queue itself: i.e. is the receiver still reading the queue? 4) Handle timeout before reply queue set up by generating an error message on the command queue git-svn-id: https://svn.opensync.org/trunk/trunk@5257 53f5c7ee-bee3-0310-bbc5-ea0e15fffd5e --- ChangeLog | 54 ++++++ opensync/ipc/opensync_queue.c | 174 +++++++++++++++--- opensync/ipc/opensync_queue_private.h | 16 ++ tests/CMakeLists.txt | 2 + tests/ipc-tests/check_ipc.c | 333 +++++++++++++++++++++++++++++++++- 5 files changed, 552 insertions(+), 27 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2e96ac21..38120f96 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,59 @@ +2009-02-03 Graham Cobb + + * tests/ipc-tests/check_ipc.c (ipc_loop_timeout_with_idle): Change test parameters + so test completes within 30 second limit. + +2009-02-02 Graham Cobb + + * opensync/ipc/opensync_queue_private.h (OSYNC_QUEUE_PENDING_QUEUE_MIN_TIMEOUT): Define + minimum pending queue timeout. + + * opensync/ipc/opensync_queue.c (_osync_queue_restart_pending_timeout): Apply + minimum value to pending queue timeout. + +2009-02-01 Graham Cobb + + * tests/ipc-tests/check_ipc.c: Add ipc_timeout_noreceiver test. + + * opensync/ipc/opensync_queue.c (osync_queue_send_message_with_timeout): Track maximum + timeout seen. Start pending queue timeout. + (_osync_queue_remove_pending_reply): restart pending queue timeout if necessary + (_osync_queue_restart_pending_timeout): Add function to start/restart pending queue timeout + (_timeout_check): Fix calculation of expiry of timeout. Add pending queue timeout. + (_timeout_dispatch): Fix calculation of expiry of timeout. Add pending queue timeout. + + * opensync/ipc/opensync_queue_private.h: Add max_timeout and pending_timeout fields (to queue). + (OSYNC_QUEUE_PENDING_QUEUE_IPC_DELAY): Add value to assume for IPC delay in pending queue timout. + +2009-01-25 Graham Cobb + + * opensync/engine/opensync_sink_engine.c (osync_sink_engine_new): Ref objengine [Bug #1052] + (osync_sink_engine_unref): Unref objengine [Bug #1052] + + * opensync/engine/opensync_obj_engine.c (_osync_obj_engine_*_callback): Unref sinkengine [Bug #1052] + (osync_obj_engine_command): Ref sinkengine every time it is used for a callback [Bug #1052] + + * opensync/ipc/opensync_queue.c (osync_queue_disconnect): Empty pending + queue before performing disconnect. Pending messages with callbacks + will get called with an error message. + (_incoming_check): Do not action incoming queue if a disconnect is in progress. + This avoids entries being added to pending queue while we are trying to empty it. + (osync_queue_send_message_with_timeout): Do not allow sending messages which require + adding entries to the reply queue pending list if the reply queue is being disconnected. + + * opensync/ipc/opensync_queue_private.h: Add disc_in_progress flag to queue. + 2009-01-24 Graham Cobb + * opensync/ipc/opensync_queue.c (_osync_queue_generate_error): Add + _osync_queue_generate_error + (_osync_send_timeout_response): If there is no reply queue, + call _osync_queue_generate_error + + * tests/CMakeLists.txt: Add ipc_timeout_noreplyq + + * tests/ipc-tests/check_ipc.c: Add ipc_timeout_noreplyq + * opensync/ipc/opensync_queue.c (osync_queue_remove_cross_link): Add osync_queue_remove_cross_link (osync_queue_disconnect): Call osync_queue_remove_cross_link diff --git a/opensync/ipc/opensync_queue.c b/opensync/ipc/opensync_queue.c index bf1e00dd..8035b1ef 100644 --- a/opensync/ipc/opensync_queue.c +++ b/opensync/ipc/opensync_queue.c @@ -29,6 +29,30 @@ #include "opensync_queue_internals.h" #include "opensync_queue_private.h" +static gboolean _osync_queue_generate_error(OSyncQueue *queue, OSyncMessageCommand errcode, OSyncError **error) +{ + OSyncMessage *message; + + queue->connected = FALSE; + + /* Now we can send the hup message, and wake up the consumer thread so + * it can pickup the messages in the incoming queue */ + message = osync_message_new(errcode, 0, error); + if (!message) { + return FALSE; + } + osync_trace(TRACE_INTERNAL, "Generating incoming error message %p(%s), id= %lli", message, osync_message_get_commandstr(message), osync_message_get_id(message)); + + osync_message_ref(message); + + g_async_queue_push(queue->incoming, message); + + if (queue->incomingContext) + g_main_context_wakeup(queue->incomingContext); + + return TRUE; +} + static gboolean _timeout_prepare(GSource *source, gint *timeout_) { /* TODO adapt *timeout_ value to shortest message timeout value... @@ -55,6 +79,16 @@ static gboolean _timeout_check(GSource *source) * list since another thread might be duing the updates */ g_mutex_lock(queue->pendingLock); + /* First check the overall queue timer */ + if (queue->pendingCount> 0 && queue->pending_timeout.tv_sec > 0) { + if (current_time.tv_sec > queue->pending_timeout.tv_sec + || (current_time.tv_sec == queue->pending_timeout.tv_sec + && current_time.tv_usec >= queue->pending_timeout.tv_usec) ) { + g_mutex_unlock(queue->pendingLock); + return TRUE; + } + } + for (p = queue->pendingReplies; p; p = p->next) { pending = p->data; @@ -63,7 +97,7 @@ static gboolean _timeout_check(GSource *source) toinfo = pending->timeout_info; - if (current_time.tv_sec >= toinfo->expiration.tv_sec + if (current_time.tv_sec > toinfo->expiration.tv_sec || (current_time.tv_sec == toinfo->expiration.tv_sec && current_time.tv_usec >= toinfo->expiration.tv_usec)) { /* Unlock the pending lock since the messages might be sent during the callback */ @@ -96,6 +130,18 @@ static gboolean _timeout_dispatch(GSource *source, GSourceFunc callback, gpointe * list since another thread might be duing the updates */ g_mutex_lock(queue->pendingLock); + /* First check the overall queue timer */ + if (queue->pendingCount> 0 && queue->pending_timeout.tv_sec > 0) { + if (current_time.tv_sec > queue->pending_timeout.tv_sec + || (current_time.tv_sec == queue->pending_timeout.tv_sec + && current_time.tv_usec >= queue->pending_timeout.tv_usec) ) { + /* The queue has died. Generate an error */ + osync_trace(TRACE_INTERNAL, "%s: Pending queue timer expired: receiver must have died", __func__); + _osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_ERROR, NULL); + queue->pending_timeout.tv_sec = 0; // Stop timer + } + } + for (p = queue->pendingReplies; p; p = p->next) { pending = p->data; @@ -104,8 +150,8 @@ static gboolean _timeout_dispatch(GSource *source, GSourceFunc callback, gpointe toinfo = pending->timeout_info; - if (current_time.tv_sec == toinfo->expiration.tv_sec || - (current_time.tv_sec >= toinfo->expiration.tv_sec + if (current_time.tv_sec > toinfo->expiration.tv_sec || + (current_time.tv_sec == toinfo->expiration.tv_sec && current_time.tv_usec >= toinfo->expiration.tv_usec)) { OSyncError *error = NULL; OSyncError *timeouterr = NULL; @@ -175,7 +221,8 @@ static gboolean _incoming_check(GSource *source) if (g_async_queue_length(queue->incoming) > 0 && (queue->pendingLimit == 0 - || queue->pendingCount < queue->pendingLimit) ) + || queue->pendingCount < queue->pendingLimit) + && !queue->disc_in_progress ) return TRUE; return FALSE; @@ -191,15 +238,45 @@ static void _osync_send_timeout_response(OSyncMessage *errormsg, void *user_data osync_queue_send_message_with_timeout(queue->reply_queue, NULL, errormsg, 0, NULL); } else { osync_message_unref(errormsg); - /* TODO: as we can't send the timeout response, disconnect the queue to signal to - the sender that they aren't going to get a response. Note: we can't just - call osync_queue_disconnect as that tries to kill the thread and hits a deadlock! */ + /* As we can't send the timeout response, create an error and drop it in the incoming queue + so the higher layer will disconnect. Note: we can't just call osync_queue_disconnect + as that tries to kill the thread and hits a deadlock! */ + if (!_osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_ERROR, NULL)) { + osync_trace(TRACE_EXIT_ERROR, "%s: cannot even generate error on incoming queue", __func__); + return; + } osync_trace(TRACE_EXIT_ERROR, "%s: cannot find reply queue to send timeout error", __func__); return; } osync_trace(TRACE_EXIT, "%s", __func__); } +/* Restart the pending queue timeout */ +static void _osync_queue_restart_pending_timeout(OSyncQueue *queue) +{ + /* Note that the pending queue timeout is just to make sure that progress is being made. + It does not time individual commands -- it just makes sure that responses are being + received for outstanding commands at some rate. Individual message timeouts are + handled at the receiver. The main purpose of this timer is to detect if the + receiver has stopped receiving for some reason. + + The timer is started when the first message is put on the pending queue and is reset + and restarted whenever a response is received. The timeout value is based on the + largest message timeout seen to date. */ + + /* Note: queue->pending_timout is protected by the pending lock, which should be held + by the caller before calling this function */ + + if (queue->max_timeout) { + unsigned int timeout; + timeout = queue->max_timeout + OSYNC_QUEUE_PENDING_QUEUE_IPC_DELAY; + if (timeout < OSYNC_QUEUE_PENDING_QUEUE_MIN_TIMEOUT) + timeout = OSYNC_QUEUE_PENDING_QUEUE_MIN_TIMEOUT; + g_source_get_current_time(queue->timeout_source, &queue->pending_timeout); + queue->pending_timeout.tv_sec += timeout; + } +} + /* Find and remove a pending message that this message is a reply to */ static void _osync_queue_remove_pending_reply(OSyncQueue *queue, OSyncMessage *reply, gboolean callback) { @@ -226,7 +303,8 @@ static void _osync_queue_remove_pending_reply(OSyncQueue *queue, OSyncMessage *r gets called twice! */ queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); - queue->pendingCount--; + if (--queue->pendingCount != 0) + _osync_queue_restart_pending_timeout(queue); /* Unlock the pending lock since the messages might be sent during the callback */ g_mutex_unlock(queue->pendingLock); @@ -271,7 +349,7 @@ static gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpoint osync_message_get_timeout(message), osync_message_get_id(message)); if (osync_message_get_cmd(message) == OSYNC_MESSAGE_REPLY || osync_message_get_cmd(message) == OSYNC_MESSAGE_ERRORREPLY) { - /* Remove pending reply and call callback */ + /* Remove pending reply and call callback*/ _osync_queue_remove_pending_reply(queue, message, TRUE); } else { unsigned int timeout = osync_message_get_timeout(message); @@ -570,18 +648,8 @@ static gboolean _source_check(GSource *source) return TRUE; case OSYNC_QUEUE_EVENT_HUP: case OSYNC_QUEUE_EVENT_ERROR: - queue->connected = FALSE; - - /* Now we can send the hup message, and wake up the consumer thread so - * it can pickup the messages in the incoming queue */ - message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error); - if (!message) + if (!_osync_queue_generate_error(queue, OSYNC_MESSAGE_QUEUE_HUP, &error)) goto error; - - g_async_queue_push(queue->incoming, message); - - if (queue->incomingContext) - g_main_context_wakeup(queue->incomingContext); return FALSE; } @@ -1000,6 +1068,52 @@ osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error) osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error); osync_assert(queue); + /* Before doing the real disconnect, we empty the pending queue by creating HUP + errors for anything on it. In order to make sure it empties, the disconnect + is marked as in progress so that no more entries get put on it (e.g. if a + callback tries to send another message). */ + + g_mutex_lock(queue->pendingLock); + + queue->disc_in_progress = TRUE; + + while (queue->pendingCount > 0) { + OSyncPendingMessage *pending = queue->pendingReplies->data; + OSyncError *error = NULL; + OSyncError *huperr = NULL; + OSyncMessage *errormsg = NULL; + + queue->pendingReplies = osync_list_remove(queue->pendingReplies, pending); + queue->pendingCount--; + + /* Call the callback of the pending message */ + if (pending->callback) { + osync_error_set(&huperr, OSYNC_ERROR_IO_ERROR, "Disconnect."); + errormsg = osync_message_new_errorreply(NULL, huperr, &error); + osync_error_unref(&huperr); + osync_message_set_id(errormsg, pending->id); + + /* Unlock the pending lock during the callback */ + g_mutex_unlock(queue->pendingLock); + + osync_trace(TRACE_INTERNAL, "%s: Reporting disconnect error for message %lli", __func__, pending->id); + + pending->callback(errormsg, pending->user_data); + if (errormsg != NULL) + osync_message_unref(errormsg); + + /* Lock again */ + g_mutex_lock(queue->pendingLock); + } + + // TODO: Refcounting for OSyncPendingMessage + if (pending->timeout_info) + g_free(pending->timeout_info); + + osync_free(pending); + } + g_mutex_unlock(queue->pendingLock); + osync_queue_remove_cross_link(queue); g_mutex_lock(queue->disconnectLock); @@ -1038,6 +1152,10 @@ osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error) queue->fd = -1; queue->connected = FALSE; g_mutex_unlock(queue->disconnectLock); + + g_mutex_lock(queue->pendingLock); + queue->disc_in_progress = FALSE; + g_mutex_unlock(queue->pendingLock); osync_trace(TRACE_EXIT, "%s", __func__); return TRUE; @@ -1221,6 +1339,13 @@ osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue * GTimeVal current_time; long long int id = 0; osync_assert(replyqueue); + + g_mutex_lock(replyqueue->pendingLock); + if (replyqueue->disc_in_progress) { + osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Disconnect in progress."); + goto error; + } + pending = osync_try_malloc0(sizeof(OSyncPendingMessage), error); if (!pending) goto error; @@ -1236,6 +1361,9 @@ osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue * if (timeout) { /* Send timeout info to other end to handle */ osync_message_set_timeout(message, timeout); + /* Note largest timeout seen */ + if (timeout > replyqueue->max_timeout) + replyqueue->max_timeout = timeout; } else { osync_trace(TRACE_INTERNAL, "handler message got sent without timeout!: %s", osync_message_get_commandstr(message)); } @@ -1243,9 +1371,11 @@ osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue * pending->callback = osync_message_get_handler(message); pending->user_data = osync_message_get_handler_data(message); - g_mutex_lock(replyqueue->pendingLock); replyqueue->pendingReplies = osync_list_append(replyqueue->pendingReplies, pending); - replyqueue->pendingCount++; + if (replyqueue->pendingCount++ == 0) { + /* Start queue timeout */ + _osync_queue_restart_pending_timeout(replyqueue); + } g_mutex_unlock(replyqueue->pendingLock); } diff --git a/opensync/ipc/opensync_queue_private.h b/opensync/ipc/opensync_queue_private.h index adae271e..13fd2d37 100644 --- a/opensync/ipc/opensync_queue_private.h +++ b/opensync/ipc/opensync_queue_private.h @@ -96,8 +96,24 @@ struct OSyncQueue { /* Queue for receiving commands we are replying to */ OSyncQueue *cmd_queue; + + /* Disconnect in progress */ + gboolean disc_in_progress; + + /* Largest timeout value seen so far */ + unsigned int max_timeout; + + /* Expiration time of pending queue timeout */ + GTimeVal pending_timeout; }; +/** @brief Pending queue timeout addition for ipc delay + */ +#define OSYNC_QUEUE_PENDING_QUEUE_IPC_DELAY 1 + +/** @brief Pending queue minimum timeout + */ +#define OSYNC_QUEUE_PENDING_QUEUE_MIN_TIMEOUT 20 /** @brief Timeout object */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5be5f671..c21676ef 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -249,6 +249,8 @@ OSYNC_TESTCASE(ipc ipc_timeout) OSYNC_TESTCASE(ipc ipc_loop_with_timeout) OSYNC_TESTCASE(ipc ipc_late_reply) OSYNC_TESTCASE(ipc ipc_loop_timeout_with_idle) +OSYNC_TESTCASE(ipc ipc_timeout_noreplyq) +OSYNC_TESTCASE(ipc ipc_timeout_noreceiver) ENDIF (NOT WIN32) BUILD_CHECK_TEST( mapping mapping-tests/check_mapping.c ${TEST_TARGET_LIBRARIES} ) diff --git a/tests/ipc-tests/check_ipc.c b/tests/ipc-tests/check_ipc.c index a940af1d..10dadd85 100644 --- a/tests/ipc-tests/check_ipc.c +++ b/tests/ipc-tests/check_ipc.c @@ -2089,10 +2089,13 @@ int num_callback = 0; static void _message_handler(OSyncMessage *message, void *user_data) { + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + osync_trace(TRACE_INTERNAL, "%s",osync_message_get_commandstr(message)); if (osync_message_is_error(message)) num_callback_timeout++; else num_callback++; + osync_trace(TRACE_EXIT, "%s", __func__); } char *data5 = "this is another test string"; @@ -2664,11 +2667,12 @@ START_TEST (ipc_loop_timeout_with_idle) /* Same as ipc_loop_with_timeout except that the client handler doesn't sleep, so the queue dispatchers can run while the operation is waiting. - Even though each action takes 1 second, none of these messages should time out - as they are being sent with a timeout of 3 seconds */ + Even though each action takes 1 second, and might be delayed by 3 seconds + due to the messages already processed on the pending queue, none of these + messages should time out as they are being sent with a timeout of 5 seconds */ num_msgs = 0; - req_msgs = 30; + req_msgs = 10; char *testbed = setup_testbed(NULL); osync_testing_file_remove("/tmp/testpipe-server"); @@ -2693,7 +2697,8 @@ START_TEST (ipc_loop_timeout_with_idle) OSyncThread *thread = osync_thread_new(context, &error); osync_queue_set_message_handler(client_queue, client_handler_first_part, GINT_TO_POINTER(1)); - osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); + // Set pending limit to 3 so response wil be delayed at most 3 seconds + osync_queue_set_pending_limit(client_queue, 3); osync_queue_setup_with_gmainloop(client_queue, context); @@ -2765,7 +2770,8 @@ START_TEST (ipc_loop_timeout_with_idle) osync_message_set_handler(message, callback_handler_check_reply, GINT_TO_POINTER(1)); - fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 10, &error), NULL); + // Timeout of 5 will do as pending limit is 3 + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 5, &error), NULL); fail_unless(!osync_error_is_set(&error), NULL); osync_message_unref(message); @@ -2803,6 +2809,321 @@ START_TEST (ipc_loop_timeout_with_idle) } END_TEST +void client_handler6(OSyncMessage *message, void *user_data) +{ + osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data); + OSyncError *error = NULL; + + osync_assert(GPOINTER_TO_INT(user_data) ==1); + + if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) { + osync_queue_disconnect(client_queue, NULL); + osync_trace(TRACE_EXIT, "%s: disconnect", __func__); + return; + } + + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_INITIALIZE); + + int int1; + long long int longint1; + char *string; + char databuf[strlen(data5) + 1]; + + osync_message_read_int(message, &int1); + osync_message_read_string(message, &string); + osync_message_read_long_long_int(message, &longint1); + osync_message_read_data(message, databuf, strlen(data5) + 1); + + osync_assert(int1 == 4000000); + osync_assert(!strcmp(string, "this is a test string")); + osync_assert(longint1 == 400000000); + osync_assert(!strcmp(databuf, data5)); + + /* TIMEOUT TIMEOUT TIMEOUT (no reply...) */ + + /* Proper code would reply to this message, but for testing + purposes we don't reply and simulate a "timeout" situation */ + + osync_trace(TRACE_EXIT, "%s", __func__); +} +START_TEST (ipc_timeout_noreplyq) +{ + /* This testcase is inteded to test timeout before the command and reply queues are cross-linked. + Client got forked and listens for messages from Server and replies. + + To simulate a "timeout" situation the Client doesn't reply to one of the Server messages. + + As there is no reply queue, an error will be sent to the **client**, who then disconnects + so an error (although not a timeout) ends up sent to the server. + */ + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + num_callback_timeout = 0; + num_callback = 0; + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler6, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); + + osync_queue_setup_with_gmainloop(client_queue, context); + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + /* Do not cross-link */ + /*osync_queue_cross_link(client_queue, server_queue);*/ + + message = osync_queue_get_message(server_queue); + + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_set_handler(message, _message_handler, NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data5, strlen(data5) + 1); + + // Send with timeout of one second + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + + while (!(message = osync_queue_get_message(client_queue))) { + g_usleep(10000); + } + + fail_unless(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + osync_message_unref(message); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_disconnect(server_queue, &error); + fail_unless(error == NULL, NULL); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + /* Check if the timeout handler replied with an error */ + fail_unless(num_callback_timeout == 1, NULL); + fail_unless(num_callback == 0, NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST + +START_TEST (ipc_timeout_noreceiver) +{ + /* This testcase is intended to test the case where the receiver is not even listening, + and so does not run the timeout. + */ + + char *testbed = setup_testbed(NULL); + osync_testing_file_remove("/tmp/testpipe-server"); + osync_testing_file_remove("/tmp/testpipe-client"); + + num_callback_timeout = 0; + num_callback = 0; + + OSyncError *error = NULL; + server_queue = osync_queue_new("/tmp/testpipe-server", &error); + client_queue = osync_queue_new("/tmp/testpipe-client", &error); + OSyncMessage *message = NULL; + + osync_queue_create(server_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_create(client_queue, &error); + fail_unless(error == NULL, NULL); + + pid_t cpid = fork(); + if (cpid == 0) { //Child + + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(client_queue, client_handler1, GINT_TO_POINTER(1)); + osync_queue_set_pending_limit(client_queue, OSYNC_QUEUE_PENDING_LIMIT); + + /* Do not start receiver */ + /* osync_queue_setup_with_gmainloop(client_queue, context); */ + + osync_thread_start(thread); + + osync_assert(osync_queue_connect(client_queue, OSYNC_QUEUE_RECEIVER, &error)); + osync_assert(error == NULL); + + osync_assert(osync_queue_connect(server_queue, OSYNC_QUEUE_SENDER, &error)); + osync_assert(error == NULL); + + /* Do not cross-link */ + osync_queue_cross_link(client_queue, server_queue); + + message = osync_queue_get_message(server_queue); + + osync_assert(osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP); + + osync_message_unref(message); + + if (osync_queue_disconnect(server_queue, &error) != TRUE || error != NULL) + exit(1); + osync_queue_unref(server_queue); + + osync_assert(osync_queue_disconnect(client_queue, &error)); + osync_assert(error == NULL); + + osync_thread_stop(thread); + osync_thread_unref(thread); + + osync_queue_unref(client_queue); + + g_free(testbed); + + exit(0); + } else { + GMainContext *context = g_main_context_new(); + OSyncThread *thread = osync_thread_new(context, &error); + + osync_queue_set_message_handler(server_queue, server_handler4, GINT_TO_POINTER(1)); + + osync_queue_setup_with_gmainloop(server_queue, context); + + osync_thread_start(thread); + + fail_unless(osync_queue_connect(client_queue, OSYNC_QUEUE_SENDER, &error), NULL); + fail_unless(error == NULL, NULL); + + fail_unless(osync_queue_connect(server_queue, OSYNC_QUEUE_RECEIVER, &error), NULL); + fail_unless(error == NULL, NULL); + + message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, &error); + fail_unless(message != NULL, NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_set_handler(message, _message_handler, NULL); + + osync_message_write_int(message, 4000000); + osync_message_write_string(message, "this is a test string"); + osync_message_write_long_long_int(message, 400000000); + osync_message_write_data(message, data5, strlen(data5) + 1); + + // Send with timeout of one second + fail_unless(osync_queue_send_message_with_timeout(client_queue, server_queue, message, 1, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + osync_message_unref(message); + + /* Note: OSYNC_QUEUE_PENDING_QUEUE_MIN_TIMEOUT is 20 */ + g_usleep(25*G_USEC_PER_SEC); + + /* Check if the timeout handler replied with an error. + Note: it is important we check **before** we start disconnecting + otherwise we are not testing the right thing */ + fail_unless(num_callback_timeout == 1, NULL); + fail_unless(num_callback == 0, NULL); + + osync_queue_disconnect(client_queue, &error); + fail_unless(error == NULL, NULL); + + osync_queue_disconnect(server_queue, &error); + fail_unless(error == NULL, NULL); + + int status = 0; + wait(&status); + fail_unless(WEXITSTATUS(status) == 0, NULL); + } + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == TRUE, NULL); + + fail_unless(osync_queue_remove(client_queue, &error), NULL); + fail_unless(osync_queue_remove(server_queue, &error), NULL); + fail_unless(!osync_error_is_set(&error), NULL); + + fail_unless(osync_testing_file_exists("/tmp/testpipe-client") == FALSE, NULL); + + osync_queue_unref(client_queue); + osync_queue_unref(server_queue); + + destroy_testbed(testbed); +} +END_TEST + OSYNC_TESTCASE_START("ipc") OSYNC_TESTCASE_ADD(ipc_new) OSYNC_TESTCASE_ADD(ipc_ref) @@ -2832,5 +3153,7 @@ OSYNC_TESTCASE_ADD(ipc_timeout) OSYNC_TESTCASE_ADD(ipc_late_reply) OSYNC_TESTCASE_ADD(ipc_loop_with_timeout) OSYNC_TESTCASE_ADD(ipc_loop_timeout_with_idle) +OSYNC_TESTCASE_ADD(ipc_timeout_noreplyq) +OSYNC_TESTCASE_ADD(ipc_timeout_noreceiver) OSYNC_TESTCASE_END -- 2.11.4.GIT