Send the encoded data (bytes) and not the str, on the socket.
[slixmpp.git] / sleekxmpp / xmlstream / xmlstream.py
blobd9b78129eef5ee8911e14925d0fe1ac923f47007
1 """
2 SleekXMPP: The Sleek XMPP Library
3 Copyright (C) 2010 Nathanael C. Fritz
4 This file is part of SleekXMPP.
6 See the file LICENSE for copying permission.
7 """
9 from __future__ import with_statement, unicode_literals
11 import base64
12 import copy
13 import logging
14 import signal
15 import socket as Socket
16 import ssl
17 import sys
18 import threading
19 import time
20 import types
21 import random
22 import weakref
23 try:
24 import queue
25 except ImportError:
26 import Queue as queue
28 import sleekxmpp
29 from sleekxmpp.thirdparty.statemachine import StateMachine
30 from sleekxmpp.xmlstream import Scheduler, tostring
31 from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
32 from sleekxmpp.xmlstream.handler import Waiter, XMLCallback
33 from sleekxmpp.xmlstream.matcher import MatchXMLMask
35 # In Python 2.x, file socket objects are broken. A patched socket
36 # wrapper is provided for this case in filesocket.py.
37 if sys.version_info < (3, 0):
38 from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26
40 try:
41 import dns.resolver
42 except ImportError:
43 DNSPYTHON = False
44 else:
45 DNSPYTHON = True
# The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 30

# The time in seconds to wait for events from the event queue, and also the
# time between checks for the process stop signal.
WAIT_TIMEOUT = 1

# The number of threads to use to handle XML stream events. This is not the
# same as the number of custom event handling threads. HANDLER_THREADS must
# be at least 1.
HANDLER_THREADS = 1

# Flag indicating if the SSL library is available for use.
SSL_SUPPORT = True

# The time in seconds to delay between attempts to resend data
# after an SSL error.
SSL_RETRY_DELAY = 0.5

# The maximum number of times to attempt resending data due to
# an SSL error.
SSL_RETRY_MAX = 10

# Maximum time, in seconds, to delay between connection attempts
# (600 seconds, i.e. ten minutes).
RECONNECT_MAX_DELAY = 600

log = logging.getLogger(__name__)
class RestartStream(Exception):
    """
    Raised to signal that stream processing has to start over,
    which includes sending the stream header again.
    """
class XMLStream(object):
    """
    An XML stream connection manager and event dispatcher.

    The XMLStream class abstracts away the issues of establishing a
    connection with a server and sending and receiving XML "stanzas".
    A stanza is a complete XML element that is a direct child of a root
    document element. Two streams are used, one for each communication
    direction, over the same socket. Once the connection is closed, both
    streams should be complete and valid XML documents.

    Three types of events are provided to manage the stream:
        Stream    -- Triggered based on received stanzas, similar in concept
                     to events in a SAX XML parser.
        Custom    -- Triggered manually.
        Scheduled -- Triggered based on time delays.

    Typically, stanzas are first processed by a stream event handler which
    will then trigger custom events to continue further processing,
    especially since custom event handlers may run in individual threads.

    Attributes:
        address       -- The hostname and port of the server.
        default_ns    -- The default XML namespace that will be applied
                         to all non-namespaced stanzas.
        event_queue   -- A queue of stream, custom, and scheduled
                         events to be processed.
        filesocket    -- A filesocket created from the main connection
                         socket. Required for ElementTree.iterparse.
        default_port  -- Default port to connect to.
        namespace_map -- Optional mapping of namespaces to namespace
                         prefixes.
        scheduler     -- A scheduler object for triggering events
                         after a given period of time.
        send_queue    -- A queue of stanzas to be sent on the stream.
        socket        -- The connection to the server.
        ssl_support   -- Indicates if a SSL library is available for use.
        ssl_version   -- The version of the SSL protocol to use.
                         Defaults to ssl.PROTOCOL_TLSv1.
        ca_certs      -- File path to a CA certificate to verify the
                         server's identity.
        state         -- A state machine for managing the stream's
                         connection state.
        stream_footer -- The closing tag of the stream's root element.
        stream_header -- The start tag and any attributes for the
                         stream's root element.
        use_ssl       -- Flag indicating if SSL should be used.
        use_tls       -- Flag indicating if TLS should be used.
        use_proxy     -- Flag indicating that an HTTP Proxy should be used.
        stop          -- threading Event used to stop all threads.
        proxy_config  -- An optional dictionary with the following entries:
            host     -- The host offering proxy services.
            port     -- The port for the proxy service.
            username -- Optional username for the proxy.
            password -- Optional password for the proxy.

        auto_reconnect      -- Flag to determine whether we auto reconnect.
        reconnect_max_delay -- Maximum time, in seconds, to delay between
                               connection attempts. Defaults to
                               RECONNECT_MAX_DELAY (600 seconds, i.e.
                               ten minutes).
        dns_answers         -- List of dns answers not yet used to connect.

    Methods:
        add_event_handler    -- Add a handler for a custom event.
        add_handler          -- Shortcut method for register_handler.
        connect              -- Connect to the given server.
        del_event_handler    -- Remove a handler for a custom event.
        disconnect           -- Disconnect from the server and terminate
                                processing.
        event                -- Trigger a custom event.
        get_id               -- Return the current stream ID.
        incoming_filter      -- Optionally filter stanzas before processing.
        new_id               -- Generate a new, unique ID value.
        process              -- Read XML stanzas from the stream and apply
                                matching stream handlers.
        reconnect            -- Reestablish a connection to the server.
        register_handler     -- Add a handler for a stream event.
        register_stanza      -- Add a new stanza object type that may appear
                                as a direct child of the stream's root.
        remove_handler       -- Remove a stream handler.
        remove_stanza        -- Remove a stanza object type.
        schedule             -- Schedule an event handler to execute after a
                                given delay.
        send                 -- Send a stanza object on the stream.
        send_raw             -- Send a raw string on the stream.
        send_xml             -- Send an XML string on the stream.
        set_socket           -- Set the stream's socket and generate a new
                                filesocket.
        start_stream_handler -- Perform any stream initialization such
                                as handshakes.
        start_tls            -- Establish a TLS connection and restart
                                the stream.
    """
