5 from slixmpp
.test
import SlixTest
8 class TestInBandByteStreams(SlixTest
):
11 self
.stream_start(plugins
=['xep_0047', 'xep_0030'])
16 def testOpenStream(self
):
17 """Test requesting a stream, successfully"""
21 def on_stream_start(stream
):
22 events
.append('ibb_stream_start')
25 self
.xmpp
.add_event_handler('ibb_stream_start', on_stream_start
)
27 t
= threading
.Thread(name
='open_stream',
28 target
=self
.xmpp
['xep_0047'].open_stream
,
29 args
=('tester@localhost/receiver',),
30 kwargs
={'sid': 'testing'})
34 <iq type="set" to="tester@localhost/receiver" id="1">
35 <open xmlns="http://jabber.org/protocol/ibb"
43 <iq type="result" id="1"
45 from="tester@localhost/receiver" />
52 self
.assertEqual(events
, ['ibb_stream_start'])
54 def testAysncOpenStream(self
):
55 """Test requesting a stream, aysnc"""
59 def on_stream_start(stream
):
60 events
.add('ibb_stream_start')
62 def stream_callback(iq
):
63 events
.add('callback')
65 self
.xmpp
.add_event_handler('ibb_stream_start', on_stream_start
)
67 t
= threading
.Thread(name
='open_stream',
68 target
=self
.xmpp
['xep_0047'].open_stream
,
69 args
=('tester@localhost/receiver',),
70 kwargs
={'sid': 'testing',
72 'callback': stream_callback
})
76 <iq type="set" to="tester@localhost/receiver" id="1">
77 <open xmlns="http://jabber.org/protocol/ibb"
85 <iq type="result" id="1"
87 from="tester@localhost/receiver" />
94 self
.assertEqual(events
, set(['ibb_stream_start', 'callback']))
96 def testSendData(self
):
97 """Test sending data over an in-band bytestream."""
102 def on_stream_start(stream
):
103 streams
.append(stream
)
105 def on_stream_data(d
):
106 data
.append(d
['data'])
108 self
.xmpp
.add_event_handler('ibb_stream_start', on_stream_start
)
109 self
.xmpp
.add_event_handler('ibb_stream_data', on_stream_data
)
111 t
= threading
.Thread(name
='open_stream',
112 target
=self
.xmpp
['xep_0047'].open_stream
,
113 args
=('tester@localhost/receiver',),
114 kwargs
={'sid': 'testing'})
118 <iq type="set" to="tester@localhost/receiver" id="1">
119 <open xmlns="http://jabber.org/protocol/ibb"
127 <iq type="result" id="1"
128 to="tester@localhost"
129 from="tester@localhost/receiver" />
139 # Test sending data out
140 stream
.send("Testing")
143 <iq type="set" id="2"
144 from="tester@localhost"
145 to="tester@localhost/receiver">
146 <data xmlns="http://jabber.org/protocol/ibb"
155 <iq type="result" id="2"
156 to="tester@localhost"
157 from="tester@localhost/receiver" />
160 # Test receiving data
162 <iq type="set" id="A"
163 to="tester@localhost"
164 from="tester@localhost/receiver">
165 <data xmlns="http://jabber.org/protocol/ibb"
174 <iq type="result" id="A"
175 to="tester@localhost/receiver" />
178 self
.assertEqual(data
, [b
'it works!'])
181 suite
= unittest
.TestLoader().loadTestsFromTestCase(TestInBandByteStreams
)