From be69d27dfb4ed953a1e9c4a7b2ce6130f9061970 Mon Sep 17 00:00:00 2001 From: Stefan Becker Date: Tue, 28 Aug 2012 22:10:21 +0300 Subject: [PATCH] telepathy: implement buffered writes in transport GOutputStream can only have one pending action at a time. If the stream is busy then append the buffer to a list of outstanding writes. --- src/telepathy/telepathy-transport.c | 54 ++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/src/telepathy/telepathy-transport.c b/src/telepathy/telepathy-transport.c index c9dd4030..e59e9c86 100644 --- a/src/telepathy/telepathy-transport.c +++ b/src/telepathy/telepathy-transport.c @@ -48,6 +48,8 @@ struct sipe_transport_telepathy { GSocketConnection *socket; GInputStream *istream; GOutputStream *ostream; + GSList *buffers; + gboolean is_writing; }; #define TELEPATHY_TRANSPORT ((struct sipe_transport_telepathy *) conn) @@ -124,8 +126,11 @@ static void socket_connected(GObject *client, transport->public.client_port = g_inet_socket_address_get_port(G_INET_SOCKET_ADDRESS(saddr)); g_object_unref(saddr); - transport->istream = g_io_stream_get_input_stream(G_IO_STREAM(transport->socket)); - transport->ostream = g_io_stream_get_output_stream(G_IO_STREAM(transport->socket)); + transport->istream = g_io_stream_get_input_stream(G_IO_STREAM(transport->socket)); + transport->ostream = g_io_stream_get_output_stream(G_IO_STREAM(transport->socket)); + transport->buffers = NULL; + transport->is_writing = FALSE; + /* this sets up the async read handler */ read_completed(G_OBJECT(transport->istream), NULL, transport); transport->connected(SIPE_TRANSPORT_CONNECTION); @@ -197,9 +202,16 @@ struct sipe_transport_connection *sipe_backend_transport_connect(struct sipe_cor void sipe_backend_transport_disconnect(struct sipe_transport_connection *conn) { struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT; + GSList *entry; if (!transport) return; + for (entry = transport->buffers; entry; entry = entry->next) + g_free(entry->data); + g_slist_free(transport->buffers); + transport->buffers = NULL; + transport->is_writing = FALSE; + if (transport->socket) g_object_unref(transport->socket); transport->socket = NULL; @@ -211,6 +223,8 @@ void sipe_backend_transport_disconnect(struct sipe_transport_connection *conn) g_free(transport); } +static void do_write(struct sipe_transport_telepathy *transport, + const gchar *buffer); static void write_completed(GObject *stream, GAsyncResult *result, gpointer data) @@ -224,13 +238,25 @@ static void write_completed(GObject *stream, SIPE_DEBUG_ERROR("write error: %s", error->message); transport->error(SIPE_TRANSPORT_CONNECTION, error->message); g_error_free(error); + } else { + /* more to write? */ + if (transport->buffers) { + /* yes */ + gchar *buffer = transport->buffers->data; + transport->buffers = g_slist_remove(transport->buffers, + buffer); + do_write(transport, buffer); + g_free(buffer); + } else + /* no, we're done for now... */ + transport->is_writing = FALSE; } } -void sipe_backend_transport_message(struct sipe_transport_connection *conn, - const gchar *buffer) +static void do_write(struct sipe_transport_telepathy *transport, + const gchar *buffer) { - struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT; + transport->is_writing = TRUE; g_output_stream_write_async(transport->ostream, buffer, strlen(buffer), @@ -240,10 +266,24 @@ void sipe_backend_transport_message(struct sipe_transport_connection *conn, transport); } -void sipe_backend_transport_flush(struct sipe_transport_connection *conn) +void sipe_backend_transport_message(struct sipe_transport_connection *conn, + const gchar *buffer) { struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT; - g_output_stream_flush(transport->ostream, NULL, NULL); + + /* currently writing? */ + if (transport->is_writing) { + /* yes, append copy of buffer to list */ + transport->buffers = g_slist_append(transport->buffers, + g_strdup(buffer)); + } else + /* no, write directly to stream */ + do_write(transport, buffer); +} + +void sipe_backend_transport_flush(SIPE_UNUSED_PARAMETER struct sipe_transport_connection *conn) +{ + /* @TODO? */ } const gchar *sipe_backend_network_ip_address(struct sipe_core_public *sipe_public) -- 2.11.4.GIT