def __init__(self, socket=None, host='', port=0):
    """
    Build a new, not yet connected, XML stream.

    Arguments:
        socket -- A pre-existing socket to use for the stream, or
                  None (the default) to create one when connecting.
        host   -- Name of the server to connect to. Defaults to ''.
        port   -- Port used for the connection. Defaults to 0.
    """
    # SSL / TLS configuration.
    self.ssl_support = SSL_SUPPORT
    self.ssl_version = ssl.PROTOCOL_TLSv1
    self.ca_certs = None

    # Timing and retry tuning, seeded from the module constants.
    self.wait_timeout = WAIT_TIMEOUT
    self.response_timeout = RESPONSE_TIMEOUT
    self.reconnect_delay = None
    self.reconnect_max_delay = RECONNECT_MAX_DELAY
    self.ssl_retry_max = SSL_RETRY_MAX
    self.ssl_retry_delay = SSL_RETRY_DELAY

    # The state machine must exist before set_socket() may touch it.
    self.state = StateMachine(('disconnected', 'connected'))
    self.state._set_state('disconnected')

    port = int(port)
    self.default_port = port
    self.default_domain = ''
    self.address = (host, port)
    self.filesocket = None
    self.set_socket(socket)

    # Python 2.x needs the patched socket class from filesocket.py.
    self.socket_class = Socket26 if sys.version_info < (3, 0) \
        else Socket.socket

    self.use_ssl = self.use_tls = self.use_proxy = False
    self.proxy_config = {}

    # Stream framing.
    self.default_ns = ''
    self.stream_ns = ''
    self.stream_header = "<stream>"
    self.stream_footer = "</stream>"

    self.whitespace_keepalive = True
    self.whitespace_keepalive_interval = 300

    # Synchronization events; no stream is open yet, so the
    # end-of-stream event starts out already set.
    self.stop = threading.Event()
    self.stream_end_event = threading.Event()
    self.stream_end_event.set()
    self.session_started_event = threading.Event()
    self.session_timeout = 45

    self.event_queue = queue.Queue()
    self.send_queue = queue.Queue()
    self.__failed_send_stanza = None
    self.scheduler = Scheduler(self.stop)

    self.namespace_map = {StanzaBase.xml_ns: 'xml'}

    # Private registries for threads, root stanzas and handlers.
    self.__thread = {}
    self.__root_stanza = []
    self.__handlers = []
    self.__event_handlers = {}
    self.__event_handlers_lock = threading.Lock()

    self._id = 0
    self._id_lock = threading.Lock()

    self.auto_reconnect = True
    self.dns_answers = []

    builtin_handlers = (('connected', self._handle_connected),
                        ('session_start', self._start_keepalive),
                        ('session_end', self._end_keepalive))
    for event_name, callback in builtin_handlers:
        self.add_event_handler(event_name, callback)
def use_signals(self, signals=None):
    """
    Register signal handlers for SIGHUP and SIGTERM, if possible,
    which will raise a "killed" event when the application is
    terminated.

    If a Python-level signal handler already existed, it will be
    executed first, before the "killed" event is raised. The
    non-callable SIG_DFL / SIG_IGN markers are not chained.

    Arguments:
        signals -- A list of signal names to be monitored.
                   Defaults to ['SIGHUP', 'SIGTERM'].
    """
    if signals is None:
        signals = ['SIGHUP', 'SIGTERM']

    existing_handlers = {}
    for sig_name in signals:
        if hasattr(signal, sig_name):
            sig = getattr(signal, sig_name)
            handler = signal.getsignal(sig)
            # getsignal() may return SIG_DFL / SIG_IGN (int markers) or
            # None; SIG_IGN is truthy but not callable, so only chain real
            # callables.
            if callable(handler):
                existing_handlers[sig] = handler

    def handle_kill(signum, frame):
        """
        Capture kill event and disconnect cleanly after first
        spawning the "killed" event.
        """
        if signum in existing_handlers and \
                existing_handlers[signum] != handle_kill:
            existing_handlers[signum](signum, frame)

        self.event("killed", direct=True)
        self.disconnect()

    try:
        for sig_name in signals:
            if hasattr(signal, sig_name):
                sig = getattr(signal, sig_name)
                signal.signal(sig, handle_kill)
        self.__signals_installed = True
    except Exception:
        # signal.signal() refuses to run outside the main thread;
        # installing the handlers is best-effort.
        log.debug("Can not set interrupt signal handlers. " +
                  "SleekXMPP is not running from a main thread.")
def new_id(self):
    """
    Advance the stream's ID counter and return the new value as an
    uppercase hexadecimal string.

    Stanzas, handlers and matchers often need unique IDs; because the
    counter is bumped under self._id_lock, each call yields a value
    not handed out before on this stream.
    """
    self._id_lock.acquire()
    try:
        self._id += 1
        return self.get_id()
    finally:
        self._id_lock.release()
def get_id(self):
    """Return the current stream ID counter as uppercase hexadecimal."""
    return '{0:X}'.format(self._id)
def connect(self, host='', port=0, use_ssl=False,
            use_tls=True, reattempt=True):
    """
    Create a new socket and connect to the server.

    Each attempt runs _connect() through the state machine. When
    reattempt is True, failed attempts are repeated until one succeeds
    or self.stop is set; _connect() waits before each attempt (1 second
    at first, then a growing delay capped by self.reconnect_max_delay).

    Arguments:
        host      -- The name of the desired server for the connection.
                     Only replaces the current address when port is
                     also given.
        port      -- Port to connect to on the server.
        use_ssl   -- Flag indicating if SSL should be used; None keeps
                     the current setting.
        use_tls   -- Flag indicating if TLS should be used; None keeps
                     the current setting.
        reattempt -- Flag indicating if failed connection attempts
                     should be retried.

    Returns True if a connection was established, False otherwise.
    """
    if host and port:
        self.address = (host, int(port))
    # A host that is not a dotted IPv4 literal is remembered as a domain
    # name, which _connect() resolves through pick_dns_answer().
    try:
        Socket.inet_aton(self.address[0])
    except Socket.error:
        self.default_domain = self.address[0]

    # Respect previous SSL and TLS usage directives.
    if use_ssl is not None:
        self.use_ssl = use_ssl
    if use_tls is not None:
        self.use_tls = use_tls

    while True:
        connected = self.state.transition('disconnected', 'connected',
                                          func=self._connect)
        if connected or not reattempt or self.stop.is_set():
            return connected
