4 ## Copyright (C) 2004 Alexey "Snake" Nezhdanov
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2, or (at your option)
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ## GNU General Public License for more details.
19 When your handler is called it is getting the session instance as the first argument.
20 This is the difference from xmpppy 0.1 where you got the "Client" instance.
21 With Session class you can have "multi-session" client instead of having
22 one client for each connection. Is is specifically important when you are
26 from protocol
import *
28 # Transport-level flags
46 The Session class instance is used for storing all session-related info like
47 credentials, socket/xml stream/session state flags, roster items (in case of
48 client type connection) etc.
49 Session object have no means of discovering is any info is ready to be read.
50 Instead you should use poll() (recomended) or select() methods for this purpose.
51 Session can be one of two types: 'server' and 'client'. 'server' session handles
52 inbound connection and 'client' one used to create an outbound one.
53 Session instance have multitude of internal attributes. The most imporant is the 'peer' one.
54 It is set once the peer is authenticated (client).
56 def __init__(self
,socket
,owner
,xmlns
=None,peer
=None):
57 """ When the session is created it's type (client/server) is determined from the beginning.
58 socket argument is the pre-created socket-like object.
59 It must have the following methods: send, recv, fileno, close.
60 owner is the 'master' instance that have Dispatcher plugged into it and generally
61 will take care about all session events.
62 xmlns is the stream namespace that will be used. Client must set this argument
63 If server sets this argument than stream will be dropped if opened with some another namespace.
64 peer is the name of peer instance. This is the flag that differentiates client session from
65 server session. Client must set it to the name of the server that will be connected, server must
66 leave this argument alone.
72 self
._socket
_state
=SOCKET_UNCONNECTED
76 self
._socket
_state
=SOCKET_ALIVE
78 self
._send
=socket
.send
79 self
._recv
=socket
.recv
80 self
.fileno
=socket
.fileno
83 self
.Dispatcher
=owner
.Dispatcher
84 self
.DBG_LINE
='session'
85 self
.DEBUG
=owner
.Dispatcher
.DEBUG
88 if self
.TYP
=='server': self
.ID
=`random
.random()`
[2:]
92 self
._stream
_pos
_queued
=None
93 self
._stream
_pos
_sent
=0
94 self
.deliver_key_queue
=[]
95 self
.deliver_queue_map
={}
98 self
._session
_state
=SESSION_NOT_AUTHED
99 self
.waiting_features
=[]
100 for feature
in [NS_TLS
,NS_SASL
,NS_BIND
,NS_SESSION
]:
101 if feature
in owner
.features
: self
.waiting_features
.append(feature
)
103 self
.feature_in_process
=None
104 self
.slave_session
=None
107 def StartStream(self
):
108 """ This method is used to initialise the internal xml expat parser
109 and to send initial stream header (in case of client connection).
110 Should be used after initial connection and after every stream restart."""
111 self
._stream
_state
=STREAM__NOT_OPENED
112 self
.Stream
=simplexml
.NodeBuilder()
113 self
.Stream
._dispatch
_depth
=2
114 self
.Stream
.dispatch
=self
._dispatch
115 self
.Parse
=self
.Stream
.Parse
116 self
.Stream
.stream_footer_received
=self
._stream
_close
117 if self
.TYP
=='client':
118 self
.Stream
.stream_header_received
=self
._catch
_stream
_id
121 self
.Stream
.stream_header_received
=self
._stream
_open
124 """ Reads all pending incoming data.
125 Raises IOError on disconnection.
126 Blocks until at least one byte is read."""
127 try: received
= self
._recv
(10240)
128 except: received
= ''
130 if len(received
): # length of 0 means disconnect
131 self
.DEBUG(`self
.fileno()`
+' '+received
,'got')
133 self
.DEBUG('Socket error while receiving data','error')
134 self
.set_socket_state(SOCKET_DEAD
)
135 raise IOError("Peer disconnected")
138 def sendnow(self
,chunk
):
139 """ Put chunk into "immidiatedly send" queue.
140 Should only be used for auth/TLS stuff and like.
141 If you just want to shedule regular stanza for delivery use enqueue method.
143 if isinstance(chunk
,Node
): chunk
= chunk
.__str
__().encode('utf-8')
144 elif type(chunk
)==type(u
''): chunk
= chunk
.encode('utf-8')
147 def enqueue(self
,stanza
):
148 """ Takes Protocol instance as argument.
149 Puts stanza into "send" fifo queue. Items into the send queue are hold until
150 stream authenticated. After that this method is effectively the same as "sendnow" method."""
151 if isinstance(stanza
,Protocol
):
152 self
.stanza_queue
.append(stanza
)
153 else: self
.sendbuffer
+=stanza
154 if self
._socket
_state
>=SOCKET_ALIVE
: self
.push_queue()
156 def push_queue(self
,failreason
=ERR_RECIPIENT_UNAVAILABLE
):
157 """ If stream is authenticated than move items from "send" queue to "immidiatedly send" queue.
158 Else if the stream is failed then return all queued stanzas with error passed as argument.
159 Otherwise do nothing."""
160 # If the stream authed - convert stanza_queue into sendbuffer and set the checkpoints
162 if self
._stream
_state
>=STREAM__CLOSED
or self
._socket
_state
>=SOCKET_DEAD
: # the stream failed. Return all stanzas that are still waiting for delivery.
163 self
._owner
.deactivatesession(self
)
164 for key
in self
.deliver_key_queue
: # Not sure. May be I
165 self
._dispatch
(Error(self
.deliver_queue_map
[key
],failreason
),trusted
=1) # should simply re-dispatch it?
166 for stanza
in self
.stanza_queue
: # But such action can invoke
167 self
._dispatch
(Error(stanza
,failreason
),trusted
=1) # Infinite loops in case of S2S connection...
168 self
.deliver_queue_map
,self
.deliver_key_queue
,self
.stanza_queue
={},[],[]
170 elif self
._session
_state
>=SESSION_AUTHED
: # FIXME! äÏÌÖÅÎ ÂÙÔØ ËÁËÏÊ-ÔÏ ÄÒÕÇÏÊ ÆÌÁÇ.
172 for stanza
in self
.stanza_queue
:
173 txt
=stanza
.__str
__().encode('utf-8')
175 self
._stream
_pos
_queued
+=len(txt
) # should be re-evaluated for SSL connection.
176 self
.deliver_queue_map
[self
._stream
_pos
_queued
]=stanza
# position of the stream when stanza will be successfully and fully sent
177 self
.deliver_key_queue
.append(self
._stream
_pos
_queued
)
181 def flush_queue(self
):
182 """ Put the "immidiatedly send" queue content on the wire. Blocks until at least one byte sent."""
186 sent
=self
._send
(self
.sendbuffer
) # âÌÏËÉÒÕÀÝÁÑ ÛÔÕÞËÁ!
189 self
.set_socket_state(SOCKET_DEAD
)
190 self
.DEBUG("Socket error while sending data",'error')
191 return self
.terminate_stream()
192 self
.DEBUG(`self
.fileno()`
+' '+self
.sendbuffer
[:sent
],'sent')
193 self
._stream
_pos
_sent
+=sent
194 self
.sendbuffer
=self
.sendbuffer
[sent
:]
195 self
._stream
_pos
_delivered
=self
._stream
_pos
_sent
# Should be acquired from socket somehow. Take SSL into account.
196 while self
.deliver_key_queue
and self
._stream
_pos
_delivered
>self
.deliver_key_queue
[0]:
197 del self
.deliver_queue_map
[self
.deliver_key_queue
[0]]
198 self
.deliver_key_queue
.remove(self
.deliver_key_queue
[0])
201 def _dispatch(self
,stanza
,trusted
=0):
202 """ This is callback that is used to pass the received stanza forth to owner's dispatcher
203 _if_ the stream is authorised. Otherwise the stanza is just dropped.
204 The 'trusted' argument is used to emulate stanza receive.
205 This method is used internally.
207 self
._owner
.packets
+=1
208 print self
._owner
.packets
209 if self
._stream
_state
==STREAM__OPENED
or trusted
: # if the server really should reject all stanzas after he is closed stream (himeself)?
210 self
.DEBUG(stanza
.__str
__(),'dispatch')
211 stanza
.trusted
=trusted
212 return self
.Dispatcher
.dispatch(stanza
,self
)
214 def _catch_stream_id(self
,ns
=None,tag
='stream',attrs
={}):
215 """ This callback is used to detect the stream namespace of incoming stream. Used internally. """
216 if not attrs
.has_key('id') or not attrs
['id']:
217 return self
.terminate_stream(STREAM_INVALID_XML
)
219 if not attrs
.has_key('version'): self
._owner
.Dialback(self
)
221 def _stream_open(self
,ns
=None,tag
='stream',attrs
={}):
222 """ This callback is used to handle opening stream tag of the incoming stream.
223 In the case of client session it just make some validation.
224 Server session also sends server headers and if the stream valid the features node.
226 text
='<?xml version="1.0" encoding="utf-8"?>\n<stream:stream'
227 if self
.TYP
=='client':
228 text
+=' to="%s"'%self
.peer
230 text
+=' id="%s"'%self
.ID
231 if not attrs
.has_key('to'): text
+=' from="%s"'%self
._owner
.servernames
[0]
232 else: text
+=' from="%s"'%attrs
['to']
233 if attrs
.has_key('xml:lang'): text
+=' xml:lang="%s"'%attrs
['xml:lang']
234 if self
.xmlns
: xmlns
=self
.xmlns
235 else: xmlns
=NS_SERVER
236 text
+=' xmlns:db="%s" xmlns:stream="%s" xmlns="%s"'%(NS_DIALBACK
,NS_STREAMS
,xmlns
)
237 if attrs
.has_key('version') or self
.TYP
=='client': text
+=' version="1.0"'
238 self
.sendnow(text
+'>')
239 self
.set_stream_state(STREAM__OPENED
)
240 if self
.TYP
=='client': return
241 if tag
<>'stream': return self
.terminate_stream(STREAM_INVALID_XML
)
242 if ns
<>NS_STREAMS
: return self
.terminate_stream(STREAM_INVALID_NAMESPACE
)
243 if self
.Stream
.xmlns
<>self
.xmlns
: return self
.terminate_stream(STREAM_BAD_NAMESPACE_PREFIX
)
244 if not attrs
.has_key('to'): return self
.terminate_stream(STREAM_IMPROPER_ADDRESSING
)
245 if attrs
['to'] not in self
._owner
.servernames
: return self
.terminate_stream(STREAM_HOST_UNKNOWN
)
246 self
.ourname
=attrs
['to'].lower()
247 if self
.TYP
=='server' and attrs
.has_key('version'):
249 features
=Node('stream:features')
250 if NS_TLS
in self
.waiting_features
:
251 features
.NT
.starttls
.setNamespace(NS_TLS
)
252 features
.T
.starttls
.NT
.required
253 if NS_SASL
in self
.waiting_features
:
254 features
.NT
.mechanisms
.setNamespace(NS_SASL
)
255 for mec
in self
._owner
.SASL
.mechanisms
:
256 features
.T
.mechanisms
.NT
.mechanism
=mec
258 if NS_BIND
in self
.waiting_features
: features
.NT
.bind
.setNamespace(NS_BIND
)
259 if NS_SESSION
in self
.waiting_features
: features
.NT
.session
.setNamespace(NS_SESSION
)
260 self
.sendnow(features
)
262 def feature(self
,feature
):
263 """ Declare some stream feature as activated one. """
264 if feature
not in self
.features
: self
.features
.append(feature
)
265 self
.unfeature(feature
)
267 def unfeature(self
,feature
):
268 """ Declare some feature as illegal. Illegal features can not be used.
269 Example: BIND feature becomes illegal after Non-SASL auth. """
270 if feature
in self
.waiting_features
: self
.waiting_features
.remove(feature
)
272 def _stream_close(self
,unregister
=1):
273 """ Write the closing stream tag and destroy the underlaying socket. Used internally. """
274 if self
._stream
_state
>=STREAM__CLOSED
: return
275 self
.set_stream_state(STREAM__CLOSING
)
276 self
.sendnow('</stream:stream>')
277 self
.set_stream_state(STREAM__CLOSED
)
278 self
.push_queue() # decompose queue really since STREAM__CLOSED
279 self
._owner
.flush_queues()
280 if unregister
: self
._owner
.unregistersession(self
)
281 self
._destroy
_socket
()
283 def terminate_stream(self
,error
=None,unregister
=1):
284 """ Notify the peer about stream closure.
285 Ensure that xmlstream is not brokes - i.e. if the stream isn't opened yet -
286 open it before closure.
287 If the error condition is specified than create a stream error and send it along with
289 Emulate receiving 'unavailable' type presence just before stream closure.
291 if self
._stream
_state
>=STREAM__CLOSING
: return
292 if self
._stream
_state
<STREAM__OPENED
:
293 self
.set_stream_state(STREAM__CLOSING
)
296 self
.set_stream_state(STREAM__CLOSING
)
297 p
=Presence(typ
='unavailable')
298 p
.setNamespace(NS_CLIENT
)
299 self
._dispatch
(p
,trusted
=1)
301 if isinstance(error
,Node
): self
.sendnow(error
)
302 else: self
.sendnow(ErrorNode(error
))
303 self
._stream
_close
(unregister
=unregister
)
304 if self
.slave_session
:
305 self
.slave_session
.terminate_stream(STREAM_REMOTE_CONNECTION_FAILED
)
307 def _destroy_socket(self
):
308 """ Break cyclic dependancies to let python's GC free memory right now."""
309 self
.Stream
.dispatch
=None
310 self
.Stream
.stream_footer_received
=None
311 self
.Stream
.stream_header_received
=None
312 self
.Stream
.destroy()
314 self
.set_socket_state(SOCKET_DEAD
)
316 def start_feature(self
,f
):
317 """ Declare some feature as "negotiating now" to prevent other features from start negotiating. """
318 if self
.feature_in_process
: raise "Starting feature %s over %s !"%(f
,self
.feature_in_process
)
319 self
.feature_in_process
=f
321 def stop_feature(self
,f
):
322 """ Declare some feature as "negotiated" to allow other features start negotiating. """
323 if self
.feature_in_process
<>f
: raise "Stopping feature %s instead of %s !"%(f
,self
.feature_in_process
)
324 self
.feature_in_process
=None
326 def set_socket_state(self
,newstate
):
327 """ Change the underlaying socket state.
328 Socket starts with SOCKET_UNCONNECTED state
329 and then proceeds (possibly) to SOCKET_ALIVE
330 and then to SOCKET_DEAD """
331 if self
._socket
_state
<newstate
: self
._socket
_state
=newstate
333 def set_session_state(self
,newstate
):
334 """ Change the session state.
335 Session starts with SESSION_NOT_AUTHED state
336 and then comes through
337 SESSION_AUTHED, SESSION_BOUND, SESSION_OPENED and SESSION_CLOSED states.
339 if self
._session
_state
<newstate
:
340 if self
._session
_state
<SESSION_AUTHED
and \
341 newstate
>=SESSION_AUTHED
: self
._stream
_pos
_queued
=self
._stream
_pos
_sent
342 self
._session
_state
=newstate
344 def set_stream_state(self
,newstate
):
345 """ Change the underlaying XML stream state
346 Stream starts with STREAM__NOT_OPENED and then proceeds with
347 STREAM__OPENED, STREAM__CLOSING and STREAM__CLOSED states.
348 Note that some features (like TLS and SASL)
349 requires stream re-start so this state can have non-linear changes. """
350 if self
._stream
_state
<newstate
: self
._stream
_state
=newstate