2 * @file telepathy-transport.c
6 * Copyright (C) 2012-2013 SIPE Project <http://sipe.sourceforge.net/>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
32 #include "sipe-backend.h"
33 #include "sipe-common.h"
34 #include "sipe-core.h"
37 #include "telepathy-private.h"
39 struct sipe_transport_telepathy
{
40 /* public part shared with core */
41 struct sipe_transport_connection
public;
43 /* telepathy private part */
44 transport_connected_cb
*connected
;
45 transport_input_cb
*input
;
46 transport_error_cb
*error
;
47 struct sipe_backend_private
*private;
49 GSocketConnection
*socket
;
50 GInputStream
*istream
;
51 GOutputStream
*ostream
;
55 gboolean wait_for_user
;
58 #define TELEPATHY_TRANSPORT ((struct sipe_transport_telepathy *) conn)
59 #define SIPE_TRANSPORT_CONNECTION ((struct sipe_transport_connection *) transport)
61 #define BUFFER_SIZE_INCREMENT 4096
63 static void read_completed(GObject
*stream
,
67 struct sipe_transport_telepathy
*transport
= data
;
68 struct sipe_transport_connection
*conn
= SIPE_TRANSPORT_CONNECTION
;
71 if (conn
->buffer_length
< conn
->buffer_used
+ BUFFER_SIZE_INCREMENT
) {
72 conn
->buffer_length
+= BUFFER_SIZE_INCREMENT
;
73 conn
->buffer
= g_realloc(conn
->buffer
, conn
->buffer_length
);
74 SIPE_DEBUG_INFO("read_completed: new buffer length %" G_GSIZE_FORMAT
,
78 /* callback result is valid */
81 gssize len
= g_input_stream_read_finish(G_INPUT_STREAM(stream
),
86 const gchar
*msg
= error
? error
->message
: "UNKNOWN";
87 SIPE_DEBUG_ERROR("read_completed: error: %s", msg
);
89 transport
->error(conn
, msg
);
92 } else if (len
== 0) {
93 SIPE_DEBUG_ERROR_NOFORMAT("read_completed: server has disconnected");
94 transport
->error(conn
, _("Server has disconnected"));
96 } else if (transport
->do_flush
) {
97 /* read completed while disconnected transport is flushing */
98 SIPE_DEBUG_INFO_NOFORMAT("read_completed: ignored during flushing");
100 } else if (g_cancellable_is_cancelled(transport
->cancel
)) {
101 /* read completed when transport was disconnected */
102 SIPE_DEBUG_INFO_NOFORMAT("read_completed: cancelled");
106 /* Forward data to core */
107 conn
->buffer_used
+= len
;
108 conn
->buffer
[conn
->buffer_used
] = '\0';
109 transport
->input(conn
);
111 /* we processed the result */
115 /* buffer too short? */
116 } while (conn
->buffer_length
- conn
->buffer_used
- 1 == 0);
118 /* setup next read */
119 g_input_stream_read_async(G_INPUT_STREAM(stream
),
120 conn
->buffer
+ conn
->buffer_used
,
121 conn
->buffer_length
- conn
->buffer_used
- 1,
128 static void socket_connected(GObject
*client
,
129 GAsyncResult
*result
,
132 struct sipe_transport_telepathy
*transport
= data
;
133 GError
*error
= NULL
;
135 transport
->socket
= g_socket_client_connect_finish(G_SOCKET_CLIENT(client
),
139 if (transport
->socket
== NULL
) {
140 if (transport
->wait_for_user
) {
141 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: need to wait for user interaction");
142 /* @TODO: trigger user interaction */
144 const gchar
*msg
= error
? error
->message
: "UNKNOWN";
145 SIPE_DEBUG_ERROR("socket_connected: failed: %s", msg
);
146 if (transport
->error
)
147 transport
->error(SIPE_TRANSPORT_CONNECTION
, msg
);
150 } else if (g_cancellable_is_cancelled(transport
->cancel
)) {
151 /* connect already succeeded when transport was disconnected */
152 g_object_unref(transport
->socket
);
153 transport
->socket
= NULL
;
154 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: succeeded, but cancelled");
156 GSocketAddress
*saddr
= g_socket_connection_get_local_address(transport
->socket
,
160 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: success");
162 transport
->public.client_port
= g_inet_socket_address_get_port(G_INET_SOCKET_ADDRESS(saddr
));
163 g_object_unref(saddr
);
165 transport
->istream
= g_io_stream_get_input_stream(G_IO_STREAM(transport
->socket
));
166 transport
->ostream
= g_io_stream_get_output_stream(G_IO_STREAM(transport
->socket
));
168 /* the first connection is always to the server */
169 if (transport
->private->transport
== NULL
)
170 transport
->private->transport
= transport
;
172 /* this sets up the async read handler */
173 read_completed(G_OBJECT(transport
->istream
), NULL
, transport
);
174 transport
->connected(SIPE_TRANSPORT_CONNECTION
);
177 g_object_unref(transport
->socket
);
178 transport
->socket
= NULL
;
179 SIPE_DEBUG_ERROR("socket_connected: failed: %s", error
->message
);
180 transport
->error(SIPE_TRANSPORT_CONNECTION
, error
->message
);
186 static void certificate_result(SIPE_UNUSED_PARAMETER GObject
*unused
,
187 SIPE_UNUSED_PARAMETER GAsyncResult
*res
,
190 struct sipe_transport_telepathy
*transport
= data
;
192 SIPE_DEBUG_INFO("certificate_result: %p", transport
);
194 /* @TODO: take action based on result */
197 static gboolean
accept_certificate_signal(GTlsConnection
*tls
,
198 SIPE_UNUSED_PARAMETER GTlsCertificate
*peer_cert
,
199 SIPE_UNUSED_PARAMETER GTlsCertificateFlags errors
,
202 struct sipe_transport_telepathy
*transport
= user_data
;
204 SIPE_DEBUG_INFO("accept_certificate_signal: %p", tls
);
206 /* second connection attempt after feedback from user? */
207 if (transport
->wait_for_user
) {
208 /* user accepted certificate */
209 transport
->wait_for_user
= FALSE
;
212 /* retry after user accepted certificate */
213 transport
->wait_for_user
= TRUE
;
214 /* @TODO: set up correct parameters */
215 sipe_telepathy_tls_verify_async(G_OBJECT(transport
->private->connection
),
224 static void tls_handshake_starts(SIPE_UNUSED_PARAMETER GSocketClient
*client
,
225 GSocketClientEvent event
,
226 SIPE_UNUSED_PARAMETER GSocketConnectable
*connectable
,
227 GIOStream
*connection
,
230 if (event
== G_SOCKET_CLIENT_TLS_HANDSHAKING
) {
231 SIPE_DEBUG_INFO("tls_handshake_starts: %p", connection
);
232 g_signal_connect(connection
, /* is a GTlsConnection */
233 "accept-certificate",
234 G_CALLBACK(accept_certificate_signal
),
239 struct sipe_transport_connection
*sipe_backend_transport_connect(struct sipe_core_public
*sipe_public
,
240 const sipe_connect_setup
*setup
)
242 struct sipe_transport_telepathy
*transport
= g_new0(struct sipe_transport_telepathy
, 1);
244 SIPE_DEBUG_INFO("sipe_backend_transport_connect - hostname: %s port: %d",
245 setup
->server_name
, setup
->server_port
);
247 transport
->public.type
= setup
->type
;
248 transport
->public.user_data
= setup
->user_data
;
249 transport
->connected
= setup
->connected
;
250 transport
->input
= setup
->input
;
251 transport
->error
= setup
->error
;
252 transport
->private = sipe_public
->backend_private
;
253 transport
->buffers
= NULL
;
254 transport
->is_writing
= FALSE
;
255 transport
->do_flush
= FALSE
;
256 transport
->wait_for_user
= FALSE
;
258 if ((setup
->type
== SIPE_TRANSPORT_TLS
) ||
259 (setup
->type
== SIPE_TRANSPORT_TCP
)) {
260 GSocketClient
*client
= g_socket_client_new();
262 /* request TLS connection */
263 if (setup
->type
== SIPE_TRANSPORT_TLS
) {
264 SIPE_DEBUG_INFO_NOFORMAT("using TLS");
265 g_socket_client_set_tls(client
, TRUE
);
266 g_signal_connect(client
,
268 G_CALLBACK(tls_handshake_starts
),
271 SIPE_DEBUG_INFO_NOFORMAT("using TCP");
273 transport
->cancel
= g_cancellable_new();
274 g_socket_client_connect_async(client
,
275 g_network_address_new(setup
->server_name
,
280 g_object_unref(client
);
282 setup
->error(SIPE_TRANSPORT_CONNECTION
,
283 "This should not happen...");
284 sipe_backend_transport_disconnect(SIPE_TRANSPORT_CONNECTION
);
288 return(SIPE_TRANSPORT_CONNECTION
);
291 static gboolean
free_transport(gpointer data
)
293 struct sipe_transport_telepathy
*transport
= data
;
296 SIPE_DEBUG_INFO("free_transport %p", transport
);
298 /* free unflushed buffers */
299 for (entry
= transport
->buffers
; entry
; entry
= entry
->next
)
301 g_slist_free(transport
->buffers
);
303 if (transport
->cancel
)
304 g_object_unref(transport
->cancel
);
311 static void close_completed(GObject
*stream
,
312 GAsyncResult
*result
,
315 struct sipe_transport_telepathy
*transport
= data
;
316 SIPE_DEBUG_INFO("close_completed: transport %p", data
);
317 g_io_stream_close_finish(G_IO_STREAM(stream
), result
, NULL
);
318 g_idle_add(free_transport
, transport
);
321 static void do_close(struct sipe_transport_telepathy
*transport
)
323 SIPE_DEBUG_INFO("do_close: %p", transport
);
325 /* cancel outstanding asynchronous operations */
326 transport
->do_flush
= FALSE
;
327 g_cancellable_cancel(transport
->cancel
);
328 g_io_stream_close_async(G_IO_STREAM(transport
->socket
),
335 void sipe_backend_transport_disconnect(struct sipe_transport_connection
*conn
)
337 struct sipe_transport_telepathy
*transport
= TELEPATHY_TRANSPORT
;
339 if (!transport
) return;
341 SIPE_DEBUG_INFO("sipe_backend_transport_disconnect: %p", transport
);
343 /* error callback is invalid now, do no longer call! */
344 transport
->error
= NULL
;
346 /* dropping connection to the server? */
347 if (transport
->private->transport
== transport
)
348 transport
->private->transport
= NULL
;
350 /* already connected? */
351 if (transport
->socket
) {
353 /* flush required? */
354 if (transport
->do_flush
&& transport
->is_writing
)
355 SIPE_DEBUG_INFO("sipe_backend_transport_disconnect: %p needs flushing",
361 /* cancel outstanding connect operation */
362 if (transport
->cancel
)
363 g_cancellable_cancel(transport
->cancel
);
365 /* queue transport to be deleted */
366 g_idle_add(free_transport
, transport
);
370 static void do_write(struct sipe_transport_telepathy
*transport
,
371 const gchar
*buffer
);
372 static void write_completed(GObject
*stream
,
373 GAsyncResult
*result
,
376 struct sipe_transport_telepathy
*transport
= data
;
377 GError
*error
= NULL
;
378 gssize written
= g_output_stream_write_finish(G_OUTPUT_STREAM(stream
),
382 if ((written
< 0) || error
) {
383 const gchar
*msg
= error
? error
->message
: "UNKNOWN";
384 SIPE_DEBUG_ERROR("write_completed: error: %s", msg
);
385 if (transport
->error
)
386 transport
->error(SIPE_TRANSPORT_CONNECTION
, msg
);
389 /* error during flush: give up and close transport */
390 if (transport
->do_flush
)
393 } else if (g_cancellable_is_cancelled(transport
->cancel
)) {
394 /* write completed when transport was disconnected */
395 SIPE_DEBUG_INFO_NOFORMAT("write_completed: cancelled");
396 transport
->is_writing
= FALSE
;
399 if (transport
->buffers
) {
401 gchar
*buffer
= transport
->buffers
->data
;
402 transport
->buffers
= g_slist_remove(transport
->buffers
,
404 do_write(transport
, buffer
);
407 /* no, we're done for now... */
408 transport
->is_writing
= FALSE
;
410 /* flush completed */
411 if (transport
->do_flush
)
417 static void do_write(struct sipe_transport_telepathy
*transport
,
420 transport
->is_writing
= TRUE
;
421 g_output_stream_write_async(transport
->ostream
,
430 void sipe_backend_transport_message(struct sipe_transport_connection
*conn
,
433 struct sipe_transport_telepathy
*transport
= TELEPATHY_TRANSPORT
;
435 /* currently writing? */
436 if (transport
->is_writing
) {
437 /* yes, append copy of buffer to list */
438 transport
->buffers
= g_slist_append(transport
->buffers
,
441 /* no, write directly to stream */
442 do_write(transport
, buffer
);
445 void sipe_backend_transport_flush(struct sipe_transport_connection
*conn
)
447 struct sipe_transport_telepathy
*transport
= TELEPATHY_TRANSPORT
;
448 transport
->do_flush
= TRUE
;
451 const gchar
*sipe_backend_network_ip_address(struct sipe_core_public
*sipe_public
)
453 struct sipe_backend_private
*telepathy_private
= sipe_public
->backend_private
;
454 const gchar
*ipstr
= telepathy_private
->ipaddress
;
456 /* address cached? */
458 struct sipe_transport_telepathy
*transport
= telepathy_private
->transport
;
460 /* default if everything should fail */
463 /* connection to server established - get local IP from socket */
464 if (transport
&& transport
->socket
) {
465 GSocketAddress
*saddr
= g_socket_connection_get_local_address(transport
->socket
,
469 GInetAddress
*iaddr
= g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(saddr
));
472 /* cache address string */
473 ipstr
= telepathy_private
->ipaddress
= g_inet_address_to_string(iaddr
);
474 SIPE_DEBUG_INFO("sipe_backend_network_ip_address: %s", ipstr
);
476 g_object_unref(saddr
);