def _connect(self):
    """
    Make a single connection attempt; used as the state machine's
    'disconnected' -> 'connected' transition function.

    Creates a fresh socket, sleeps for a reconnect delay, optionally
    tunnels through the HTTP proxy and wraps the socket for SSL, then
    connects and fires the 'connected' event.

    Returns True on success. Returns False if the proxy connection or
    the socket connect fails; a socket failure also raises a
    'socket_error' event.
    """
    # Cancel any pending session-start watchdog from a previous attempt.
    self.scheduler.remove('Session timeout check')
    self.stop.clear()
    if self.default_domain:
        # Take the next candidate address from the DNS answers.
        self.address = self.pick_dns_answer(self.default_domain,
                                            self.address[1])
    self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM)
    self.configure_socket()

    # Back off between attempts: 1 second the first time, otherwise
    # double the previous delay (capped by reconnect_max_delay) with
    # roughly 10% normally distributed jitter.
    if self.reconnect_delay is None:
        delay = 1.0
    else:
        delay = min(self.reconnect_delay * 2, self.reconnect_max_delay)
        delay = random.normalvariate(delay, delay * 0.1)
    log.debug('Waiting %s seconds before connecting.', delay)
    time.sleep(delay)

    if self.use_proxy:
        connected = self._connect_proxy()
        if not connected:
            # Remember the delay so the next attempt backs off further.
            self.reconnect_delay = delay
            return False

    if self.use_ssl and self.ssl_support:
        log.debug("Socket Wrapped for SSL")
        # Only verify the server certificate when CA certs are configured.
        if self.ca_certs is None:
            cert_policy = ssl.CERT_NONE
        else:
            cert_policy = ssl.CERT_REQUIRED

        ssl_socket = ssl.wrap_socket(self.socket,
                                     ca_certs=self.ca_certs,
                                     cert_reqs=cert_policy)

        if hasattr(self.socket, 'socket'):
            # We are using a testing socket, so preserve the top
            # layer of wrapping.
            self.socket.socket = ssl_socket
        else:
            self.socket = ssl_socket

    try:
        # With a proxy, _connect_proxy() has already connected the socket.
        if not self.use_proxy:
            log.debug("Connecting to %s:%s", *self.address)
            self.socket.connect(self.address)

        # ignore=True: the state machine performs the state change itself.
        self.set_socket(self.socket, ignore=True)
        #this event is where you should set your application state
        self.event("connected", direct=True)
        self.reconnect_delay = 1.0
        return True
    except Socket.error as serr:
        error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
        self.event('socket_error', serr)
        log.error(error_msg, self.address[0], self.address[1],
                  serr.errno, serr.strerror)
        self.reconnect_delay = delay
        return False
def _connect_proxy(self):
    """
    Attempt to connect using an HTTP Proxy.

    Connects self.socket to proxy_config['host']:proxy_config['port'] and
    sends an HTTP CONNECT request for self.address. If
    proxy_config['username'] is set, Basic proxy authentication is added,
    using proxy_config['password'] (a missing password counts as empty).

    Returns True if the proxy's status line contains '200'. Returns False
    otherwise, after raising a 'proxy_error' event (bad proxy reply) or a
    'socket_error' event (socket failure).
    """
    # Extract the proxy address, and optional credentials.
    address = (self.proxy_config['host'], int(self.proxy_config['port']))
    cred = None
    username = self.proxy_config.get('username')
    if username:
        password = self.proxy_config.get('password') or ''
        cred = '%s:%s' % (username, password)
        cred = base64.b64encode(cred.encode('utf-8')).decode('utf-8')

    # Build the HTTP headers for connecting to the XMPP server
    headers = ['CONNECT %s:%s HTTP/1.0' % self.address,
               'Host: %s:%s' % self.address,
               'Proxy-Connection: Keep-Alive',
               'Pragma: no-cache',
               'User-Agent: SleekXMPP/%s' % sleekxmpp.__version__]
    if cred:
        headers.append('Proxy-Authorization: Basic %s' % cred)
    headers = '\r\n'.join(headers) + '\r\n\r\n'

    try:
        log.debug("Connecting to proxy: %s:%s", *address)
        self.socket.connect(address)
        self.send_raw(headers, now=True)
        # Collect raw bytes and decode once at the end, so a multi-byte
        # character split across recv() chunks cannot break decoding.
        raw = b''
        while b'\r\n\r\n' not in raw and not self.stop.is_set():
            chunk = self.socket.recv(1024)
            if not chunk:
                # The proxy closed the connection before finishing its
                # reply; stop instead of spinning on empty reads.
                break
            raw += chunk
        resp = raw.decode('utf-8', 'replace')
        log.debug('RECV: %s', resp)

        lines = resp.split('\r\n')
        if '200' not in lines[0]:
            self.event('proxy_error', resp)
            log.error('Proxy Error: %s', lines[0])
            return False

        # Proxy connection established, continue connecting
        # with the XMPP server.
        return True
    except Socket.error as serr:
        error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
        self.event('socket_error', serr)
        log.error(error_msg, self.address[0], self.address[1],
                  serr.errno, serr.strerror)
        return False
def _handle_connected(self, event=None):
    """
    Arm a one-shot watchdog after connecting.

    After self.session_timeout seconds the watchdog checks
    self.session_started_event. If the session has not started by then,
    it disconnects, reconnecting according to self.auto_reconnect.

    Arguments:
        event -- The 'connected' event data (unused).
    """
    def _handle_session_timeout():
        if self.session_started_event.is_set():
            return
        log.debug("Session start has taken more than %d seconds",
                  self.session_timeout)
        self.disconnect(reconnect=self.auto_reconnect)

    self.schedule("Session timeout check", self.session_timeout,
                  _handle_session_timeout)
def disconnect(self, reconnect=False, wait=False):
    """
    Terminate processing and close the XML streams.

    The work is done by _disconnect(), run as the state machine's
    'connected' -> 'disconnected' transition.

    With wait=True the send queue is drained first. If items keep being
    added so the queue never empties, this call keeps blocking.

    Arguments:
        reconnect -- Flag indicating if the connection and processing
                     should be restarted. Defaults to False.
        wait      -- Flag indicating if the send queue should be
                     emptied before disconnecting. Defaults to False.
    """
    transition_args = (reconnect, wait)
    self.state.transition('connected', 'disconnected',
                          func=self._disconnect, args=transition_args)
