make file closing more robust
[python/dscho.git] / Lib / test / test_kqueue.py
blob42ef958060d86de1ff652716e88761fec682cc68
1 """
2 Tests for kqueue wrapper.
3 """
4 import socket
5 import errno
6 import time
7 import select
8 import sys
9 import unittest
11 from test import support
# select.kqueue is missing on platforms without kqueue support; skip the
# whole module up front instead of letting every test error out.
if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")
# The class carries its own platform guard so it stays safe to collect even
# when it is imported into another module or loaded by a different runner.
@unittest.skipUnless(hasattr(select, "kqueue"), "test works only on BSD")
class TestKQueue(unittest.TestCase):
    """Exercise the select.kqueue and select.kevent wrappers."""

    def test_create_queue(self):
        """A fresh kqueue exposes a descriptor until it is closed."""
        kq = select.kqueue()
        self.assertTrue(kq.fileno() > 0, kq.fileno())
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        # Once closed, asking for the descriptor must raise.
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        """kevent applies defaults, stores explicit fields and compares."""
        from operator import lt, le, gt, ge

        fd = sys.stderr.fileno()
        ev = select.kevent(fd)
        other = select.kevent(1000)

        # Only the ident given: read filter, KQ_EV_ADD, everything else zero.
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_READ)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)
        self.assertTrue(ev < other)
        self.assertTrue(other >= ev)
        # Ordering a kevent against a non-kevent object is a TypeError.
        for op in lt, le, gt, ge:
            self.assertRaises(TypeError, op, ev, None)
            self.assertRaises(TypeError, op, ev, 1)
            self.assertRaises(TypeError, op, ev, "ev")

        # Explicit filter, default flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # Explicit filter and flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # All six fields given positionally.
        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self.assertEqual(ev.ident, 1)
        self.assertEqual(ev.filter, 2)
        self.assertEqual(ev.flags, 3)
        self.assertEqual(ev.fflags, 4)
        self.assertEqual(ev.data, 5)
        self.assertEqual(ev.udata, 6)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_queue_event(self):
        """Register, poll and delete socket events on a kqueue."""
        # Every socket is registered for cleanup as soon as it exists so a
        # failing assertion further down cannot leak it.
        serverSocket = socket.socket()
        self.addCleanup(serverSocket.close)
        serverSocket.bind(('127.0.0.1', 0))
        serverSocket.listen(1)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # No else branch: FreeBSD doesn't raise an exception here, so a
        # connect that succeeds immediately is not treated as a failure.
        server, _ = serverSocket.accept()
        self.addCleanup(server.close)

        # Flags the test expects on returned events: darwin echoes the
        # registration flags, the other platforms are expected to report 0.
        if sys.platform.startswith("darwin"):
            flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        else:
            flags = 0

        kq = select.kqueue()
        # kq2 is a second wrapper built from kq's descriptor; the assertions
        # below expect events registered through kq2 to show up on kq.
        kq2 = select.kqueue.fromfd(kq.fileno())

        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)

        # Nothing has been sent yet, so only the write filters should fire.
        events = kq.control(None, 4, 1)
        events = [(e.ident, e.filter, e.flags) for e in events]
        events.sort()
        self.assertEqual(events, [
            (client.fileno(), select.KQ_FILTER_WRITE, flags),
            (server.fileno(), select.KQ_FILTER_WRITE, flags)])

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
        for _ in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        events = [(e.ident, e.filter, e.flags) for e in events]
        events.sort()

        self.assertEqual(events, [
            (client.fileno(), select.KQ_FILTER_WRITE, flags),
            (client.fileno(), select.KQ_FILTER_READ, flags),
            (server.fileno(), select.KQ_FILTER_WRITE, flags),
            (server.fileno(), select.KQ_FILTER_READ, flags)])

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        # Only the server's write filter is still registered.
        events = kq.control([], 4, 0.99)
        events = [(e.ident, e.filter, e.flags) for e in events]
        events.sort()
        self.assertEqual(events, [
            (server.fileno(), select.KQ_FILTER_WRITE, flags)])
def test_main():
    """Run the TestKQueue case through test.support's unittest runner."""
    support.run_unittest(TestKQueue)
# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test_main()