telepathy: implement buffered writes in transport
[siplcs.git] / src / telepathy / telepathy-transport.c
blobe59e9c86138a2dc8734bb2c7dc6a69f0c0e60066
/**
 * @file telepathy-transport.c
 *
 * pidgin-sipe
 *
 * Copyright (C) 2012 SIPE Project <http://sipe.sourceforge.net/>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
27 #include <string.h>
29 #include <glib.h>
30 #include <gio/gio.h>
32 #include "sipe-backend.h"
33 #include "sipe-common.h"
34 #include "sipe-core.h"
35 #include "sipe-nls.h"
37 #include "telepathy-private.h"
39 struct sipe_transport_telepathy {
40 /* public part shared with core */
41 struct sipe_transport_connection public;
43 /* telepathy private part */
44 transport_connected_cb *connected;
45 transport_input_cb *input;
46 transport_error_cb *error;
47 struct sipe_backend_private *private;
48 GSocketConnection *socket;
49 GInputStream *istream;
50 GOutputStream *ostream;
51 GSList *buffers;
52 gboolean is_writing;
/*
 * Pointer view conversions. Both macros rely on a local variable with a
 * fixed name being in scope:
 *   TELEPATHY_TRANSPORT       - casts the local "conn" to the backend type
 *   SIPE_TRANSPORT_CONNECTION - casts the local "transport" to the core type
 */
#define TELEPATHY_TRANSPORT       ((struct sipe_transport_telepathy *) conn)
#define SIPE_TRANSPORT_CONNECTION ((struct sipe_transport_connection *) transport)