def _disconnect(self, reconnect=False, wait=False):
    """
    Close the stream and its socket; the state machine's
    'connected' -> 'disconnected' transition function.

    Steps:
      1. Optionally drain the send queue.
      2. Send the stream footer.
      3. Wait up to 4 seconds for the server to close its side.
      4. Set self.stop unless reconnecting.
      5. Shut down and close the socket and filesocket.

    'session_end' and 'disconnected' are always raised at the end.

    Arguments:
        reconnect -- Stored as self.auto_reconnect; when False,
                     self.stop is set.
        wait      -- If True, block until the send queue is empty first.

    Returns True.
    """
    # Wait for the send queue to empty.
    if wait:
        self.send_queue.join()

    # Send the end of stream marker.
    self.send_raw(self.stream_footer, now=True)
    self.session_started_event.clear()
    # Wait for confirmation that the stream was
    # closed in the other direction.
    self.auto_reconnect = reconnect
    log.debug('Waiting for %s from server', self.stream_footer)
    self.stream_end_event.wait(4)
    if not self.auto_reconnect:
        self.stop.set()
    try:
        # shutdown() raises if the peer already dropped the connection
        # (e.g. ENOTCONN). That must not skip the close() calls below,
        # or the file descriptors leak.
        try:
            self.socket.shutdown(Socket.SHUT_RDWR)
        except Socket.error as serr:
            self.event('socket_error', serr)
        self.socket.close()
        if self.filesocket is not None:
            self.filesocket.close()
    except Socket.error as serr:
        self.event('socket_error', serr)
    finally:
        #clear your application state
        self.event('session_end', direct=True)
        self.event("disconnected", direct=True)
    return True
def reconnect(self, reattempt=True):
    """
    Reset the stream's state and reconnect to the server.

    If currently connected, the stream is first torn down through
    _disconnect() with reconnect=True. Connection attempts through
    _connect() are then repeated while reattempt is True, until one
    succeeds or self.stop is set. Both transitions pass wait=2.0 to the
    state machine.

    Arguments:
        reattempt -- Keep retrying failed connection attempts.

    Returns True if an attempt succeeded or the state machine already
    reports 'connected'.
    """
    log.debug("reconnecting...")
    if self.state.ensure('connected'):
        self.state.transition('connected', 'disconnected', wait=2.0,
                              func=self._disconnect, args=(True,))

    log.debug("connecting...")
    while True:
        connected = self.state.transition('disconnected', 'connected',
                                          wait=2.0, func=self._connect)
        if connected or not reattempt or self.stop.is_set():
            break
    return connected or self.state.ensure('connected')
def set_socket(self, socket, ignore=False):
    """
    Install the socket used by the stream and rebuild the filesocket
    that ElementTree.iterparse reads from.

    Arguments:
        socket -- The new socket to use (None clears it).
        ignore -- If True, do not switch the state machine to
                  'connected'.
    """
    self.socket = socket
    if socket is None:
        return

    # iterparse needs a file object. An unbuffered (0) file must be
    # binary. Python 2.x socket file objects are broken, so a patched
    # wrapper is used there.
    if sys.version_info >= (3, 0):
        self.filesocket = self.socket.makefile('rb', 0)
    else:
        self.filesocket = FileSocket(self.socket)

    if not ignore:
        self.state._set_state('connected')
def configure_socket(self):
    """
    Hook for tuning self.socket (timeout and other options) before it
    is used.

    The default puts the socket into blocking mode with no timeout.
    Subclasses may override it.
    """
    sock = self.socket
    sock.settimeout(None)
def configure_dns(self, resolver, domain=None, port=None):
    """
    Hook for customizing DNS resolution before records are looked up.

    It receives the dns.resolver.Resolver that will be used, and could
    also perform related setup, e.g. checking Socket.getaddrinfo to
    decide whether libresolv.so.2's res_init() needs to be called. The
    default does nothing; subclasses may override it.

    Arguments:
        resolver -- A dns.resolver.Resolver instance, or None when
                    dnspython is not installed.
        domain   -- The domain about to be resolved.
        port     -- The port under consideration.
    """
def start_tls(self):
    """
    Upgrade the current socket to TLS and perform the handshake.

    After a successful handshake the XML stream needs to be restarted.
    The server certificate is only verified when self.ca_certs is set.

    Returns True once TLS is in place. Returns False (with a warning)
    when SSL support is unavailable.
    """
    if not self.ssl_support:
        log.warning("Tried to enable TLS, but ssl module not found.")
        return False

    log.info("Negotiating TLS")
    log.info("Using SSL version: %s", str(self.ssl_version))
    cert_policy = ssl.CERT_NONE if self.ca_certs is None \
        else ssl.CERT_REQUIRED

    # NOTE: ssl.wrap_socket is deprecated since Python 3.7 and removed
    # in 3.12; an SSLContext is needed on newer interpreters.
    ssl_socket = ssl.wrap_socket(self.socket,
                                 ssl_version=self.ssl_version,
                                 do_handshake_on_connect=False,
                                 ca_certs=self.ca_certs,
                                 cert_reqs=cert_policy)

    if hasattr(self.socket, 'socket'):
        # Testing socket: keep its outer wrapper and replace the
        # socket it wraps.
        self.socket.socket = ssl_socket
    else:
        self.socket = ssl_socket

    self.socket.do_handshake()
    self.set_socket(self.socket)
    return True
def _start_keepalive(self, event):
    """
    Begin sending whitespace periodically to keep the connection alive.

    Does nothing when self.whitespace_keepalive is False. The interval in
    seconds comes from self.whitespace_keepalive_interval. A single space
    is sent only if the send queue is empty when the scheduled callback
    fires.

    Arguments:
        event -- The 'session_start' event data (unused).
    """
    # Honor the documented switch for disabling keepalives.
    if not self.whitespace_keepalive:
        return

    def send_keepalive():
        if self.send_queue.empty():
            self.send_raw(' ')

    self.schedule('Whitespace Keepalive',
                  self.whitespace_keepalive_interval,
                  send_keepalive,
                  repeat=True)
def _end_keepalive(self, event):
    """Cancel the scheduled 'Whitespace Keepalive' task."""
    scheduler = self.scheduler
    scheduler.remove('Whitespace Keepalive')
def start_stream_handler(self, xml):
    """
    Hook run once the stream header has been sent, for initialization
    such as handshakes.

    The default does nothing; subclasses may override it.

    Arguments:
        xml -- The stream's root XML element.
    """
def register_stanza(self, stanza_class):
    """
    Register a stanza class as a root stanza, i.e. one that may appear
    as a direct child of the stream's root element.

    Substanzas nested inside a root stanza are registered with
    register_stanza_plugin() from sleekxmpp.xmlstream.stanzabase
    instead. Unregistered elements are not turned into stanza objects,
    but handlers and matchers can still process them.

    Arguments:
        stanza_class -- The top-level stanza object's class.
    """
    roots = self.__root_stanza
    roots.append(stanza_class)
