transport: remove temporary re-subscription fix
[siplcs.git] / src / telepathy / telepathy-transport.c
blob9a625196176c8113a717e7e659e0d8e6e319dc45
1 /**
2 * @file telepathy-transport.c
4 * pidgin-sipe
6 * Copyright (C) 2012 SIPE Project <http://sipe.sourceforge.net/>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
27 #include <string.h>
29 #include <glib.h>
30 #include <gio/gio.h>
32 #include "sipe-backend.h"
33 #include "sipe-core.h"
34 #include "sipe-nls.h"
36 #include "telepathy-private.h"
38 struct sipe_transport_telepathy {
39 /* public part shared with core */
40 struct sipe_transport_connection public;
42 /* telepathy private part */
43 transport_connected_cb *connected;
44 transport_input_cb *input;
45 transport_error_cb *error;
46 struct sipe_backend_private *private;
47 GCancellable *cancel;
48 GSocketConnection *socket;
49 GInputStream *istream;
50 GOutputStream *ostream;
51 GSList *buffers;
52 gboolean is_writing;
53 gboolean do_flush;
56 #define TELEPATHY_TRANSPORT ((struct sipe_transport_telepathy *) conn)
57 #define SIPE_TRANSPORT_CONNECTION ((struct sipe_transport_connection *) transport)
59 #define BUFFER_SIZE_INCREMENT 4096
61 static void read_completed(GObject *stream,
62 GAsyncResult *result,
63 gpointer data)
65 struct sipe_transport_telepathy *transport = data;
66 struct sipe_transport_connection *conn = SIPE_TRANSPORT_CONNECTION;
68 do {
69 if (conn->buffer_length < conn->buffer_used + BUFFER_SIZE_INCREMENT) {
70 conn->buffer_length += BUFFER_SIZE_INCREMENT;
71 conn->buffer = g_realloc(conn->buffer, conn->buffer_length);
72 SIPE_DEBUG_INFO("read_completed: new buffer length %" G_GSIZE_FORMAT,
73 conn->buffer_length);
76 /* callback result is valid */
77 if (result) {
78 GError *error = NULL;
79 gssize len = g_input_stream_read_finish(G_INPUT_STREAM(stream),
80 result,
81 &error);
83 if (len < 0) {
84 const gchar *msg = error ? error->message : "UNKNOWN";
85 SIPE_DEBUG_ERROR("read_completed: error: %s", msg);
86 if (transport->error)
87 transport->error(conn, msg);
88 g_error_free(error);
89 return;
90 } else if (len == 0) {
91 SIPE_DEBUG_ERROR_NOFORMAT("read_completed: server has disconnected");
92 transport->error(conn, _("Server has disconnected"));
93 return;
94 } else if (transport->do_flush) {
95 /* read completed while disconnected transport is flushing */
96 SIPE_DEBUG_INFO_NOFORMAT("read_completed: ignored during flushing");
97 return;
98 } else if (g_cancellable_is_cancelled(transport->cancel)) {
99 /* read completed when transport was disconnected */
100 SIPE_DEBUG_INFO_NOFORMAT("read_completed: cancelled");
101 return;
104 /* Forward data to core */
105 conn->buffer_used += len;
106 conn->buffer[conn->buffer_used] = '\0';
107 transport->input(conn);
109 /* we processed the result */
110 result = NULL;
113 /* buffer too short? */
114 } while (conn->buffer_length - conn->buffer_used - 1 == 0);
116 /* setup next read */
117 g_input_stream_read_async(G_INPUT_STREAM(stream),
118 conn->buffer + conn->buffer_used,
119 conn->buffer_length - conn->buffer_used - 1,
120 G_PRIORITY_DEFAULT,
121 transport->cancel,
122 read_completed,
123 transport);
126 static void socket_connected(GObject *client,
127 GAsyncResult *result,
128 gpointer data)
130 struct sipe_transport_telepathy *transport = data;
131 GError *error = NULL;
133 transport->socket = g_socket_client_connect_finish(G_SOCKET_CLIENT(client),
134 result,
135 &error);
137 if (transport->socket == NULL) {
138 const gchar *msg = error ? error->message : "UNKNOWN";
139 SIPE_DEBUG_ERROR("socket_connected: failed: %s", msg);
140 if (transport->error)
141 transport->error(SIPE_TRANSPORT_CONNECTION, msg);
142 g_error_free(error);
143 } else if (g_cancellable_is_cancelled(transport->cancel)) {
144 /* connect already succeeded when transport was disconnected */
145 g_object_unref(transport->socket);
146 transport->socket = NULL;
147 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: succeeded, but cancelled");
148 } else {
149 GSocketAddress *saddr = g_socket_connection_get_local_address(transport->socket,
150 &error);
152 if (saddr) {
153 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: success");
155 transport->public.client_port = g_inet_socket_address_get_port(G_INET_SOCKET_ADDRESS(saddr));
156 g_object_unref(saddr);
158 transport->istream = g_io_stream_get_input_stream(G_IO_STREAM(transport->socket));
159 transport->ostream = g_io_stream_get_output_stream(G_IO_STREAM(transport->socket));
161 /* the first connection is always to the server */
162 if (transport->private->transport == NULL)
163 transport->private->transport = transport;
165 /* this sets up the async read handler */
166 read_completed(G_OBJECT(transport->istream), NULL, transport);
167 transport->connected(SIPE_TRANSPORT_CONNECTION);
169 } else {
170 g_object_unref(transport->socket);
171 transport->socket = NULL;
172 SIPE_DEBUG_ERROR("socket_connected: failed: %s", error->message);
173 transport->error(SIPE_TRANSPORT_CONNECTION, error->message);
174 g_error_free(error);
179 struct sipe_transport_connection *sipe_backend_transport_connect(struct sipe_core_public *sipe_public,
180 const sipe_connect_setup *setup)
182 struct sipe_transport_telepathy *transport = g_new0(struct sipe_transport_telepathy, 1);
184 SIPE_DEBUG_INFO("sipe_backend_transport_connect - hostname: %s port: %d",
185 setup->server_name, setup->server_port);
187 transport->public.type = setup->type;
188 transport->public.user_data = setup->user_data;
189 transport->connected = setup->connected;
190 transport->input = setup->input;
191 transport->error = setup->error;
192 transport->private = sipe_public->backend_private;
193 transport->buffers = NULL;
194 transport->is_writing = FALSE;
195 transport->do_flush = FALSE;
197 if ((setup->type == SIPE_TRANSPORT_TLS) ||
198 (setup->type == SIPE_TRANSPORT_TCP)) {
199 GSocketClient *client = g_socket_client_new();
201 /* request TLS connection */
202 if (setup->type == SIPE_TRANSPORT_TLS) {
203 SIPE_DEBUG_INFO_NOFORMAT("using TLS");
204 g_socket_client_set_tls(client,
205 setup->type == SIPE_TRANSPORT_TLS);
206 /* @TODO certificate handling - now accept all*/
207 g_socket_client_set_tls_validation_flags(client, 0);
208 } else
209 SIPE_DEBUG_INFO_NOFORMAT("using TCP");
211 transport->cancel = g_cancellable_new();
212 g_socket_client_connect_async(client,
213 g_network_address_new(setup->server_name,
214 setup->server_port),
215 transport->cancel,
216 socket_connected,
217 transport);
218 g_object_unref(client);
219 } else {
220 setup->error(SIPE_TRANSPORT_CONNECTION,
221 "This should not happen...");
222 sipe_backend_transport_disconnect(SIPE_TRANSPORT_CONNECTION);
223 return(NULL);
226 return(SIPE_TRANSPORT_CONNECTION);
229 static gboolean free_transport(gpointer data)
231 struct sipe_transport_telepathy *transport = data;
232 GSList *entry;
234 SIPE_DEBUG_INFO("free_transport %p", transport);
236 /* free unflushed buffers */
237 for (entry = transport->buffers; entry; entry = entry->next)
238 g_free(entry->data);
239 g_slist_free(transport->buffers);
241 if (transport->cancel)
242 g_object_unref(transport->cancel);
244 g_free(transport);
246 return(FALSE);
249 static void close_completed(GObject *stream,
250 GAsyncResult *result,
251 gpointer data)
253 struct sipe_transport_telepathy *transport = data;
254 SIPE_DEBUG_INFO("close_completed: transport %p", data);
255 g_io_stream_close_finish(G_IO_STREAM(stream), result, NULL);
256 g_idle_add(free_transport, transport);
259 static void do_close(struct sipe_transport_telepathy *transport)
261 SIPE_DEBUG_INFO("do_close: %p", transport);
263 /* cancel outstanding asynchronous operations */
264 transport->do_flush = FALSE;
265 g_cancellable_cancel(transport->cancel);
266 g_io_stream_close_async(G_IO_STREAM(transport->socket),
267 G_PRIORITY_DEFAULT,
268 NULL,
269 close_completed,
270 transport);
273 void sipe_backend_transport_disconnect(struct sipe_transport_connection *conn)
275 struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT;
277 if (!transport) return;
279 SIPE_DEBUG_INFO("sipe_backend_transport_disconnect: %p", transport);
281 /* error callback is invalid now, do no longer call! */
282 transport->error = NULL;
284 /* dropping connection to the server? */
285 if (transport->private->transport == transport)
286 transport->private->transport = NULL;
288 /* already connected? */
289 if (transport->socket) {
291 /* flush required? */
292 if (transport->do_flush && transport->is_writing)
293 SIPE_DEBUG_INFO("sipe_backend_transport_disconnect: %p needs flushing",
294 transport);
295 else
296 do_close(transport);
298 } else {
299 /* cancel outstanding connect operation */
300 if (transport->cancel)
301 g_cancellable_cancel(transport->cancel);
303 /* queue transport to be deleted */
304 g_idle_add(free_transport, transport);
308 static void do_write(struct sipe_transport_telepathy *transport,
309 const gchar *buffer);
310 static void write_completed(GObject *stream,
311 GAsyncResult *result,
312 gpointer data)
314 struct sipe_transport_telepathy *transport = data;
315 GError *error = NULL;
316 gssize written = g_output_stream_write_finish(G_OUTPUT_STREAM(stream),
317 result,
318 &error);
320 if ((written < 0) || error) {
321 const gchar *msg = error ? error->message : "UNKNOWN";
322 SIPE_DEBUG_ERROR("write_completed: error: %s", msg);
323 if (transport->error)
324 transport->error(SIPE_TRANSPORT_CONNECTION, msg);
325 g_error_free(error);
327 /* error during flush: give up and close transport */
328 if (transport->do_flush)
329 do_close(transport);
331 } else if (g_cancellable_is_cancelled(transport->cancel)) {
332 /* write completed when transport was disconnected */
333 SIPE_DEBUG_INFO_NOFORMAT("write_completed: cancelled");
334 transport->is_writing = FALSE;
335 } else {
336 /* more to write? */
337 if (transport->buffers) {
338 /* yes */
339 gchar *buffer = transport->buffers->data;
340 transport->buffers = g_slist_remove(transport->buffers,
341 buffer);
342 do_write(transport, buffer);
343 g_free(buffer);
344 } else {
345 /* no, we're done for now... */
346 transport->is_writing = FALSE;
348 /* flush completed */
349 if (transport->do_flush)
350 do_close(transport);
355 static void do_write(struct sipe_transport_telepathy *transport,
356 const gchar *buffer)
358 transport->is_writing = TRUE;
359 g_output_stream_write_async(transport->ostream,
360 buffer,
361 strlen(buffer),
362 G_PRIORITY_DEFAULT,
363 transport->cancel,
364 write_completed,
365 transport);
368 void sipe_backend_transport_message(struct sipe_transport_connection *conn,
369 const gchar *buffer)
371 struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT;
373 /* currently writing? */
374 if (transport->is_writing) {
375 /* yes, append copy of buffer to list */
376 transport->buffers = g_slist_append(transport->buffers,
377 g_strdup(buffer));
378 } else
379 /* no, write directly to stream */
380 do_write(transport, buffer);
383 void sipe_backend_transport_flush(struct sipe_transport_connection *conn)
385 struct sipe_transport_telepathy *transport = TELEPATHY_TRANSPORT;
386 transport->do_flush = TRUE;
389 const gchar *sipe_backend_network_ip_address(struct sipe_core_public *sipe_public)
391 struct sipe_backend_private *telepathy_private = sipe_public->backend_private;
392 const gchar *ipstr = telepathy_private->ipaddress;
394 /* address cached? */
395 if (!ipstr) {
396 struct sipe_transport_telepathy *transport = telepathy_private->transport;
398 /* default if everything should fail */
399 ipstr = "127.0.0.1";
401 /* connection to server established - get local IP from socket */
402 if (transport && transport->socket) {
403 GSocketAddress *saddr = g_socket_connection_get_local_address(transport->socket,
404 NULL);
406 if (saddr) {
407 GInetAddress *iaddr = g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(saddr));
409 if (iaddr) {
410 /* cache address string */
411 ipstr = telepathy_private->ipaddress = g_inet_address_to_string(iaddr);
412 SIPE_DEBUG_INFO("sipe_backend_network_ip_address: %s", ipstr);
414 g_object_unref(saddr);
419 return(ipstr);
424 Local Variables:
425 mode: c
426 c-file-style: "bsd"
427 indent-tabs-mode: t
428 tab-width: 8
429 End: