Make connect(), abort() and reconnect() work
[slixmpp.git] / tests / test_stream.py
blob85f22c56922a53238082bac36b1e8bdb4d8b536d
1 import time
2 import unittest
3 from slixmpp.test import SlixTest
class TestStreamTester(SlixTest):

    """
    Test that we can simulate and test a stanza stream.
    """

    def tearDown(self):
        """Close the test stream after every test method."""
        self.stream_close()

    def testClientEcho(self):
        """Test that we can interact with a ClientXMPP instance."""
        self.stream_start(mode='client')

        def echo(msg):
            # Answer every message by quoting its body back to the sender.
            msg.reply('Thanks for sending: %(body)s' % msg).send()

        self.xmpp.add_event_handler('message', echo)

        # Stanza handed to the stream under test.
        self.recv("""
          <message to="tester@localhost" from="user@localhost">
            <body>Hi!</body>
          </message>
        """)

        # Stanza the echo handler is expected to have produced.
        self.send("""
          <message to="user@localhost">
            <body>Thanks for sending: Hi!</body>
          </message>
        """)

    def testComponentEcho(self):
        """Test that we can interact with a ComponentXMPP instance."""
        self.stream_start(mode='component')

        def echo(msg):
            # Answer every message by quoting its body back to the sender.
            msg.reply('Thanks for sending: %(body)s' % msg).send()

        self.xmpp.add_event_handler('message', echo)

        # Stanza handed to the stream under test.
        self.recv("""
          <message to="tester.localhost" from="user@localhost">
            <body>Hi!</body>
          </message>
        """)

        # Unlike the client case, the expected reply carries an explicit
        # 'from' attribute set to the component's own address.
        self.send("""
          <message to="user@localhost" from="tester.localhost">
            <body>Thanks for sending: Hi!</body>
          </message>
        """)

    def testSendStreamHeader(self):
        """Test that we can check a sent stream header."""
        # NOTE(review): skip=False presumably keeps the stream header
        # exchange visible to the test instead of skipping it -- confirm
        # against SlixTest.stream_start.
        self.stream_start(mode='client', skip=False)
        self.send_header(sto='localhost')

    def testStreamDisconnect(self):
        """Test that the test socket can simulate disconnections."""
        self.stream_start()
        events = set()

        def stream_error(event):
            # Record that the 'socket_error' event was delivered.
            events.add('socket_error')

        self.xmpp.add_event_handler('socket_error', stream_error)

        # Drop the simulated connection, then attempt a write on it.
        self.stream_disconnect()
        self.xmpp.send_raw('  ')

        # Give the event a moment to be dispatched before checking.
        time.sleep(.1)

        # assertIn replaces the deprecated failUnless alias, which no longer
        # exists on unittest.TestCase in Python 3.12+.
        self.assertIn('socket_error', events,
                      "Stream error event not raised: %s" % events)
# Module-level suite holding every test method of TestStreamTester, so an
# external runner can import and execute it.
suite = unittest.TestLoader().loadTestsFromTestCase(
    TestStreamTester
)