def remove_stanza(self, stanza_class):
    """
    Remove a stanza class from the known root stanzas, i.e. those that
    appear as direct children of the stream's root element.

    Unregistered elements are not converted into stanza objects, but
    may still be processed using handlers and matchers. Removing a
    class that was never registered is a no-op.

    Arguments:
        stanza_class -- The stanza class to unregister.
    """
    # The registry is a list, so remove by value: indexing a list with
    # a class object raises TypeError.
    try:
        self.__root_stanza.remove(stanza_class)
    except ValueError:
        pass
def add_handler(self, mask, pointer, name=None, disposable=False,
                threaded=False, filter=False, instream=False):
    """
    A shortcut method for registering a handler using XML masks.

    Arguments:
        mask       -- An XML snippet matching the structure of the
                      stanzas that will be passed to this handler.
        pointer    -- The handler function itself.
        name       -- A unique name for the handler. A name will
                      be generated if one is not provided.
        disposable -- Indicates if the handler should be discarded
                      after one use.
        threaded   -- Deprecated. Remains for backwards compatibility.
        filter     -- Deprecated. Remains for backwards compatibility.
        instream   -- Indicates if the handler should execute during
                      stream processing and not during normal event
                      processing.
    """
    # threaded and filter are accepted only for API compatibility.
    if name is None:
        name = 'add_handler_%s' % self.new_id()
    # Use the PEP 8 methods defined on this class rather than the legacy
    # camelCase aliases.
    self.register_handler(XMLCallback(name, MatchXMLMask(mask), pointer,
                                      once=disposable, instream=instream))
def register_handler(self, handler, before=None, after=None):
    """
    Attach a stream event handler, to be executed when a matching
    stanza is received.

    A handler whose stream attribute is already set is left alone. On
    attach, its stream is set to a weak reference to this stream.

    Arguments:
        handler -- The handler object to execute.
        before  -- Accepted but currently unused.
        after   -- Accepted but currently unused.
    """
    if handler.stream is not None:
        return
    self.__handlers.append(handler)
    handler.stream = weakref.ref(self)
def remove_handler(self, name):
    """
    Remove the first stream event handler with the given name.

    Arguments:
        name -- The name of the handler.

    Returns True if a handler was removed, False if none matched.
    """
    handlers = self.__handlers
    for position, handler in enumerate(handlers):
        if handler.name == name:
            del handlers[position]
            return True
    return False
def get_dns_records(self, domain, port=None):
    """
    Look up the A records for a domain.

    Returns a list of ((host, port), priority, weight) tuples, where
    priority and weight are always 0. When dnspython is not installed,
    or the lookup yields no records or times out, a single entry for the
    bare domain name is returned instead.

    Arguments:
        domain -- The domain in question.
        port   -- Port to pair with each result; defaults to
                  self.default_port.
    """
    if port is None:
        port = self.default_port
    fallback = [((domain, port), 0, 0)]

    if not DNSPYTHON:
        log.warning("dnspython is not installed -- " + \
                    "relying on OS A record resolution")
        self.configure_dns(None, domain=domain, port=port)
        return fallback

    resolver = dns.resolver.get_default_resolver()
    self.configure_dns(resolver, domain=domain, port=port)
    try:
        answers = resolver.query(domain, dns.rdatatype.A)
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        log.warning("No A records for %s", domain)
        return fallback
    except dns.exception.Timeout:
        log.warning("DNS resolution timed out " + \
                    "for A record of %s", domain)
        return fallback
    return [((ans.address, port), 0, 0) for ans in answers]
def pick_dns_answer(self, domain, port=None):
    """
    Pick a server address from the cached DNS answers.

    Answers are fetched with get_dns_records() when none are cached.
    Each answer is a ((host, port), priority, weight) tuple. Among the
    answers sharing the lowest priority value, one is chosen at random
    in proportion to its weight (RFC 2782 style). If all of those
    weights are zero, each is equally likely. The chosen answer is
    removed from self.dns_answers so later attempts try other servers.

    Arguments:
        domain -- The domain in question.
        port   -- If the results don't include a port, use this one.

    Returns a (host, port) tuple.
    """
    if not self.dns_answers:
        self.dns_answers = self.get_dns_records(domain, port)

    if not self.dns_answers:
        # Nothing to choose from (e.g. an overridden get_dns_records
        # returned no entries); fall back to the bare domain.
        address = (domain, self.default_port if port is None else port)
        log.debug("Trying to connect to %s:%s", *address)
        return address

    top_priority = min(answer[1] for answer in self.dns_answers)
    candidates = [answer for answer in self.dns_answers
                  if answer[1] == top_priority]
    total_weight = sum(answer[2] for answer in candidates)

    if total_weight == 0:
        chosen = random.choice(candidates)
    else:
        # Walk the running weight sum until it reaches a random
        # threshold in [0, total_weight]; the last candidate always
        # reaches it.
        threshold = random.randint(0, total_weight)
        chosen = candidates[-1]
        running = 0
        for answer in candidates:
            running += answer[2]
            if running >= threshold:
                chosen = answer
                break

    # Remove the answer actually chosen, not an arbitrary one.
    self.dns_answers.remove(chosen)
    address = chosen[0]
    log.debug("Trying to connect to %s:%s", *address)
    return address
def add_event_handler(self, name, pointer,
                      threaded=False, disposable=False):
    """
    Register a function to run whenever the named custom event fires.

    Arguments:
        name       -- The name of the event that will trigger
                      this handler.
        pointer    -- The function to execute.
        threaded   -- If set to True, the handler will execute
                      in its own thread. Defaults to False.
        disposable -- If set to True, the handler will be
                      discarded after one use. Defaults to False.
    """
    entry = (pointer, threaded, disposable)
    self.__event_handlers.setdefault(name, []).append(entry)
def del_event_handler(self, name, pointer):
    """
    Unregister every handler entry for the event that uses the given
    function. Unknown event names are ignored.

    Arguments:
        name    -- The name of the event.
        pointer -- The function to remove as a handler.
    """
    if name in self.__event_handlers:
        # Rebuild the list, keeping entries that use other functions.
        self.__event_handlers[name] = [
            entry for entry in self.__event_handlers[name]
            if entry[0] != pointer]
def event_handled(self, name):
    """
    Return how many handlers are registered for the named event
    (0 if none), so the result can also be used as a truth value.

    Arguments:
        name -- The name of the event to check.
    """
    registered = self.__event_handlers.get(name)
    return len(registered) if registered else 0
