Merge branch 'master' of git://repo.or.cz/pyTivo/wmcbrine
[pyTivo/wgw.git] / xmpp / filetransfer.py
blob87ddc2196939be84df0b115ce3042520c81909af
1 ## filetransfer.py
2 ##
3 ## Copyright (C) 2004 Alexey "Snake" Nezhdanov
4 ##
5 ## This program is free software; you can redistribute it and/or modify
6 ## it under the terms of the GNU General Public License as published by
7 ## the Free Software Foundation; either version 2, or (at your option)
8 ## any later version.
9 ##
10 ## This program is distributed in the hope that it will be useful,
11 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ## GNU General Public License for more details.
15 # $Id: filetransfer.py,v 1.6 2004/12/25 20:06:59 snakeru Exp $
17 """
18 This module contains IBB class that is the simple implementation of JEP-0047.
19 Note that this is just a transport for data. You have to negotiate data transfer before
20 (via StreamInitiation most probably). Unfortunately SI is not implemented yet.
21 """
import base64
import binascii
import os

from protocol import *
from dispatcher import PlugIn
class IBB(PlugIn):
    """ IBB is used to transfer small-sized data chunks over an established xmpp connection.
        Data is split into small blocks (by default 3000 bytes each), encoded as base 64
        and sent to another entity that compiles these blocks back into the data chunk.
        This is very inefficient but should work under any circumstances. Note that
        using IBB normally should be the last resort.

        Every known stream is kept in self._streams under its stream id (sid) as a dict
        with the keys 'direction', 'block-size', 'fp', 'seq' and 'syn_id'.
        The first character(s) of 'direction' encode the stream state, the rest is the peer:
            '<'  - incoming stream, data is written to 'fp'
            '|>' - outgoing stream that was offered but not yet accepted by the peer
            '>'  - outgoing stream accepted by the peer, data is read from 'fp'
    """

    # Sequence numbers wrap back to 0 after 65535; sender and receiver must agree on that.
    _SEQ_MODULO=65536

    def __init__(self):
        """ Initialise internal variables. """
        PlugIn.__init__(self)
        self.DBG_LINE='ibb'
        self._exported_methods=[self.OpenStream]
        self._streams={}
        # AMP rules attached to every data message: ask for an error instead of
        # offline storage or delivery to a different resource.
        self._ampnode=Node(NS_AMP+' amp',payload=[
            Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),
            Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])

    def plugin(self,owner):
        """ Register handlers for receiving incoming datastreams. Used internally. """
        # TODO: move this registration to OpenStream and specify the stanza id.
        self._owner.RegisterHandlerOnce('iq',self.StreamOpenReplyHandler)
        self._owner.RegisterHandler('iq',self.IqHandler,ns=NS_IBB)
        self._owner.RegisterHandler('message',self.ReceiveHandler,ns=NS_IBB)

    def IqHandler(self,conn,stanza):
        """ Handles streams state change. Used internally.
            Dispatches the iq stanza by its type and payload and always raises
            NodeProcessed afterwards. """
        typ=stanza.getType()
        self.DEBUG('IqHandler called typ->%s'%typ,'info')
        if typ=='set' and stanza.getTag('open',namespace=NS_IBB):
            self.StreamOpenHandler(conn,stanza)
        elif typ=='set' and stanza.getTag('close',namespace=NS_IBB):
            self.StreamCloseHandler(conn,stanza)
        elif typ in ('result','error'):
            # StreamOpenReplyHandler deals with both the positive and the negative
            # answer. The original code dispatched 'result' to a StreamCommitHandler
            # method that this class never defined.
            self.StreamOpenReplyHandler(conn,stanza)
        else:
            conn.send(Error(stanza,ERR_BAD_REQUEST))
        raise NodeProcessed

    def _IsSafeSid(self,sid):
        """ Tell whether 'sid' may be appended to a file name, i.e. that it contains
            neither a path separator nor a NUL character. Used internally. """
        forbidden=[os.sep,'\0']
        if os.altsep: forbidden.append(os.altsep)
        for char in forbidden:
            if char in sid: return False
        return True

    def StreamOpenHandler(self,conn,stanza):
        """ Handles opening of new incoming stream. Used internally.
            Replies with an error if the request is malformed or the sid is already
            in use, otherwise registers the stream and replies with a result.

            Example of the handled stanza:
            <iq type='set'
                from='romeo@montague.net/orchard'
                to='juliet@capulet.com/balcony'
                id='inband_1'>
              <open sid='mySID'
                    block-size='4096'
                    xmlns='http://jabber.org/protocol/ibb'/>
            </iq>
        """
        err=None
        sid=stanza.getTagAttr('open','sid')
        blocksize=stanza.getTagAttr('open','block-size')
        self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info')
        try: blocksize=int(blocksize)
        except (TypeError,ValueError): blocksize=0    # missing or non-numeric block-size
        # The sid comes from the remote side and becomes part of a file name below,
        # so anything that could leave the target directory is refused.
        if not sid or blocksize<=0 or not self._IsSafeSid(sid): err=ERR_BAD_REQUEST
        elif sid in self._streams: err=ERR_UNEXPECTED_REQUEST
        if err: rep=Error(stanza,err)
        else:
            self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info')
            rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()})
            self._streams[sid]={
                'direction':'<'+str(stanza.getFrom()),
                'block-size':blocksize,
                # Binary mode: the decoded payload is arbitrary bytes.
                'fp':open('/tmp/xmpp_file_'+sid,'wb'),
                'seq':0,
                'syn_id':stanza.getID()}
        conn.send(rep)

    def OpenStream(self,sid,to,fp,blocksize=3000):
        """ Start new stream. You should provide stream id 'sid', the endpoint jid 'to',
            the file object containing info for send 'fp'. Also the desired blocksize can be specified.
            Take into account that recommended stanza size is 4k and IBB uses base64 encoding
            that increases size of data by 1/3.
            Returns the stream description dict, or None if 'sid' is already in use
            or 'to' has no resource part.
            Note: 'to' is concatenated onto a string here, so pass it as a string."""
        if sid in self._streams: return
        if not JID(to).getResource(): return
        self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0}
        self._owner.RegisterCycleHandler(self.SendHandler)
        syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})])
        self._owner.send(syn)
        # Remember the request id so the peer's reply can be matched to this stream.
        self._streams[sid]['syn_id']=syn.getID()
        return self._streams[sid]

    def SendHandler(self,conn):
        """ Send next portion of data if it is time to do it. Used internally.
            For every accepted outgoing stream one block is read from its file object
            and sent. When the file is exhausted the stream is closed, the
            'SUCCESSFULL SEND' event is raised and the stream is forgotten. """
        self.DEBUG('SendHandler called','info')
        # Iterate over a snapshot: finished streams are deleted inside the loop.
        for sid in list(self._streams.keys()):
            stream=self._streams.get(sid)
            if stream is None: continue                 # removed meanwhile by an event callback
            if stream['direction'][0]!='>': continue    # incoming, or not accepted yet ('|>')
            peer=stream['direction'][1:]
            chunk=stream['fp'].read(stream['block-size'])
            if chunk:
                # The data message looks like this on the wire:
                # <message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
                #   <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
                #     qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
                #     WpdWpR0uQsuJe7+vh3NWn59/gTc5MDlX8dS9p0ovStmNcyLhxVgmqS8ZKhsblVeu
                #     IpQ0JgavABqibJolc3BKrVtVV1igKiX/N7Pi8RtY1K18toaMDhdEfhBRzO/XB0+P
                #     AQhYlRjNacGcslkhXqNjK5Va4tuOAPy2n1Q8UUrHbUd0g+xJ9Bm0G0LZXyvCWyKH
                #     kuNEHFQiLuCY6Iv0myq6iX6tjuHehZlFSh80b5BVV9tNLwNR5Eqz1klxMhoghJOA
                #   </data>
                #   <amp xmlns='http://jabber.org/protocol/amp'>
                #     <rule condition='deliver-at' value='stored' action='error'/>
                #     <rule condition='match-resource' value='exact' action='error'/>
                #   </amp>
                # </message>
                datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk))
                stream['seq']=(stream['seq']+1)%self._SEQ_MODULO
                conn.send(Protocol('message',peer,payload=[datanode,self._ampnode]))
            else:
                # Notify the other side about stream closing, notify the local user
                # about the successful send and delete the local stream.
                conn.send(Protocol('iq',peer,'set',payload=[Node(NS_IBB+' close',{'sid':sid})]))
                conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream)
                del self._streams[sid]
        # Stop being called every cycle only when nothing is left to send. Unregistering
        # as soon as one stream finished would stall the other outgoing streams.
        outgoing=[s for s in self._streams.values() if s['direction'][:1] in ('>','|')]
        if not outgoing: self._owner.UnregisterCycleHandler(self.SendHandler)

    def ReceiveHandler(self,conn,stanza):
        """ Receive next portion of incoming datastream and write
            it to the temporary file. Used internally.
            On any problem (unknown sid, undecodable or empty data, unexpected
            sequence number) an error carrying a 'close' node is sent back.
        """
        sid=stanza.getTagAttr('data','sid')
        seq=stanza.getTagAttr('data','seq')
        data=stanza.getTagData('data')
        self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info')
        try:
            seq=int(seq)
            data=base64.decodestring(data)
        except (TypeError,ValueError,binascii.Error):
            # Missing/non-numeric seq or broken base64: treated as empty data below.
            seq=''; data=''
        err=None
        stream=self._streams.get(sid)
        if stream is None: err=ERR_ITEM_NOT_FOUND
        elif not data: err=ERR_BAD_REQUEST
        # Data for a stream we are sending ourselves must never reach our source file.
        elif stream['direction'][0]!='<': err=ERR_UNEXPECTED_REQUEST
        elif seq!=stream['seq']: err=ERR_UNEXPECTED_REQUEST
        else:
            self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok')
            # Wrap exactly like the sending side does, otherwise long transfers break.
            stream['seq']=(stream['seq']+1)%self._SEQ_MODULO
            stream['fp'].write(data)
        if err:
            self.DEBUG('Error on receive: %s'%err,'error')
            conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0))

    def StreamCloseHandler(self,conn,stanza):
        """ Handle stream closure due to all data transmitted.
            Raise xmpppy event specifying successful data receive.
            Replies with an item-not-found error if the sid is unknown. """
        sid=stanza.getTagAttr('close','sid')
        self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info')
        stream=self._streams.get(sid)
        if stream is None:
            conn.send(Error(stanza,ERR_ITEM_NOT_FOUND))
            return
        # Make sure everything received is on disk before the user is told about it.
        if stream['direction'][0]=='<': stream['fp'].flush()
        conn.send(stanza.buildReply('result'))
        conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',stream)
        del self._streams[sid]

    def StreamBrokenHandler(self,conn,stanza):
        """ Handle stream closure due to some error while receiving data.
            Raise xmpppy event specifying unsuccessful data transfer and forget
            every stream whose opening request id equals the id of the stanza. """
        syn_id=stanza.getID()
        self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info')
        # Snapshot of the keys: matching streams are deleted inside the loop.
        for sid in list(self._streams.keys()):
            stream=self._streams.get(sid)
            if stream is None or stream.get('syn_id')!=syn_id: continue
            if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
            else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
            del self._streams[sid]

    def StreamOpenReplyHandler(self,conn,stanza):
        """ Handle remote side reply about whether it agrees or not to receive our datastream.
            Used internally. Raises xmpppy event specifying if the data transfer
            is agreed upon. Streams are matched by the id of the opening request."""
        syn_id=stanza.getID()
        self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info')
        typ=stanza.getType()
        # Snapshot of the keys: refused streams are deleted inside the loop.
        for sid in list(self._streams.keys()):
            stream=self._streams.get(sid)
            if stream is None or stream.get('syn_id')!=syn_id: continue
            if typ=='error':
                if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
                else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
                del self._streams[sid]
            elif typ=='result':
                if stream['direction'][0]=='|':
                    # Drop the 'pending' marker: SendHandler starts sending from now on.
                    stream['direction']=stream['direction'][1:]
                    conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream)
                else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST))