2 SleekXMPP: The Sleek XMPP Library
3 Copyright (C) 2010 Nathanael C. Fritz
4 This file is part of SleekXMPP.
6 See the file LICENSE for copying permission.
9 from __future__
import with_statement
, unicode_literals
15 import socket
as Socket
29 from sleekxmpp
.thirdparty
.statemachine
import StateMachine
30 from sleekxmpp
.xmlstream
import Scheduler
, tostring
31 from sleekxmpp
.xmlstream
.stanzabase
import StanzaBase
, ET
32 from sleekxmpp
.xmlstream
.handler
import Waiter
, XMLCallback
33 from sleekxmpp
.xmlstream
.matcher
import MatchXMLMask
35 # In Python 2.x, file socket objects are broken. A patched socket
36 # wrapper is provided for this case in filesocket.py.
37 if sys
.version_info
< (3, 0):
38 from sleekxmpp
.xmlstream
.filesocket
import FileSocket
, Socket26
48 # The time in seconds to wait before timing out waiting for response stanzas.
51 # The time in seconds to wait for events from the event queue, and also the
52 # time between checks for the process stop signal.
55 # The number of threads to use to handle XML stream events. This is not the
56 # same as the number of custom event handling threads. HANDLER_THREADS must
60 # Flag indicating if the SSL library is available for use.
63 # The time in seconds to delay between attempts to resend data
67 # The maximum number of times to attempt resending data due to
71 # Maximum time to delay between connection attempts is ten minutes (600 seconds).
72 RECONNECT_MAX_DELAY
= 600
74 log
= logging
.getLogger(__name__
)
class RestartStream(Exception):
    """
    Signal that stream processing has to start over, which
    includes resending the stream header.
    """
84 class XMLStream(object):
86 An XML stream connection manager and event dispatcher.
88 The XMLStream class abstracts away the issues of establishing a
89 connection with a server and sending and receiving XML "stanzas".
90 A stanza is a complete XML element that is a direct child of a root
91 document element. Two streams are used, one for each communication
92 direction, over the same socket. Once the connection is closed, both
93 streams should be complete and valid XML documents.
95 Three types of events are provided to manage the stream:
96 Stream -- Triggered based on received stanzas, similar in concept
97 to events in a SAX XML parser.
98 Custom -- Triggered manually.
99 Scheduled -- Triggered based on time delays.
101 Typically, stanzas are first processed by a stream event handler which
102 will then trigger custom events to continue further processing,
103 especially since custom event handlers may run in individual threads.
107 address -- The hostname and port of the server.
108 default_ns -- The default XML namespace that will be applied
109 to all non-namespaced stanzas.
110 event_queue -- A queue of stream, custom, and scheduled
111 events to be processed.
112 filesocket -- A filesocket created from the main connection socket.
113 Required for ElementTree.iterparse.
114 default_port -- Default port to connect to.
115 namespace_map -- Optional mapping of namespaces to namespace prefixes.
116 scheduler -- A scheduler object for triggering events
117 after a given period of time.
118 send_queue -- A queue of stanzas to be sent on the stream.
119 socket -- The connection to the server.
120 ssl_support -- Indicates if a SSL library is available for use.
121 ssl_version -- The version of the SSL protocol to use.
122 Defaults to ssl.PROTOCOL_TLSv1.
123 ca_certs -- File path to a CA certificate to verify the
125 state -- A state machine for managing the stream's
127 stream_footer -- The closing tag of the stream's root element.
129 stream_header -- The start tag and any attributes for the stream's root element.
130 use_ssl -- Flag indicating if SSL should be used.
131 use_tls -- Flag indicating if TLS should be used.
132 use_proxy -- Flag indicating that an HTTP Proxy should be used.
133 stop -- threading Event used to stop all threads.
134 proxy_config -- An optional dictionary with the following entries:
135 host -- The host offering proxy services.
136 port -- The port for the proxy service.
137 username -- Optional username for the proxy.
138 password -- Optional password for the proxy.
140 auto_reconnect -- Flag to determine whether we auto reconnect.
141 reconnect_max_delay -- Maximum time to delay between connection
142 attempts. Defaults to RECONNECT_MAX_DELAY,
144 dns_answers -- List of dns answers not yet used to connect.
147 add_event_handler -- Add a handler for a custom event.
148 add_handler -- Shortcut method for register_handler.
149 connect -- Connect to the given server.
150 del_event_handler -- Remove a handler for a custom event.
151 disconnect -- Disconnect from the server and terminate
153 event -- Trigger a custom event.
154 get_id -- Return the current stream ID.
155 incoming_filter -- Optionally filter stanzas before processing.
156 new_id -- Generate a new, unique ID value.
157 process -- Read XML stanzas from the stream and apply
158 matching stream handlers.
159 reconnect -- Reestablish a connection to the server.
160 register_handler -- Add a handler for a stream event.
161 register_stanza -- Add a new stanza object type that may appear
162 as a direct child of the stream's root.
163 remove_handler -- Remove a stream handler.
164 remove_stanza -- Remove a stanza object type.
165 schedule -- Schedule an event handler to execute after a
167 send -- Send a stanza object on the stream.
168 send_raw -- Send a raw string on the stream.
169 send_xml -- Send an XML string on the stream.
170 set_socket -- Set the stream's socket and generate a new
172 start_stream_handler -- Perform any stream initialization such
174 start_tls -- Establish a TLS connection and restart
178 def __init__(self
, socket
=None, host
='', port
=0):
180 Establish a new XML stream.
183 socket -- Use an existing socket for the stream.
184 Defaults to None to generate a new socket.
185 host -- The name of the target server.
186 Defaults to the empty string.
187 port -- The port to use for the connection.
190 self
.ssl_support
= SSL_SUPPORT
191 self
.ssl_version
= ssl
.PROTOCOL_TLSv1
194 self
.wait_timeout
= WAIT_TIMEOUT
195 self
.response_timeout
= RESPONSE_TIMEOUT
196 self
.reconnect_delay
= None
197 self
.reconnect_max_delay
= RECONNECT_MAX_DELAY
198 self
.ssl_retry_max
= SSL_RETRY_MAX
199 self
.ssl_retry_delay
= SSL_RETRY_DELAY
201 self
.state
= StateMachine(('disconnected', 'connected'))
202 self
.state
._set
_state
('disconnected')
204 self
.default_port
= int(port
)
205 self
.default_domain
= ''
206 self
.address
= (host
, int(port
))
207 self
.filesocket
= None
208 self
.set_socket(socket
)
210 if sys
.version_info
< (3, 0):
211 self
.socket_class
= Socket26
213 self
.socket_class
= Socket
.socket
217 self
.use_proxy
= False
219 self
.proxy_config
= {}
223 self
.stream_header
= "<stream>"
224 self
.stream_footer
= "</stream>"
226 self
.whitespace_keepalive
= True
227 self
.whitespace_keepalive_interval
= 300
229 self
.stop
= threading
.Event()
230 self
.stream_end_event
= threading
.Event()
231 self
.stream_end_event
.set()
232 self
.session_started_event
= threading
.Event()
233 self
.session_timeout
= 45
235 self
.event_queue
= queue
.Queue()
236 self
.send_queue
= queue
.Queue()
237 self
.__failed
_send
_stanza
= None
238 self
.scheduler
= Scheduler(self
.stop
)
240 self
.namespace_map
= {StanzaBase
.xml_ns
: 'xml'}
243 self
.__root
_stanza
= []
245 self
.__event
_handlers
= {}
246 self
.__event
_handlers
_lock
= threading
.Lock()
249 self
._id
_lock
= threading
.Lock()
251 self
.auto_reconnect
= True
252 self
.dns_answers
= []
254 self
.add_event_handler('connected', self
._handle
_connected
)
255 self
.add_event_handler('session_start', self
._start
_keepalive
)
256 self
.add_event_handler('session_end', self
._end
_keepalive
)
def use_signals(self, signals=None):
    """
    Register signal handlers for SIGHUP and SIGTERM, if possible,
    which will raise a "killed" event when the application is
    terminated.

    If a callable signal handler already existed, it will be executed
    first, before the "killed" event is raised.

    Arguments:
        signals -- A list of signal names to be monitored.
                   Defaults to ['SIGHUP', 'SIGTERM'].
    """
    if signals is None:
        signals = ['SIGHUP', 'SIGTERM']

    existing_handlers = {}
    for sig_name in signals:
        if hasattr(signal, sig_name):
            sig = getattr(signal, sig_name)
            handler = signal.getsignal(sig)
            # getsignal() may return SIG_DFL, SIG_IGN or None, none of
            # which can be called. SIG_IGN is truthy, so a plain truth
            # test would keep it and the chained call below would fail.
            if hasattr(handler, '__call__'):
                existing_handlers[sig] = handler

    def handle_kill(signum, frame):
        """
        Capture kill event and disconnect cleanly after first
        spawning the "killed" event.
        """
        previous = existing_handlers.get(signum)
        if previous is not None and previous != handle_kill:
            previous(signum, frame)

        self.event("killed", direct=True)
        self.disconnect()

    try:
        for sig_name in signals:
            if hasattr(signal, sig_name):
                sig = getattr(signal, sig_name)
                signal.signal(sig, handle_kill)
        self.__signals_installed = True
    except Exception:
        # Best effort: signal.signal() refuses to install handlers
        # outside the main thread. Catch Exception (not a bare except)
        # so KeyboardInterrupt and SystemExit still propagate.
        log.debug("Can not set interrupt signal handlers. "
                  "SleekXMPP is not running from a main thread.")
