2 * @file telepathy-transport.c
6 * Copyright (C) 2012 SIPE Project <http://sipe.sourceforge.net/>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
32 #include "sipe-backend.h"
33 #include "sipe-core.h"
36 #include "telepathy-private.h"
38 struct sipe_transport_telepathy
{
39 /* public part shared with core */
40 struct sipe_transport_connection
public;
42 /* telepathy private part */
43 transport_connected_cb
*connected
;
44 transport_input_cb
*input
;
45 transport_error_cb
*error
;
46 struct sipe_backend_private
*private;
48 GSocketConnection
*socket
;
49 GInputStream
*istream
;
50 GOutputStream
*ostream
;
56 #define TELEPATHY_TRANSPORT ((struct sipe_transport_telepathy *) conn)
57 #define SIPE_TRANSPORT_CONNECTION ((struct sipe_transport_connection *) transport)
59 #define BUFFER_SIZE_INCREMENT 4096
61 static void read_completed(GObject
*stream
,
65 struct sipe_transport_telepathy
*transport
= data
;
66 struct sipe_transport_connection
*conn
= SIPE_TRANSPORT_CONNECTION
;
69 if (conn
->buffer_length
< conn
->buffer_used
+ BUFFER_SIZE_INCREMENT
) {
70 conn
->buffer_length
+= BUFFER_SIZE_INCREMENT
;
71 conn
->buffer
= g_realloc(conn
->buffer
, conn
->buffer_length
);
72 SIPE_DEBUG_INFO("read_completed: new buffer length %" G_GSIZE_FORMAT
,
76 /* callback result is valid */
79 gssize len
= g_input_stream_read_finish(G_INPUT_STREAM(stream
),
84 const gchar
*msg
= error
? error
->message
: "UNKNOWN";
85 SIPE_DEBUG_ERROR("read_completed: error: %s", msg
);
87 transport
->error(conn
, msg
);
90 } else if (len
== 0) {
91 SIPE_DEBUG_ERROR_NOFORMAT("read_completed: server has disconnected");
92 transport
->error(conn
, _("Server has disconnected"));
94 } else if (transport
->do_flush
) {
95 /* read completed while disconnected transport is flushing */
96 SIPE_DEBUG_INFO_NOFORMAT("read_completed: ignored during flushing");
98 } else if (g_cancellable_is_cancelled(transport
->cancel
)) {
99 /* read completed when transport was disconnected */
100 SIPE_DEBUG_INFO_NOFORMAT("read_completed: cancelled");
104 /* Forward data to core */
105 conn
->buffer_used
+= len
;
106 conn
->buffer
[conn
->buffer_used
] = '\0';
107 transport
->input(conn
);
109 /* we processed the result */
113 /* buffer too short? */
114 } while (conn
->buffer_length
- conn
->buffer_used
- 1 == 0);
116 /* setup next read */
117 g_input_stream_read_async(G_INPUT_STREAM(stream
),
118 conn
->buffer
+ conn
->buffer_used
,
119 conn
->buffer_length
- conn
->buffer_used
- 1,
126 static void socket_connected(GObject
*client
,
127 GAsyncResult
*result
,
130 struct sipe_transport_telepathy
*transport
= data
;
131 GError
*error
= NULL
;
133 transport
->socket
= g_socket_client_connect_finish(G_SOCKET_CLIENT(client
),
137 if (transport
->socket
== NULL
) {
138 const gchar
*msg
= error
? error
->message
: "UNKNOWN";
139 SIPE_DEBUG_ERROR("socket_connected: failed: %s", msg
);
140 if (transport
->error
)
141 transport
->error(SIPE_TRANSPORT_CONNECTION
, msg
);
143 } else if (g_cancellable_is_cancelled(transport
->cancel
)) {
144 /* connect already succeeded when transport was disconnected */
145 g_object_unref(transport
->socket
);
146 transport
->socket
= NULL
;
147 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: succeeded, but cancelled");
149 GSocketAddress
*saddr
= g_socket_connection_get_local_address(transport
->socket
,
153 SIPE_DEBUG_INFO_NOFORMAT("socket_connected: success");
155 transport
->public.client_port
= g_inet_socket_address_get_port(G_INET_SOCKET_ADDRESS(saddr
));
156 g_object_unref(saddr
);
158 transport
->istream
= g_io_stream_get_input_stream(G_IO_STREAM(transport
->socket
));
159 transport
->ostream
= g_io_stream_get_output_stream(G_IO_STREAM(transport
->socket
));
161 /* the first connection is always to the server */
162 if (transport
->private->transport
== NULL
)
163 transport
->private->transport
= transport
;
165 /* this sets up the async read handler */
166 read_completed(G_OBJECT(transport
->istream
), NULL
, transport
);
167 transport
->connected(SIPE_TRANSPORT_CONNECTION
);
170 g_object_unref(transport
->socket
);
171 transport
->socket
= NULL
;
172 SIPE_DEBUG_ERROR("socket_connected: failed: %s", error
->message
);
173 transport
->error(SIPE_TRANSPORT_CONNECTION
, error
->message
);
179 struct sipe_transport_connection
*sipe_backend_transport_connect(struct sipe_core_public
*sipe_public
,
180 const sipe_connect_setup
*setup
)
182 struct sipe_transport_telepathy
*transport
= g_new0(struct sipe_transport_telepathy
, 1);
184 SIPE_DEBUG_INFO("sipe_backend_transport_connect - hostname: %s port: %d",
185 setup
->server_name
, setup
->server_port
);
187 transport
->public.type
= setup
->type
;
188 transport
->public.user_data
= setup
->user_data
;
189 transport
->connected
= setup
->connected
;
190 transport
->input
= setup
->input
;
191 transport
->error
= setup
->error
;
192 transport
->private = sipe_public
->backend_private
;
193 transport
->buffers
= NULL
;
194 transport
->is_writing
= FALSE
;
195 transport
->do_flush
= FALSE
;
197 if ((setup
->type
== SIPE_TRANSPORT_TLS
) ||
198 (setup
->type
== SIPE_TRANSPORT_TCP
)) {
199 GSocketClient
*client
= g_socket_client_new();
201 /* request TLS connection */
202 if (setup
->type
== SIPE_TRANSPORT_TLS
) {
203 SIPE_DEBUG_INFO_NOFORMAT("using TLS");
204 g_socket_client_set_tls(client
,
205 setup
->type
== SIPE_TRANSPORT_TLS
);
206 /* @TODO certificate handling - now accept all*/
207 g_socket_client_set_tls_validation_flags(client
, 0);
209 SIPE_DEBUG_INFO_NOFORMAT("using TCP");
211 transport
->cancel
= g_cancellable_new();
212 g_socket_client_connect_async(client
,
213 g_network_address_new(setup
->server_name
,
218 g_object_unref(client
);
220 setup
->error(SIPE_TRANSPORT_CONNECTION
,
221 "This should not happen...");
222 sipe_backend_transport_disconnect(SIPE_TRANSPORT_CONNECTION
);
226 return(SIPE_TRANSPORT_CONNECTION
);
229 static gboolean
free_transport(gpointer data
)
231 struct sipe_transport_telepathy
*transport
= data
;
234 SIPE_DEBUG_INFO("free_transport %p", transport
);
236 /* free unflushed buffers */
237 for (entry
= transport
->buffers
; entry
; entry
= entry
->next
)
239 g_slist_free(transport
->buffers
);
241 if (transport
->cancel
)
242 g_object_unref(transport
->cancel
);
249 static void close_completed(GObject
*stream
,
250 GAsyncResult
*result
,
253 struct sipe_transport_telepathy
*transport
= data
;
254 SIPE_DEBUG_INFO("close_completed: transport %p", data
);
255 g_io_stream_close_finish(G_IO_STREAM(stream
), result
, NULL
);
256 g_idle_add(free_transport
, transport
);
259 static void do_close(struct sipe_transport_telepathy
*transport
)
261 SIPE_DEBUG_INFO("do_close: %p", transport
);
263 /* cancel outstanding asynchronous operations */
264 transport
->do_flush
= FALSE
;
265 g_cancellable_cancel(transport
->cancel
);
266 g_io_stream_close_async(G_IO_STREAM(transport
->socket
),
273 void sipe_backend_transport_disconnect(struct sipe_transport_connection
*conn
)
275 struct sipe_transport_telepathy
*transport
= TELEPATHY_TRANSPORT
;
277 if (!transport
) return;
279 SIPE_DEBUG_INFO("sipe_backend_transport_disconnect: %p", transport
);
281 /* error callback is invalid now, do no longer call! */
282 transport
->error
= NULL
;
284 /* dropping connection to the server? */
285 if (transport
->private->transport
== transport
)
286 transport
->private->transport
= NULL
;
288 /* already connected? */
289 if (transport
->socket
) {
291 /* flush required? */
292 if (transport
->do_flush
&& transport
->is_writing
)
293 SIPE_DEBUG_INFO("sipe_backend_transport_disconnect: %p needs flushing",
299 /* cancel outstanding connect operation */
300 if (transport
->cancel
)
301 g_cancellable_cancel(transport
->cancel
);
303 /* queue transport to be deleted */
304 g_idle_add(free_transport
, transport
);
308 static void do_write(struct sipe_transport_telepathy
*transport
,
309 const gchar
*buffer
);
310 static void write_completed(GObject
*stream
,
311 GAsyncResult
*result
,
314 struct sipe_transport_telepathy
*transport
= data
;
315 GError
*error
= NULL
;
316 gssize written
= g_output_stream_write_finish(G_OUTPUT_STREAM(stream
),
320 if ((written
< 0) || error
) {
321 const gchar
*msg
= error
? error
->message
: "UNKNOWN";
322 SIPE_DEBUG_ERROR("write_completed: error: %s", msg
);
323 if (transport
->error
)
324 transport
->error(SIPE_TRANSPORT_CONNECTION
, msg
);
327 /* error during flush: give up and close transport */
328 if (transport
->do_flush
)
331 } else if (g_cancellable_is_cancelled(transport
->cancel
)) {
332 /* write completed when transport was disconnected */
333 SIPE_DEBUG_INFO_NOFORMAT("write_completed: cancelled");
334 transport
->is_writing
= FALSE
;
337 if (transport
->buffers
) {
339 gchar
*buffer
= transport
->buffers
->data
;
340 transport
->buffers
= g_slist_remove(transport
->buffers
,
342 do_write(transport
, buffer
);
345 /* no, we're done for now... */
346 transport
->is_writing
= FALSE
;
348 /* flush completed */
349 if (transport
->do_flush
)
355 static void do_write(struct sipe_transport_telepathy
*transport
,
358 transport
->is_writing
= TRUE
;
359 g_output_stream_write_async(transport
->ostream
,
368 void sipe_backend_transport_message(struct sipe_transport_connection
*conn
,
371 struct sipe_transport_telepathy
*transport
= TELEPATHY_TRANSPORT
;
373 /* currently writing? */
374 if (transport
->is_writing
) {
375 /* yes, append copy of buffer to list */
376 transport
->buffers
= g_slist_append(transport
->buffers
,
379 /* no, write directly to stream */
380 do_write(transport
, buffer
);
383 void sipe_backend_transport_flush(struct sipe_transport_connection
*conn
)
385 struct sipe_transport_telepathy
*transport
= TELEPATHY_TRANSPORT
;
386 transport
->do_flush
= TRUE
;
389 const gchar
*sipe_backend_network_ip_address(struct sipe_core_public
*sipe_public
)
391 struct sipe_backend_private
*telepathy_private
= sipe_public
->backend_private
;
392 const gchar
*ipstr
= telepathy_private
->ipaddress
;
394 /* address cached? */
396 struct sipe_transport_telepathy
*transport
= telepathy_private
->transport
;
398 /* default if everything should fail */
401 /* connection to server established - get local IP from socket */
402 if (transport
&& transport
->socket
) {
403 GSocketAddress
*saddr
= g_socket_connection_get_local_address(transport
->socket
,
407 GInetAddress
*iaddr
= g_inet_socket_address_get_address(G_INET_SOCKET_ADDRESS(saddr
));
410 /* cache address string */
411 ipstr
= telepathy_private
->ipaddress
= g_inet_address_to_string(iaddr
);
412 SIPE_DEBUG_INFO("sipe_backend_network_ip_address: %s", ipstr
);
414 g_object_unref(saddr
);