def event(self, name, data={}, direct=False):
    """
    Manually trigger a custom event.

    Every registered handler receives the data. When more than one
    handler is registered, each receives a shallow copy. Disposable
    handlers are unregistered as soon as they are run or queued.

    Note: the default ``data`` dict is one object shared across calls.
    With a single handler it is passed through uncopied, so handlers
    should not mutate it.

    Arguments:
        name   -- The name of the event to trigger.
        data   -- Data that will be passed to each event handler.
                  Defaults to an empty dictionary.
        direct -- Runs the event directly if True, skipping the
                  event queue. All event handlers will run in the
                  same thread.
    """
    # Iterate over a snapshot: disposable handlers are removed from the
    # live list below, and mutating a list while iterating over it would
    # silently skip the handler that follows each removed one.
    handlers = list(self.__event_handlers.get(name, []))
    for handler in handlers:
        #TODO: Data should not be copied, but should be read only,
        #      but this might break current code so it's left for future.
        out_data = copy.copy(data) if len(handlers) > 1 else data
        old_exception = getattr(data, 'exception', None)
        if direct:
            try:
                handler[0](out_data)
            except Exception as e:
                error_msg = 'Error processing event handler: %s'
                log.exception(error_msg, str(handler[0]))
                if old_exception:
                    old_exception(e)
                else:
                    self.exception(e)
        else:
            self.event_queue.put(('event', handler, out_data))
        if handler[2]:
            # If the handler is disposable, we will go ahead and
            # remove it now instead of waiting for it to be
            # processed in the queue.
            with self.__event_handlers_lock:
                try:
                    self.__event_handlers[name].remove(handler)
                except (KeyError, ValueError):
                    # Already removed elsewhere (e.g. del_event_handler
                    # or a concurrent event() call).
                    pass
def schedule(self, name, seconds, callback, args=None,
             kwargs=None, repeat=False):
    """
    Ask the scheduler to run a callback after a delay. Scheduled items
    are handed to self.event_queue (passed as qpointer).

    Arguments:
        name     -- A unique name for the scheduled callback.
        seconds  -- The time in seconds to wait before executing.
        callback -- A pointer to the function to execute.
        args     -- A tuple of arguments to pass to the function.
        kwargs   -- A dictionary of keyword arguments to pass to
                    the function.
        repeat   -- Flag indicating if the scheduled event should
                    be reset and repeat after executing.
    """
    target_queue = self.event_queue
    self.scheduler.add(name, seconds, callback, args, kwargs, repeat,
                       qpointer=target_queue)
def incoming_filter(self, xml):
    """
    Hook applied to incoming XML objects before they are processed,
    e.g. to remap namespaces or repair elements from misbehaving
    sources.

    The default returns the object unchanged; subclasses may override
    it.
    """
    filtered = xml
    return filtered
def send(self, data, mask=None, timeout=None, now=False):
    """
    Send a stanza object (as its string form) via send_raw, optionally
    blocking until an expected response arrives.

    Arguments:
        data    -- The stanza object to send on the stream.
        mask    -- Deprecated. An XML snippet (or an object with an
                   .xml attribute) matching the expected response.
                   When given, this thread blocks until the response
                   is received or the timeout expires.
        timeout -- Seconds to wait for a response; defaults to
                   self.response_timeout.
        now     -- If True, skip the send queue and write immediately.
                   Mainly useful for stream initialization stanzas.
                   Defaults to False.

    Returns the result of the waiter's wait(timeout) when a mask is
    given, otherwise None.
    """
    if timeout is None:
        timeout = self.response_timeout
    if hasattr(mask, 'xml'):
        mask = mask.xml
    payload = str(data)

    waiter = None
    if mask is not None:
        log.warning("Use of send mask waiters is deprecated.")
        # Register before sending so a fast reply cannot be missed.
        waiter = Waiter("SendWait_%s" % self.new_id(),
                        MatchXMLMask(mask))
        self.register_handler(waiter)

    self.send_raw(payload, now)

    if waiter is not None:
        return waiter.wait(timeout)
def send_xml(self, data, mask=None, timeout=None, now=False):
    """
    Serialize an XML object with tostring() and send it via send(),
    optionally waiting for a response.

    Arguments:
        data    -- The XML object to send on the stream.
        mask    -- Deprecated. An XML snippet matching the structure
                   of the expected response. Execution will block
                   in this thread until the response is received
                   or a timeout occurs.
        timeout -- Seconds to wait for a response; defaults to
                   self.response_timeout.
        now     -- If True, skip the send queue and write immediately.
                   Mainly useful for stream initialization stanzas.
                   Defaults to False.
    """
    effective_timeout = self.response_timeout if timeout is None \
        else timeout
    return self.send(tostring(data), mask, effective_timeout, now)
def send_raw(self, data, now=False, reconnect=None):
    """
    Send raw data across the stream.

    With now=False the data is placed on self.send_queue unchanged.
    With now=True it is written to the socket immediately, in as many
    send() calls as needed. SSL write errors are retried up to
    self.ssl_retry_max times, self.ssl_retry_delay seconds apart.
    After that, or on a socket error, the stream is disconnected.

    Arguments:
        data      -- A text string (UTF-8 encoded before sending) or an
                     already-encoded byte string.
        now       -- Write immediately instead of queueing.
        reconnect -- Indicates if the stream should be
                     restarted if there is an error sending
                     the stanza. Used mainly for testing.
                     Defaults to self.auto_reconnect.

    Returns True.
    """
    if not now:
        self.send_queue.put(data)
        return True

    log.debug("SEND (IMMED): %s", data)
    try:
        # Sockets carry bytes; only encode text.
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        total = len(data)
        sent = 0
        count = 0
        tries = 0
        while sent < total and not self.stop.is_set():
            try:
                sent += self.socket.send(data[sent:])
                count += 1
            except ssl.SSLError as serr:
                if tries >= self.ssl_retry_max:
                    log.debug('SSL error - max retries reached')
                    self.exception(serr)
                    log.warning("Failed to send %s", data)
                    if reconnect is None:
                        reconnect = self.auto_reconnect
                    self.disconnect(reconnect)
                    # The stream has just been torn down; do not keep
                    # writing to the old socket.
                    break
                log.warning('SSL write error - reattempting')
                time.sleep(self.ssl_retry_delay)
                tries += 1
        if count > 1:
            log.debug('SENT: %d chunks', count)
    except Socket.error as serr:
        self.event('socket_error', serr)
        log.warning("Failed to send %s", data)
        if reconnect is None:
            reconnect = self.auto_reconnect
        self.disconnect(reconnect)
    return True
