make file closing more robust
[python/dscho.git] / Lib / test / test_socketserver.py
blob43ce55c37ab481613c86f151a0c947af98b0dfd1
1 """
2 Test suite for socketserver.
3 """
5 import contextlib
6 import errno
7 import imp
8 import os
9 import select
10 import signal
11 import socket
12 import tempfile
13 import threading
14 import time
15 import unittest
16 import socketserver
18 import test.support
19 from test.support import reap_children, verbose
20 from test.support import TESTFN as TEST_FILE
22 test.support.requires("network")
24 TEST_STR = b"hello world\n"
25 HOST = test.support.HOST
27 HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
28 HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
30 def signal_alarm(n):
31 """Call signal.alarm when it exists (i.e. not on Windows)."""
32 if hasattr(signal, 'alarm'):
33 signal.alarm(n)
35 def receive(sock, n, timeout=20):
36 r, w, x = select.select([sock], [], [], timeout)
37 if sock in r:
38 return sock.recv(n)
39 else:
40 raise RuntimeError("timed out on %r" % (sock,))
42 if HAVE_UNIX_SOCKETS:
43 class ForkingUnixStreamServer(socketserver.ForkingMixIn,
44 socketserver.UnixStreamServer):
45 pass
47 class ForkingUnixDatagramServer(socketserver.ForkingMixIn,
48 socketserver.UnixDatagramServer):
49 pass
52 @contextlib.contextmanager
53 def simple_subprocess(testcase):
54 pid = os.fork()
55 if pid == 0:
56 # Don't throw an exception; it would be caught by the test harness.
57 os._exit(72)
58 yield None
59 pid2, status = os.waitpid(pid, 0)
60 testcase.assertEquals(pid2, pid)
61 testcase.assertEquals(72 << 8, status)
64 class SocketServerTest(unittest.TestCase):
65 """Test all socket servers."""
67 def setUp(self):
68 signal_alarm(20) # Kill deadlocks after 20 seconds.
69 self.port_seed = 0
70 self.test_files = []
72 def tearDown(self):
73 signal_alarm(0) # Didn't deadlock.
74 reap_children()
76 for fn in self.test_files:
77 try:
78 os.remove(fn)
79 except os.error:
80 pass
81 self.test_files[:] = []
83 def pickaddr(self, proto):
84 if proto == socket.AF_INET:
85 return (HOST, 0)
86 else:
87 # XXX: We need a way to tell AF_UNIX to pick its own name
88 # like AF_INET provides port==0.
89 dir = None
90 if os.name == 'os2':
91 dir = '\socket'
92 fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
93 if os.name == 'os2':
94 # AF_UNIX socket names on OS/2 require a specific prefix
95 # which can't include a drive letter and must also use
96 # backslashes as directory separators
97 if fn[1] == ':':
98 fn = fn[2:]
99 if fn[0] in (os.sep, os.altsep):
100 fn = fn[1:]
101 if os.sep == '/':
102 fn = fn.replace(os.sep, os.altsep)
103 else:
104 fn = fn.replace(os.altsep, os.sep)
105 self.test_files.append(fn)
106 return fn
108 def make_server(self, addr, svrcls, hdlrbase):
109 class MyServer(svrcls):
110 def handle_error(self, request, client_address):
111 self.close_request(request)
112 self.server_close()
113 raise
115 class MyHandler(hdlrbase):
116 def handle(self):
117 line = self.rfile.readline()
118 self.wfile.write(line)
120 if verbose: print("creating server")
121 server = MyServer(addr, MyHandler)
122 self.assertEquals(server.server_address, server.socket.getsockname())
123 return server
125 def run_server(self, svrcls, hdlrbase, testfunc):
126 server = self.make_server(self.pickaddr(svrcls.address_family),
127 svrcls, hdlrbase)
128 # We had the OS pick a port, so pull the real address out of
129 # the server.
130 addr = server.server_address
131 if verbose:
132 print("ADDR =", addr)
133 print("CLASS =", svrcls)
135 t = threading.Thread(
136 name='%s serving' % svrcls,
137 target=server.serve_forever,
138 # Short poll interval to make the test finish quickly.
139 # Time between requests is short enough that we won't wake
140 # up spuriously too many times.
141 kwargs={'poll_interval':0.01})
142 t.daemon = True # In case this function raises.
143 t.start()
144 if verbose: print("server running")
145 for i in range(3):
146 if verbose: print("test client", i)
147 testfunc(svrcls.address_family, addr)
148 if verbose: print("waiting for server")
149 server.shutdown()
150 t.join()
151 if verbose: print("done")
153 def stream_examine(self, proto, addr):
154 s = socket.socket(proto, socket.SOCK_STREAM)
155 s.connect(addr)
156 s.sendall(TEST_STR)
157 buf = data = receive(s, 100)
158 while data and b'\n' not in buf:
159 data = receive(s, 100)
160 buf += data
161 self.assertEquals(buf, TEST_STR)
162 s.close()
164 def dgram_examine(self, proto, addr):
165 s = socket.socket(proto, socket.SOCK_DGRAM)
166 s.sendto(TEST_STR, addr)
167 buf = data = receive(s, 100)
168 while data and b'\n' not in buf:
169 data = receive(s, 100)
170 buf += data
171 self.assertEquals(buf, TEST_STR)
172 s.close()
174 def test_TCPServer(self):
175 self.run_server(socketserver.TCPServer,
176 socketserver.StreamRequestHandler,
177 self.stream_examine)
179 def test_ThreadingTCPServer(self):
180 self.run_server(socketserver.ThreadingTCPServer,
181 socketserver.StreamRequestHandler,
182 self.stream_examine)
184 if HAVE_FORKING:
185 def test_ForkingTCPServer(self):
186 with simple_subprocess(self):
187 self.run_server(socketserver.ForkingTCPServer,
188 socketserver.StreamRequestHandler,
189 self.stream_examine)
191 if HAVE_UNIX_SOCKETS:
192 def test_UnixStreamServer(self):
193 self.run_server(socketserver.UnixStreamServer,
194 socketserver.StreamRequestHandler,
195 self.stream_examine)
197 def test_ThreadingUnixStreamServer(self):
198 self.run_server(socketserver.ThreadingUnixStreamServer,
199 socketserver.StreamRequestHandler,
200 self.stream_examine)
202 if HAVE_FORKING:
203 def test_ForkingUnixStreamServer(self):
204 with simple_subprocess(self):
205 self.run_server(ForkingUnixStreamServer,
206 socketserver.StreamRequestHandler,
207 self.stream_examine)
209 def test_UDPServer(self):
210 self.run_server(socketserver.UDPServer,
211 socketserver.DatagramRequestHandler,
212 self.dgram_examine)
214 def test_ThreadingUDPServer(self):
215 self.run_server(socketserver.ThreadingUDPServer,
216 socketserver.DatagramRequestHandler,
217 self.dgram_examine)
219 if HAVE_FORKING:
220 def test_ForkingUDPServer(self):
221 with simple_subprocess(self):
222 self.run_server(socketserver.ForkingUDPServer,
223 socketserver.DatagramRequestHandler,
224 self.dgram_examine)
226 # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
227 # client address so this cannot work:
229 # if HAVE_UNIX_SOCKETS:
230 # def test_UnixDatagramServer(self):
231 # self.run_server(socketserver.UnixDatagramServer,
232 # socketserver.DatagramRequestHandler,
233 # self.dgram_examine)
235 # def test_ThreadingUnixDatagramServer(self):
236 # self.run_server(socketserver.ThreadingUnixDatagramServer,
237 # socketserver.DatagramRequestHandler,
238 # self.dgram_examine)
240 # if HAVE_FORKING:
241 # def test_ForkingUnixDatagramServer(self):
242 # self.run_server(socketserver.ForkingUnixDatagramServer,
243 # socketserver.DatagramRequestHandler,
244 # self.dgram_examine)
247 def test_main():
248 if imp.lock_held():
249 # If the import lock is held, the threads will hang
250 raise unittest.SkipTest("can't run when import lock is held")
252 test.support.run_unittest(SocketServerTest)
254 if __name__ == "__main__":
255 test_main()
256 signal_alarm(3) # Shutdown shouldn't take more than 3 seconds.