3 import asyncore
, asynchat
, socket
, time
6 from test
import test_support
# Address the echo client connects to; taken from test_support.
HOST = test_support.HOST
# Marker appended to client data: the echo server keeps collecting input
# until this string shows up in its buffer, then strips it before echoing.
SERVER_QUIT = 'QUIT\n'
16 class echo_server(threading
.Thread
):
17 # parameter to determine the number of bytes passed back to the
21 def __init__(self
, event
):
22 threading
.Thread
.__init
__(self
)
24 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
25 self
.port
= test_support
.bind_port(self
.sock
)
26 # This will be set if the client wants us to wait before echoing data
28 self
.start_resend_event
= None
33 conn
, client
= self
.sock
.accept()
35 # collect data until quit message is seen
36 while SERVER_QUIT
not in self
.buffer:
40 self
.buffer = self
.buffer + data
42 # remove the SERVER_QUIT message
43 self
.buffer = self
.buffer.replace(SERVER_QUIT
, '')
45 if self
.start_resend_event
:
46 self
.start_resend_event
.wait()
48 # re-send entire set of collected data
50 # this may fail on some tests, such as test_close_when_done, since
51 # the client closes the channel when it's done sending
53 n
= conn
.send(self
.buffer[:self
.chunk_size
])
55 self
.buffer = self
.buffer[n
:]
62 class echo_client(asynchat
.async_chat
):
64 def __init__(self
, terminator
, server_port
):
65 asynchat
.async_chat
.__init
__(self
)
67 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
68 self
.connect((HOST
, server_port
))
69 self
.set_terminator(terminator
)
72 def handle_connect(self
):
75 if sys
.platform
== 'darwin':
76 # select.poll returns a select.POLLHUP at the end of the tests
77 # on darwin, so just ignore it
78 def handle_expt(self
):
81 def collect_incoming_data(self
, data
):
84 def found_terminator(self
):
85 self
.contents
.append(self
.buffer)
89 def start_echo_server():
90 event
= threading
.Event()
91 s
= echo_server(event
)
95 time
.sleep(0.01) # Give server time to start accepting.
99 @unittest.skipUnless(threading
, 'Threading required for this test.')
100 class TestAsynchat(unittest
.TestCase
):
104 self
._threads
= test_support
.threading_setup()
107 test_support
.threading_cleanup(*self
._threads
)
109 def line_terminator_check(self
, term
, server_chunk
):
110 event
= threading
.Event()
111 s
= echo_server(event
)
112 s
.chunk_size
= server_chunk
116 time
.sleep(0.01) # Give server time to start accepting.
117 c
= echo_client(term
, s
.port
)
119 c
.push("world%s" % term
)
120 c
.push("I'm not dead yet!%s" % term
)
122 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
125 self
.assertEqual(c
.contents
, ["hello world", "I'm not dead yet!"])
127 # the line terminator tests below check receiving variously-sized
128 # chunks back from the server in order to exercise all branches of
129 # async_chat.handle_read
131 def test_line_terminator1(self
):
132 # test one-character terminator
134 self
.line_terminator_check('\n', l
)
136 def test_line_terminator2(self
):
137 # test two-character terminator
139 self
.line_terminator_check('\r\n', l
)
141 def test_line_terminator3(self
):
142 # test three-character terminator
144 self
.line_terminator_check('qqq', l
)
146 def numeric_terminator_check(self
, termlen
):
147 # Try reading a fixed number of bytes
148 s
, event
= start_echo_server()
149 c
= echo_client(termlen
, s
.port
)
150 data
= "hello world, I'm not dead yet!\n"
153 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
156 self
.assertEqual(c
.contents
, [data
[:termlen
]])
158 def test_numeric_terminator1(self
):
159 # check that ints & longs both work (since type is
160 # explicitly checked in async_chat.handle_read)
161 self
.numeric_terminator_check(1)
162 self
.numeric_terminator_check(1L)
164 def test_numeric_terminator2(self
):
165 self
.numeric_terminator_check(6L)
167 def test_none_terminator(self
):
# With a None terminator nothing is split off: contents stays empty and all data accumulates in the buffer
169 s
, event
= start_echo_server()
170 c
= echo_client(None, s
.port
)
171 data
= "hello world, I'm not dead yet!\n"
174 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
177 self
.assertEqual(c
.contents
, [])
178 self
.assertEqual(c
.buffer, data
)
180 def test_simple_producer(self
):
181 s
, event
= start_echo_server()
182 c
= echo_client('\n', s
.port
)
183 data
= "hello world\nI'm not dead yet!\n"
184 p
= asynchat
.simple_producer(data
+SERVER_QUIT
, buffer_size
=8)
185 c
.push_with_producer(p
)
186 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
189 self
.assertEqual(c
.contents
, ["hello world", "I'm not dead yet!"])
191 def test_string_producer(self
):
192 s
, event
= start_echo_server()
193 c
= echo_client('\n', s
.port
)
194 data
= "hello world\nI'm not dead yet!\n"
195 c
.push_with_producer(data
+SERVER_QUIT
)
196 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
199 self
.assertEqual(c
.contents
, ["hello world", "I'm not dead yet!"])
201 def test_empty_line(self
):
202 # checks that empty lines are handled correctly
203 s
, event
= start_echo_server()
204 c
= echo_client('\n', s
.port
)
205 c
.push("hello world\n\nI'm not dead yet!\n")
207 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
210 self
.assertEqual(c
.contents
, ["hello world", "", "I'm not dead yet!"])
212 def test_close_when_done(self
):
213 s
, event
= start_echo_server()
214 s
.start_resend_event
= threading
.Event()
215 c
= echo_client('\n', s
.port
)
216 c
.push("hello world\nI'm not dead yet!\n")
219 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
221 # Only allow the server to start echoing data back to the client after
222 # the client has closed its connection. This prevents a race condition
223 # where the server echoes all of its data before we can check that it
224 # got any down below.
225 s
.start_resend_event
.set()
228 self
.assertEqual(c
.contents
, [])
229 # the server might have been able to send a byte or two back, but this
230 # at least checks that it received something and didn't just fail
231 # (which could still result in the client not having received anything)
232 self
.assertTrue(len(s
.buffer) > 0)
235 class TestAsynchat_WithPoll(TestAsynchat
):
# Tests for the module-level helper functions in asynchat.
class TestHelperFunctions(unittest.TestCase):
    def test_find_prefix_at_end(self):
        # Each row: (haystack, needle, expected return value).
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            actual = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(actual, expected)
243 class TestFifo(unittest
.TestCase
):
244 def test_basic(self
):
248 self
.assertEqual(len(f
), 2)
249 self
.assertEqual(f
.first(), 7)
250 self
.assertEqual(f
.pop(), (1, 7))
251 self
.assertEqual(len(f
), 1)
252 self
.assertEqual(f
.first(), 'a')
253 self
.assertEqual(f
.is_empty(), False)
254 self
.assertEqual(f
.pop(), (1, 'a'))
255 self
.assertEqual(len(f
), 0)
256 self
.assertEqual(f
.is_empty(), True)
257 self
.assertEqual(f
.pop(), (0, None))
259 def test_given_list(self
):
260 f
= asynchat
.fifo(['x', 17, 3])
261 self
.assertEqual(len(f
), 3)
262 self
.assertEqual(f
.pop(), (1, 'x'))
263 self
.assertEqual(f
.pop(), (1, 17))
264 self
.assertEqual(f
.pop(), (1, 3))
265 self
.assertEqual(f
.pop(), (0, None))
def test_main(verbose=None):
    """Run every test case class of this module through test_support.

    The ``verbose`` argument is accepted but not used by this function.
    """
    test_classes = (
        TestAsynchat,
        TestAsynchat_WithPoll,
        TestHelperFunctions,
        TestFifo,
    )
    test_support.run_unittest(*test_classes)
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)