better URI splitting code. see #5400, #5401
[gajim.git] / src / common / xmpp / bosh.py
blob bb6ea4dd89064c3830041a6db39121901f879d06
1 ## bosh.py
2 ##
3 ##
4 ## Copyright (C) 2008 Tomas Karasek <tom.to.the.k@gmail.com>
5 ##
6 ## This file is part of Gajim.
7 ##
8 ## Gajim is free software; you can redistribute it and/or modify
9 ## it under the terms of the GNU General Public License as published
10 ## by the Free Software Foundation; version 3 only.
12 ## Gajim is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ## GNU General Public License for more details.
17 ## You should have received a copy of the GNU General Public License
18 ## along with Gajim. If not, see <http://www.gnu.org/licenses/>.
21 import locale, random
22 from hashlib import sha1
23 from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\
24 CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\
25 urisplit, DISCONNECT_TIMEOUT_SECONDS
26 from protocol import BOSHBody
27 from simplexml import Node
29 import logging
# Module-level logger used by the BOSH transport code in this file.
log = logging.getLogger('gajim.c.x.bosh')

# Number of keys requested from KeyStack when a connection is started
# (see NonBlockingBOSH.connect).
KEY_COUNT = 10

# Fake file descriptor - it's used for setting read_timeout in idlequeue for
# BOSH Transport. In TCP-derived transports this is file descriptor of socket.
# NonBlockingBOSH.connect() decrements this value and stores the result in
# self.fd on every call.
FAKE_DESCRIPTOR = -1337
class NonBlockingBOSH(NonBlockingTransport):
    '''
    Transport that wraps XMPP stanzas into BOSH body elements and sends them
    through a pool of NonBlockingHTTPBOSH sockets (self.http_socks).
    '''

    def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
                 certs, xmpp_server, domain, bosh_dict, proxy_creds):
        '''
        Store the BOSH session parameters. No HTTP socket is created here; that
        happens in connect().

        :param xmpp_server: (host, port) pair, sent in the 'route' attribute
        :param domain: value of the 'to' attribute of the BOSH bodies
        :param bosh_dict: dictionary with keys 'bosh_http_pipelining',
            'bosh_wait', 'bosh_hold' (read only when pipelining is enabled),
            'bosh_uri', 'bosh_content', 'bosh_useproxy' and 'useauth'
        :param proxy_creds: proxy credentials; unpacked as (user, password)
            when 'useauth' is set
        The remaining arguments are handed to NonBlockingTransport.__init__.
        '''
        NonBlockingTransport.__init__(self, raise_event, on_disconnect,
            idlequeue, estabilish_tls, certs)

        self.bosh_sid = None
        # 'xml:lang' is the language part of the default locale, 'en' when the
        # locale can't be determined
        default_locale = locale.getdefaultlocale()[0]
        if default_locale:
            self.bosh_xml_lang = default_locale.split('_')[0]
        else:
            self.bosh_xml_lang = 'en'

        self.http_version = 'HTTP/1.1'
        self.http_persistent = True
        self.http_pipelining = bosh_dict['bosh_http_pipelining']
        self.bosh_to = domain

        self.route_host, self.route_port = xmpp_server

        self.bosh_wait = bosh_dict['bosh_wait']
        # without pipelining exactly one request is held
        if not self.http_pipelining:
            self.bosh_hold = 1
        else:
            self.bosh_hold = bosh_dict['bosh_hold']
        self.bosh_requests = self.bosh_hold
        self.bosh_uri = bosh_dict['bosh_uri']
        self.bosh_content = bosh_dict['bosh_content']
        self.over_proxy = bosh_dict['bosh_useproxy']
        if estabilish_tls:
            self.bosh_secure = 'true'
        else:
            self.bosh_secure = 'false'
        self.use_proxy_auth = bosh_dict['useauth']
        self.proxy_creds = proxy_creds
        self.wait_cb_time = None
        self.http_socks = []
        self.stanza_buffer = []
        self.prio_bosh_stanzas = []
        self.current_recv_handler = None
        self.current_recv_socket = None
        self.key_stack = None
        self.ack_checker = None
        self.after_init = False
        self.proxy_dict = {}
        if self.over_proxy and self.estabilish_tls:
            self.proxy_dict['type'] = 'http'
            # with SSL over proxy, we do HTTP CONNECT to proxy to open a
            # channel to BOSH Connection Manager
            host, port = urisplit(self.bosh_uri)[1:3]
            self.proxy_dict['xmpp_server'] = (host, port)
            self.proxy_dict['credentials'] = self.proxy_creds

    def connect(self, conn_5tuple, on_connect, on_connect_failure):
        '''
        Reset the per-session state (buffers, key stack, ack checker), create
        the first HTTP socket and start connecting it.
        '''
        NonBlockingTransport.connect(self, conn_5tuple, on_connect,
            on_connect_failure)

        # every connect() gets a fresh fake descriptor
        global FAKE_DESCRIPTOR
        FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1
        self.fd = FAKE_DESCRIPTOR

        self.stanza_buffer = []
        self.prio_bosh_stanzas = []

        self.key_stack = KeyStack(KEY_COUNT)
        self.ack_checker = AckChecker()
        self.after_init = True

        self.http_socks.append(self.get_new_http_socket())
        self._tcp_connecting_started()

        self.http_socks[0].connect(
            conn_5tuple=conn_5tuple,
            on_connect=self._on_connect,
            on_connect_failure=self._on_connect_failure)

    def _on_connect(self):
        ''' Copies peerhost and ssl_lib from the first HTTP socket. '''
        first_sock = self.http_socks[0]
        self.peerhost = first_sock.peerhost
        self.ssl_lib = first_sock.ssl_lib
        NonBlockingTransport._on_connect(self)

    def set_timeout(self, timeout):
        '''
        Sets the timeout unless the transport is DISCONNECTED or self.fd is -1
        (the value disconnect() assigns); in that case only a warning is logged.
        '''
        if self.get_state() != DISCONNECTED and self.fd != -1:
            NonBlockingTransport.set_timeout(self, timeout)
        else:
            log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (
                self.get_state(), self.fd))

    def on_http_request_possible(self):
        '''
        Called when it's possible to send a HTTP request. It can be when
        socket is connected or when HTTP response arrived.
        There should be always one pending request to BOSH CM.
        '''
        log.debug('on_http_req possible, state:\n%s' % self.get_current_state())
        if self.get_state() == DISCONNECTED:
            return

        # Hack for making the non-secure warning dialog work
        if self._owner.got_features:
            auth_plugged = (hasattr(self._owner, 'NonBlockingNonSASL') or
                hasattr(self._owner, 'SASL'))
            if not auth_plugged:
                # If we already got features and no auth module was plugged
                # yet, we are probably waiting for confirmation of the
                # "not-secure-connection" dialog.
                # We don't send HTTP request in that case.
                # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html
                return
        self.send_BOSH(None)

    def get_socket_in(self, state):
        ''' gets the first socket in desired state, None if there is none '''
        for sock in self.http_socks:
            if sock.get_state() == state:
                return sock
        return None

    def get_free_socket(self):
        '''
        Selects and returns socket eligible for sending a data to, or None.

        With pipelining any CONNECTED socket will do. Without it we're
        interested only in CONNECTED sockets with no requests pending; if
        there's more of them, we want the one with the least recent data
        receive (lowest last_recv_time).
        '''
        if self.http_pipelining:
            return self.get_socket_in(CONNECTED)

        idle_socks = [s for s in self.http_socks
            if s.get_state() == CONNECTED and s.pending_requests == 0]
        if not idle_socks:
            return None
        # min() keeps the first socket on ties. The former loop used
        # last_recv_time == 0 as its "nothing chosen yet" marker, so a socket
        # whose last_recv_time really was 0 got replaced by a later candidate.
        return min(idle_socks, key=lambda s: s.last_recv_time)

    def send_BOSH(self, payload):
        '''
        Tries to send a stanza in payload by appending it to a buffer and
        plugging a free socket for writing.

        :param payload: XMPP stanza, (BOSH body, add_payload) tuple, or None
            when called only to check whether a request should be made
        '''
        total_pending_reqs = sum(s.pending_requests for s in self.http_socks)

        # when called after HTTP response (Payload=None) and when there are
        # already some pending requests and no data to send, or when the socket
        # is disconnected, we do nothing
        nothing_to_send = (payload is None and
            total_pending_reqs > 0 and
            self.stanza_buffer == [] and
            self.prio_bosh_stanzas == [])
        if nothing_to_send or self.get_state() == DISCONNECTED:
            return

        # now the payload is put to buffer and will be sent at some point
        self.append_stanza(payload)

        # if we're about to make more requests than allowed, we don't send -
        # stanzas will be sent after HTTP response from CM, exception is when
        # we're disconnecting - then we send anyway
        if (total_pending_reqs >= self.bosh_requests and
        self.get_state() != DISCONNECTING):
            log.warn('attemp to make more requests than allowed by Connection Manager:\n%s' %
                self.get_current_state())
            return

        # when there's free CONNECTED socket, we plug it for write and the data
        # will be sent when write is possible
        if self.get_free_socket():
            self.plug_socket()
            return

        # if there is a connecting socket, we just wait for when it connects,
        # payload will be sent in a sec when the socket connects
        if self.get_socket_in(CONNECTING):
            return

        # being here means there are either DISCONNECTED sockets or all sockets
        # are CONNECTED with too many pending requests
        sock = self.get_socket_in(DISCONNECTED)
        if not sock:
            # no DISCONNECTED socket to reuse - create a new one
            sock = self.get_new_http_socket()
            self.http_socks.append(sock)
        # connect it; buffered data is flushed once it is connected
        self.connect_and_flush(sock)

    def plug_socket(self):
        ''' Plugs a free socket for reading and writing, logs error if none. '''
        sock = self.get_free_socket()
        if sock:
            sock._plug_idle(writable=True, readable=True)
        else:
            log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')

    def build_stanza(self, socket):
        '''
        Builds a BOSH body tag from data in buffers and adds key, rid and ack
        attributes to it.
        This method is called from _do_send() of underlying transport. This is
        to ensure rid and keys will be processed in correct order. If I generate
        them before plugging a socket for write (and did it for two
        sockets/HTTP connections) in parallel, they might be sent in wrong
        order, which results in violating the BOSH session and server-side
        disconnect.

        :param socket: HTTP socket the body is going to be written to
        '''
        if self.prio_bosh_stanzas:
            # prepared BOSH bodies go first; queued XMPP stanzas are attached
            # to them only when add_payload is set
            stanza, add_payload = self.prio_bosh_stanzas.pop(0)
            if add_payload:
                stanza.setPayload(self.stanza_buffer)
                self.stanza_buffer = []
        else:
            stanza = self.boshify_stanzas(self.stanza_buffer)
            self.stanza_buffer = []

        stanza = self.ack_checker.backup_stanza(stanza, socket)

        key, newkey = self.key_stack.get()
        if key:
            stanza.setAttr('key', key)
        if newkey:
            stanza.setAttr('newkey', newkey)

        log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'),
            id(socket)))
        self.renew_bosh_wait_timeout(self.bosh_wait + 3)
        return stanza

    def on_bosh_wait_timeout(self):
        ''' Alarm callback: CM didn't respond in time, so we disconnect. '''
        log.error('Connection Manager didn\'t respond within %s + 3 seconds --> forcing disconnect' % self.bosh_wait)
        self.disconnect()

    def renew_bosh_wait_timeout(self, timeout):
        ''' Replaces a previously set wait alarm by one firing in timeout. '''
        if self.wait_cb_time is not None:
            self.remove_bosh_wait_timeout()
        self.wait_cb_time = self.idlequeue.set_alarm(self.on_bosh_wait_timeout,
            timeout)

    def remove_bosh_wait_timeout(self):
        ''' Asks idlequeue to remove the alarm stored in self.wait_cb_time. '''
        self.idlequeue.remove_alarm(
            self.on_bosh_wait_timeout,
            self.wait_cb_time)

    def on_persistent_fallback(self, socket):
        '''
        Called from underlying transport when server closes TCP connection.
        :param socket: disconnected transport object
        '''
        if socket.http_persistent:
            log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
            socket.http_persistent = False
            self.http_persistent = False
            self.http_pipelining = False
            socket.disconnect(do_callback=False)
            self.connect_and_flush(socket)
        else:
            socket.disconnect()

    def handle_body_attrs(self, stanza_attrs):
        '''
        Called for each incoming body stanza from dispatcher. Checks body
        attributes.

        :param stanza_attrs: mapping of attribute names to values of the
            received body element
        '''
        self.remove_bosh_wait_timeout()

        if self.after_init:
            if 'sid' in stanza_attrs:
                # session ID should be only in init response
                self.bosh_sid = stanza_attrs['sid']

            if 'requests' in stanza_attrs:
                self.bosh_requests = int(stanza_attrs['requests'])

            if 'wait' in stanza_attrs:
                self.bosh_wait = int(stanza_attrs['wait'])
            self.after_init = False

        ack = None
        if 'ack' in stanza_attrs:
            ack = stanza_attrs['ack']
        self.ack_checker.process_incoming_ack(ack=ack,
            socket=self.current_recv_socket)

        if 'type' in stanza_attrs:
            if stanza_attrs['type'] in ['terminate', 'terminal']:
                condition = 'n/a'
                if 'condition' in stanza_attrs:
                    condition = stanza_attrs['condition']
                if condition == 'n/a':
                    log.info('Received sesion-ending terminating stanza')
                else:
                    # the condition comes from the server; one missing in
                    # bosh_errors must not raise KeyError and thereby skip the
                    # disconnect below
                    log.error('Received terminating stanza: %s - %s' % (
                        condition,
                        bosh_errors.get(condition, bosh_errors['n/a'])))
                self.disconnect()
                return

            if stanza_attrs['type'] == 'error':
                # recoverable error
                pass
        return

    def append_stanza(self, stanza):
        ''' appends stanza to a buffer to send '''
        if stanza:
            if isinstance(stanza, tuple):
                # stanza is tuple of BOSH stanza and bool value for whether to
                # add payload
                self.prio_bosh_stanzas.append(stanza)
            else:
                # stanza is XMPP stanza. Will be boshified before send.
                self.stanza_buffer.append(stanza)

    def send(self, stanza, now=False):
        ''' Queues stanza through send_BOSH(); now is not used here. '''
        self.send_BOSH(stanza)

    def get_current_state(self):
        '''
        Returns a multi-line text with state and pending requests of every
        HTTP socket and with the content of the send buffers, for logging.
        '''
        lines = ['------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n']
        for sock in self.http_socks:
            lines.append('------ %s\t%s\t%s\n' % (id(sock), sock.get_state(),
                sock.pending_requests))
        lines.append(
            '------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s'
            % (self.prio_bosh_stanzas, self.stanza_buffer,
            self.ack_checker.get_not_acked_rids()))
        return ''.join(lines)

    def connect_and_flush(self, socket):
        '''
        Connects socket; on_http_request_possible is called once it connects,
        disconnect on failure.
        '''
        socket.connect(
            conn_5tuple=self.conn_5tuple,
            on_connect=self.on_http_request_possible,
            on_connect_failure=self.disconnect)

    def boshify_stanzas(self, stanzas=None, body_attrs=None):
        '''
        wraps zero to many stanzas by body tag with xmlns and sid

        :param stanzas: list of stanzas to become payload of the body; None is
            treated as an empty list
        :param body_attrs: not used
        '''
        if stanzas is None:
            # a fresh list per call instead of a shared mutable default
            stanzas = []
        log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas),
            stanzas))
        tag = BOSHBody(attrs={'sid': self.bosh_sid})
        tag.setPayload(stanzas)
        return tag

    def send_init(self, after_SASL=False):
        '''
        Queues the session-creating body, or the stream-restarting body when
        after_SASL is set, as a priority stanza.
        '''
        if after_SASL:
            t = BOSHBody(
                attrs={'to': self.bosh_to,
                    'sid': self.bosh_sid,
                    'xml:lang': self.bosh_xml_lang,
                    'xmpp:restart': 'true',
                    'secure': self.bosh_secure,
                    'xmlns:xmpp': 'urn:xmpp:xbosh'})
        else:
            t = BOSHBody(
                attrs={'content': self.bosh_content,
                    'hold': str(self.bosh_hold),
                    'route': '%s:%s' % (self.route_host, self.route_port),
                    'to': self.bosh_to,
                    'wait': str(self.bosh_wait),
                    'xml:lang': self.bosh_xml_lang,
                    'xmpp:version': '1.0',
                    'ver': '1.6',
                    'xmlns:xmpp': 'urn:xmpp:xbosh'})
        self.send_BOSH((t, True))

    def start_disconnect(self):
        '''
        Queues a body with type 'terminate' and sets the wait alarm to
        DISCONNECT_TIMEOUT_SECONDS.
        '''
        NonBlockingTransport.start_disconnect(self)
        self.renew_bosh_wait_timeout(DISCONNECT_TIMEOUT_SECONDS)
        self.send_BOSH(
            (BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}), True))

    def get_new_http_socket(self):
        '''
        Creates a NonBlockingHTTPBOSH socket configured from this transport and
        wires its receive and stanza-build callbacks back to it. The socket is
        returned unconnected and is not added to self.http_socks.
        '''
        http_dict = {'http_uri': self.bosh_uri,
            'http_version': self.http_version,
            'http_persistent': self.http_persistent,
            'add_proxy_headers': self.over_proxy and not self.estabilish_tls}
        if self.use_proxy_auth:
            http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds

        sock = NonBlockingHTTPBOSH(
            raise_event=self.raise_event,
            on_disconnect=self.disconnect,
            idlequeue=self.idlequeue,
            estabilish_tls=self.estabilish_tls,
            certs=self.certs,
            on_http_request_possible=self.on_http_request_possible,
            http_dict=http_dict,
            proxy_dict=self.proxy_dict,
            on_persistent_fallback=self.on_persistent_fallback)

        sock.onreceive(self.on_received_http)
        sock.set_stanza_build_cb(self.build_stanza)
        return sock

    def onreceive(self, recv_handler):
        '''
        Sets the handler for received data; None selects the owner's
        Dispatcher.ProcessNonBlocking.
        '''
        if recv_handler is None:
            recv_handler = self._owner.Dispatcher.ProcessNonBlocking
        self.current_recv_handler = recv_handler

    def on_received_http(self, data, socket):
        ''' Remembers the receiving socket and passes data to the handler. '''
        self.current_recv_socket = socket
        self.current_recv_handler(data)

    def disconnect(self, do_callback=True):
        '''
        Removes the wait alarm and, unless already DISCONNECTED, disconnects
        all HTTP sockets and then the transport itself.
        '''
        self.remove_bosh_wait_timeout()
        if self.get_state() == DISCONNECTED:
            return
        self.fd = -1
        for sock in self.http_socks:
            sock.disconnect(do_callback=False)
        NonBlockingTransport.disconnect(self, do_callback)
