Issue #6615: logging: Used weak references in internal handler list. Thanks to flox...
[python.git] / Lib / test / test_socket.py
blob472f4035f1271ebfcc70aa2397a703f3a509705a
1 #!/usr/bin/env python
3 import unittest
4 from test import test_support
6 import errno
7 import socket
8 import select
9 import thread, threading
10 import time
11 import traceback
12 import Queue
13 import sys
14 import os
15 import array
16 from weakref import proxy
17 import signal
19 HOST = test_support.HOST
20 MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening TCP server socket.

    After setUp(), ``self.serv`` is the listening socket and ``self.port``
    is the port returned by test_support.bind_port().
    """

    def setUp(self):
        # Publish the socket on the instance first, then bind and listen.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = test_support.bind_port(listener)
        listener.listen(1)

    def tearDown(self):
        # Close the listener and drop the reference to it.
        listener = self.serv
        listener.close()
        self.serv = None
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound UDP server socket.

    After setUp(), ``self.serv`` is the datagram socket and ``self.port``
    is the port returned by test_support.bind_port().
    """

    def setUp(self):
        # Publish the socket on the instance first, then bind it.
        receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = receiver
        self.port = test_support.bind_port(receiver)

    def tearDown(self):
        # Close the socket and drop the reference to it.
        receiver = self.serv
        receiver.close()
        self.serv = None
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their setup or
    tests are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        # Replacement for setUp(): start the client thread, run the real
        # setUp(), then wait until the client thread has started running.
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = Queue.Queue(1)

        # Do some munging to start the client test.
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        if not self.server_ready.is_set():
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        # Replacement for tearDown(): run the real tearDown(), wait for the
        # client thread to finish and re-raise its failure, if any, here.
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread.

        Waits for the server side to be ready, then runs clientSetUp(),
        test_func() and clientTearDown().  Any Exception raised by the
        setup, by the callable check or by the test itself is put on
        self.queue so that _tearDown() can report it from the main thread.
        """
        self.server_ready.wait()
        self.client_ready.set()
        try:
            # Everything that can fail runs inside the try block; an
            # exception escaping here would leave self.done unset and
            # make _tearDown() wait forever.
            self.clientSetUp()
            if not callable(test_func):
                raise TypeError("test_func must be a callable function")
            test_func()
        except Exception as strerror:
            self.queue.put(strerror)
        try:
            self.clientTearDown()
        finally:
            # Guarantee the main thread is released even when an
            # overridden clientTearDown() fails before setting the event.
            self.done.set()

    def clientSetUp(self):
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        # Signal completion to the main thread and end the client thread.
        self.done.set()
        thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread.

    The client thread gets an unconnected TCP socket in ``self.cli``.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # The client socket is created but not connected here.
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        # Close the client socket, then let ThreadableTest finish the thread.
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread.

    The client thread gets a datagram socket in ``self.cli``.
    """

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        # Close the client socket instead of leaking it, mirroring what
        # ThreadedTCPSocketTest does, then let ThreadableTest signal
        # completion and end the thread.
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Threaded TCP fixture whose client and server are already connected.

    The server side of the connection is ``self.cli_conn`` (the accepted
    socket); the client side is ``self.serv_conn`` (same object as
    ``self.cli``).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client thread connects, so the client
        # must be told explicitly that it may proceed before we call it.
        self.serverExplicitReady()
        accepted = self.serv.accept()
        self.cli_conn = accepted[0]

    def tearDown(self):
        server_side = self.cli_conn
        server_side.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        server_address = (HOST, self.port)
        self.cli.connect(server_address)
        self.serv_conn = self.cli

    def clientTearDown(self):
        client_side = self.serv_conn
        client_side.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    ``self.serv`` is used by the main thread and ``self.cli`` by the
    client thread.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        # Both ends already exist after setUp(); nothing to do.
        pass

    def clientTearDown(self):
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
214 #######################################################################
215 ## Begin Tests
217 class GeneralModuleTests(unittest.TestCase):
219 def test_weakref(self):
220 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
221 p = proxy(s)
222 self.assertEqual(p.fileno(), s.fileno())
223 s.close()
224 s = None
225 try:
226 p.fileno()
227 except ReferenceError:
228 pass
229 else:
230 self.fail('Socket proxy still exists')
232 def testSocketError(self):
233 # Testing socket module exceptions
234 def raise_error(*args, **kwargs):
235 raise socket.error
236 def raise_herror(*args, **kwargs):
237 raise socket.herror
238 def raise_gaierror(*args, **kwargs):
239 raise socket.gaierror
240 self.assertRaises(socket.error, raise_error,
241 "Error raising socket exception.")
242 self.assertRaises(socket.error, raise_herror,
243 "Error raising socket exception.")
244 self.assertRaises(socket.error, raise_gaierror,
245 "Error raising socket exception.")
247 def testCrucialConstants(self):
248 # Testing for mission critical constants
249 socket.AF_INET
250 socket.SOCK_STREAM
251 socket.SOCK_DGRAM
252 socket.SOCK_RAW
253 socket.SOCK_RDM
254 socket.SOCK_SEQPACKET
255 socket.SOL_SOCKET
256 socket.SO_REUSEADDR
258 def testHostnameRes(self):
259 # Testing hostname resolution mechanisms
260 hostname = socket.gethostname()
261 try:
262 ip = socket.gethostbyname(hostname)
263 except socket.error:
264 # Probably name lookup wasn't set up right; skip this test
265 return
266 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
267 try:
268 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
269 except socket.error:
270 # Probably a similar problem as above; skip this test
271 return
272 all_host_names = [hostname, hname] + aliases
273 fqhn = socket.getfqdn(ip)
274 if not fqhn in all_host_names:
275 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
277 def testRefCountGetNameInfo(self):
278 # Testing reference count for getnameinfo
279 if hasattr(sys, "getrefcount"):
280 try:
281 # On some versions, this loses a reference
282 orig = sys.getrefcount(__name__)
283 socket.getnameinfo(__name__,0)
284 except TypeError:
285 if sys.getrefcount(__name__) <> orig:
286 self.fail("socket.getnameinfo loses a reference")
288 def testInterpreterCrash(self):
289 # Making sure getnameinfo doesn't crash the interpreter
290 try:
291 # On some versions, this crashes the interpreter.
292 socket.getnameinfo(('x', 0, 0, 0), 0)
293 except socket.error:
294 pass
296 def testNtoH(self):
297 # This just checks that htons etc. are their own inverse,
298 # when looking at the lower 16 or 32 bits.
299 sizes = {socket.htonl: 32, socket.ntohl: 32,
300 socket.htons: 16, socket.ntohs: 16}
301 for func, size in sizes.items():
302 mask = (1L<<size) - 1
303 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
304 self.assertEqual(i & mask, func(func(i&mask)) & mask)
306 swapped = func(mask)
307 self.assertEqual(swapped & mask, mask)
308 self.assertRaises(OverflowError, func, 1L<<34)
310 def testNtoHErrors(self):
311 good_values = [ 1, 2, 3, 1L, 2L, 3L ]
312 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
313 for k in good_values:
314 socket.ntohl(k)
315 socket.ntohs(k)
316 socket.htonl(k)
317 socket.htons(k)
318 for k in bad_values:
319 self.assertRaises(OverflowError, socket.ntohl, k)
320 self.assertRaises(OverflowError, socket.ntohs, k)
321 self.assertRaises(OverflowError, socket.htonl, k)
322 self.assertRaises(OverflowError, socket.htons, k)
324 def testGetServBy(self):
325 eq = self.assertEqual
326 # Find one service that exists, then check all the related interfaces.
327 # I've ordered this by protocols that have both a tcp and udp
328 # protocol, at least for modern Linuxes.
329 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
330 'freebsd7', 'freebsd8', 'darwin'):
331 # avoid the 'echo' service on this platform, as there is an
332 # assumption breaking non-standard port/protocol entry
333 services = ('daytime', 'qotd', 'domain')
334 else:
335 services = ('echo', 'daytime', 'domain')
336 for service in services:
337 try:
338 port = socket.getservbyname(service, 'tcp')
339 break
340 except socket.error:
341 pass
342 else:
343 raise socket.error
344 # Try same call with optional protocol omitted
345 port2 = socket.getservbyname(service)
346 eq(port, port2)
347 # Try udp, but don't barf it it doesn't exist
348 try:
349 udpport = socket.getservbyname(service, 'udp')
350 except socket.error:
351 udpport = None
352 else:
353 eq(udpport, port)
354 # Now make sure the lookup by port returns the same service name
355 eq(socket.getservbyport(port2), service)
356 eq(socket.getservbyport(port, 'tcp'), service)
357 if udpport is not None:
358 eq(socket.getservbyport(udpport, 'udp'), service)
359 # Make sure getservbyport does not accept out of range ports.
360 self.assertRaises(OverflowError, socket.getservbyport, -1)
361 self.assertRaises(OverflowError, socket.getservbyport, 65536)
363 def testDefaultTimeout(self):
364 # Testing default timeout
365 # The default timeout should initially be None
366 self.assertEqual(socket.getdefaulttimeout(), None)
367 s = socket.socket()
368 self.assertEqual(s.gettimeout(), None)
369 s.close()
371 # Set the default timeout to 10, and see if it propagates
372 socket.setdefaulttimeout(10)
373 self.assertEqual(socket.getdefaulttimeout(), 10)
374 s = socket.socket()
375 self.assertEqual(s.gettimeout(), 10)
376 s.close()
378 # Reset the default timeout to None, and see if it propagates
379 socket.setdefaulttimeout(None)
380 self.assertEqual(socket.getdefaulttimeout(), None)
381 s = socket.socket()
382 self.assertEqual(s.gettimeout(), None)
383 s.close()
385 # Check that setting it to an invalid value raises ValueError
386 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
388 # Check that setting it to an invalid type raises TypeError
389 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
391 def testIPv4_inet_aton_fourbytes(self):
392 if not hasattr(socket, 'inet_aton'):
393 return # No inet_aton, nothing to check
394 # Test that issue1008086 and issue767150 are fixed.
395 # It must return 4 bytes.
396 self.assertEquals('\x00'*4, socket.inet_aton('0.0.0.0'))
397 self.assertEquals('\xff'*4, socket.inet_aton('255.255.255.255'))
399 def testIPv4toString(self):
400 if not hasattr(socket, 'inet_pton'):
401 return # No inet_pton() on this platform
402 from socket import inet_aton as f, inet_pton, AF_INET
403 g = lambda a: inet_pton(AF_INET, a)
405 self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
406 self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
407 self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
408 self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
409 self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
411 self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
412 self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
413 self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
414 self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
416 def testIPv6toString(self):
417 if not hasattr(socket, 'inet_pton'):
418 return # No inet_pton() on this platform
419 try:
420 from socket import inet_pton, AF_INET6, has_ipv6
421 if not has_ipv6:
422 return
423 except ImportError:
424 return
425 f = lambda a: inet_pton(AF_INET6, a)
427 self.assertEquals('\x00' * 16, f('::'))
428 self.assertEquals('\x00' * 16, f('0::0'))
429 self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
430 self.assertEquals(
431 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
432 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
435 def testStringToIPv4(self):
436 if not hasattr(socket, 'inet_ntop'):
437 return # No inet_ntop() on this platform
438 from socket import inet_ntoa as f, inet_ntop, AF_INET
439 g = lambda a: inet_ntop(AF_INET, a)
441 self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
442 self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
443 self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
444 self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
446 self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
447 self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
448 self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
450 def testStringToIPv6(self):
451 if not hasattr(socket, 'inet_ntop'):
452 return # No inet_ntop() on this platform
453 try:
454 from socket import inet_ntop, AF_INET6, has_ipv6
455 if not has_ipv6:
456 return
457 except ImportError:
458 return
459 f = lambda a: inet_ntop(AF_INET6, a)
461 self.assertEquals('::', f('\x00' * 16))
462 self.assertEquals('::1', f('\x00' * 15 + '\x01'))
463 self.assertEquals(
464 'aef:b01:506:1001:ffff:9997:55:170',
465 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
468 # XXX The following don't test module-level functionality...
470 def _get_unused_port(self, bind_address='0.0.0.0'):
471 """Use a temporary socket to elicit an unused ephemeral port.
473 Args:
474 bind_address: Hostname or IP address to search for a port on.
476 Returns: A most likely to be unused port.
478 tempsock = socket.socket()
479 tempsock.bind((bind_address, 0))
480 host, port = tempsock.getsockname()
481 tempsock.close()
482 return port
484 def testSockName(self):
485 # Testing getsockname()
486 port = self._get_unused_port()
487 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
488 sock.bind(("0.0.0.0", port))
489 name = sock.getsockname()
490 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
491 # it reasonable to get the host's addr in addition to 0.0.0.0.
492 # At least for eCos. This is required for the S/390 to pass.
493 my_ip_addr = socket.gethostbyname(socket.gethostname())
494 self.assertTrue(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
495 self.assertEqual(name[1], port)
497 def testGetSockOpt(self):
498 # Testing getsockopt()
499 # We know a socket should start without reuse==0
500 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
501 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
502 self.assertFalse(reuse != 0, "initial mode is reuse")
504 def testSetSockOpt(self):
505 # Testing setsockopt()
506 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
507 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
508 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
509 self.assertFalse(reuse == 0, "failed to set reuse mode")
511 def testSendAfterClose(self):
512 # testing send() after close() with timeout
513 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
514 sock.settimeout(1)
515 sock.close()
516 self.assertRaises(socket.error, sock.send, "spam")
518 def testNewAttributes(self):
519 # testing .family, .type and .protocol
520 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
521 self.assertEqual(sock.family, socket.AF_INET)
522 self.assertEqual(sock.type, socket.SOCK_STREAM)
523 self.assertEqual(sock.proto, 0)
524 sock.close()
526 def test_getsockaddrarg(self):
527 host = '0.0.0.0'
528 port = self._get_unused_port(bind_address=host)
529 big_port = port + 65536
530 neg_port = port - 65536
531 sock = socket.socket()
532 try:
533 self.assertRaises(OverflowError, sock.bind, (host, big_port))
534 self.assertRaises(OverflowError, sock.bind, (host, neg_port))
535 sock.bind((host, port))
536 finally:
537 sock.close()
539 def test_sock_ioctl(self):
540 if os.name != "nt":
541 return
542 self.assertTrue(hasattr(socket.socket, 'ioctl'))
543 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
544 self.assertTrue(hasattr(socket, 'RCVALL_ON'))
545 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
546 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
547 s = socket.socket()
548 self.assertRaises(ValueError, s.ioctl, -1, None)
549 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
class BasicTCPTest(SocketConnectedTest):
    """Basic send/receive tests over a connected TCP socket pair.

    Each ``testX`` method runs in the main thread on ``self.cli_conn``;
    the matching ``_testX`` runs in the client thread on ``self.serv_conn``.
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        msg = ''
        while 1:
            read = self.cli_conn.recv(1024)
            if not read:
                break
            msg += read
        self.assertEqual(msg, 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        # Testing fromfd()
        if not hasattr(socket, "fromfd"):
            return # On Windows, this doesn't exist
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        try:
            msg = sock.recv(1024)
        finally:
            # fromfd() duplicates the descriptor, so the new socket object
            # must be closed separately from self.cli_conn.
            sock.close()
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
class BasicUDPTest(ThreadedUDPSocketTest):
    """Basic datagram tests: the client thread sends MSG to the server
    socket and the main thread receives it."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        recvfrom = self.serv.recvfrom
        self.assertRaises(ValueError, recvfrom, -1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
class TCPCloserTest(ThreadedTCPSocketTest):
    """Check that the client sees EOF after the server closes the
    accepted connection."""

    def testClose(self):
        accepted = self.serv.accept()
        accepted[0].close()

        client = self.cli
        # The closed peer must make the client socket readable within one
        # second, and the read must then return the empty string (EOF).
        readable = select.select([client], [], [], 1.0)[0]
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), '')

    def _testClose(self):
        server_address = (HOST, self.port)
        self.cli.connect(server_address)
        time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
    """Send MSG in each direction across a socketpair()."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Server end receives what the client end sent.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        # Server end sends; the client end checks what arrives.
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Tests of non-blocking accept/connect/recv on TCP sockets.

    Every connection accepted by the server side is closed before the
    test method returns.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            # No client connects in this test, but do not leak the
            # connection if accept() unexpectedly succeeds.
            conn.close()
        end = time.time()
        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            conn.close()
            self.fail("Error trying to do non-blocking accept.")
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            conn.close()
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        # Delay the connect so the first non-blocking accept() finds
        # nothing pending.
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        try:
            conn.setblocking(0)
            try:
                msg = conn.recv(len(MSG))
            except socket.error:
                pass
            else:
                self.fail("Error trying to do non-blocking recv.")
            read, write, err = select.select([conn], [], [])
            if conn in read:
                msg = conn.recv(len(MSG))
                self.assertEqual(msg, MSG)
            else:
                self.fail("Error during select call to non-blocking socket.")
        finally:
            conn.close()

    def _testRecv(self):
        self.cli.connect((HOST, self.port))
        # Delay the send so the first non-blocking recv() finds no data.
        time.sleep(0.1)
        self.cli.send(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests of the file objects returned by socket.makefile().

    The main thread reads from ``self.serv_file`` (created with
    ``self.bufsize``); the client thread writes to ``self.cli_file``.
    Subclasses override ``bufsize`` to repeat the tests with other
    buffering modes.
    """

    bufsize = -1 # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        reader = self.cli_conn.makefile('rb', self.bufsize)
        self.serv_file = reader

    def tearDown(self):
        reader = self.serv_file
        reader.close()
        self.assertTrue(reader.closed)
        self.serv_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        writer = self.serv_conn.makefile('wb')
        self.cli_file = writer

    def clientTearDown(self):
        writer = self.cli_file
        writer.close()
        self.assertTrue(writer.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test
        head = self.serv_file.read(len(MSG)-3)
        tail = self.serv_file.read(3)
        self.assertEqual(head + tail, MSG)

    def _testSmallRead(self):
        writer = self.cli_file
        writer.write(MSG)
        writer.flush()

    def testFullRead(self):
        # read until EOF
        everything = self.serv_file.read()
        self.assertEqual(everything, MSG)

    def _testFullRead(self):
        writer = self.cli_file
        writer.write(MSG)
        writer.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        collected = ''
        char = self.serv_file.read(1)
        while char:
            collected += char
            char = self.serv_file.read(1)
        self.assertEqual(collected, MSG)

    def _testUnbufferedRead(self):
        writer = self.cli_file
        writer.write(MSG)
        writer.flush()

    def testReadline(self):
        # Performing file readline test
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, MSG)

    def _testReadline(self):
        writer = self.cli_file
        writer.write(MSG)
        writer.flush()

    def testReadlineAfterRead(self):
        # Mix sized read() calls with readline() on the same file object.
        reader = self.serv_file
        a_baloo_is = reader.read(len("A baloo is"))
        self.assertEqual("A baloo is", a_baloo_is)
        _a_bear = reader.read(len(" a bear"))
        self.assertEqual(" a bear", _a_bear)
        self.assertEqual("\n", reader.readline())
        self.assertEqual("A BALOO IS A BEAR.\n", reader.readline())
        self.assertEqual(MSG, reader.readline())

    def _testReadlineAfterRead(self):
        writer = self.cli_file
        for chunk in ("A baloo is a bear\n", "A BALOO IS A BEAR.\n", MSG):
            writer.write(chunk)
        writer.flush()

    def testReadlineAfterReadNoNewline(self):
        # readline() after read() when the data ends without a newline.
        reader = self.serv_file
        end_of_ = reader.read(len("End Of "))
        self.assertEqual("End Of ", end_of_)
        self.assertEqual("Line", reader.readline())

    def _testReadlineAfterReadNoNewline(self):
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        is_open = not self.serv_file.closed
        self.assertTrue(is_open)

    def _testClosedAttr(self):
        is_open = not self.cli_file.closed
        self.assertTrue(is_open)
866 class FileObjectInterruptedTestCase(unittest.TestCase):
867 """Test that the file object correctly handles EINTR internally."""
869 class MockSocket(object):
870 def __init__(self, recv_funcs=()):
871 # A generator that returns callables that we'll call for each
872 # call to recv().
873 self._recv_step = iter(recv_funcs)
875 def recv(self, size):
876 return self._recv_step.next()()
878 @staticmethod
879 def _raise_eintr():
880 raise socket.error(errno.EINTR)
882 def _test_readline(self, size=-1, **kwargs):
883 mock_sock = self.MockSocket(recv_funcs=[
884 lambda : "This is the first line\nAnd the sec",
885 self._raise_eintr,
886 lambda : "ond line is here\n",
887 lambda : "",
889 fo = socket._fileobject(mock_sock, **kwargs)
890 self.assertEquals(fo.readline(size), "This is the first line\n")
891 self.assertEquals(fo.readline(size), "And the second line is here\n")
893 def _test_read(self, size=-1, **kwargs):
894 mock_sock = self.MockSocket(recv_funcs=[
895 lambda : "This is the first line\nAnd the sec",
896 self._raise_eintr,
897 lambda : "ond line is here\n",
898 lambda : "",
900 fo = socket._fileobject(mock_sock, **kwargs)
901 self.assertEquals(fo.read(size), "This is the first line\n"
902 "And the second line is here\n")
904 def test_default(self):
905 self._test_readline()
906 self._test_readline(size=100)
907 self._test_read()
908 self._test_read(size=100)
910 def test_with_1k_buffer(self):
911 self._test_readline(bufsize=1024)
912 self._test_readline(size=100, bufsize=1024)
913 self._test_read(bufsize=1024)
914 self._test_read(size=100, bufsize=1024)
916 def _test_readline_no_buffer(self, size=-1):
917 mock_sock = self.MockSocket(recv_funcs=[
918 lambda : "aa",
919 lambda : "\n",
920 lambda : "BB",
921 self._raise_eintr,
922 lambda : "bb",
923 lambda : "",
925 fo = socket._fileobject(mock_sock, bufsize=0)
926 self.assertEquals(fo.readline(size), "aa\n")
927 self.assertEquals(fo.readline(size), "BBbb")
929 def test_no_buffer(self):
930 self._test_readline_no_buffer()
931 self._test_readline_no_buffer(size=4)
932 self._test_read(bufsize=0)
933 self._test_read(size=100, bufsize=0)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, "A. " + MSG)
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second_line = self.serv_file.readline()
        self.assertEqual(second_line, "B. " + MSG)

    def _testUnbufferedReadline(self):
        writer = self.cli_file
        for prefix in ("A. ", "B. "):
            writer.write(prefix + MSG)
        writer.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    # Default-buffered for reading; line-buffered for writing
    bufsize = 1
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    # Exercise the buffering code
    bufsize = 2
class NetworkConnectionTest(object):
    """Prove network connection."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        server_address = (HOST, self.port)
        self.cli = self.serv_conn = socket.create_connection(server_address)
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.
    """
class NetworkConnectionNoServer(unittest.TestCase):
    """create_connection() to a port nobody listens on must fail."""

    def testWithoutServer(self):
        port = test_support.find_unused_port()
        unreachable = (HOST, port)
        self.assertRaises(socket.error, socket.create_connection, unreachable)
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    """Check attributes of sockets returned by socket.create_connection().

    The server side of every test just accepts (and closes) one
    connection; the assertions run in the client thread.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        conn, addr = self.serv.accept()
        # Close the accepted connection instead of leaking it; the
        # client-side checks do not need the server end to stay open.
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        # The server fixture is an AF_INET socket; compare against the
        # named constant rather than its numeric value.
        self.assertEqual(self.cli.family, socket.AF_INET)

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), None)

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.assertEqual(self.cli.gettimeout(), 30)
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Check that the timeout passed to create_connection() is honoured.

    The server half waits three seconds before replying.  A client without
    a timeout receives the reply; a client with a one second timeout must
    get socket.timeout from recv() instead.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # Nothing to prepare: each client test opens its own connection.
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        conn, addr = self.serv.accept()
        try:
            # Delay the reply for longer than the 1 second timeout used by
            # _testOutsideTimeout().
            time.sleep(3)
            conn.send("done!")
        finally:
            # Do not leak the accepted socket, even if send() fails.
            conn.close()
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout requested: recv() waits until the delayed reply arrives.
        self.cli = sock = socket.create_connection((HOST, self.port))
        data = sock.recv(5)
        self.assertEqual(data, "done!")

    def _testOutsideTimeout(self):
        # The reply comes after the 1 second timeout, so recv() must time out.
        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
        self.assertRaises(socket.timeout, sock.recv, 5)


1075 class Urllib2FileobjectTest(unittest.TestCase):
1077 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1078 # it close the socket if the close c'tor argument is true
1080 def testClose(self):
1081 class MockSocket:
1082 closed = False
1083 def flush(self): pass
1084 def close(self): self.closed = True
1086 # must not close unless we request it: the original use of _fileobject
1087 # by module socket requires that the underlying socket not be closed until
1088 # the _socketobject that created the _fileobject is closed
1089 s = MockSocket()
1090 f = socket._fileobject(s)
1091 f.close()
1092 self.assertTrue(not s.closed)
1094 s = MockSocket()
1095 f = socket._fileobject(s, close=True)
1096 f.close()
1097 self.assertTrue(s.closed)
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the listening TCP socket self.serv."""

    def testTCPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        # NOTE: assertRaises() forwards the trailing string to raise_timeout()
        # as a positional argument; it only documents the intent of the check.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # With a zero timeout, accept() has to fail with a plain socket.error
        # rather than socket.timeout.  socket.timeout is tested first, ahead
        # of the socket.error clause.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except socket.timeout:
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            ok = True
        except Exception:
            # Let KeyboardInterrupt/SystemExit propagate untouched.
            self.fail("caught unexpected exception (TCP)")
        if not ok:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            return                  # can only test on *nix
        self.serv.settimeout(5.0)   # must be longer than alarm
        class Alarm(Exception):
            pass
        def alarm_handler(signum, frame):
            raise Alarm
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)

class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on the bound UDP socket self.serv.

    Derives from SocketUDPTest so that self.serv really is a SOCK_DGRAM
    socket; with the TCP fixture these checks ran against a listening
    TCP socket instead.
    """

    def testUDPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        # NOTE: assertRaises() forwards the trailing string to raise_timeout()
        # as a positional argument; it only documents the intent of the check.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # With a zero timeout, recv() has to fail with a plain socket.error
        # rather than socket.timeout.  socket.timeout is tested first, ahead
        # of the socket.error clause.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except Exception:
            # Let KeyboardInterrupt/SystemExit propagate untouched.
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")

class TestExceptions(unittest.TestCase):
    """Sanity checks on the socket module's exception hierarchy."""

    def testExceptionTree(self):
        # (derived, base) pairs: socket.error is an Exception and the more
        # specific socket exceptions all derive from socket.error.
        hierarchy = (
            (socket.error, Exception),
            (socket.herror, socket.error),
            (socket.gaierror, socket.error),
            (socket.timeout, socket.error),
        )
        for derived, base in hierarchy:
            self.assertTrue(issubclass(derived, base))

class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX sockets bound to addresses that start with a NUL byte.

    Every socket a test creates is registered in self._sockets and closed
    in tearDown(), so no file descriptor outlives its test.
    """

    # Longest address length exercised by testMaxName(); one more character
    # is expected to make bind() fail (see testNameOverflow()).
    UNIX_PATH_MAX = 108

    def setUp(self):
        self._sockets = []

    def tearDown(self):
        for sock in self._sockets:
            sock.close()
        self._sockets = []

    def _unixSocket(self):
        # Create an AF_UNIX stream socket that tearDown() will close.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._sockets.append(sock)
        return sock

    def testLinuxAbstractNamespace(self):
        address = "\x00python-test-hello\x00\xff"
        s1 = self._unixSocket()
        s1.bind(address)
        s1.listen(1)
        s2 = self._unixSocket()
        s2.connect(s1.getsockname())
        conn, peer = s1.accept()
        # The accepted socket is not inspected, but it still has to be closed.
        self._sockets.append(conn)
        self.assertEqual(s1.getsockname(), address)
        self.assertEqual(s2.getpeername(), address)

    def testMaxName(self):
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
        s = self._unixSocket()
        s.bind(address)
        self.assertEqual(s.getsockname(), address)

    def testNameOverflow(self):
        address = "\x00" + "h" * self.UNIX_PATH_MAX
        s = self._unixSocket()
        self.assertRaises(socket.error, s.bind, address)


class BufferIOTest(SocketConnectedTest):
    """
    Exercise the buffer-based variants of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvInto(self):
        # Receive straight into a pre-allocated 1024 character array.
        target = array.array('c', ' ' * 1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tostring()[:len(MSG)], MSG)

    def _testRecvInto(self):
        # Send MSG through a buffer object rather than as a plain string.
        self.serv_conn.send(buffer(MSG))

    def testRecvFromInto(self):
        # Same as testRecvInto(), but recvfrom_into() also yields an address.
        target = array.array('c', ' ' * 1024)
        received, sender = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tostring()[:len(MSG)], MSG)

    def _testRecvFromInto(self):
        # Send MSG through a buffer object rather than as a plain string.
        self.serv_conn.send(buffer(MSG))


# Address components shared by the TIPC tests below: the servers bind to
# (TIPC_ADDR_NAMESEQ, TIPC_STYPE, TIPC_LOWER, TIPC_UPPER) and the clients
# address an instance halfway between TIPC_LOWER and TIPC_UPPER.
TIPC_STYPE = 2000   # type component of the TIPC address
TIPC_LOWER = 200    # lower bound of the bound range
TIPC_UPPER = 210    # upper bound of the bound range

1248 def isTipcAvailable():
1249 """Check if the TIPC module is loaded
1251 The TIPC module is not loaded automatically on Ubuntu and probably
1252 other Linux distros.
1254 if not hasattr(socket, "AF_TIPC"):
1255 return False
1256 if not os.path.isfile("/proc/modules"):
1257 return False
1258 with open("/proc/modules") as f:
1259 for line in f:
1260 if line.startswith("tipc "):
1261 return True
1262 if test_support.verbose:
1263 print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1264 return False
class TIPCTest(unittest.TestCase):
    """Connectionless (SOCK_RDM) message exchange over TIPC."""

    def testRDM(self):
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        try:
            cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
            try:
                srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                           TIPC_LOWER, TIPC_UPPER)
                srv.bind(srvaddr)

                # Address an instance in the middle of the bound range.
                sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                            TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
                cli.sendto(MSG, sendaddr)

                msg, recvaddr = srv.recvfrom(1024)

                self.assertEqual(cli.getsockname(), recvaddr)
                self.assertEqual(msg, MSG)
            finally:
                # Close both sockets even when an assertion fails.
                cli.close()
        finally:
            srv.close()


class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """Stream (SOCK_STREAM) exchange over TIPC between two threads."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        listener = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.srv = listener
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                       TIPC_LOWER, TIPC_UPPER))
        listener.listen(5)
        # accept() blocks until the client thread connects, so the client
        # has to be released explicitly before calling it.
        self.serverExplicitReady()
        self.conn, self.connaddr = listener.accept()

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        # Address an instance in the middle of the range the server bound.
        instance = TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2
        self.cli.connect((socket.TIPC_ADDR_NAME, TIPC_STYPE, instance, 0))
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        # Server half: the payload and the peer address must both match.
        received = self.conn.recv(1024)
        self.assertEqual(received, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        # Client half: send the payload and hang up.
        self.cli.send(MSG)
        self.cli.close()


def test_main():
    """Collect the socket test cases that apply to this platform and run them.

    The thread bookkeeping obtained from test_support.threading_setup() is
    handed back to threading_cleanup() even when run_unittest() raises.
    """
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2]
    # The UDP tests are left out on the 'mac' platform.
    if sys.platform != 'mac':
        tests.extend([ BasicUDPTest, UDPTimeoutTest ])

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ])
    # Optional test cases that depend on platform features.
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    if sys.platform == 'linux2':
        tests.append(TestLinuxAbstractNamespace)
    if isTipcAvailable():
        tests.append(TIPCTest)
        tests.append(TIPCThreadableTest)

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        # Must run even if run_unittest() raises.
        test_support.threading_cleanup(*thread_info)

if __name__ == "__main__":
    test_main()