Try to fix transient refleaks in test_asynchat.
[python.git] / Lib / test / test_asynchat.py
blobcaa1117baa18b16f9fb22e064bea03af43380d2c
1 # test asynchat
3 import asyncore, asynchat, socket, threading, time
4 import unittest
5 import sys
6 from test import test_support
8 # Skip tests if thread module does not exist.
9 test_support.import_module('thread')
11 HOST = test_support.HOST
12 SERVER_QUIT = 'QUIT\n'
14 class echo_server(threading.Thread):
15 # parameter to determine the number of bytes passed back to the
16 # client each send
17 chunk_size = 1
19 def __init__(self, event):
20 threading.Thread.__init__(self)
21 self.event = event
22 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
23 self.port = test_support.bind_port(self.sock)
25 def run(self):
26 self.sock.listen(1)
27 self.event.set()
28 conn, client = self.sock.accept()
29 self.buffer = ""
30 # collect data until quit message is seen
31 while SERVER_QUIT not in self.buffer:
32 data = conn.recv(1)
33 if not data:
34 break
35 self.buffer = self.buffer + data
37 # remove the SERVER_QUIT message
38 self.buffer = self.buffer.replace(SERVER_QUIT, '')
40 # re-send entire set of collected data
41 try:
42 # this may fail on some tests, such as test_close_when_done, since
43 # the client closes the channel when it's done sending
44 while self.buffer:
45 n = conn.send(self.buffer[:self.chunk_size])
46 time.sleep(0.001)
47 self.buffer = self.buffer[n:]
48 except:
49 pass
51 conn.close()
52 self.sock.close()
54 class echo_client(asynchat.async_chat):
56 def __init__(self, terminator, server_port):
57 asynchat.async_chat.__init__(self)
58 self.contents = []
59 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
60 self.connect((HOST, server_port))
61 self.set_terminator(terminator)
62 self.buffer = ''
64 def handle_connect(self):
65 pass
67 if sys.platform == 'darwin':
68 # select.poll returns a select.POLLHUP at the end of the tests
69 # on darwin, so just ignore it
70 def handle_expt(self):
71 pass
73 def collect_incoming_data(self, data):
74 self.buffer += data
76 def found_terminator(self):
77 self.contents.append(self.buffer)
78 self.buffer = ""
81 def start_echo_server():
82 event = threading.Event()
83 s = echo_server(event)
84 s.start()
85 event.wait()
86 event.clear()
87 time.sleep(0.01) # Give server time to start accepting.
88 return s, event
91 class TestAsynchat(unittest.TestCase):
92 usepoll = False
94 def setUp (self):
95 self._threads = test_support.threading_setup()
97 def tearDown (self):
98 test_support.threading_cleanup(*self._threads)
100 def line_terminator_check(self, term, server_chunk):
101 event = threading.Event()
102 s = echo_server(event)
103 s.chunk_size = server_chunk
104 s.start()
105 event.wait()
106 event.clear()
107 time.sleep(0.01) # Give server time to start accepting.
108 c = echo_client(term, s.port)
109 c.push("hello ")
110 c.push("world%s" % term)
111 c.push("I'm not dead yet!%s" % term)
112 c.push(SERVER_QUIT)
113 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
114 s.join()
116 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
118 # the line terminator tests below check receiving variously-sized
119 # chunks back from the server in order to exercise all branches of
120 # async_chat.handle_read
122 def test_line_terminator1(self):
123 # test one-character terminator
124 for l in (1,2,3):
125 self.line_terminator_check('\n', l)
127 def test_line_terminator2(self):
128 # test two-character terminator
129 for l in (1,2,3):
130 self.line_terminator_check('\r\n', l)
132 def test_line_terminator3(self):
133 # test three-character terminator
134 for l in (1,2,3):
135 self.line_terminator_check('qqq', l)
137 def numeric_terminator_check(self, termlen):
138 # Try reading a fixed number of bytes
139 s, event = start_echo_server()
140 c = echo_client(termlen, s.port)
141 data = "hello world, I'm not dead yet!\n"
142 c.push(data)
143 c.push(SERVER_QUIT)
144 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
145 s.join()
147 self.assertEqual(c.contents, [data[:termlen]])
149 def test_numeric_terminator1(self):
150 # check that ints & longs both work (since type is
151 # explicitly checked in async_chat.handle_read)
152 self.numeric_terminator_check(1)
153 self.numeric_terminator_check(1L)
155 def test_numeric_terminator2(self):
156 self.numeric_terminator_check(6L)
158 def test_none_terminator(self):
159 # Try reading a fixed number of bytes
160 s, event = start_echo_server()
161 c = echo_client(None, s.port)
162 data = "hello world, I'm not dead yet!\n"
163 c.push(data)
164 c.push(SERVER_QUIT)
165 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
166 s.join()
168 self.assertEqual(c.contents, [])
169 self.assertEqual(c.buffer, data)
171 def test_simple_producer(self):
172 s, event = start_echo_server()
173 c = echo_client('\n', s.port)
174 data = "hello world\nI'm not dead yet!\n"
175 p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
176 c.push_with_producer(p)
177 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
178 s.join()
180 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
182 def test_string_producer(self):
183 s, event = start_echo_server()
184 c = echo_client('\n', s.port)
185 data = "hello world\nI'm not dead yet!\n"
186 c.push_with_producer(data+SERVER_QUIT)
187 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
188 s.join()
190 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
192 def test_empty_line(self):
193 # checks that empty lines are handled correctly
194 s, event = start_echo_server()
195 c = echo_client('\n', s.port)
196 c.push("hello world\n\nI'm not dead yet!\n")
197 c.push(SERVER_QUIT)
198 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
199 s.join()
201 self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
203 def test_close_when_done(self):
204 s, event = start_echo_server()
205 c = echo_client('\n', s.port)
206 c.push("hello world\nI'm not dead yet!\n")
207 c.push(SERVER_QUIT)
208 c.close_when_done()
209 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
210 s.join()
212 self.assertEqual(c.contents, [])
213 # the server might have been able to send a byte or two back, but this
214 # at least checks that it received something and didn't just fail
215 # (which could still result in the client not having received anything)
216 self.assertTrue(len(s.buffer) > 0)
219 class TestAsynchat_WithPoll(TestAsynchat):
220 usepoll = True
222 class TestHelperFunctions(unittest.TestCase):
223 def test_find_prefix_at_end(self):
224 self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
225 self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
227 class TestFifo(unittest.TestCase):
228 def test_basic(self):
229 f = asynchat.fifo()
230 f.push(7)
231 f.push('a')
232 self.assertEqual(len(f), 2)
233 self.assertEqual(f.first(), 7)
234 self.assertEqual(f.pop(), (1, 7))
235 self.assertEqual(len(f), 1)
236 self.assertEqual(f.first(), 'a')
237 self.assertEqual(f.is_empty(), False)
238 self.assertEqual(f.pop(), (1, 'a'))
239 self.assertEqual(len(f), 0)
240 self.assertEqual(f.is_empty(), True)
241 self.assertEqual(f.pop(), (0, None))
243 def test_given_list(self):
244 f = asynchat.fifo(['x', 17, 3])
245 self.assertEqual(len(f), 3)
246 self.assertEqual(f.pop(), (1, 'x'))
247 self.assertEqual(f.pop(), (1, 17))
248 self.assertEqual(f.pop(), (1, 3))
249 self.assertEqual(f.pop(), (0, None))
252 def test_main(verbose=None):
253 test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
254 TestHelperFunctions, TestFifo)
256 if __name__ == "__main__":
257 test_main(verbose=True)