def process(self, **kwargs):
    """
    Initialize the XML streams and begin processing events.

    HANDLER_THREADS decides how many stream event handler threads are
    started. A send thread is always started as well, and the scheduler
    is always run threaded.

    Keyword arguments:
        block    -- When False, the stream is processed in a background
                    thread so the caller can keep using the stream;
                    when True, process() blocks the current thread.
                    Defaults to False.

        **threaded is deprecated and kept for API compatibility**
        threaded -- When True, the stream is processed in a background
                    thread. Defaults to True.

    Passing both block and threaded raises ValueError. Event handlers
    and the send queue are threaded regardless of these arguments.
    """
    if 'threaded' in kwargs and 'block' in kwargs:
        raise ValueError("process() called with both "
                         "block and threaded arguments")
    if 'block' in kwargs:
        threaded = not kwargs['block']
    else:
        threaded = kwargs.get('threaded', True)

    self.scheduler.process(threaded=True)

    def spawn(name, target):
        # Keep a reference to every worker, keyed by thread name.
        worker = threading.Thread(name=name, target=target)
        self.__thread[name] = worker
        worker.start()

    for index in range(HANDLER_THREADS):
        log.debug("Starting HANDLER THREAD")
        spawn('stream_event_handler_%s' % index, self._event_runner)

    spawn('send_thread', self._send_thread)

    if not threaded:
        self._process()
    else:
        # Run the XML stream in the background for another application.
        spawn('process', self._process)
def _process(self):
    """
    Start processing the XML streams.

    Processing continues after recoverable errors if reconnections are
    allowed. After each pass, self.reconnect() is called when self.stop
    is not set and self.auto_reconnect is true. Otherwise
    self.disconnect() is called and the method returns.
    """
    # One pass of this loop per connection. Further passes only happen
    # after the stream ends or an error occurs and reconnecting is
    # permitted.
    while True:
        try:
            while not self.stop.is_set():
                # Only read from the stream while connected.
                connected = self.state.ensure('connected', wait=0.1,
                                              block_on_transition=True)
                if not connected:
                    continue
                # New connections need the stream header (re)sent.
                if not self.session_started_event.is_set():
                    self.send_raw(self.stream_header, now=True)
                # __read_xml blocks while the stream is open. A falsy
                # result means the stream has ended, so stop reading.
                if not self.__read_xml():
                    break
        except SyntaxError as e:
            log.error("Error reading from XML stream.")
            self.exception(e)
        except KeyboardInterrupt:
            log.debug("Keyboard Escape Detected in _process")
            self.stop.set()
        except SystemExit:
            log.debug("SystemExit in _process")
            self.stop.set()
            self.scheduler.quit()
        except Socket.error as serr:
            self.event('socket_error', serr)
            log.exception('Socket Error')
        except Exception as e:
            if not self.stop.is_set():
                log.exception('Connection error.')
            self.exception(e)

        if self.stop.is_set() or not self.auto_reconnect:
            self.disconnect()
            break
        self.reconnect()
def __read_xml(self):
    """
    Parse the incoming XML stream, raising stream events for
    each received stanza.

    Returns False when the stream's root element closes, True when a
    stanza handler raises RestartStream, and None when the input runs
    out without either happening.
    """
    level = 0
    stream_root = None
    for kind, elem in ET.iterparse(self.filesocket, (b'end', b'start')):
        if kind == b'start':
            if level == 0:
                # Start of the root element: run stream initialization
                # (e.g. handshakes).
                stream_root = elem
                self.stream_end_event.clear()
                self.start_stream_handler(stream_root)
            level += 1
        elif kind == b'end':
            level -= 1
            if level == 0:
                # The root element closed, terminating the stream.
                log.debug("End of stream recieved")
                self.stream_end_event.set()
                return False
            if level == 1:
                # Only direct children of the root are stanzas.
                try:
                    self.__spawn_event(elem)
                except RestartStream:
                    return True
                if stream_root is not None:
                    # Drop handled stanzas from the root to bound memory.
                    stream_root.clear()
    log.debug("Ending read XML loop")
def _build_stanza(self, xml, default_ns=None):
    """
    Create a stanza object from a given XML object.

    A registered root stanza class is used when the element's tag
    equals either its name qualified with the default namespace or its
    tag_name(). The first matching class wins. When none matches, a
    generic StanzaBase stanza is built.

    Arguments:
        xml        -- The XML object to convert into a stanza object.
        default_ns -- Optional default namespace to use instead of the
                      stream's current default namespace.
    """
    namespace = self.default_ns if default_ns is None else default_ns
    candidates = (cls for cls in self.__root_stanza
                  if xml.tag == "{%s}%s" % (namespace, cls.name) or
                  xml.tag == cls.tag_name())
    return next(candidates, StanzaBase)(self, xml)
def __spawn_event(self, xml):
    """
    Analyze incoming XML stanzas and convert them into stanza
    objects if applicable and queue stream events to be processed
    by matching handlers.

    For each matching handler, handler.prerun() is called in this
    thread and a ('stanza', handler, stanza) item is put on the event
    queue. When several handlers match, each gets its own copy of the
    stanza. If no handler matches, stanza.unhandled() is called.

    Arguments:
        xml -- The XML stanza to analyze.
    """
    # Serializing every stanza costs time; only do it when the debug
    # message will actually be emitted.
    if log.isEnabledFor(logging.DEBUG):
        log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns,
                                       stream=self))
    # Apply any preprocessing filters.
    xml = self.incoming_filter(xml)

    # Convert the raw XML object into a stanza object. If no registered
    # stanza type applies, a generic StanzaBase stanza will be used.
    stanza = self._build_stanza(xml)

    matched_handlers = [h for h in self.__handlers if h.match(stanza)]
    needs_copy = len(matched_handlers) > 1
    for handler in matched_handlers:
        stanza_copy = copy.copy(stanza) if needs_copy else stanza
        handler.prerun(stanza_copy)
        self.event_queue.put(('stanza', handler, stanza_copy))
        try:
            if handler.check_delete():
                self.__handlers.remove(handler)
        except ValueError:
            # The handler list is not locked here, so another thread
            # may already have removed this handler.
            pass

    # Some stanzas require responses, such as Iq queries. A default
    # handler is executed immediately in that case.
    if not matched_handlers:
        stanza.unhandled()
def _threaded_event_wrapper(self, func, args):
    """
    Run an event handler in its own thread, capturing any exception
    it raises.

    Arguments:
        func -- The event handler to execute.
        args -- Sequence of arguments for the handler; it must contain
                at least one element.

    If the handler raises, the error is logged and reported to
    args[0].exception() when args[0] provides one, otherwise to
    self.exception().
    """
    # NOTE(review): args[0] is presumably a copy made when the event was
    # queued, so the handler cannot disturb it - confirm against event().
    first_arg = args[0]
    try:
        func(*args)
    except Exception as e:
        log.exception('Error processing event handler: %s', str(func))
        reporter = first_arg if hasattr(first_arg, 'exception') else self
        reporter.exception(e)
