Change a variable type to avoid signed overflow; replace repeated '19999' constant...
[python.git] / Lib / test / test_socket.py
blobfaa06c5a8d1b8ff450404c50e7b23a5aa65fbac9
1 #!/usr/bin/env python
3 import unittest
4 from test import test_support
6 import errno
7 import socket
8 import select
9 import thread, threading
10 import time
11 import traceback
12 import Queue
13 import sys
14 import os
15 import array
16 from weakref import proxy
17 import signal
# Host address used by the client halves of the tests; taken from the
# shared test-support module.
HOST = test_support.HOST
# Payload exchanged between the server and client halves of the tests.
MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening TCP server socket.

    setUp() stores the socket in ``self.serv`` and the port returned by
    ``test_support.bind_port()`` in ``self.port``; tearDown() closes the
    socket and drops the reference.
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener = self.serv
        self.port = test_support.bind_port(listener)
        listener.listen(1)

    def tearDown(self):
        listener = self.serv
        listener.close()
        self.serv = None
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound UDP server socket.

    setUp() stores the socket in ``self.serv`` and the port returned by
    ``test_support.bind_port()`` in ``self.port``; tearDown() closes the
    socket and drops the reference.
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        receiver = self.serv
        self.port = test_support.bind_port(receiver)

    def tearDown(self):
        receiver = self.serv
        receiver.close()
        self.serv = None
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: keep the subclass' own fixtures
        # under private names and install the threaded wrappers instead.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real setUp()."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = Queue.Queue(1)

        # Do some munging to start the client test: the client half of
        # test "testFoo" is the method named "_testFoo".
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        # Event.set() is idempotent, so this is a no-op when the real
        # setUp() already called serverExplicitReady().
        self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client, report its error."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread: run clientSetUp(), then test_func.

        Any Exception raised by test_func -- including the TypeError for
        a non-callable test_func -- is put on self.queue so that
        _tearDown() can fail the test in the main thread.
        """
        self.server_ready.wait()
        self.client_ready.set()
        self.clientSetUp()
        try:
            # Checked inside the try block: raising here directly would
            # skip clientTearDown(), leave self.done unset and make the
            # main thread wait forever in _tearDown().
            if not callable(test_func):
                raise TypeError("test_func must be a callable function")
            test_func()
        except Exception as strerror:
            self.queue.put(strerror)
        self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture; must be provided by the subclass."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal the main thread that the client is done and exit."""
        self.done.set()
        thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread.

    The client thread creates an unconnected TCP socket in ``self.cli``
    and closes it again when the client half of the test is finished.
    """

    def __init__(self, methodName='runTest'):
        # Both bases need explicit initialisation; ThreadableTest swaps
        # in the threaded setUp()/tearDown() wrappers.
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread.

    The client thread creates a UDP socket in ``self.cli`` and closes it
    again when the client half of the test is finished.
    """

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        # Close the client socket explicitly (mirrors the TCP variant)
        # instead of leaving it open until garbage collection.
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Fixture with an established TCP connection.

    The server side of the connection is ``self.cli_conn`` (the socket
    returned by accept()); the client side is ``self.serv_conn`` (the
    same object as ``self.cli``).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client thread connects, so release
        # the client thread explicitly before making the call.
        self.serverExplicitReady()
        accepted = self.serv.accept()
        self.cli_conn = accepted[0]

    def tearDown(self):
        peer = self.cli_conn
        peer.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        target = (HOST, self.port)
        self.cli.connect(target)
        self.serv_conn = self.cli

    def clientTearDown(self):
        link = self.serv_conn
        link.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    ``self.serv`` is used by the main thread and ``self.cli`` by the
    client thread; each side closes its own end.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        # Nothing to do: both ends already exist after setUp().
        pass

    def clientTearDown(self):
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
214 #######################################################################
215 ## Begin Tests
217 class GeneralModuleTests(unittest.TestCase):
219 def test_weakref(self):
220 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
221 p = proxy(s)
222 self.assertEqual(p.fileno(), s.fileno())
223 s.close()
224 s = None
225 try:
226 p.fileno()
227 except ReferenceError:
228 pass
229 else:
230 self.fail('Socket proxy still exists')
232 def testSocketError(self):
233 # Testing socket module exceptions
234 def raise_error(*args, **kwargs):
235 raise socket.error
236 def raise_herror(*args, **kwargs):
237 raise socket.herror
238 def raise_gaierror(*args, **kwargs):
239 raise socket.gaierror
240 self.assertRaises(socket.error, raise_error,
241 "Error raising socket exception.")
242 self.assertRaises(socket.error, raise_herror,
243 "Error raising socket exception.")
244 self.assertRaises(socket.error, raise_gaierror,
245 "Error raising socket exception.")
247 def testCrucialConstants(self):
248 # Testing for mission critical constants
249 socket.AF_INET
250 socket.SOCK_STREAM
251 socket.SOCK_DGRAM
252 socket.SOCK_RAW
253 socket.SOCK_RDM
254 socket.SOCK_SEQPACKET
255 socket.SOL_SOCKET
256 socket.SO_REUSEADDR
258 def testHostnameRes(self):
259 # Testing hostname resolution mechanisms
260 hostname = socket.gethostname()
261 try:
262 ip = socket.gethostbyname(hostname)
263 except socket.error:
264 # Probably name lookup wasn't set up right; skip this test
265 return
266 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
267 try:
268 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
269 except socket.error:
270 # Probably a similar problem as above; skip this test
271 return
272 all_host_names = [hostname, hname] + aliases
273 fqhn = socket.getfqdn(ip)
274 if not fqhn in all_host_names:
275 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
277 def testRefCountGetNameInfo(self):
278 # Testing reference count for getnameinfo
279 if hasattr(sys, "getrefcount"):
280 try:
281 # On some versions, this loses a reference
282 orig = sys.getrefcount(__name__)
283 socket.getnameinfo(__name__,0)
284 except TypeError:
285 if sys.getrefcount(__name__) <> orig:
286 self.fail("socket.getnameinfo loses a reference")
288 def testInterpreterCrash(self):
289 # Making sure getnameinfo doesn't crash the interpreter
290 try:
291 # On some versions, this crashes the interpreter.
292 socket.getnameinfo(('x', 0, 0, 0), 0)
293 except socket.error:
294 pass
296 def testNtoH(self):
297 # This just checks that htons etc. are their own inverse,
298 # when looking at the lower 16 or 32 bits.
299 sizes = {socket.htonl: 32, socket.ntohl: 32,
300 socket.htons: 16, socket.ntohs: 16}
301 for func, size in sizes.items():
302 mask = (1L<<size) - 1
303 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
304 self.assertEqual(i & mask, func(func(i&mask)) & mask)
306 swapped = func(mask)
307 self.assertEqual(swapped & mask, mask)
308 self.assertRaises(OverflowError, func, 1L<<34)
310 def testNtoHErrors(self):
311 good_values = [ 1, 2, 3, 1L, 2L, 3L ]
312 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
313 for k in good_values:
314 socket.ntohl(k)
315 socket.ntohs(k)
316 socket.htonl(k)
317 socket.htons(k)
318 for k in bad_values:
319 self.assertRaises(OverflowError, socket.ntohl, k)
320 self.assertRaises(OverflowError, socket.ntohs, k)
321 self.assertRaises(OverflowError, socket.htonl, k)
322 self.assertRaises(OverflowError, socket.htons, k)
324 def testGetServBy(self):
325 eq = self.assertEqual
326 # Find one service that exists, then check all the related interfaces.
327 # I've ordered this by protocols that have both a tcp and udp
328 # protocol, at least for modern Linuxes.
329 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
330 'freebsd7', 'freebsd8', 'darwin'):
331 # avoid the 'echo' service on this platform, as there is an
332 # assumption breaking non-standard port/protocol entry
333 services = ('daytime', 'qotd', 'domain')
334 else:
335 services = ('echo', 'daytime', 'domain')
336 for service in services:
337 try:
338 port = socket.getservbyname(service, 'tcp')
339 break
340 except socket.error:
341 pass
342 else:
343 raise socket.error
344 # Try same call with optional protocol omitted
345 port2 = socket.getservbyname(service)
346 eq(port, port2)
347 # Try udp, but don't barf it it doesn't exist
348 try:
349 udpport = socket.getservbyname(service, 'udp')
350 except socket.error:
351 udpport = None
352 else:
353 eq(udpport, port)
354 # Now make sure the lookup by port returns the same service name
355 eq(socket.getservbyport(port2), service)
356 eq(socket.getservbyport(port, 'tcp'), service)
357 if udpport is not None:
358 eq(socket.getservbyport(udpport, 'udp'), service)
359 # Make sure getservbyport does not accept out of range ports.
360 self.assertRaises(OverflowError, socket.getservbyport, -1)
361 self.assertRaises(OverflowError, socket.getservbyport, 65536)
363 def testDefaultTimeout(self):
364 # Testing default timeout
365 # The default timeout should initially be None
366 self.assertEqual(socket.getdefaulttimeout(), None)
367 s = socket.socket()
368 self.assertEqual(s.gettimeout(), None)
369 s.close()
371 # Set the default timeout to 10, and see if it propagates
372 socket.setdefaulttimeout(10)
373 self.assertEqual(socket.getdefaulttimeout(), 10)
374 s = socket.socket()
375 self.assertEqual(s.gettimeout(), 10)
376 s.close()
378 # Reset the default timeout to None, and see if it propagates
379 socket.setdefaulttimeout(None)
380 self.assertEqual(socket.getdefaulttimeout(), None)
381 s = socket.socket()
382 self.assertEqual(s.gettimeout(), None)
383 s.close()
385 # Check that setting it to an invalid value raises ValueError
386 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
388 # Check that setting it to an invalid type raises TypeError
389 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
391 def testIPv4_inet_aton_fourbytes(self):
392 if not hasattr(socket, 'inet_aton'):
393 return # No inet_aton, nothing to check
394 # Test that issue1008086 and issue767150 are fixed.
395 # It must return 4 bytes.
396 self.assertEquals('\x00'*4, socket.inet_aton('0.0.0.0'))
397 self.assertEquals('\xff'*4, socket.inet_aton('255.255.255.255'))
399 def testIPv4toString(self):
400 if not hasattr(socket, 'inet_pton'):
401 return # No inet_pton() on this platform
402 from socket import inet_aton as f, inet_pton, AF_INET
403 g = lambda a: inet_pton(AF_INET, a)
405 self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
406 self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
407 self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
408 self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
409 self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
411 self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
412 self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
413 self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
414 self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
416 def testIPv6toString(self):
417 if not hasattr(socket, 'inet_pton'):
418 return # No inet_pton() on this platform
419 try:
420 from socket import inet_pton, AF_INET6, has_ipv6
421 if not has_ipv6:
422 return
423 except ImportError:
424 return
425 f = lambda a: inet_pton(AF_INET6, a)
427 self.assertEquals('\x00' * 16, f('::'))
428 self.assertEquals('\x00' * 16, f('0::0'))
429 self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
430 self.assertEquals(
431 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
432 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
435 def testStringToIPv4(self):
436 if not hasattr(socket, 'inet_ntop'):
437 return # No inet_ntop() on this platform
438 from socket import inet_ntoa as f, inet_ntop, AF_INET
439 g = lambda a: inet_ntop(AF_INET, a)
441 self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
442 self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
443 self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
444 self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
446 self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
447 self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
448 self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
450 def testStringToIPv6(self):
451 if not hasattr(socket, 'inet_ntop'):
452 return # No inet_ntop() on this platform
453 try:
454 from socket import inet_ntop, AF_INET6, has_ipv6
455 if not has_ipv6:
456 return
457 except ImportError:
458 return
459 f = lambda a: inet_ntop(AF_INET6, a)
461 self.assertEquals('::', f('\x00' * 16))
462 self.assertEquals('::1', f('\x00' * 15 + '\x01'))
463 self.assertEquals(
464 'aef:b01:506:1001:ffff:9997:55:170',
465 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
468 # XXX The following don't test module-level functionality...
470 def _get_unused_port(self, bind_address='0.0.0.0'):
471 """Use a temporary socket to elicit an unused ephemeral port.
473 Args:
474 bind_address: Hostname or IP address to search for a port on.
476 Returns: A most likely to be unused port.
478 tempsock = socket.socket()
479 tempsock.bind((bind_address, 0))
480 host, port = tempsock.getsockname()
481 tempsock.close()
482 return port
484 def testSockName(self):
485 # Testing getsockname()
486 port = self._get_unused_port()
487 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
488 sock.bind(("0.0.0.0", port))
489 name = sock.getsockname()
490 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
491 # it reasonable to get the host's addr in addition to 0.0.0.0.
492 # At least for eCos. This is required for the S/390 to pass.
493 my_ip_addr = socket.gethostbyname(socket.gethostname())
494 self.assertTrue(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
495 self.assertEqual(name[1], port)
497 def testGetSockOpt(self):
498 # Testing getsockopt()
499 # We know a socket should start without reuse==0
500 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
501 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
502 self.assertFalse(reuse != 0, "initial mode is reuse")
504 def testSetSockOpt(self):
505 # Testing setsockopt()
506 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
507 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
508 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
509 self.assertFalse(reuse == 0, "failed to set reuse mode")
511 def testSendAfterClose(self):
512 # testing send() after close() with timeout
513 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
514 sock.settimeout(1)
515 sock.close()
516 self.assertRaises(socket.error, sock.send, "spam")
518 def testNewAttributes(self):
519 # testing .family, .type and .protocol
520 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
521 self.assertEqual(sock.family, socket.AF_INET)
522 self.assertEqual(sock.type, socket.SOCK_STREAM)
523 self.assertEqual(sock.proto, 0)
524 sock.close()
526 def test_getsockaddrarg(self):
527 host = '0.0.0.0'
528 port = self._get_unused_port(bind_address=host)
529 big_port = port + 65536
530 neg_port = port - 65536
531 sock = socket.socket()
532 try:
533 self.assertRaises(OverflowError, sock.bind, (host, big_port))
534 self.assertRaises(OverflowError, sock.bind, (host, neg_port))
535 sock.bind((host, port))
536 finally:
537 sock.close()
539 def test_sock_ioctl(self):
540 if os.name != "nt":
541 return
542 self.assertTrue(hasattr(socket.socket, 'ioctl'))
543 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
544 self.assertTrue(hasattr(socket, 'RCVALL_ON'))
545 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
546 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
547 s = socket.socket()
548 self.assertRaises(ValueError, s.ioctl, -1, None)
549 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
class BasicTCPTest(SocketConnectedTest):
    """Basic send/receive tests over an established TCP connection.

    Each ``testX`` method is the server half and runs in the main
    thread; the matching ``_testX`` is the client half.
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        msg = ''
        while True:
            read = self.cli_conn.recv(1024)
            if not read:
                # An empty result ends the loop.
                break
            msg += read
        self.assertEqual(msg, 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        # Testing fromfd()
        if not hasattr(socket, "fromfd"):
            return  # On Windows, this doesn't exist
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        try:
            msg = sock.recv(1024)
            self.assertEqual(msg, MSG)
        finally:
            # The object returned by fromfd() is ours to close; the
            # fixture only closes self.cli_conn.
            sock.close()

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
class BasicUDPTest(ThreadedUDPSocketTest):
    """Basic datagram tests: the client sends MSG, the server receives it."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        self.assertRaises(ValueError, self.serv.recvfrom, -1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
class TCPCloserTest(ThreadedTCPSocketTest):
    """Check what the client observes after the server closes the connection."""

    def testClose(self):
        accepted, peer_addr = self.serv.accept()
        accepted.close()

        # After the close the client socket must select as readable and
        # recv() must return an empty string.
        client = self.cli
        readable, writable, errored = select.select([client], [], [], 1.0)
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), '')

    def _testClose(self):
        self.cli.connect((HOST, self.port))
        time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
    """Send MSG in each direction over a socketpair()."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Tests of non-blocking accept/connect/recv on TCP sockets.

    Connections returned by accept() are closed by the test that
    accepted them.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            self.serv.accept()
        except socket.error:
            pass
        end = time.time()
        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            conn.close()
            self.fail("Error trying to do non-blocking accept.")
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            conn.close()
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        # Delay the connect so the server's first accept() finds nothing.
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        try:
            conn.setblocking(0)
            try:
                msg = conn.recv(len(MSG))
            except socket.error:
                pass
            else:
                self.fail("Error trying to do non-blocking recv.")
            read, write, err = select.select([conn], [], [])
            if conn in read:
                msg = conn.recv(len(MSG))
                self.assertEqual(msg, MSG)
            else:
                self.fail("Error during select call to non-blocking socket.")
        finally:
            conn.close()

    def _testRecv(self):
        self.cli.connect((HOST, self.port))
        # Delay the send so the server's first recv() finds nothing.
        time.sleep(0.1)
        self.cli.send(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests of the file objects returned by socket.makefile().

    The server reads through ``self.serv_file`` (opened 'rb' with the
    class-level ``bufsize``); the client writes through
    ``self.cli_file`` (opened 'wb').
    """

    bufsize = -1  # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)

    def tearDown(self):
        reader = self.serv_file
        reader.close()
        self.assertTrue(reader.closed)
        self.serv_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.cli_file = self.serv_conn.makefile('wb')

    def clientTearDown(self):
        writer = self.cli_file
        writer.close()
        self.assertTrue(writer.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test
        head = self.serv_file.read(len(MSG) - 3)
        tail = self.serv_file.read(3)
        self.assertEqual(head + tail, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        everything = self.serv_file.read()
        self.assertEqual(everything, MSG)

    def _testFullRead(self):
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        collected = ''
        while True:
            char = self.serv_file.read(1)
            if not char:
                break
            collected += char
        self.assertEqual(collected, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # Performing file readline test
        self.assertEqual(self.serv_file.readline(), MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterRead(self):
        reader = self.serv_file
        a_baloo_is = reader.read(len("A baloo is"))
        self.assertEqual("A baloo is", a_baloo_is)
        _a_bear = reader.read(len(" a bear"))
        self.assertEqual(" a bear", _a_bear)
        # readline() after read() must continue at the current position.
        self.assertEqual("\n", reader.readline())
        self.assertEqual("A BALOO IS A BEAR.\n", reader.readline())
        self.assertEqual(MSG, reader.readline())

    def _testReadlineAfterRead(self):
        writer = self.cli_file
        writer.write("A baloo is a bear\n")
        writer.write("A BALOO IS A BEAR.\n")
        writer.write(MSG)
        writer.flush()

    def testReadlineAfterReadNoNewline(self):
        end_of_ = self.serv_file.read(len("End Of "))
        self.assertEqual("End Of ", end_of_)
        self.assertEqual("Line", self.serv_file.readline())

    def _testReadlineAfterReadNoNewline(self):
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        self.assertTrue(not self.serv_file.closed)

    def _testClosedAttr(self):
        self.assertTrue(not self.cli_file.closed)
866 class FileObjectInterruptedTestCase(unittest.TestCase):
867 """Test that the file object correctly handles EINTR internally."""
869 class MockSocket(object):
870 def __init__(self, recv_funcs=()):
871 # A generator that returns callables that we'll call for each
872 # call to recv().
873 self._recv_step = iter(recv_funcs)
875 def recv(self, size):
876 return self._recv_step.next()()
878 @staticmethod
879 def _raise_eintr():
880 raise socket.error(errno.EINTR)
882 def _test_readline(self, size=-1, **kwargs):
883 mock_sock = self.MockSocket(recv_funcs=[
884 lambda : "This is the first line\nAnd the sec",
885 self._raise_eintr,
886 lambda : "ond line is here\n",
887 lambda : "",
889 fo = socket._fileobject(mock_sock, **kwargs)
890 self.assertEquals(fo.readline(size), "This is the first line\n")
891 self.assertEquals(fo.readline(size), "And the second line is here\n")
893 def _test_read(self, size=-1, **kwargs):
894 mock_sock = self.MockSocket(recv_funcs=[
895 lambda : "This is the first line\nAnd the sec",
896 self._raise_eintr,
897 lambda : "ond line is here\n",
898 lambda : "",
900 fo = socket._fileobject(mock_sock, **kwargs)
901 self.assertEquals(fo.read(size), "This is the first line\n"
902 "And the second line is here\n")
904 def test_default(self):
905 self._test_readline()
906 self._test_readline(size=100)
907 self._test_read()
908 self._test_read(size=100)
910 def test_with_1k_buffer(self):
911 self._test_readline(bufsize=1024)
912 self._test_readline(size=100, bufsize=1024)
913 self._test_read(bufsize=1024)
914 self._test_read(size=100, bufsize=1024)
916 def _test_readline_no_buffer(self, size=-1):
917 mock_sock = self.MockSocket(recv_funcs=[
918 lambda : "aa",
919 lambda : "\n",
920 lambda : "BB",
921 self._raise_eintr,
922 lambda : "bb",
923 lambda : "",
925 fo = socket._fileobject(mock_sock, bufsize=0)
926 self.assertEquals(fo.readline(size), "aa\n")
927 self.assertEquals(fo.readline(size), "BBbb")
929 def test_no_buffer(self):
930 self._test_readline_no_buffer()
931 self._test_readline_no_buffer(size=4)
932 self._test_read(bufsize=0)
933 self._test_read(size=100, bufsize=0)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0  # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, "A. " + MSG)
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second_line = self.serv_file.readline()
        self.assertEqual(second_line, "B. " + MSG)

    def _testUnbufferedReadline(self):
        writer = self.cli_file
        writer.write("A. " + MSG)
        writer.write("B. " + MSG)
        writer.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    bufsize = 1  # Default-buffered for reading; line-buffered for writing
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2  # Exercise the buffering code
class NetworkConnectionTest(object):
    """Prove network connection."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        connection = socket.create_connection((HOST, self.port))
        self.cli = connection
        self.serv_conn = connection
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.
    """
class NetworkConnectionNoServer(unittest.TestCase):
    """create_connection() to a port nobody listens on must fail."""

    def testWithoutServer(self):
        port = test_support.find_unused_port()
        target = (HOST, port)
        self.assertRaises(socket.error, socket.create_connection, target)
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    """Check attributes of sockets returned by socket.create_connection().

    The server half of every test just accepts (and closes) the
    connection; the assertions are made in the client halves.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.source_port = test_support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        conn, addr = self.serv.accept()
        # Close the accepted socket right away instead of leaking it;
        # the client halves only inspect their own end.
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), None)

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.assertEqual(self.cli.gettimeout(), 30)
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Check that the timeout given to create_connection() is honoured.

    The server portion waits 3 seconds before sending its reply.  A client
    without a timeout receives the reply; a client with a 1 second timeout
    must get socket.timeout from recv().
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        conn, addr = self.serv.accept()
        try:
            # Delay the reply so that a short client timeout expires first.
            time.sleep(3)
            conn.send("done!")
        finally:
            # Do not leak the accepted connection, whatever happened above.
            conn.close()
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        self.cli = sock = socket.create_connection((HOST, self.port))
        data = sock.recv(5)
        self.assertEqual(data, "done!")

    def _testOutsideTimeout(self):
        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
        self.assertRaises(socket.timeout, lambda: sock.recv(5))
class Urllib2FileobjectTest(unittest.TestCase):

    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
    # it close the socket if the close c'tor argument is true

    def testClose(self):
        class FakeSocket:
            # Stand-in socket that only records whether close() was called.
            closed = False

            def flush(self):
                pass

            def close(self):
                self.closed = True

        # must not close unless we request it: the original use of _fileobject
        # by module socket requires that the underlying socket not be closed until
        # the _socketobject that created the _fileobject is closed
        kept_open = FakeSocket()
        wrapper = socket._fileobject(kept_open)
        wrapper.close()
        self.assertFalse(kept_open.closed)

        # With close=True the file object owns the socket and must close it.
        owned = FakeSocket()
        wrapper = socket._fileobject(owned, close=True)
        wrapper.close()
        self.assertTrue(owned.closed)
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the listening TCP socket."""

    def testTCPTimeout(self):
        def accept_with_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        # The string is handed to the callable as a positional argument
        # (which ignores it); it is not used as a failure message.
        self.assertRaises(socket.timeout, accept_with_timeout,
                          "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # A zero timeout makes the socket non-blocking: accept() must fail
        # with a plain socket.error rather than socket.timeout.
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except socket.timeout:
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            pass
        except:
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            return                  # can only test on *nix

        class Alarm(Exception):
            pass

        def on_alarm(signum, frame):
            raise Alarm

        self.serv.settimeout(5.0)   # must be longer than alarm
        previous_handler = signal.signal(signal.SIGALRM, on_alarm)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except:
                exc_type, exc_value = sys.exc_info()[:2]
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (exc_type, exc_value, traceback.format_exc()))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, previous_handler)
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a bound UDP socket.

    The fixture is SocketUDPTest, so self.serv is a SOCK_DGRAM socket;
    deriving from the TCP fixture would run these checks against a
    listening TCP socket instead.
    """

    def testUDPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        # The string is handed to the callable as a positional argument
        # (which ignores it); it is not used as a failure message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # A zero timeout makes the socket non-blocking: with no datagram
        # pending, recv() must fail with a plain socket.error rather than
        # socket.timeout.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except:
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
    """Sanity check of the socket module's exception hierarchy."""

    def testExceptionTree(self):
        # socket.error is the root and must itself be an Exception ...
        root = socket.error
        self.assertTrue(issubclass(root, Exception))
        # ... and the more specific errors must all derive from it.
        for name in ("herror", "gaierror", "timeout"):
            self.assertTrue(issubclass(getattr(socket, name), root))
1195 class TestLinuxAbstractNamespace(unittest.TestCase):
1197 UNIX_PATH_MAX = 108
1199 def testLinuxAbstractNamespace(self):
1200 address = "\x00python-test-hello\x00\xff"
1201 s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1202 s1.bind(address)
1203 s1.listen(1)
1204 s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1205 s2.connect(s1.getsockname())
1206 s1.accept()
1207 self.assertEqual(s1.getsockname(), address)
1208 self.assertEqual(s2.getpeername(), address)
1210 def testMaxName(self):
1211 address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
1212 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1213 s.bind(address)
1214 self.assertEqual(s.getsockname(), address)
1216 def testNameOverflow(self):
1217 address = "\x00" + "h" * self.UNIX_PATH_MAX
1218 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1219 self.assertRaises(socket.error, s.bind, address)
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvInto(self):
        # Receive straight into a pre-sized character array.
        expected_len = len(MSG)
        scratch = array.array('c', ' ' * 1024)
        received = self.cli_conn.recv_into(scratch)
        self.assertEqual(received, expected_len)
        self.assertEqual(scratch.tostring()[:expected_len], MSG)

    def _testRecvInto(self):
        # Send from a buffer object rather than a plain string.
        self.serv_conn.send(buffer(MSG))

    def testRecvFromInto(self):
        # Same as testRecvInto, but through recvfrom_into().
        expected_len = len(MSG)
        scratch = array.array('c', ' ' * 1024)
        received, sender = self.cli_conn.recvfrom_into(scratch)
        self.assertEqual(received, expected_len)
        self.assertEqual(scratch.tostring()[:expected_len], MSG)

    def _testRecvFromInto(self):
        # Send from a buffer object rather than a plain string.
        self.serv_conn.send(buffer(MSG))
# Values used to build the TIPC addresses in the TIPC tests below: the
# server binds the name sequence (TIPC_STYPE, TIPC_LOWER, TIPC_UPPER) and
# the client addresses an instance inside the [TIPC_LOWER, TIPC_UPPER] range.
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
def isTipcAvailable():
    """Check if the TIPC module is loaded

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros.

    Returns True only when the socket module exposes AF_TIPC and
    /proc/modules contains a line starting with "tipc ".
    """
    modules_file = "/proc/modules"
    if not (hasattr(socket, "AF_TIPC") and os.path.isfile(modules_file)):
        return False
    with open(modules_file) as listing:
        loaded = any(entry.startswith("tipc ") for entry in listing)
    if loaded:
        return True
    if test_support.verbose:
        # Single parenthesised argument: prints the same text in Python 2.
        print("TIPC module is not loaded, please 'sudo modprobe tipc'")
    return False
class TIPCTest (unittest.TestCase):
    """Datagram-style (SOCK_RDM) round trip over TIPC."""

    def testRDM(self):
        # Track every socket so that all of them are closed, even when
        # a later step or an assertion fails.
        opened = []
        try:
            srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
            opened.append(srv)
            cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
            opened.append(cli)

            srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                    TIPC_LOWER, TIPC_UPPER)
            srv.bind(srvaddr)

            # Address an instance in the middle of the bound range; "//"
            # keeps the instance number an integer.
            sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                    TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
            cli.sendto(MSG, sendaddr)

            msg, recvaddr = srv.recvfrom(1024)

            self.assertEqual(cli.getsockname(), recvaddr)
            self.assertEqual(msg, MSG)
        finally:
            for sock in opened:
                sock.close()
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
    """Stream (SOCK_STREAM) exchange over TIPC with a client/server pair."""

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        listener = self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                       TIPC_LOWER, TIPC_UPPER))
        listener.listen(5)
        # Signal readiness before blocking in accept(), which needs the
        # client to connect.
        self.serverExplicitReady()
        self.conn, self.connaddr = listener.accept()

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        client = self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        # Instance in the middle of the range bound by the server.
        instance = TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2
        client.connect((socket.TIPC_ADDR_NAME, TIPC_STYPE, instance, 0))
        self.cliaddr = client.getsockname()

    def testStream(self):
        received = self.conn.recv(1024)
        self.assertEqual(received, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()
def test_main():
    """Collect the test classes that apply to this platform and run them."""
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2]
    # The UDP tests are skipped on the 'mac' platform.
    if sys.platform != 'mac':
        tests += [BasicUDPTest, UDPTimeoutTest]

    tests += [
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ]
    # Optional suites, depending on what the platform provides.
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    if sys.platform == 'linux2':
        tests.append(TestLinuxAbstractNamespace)
    if isTipcAvailable():
        tests += [TIPCTest, TIPCThreadableTest]

    thread_info = test_support.threading_setup()
    test_support.run_unittest(*tests)
    test_support.threading_cleanup(*thread_info)
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()