2 Tests for kqueue wrapper.
11 from test
import test_support
12 if not hasattr(select
, "kqueue"):
13 raise test_support
.TestSkipped("test works only on BSD")
15 class TestKQueue(unittest
.TestCase
):
16 def test_create_queue(self
):
18 self
.assert_(kq
.fileno() > 0, kq
.fileno())
19 self
.assert_(not kq
.closed
)
21 self
.assert_(kq
.closed
)
22 self
.assertRaises(ValueError, kq
.fileno
)
24 def test_create_event(self
):
25 fd
= sys
.stderr
.fileno()
26 ev
= select
.kevent(fd
)
27 other
= select
.kevent(1000)
28 self
.assertEqual(ev
.ident
, fd
)
29 self
.assertEqual(ev
.filter, select
.KQ_FILTER_READ
)
30 self
.assertEqual(ev
.flags
, select
.KQ_EV_ADD
)
31 self
.assertEqual(ev
.fflags
, 0)
32 self
.assertEqual(ev
.data
, 0)
33 self
.assertEqual(ev
.udata
, 0)
34 self
.assertEqual(ev
, ev
)
35 self
.assertNotEqual(ev
, other
)
36 self
.assertEqual(cmp(ev
, other
), -1)
37 self
.assert_(ev
< other
)
38 self
.assert_(other
>= ev
)
39 self
.assertRaises(TypeError, cmp, ev
, None)
40 self
.assertRaises(TypeError, cmp, ev
, 1)
41 self
.assertRaises(TypeError, cmp, ev
, "ev")
43 ev
= select
.kevent(fd
, select
.KQ_FILTER_WRITE
)
44 self
.assertEqual(ev
.ident
, fd
)
45 self
.assertEqual(ev
.filter, select
.KQ_FILTER_WRITE
)
46 self
.assertEqual(ev
.flags
, select
.KQ_EV_ADD
)
47 self
.assertEqual(ev
.fflags
, 0)
48 self
.assertEqual(ev
.data
, 0)
49 self
.assertEqual(ev
.udata
, 0)
50 self
.assertEqual(ev
, ev
)
51 self
.assertNotEqual(ev
, other
)
53 ev
= select
.kevent(fd
, select
.KQ_FILTER_WRITE
, select
.KQ_EV_ONESHOT
)
54 self
.assertEqual(ev
.ident
, fd
)
55 self
.assertEqual(ev
.filter, select
.KQ_FILTER_WRITE
)
56 self
.assertEqual(ev
.flags
, select
.KQ_EV_ONESHOT
)
57 self
.assertEqual(ev
.fflags
, 0)
58 self
.assertEqual(ev
.data
, 0)
59 self
.assertEqual(ev
.udata
, 0)
60 self
.assertEqual(ev
, ev
)
61 self
.assertNotEqual(ev
, other
)
63 ev
= select
.kevent(1, 2, 3, 4, 5, 6)
64 self
.assertEqual(ev
.ident
, 1)
65 self
.assertEqual(ev
.filter, 2)
66 self
.assertEqual(ev
.flags
, 3)
67 self
.assertEqual(ev
.fflags
, 4)
68 self
.assertEqual(ev
.data
, 5)
69 self
.assertEqual(ev
.udata
, 6)
70 self
.assertEqual(ev
, ev
)
71 self
.assertNotEqual(ev
, other
)
73 def test_queue_event(self
):
74 serverSocket
= socket
.socket()
75 serverSocket
.bind(('127.0.0.1', 0))
76 serverSocket
.listen(1)
77 client
= socket
.socket()
78 client
.setblocking(False)
80 client
.connect(('127.0.0.1', serverSocket
.getsockname()[1]))
81 except socket
.error
, e
:
82 self
.assertEquals(e
.args
[0], errno
.EINPROGRESS
)
84 #raise AssertionError("Connect should have raised EINPROGRESS")
85 pass # FreeBSD doesn't raise an exception here
86 server
, addr
= serverSocket
.accept()
88 if sys
.platform
.startswith("darwin"):
89 flags
= select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
94 kq2
= select
.kqueue
.fromfd(kq
.fileno())
96 ev
= select
.kevent(server
.fileno(),
97 select
.KQ_FILTER_WRITE
,
98 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
100 ev
= select
.kevent(server
.fileno(),
101 select
.KQ_FILTER_READ
,
102 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
104 ev
= select
.kevent(client
.fileno(),
105 select
.KQ_FILTER_WRITE
,
106 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
108 ev
= select
.kevent(client
.fileno(),
109 select
.KQ_FILTER_READ
,
110 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
113 events
= kq
.control(None, 4, 1)
114 events
= [(e
.ident
, e
.filter, e
.flags
) for e
in events
]
116 self
.assertEquals(events
, [
117 (client
.fileno(), select
.KQ_FILTER_WRITE
, flags
),
118 (server
.fileno(), select
.KQ_FILTER_WRITE
, flags
)])
120 client
.send("Hello!")
121 server
.send("world!!!")
123 # We may need to call it several times
125 events
= kq
.control(None, 4, 1)
130 self
.fail('timeout waiting for event notifications')
132 events
= [(e
.ident
, e
.filter, e
.flags
) for e
in events
]
135 self
.assertEquals(events
, [
136 (client
.fileno(), select
.KQ_FILTER_WRITE
, flags
),
137 (client
.fileno(), select
.KQ_FILTER_READ
, flags
),
138 (server
.fileno(), select
.KQ_FILTER_WRITE
, flags
),
139 (server
.fileno(), select
.KQ_FILTER_READ
, flags
)])
141 # Remove completely client, and server read part
142 ev
= select
.kevent(client
.fileno(),
143 select
.KQ_FILTER_WRITE
,
146 ev
= select
.kevent(client
.fileno(),
147 select
.KQ_FILTER_READ
,
150 ev
= select
.kevent(server
.fileno(),
151 select
.KQ_FILTER_READ
,
153 kq
.control([ev
], 0, 0)
155 events
= kq
.control([], 4, 0.99)
156 events
= [(e
.ident
, e
.filter, e
.flags
) for e
in events
]
158 self
.assertEquals(events
, [
159 (server
.fileno(), select
.KQ_FILTER_WRITE
, flags
)])
166 test_support
.run_unittest(TestKQueue
)
168 if __name__
== "__main__":