3 ## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
5 ## This program is free software; you can redistribute it and/or modify
6 ## it under the terms of the GNU General Public License as published by
7 ## the Free Software Foundation; either version 2, or (at your option)
10 ## This program is distributed in the hope that it will be useful,
11 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ## GNU General Public License for more details.
15 # $Id: dispatcher.py,v 1.42 2007/05/18 23:18:36 normanr Exp $
18 Main xmpppy mechanism. Provides the library with methods to assign different handlers
19 to different XMPP stanzas.
20 Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines how long
21 the Dispatcher.SendAndWaitForResponse method will wait for a reply stanza before giving up.
24 import simplexml
,time
,sys
25 from protocol
import *
26 from client
import PlugIn
31 class Dispatcher(PlugIn
):
32 """ Descendant of the PlugIn class. Handles the XMPP stream, i.e. is aware of stream headers.
33 Can be plugged out/in to restart these headers (used for SASL, for example). """
39 self
._defaultHandler
=None
40 self
._pendingExceptions
=[]
41 self
._eventHandler
=None
42 self
._cycleHandlers
=[]
43 self
._exported
_methods
=[self
.Process
,self
.RegisterHandler
,self
.RegisterDefaultHandler
,\
44 self
.RegisterEventHandler
,self
.UnregisterCycleHandler
,self
.RegisterCycleHandler
,\
45 self
.RegisterHandlerOnce
,self
.UnregisterHandler
,self
.RegisterProtocol
,\
46 self
.WaitForResponse
,self
.SendAndWaitForResponse
,self
.send
,self
.disconnect
,\
47 self
.SendAndCallForResponse
, ]
def dumpHandlers(self):
    """ Return the user-registered callbacks in their internal format.

        The returned object is the live "self.handlers" mapping (not a copy),
        i.e. exactly what restoreHandlers expects to receive.
        Used within the library to carry the user handlers set over Dispatcher replugins. """
    return self.handlers
def restoreHandlers(self, handlers):
    """ Install a callbacks structure previously obtained via dumpHandlers.

        The given object is stored as-is (no copy is made) and replaces whatever
        handlers were registered on this instance before.
        Used within the library to carry the user handlers set over Dispatcher replugins. """
    self.handlers = handlers