def get_rand_number():
    '''
    Returns a random non-negative integer of at most 50 bits, taken from the
    operating system's randomness source.
    '''
    # with 50-bit random initial rid, session would have to go up
    # to 7881299347898368 messages to raise rid over 2**53
    # (see http://www.xmpp.org/extensions/xep-0124.html#rids)
    # it's also used for sequence key initialization
    #
    # rid and key seed have to be unpredictable, so SystemRandom is used
    # instead of a freshly seeded Mersenne Twister generator
    return random.SystemRandom().getrandbits(50)
class AckChecker(object):
    '''
    Class for generating rids and generating and checking acknowledgements in
    BOSH messages.
    '''

    def __init__(self):
        # rid of the most recently sent request; starts at a random value
        self.rid = get_rand_number()
        # highest rid acknowledged so far
        self.ack = 1
        # socket -> rid of the last request sent through it
        self.last_rids = {}
        # (rid, stanza) pairs sent but not acknowledged yet, in sending order
        self.not_acked = []

    def get_not_acked_rids(self):
        ''' Returns rids of all stanzas waiting for acknowledgement. '''
        return [rid for rid, _stanza in self.not_acked]

    def backup_stanza(self, stanza, socket):
        '''
        Assigns the next rid to stanza, keeps the stanza until it is
        acknowledged and counts one more pending request on socket.

        :param stanza: object with setAttr(name, value); gets 'rid' and, when
            the new rid doesn't directly follow the last acknowledged one,
            also 'ack'
        :param socket: transport object the stanza is sent through
        :return: the same stanza object
        '''
        socket.pending_requests += 1
        rid = self.get_rid()
        self.not_acked.append((rid, stanza))
        stanza.setAttr('rid', str(rid))
        self.last_rids[socket] = rid

        if self.rid != self.ack + 1:
            stanza.setAttr('ack', str(self.ack))
        return stanza

    def process_incoming_ack(self, socket, ack=None):
        '''
        Processes a response received on socket: one pending request less and
        every stanza with rid up to the acknowledged one is forgotten.

        :param socket: transport object the response arrived on
        :param ack: value of the 'ack' attribute of the response; when missing
            the last rid sent through socket is taken as acknowledged
        '''
        socket.pending_requests -= 1
        if ack:
            ack = int(ack)
        else:
            # A socket nothing was sent through has no recorded rid; keep the
            # current ack in that case instead of raising KeyError.
            ack = self.last_rids.get(socket, self.ack)

        # drop by rid rather than by count of leading entries
        self.not_acked = [(rid, stanza) for rid, stanza in self.not_acked
            if rid > ack]

        self.ack = ack

    def get_rid(self):
        ''' Increments and returns the request id. '''
        self.rid = self.rid + 1
        return self.rid
