3 ## Copyright (C) 2004 Alexey "Snake" Nezhdanov
5 ## This program is free software; you can redistribute it and/or modify
6 ## it under the terms of the GNU General Public License as published by
7 ## the Free Software Foundation; either version 2, or (at your option)
10 ## This program is distributed in the hope that it will be useful,
11 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ## GNU General Public License for more details.
15 # $Id: filetransfer.py,v 1.6 2004/12/25 20:06:59 snakeru Exp $
"""
This module contains the IBB class, a simple implementation of JEP-0047.
Note that this is just a transport for data. You have to negotiate the data transfer
beforehand (most probably via StreamInitiation). Unfortunately SI is not implemented yet.
"""
import base64

from protocol import *
from dispatcher import PlugIn
""" IBB is used to transfer a small-sized data chunk over an established xmpp connection.
    Data is split into small blocks (by default 3000 bytes each), encoded as base 64
    and sent to another entity that compiles these blocks back into the data chunk.
    This is very inefficient but should work under any circumstances. Note that
    using IBB normally should be the last resort.
"""
35 """ Initialise internal variables. """
38 self
._exported
_methods
=[self
.OpenStream
]
40 self
._ampnode
=Node(NS_AMP
+' amp',payload
=[Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])
def plugin(self, owner):
    """ Hook the IBB handlers into the owner's dispatcher. Used internally.

        The 'owner' argument itself is not used; every registration goes
        through self._owner (presumably the same object -- confirm against
        the PlugIn base class).
    """
    dispatcher = self._owner
    # Move to StreamOpen and specify stanza id
    dispatcher.RegisterHandlerOnce('iq', self.StreamOpenReplyHandler)
    # Stream state changes (<open/>, <close/> and their replies).
    dispatcher.RegisterHandler('iq', self.IqHandler, ns=NS_IBB)
    # Incoming <data/> blocks.
    dispatcher.RegisterHandler('message', self.ReceiveHandler, ns=NS_IBB)
def IqHandler(self, conn, stanza):
    """ Dispatch an IBB <iq/> stanza to the matching stream handler. Used internally.

        'set' carrying <open/>   -> StreamOpenHandler
        'set' carrying <close/>  -> StreamCloseHandler
        'result'                 -> StreamCommitHandler when one exists,
                                    otherwise StreamOpenReplyHandler
        'error'                  -> StreamOpenReplyHandler
        Anything else is answered with ERR_BAD_REQUEST.
    """
    # The stanza type drives the whole dispatch; it has to be read before use.
    typ = stanza.getType()
    self.DEBUG('IqHandler called typ->%s' % typ, 'info')
    if typ == 'set' and stanza.getTag('open', namespace=NS_IBB):
        self.StreamOpenHandler(conn, stanza)
    elif typ == 'set' and stanza.getTag('close', namespace=NS_IBB):
        self.StreamCloseHandler(conn, stanza)
    elif typ == 'result':
        # No StreamCommitHandler is defined alongside the other handlers, so an
        # unconditional call would fail with AttributeError. StreamOpenReplyHandler
        # is the method that processes 'result' replies; a StreamCommitHandler
        # supplied elsewhere (e.g. by a subclass) still takes precedence.
        handler = getattr(self, 'StreamCommitHandler', self.StreamOpenReplyHandler)
        handler(conn, stanza)
    elif typ == 'error':
        self.StreamOpenReplyHandler(conn, stanza)
    else:
        conn.send(Error(stanza, ERR_BAD_REQUEST))
    # NOTE(review): dispatcher handlers may be expected to signal that the stanza
    # was consumed (e.g. by raising NodeProcessed) -- confirm against the dispatcher.
def StreamOpenHandler(self, conn, stanza):
    """ Handle a request to open a new incoming stream. Used internally.

        Expected stanza:

            <iq type='set'
                from='romeo@montague.net/orchard'
                to='juliet@capulet.com/balcony'
                id='inband_1'>
              <open sid='mySID'
                    block-size='4096'
                    xmlns='http://jabber.org/protocol/ibb'/>
            </iq>

        Replies with ERR_BAD_REQUEST when 'sid' is missing or unusable as a file
        name component, or when 'block-size' is not a positive integer; with
        ERR_UNEXPECTED_REQUEST when a stream with that sid already exists.
        Otherwise a stream record is stored in self._streams[sid], the target
        file '/tmp/xmpp_file_<sid>' is opened for writing and a 'result' reply
        is sent.
    """
    sid = stanza.getTagAttr('open', 'sid')
    blocksize = stanza.getTagAttr('open', 'block-size')
    self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s' % (sid, blocksize), 'info')

    err = None
    try:
        blocksize = int(blocksize)
    except (TypeError, ValueError):
        # Attribute missing (None) or not a number.
        blocksize = 0
    if not sid or blocksize <= 0:
        err = ERR_BAD_REQUEST
    elif '/' in sid or '\\' in sid or '\0' in sid:
        # The sid comes from the remote party and becomes part of a file name:
        # refuse anything that could escape the target directory.
        err = ERR_BAD_REQUEST
    elif sid in self._streams:
        err = ERR_UNEXPECTED_REQUEST

    if err:
        rep = Error(stanza, err)
    else:
        self.DEBUG("Opening stream: id %s, block-size %s" % (sid, blocksize), 'info')
        rep = Protocol('iq', stanza.getFrom(), 'result', stanza.getTo(), {'id': stanza.getID()})
        # NOTE(review): a predictable name under /tmp can be pre-created or
        # symlinked by other local users; kept for compatibility with existing callers.
        self._streams[sid] = {
            'direction': '<' + str(stanza.getFrom()),  # '<' marks an incoming stream
            'block-size': blocksize,
            'fp': open('/tmp/xmpp_file_' + sid, 'wb'),  # decoded payload is binary
            'seq': 0,                                   # next expected block number
            'syn_id': stanza.getID(),
        }
    # The requester needs an answer in either case.
    conn.send(rep)