59 """ Registers default namespaces/protocols/handlers. Used internally. """
60 self
.RegisterNamespace('unknown')
61 self
.RegisterNamespace(NS_STREAMS
)
62 self
.RegisterNamespace(self
._owner
.defaultNamespace
)
63 self
.RegisterProtocol('iq',Iq
)
64 self
.RegisterProtocol('presence',Presence
)
65 self
.RegisterProtocol('message',Message
)
66 self
.RegisterDefaultHandler(self
.returnStanzaHandler
)
67 self
.RegisterHandler('error',self
.streamErrorHandler
,xmlns
=NS_STREAMS
)
69 def plugin(self
, owner
):
70 """ Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally."""
72 for method
in self
._old
_owners
_methods
:
73 if method
.__name
__=='send': self
._owner
_send
=method
; break
74 self
._owner
.lastErrNode
=None
75 self
._owner
.lastErr
=None
76 self
._owner
.lastErrCode
=None
80 """ Prepares instance to be destructed. """
81 self
.Stream
.dispatch
=None
82 self
.Stream
.DEBUG
=None
83 self
.Stream
.features
=None
87 """ Send an initial stream header. """
88 self
.Stream
=simplexml
.NodeBuilder()
89 self
.Stream
._dispatch
_depth
=2
90 self
.Stream
.dispatch
=self
.dispatch
91 self
.Stream
.stream_header_received
=self
._check
_stream
_start
92 self
._owner
.debug_flags
.append(simplexml
.DBG_NODEBUILDER
)
93 self
.Stream
.DEBUG
=self
._owner
.DEBUG
94 self
.Stream
.features
=None
95 self
._metastream
=Node('stream:stream')
96 self
._metastream
.setNamespace(self
._owner
.Namespace
)
97 self
._metastream
.setAttr('version','1.0')
98 self
._metastream
.setAttr('xmlns:stream',NS_STREAMS
)
99 self
._metastream
.setAttr('to',self
._owner
.Server
)
100 self
._owner
.send("<?xml version='1.0'?>%s>"%str
(self
._metastream
)[:-2])
102 def _check_stream_start(self
,ns
,tag
,attrs
):
103 if ns
<>NS_STREAMS
or tag
<>'stream':
104 raise ValueError('Incorrect stream start: (%s,%s). Terminating.'%(tag
,ns
))
106 def Process(self
, timeout
=0):
107 """ Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
109 1) length of processed data if some data were processed;
110 2) '0' string if no data were processed but link is alive;
111 3) 0 (zero) if underlying connection is closed.
112 Take note that in case of disconnection detect during Process() call
113 disconnect handlers are called automatically.
115 for handler
in self
._cycleHandlers
: handler(self
)
116 if len(self
._pendingExceptions
) > 0:
117 _pendingException
= self
._pendingExceptions
.pop()
118 raise _pendingException
[0], _pendingException
[1], _pendingException
[2]
119 if self
._owner
.Connection
.pending_data(timeout
):
120 try: data
=self
._owner
.Connection
.receive()
121 except IOError: return
122 self
.Stream
.Parse(data
)
123 if len(self
._pendingExceptions
) > 0:
124 _pendingException
= self
._pendingExceptions
.pop()
125 raise _pendingException
[0], _pendingException
[1], _pendingException
[2]
126 if data
: return len(data
)
127 return '0' # It means that nothing is received but link is alive.
def RegisterNamespace(self, xmlns, order='info'):
    """ Create the internal structures for a newly registered namespace.

        The namespace gets a fresh, empty table (any handlers already registered
        for it are discarded) with the two catch-all protocols 'unknown' and
        'default' declared in it. Handlers can be registered for the namespace afterwards.
        "order" is the severity passed to DEBUG for the log message. """
    self.DEBUG('Registering namespace "%s"' % xmlns, order)
    self.handlers[xmlns] = {}
    for catch_all in ('unknown', 'default'):
        self.RegisterProtocol(catch_all, Protocol, xmlns=xmlns)
def RegisterProtocol(self, tag_name, Proto, xmlns=None, order='info'):
    """ Declare a top-level stanza name to the dispatcher.

        Needed before handlers can be registered for such stanzas.
        "tag_name" - name of the stanza, f.e. "iq".
        "Proto"    - class used to represent stanzas with this name.
        "xmlns"    - namespace to declare it in; the owner's default namespace if empty.
        "order"    - severity passed to DEBUG for the log message.
        The namespace itself must already be registered. """
    namespace = xmlns or self._owner.defaultNamespace
    self.DEBUG('Registering protocol "%s" as %s(%s)' % (tag_name, Proto, namespace), order)
    # The stanza class is stored under the builtin `type` object itself (not a string),
    # alongside the initially empty list of 'default' handlers.
    protocol_table = {type: Proto, 'default': []}
    self.handlers[namespace][tag_name] = protocol_table
def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', makefirst=0, system=0):
    """ Register a handler for all stanzas of the specified namespace.

        This is RegisterHandler applied to the namespace's catch-all 'default'
        stanza name; the remaining arguments are passed through unchanged. """
    self.RegisterHandler(
        'default',
        handler,
        typ,
        ns,
        xmlns,
        makefirst,
        system)
def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, makefirst=0, system=0):
    """ Register a user callback as handler for stanzas of the declared kind.

        The callback is invoked with two arguments: the dispatcher (session)
        instance, usable for replying, and the incoming stanza.
        The callback must raise xmpp.NodeProcessed just before returning if it wants
        to prevent further callbacks from being called with the same stanza and,
        more importantly, to prevent the library from returning the stanza to the
        sender with an error set.

        Arguments:
          "name"      - name of stanza. F.e. "iq".
          "handler"   - user callback.
          "typ"       - value of stanza's "type" attribute. If not specified, any value matches.
          "ns"        - namespace of a child that the stanza must contain.
          "xmlns"     - namespace of the stanza itself; the owner's default namespace if empty.
          "makefirst" - insert the handler at the beginning of the handlers list instead of
                        adding it to the end. Note that more common handlers (i.e. those
                        registered without "typ" and "ns") are called first nevertheless.
          "system"    - call the handler even if NodeProcessed was raised already.

        An unknown namespace or stanza name is registered on the fly (logged at
        'warn' severity), the latter with the generic Protocol class. """
    if not xmlns:
        xmlns = self._owner.defaultNamespace
    self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)' % (handler, name, typ, ns, xmlns), 'info')
    if not typ and not ns:
        typ = 'default'
    if xmlns not in self.handlers:
        self.RegisterNamespace(xmlns, 'warn')
    if name not in self.handlers[xmlns]:
        self.RegisterProtocol(name, Protocol, xmlns, 'warn')
    # Handlers are bucketed under the concatenation of type and child namespace.
    bucket = self.handlers[xmlns][name].setdefault(typ + ns, [])
    entry = {'func': handler, 'system': system}
    if makefirst:
        bucket.insert(0, entry)
    else:
        bucket.append(entry)
def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None, makefirst=0, system=0):
    """ Meant to register a handler that is unregistered after its first call.

        That part is not implemented yet: for now this only resolves an empty
        "xmlns" to the owner's default namespace and delegates to RegisterHandler,
        so the handler stays registered. """
    namespace = xmlns or self._owner.defaultNamespace
    self.RegisterHandler(name, handler, typ, ns, namespace, makefirst, system)
def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
    """ Unregister a handler.

        "typ" and "ns" must be specified exactly the same as when registering;
        an empty "xmlns" means the owner's default namespace.
        Only the first matching registration is removed. Nothing happens (and
        nothing is raised) when the namespace, the stanza name, the type/ns
        bucket or the handler itself is not registered. """
    if not xmlns:
        xmlns = self._owner.defaultNamespace
    if xmlns not in self.handlers:
        return
    if not typ and not ns:
        typ = 'default'
    registered = self.handlers[xmlns].get(name, {}).get(typ + ns)
    if not registered:
        return
    for pack in registered:
        if handler == pack['func']:
            # Remove only on a real match, and stop iterating right away because
            # the list has just been mutated.
            registered.remove(pack)
            return
def RegisterDefaultHandler(self, handler):
    """ Set the fallback handler, replacing any previous one.

        It is the handler used for a stanza when no NodeProcessed exception was
        raised while handling it. This is returnStanzaHandler by default. """
    self._defaultHandler = handler
def RegisterEventHandler(self, handler):
    """ Set the single callback that will process events (f.e. the "FILERECEIVED" event).

        Any previously registered event handler is replaced. The callback is
        invoked by Event as handler(realm, event, data). """
    self._eventHandler = handler
def returnStanzaHandler(self, conn, stanza):
    """ Return the stanza back to the sender with <feature-not-implemented/> error set.

        Only stanzas of type 'get' or 'set' are answered; anything else is
        silently ignored. The reply is sent through "conn". """
    if stanza.getType() not in ('get', 'set'):
        return
    reply = Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED)
    conn.send(reply)
def streamErrorHandler(self, conn, error):
    """ Turn a stream-level <error/> element into a Python exception.

        Children in the NS_XMPP_STREAMS namespace are inspected: a <text/> child
        supplies the description, any other child supplies the condition name
        (the last such child wins). The condition defaults to 'error' and the
        description to the element's own character data.
        Always raises: the class mapped to the condition in stream_exceptions,
        or StreamError when there is none, constructed with (name, text). """
    condition = 'error'
    text = error.getData()
    for child in error.getChildren():
        if child.getNamespace() != NS_XMPP_STREAMS:
            continue
        if child.getName() == 'text':
            text = child.getData()
        else:
            condition = child.getName()
    exc_class = stream_exceptions[condition] if condition in stream_exceptions else StreamError
    raise exc_class((condition, text))
def RegisterCycleHandler(self, handler):
    """ Register a handler that will be called on every Dispatcher.Process() call.

        Registering a handler that is already registered has no effect. """
    if handler in self._cycleHandlers:
        return
    self._cycleHandlers.append(handler)
def UnregisterCycleHandler(self, handler):
    """ Unregister a handler that is called on every Dispatcher.Process() call.

        Unregistering a handler that is not registered has no effect. """
    if handler not in self._cycleHandlers:
        return
    self._cycleHandlers.remove(handler)
def Event(self, realm, event, data):
    """ Raise some event by passing it to the registered event handler, if any.

        Takes three arguments:
          1) "realm" - scope of event. Usually a namespace.
          2) "event" - the event itself.
          3) "data"  - data that comes along with the event. Depends on the event.
        Without a registered event handler the event is dropped. """
    callback = self._eventHandler
    if callback:
        callback(realm, event, data)
232 def dispatch(self
,stanza
,session
=None,direct
=0):
233 """ Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
234 Called internally. """
235 if not session
: session
=self
236 session
.Stream
._mini
_dom
=None
237 name
=stanza
.getName()
239 if not direct
and self
._owner
._route
:
241 if stanza
.getAttr('error') == None:
242 if len(stanza
.getChildren()) == 1:
243 stanza
= stanza
.getChildren()[0]
244 name
=stanza
.getName()
246 for each
in stanza
.getChildren():
247 self
.dispatch(each
,session
,direct
=1)
249 elif name
== 'presence':
251 elif name
in ('features','bind'):
254 raise UnsupportedStanzaType(name
)
256 if name
=='features': session
.Stream
.features
=stanza
258 xmlns
=stanza
.getNamespace()
259 if not self
.handlers
.has_key(xmlns
):
260 self
.DEBUG("Unknown namespace: " + xmlns
,'warn')
262 if not self
.handlers
[xmlns
].has_key(name
):
263 self
.DEBUG("Unknown stanza: " + name
,'warn')
266 self
.DEBUG("Got %s/%s stanza"%(xmlns
,name
), 'ok')
268 if stanza
.__class
__.__name
__=='Node': stanza
=self
.handlers
[xmlns
][name
][type](node
=stanza
)
272 stanza
.props
=stanza
.getProperties()
275 session
.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s"%(name
,typ
,stanza
.props
,ID
),'ok')
277 list=['default'] # we will use all handlers:
278 if self
.handlers
[xmlns
][name
].has_key(typ
): list.append(typ
) # from very common...
279 for prop
in stanza
.props
:
280 if self
.handlers
[xmlns
][name
].has_key(prop
): list.append(prop
)
281 if typ
and self
.handlers
[xmlns
][name
].has_key(typ
+prop
): list.append(typ
+prop
) # ...to very particular
283 chain
=self
.handlers
[xmlns
]['default']['default']
285 if key
: chain
= chain
+ self
.handlers
[xmlns
][name
][key
]
288 if session
._expected
.has_key(ID
):
290 if type(session
._expected
[ID
])==type(()):
291 cb
,args
=session
._expected
[ID
]
292 session
.DEBUG("Expected stanza arrived. Callback %s(%s) found!"%(cb
,args
),'ok')
293 try: cb(session
,stanza
,**args
)
294 except Exception, typ
:
295 if typ
.__class
__.__name
__<>'NodeProcessed': raise
297 session
.DEBUG("Expected stanza arrived!",'ok')
298 session
._expected
[ID
]=stanza
300 for handler
in chain
:
301 if user
or handler
['system']:
303 handler
['func'](session
,stanza
)
304 except Exception, typ
:
305 if typ
.__class
__.__name
__<>'NodeProcessed':
306 self
._pendingExceptions
.insert(0, sys
.exc_info())
309 if user
and self
._defaultHandler
: self
._defaultHandler
(session
,stanza
)
def WaitForResponse(self, ID, timeout=DefaultTimeout):
    """ Block and wait until a stanza with the specific "id" attribute comes.

        "ID"      - value of the "id" attribute to wait for.
        "timeout" - maximum number of seconds to wait.
        Returns the response stanza, or None if no such stanza arrived within
        the timeout or the link went down while waiting; the owner's lastErr is
        then set to "Timeout" or "Disconnect" respectively.
        If the response carries an error code, the owner's attributes
        lastErrNode, lastErr and lastErrCode are set accordingly (the response
        is still returned). """
    self._expected[ID] = None
    abort_time = time.time() + timeout
    self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID, timeout), 'wait')
    while not self._expected[ID]:
        failure = None
        if not self.Process(0.04):
            failure = "Disconnect"
        elif time.time() > abort_time:
            failure = "Timeout"
        if failure:
            self._owner.lastErr = failure
            # Drop the placeholder so an abandoned wait does not leave a stale
            # entry in the expectations table.
            self._expected.pop(ID, None)
            return None
    response = self._expected[ID]
    del self._expected[ID]
    if response.getErrorCode():
        self._owner.lastErrNode = response
        self._owner.lastErr = response.getError()
        self._owner.lastErrCode = response.getErrorCode()
    return response
def SendAndWaitForResponse(self, stanza, timeout=DefaultTimeout):
    """ Put the stanza on the wire and wait for the recipient's response to it.

        The ID returned by send is handed to WaitForResponse together with
        "timeout"; whatever WaitForResponse returns is returned. """
    stanza_id = self.send(stanza)
    return self.WaitForResponse(stanza_id, timeout)
def SendAndCallForResponse(self, stanza, func, args=None):
    """ Put the stanza on the wire and call back when the recipient replies.

        "stanza" - stanza to send.
        "func"   - callback for the reply.
        "args"   - optional dict of additional keyword arguments for the callback.
        The (func, args) pair is stored in the expectations table under the ID
        returned by send. Each call without "args" gets its own empty dict. """
    if args is None:
        args = {}
    self._expected[self.send(stanza)] = (func, args)
344 def send(self
,stanza
):
345 """ Serialise stanza and put it on the wire. Assign an unique ID to it before send.
346 Returns assigned ID."""
347 if type(stanza
) in [type(''), type(u
'')]: return self
._owner
_send
(stanza
)
348 if not isinstance(stanza
,Protocol
): _ID
=None
349 elif not stanza
.getID():
354 else: _ID
=stanza
.getID()
355 if self
._owner
._registered
_name
and not stanza
.getAttr('from'): stanza
.setAttr('from',self
._owner
._registered
_name
)
356 if self
._owner
._route
and stanza
.getName()!='bind':
357 to
=self
._owner
.Server
358 if stanza
.getTo() and stanza
.getTo().getDomain():
359 to
=stanza
.getTo().getDomain()
363 route
=Protocol('route',to
=to
,frm
=frm
,payload
=[stanza
])
365 stanza
.setNamespace(self
._owner
.Namespace
)
366 stanza
.setParent(self
._metastream
)
367 self
._owner
_send
(stanza
)
def disconnect(self):
    """ Send a stream terminator and handle all incoming stanzas before stream closure.

        After sending the closing tag, Process(1) is called repeatedly until it
        returns a false value. """
    self._owner_send('</stream:stream>')
    while True:
        if not self.Process(1):
            break