307 Generate and return a new stream ID in hexadecimal form.
309 Many stanzas, handlers, or matchers may require unique
310 ID values. Using this method ensures that all new ID values
311 are unique in this stream.
319 Return the current unique stream ID in hexadecimal form.
321 return "%X" % self
._id
323 def connect(self
, host
='', port
=0, use_ssl
=False,
324 use_tls
=True, reattempt
=True):
326 Create a new socket and connect to the server.
328 Setting reattempt to True will cause connection attempts to be repeated,
329 with an increasing delay between attempts (capped at reconnect_max_delay), until a successful connection is established.
332 host -- The name of the desired server for the connection.
333 port -- Port to connect to on the server.
334 use_ssl -- Flag indicating if SSL should be used.
335 use_tls -- Flag indicating if TLS should be used.
336 reattempt -- Flag indicating if the socket should reconnect
337 after disconnections.
340 self
.address
= (host
, int(port
))
342 Socket
.inet_aton(self
.address
[0])
344 self
.default_domain
= self
.address
[0]
346 # Respect previous SSL and TLS usage directives.
347 if use_ssl
is not None:
348 self
.use_ssl
= use_ssl
349 if use_tls
is not None:
350 self
.use_tls
= use_tls
352 # Repeatedly attempt to connect until a successful connection
354 connected
= self
.state
.transition('disconnected', 'connected',
356 while reattempt
and not connected
and not self
.stop
.is_set():
357 connected
= self
.state
.transition('disconnected', 'connected',
362 self
.scheduler
.remove('Session timeout check')
364 if self
.default_domain
:
365 self
.address
= self
.pick_dns_answer(self
.default_domain
,
367 self
.socket
= self
.socket_class(Socket
.AF_INET
, Socket
.SOCK_STREAM
)
368 self
.configure_socket()
370 if self
.reconnect_delay
is None:
373 delay
= min(self
.reconnect_delay
* 2, self
.reconnect_max_delay
)
374 delay
= random
.normalvariate(delay
, delay
* 0.1)
375 log
.debug('Waiting %s seconds before connecting.', delay
)
379 connected
= self
._connect
_proxy
()
381 self
.reconnect_delay
= delay
384 if self
.use_ssl
and self
.ssl_support
:
385 log
.debug("Socket Wrapped for SSL")
386 if self
.ca_certs
is None:
387 cert_policy
= ssl
.CERT_NONE
389 cert_policy
= ssl
.CERT_REQUIRED
391 ssl_socket
= ssl
.wrap_socket(self
.socket
,
392 ca_certs
=self
.ca_certs
,
393 cert_reqs
=cert_policy
)
395 if hasattr(self
.socket
, 'socket'):
396 # We are using a testing socket, so preserve the top
398 self
.socket
.socket
= ssl_socket
400 self
.socket
= ssl_socket
403 if not self
.use_proxy
:
404 log
.debug("Connecting to %s:%s", *self
.address
)
405 self
.socket
.connect(self
.address
)
407 self
.set_socket(self
.socket
, ignore
=True)
408 #this event is where you should set your application state
409 self
.event("connected", direct
=True)
410 self
.reconnect_delay
= 1.0
412 except Socket
.error
as serr
:
413 error_msg
= "Could not connect to %s:%s. Socket Error #%s: %s"
414 self
.event('socket_error', serr
)
415 log
.error(error_msg
, self
.address
[0], self
.address
[1],
416 serr
.errno
, serr
.strerror
)
417 self
.reconnect_delay
= delay
def _connect_proxy(self):
    """
    Attempt to connect using an HTTP Proxy.

    Sends an HTTP CONNECT request for self.address through the proxy
    described by self.proxy_config and reads the proxy's response
    headers.

    Returns True if the first response line contains '200', and
    False if the proxy refused the request or a socket error occurred.
    """
    # Extract the proxy address, and optional credentials
    address = (self.proxy_config['host'], int(self.proxy_config['port']))
    cred = None
    # The username and password entries are optional, so they must
    # not be indexed directly.
    username = self.proxy_config.get('username')
    if username:
        password = self.proxy_config.get('password') or ''

        cred = '%s:%s' % (username, password)
        if sys.version_info < (3, 0):
            cred = bytes(cred)
        else:
            cred = bytes(cred, 'utf-8')
        cred = base64.b64encode(cred).decode('utf-8')

    # Build the HTTP headers for connecting to the XMPP server
    headers = ['CONNECT %s:%s HTTP/1.0' % self.address,
               'Host: %s:%s' % self.address,
               'Proxy-Connection: Keep-Alive',
               'Pragma: no-cache',
               'User-Agent: SleekXMPP/%s' % sleekxmpp.__version__]
    if cred:
        headers.append('Proxy-Authorization: Basic %s' % cred)
    headers = '\r\n'.join(headers) + '\r\n\r\n'

    try:
        # The format string has two placeholders, so the address
        # tuple must be unpacked.
        log.debug("Connecting to proxy: %s:%s", *address)
        self.socket.connect(address)
        self.send_raw(headers, now=True)
        resp = ''
        while '\r\n\r\n' not in resp and not self.stop.is_set():
            chunk = self.socket.recv(1024)
            if not chunk:
                # The proxy closed the connection before finishing
                # its headers; stop instead of spinning forever.
                break
            resp += chunk.decode('utf-8')
        log.debug('RECV: %s', resp)

        lines = resp.split('\r\n')
        if '200' not in lines[0]:
            self.event('proxy_error', resp)
            log.error('Proxy Error: %s', lines[0])
            return False

        # Proxy connection established, continue connecting
        # with the XMPP server.
        return True
    except Socket.error as serr:
        error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
        self.event('socket_error', serr)
        # The socket is talking to the proxy here, so report the
        # proxy's address rather than the XMPP server's.
        log.error(error_msg, address[0], address[1],
                  serr.errno, serr.strerror)
        return False
472 def _handle_connected(self
, event
=None):
474 Add check to ensure that a session is established within
475 a reasonable amount of time.
478 def _handle_session_timeout():
479 if not self
.session_started_event
.is_set():
480 log
.debug("Session start has taken more " + \
481 "than %d seconds", self
.session_timeout
)
482 self
.disconnect(reconnect
=self
.auto_reconnect
)
484 self
.schedule("Session timeout check",
485 self
.session_timeout
,
486 _handle_session_timeout
)
def disconnect(self, reconnect=False, wait=False):
    """
    Terminate processing and close the XML streams.

    Optionally, the connection may be reconnected and
    resume processing afterwards.

    If the disconnect should take place after all items
    in the send queue have been sent, use wait=True. However,
    take note: If you are constantly adding items to the queue
    such that it is never empty, then the disconnect will
    not occur and the call will continue to block.

    Arguments:
        reconnect -- Flag indicating if the connection
                     and processing should be restarted.
                     Defaults to False.
        wait      -- Flag indicating if the send queue should
                     be emptied before disconnecting.
                     Defaults to False.
    """
    # The actual teardown lives in self._disconnect; it is run through
    # the state machine as part of the 'connected' -> 'disconnected'
    # transition.
    self.state.transition('connected', 'disconnected',
                          func=self._disconnect, args=(reconnect, wait))
511 def _disconnect(self
, reconnect
=False, wait
=False):
512 # Wait for the send queue to empty.
514 self
.send_queue
.join()
516 # Send the end of stream marker.
517 self
.send_raw(self
.stream_footer
, now
=True)
518 self
.session_started_event
.clear()
519 # Wait for confirmation that the stream was
520 # closed in the other direction.
521 self
.auto_reconnect
= reconnect
522 log
.debug('Waiting for %s from server', self
.stream_footer
)
523 self
.stream_end_event
.wait(4)
524 if not self
.auto_reconnect
:
527 self
.socket
.shutdown(Socket
.SHUT_RDWR
)
529 self
.filesocket
.close()
530 except Socket
.error
as serr
:
531 self
.event('socket_error', serr
)
533 #clear your application state
534 self
.event('session_end', direct
=True)
535 self
.event("disconnected", direct
=True)
538 def reconnect(self
, reattempt
=True):
540 Reset the stream's state and reconnect to the server.
542 log
.debug("reconnecting...")
543 if self
.state
.ensure('connected'):
544 self
.state
.transition('connected', 'disconnected', wait
=2.0,
545 func
=self
._disconnect
, args
=(True,))
547 log
.debug("connecting...")
548 connected
= self
.state
.transition('disconnected', 'connected',
549 wait
=2.0, func
=self
._connect
)
550 while reattempt
and not connected
and not self
.stop
.is_set():
551 connected
= self
.state
.transition('disconnected', 'connected',
552 wait
=2.0, func
=self
._connect
)
553 connected
= connected
or self
.state
.ensure('connected')
556 def set_socket(self
, socket
, ignore
=False):
558 Set the socket to use for the stream.
560 The filesocket will be recreated as well.
563 socket -- The new socket to use.
564 ignore -- don't set the state
567 if socket
is not None:
568 # ElementTree.iterparse requires a file.
569 # 0 buffer files have to be binary.
571 # Use the correct fileobject type based on the Python
572 # version to work around a broken implementation in
574 if sys
.version_info
< (3, 0):
575 self
.filesocket
= FileSocket(self
.socket
)
577 self
.filesocket
= self
.socket
.makefile('rb', 0)
579 self
.state
._set
_state
('connected')
def configure_socket(self):
    """
    Set timeout and other options for self.socket.

    Meant to be overridden.
    """
    # A timeout of None puts the socket in blocking mode.
    self.socket.settimeout(None)
589 def configure_dns(self
, resolver
, domain
=None, port
=None):
591 Configure and set options for a dns.resolver.Resolver
592 instance, and other DNS related tasks. For example, you
593 can also check Socket.getaddrinfo to see if you need to
594 call out to libresolv.so.2 to run res_init().
596 Meant to be overridden.
599 resolver -- A dns.resolver.Resolver instance, or None
600 if dnspython is not installed.
601 domain -- The initial domain under consideration.
602 port -- The initial port under consideration.
608 Perform handshakes for TLS.
610 If the handshake is successful, the XML stream will need
614 log
.info("Negotiating TLS")
615 log
.info("Using SSL version: %s", str(self
.ssl_version
))
616 if self
.ca_certs
is None:
617 cert_policy
= ssl
.CERT_NONE
619 cert_policy
= ssl
.CERT_REQUIRED
621 ssl_socket
= ssl
.wrap_socket(self
.socket
,
622 ssl_version
=self
.ssl_version
,
623 do_handshake_on_connect
=False,
624 ca_certs
=self
.ca_certs
,
625 cert_reqs
=cert_policy
)
627 if hasattr(self
.socket
, 'socket'):
628 # We are using a testing socket, so preserve the top
630 self
.socket
.socket
= ssl_socket
632 self
.socket
= ssl_socket
633 self
.socket
.do_handshake()
634 self
.set_socket(self
.socket
)
637 log
.warning("Tried to enable TLS, but ssl module not found.")
640 def _start_keepalive(self
, event
):
642 Begin sending whitespace periodically to keep the connection alive.
644 May be disabled by setting:
645 self.whitespace_keepalive = False
647 The keepalive interval can be set using:
648 self.whitespace_keepalive_interval = 300
651 def send_keepalive():
652 if self
.send_queue
.empty():
655 self
.schedule('Whitespace Keepalive',
656 self
.whitespace_keepalive_interval
,
def _end_keepalive(self, event):
    """Stop sending whitespace keepalives"""
    # Drops the scheduler task named 'Whitespace Keepalive'; the event
    # argument is not used.
    self.scheduler.remove('Whitespace Keepalive')
664 def start_stream_handler(self
, xml
):
666 Perform any initialization actions, such as handshakes, once the
667 stream header has been sent.
669 Meant to be overridden.
def register_stanza(self, stanza_class):
    """
    Add a stanza object class as a known root stanza. A root stanza is
    one that appears as a direct child of the stream's root element.

    Stanzas that appear as substanzas of a root stanza do not need to
    be registered here. That is done using register_stanza_plugin() from
    sleekxmpp.xmlstream.stanzabase.

    Stanzas that are not registered will not be converted into
    stanza objects, but may still be processed using handlers and
    matchers.

    Arguments:
        stanza_class -- The top-level stanza object's class.
    """
    # No duplicate check is made: registering the same class twice
    # stores it twice.
    self.__root_stanza.append(stanza_class)
def remove_stanza(self, stanza_class):
    """
    Remove a stanza from being a known root stanza. A root stanza is
    one that appears as a direct child of the stream's root element.

    Stanzas that are not registered will not be converted into
    stanza objects, but may still be processed using handlers and
    matchers.

    Arguments:
        stanza_class -- The stanza object's class to unregister.

    Raises ValueError if stanza_class was never registered.
    """
    # The registry is a list, so the class has to be removed by value;
    # using it as an index (del registry[stanza_class]) always raises
    # a TypeError.
    self.__root_stanza.remove(stanza_class)
def add_handler(self, mask, pointer, name=None, disposable=False,
                threaded=False, filter=False, instream=False):
    """
    A shortcut method for registering a handler using XML masks.

    Arguments:
        mask       -- An XML snippet matching the structure of the
                      stanzas that will be passed to this handler.
        pointer    -- The handler function itself.
        name       -- A unique name for the handler. A name will
                      be generated if one is not provided.
        disposable -- Indicates if the handler should be discarded
                      after one use.
        threaded   -- Deprecated. Remains for backwards compatibility.
        filter     -- Deprecated. Remains for backwards compatibility.
        instream   -- Indicates if the handler should execute during
                      stream processing and not during normal event
                      processing.
    """
    if name is None:
        # Use the PEP8 method names, as send() does, instead of the
        # deprecated camelCase aliases.
        name = 'add_handler_%s' % self.new_id()
    self.register_handler(XMLCallback(name, MatchXMLMask(mask), pointer,
                                      once=disposable, instream=instream))
def register_handler(self, handler, before=None, after=None):
    """
    Add a stream event handler that will be executed when a matching
    stanza is received.

    A handler that is already bound to a stream is left untouched.

    Arguments:
        handler -- The handler object to execute.
    """
    if handler.stream is not None:
        return
    self.__handlers.append(handler)
    # Only a weak reference is stored on the handler, so the handler
    # does not keep the stream alive.
    handler.stream = weakref.ref(self)
741 def remove_handler(self
, name
):
743 Remove any stream event handlers with the given name.
746 name -- The name of the handler.
749 for handler
in self
.__handlers
:
750 if handler
.name
== name
:
751 self
.__handlers
.pop(idx
)
756 def get_dns_records(self
, domain
, port
=None):
758 Get the DNS records for a domain.
761 domain -- The domain in question.
762 port -- If the results don't include a port, use this one.
765 port
= self
.default_port
767 resolver
= dns
.resolver
.get_default_resolver()
768 self
.configure_dns(resolver
, domain
=domain
, port
=port
)
771 answers
= resolver
.query(domain
, dns
.rdatatype
.A
)
772 except (dns
.resolver
.NXDOMAIN
, dns
.resolver
.NoAnswer
):
773 log
.warning("No A records for %s", domain
)
774 return [((domain
, port
), 0, 0)]
775 except dns
.exception
.Timeout
:
776 log
.warning("DNS resolution timed out " + \
777 "for A record of %s", domain
)
778 return [((domain
, port
), 0, 0)]
780 return [((ans
.address
, port
), 0, 0) for ans
in answers
]
782 log
.warning("dnspython is not installed -- " + \
783 "relying on OS A record resolution")
784 self
.configure_dns(None, domain
=domain
, port
=port
)
785 return [((domain
, port
), 0, 0)]
def pick_dns_answer(self, domain, port=None):
    """
    Pick a server and port from DNS answers.
    Gets DNS answers if none available.
    Removes used answer from available answers.

    Each answer is a tuple of ((host, port), priority, weight). Only
    answers with the lowest priority value are considered, and one of
    them is chosen at random using the weights.

    Arguments:
        domain -- The domain in question.
        port   -- If the results don't include a port, use this one.

    Returns the chosen (host, port) tuple.
    """
    if not self.dns_answers:
        self.dns_answers = self.get_dns_records(domain, port)

    topprio = 65535
    for answer in self.dns_answers:
        topprio = min(topprio, answer[1])

    # Map the running total of weights to each candidate address.
    addresses = {}
    intmax = 0
    for answer in self.dns_answers:
        if answer[1] == topprio:
            intmax += answer[2]
            addresses[intmax] = answer[0]

    picked = random.randint(0, intmax)
    for item in sorted(addresses):
        if picked <= item:
            address = addresses[item]
            break

    # Drop the answer that was picked. Compare the address part of
    # each answer (answer[0]); comparing a whole answer tuple never
    # matches, which discarded the last answer instead.
    for idx, answer in enumerate(self.dns_answers):
        if answer[0] == address:
            self.dns_answers.pop(idx)
            break
    log.debug("Trying to connect to %s:%s", *address)
    return address
def add_event_handler(self, name, pointer,
                      threaded=False, disposable=False):
    """
    Register a function to be run whenever the named custom
    event is manually triggered.

    Arguments:
        name       -- The name of the event that will trigger
                      this handler.
        pointer    -- The function to execute.
        threaded   -- If set to True, the handler will execute
                      in its own thread. Defaults to False.
        disposable -- If set to True, the handler will be
                      discarded after one use. Defaults to False.
    """
    entry = (pointer, threaded, disposable)
    # Create the handler list for this event on first use.
    self.__event_handlers.setdefault(name, []).append(entry)
def del_event_handler(self, name, pointer):
    """
    Stop a function from being run as a handler for an event.

    Arguments:
        name    -- The name of the event.
        pointer -- The function to remove as a handler.
    """
    if name in self.__event_handlers:
        # Keep every entry whose function is not the given pointer.
        remaining = [entry for entry in self.__event_handlers[name]
                     if entry[0] != pointer]
        self.__event_handlers[name] = remaining
def event_handled(self, name):
    """
    Report whether an event has any handlers attached.

    Returns the number of registered handlers, which is zero
    for an event that has none.

    Arguments:
        name -- The name of the event to check.
    """
    registered = self.__event_handlers.get(name, [])
    return len(registered)
def event(self, name, data=None, direct=False):
    """
    Manually trigger a custom event.

    Arguments:
        name   -- The name of the event to trigger.
        data   -- Data that will be passed to each event handler.
                  Defaults to a new, empty dictionary.
        direct -- Runs the event directly if True, skipping the
                  event queue. All event handlers will run in the
                  same thread.
    """
    if data is None:
        # Build a fresh dictionary per call; a mutable default
        # argument would be shared by every event fired without data.
        data = {}

    # Work on a snapshot: disposable handlers are removed from the
    # live list below, and mutating a list while iterating over it
    # skips the handler that follows the removed one.
    handlers = list(self.__event_handlers.get(name, []))
    for handler in handlers:
        #TODO: Data should not be copied, but should be read only,
        #      but this might break current code so it's left for future.

        out_data = copy.copy(data) if len(handlers) > 1 else data
        old_exception = getattr(data, 'exception', None)
        if direct:
            try:
                handler[0](out_data)
            except Exception as e:
                error_msg = 'Error processing event handler: %s'
                log.exception(error_msg, str(handler[0]))
                # Prefer the exception hook carried by the data
                # object, if it has one.
                if old_exception:
                    old_exception(e)
                else:
                    self.exception(e)
        else:
            self.event_queue.put(('event', handler, out_data))

        # handler is (pointer, threaded, disposable).
        if handler[2]:
            # If the handler is disposable, we will go ahead and
            # remove it now instead of waiting for it to be
            # processed in the queue.
            with self.__event_handlers_lock:
                try:
                    self.__event_handlers[name].remove(handler)
                except (KeyError, ValueError):
                    # Already removed elsewhere; nothing left to do.
                    pass
def schedule(self, name, seconds, callback, args=None,
             kwargs=None, repeat=False):
    """
    Schedule a callback function to execute after a given delay.

    Arguments:
        name     -- A unique name for the scheduled callback.
        seconds  -- The time in seconds to wait before executing.
        callback -- A pointer to the function to execute.
        args     -- A tuple of arguments to pass to the function.
        kwargs   -- A dictionary of keyword arguments to pass to
                    the function.
        repeat   -- Flag indicating if the scheduled event should
                    be reset and repeat after executing.
    """
    # The stream's own event queue is handed to the scheduler as
    # qpointer.
    self.scheduler.add(name, seconds, callback, args, kwargs,
                       repeat, qpointer=self.event_queue)
935 def incoming_filter(self
, xml
):
937 Filter incoming XML objects before they are processed.
939 Possible uses include remapping namespaces, or correcting elements
940 from sources with incorrect behavior.
942 Meant to be overridden.
946 def send(self
, data
, mask
=None, timeout
=None, now
=False):
948 A wrapper for send_raw for sending stanza objects.
950 May optionally block until an expected response is received.
953 data -- The stanza object to send on the stream.
954 mask -- Deprecated. An XML snippet matching the structure
955 of the expected response. Execution will block
956 in this thread until the response is received
958 timeout -- Time in seconds to wait for a response before
959 continuing. Defaults to RESPONSE_TIMEOUT.
960 now -- Indicates if the send queue should be skipped,
961 sending the stanza immediately. Useful mainly
962 for stream initialization stanzas.
966 timeout
= self
.response_timeout
967 if hasattr(mask
, 'xml'):
971 log
.warning("Use of send mask waiters is deprecated.")
972 wait_for
= Waiter("SendWait_%s" % self
.new_id(),
974 self
.register_handler(wait_for
)
975 self
.send_raw(data
, now
)
977 return wait_for
.wait(timeout
)
def send_xml(self, data, mask=None, timeout=None, now=False):
    """
    Serialize an XML object and send it on the stream, optionally
    waiting for a response.

    Arguments:
        data    -- The XML object to send on the stream.
        mask    -- Deprecated. An XML snippet matching the structure
                   of the expected response. Execution will block
                   in this thread until the response is received
                   or a timeout occurs.
        timeout -- Time in seconds to wait for a response before
                   continuing. Defaults to RESPONSE_TIMEOUT.
        now     -- Indicates if the send queue should be skipped,
                   sending the stanza immediately. Useful mainly
                   for stream initialization stanzas.
    """
    wait_for = self.response_timeout if timeout is None else timeout
    serialized = tostring(data)
    return self.send(serialized, mask, wait_for, now)
1001 def send_raw(self
, data
, now
=False, reconnect
=None):
1003 Send raw data across the stream.
1006 data -- Any string value.
1007 reconnect -- Indicates if the stream should be
1008 restarted if there is an error sending
1009 the stanza. Used mainly for testing.
1010 Defaults to self.auto_reconnect.
1013 log
.debug("SEND (IMMED): %s", data
)
1015 data
= data
.encode('utf-8')
1020 while sent
< total
and not self
.stop
.is_set():
1022 sent
+= self
.socket
.send(data
[sent
:])
1024 except ssl
.SSLError
as serr
:
1025 if tries
>= self
.ssl_retry_max
:
1026 log
.debug('SSL error - max retries reached')
1027 self
.exception(serr
)
1028 log
.warning("Failed to send %s", data
)
1029 if reconnect
is None:
1030 reconnect
= self
.auto_reconnect
1031 self
.disconnect(reconnect
)
1032 log
.warning('SSL write error - reattempting')
1033 time
.sleep(self
.ssl_retry_delay
)
1036 log
.debug('SENT: %d chunks', count
)
1037 except Socket
.error
as serr
:
1038 self
.event('socket_error', serr
)
1039 log
.warning("Failed to send %s", data
)
1040 if reconnect
is None:
1041 reconnect
= self
.auto_reconnect
1042 self
.disconnect(reconnect
)
1044 self
.send_queue
.put(data
)
1047 def process(self
, **kwargs
):
1049 Initialize the XML streams and begin processing events.
1051 The number of threads used for processing stream events is determined
1055 block -- If block=False then event dispatcher will run
1056 in a separate thread, allowing for the stream to be
1057 used in the background for another application.
1058 Otherwise, process(block=True) blocks the current thread.
1061 **threaded is deprecated and included for API compatibility**
1062 threaded -- If threaded=True then event dispatcher will run
1063 in a separate thread, allowing for the stream to be
1064 used in the background for another application.
1067 Event handlers and the send queue will be threaded
1068 regardless of these parameters.
1070 if 'threaded' in kwargs
and 'block' in kwargs
:
1071 raise ValueError("process() called with both " + \
1072 "block and threaded arguments")
1073 elif 'block' in kwargs
:
1074 threaded
= not(kwargs
.get('block', False))
1076 threaded
= kwargs
.get('threaded', True)
1078 self
.scheduler
.process(threaded
=True)
1080 def start_thread(name
, target
):
1081 self
.__thread
[name
] = threading
.Thread(name
=name
, target
=target
)
1082 self
.__thread
[name
].start()
1084 for t
in range(0, HANDLER_THREADS
):
1085 log
.debug("Starting HANDLER THREAD")
1086 start_thread('stream_event_handler_%s' % t
, self
._event
_runner
)
1088 start_thread('send_thread', self
._send
_thread
)
1091 # Run the XML stream in the background for another application.
1092 start_thread('process', self
._process
)
1098 Start processing the XML streams.
1100 Processing will continue after any recoverable errors
1101 if reconnections are allowed.
1104 # The body of this loop will only execute once per connection.
1105 # Additional passes will be made only if an error occurs and
1106 # reconnecting is permitted.
1109 # The call to self.__read_xml will block and prevent
1110 # the body of the loop from running until a disconnect
1111 # occurs. After any reconnection, the stream header will
1112 # be resent and processing will resume.
1113 while not self
.stop
.is_set():
1114 # Only process the stream while connected to the server
1115 if not self
.state
.ensure('connected', wait
=0.1,
1116 block_on_transition
=True):
1118 # Ensure the stream header is sent for any
1120 if not self
.session_started_event
.is_set():
1121 self
.send_raw(self
.stream_header
, now
=True)
1122 if not self
.__read
_xml
():
1123 # If the server terminated the stream, end processing
1125 except SyntaxError as e
:
1126 log
.error("Error reading from XML stream.")
1128 except KeyboardInterrupt:
1129 log
.debug("Keyboard Escape Detected in _process")
1132 log
.debug("SystemExit in _process")
1134 self
.scheduler
.quit()
1135 except Socket
.error
as serr
:
1136 self
.event('socket_error', serr
)
1137 log
.exception('Socket Error')
1138 except Exception as e
:
1139 if not self
.stop
.is_set():
1140 log
.exception('Connection error.')
1143 if not self
.stop
.is_set() and self
.auto_reconnect
:
def __read_xml(self):
    """
    Parse the incoming XML stream, raising stream events for
    each received stanza.

    Returns False once the stream's root element has closed and
    True when a handler raised RestartStream. If the parser simply
    runs out of input the method falls off the end (returns None).
    """
    nesting = 0
    stream_root = None
    parser_events = (b'end', b'start')
    for kind, element in ET.iterparse(self.filesocket, parser_events):
        if kind == b'start':
            if nesting == 0:
                # This is the opening tag of the root element: remember
                # it and perform any stream initialization actions,
                # such as handshakes.
                stream_root = element
                self.stream_end_event.clear()
                self.start_stream_handler(stream_root)
            nesting += 1
            continue

        if kind != b'end':
            continue

        nesting -= 1
        if nesting == 0:
            # The root element itself has closed, which
            # terminates the stream.
            log.debug("End of stream recieved")
            self.stream_end_event.set()
            return False

        if nesting != 1:
            # Only direct children of the root element are stanzas.
            continue

        try:
            self.__spawn_event(element)
        except RestartStream:
            return True
        if stream_root is not None:
            # Drop already-processed children from the root so the
            # tree does not grow for the lifetime of the stream.
            stream_root.clear()
    log.debug("Ending read XML loop")
def _build_stanza(self, xml, default_ns=None):
    """
    Wrap an XML object in the most specific stanza class available.

    The registered root stanza classes are tried in order; the first
    one whose tag matches the XML is used. When none matches, a
    generic StanzaBase stanza is returned.

    Arguments:
        xml        -- The XML object to convert into a stanza object.
        default_ns -- Optional default namespace to use instead of the
                      stream's current default namespace.
    """
    namespace = self.default_ns if default_ns is None else default_ns
    for candidate in self.__root_stanza:
        qualified_tag = "{%s}%s" % (namespace, candidate.name)
        if xml.tag == qualified_tag or xml.tag == candidate.tag_name():
            return candidate(self, xml)
    return StanzaBase(self, xml)
def __spawn_event(self, xml):
    """
    Analyze incoming XML stanzas and convert them into stanza
    objects if applicable and queue stream events to be processed
    by matching handlers.

    Arguments:
        xml -- The XML stanza to analyze.
    """
    log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns,
                                        stream=self))
    # Apply any preprocessing filters.
    xml = self.incoming_filter(xml)

    # Convert the raw XML object into a stanza object. If no registered
    # stanza type applies, a generic StanzaBase stanza will be used.
    stanza = self._build_stanza(xml)

    # Match the stanza against registered handlers and queue one
    # 'stanza' event per match. The list is a snapshot, so removing
    # entries from self.__handlers below does not disturb iteration.
    matched_handlers = [h for h in self.__handlers if h.match(stanza)]
    needs_copy = len(matched_handlers) > 1
    for handler in matched_handlers:
        # Each handler gets its own copy when several matched; a
        # single match can safely receive the original object.
        stanza_copy = copy.copy(stanza) if needs_copy else stanza
        handler.prerun(stanza_copy)
        self.event_queue.put(('stanza', handler, stanza_copy))
        try:
            if handler.check_delete():
                self.__handlers.remove(handler)
        except Exception:
            # Removal is best effort and not thread safe: another
            # thread may already have removed the handler. Only
            # ordinary errors are suppressed; KeyboardInterrupt and
            # SystemExit must still reach the caller.
            log.debug("Could not remove expired handler %s", handler,
                      exc_info=True)

    # Some stanzas require responses, such as Iq queries. A default
    # handler will be executed immediately when nothing matched.
    if not matched_handlers:
        stanza.unhandled()
def _threaded_event_wrapper(self, func, args):
    """
    Run an event handler in its own thread, capturing any
    exception it raises.

    A raised exception is logged and then reported through the
    'exception' method of the first argument when it has one,
    otherwise through the stream's own exception() method.

    Arguments:
        func -- The event handler to execute.
        args -- Arguments to the event handler.
    """
    # The first argument has already been copied by the caller, so it
    # can be used directly for error reporting.
    payload = args[0]
    try:
        func(*args)
    except Exception as err:
        log.exception('Error processing event handler: %s', str(func))
        if hasattr(payload, 'exception'):
            report = payload.exception
        else:
            report = self.exception
        report(err)
def _event_runner(self):
    """
    Process the event queue and execute handlers.

    The number of event runner threads is controlled by HANDLER_THREADS.

    Stream event handlers will all execute in this thread. Custom event
    handlers may be spawned in individual threads.

    Queue entries are tuples of the form (type, handler, *args) where
    type is one of 'stanza', 'schedule', 'event' or 'quit'.
    """
    log.debug("Loading event runner")
    try:
        while not self.stop.is_set():
            # Poll with a timeout so the stop flag is rechecked
            # regularly even when the queue is idle.
            try:
                entry = self.event_queue.get(True,
                                             timeout=self.wait_timeout)
            except queue.Empty:
                continue
            if entry is None:
                continue

            kind, target = entry[0:2]
            payload = entry[2:]
            # Shallow copy taken before the handler runs; error
            # callbacks are invoked on this copy.
            snapshot = copy.copy(payload[0])

            if kind == 'stanza':
                try:
                    target.run(payload[0])
                except Exception as err:
                    log.exception('Error processing stream handler: %s',
                                  target.name)
                    snapshot.exception(err)
            elif kind == 'schedule':
                task_name = payload[1]
                try:
                    log.debug('Scheduled event: %s: %s',
                              task_name, payload[0])
                    target(*payload[0])
                except Exception as err:
                    log.exception('Error processing scheduled task')
                    self.exception(err)
            elif kind == 'event':
                callback, run_threaded, _disposable = target
                try:
                    if run_threaded:
                        worker = threading.Thread(
                                name="Event_%s" % str(callback),
                                target=self._threaded_event_wrapper,
                                args=(callback, payload))
                        worker.start()
                    else:
                        callback(*payload)
                except Exception as err:
                    log.exception('Error processing event handler: %s',
                                  str(callback))
                    if hasattr(snapshot, 'exception'):
                        snapshot.exception(err)
                    else:
                        self.exception(err)
            elif kind == 'quit':
                log.debug("Quitting event runner thread")
                return False
    except KeyboardInterrupt:
        log.debug("Keyboard Escape Detected in _event_runner")
        self.event('killed', direct=True)
        self.disconnect()
        return
    except SystemExit:
        self.disconnect()
        # Wake another runner so that it shuts down as well.
        self.event_queue.put(('quit', None, None))
        return
def _send_thread(self):
    """
    Extract stanzas from the send queue and send them on the stream.

    Runs until the stop flag is set. Queued data is held back until
    the session has started. A stanza whose transmission failed with
    a socket error is kept and retried first on the next pass, after
    a disconnect (and reconnect, when auto_reconnect is enabled).
    Any unexpected error is logged, reported through exception() and
    ends the thread.
    """
    try:
        while not self.stop.is_set():
            # Wait for the session before draining the queue. The stop
            # flag must be *called* here: the bare bound method is
            # always true, which would skip this wait entirely.
            while not self.stop.is_set() and \
                  not self.session_started_event.is_set():
                self.session_started_event.wait(timeout=1)

            if self.__failed_send_stanza is not None:
                # Retry the stanza that could not be sent last time.
                data = self.__failed_send_stanza
                self.__failed_send_stanza = None
            else:
                try:
                    data = self.send_queue.get(True, 1)
                except queue.Empty:
                    continue

            log.debug("SEND: %s", data)
            enc_data = data.encode('utf-8')
            total = len(enc_data)
            sent = 0
            count = 0
            tries = 0
            try:
                # socket.send may accept only part of the buffer, so
                # keep sending the remainder until it is all written.
                while sent < total and not self.stop.is_set():
                    try:
                        sent += self.socket.send(enc_data[sent:])
                        count += 1
                    except ssl.SSLError as serr:
                        if tries >= self.ssl_retry_max:
                            log.debug('SSL error - max retries reached')
                            self.exception(serr)
                            log.warning("Failed to send %s", data)
                            # This method has no 'reconnect' argument;
                            # follow the stream's reconnect policy.
                            self.disconnect(self.auto_reconnect)
                        log.warning('SSL write error - reattempting')
                        time.sleep(self.ssl_retry_delay)
                        tries += 1
                if count > 1:
                    log.debug('SENT: %d chunks', count)
                self.send_queue.task_done()
            except Socket.error as serr:
                self.event('socket_error', serr)
                log.warning("Failed to send %s", data)
                self.__failed_send_stanza = data
                self.disconnect(self.auto_reconnect)
    except Exception as ex:
        log.exception('Unexpected error in send thread: %s', ex)
        self.exception(ex)
        if not self.stop.is_set():
            self.disconnect(self.auto_reconnect)
def exception(self, exception):
    """
    Hook invoked with exceptions that nothing else handled.

    The default implementation does nothing; it is meant to be
    overridden.

    Arguments:
        exception -- An unhandled exception object.
    """
    return None
# To comply with PEP8, method names now use underscores.
# Each deprecated camelCase name is bound to its underscore
# replacement so that older callers keep working.
for _legacy_name, _current_name in (
        ('startTLS', 'start_tls'),
        ('registerStanza', 'register_stanza'),
        ('removeStanza', 'remove_stanza'),
        ('registerHandler', 'register_handler'),
        ('removeHandler', 'remove_handler'),
        ('setSocket', 'set_socket'),
        ('sendRaw', 'send_raw'),
        ('getId', 'get_id'),
        ('getNewId', 'new_id'),
        ('sendXML', 'send_xml')):
    setattr(XMLStream, _legacy_name, getattr(XMLStream, _current_name))
# Do not leave the loop temporaries in the module namespace.
del _legacy_name, _current_name