Allow XEP 0092 to send os information
[slixmpp.git] / tests / test_stream_handlers.py
bloba475b36c39795efd0820f651221105f0bdd07d50
1 import time
3 from sleekxmpp.test import *
4 from sleekxmpp.xmlstream.handler import *
5 from sleekxmpp.xmlstream.matcher import *
class TestHandlers(SleekTest):

    """
    Test using handlers and waiters.

    Each test pushes stanzas into the test stream with ``self.recv`` and
    then uses ``self.send`` to check what the client sent in response.
    """

    def setUp(self):
        """Start a fresh test stream before every test."""
        self.stream_start()

    def tearDown(self):
        """Close the test stream after every test."""
        self.stream_close()

    def testCallback(self):
        """Test using stream callback handlers."""

        def callback_handler(stanza):
            # Emit a fixed message so the test can observe that the
            # callback was invoked for the matched stanza.
            self.xmpp.sendRaw("""
              <message>
                <body>Success!</body>
              </message>
            """)

        callback = Callback('Test Callback',
                            MatchXPath('{test}tester'),
                            callback_handler)

        self.xmpp.registerHandler(callback)

        # Incoming stanza that matches the '{test}tester' XPath above.
        self.recv("""<tester xmlns="test" />""")

        # Check that the callback sent its message.
        msg = self.Message()
        msg['body'] = 'Success!'
        self.send(msg)

    def testWaiter(self):
        """Test using stream waiter handler."""

        def waiter_handler(stanza):
            iq = self.xmpp.Iq()
            iq['id'] = 'test'
            iq['type'] = 'set'
            iq['query'] = 'test'
            # Blocking send: waits here until the reply Iq arrives.
            reply = iq.send(block=True)
            if reply:
                self.xmpp.sendRaw("""
                  <message>
                    <body>Successful: %s</body>
                  </message>
                """ % reply['query'])

        # threaded=True keeps the blocking send off the thread that has
        # to deliver the reply Iq to the waiter.
        self.xmpp.add_event_handler('message', waiter_handler, threaded=True)

        # Send message to trigger waiter_handler
        self.recv("""
          <message>
            <body>Testing</body>
          </message>
        """)

        # Check that Iq was sent by waiter_handler
        iq = self.Iq()
        iq['id'] = 'test'
        iq['type'] = 'set'
        iq['query'] = 'test'
        self.send(iq)

        # Send the reply Iq
        self.recv("""
          <iq id="test" type="result">
            <query xmlns="test" />
          </iq>
        """)

        # Check that waiter_handler received the reply
        msg = self.Message()
        msg['body'] = 'Successful: test'
        self.send(msg)

    def testWaiterTimeout(self):
        """Test that waiter handler is removed after timeout."""

        def waiter_handler(stanza):
            iq = self.xmpp.Iq()
            iq['id'] = 'test2'
            iq['type'] = 'set'
            iq['query'] = 'test2'
            # timeout=0 makes the blocking wait give up immediately; the
            # return value is irrelevant to this test.
            iq.send(block=True, timeout=0)

        self.xmpp.add_event_handler('message', waiter_handler, threaded=True)

        # Start test by triggering waiter_handler
        self.recv("""<message><body>Start Test</body></message>""")

        # Check that Iq was sent to trigger start of timeout period
        iq = self.Iq()
        iq['id'] = 'test2'
        iq['type'] = 'set'
        iq['query'] = 'test2'
        self.send(iq)

        # waiter_handler runs on its own thread (threaded=True), so give
        # it a moment to finish its zero-timeout wait before inspecting
        # the registered handlers; otherwise the check below can race
        # with the handler thread.
        time.sleep(0.1)

        # Check that the waiter is no longer registered
        waiter_exists = self.xmpp.removeHandler('IqWait_test2')

        self.assertEqual(waiter_exists, False,
                         "Waiter handler was not removed.")

    def testIqCallback(self):
        """Test that iq.send(callback=handle_foo) works."""
        events = []

        def handle_foo(iq):
            # Record the invocation so the test can assert on it later.
            events.append('foo')

        iq = self.Iq()
        iq['type'] = 'get'
        iq['id'] = 'test-foo'
        iq['to'] = 'user@localhost'
        iq['query'] = 'foo'
        iq.send(callback=handle_foo)

        # Check that the request Iq went out.
        self.send("""
          <iq type="get" id="test-foo" to="user@localhost">
            <query xmlns="foo" />
          </iq>
        """)

        # Deliver the matching result Iq.
        self.recv("""
          <iq type="result" id="test-foo"
              to="test@localhost"
              from="user@localhost">
            <query xmlns="foo">
              <data />
            </query>
          </iq>
        """)

        # Give event queue time to process
        time.sleep(0.1)

        self.assertEqual(events, ['foo'],
                         "Iq callback was not executed: %s" % events)
# Module-level suite containing every test method defined on TestHandlers.
# NOTE(review): presumably collected by the project's test runner -- confirm.
suite = unittest.TestLoader().loadTestsFromTestCase(
    TestHandlers
)