def OpenStream(self, sid, to, fp, blocksize=3000):
    """ Start a new outgoing stream.

        sid       -- stream id; must not collide with a stream already tracked.
        to        -- endpoint jid; must include a resource.
        fp        -- file object holding the data to send.
        blocksize -- number of bytes read from 'fp' per data block.
                     Take into account that the recommended stanza size is 4k
                     and that IBB uses base64 encoding, which increases the
                     size of the data by 1/3.

        Returns the stream record (a dict stored in self._streams[sid]), or
        None when the sid is already in use or 'to' has no resource.
    """
    if sid in self._streams:
        return None
    if not JID(to).getResource():
        return None
    # The '|>' prefix marks an outgoing stream still waiting for the peer's
    # answer; StreamOpenReplyHandler strips the '|' once a 'result' arrives.
    stream = {'direction': '|>' + to, 'block-size': blocksize, 'fp': fp, 'seq': 0}
    self._streams[sid] = stream
    self._owner.RegisterCycleHandler(self.SendHandler)
    syn = Protocol('iq', to, 'set',
                   payload=[Node(NS_IBB + ' open', {'sid': sid, 'block-size': blocksize})])
    # The request must actually go out, otherwise the peer never learns about
    # the stream. Assumes the owner exposes send() like the 'conn' objects
    # handed to the handlers -- TODO confirm.
    self._owner.send(syn)
    # Remember the request id so the reply can be matched to this stream.
    stream['syn_id'] = syn.getID()
    return stream
def SendHandler(self, conn):
    """ Send the next portion of data for every committed outgoing stream. Used internally.

        For each stream whose direction starts with '>' one block of at most
        'block-size' bytes is read from its file object. A non-empty block is
        sent as a base64-encoded <data/> message such as

            <message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
              <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
                qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
                ...
              </data>
              <amp xmlns='http://jabber.org/protocol/amp'>
                <rule condition='deliver-at' value='stored' action='error'/>
                <rule condition='match-resource' value='exact' action='error'/>
              </amp>
            </message>

        When the file is exhausted the other side is told that the stream is
        closing, the local user is notified through the 'SUCCESSFULL SEND'
        event, and the local stream record is deleted.
    """
    self.DEBUG('SendHandler called', 'info')
    # Iterate over a snapshot: finished streams are removed inside the loop.
    for sid in list(self._streams.keys()):
        stream = self._streams[sid]
        direction = stream['direction']
        if direction[:1] != '>':
            # Incoming ('<...') or not yet accepted by the peer ('|>...').
            continue
        peer = direction[1:]
        chunk = stream['fp'].read(stream['block-size'])
        if chunk:
            datanode = Node(NS_IBB + ' data', {'sid': sid, 'seq': stream['seq']},
                            base64.encodestring(chunk))
            # Advance the block counter, wrapping back to 0 after 65535.
            stream['seq'] = (stream['seq'] + 1) % 65536
            conn.send(Protocol('message', peer, payload=[datanode, self._ampnode]))
        else:
            conn.send(Protocol('iq', peer, 'set',
                               payload=[Node(NS_IBB + ' close', {'sid': sid})]))
            conn.Event(self.DBG_LINE, 'SUCCESSFULL SEND', stream)
            del self._streams[sid]

    # Keep the cycle handler while any outgoing stream (committed '>' or
    # pending '|>') still needs it; dropping it after the first finished
    # stream would stall the others.
    outgoing_left = False
    for stream in self._streams.values():
        if stream['direction'][:1] in ('>', '|'):
            outgoing_left = True
            break
    if not outgoing_left:
        self._owner.UnregisterCycleHandler(self.SendHandler)