class KeyStack:
    '''
    Class implementing key sequences for BOSH messages
    '''

    def __init__(self, count):
        self.count = count
        self.keys = []
        self.reset()
        self.first_call = True

    def reset(self):
        '''
        Generates a new sequence: the first key is the sha1 hex digest of a
        random number, every following key is the sha1 hex digest of the
        previous one.
        '''
        self.keys = [sha1(str(get_rand_number())).hexdigest()]
        while len(self.keys) < self.count:
            self.keys.append(sha1(self.keys[-1]).hexdigest())

    def get(self):
        '''
        Returns (key, newkey) tuple for the next request. Keys are taken from
        the end of the sequence; the value that doesn't apply is None.
        '''
        if self.first_call:
            # very first request only announces the end of the sequence
            self.first_call = False
            return (None, self.keys.pop())

        if len(self.keys) > 1:
            return (self.keys.pop(), None)

        # last key of the sequence - send it together with the first key of a
        # newly generated one
        closing_key = self.keys.pop()
        self.reset()
        return (closing_key, self.keys.pop())
548 # http://www.xmpp.org/extensions/xep-0124.html#errorstatus-terminal
549 bosh_errors = {
550 'n/a': 'none or unknown condition in terminating body stanza',
551 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.',
552 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.',
553 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.',
554 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.',
555 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.',
556 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid',
557 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.',
558 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).',
559 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.',
560 'remote-stream-error': 'Encapsulates an error in the protocol being transported.',
561 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the <uri/> child element.',
562 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.',
563 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the <body/> wrapper.'