3 import asyncore
, asynchat
, socket
, threading
, time
6 from test
import test_support
8 # Skip tests if thread module does not exist.
9 test_support
.import_module('thread')
11 HOST
= test_support
.HOST
12 SERVER_QUIT
= 'QUIT\n'
14 class echo_server(threading
.Thread
):
15 # parameter to determine the number of bytes passed back to the
19 def __init__(self
, event
):
20 threading
.Thread
.__init
__(self
)
22 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
23 self
.port
= test_support
.bind_port(self
.sock
)
28 conn
, client
= self
.sock
.accept()
30 # collect data until quit message is seen
31 while SERVER_QUIT
not in self
.buffer:
35 self
.buffer = self
.buffer + data
37 # remove the SERVER_QUIT message
38 self
.buffer = self
.buffer.replace(SERVER_QUIT
, '')
40 # re-send entire set of collected data
42 # this may fail on some tests, such as test_close_when_done, since
43 # the client closes the channel when it's done sending
45 n
= conn
.send(self
.buffer[:self
.chunk_size
])
47 self
.buffer = self
.buffer[n
:]
54 class echo_client(asynchat
.async_chat
):
56 def __init__(self
, terminator
, server_port
):
57 asynchat
.async_chat
.__init
__(self
)
59 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
60 self
.connect((HOST
, server_port
))
61 self
.set_terminator(terminator
)
64 def handle_connect(self
):
67 if sys
.platform
== 'darwin':
68 # select.poll returns a select.POLLHUP at the end of the tests
69 # on darwin, so just ignore it
70 def handle_expt(self
):
73 def collect_incoming_data(self
, data
):
76 def found_terminator(self
):
77 self
.contents
.append(self
.buffer)
81 def start_echo_server():
82 event
= threading
.Event()
83 s
= echo_server(event
)
87 time
.sleep(0.01) # Give server time to start accepting.
91 class TestAsynchat(unittest
.TestCase
):
95 self
._threads
= test_support
.threading_setup()
98 test_support
.threading_cleanup(*self
._threads
)
100 def line_terminator_check(self
, term
, server_chunk
):
101 event
= threading
.Event()
102 s
= echo_server(event
)
103 s
.chunk_size
= server_chunk
107 time
.sleep(0.01) # Give server time to start accepting.
108 c
= echo_client(term
, s
.port
)
110 c
.push("world%s" % term
)
111 c
.push("I'm not dead yet!%s" % term
)
113 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
116 self
.assertEqual(c
.contents
, ["hello world", "I'm not dead yet!"])
118 # the line terminator tests below check receiving variously-sized
119 # chunks back from the server in order to exercise all branches of
120 # async_chat.handle_read
122 def test_line_terminator1(self
):
123 # test one-character terminator
125 self
.line_terminator_check('\n', l
)
127 def test_line_terminator2(self
):
128 # test two-character terminator
130 self
.line_terminator_check('\r\n', l
)
132 def test_line_terminator3(self
):
133 # test three-character terminator
135 self
.line_terminator_check('qqq', l
)
137 def numeric_terminator_check(self
, termlen
):
138 # Try reading a fixed number of bytes
139 s
, event
= start_echo_server()
140 c
= echo_client(termlen
, s
.port
)
141 data
= "hello world, I'm not dead yet!\n"
144 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
147 self
.assertEqual(c
.contents
, [data
[:termlen
]])
149 def test_numeric_terminator1(self
):
150 # check that ints & longs both work (since type is
151 # explicitly checked in async_chat.handle_read)
152 self
.numeric_terminator_check(1)
153 self
.numeric_terminator_check(1L)
155 def test_numeric_terminator2(self
):
156 self
.numeric_terminator_check(6L)
158 def test_none_terminator(self
):
159 # Try reading a fixed number of bytes
160 s
, event
= start_echo_server()
161 c
= echo_client(None, s
.port
)
162 data
= "hello world, I'm not dead yet!\n"
165 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
168 self
.assertEqual(c
.contents
, [])
169 self
.assertEqual(c
.buffer, data
)
171 def test_simple_producer(self
):
172 s
, event
= start_echo_server()
173 c
= echo_client('\n', s
.port
)
174 data
= "hello world\nI'm not dead yet!\n"
175 p
= asynchat
.simple_producer(data
+SERVER_QUIT
, buffer_size
=8)
176 c
.push_with_producer(p
)
177 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
180 self
.assertEqual(c
.contents
, ["hello world", "I'm not dead yet!"])
182 def test_string_producer(self
):
183 s
, event
= start_echo_server()
184 c
= echo_client('\n', s
.port
)
185 data
= "hello world\nI'm not dead yet!\n"
186 c
.push_with_producer(data
+SERVER_QUIT
)
187 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
190 self
.assertEqual(c
.contents
, ["hello world", "I'm not dead yet!"])
192 def test_empty_line(self
):
193 # checks that empty lines are handled correctly
194 s
, event
= start_echo_server()
195 c
= echo_client('\n', s
.port
)
196 c
.push("hello world\n\nI'm not dead yet!\n")
198 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
201 self
.assertEqual(c
.contents
, ["hello world", "", "I'm not dead yet!"])
203 def test_close_when_done(self
):
204 s
, event
= start_echo_server()
205 c
= echo_client('\n', s
.port
)
206 c
.push("hello world\nI'm not dead yet!\n")
209 asyncore
.loop(use_poll
=self
.usepoll
, count
=300, timeout
=.01)
212 self
.assertEqual(c
.contents
, [])
213 # the server might have been able to send a byte or two back, but this
214 # at least checks that it received something and didn't just fail
215 # (which could still result in the client not having received anything)
216 self
.assertTrue(len(s
.buffer) > 0)
219 class TestAsynchat_WithPoll(TestAsynchat
):
222 class TestHelperFunctions(unittest
.TestCase
):
223 def test_find_prefix_at_end(self
):
224 self
.assertEqual(asynchat
.find_prefix_at_end("qwerty\r", "\r\n"), 1)
225 self
.assertEqual(asynchat
.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
227 class TestFifo(unittest
.TestCase
):
228 def test_basic(self
):
232 self
.assertEqual(len(f
), 2)
233 self
.assertEqual(f
.first(), 7)
234 self
.assertEqual(f
.pop(), (1, 7))
235 self
.assertEqual(len(f
), 1)
236 self
.assertEqual(f
.first(), 'a')
237 self
.assertEqual(f
.is_empty(), False)
238 self
.assertEqual(f
.pop(), (1, 'a'))
239 self
.assertEqual(len(f
), 0)
240 self
.assertEqual(f
.is_empty(), True)
241 self
.assertEqual(f
.pop(), (0, None))
243 def test_given_list(self
):
244 f
= asynchat
.fifo(['x', 17, 3])
245 self
.assertEqual(len(f
), 3)
246 self
.assertEqual(f
.pop(), (1, 'x'))
247 self
.assertEqual(f
.pop(), (1, 17))
248 self
.assertEqual(f
.pop(), (1, 3))
249 self
.assertEqual(f
.pop(), (0, None))
252 def test_main(verbose
=None):
253 test_support
.run_unittest(TestAsynchat
, TestAsynchat_WithPoll
,
254 TestHelperFunctions
, TestFifo
)
256 if __name__
== "__main__":
257 test_main(verbose
=True)