def ReceiveHandler(self, conn, stanza):
    """ Receive the next portion of an incoming datastream and write it to
        the stream's temporary file. Used internally.

        On failure an error stanza carrying an IBB <close/> is sent to the peer:
        ERR_ITEM_NOT_FOUND for an unknown sid, ERR_BAD_REQUEST for a missing or
        undecodable payload, ERR_UNEXPECTED_REQUEST for an out-of-order block.
    """
    import binascii  # base64 decoding failures are reported as binascii.Error

    sid = stanza.getTagAttr('data', 'sid')
    seq = stanza.getTagAttr('data', 'seq')
    data = stanza.getTagData('data')
    self.DEBUG('ReceiveHandler called sid->%s seq->%s' % (sid, seq), 'info')
    try:
        seq = int(seq)
        data = base64.decodestring(data)
    except (TypeError, ValueError, binascii.Error):
        # Missing attribute/payload or malformed content: the empty payload
        # makes the block fail the 'not data' check below.
        seq = None
        data = ''

    err = None
    if sid not in self._streams:
        err = ERR_ITEM_NOT_FOUND
    else:
        stream = self._streams[sid]
        if not data:
            err = ERR_BAD_REQUEST
        elif seq != stream['seq']:
            err = ERR_UNEXPECTED_REQUEST
        else:
            self.DEBUG('Successfull receive sid->%s %s+%s bytes' % (sid, stream['fp'].tell(), len(data)), 'ok')
            # Expect the following block next; wrap exactly like the sending
            # side (SendHandler), which restarts at 0 after 65535.
            stream['seq'] = (stream['seq'] + 1) % 65536
            stream['fp'].write(data)

    if err:
        self.DEBUG('Error on receive: %s' % err, 'error')
        conn.send(Error(Iq(to=stanza.getFrom(), frm=stanza.getTo(),
                           payload=[Node(NS_IBB + ' close')]), err, reply=0))
def StreamCloseHandler(self, conn, stanza):
    """ Handle a <close/> request, i.e. the peer reports that all data was transmitted.

        For a known sid: acknowledge with a 'result' reply, raise the xmpppy
        event 'SUCCESSFULL RECEIVE' with the stream record and forget the stream.
        For an unknown sid: answer with ERR_ITEM_NOT_FOUND.
    """
    sid = stanza.getTagAttr('close', 'sid')
    self.DEBUG('StreamCloseHandler called sid->%s' % sid, 'info')
    if sid not in self._streams:
        conn.send(Error(stanza, ERR_ITEM_NOT_FOUND))
        return
    conn.send(stanza.buildReply('result'))
    conn.Event(self.DBG_LINE, 'SUCCESSFULL RECEIVE', self._streams[sid])
    del self._streams[sid]
def StreamBrokenHandler(self, conn, stanza):
    """ Handle stream closure caused by an error during the transfer.

        Every stream whose 'syn_id' equals the id of 'stanza' is reported
        through an xmpppy event -- 'ERROR ON RECEIVE' for incoming streams
        (direction starting with '<'), 'ERROR ON SEND' otherwise -- and then
        removed from self._streams.
    """
    syn_id = stanza.getID()
    self.DEBUG('StreamBrokenHandler called syn_id->%s' % syn_id, 'info')
    # Work on a snapshot: matching entries are removed while we loop.
    for sid, stream in list(self._streams.items()):
        if stream['syn_id'] != syn_id:
            continue
        if stream['direction'][0] == '<':
            conn.Event(self.DBG_LINE, 'ERROR ON RECEIVE', stream)
        else:
            conn.Event(self.DBG_LINE, 'ERROR ON SEND', stream)
        # pop() tolerates an event listener having removed the stream already.
        self._streams.pop(sid, None)
def StreamOpenReplyHandler(self, conn, stanza):
    """ Handle the remote side's reply saying whether it agrees to receive our datastream.
        Used internally.

        The reply is matched to streams by comparing its id with each stream's
        'syn_id'. For a matching stream:
          * 'error'  -- raises the xmpppy event 'ERROR ON RECEIVE' (incoming
                        stream) or 'ERROR ON SEND' (otherwise) and removes the stream;
          * 'result' -- if the stream is still pending (direction starts with
                        '|'), drops that marker so sending can start and raises
                        'STREAM COMMITTED'; otherwise answers with
                        ERR_UNEXPECTED_REQUEST.
    """
    syn_id = stanza.getID()
    self.DEBUG('StreamOpenReplyHandler called syn_id->%s' % syn_id, 'info')
    typ = stanza.getType()
    # Work on a snapshot: refused streams are removed while we loop.
    for sid, stream in list(self._streams.items()):
        if stream['syn_id'] != syn_id:
            continue
        if typ == 'error':
            if stream['direction'][0] == '<':
                conn.Event(self.DBG_LINE, 'ERROR ON RECEIVE', stream)
            else:
                conn.Event(self.DBG_LINE, 'ERROR ON SEND', stream)
            # pop() tolerates an event listener having removed the stream already.
            self._streams.pop(sid, None)
        elif typ == 'result':
            if stream['direction'][0] == '|':
                # '|>jid' -> '>jid': SendHandler starts transmitting from now on.
                stream['direction'] = stream['direction'][1:]
                conn.Event(self.DBG_LINE, 'STREAM COMMITTED', stream)
            else:
                conn.send(Error(stanza, ERR_UNEXPECTED_REQUEST))