/* the receive buffer is enlarged in steps of this many bytes */
#define BUFFER_SIZE_INCREMENT     4096
60 static void read_completed(GObject *stream,
61 GAsyncResult *result,
62 gpointer data)
64 struct sipe_transport_telepathy *transport = data;
65 struct sipe_transport_connection *conn = SIPE_TRANSPORT_CONNECTION;
67 if (conn->buffer_length < conn->buffer_used + BUFFER_SIZE_INCREMENT) {
68 conn->buffer_length += BUFFER_SIZE_INCREMENT;
69 conn->buffer = g_realloc(conn->buffer, conn->buffer_length);
70 SIPE_DEBUG_INFO("read_completed: new buffer length %" G_GSIZE_FORMAT,
71 conn->buffer_length);
74 /* callback result is valid */
75 if (result) {
76 GError *error = NULL;
77 gssize len = g_input_stream_read_finish(G_INPUT_STREAM(stream),
78 result,
79 &error);
81 if (len < 0) {
82 SIPE_DEBUG_ERROR("read error: %s", error->message);
83 transport->error(conn, error->message);
84 g_error_free(error);
85 return;
86 } else if (len == 0) {
87 SIPE_DEBUG_ERROR_NOFORMAT("Server has disconnected");
88 transport->error(conn, _("Server has disconnected"));
89 return;
92 /* Forward data to core */
93 conn->buffer_used += len;
94 conn->buffer[conn->buffer_used] = '\0';
95 transport->input(conn);
98 /* setup next read */
99 g_input_stream_read_async(G_INPUT_STREAM(stream),
100 conn->buffer + conn->buffer_used,
101 conn->buffer_length - conn->buffer_used - 1,
102 G_PRIORITY_DEFAULT,
103 NULL,
104 read_completed,
105 transport);
108 static void socket_connected(GObject *client,
109 GAsyncResult *result,
110 gpointer data)
112 struct sipe_transport_telepathy *transport = data;
113 GError *error = NULL;
115 transport->socket = g_socket_client_connect_finish(G_SOCKET_CLIENT(client),
116 result,
117 &error);
119 if (transport->socket) {
120 GSocketAddress *saddr = g_socket_connection_get_local_address(transport->socket,
121 &error);
123 if (saddr) {
124 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: success");
126 transport->public.client_port = g_inet_socket_address_get_port(G_INET_SOCKET_ADDRESS(saddr));
127 g_object_unref(saddr);
129 transport->istream = g_io_stream_get_input_stream(G_IO_STREAM(transport->socket));
130 transport->ostream = g_io_stream_get_output_stream(G_IO_STREAM(transport->socket));
131 transport->buffers = NULL;
132 transport->is_writing = FALSE;
134 /* this sets up the async read handler */
135 read_completed(G_OBJECT(transport->istream), NULL, transport);
136 transport->connected(SIPE_TRANSPORT_CONNECTION);
137 } else {
138 g_object_unref(transport->socket);
139 transport->socket = NULL;
140 SIPE_DEBUG_ERROR("socket_connected: failed: %s", error->message);
141 transport->error(SIPE_TRANSPORT_CONNECTION, error->message);
142 g_error_free(error);
144 } else {
145 SIPE_DEBUG_ERROR("socket_connected: failed: %s", error->message);
146 transport->error(SIPE_TRANSPORT_CONNECTION, error->message);
147 g_error_free(error);
151 struct sipe_transport_connection *sipe_backend_transport_connect(struct sipe_core_public *sipe_public,
152 const sipe_connect_setup *setup)
154 struct sipe_transport_telepathy *transport = g_new0(struct sipe_transport_telepathy, 1);
155 struct sipe_backend_private *telepathy_private = sipe_public->backend_private;
157 SIPE_DEBUG_INFO("sipe_backend_transport_connect - hostname: %s port: %d",
158 setup->server_name, setup->server_port);
160 transport->public.type = setup->type;
161 transport->public.user_data = setup->user_data;
162 transport->connected = setup->connected;
163 transport->input = setup->input;
164 transport->error = setup->error;
165 transport->private = telepathy_private;
167 if ((setup->type == SIPE_TRANSPORT_TLS) ||
168 (setup->type == SIPE_TRANSPORT_TCP)) {
169 GSocketClient *client = g_socket_client_new();
171 /* request TLS connection */
172 if (setup->type == SIPE_TRANSPORT_TLS) {
173 SIPE_DEBUG_INFO_NOFORMAT("using TLS");
174 g_socket_client_set_tls(client,
175 setup->type == SIPE_TRANSPORT_TLS);
176 /* @TODO certificate handling - now accept all*/
177 g_socket_client_set_tls_validation_flags(client, 0);
178 } else
179 SIPE_DEBUG_INFO_NOFORMAT("using TCP");
181 g_socket_client_connect_async(client,
182 g_network_address_new(setup->server_name,
183 setup->server_port),
184 NULL,
185 socket_connected,
186 transport);
187 g_object_unref(client);
188 } else {
189 setup->error(SIPE_TRANSPORT_CONNECTION,
190 "This should not happen...");
191 sipe_backend_transport_disconnect(SIPE_TRANSPORT_CONNECTION);
192 return(NULL);
195 /* the first connection is always to the server */
196 if (telepathy_private->transport == NULL)
197 telepathy_private->transport = transport;
199 return(SIPE_TRANSPORT_CONNECTION);
202 void sipe_backend_transport_disconnect(struct sipe_transport_connection *conn)
204 struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT;
205 GSList *entry;
207 if (!transport) return;
209 for (entry = transport->buffers; entry; entry = entry->next)
210 g_free(entry->data);
211 g_slist_free(transport->buffers);
212 transport->buffers = NULL;
213 transport->is_writing = FALSE;
215 if (transport->socket)
216 g_object_unref(transport->socket);
217 transport->socket = NULL;
219 /* connection to the server dropped? */
220 if (transport->private->transport == transport)
221 transport->private->transport = NULL;
223 g_free(transport);
226 static void do_write(struct sipe_transport_telepathy *transport,
227 const gchar *buffer);
228 static void write_completed(GObject *stream,
229 GAsyncResult *result,
230 gpointer data)
232 struct sipe_transport_telepathy *transport = data;
233 GError *error = NULL;
235 g_output_stream_write_finish(G_OUTPUT_STREAM(stream), result, &error);
237 if (error) {
238 SIPE_DEBUG_ERROR("write error: %s", error->message);
239 transport->error(SIPE_TRANSPORT_CONNECTION, error->message);
240 g_error_free(error);
241 } else {
242 /* more to write? */
243 if (transport->buffers) {
244 /* yes */
245 gchar *buffer = transport->buffers->data;
246 transport->buffers = g_slist_remove(transport->buffers,
247 buffer);
248 do_write(transport, buffer);
249 g_free(buffer);
250 } else
251 /* no, we're done for now... */
252 transport->is_writing = FALSE;
256 static void do_write(struct sipe_transport_telepathy *transport,
257 const gchar *buffer)
259 transport->is_writing = TRUE;
260 g_output_stream_write_async(transport->ostream,
261 buffer,
262 strlen(buffer),
263 G_PRIORITY_DEFAULT,
264 NULL,
265 write_completed,
266 transport);
269 void sipe_backend_transport_message(struct sipe_transport_connection *conn,
270 const gchar *buffer)
272 struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT;
274 /* currently writing? */
275 if (transport->is_writing) {
276 /* yes, append copy of buffer to list */
277 transport->buffers = g_slist_append(transport->buffers,
278 g_strdup(buffer));
279 } else
280 /* no, write directly to stream */
281 do_write(transport, buffer);
284 void sipe_backend_transport_flush(SIPE_UNUSED_PARAMETER struct sipe_transport_connection *conn)
286 /* @TODO? */
289 const gchar *sipe_backend_network_ip_address(struct sipe_core_public *sipe_public)
291 struct sipe_backend_private *telepathy_private = sipe_public->backend_private;
292 const gchar *ipstr = telepathy_private->ipaddress;
294 /* address cached? */
295 if (!ipstr) {
296 struct sipe_transport_telepathy *transport = telepathy_private->transport;
298 /* default if everything should fail */
299 ipstr = "127.0.0.1";
301 /* connection to server established - get local IP from socket */
302 if (transport && transport->socket) {
303 GSocketAddress *saddr = g_socket_connection_get_local_address(transport->socket,
304 NULL);
306 if (saddr) {
307 GInetAddress *iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(saddr));
309 if (iaddr) {
310 /* cache address string */
311 ipstr = telepathy_private->ipaddress = g_inet_address_to_string(iaddr);
312 SIPE_DEBUG_INFO("sipe_backend_network_ip_address: %s", ipstr);
314 g_object_unref(saddr);
319 return(ipstr);
/*
  Local Variables:
  mode: c
  c-file-style: "bsd"
  indent-tabs-mode: t
  tab-width: 8
  End:
*/