2 Tests for kqueue wrapper.
11 from test
import support
12 if not hasattr(select
, "kqueue"):
13 raise unittest
.SkipTest("test works only on BSD")
15 class TestKQueue(unittest
.TestCase
):
16 def test_create_queue(self
):
18 self
.assert_(kq
.fileno() > 0, kq
.fileno())
19 self
.assert_(not kq
.closed
)
21 self
.assert_(kq
.closed
)
22 self
.assertRaises(ValueError, kq
.fileno
)
24 def test_create_event(self
):
25 from operator
import lt
, le
, gt
, ge
26 fd
= sys
.stderr
.fileno()
27 ev
= select
.kevent(fd
)
28 other
= select
.kevent(1000)
29 self
.assertEqual(ev
.ident
, fd
)
30 self
.assertEqual(ev
.filter, select
.KQ_FILTER_READ
)
31 self
.assertEqual(ev
.flags
, select
.KQ_EV_ADD
)
32 self
.assertEqual(ev
.fflags
, 0)
33 self
.assertEqual(ev
.data
, 0)
34 self
.assertEqual(ev
.udata
, 0)
35 self
.assertEqual(ev
, ev
)
36 self
.assertNotEqual(ev
, other
)
37 self
.assert_(ev
< other
)
38 self
.assert_(other
>= ev
)
39 for op
in lt
, le
, gt
, ge
:
40 self
.assertRaises(TypeError, op
, ev
, None)
41 self
.assertRaises(TypeError, op
, ev
, 1)
42 self
.assertRaises(TypeError, op
, ev
, "ev")
44 ev
= select
.kevent(fd
, select
.KQ_FILTER_WRITE
)
45 self
.assertEqual(ev
.ident
, fd
)
46 self
.assertEqual(ev
.filter, select
.KQ_FILTER_WRITE
)
47 self
.assertEqual(ev
.flags
, select
.KQ_EV_ADD
)
48 self
.assertEqual(ev
.fflags
, 0)
49 self
.assertEqual(ev
.data
, 0)
50 self
.assertEqual(ev
.udata
, 0)
51 self
.assertEqual(ev
, ev
)
52 self
.assertNotEqual(ev
, other
)
54 ev
= select
.kevent(fd
, select
.KQ_FILTER_WRITE
, select
.KQ_EV_ONESHOT
)
55 self
.assertEqual(ev
.ident
, fd
)
56 self
.assertEqual(ev
.filter, select
.KQ_FILTER_WRITE
)
57 self
.assertEqual(ev
.flags
, select
.KQ_EV_ONESHOT
)
58 self
.assertEqual(ev
.fflags
, 0)
59 self
.assertEqual(ev
.data
, 0)
60 self
.assertEqual(ev
.udata
, 0)
61 self
.assertEqual(ev
, ev
)
62 self
.assertNotEqual(ev
, other
)
64 ev
= select
.kevent(1, 2, 3, 4, 5, 6)
65 self
.assertEqual(ev
.ident
, 1)
66 self
.assertEqual(ev
.filter, 2)
67 self
.assertEqual(ev
.flags
, 3)
68 self
.assertEqual(ev
.fflags
, 4)
69 self
.assertEqual(ev
.data
, 5)
70 self
.assertEqual(ev
.udata
, 6)
71 self
.assertEqual(ev
, ev
)
72 self
.assertNotEqual(ev
, other
)
74 def test_queue_event(self
):
75 serverSocket
= socket
.socket()
76 serverSocket
.bind(('127.0.0.1', 0))
77 serverSocket
.listen(1)
78 client
= socket
.socket()
79 client
.setblocking(False)
81 client
.connect(('127.0.0.1', serverSocket
.getsockname()[1]))
82 except socket
.error
as e
:
83 self
.assertEquals(e
.args
[0], errno
.EINPROGRESS
)
85 #raise AssertionError("Connect should have raised EINPROGRESS")
86 pass # FreeBSD doesn't raise an exception here
87 server
, addr
= serverSocket
.accept()
89 if sys
.platform
.startswith("darwin"):
90 flags
= select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
95 kq2
= select
.kqueue
.fromfd(kq
.fileno())
97 ev
= select
.kevent(server
.fileno(),
98 select
.KQ_FILTER_WRITE
,
99 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
101 ev
= select
.kevent(server
.fileno(),
102 select
.KQ_FILTER_READ
,
103 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
105 ev
= select
.kevent(client
.fileno(),
106 select
.KQ_FILTER_WRITE
,
107 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
109 ev
= select
.kevent(client
.fileno(),
110 select
.KQ_FILTER_READ
,
111 select
.KQ_EV_ADD | select
.KQ_EV_ENABLE
)
114 events
= kq
.control(None, 4, 1)
115 events
= [(e
.ident
, e
.filter, e
.flags
) for e
in events
]
117 self
.assertEquals(events
, [
118 (client
.fileno(), select
.KQ_FILTER_WRITE
, flags
),
119 (server
.fileno(), select
.KQ_FILTER_WRITE
, flags
)])
121 client
.send(b
"Hello!")
122 server
.send(b
"world!!!")
124 # We may need to call it several times
126 events
= kq
.control(None, 4, 1)
131 self
.fail('timeout waiting for event notifications')
133 events
= [(e
.ident
, e
.filter, e
.flags
) for e
in events
]
136 self
.assertEquals(events
, [
137 (client
.fileno(), select
.KQ_FILTER_WRITE
, flags
),
138 (client
.fileno(), select
.KQ_FILTER_READ
, flags
),
139 (server
.fileno(), select
.KQ_FILTER_WRITE
, flags
),
140 (server
.fileno(), select
.KQ_FILTER_READ
, flags
)])
142 # Remove completely client, and server read part
143 ev
= select
.kevent(client
.fileno(),
144 select
.KQ_FILTER_WRITE
,
147 ev
= select
.kevent(client
.fileno(),
148 select
.KQ_FILTER_READ
,
151 ev
= select
.kevent(server
.fileno(),
152 select
.KQ_FILTER_READ
,
154 kq
.control([ev
], 0, 0)
156 events
= kq
.control([], 4, 0.99)
157 events
= [(e
.ident
, e
.filter, e
.flags
) for e
in events
]
159 self
.assertEquals(events
, [
160 (server
.fileno(), select
.KQ_FILTER_WRITE
, flags
)])
167 support
.run_unittest(TestKQueue
)
169 if __name__
== "__main__":