def _event_runner(self):
    """
    Process the event queue and execute handlers.

    HANDLER_THREADS controls how many of these runner threads exist.
    Stream handlers run directly in this thread. Custom event handlers
    flagged as threaded are started in a thread of their own.

    Queue items are tuples of (kind, handler, payload...):
        'stanza'   -- handler.run(payload[0]) is called.
        'schedule' -- handler(*payload[0]) is called; payload[1] is the
                      task name used for logging.
        'event'    -- handler is (func, threaded, disposable); func is
                      called with the payload, in a new thread when
                      threaded is true.
        'quit'     -- the runner stops and returns False.

    Runs until self.stop is set.
    - On KeyboardInterrupt, the 'killed' event is raised and the
      stream disconnected.
    - On SystemExit, the stream is disconnected and a 'quit' item is
      queued, presumably so another runner thread exits too.
    """
    log.debug("Loading event runner")
    try:
        while not self.stop.is_set():
            try:
                item = self.event_queue.get(True, timeout=self.wait_timeout)
            except queue.Empty:
                continue
            if item is None:
                continue

            kind, handler = item[0:2]
            payload = item[2:]
            # Snapshot the first payload element before any handler can
            # modify it; errors are reported against this copy.
            snapshot = copy.copy(payload[0])

            if kind == 'stanza':
                try:
                    handler.run(payload[0])
                except Exception as e:
                    log.exception('Error processing stream handler: %s',
                                  handler.name)
                    snapshot.exception(e)
            elif kind == 'schedule':
                task_name = payload[1]
                try:
                    log.debug('Scheduled event: %s: %s',
                              task_name, payload[0])
                    handler(*payload[0])
                except Exception as e:
                    log.exception('Error processing scheduled task')
                    self.exception(e)
            elif kind == 'event':
                func, threaded, _disposable = handler
                try:
                    if not threaded:
                        func(*payload)
                    else:
                        worker = threading.Thread(
                            name="Event_%s" % str(func),
                            target=self._threaded_event_wrapper,
                            args=(func, payload))
                        worker.start()
                except Exception as e:
                    log.exception('Error processing event handler: %s',
                                  str(func))
                    reporter = snapshot if hasattr(snapshot, 'exception') \
                        else self
                    reporter.exception(e)
            elif kind == 'quit':
                log.debug("Quitting event runner thread")
                return False
    except KeyboardInterrupt:
        log.debug("Keyboard Escape Detected in _event_runner")
        self.event('killed', direct=True)
        self.disconnect()
        return
    except SystemExit:
        self.disconnect()
        self.event_queue.put(('quit', None, None))
        return
def _send_thread(self):
    """
    Extract stanzas from the send queue and send them on the stream.

    Runs until self.stop is set. Queued data is only written once
    self.session_started_event is set. Text is encoded as UTF-8 before
    writing; bytes are written unchanged.

    If a stanza cannot be written (socket error, or SSL write retries
    exhausted):
    - it is kept in self.__failed_send_stanza;
    - the stream is disconnected with self.auto_reconnect;
    - on the next pass of the loop it is written before anything
      else is taken from the queue.
    """
    try:
        while not self.stop.is_set():
            # Hold queued data back until the session is established.
            # is_set must be *called*: the bound method object itself is
            # always truthy, which would make this wait a no-op.
            while not self.stop.is_set() and \
                    not self.session_started_event.is_set():
                self.session_started_event.wait(timeout=1)
            if self.__failed_send_stanza is not None:
                # Retry what failed on the previous connection first.
                data = self.__failed_send_stanza
                self.__failed_send_stanza = None
            else:
                try:
                    data = self.send_queue.get(True, 1)
                except queue.Empty:
                    continue
            log.debug("SEND: %s", data)
            if isinstance(data, bytes):
                enc_data = data
            else:
                enc_data = data.encode('utf-8')
            total = len(enc_data)
            sent = 0
            count = 0
            tries = 0
            gave_up = False
            try:
                while sent < total and not self.stop.is_set():
                    try:
                        sent += self.socket.send(enc_data[sent:])
                        count += 1
                    except ssl.SSLError as serr:
                        if tries >= self.ssl_retry_max:
                            log.debug('SSL error - max retries reached')
                            self.exception(serr)
                            log.warning("Failed to send %s", data)
                            # Same recovery as for socket errors below:
                            # keep the stanza for resending and drop the
                            # connection instead of retrying forever.
                            self.__failed_send_stanza = data
                            self.disconnect(self.auto_reconnect)
                            gave_up = True
                            break
                        log.warning('SSL write error - reattempting')
                        time.sleep(self.ssl_retry_delay)
                        tries += 1
                if gave_up:
                    # task_done() is called once the stanza is actually
                    # written on a later pass.
                    continue
                if count > 1:
                    log.debug('SENT: %d chunks', count)
                self.send_queue.task_done()
            except Socket.error as serr:
                self.event('socket_error', serr)
                log.warning("Failed to send %s", data)
                self.__failed_send_stanza = data
                self.disconnect(self.auto_reconnect)
    except Exception as ex:
        log.exception('Unexpected error in send thread: %s', ex)
        self.exception(ex)
        if not self.stop.is_set():
            self.disconnect(self.auto_reconnect)
def exception(self, exception):
    """
    Hook for exceptions that are not handled anywhere else.

    The base implementation ignores the exception and returns None;
    subclasses are meant to override it.

    Arguments:
        exception -- An unhandled exception object.
    """
# To comply with PEP8, method names now use underscores.
# The deprecated camelCase names below stay available as aliases of the
# new methods for backwards compatibility.
for _old_name, _new_name in (('startTLS', 'start_tls'),
                             ('registerStanza', 'register_stanza'),
                             ('removeStanza', 'remove_stanza'),
                             ('registerHandler', 'register_handler'),
                             ('removeHandler', 'remove_handler'),
                             ('setSocket', 'set_socket'),
                             ('sendRaw', 'send_raw'),
                             ('getId', 'get_id'),
                             ('getNewId', 'new_id'),
                             ('sendXML', 'send_xml')):
    setattr(XMLStream, _old_name, getattr(XMLStream, _new_name))
# Do not leave the loop variables behind as module attributes.
del _old_name, _new_name