Issue #5330: C functions called with keyword arguments were not reported by
[python.git] / Lib / test / test_socket.py
blob5c906d776a70673cb6f7d79a6b87c90fa841d693
1 #!/usr/bin/env python
3 import unittest
4 from test import test_support
6 import errno
7 import socket
8 import select
9 import thread, threading
10 import time
11 import traceback
12 import Queue
13 import sys
14 import os
15 import array
16 from weakref import proxy
17 import signal
19 HOST = test_support.HOST
20 MSG = 'Michael Gilfix was here\n'
22 class SocketTCPTest(unittest.TestCase):
24 def setUp(self):
25 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
26 self.port = test_support.bind_port(self.serv)
27 self.serv.listen(1)
29 def tearDown(self):
30 self.serv.close()
31 self.serv = None
33 class SocketUDPTest(unittest.TestCase):
35 def setUp(self):
36 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
37 self.port = test_support.bind_port(self.serv)
39 def tearDown(self):
40 self.serv.close()
41 self.serv = None
43 class ThreadableTest:
44 """Threadable Test class
46 The ThreadableTest class makes it easy to create a threaded
47 client/server pair from an existing unit test. To create a
48 new threaded class from an existing unit test, use multiple
49 inheritance:
51 class NewClass (OldClass, ThreadableTest):
52 pass
54 This class defines two new fixture functions with obvious
55 purposes for overriding:
57 clientSetUp ()
58 clientTearDown ()
60 Any new test functions within the class must then define
61 tests in pairs, where the test name is preceeded with a
62 '_' to indicate the client portion of the test. Ex:
64 def testFoo(self):
65 # Server portion
67 def _testFoo(self):
68 # Client portion
70 Any exceptions raised by the clients during their tests
71 are caught and transferred to the main thread to alert
72 the testing framework.
74 Note, the server setup function cannot call any blocking
75 functions that rely on the client thread during setup,
76 unless serverExplicitReady() is called just before
77 the blocking call (such as in setting up a client/server
78 connection and performing the accept() in setUp().
79 """
81 def __init__(self):
82 # Swap the true setup function
83 self.__setUp = self.setUp
84 self.__tearDown = self.tearDown
85 self.setUp = self._setUp
86 self.tearDown = self._tearDown
88 def serverExplicitReady(self):
89 """This method allows the server to explicitly indicate that
90 it wants the client thread to proceed. This is useful if the
91 server is about to execute a blocking routine that is
92 dependent upon the client thread during its setup routine."""
93 self.server_ready.set()
95 def _setUp(self):
96 self.server_ready = threading.Event()
97 self.client_ready = threading.Event()
98 self.done = threading.Event()
99 self.queue = Queue.Queue(1)
101 # Do some munging to start the client test.
102 methodname = self.id()
103 i = methodname.rfind('.')
104 methodname = methodname[i+1:]
105 test_method = getattr(self, '_' + methodname)
106 self.client_thread = thread.start_new_thread(
107 self.clientRun, (test_method,))
109 self.__setUp()
110 if not self.server_ready.is_set():
111 self.server_ready.set()
112 self.client_ready.wait()
114 def _tearDown(self):
115 self.__tearDown()
116 self.done.wait()
118 if not self.queue.empty():
119 msg = self.queue.get()
120 self.fail(msg)
122 def clientRun(self, test_func):
123 self.server_ready.wait()
124 self.client_ready.set()
125 self.clientSetUp()
126 if not callable(test_func):
127 raise TypeError, "test_func must be a callable function"
128 try:
129 test_func()
130 except Exception, strerror:
131 self.queue.put(strerror)
132 self.clientTearDown()
134 def clientSetUp(self):
135 raise NotImplementedError, "clientSetUp must be implemented."
137 def clientTearDown(self):
138 self.done.set()
139 thread.exit()
141 class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
143 def __init__(self, methodName='runTest'):
144 SocketTCPTest.__init__(self, methodName=methodName)
145 ThreadableTest.__init__(self)
147 def clientSetUp(self):
148 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
150 def clientTearDown(self):
151 self.cli.close()
152 self.cli = None
153 ThreadableTest.clientTearDown(self)
155 class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
157 def __init__(self, methodName='runTest'):
158 SocketUDPTest.__init__(self, methodName=methodName)
159 ThreadableTest.__init__(self)
161 def clientSetUp(self):
162 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
164 class SocketConnectedTest(ThreadedTCPSocketTest):
166 def __init__(self, methodName='runTest'):
167 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
169 def setUp(self):
170 ThreadedTCPSocketTest.setUp(self)
171 # Indicate explicitly we're ready for the client thread to
172 # proceed and then perform the blocking call to accept
173 self.serverExplicitReady()
174 conn, addr = self.serv.accept()
175 self.cli_conn = conn
177 def tearDown(self):
178 self.cli_conn.close()
179 self.cli_conn = None
180 ThreadedTCPSocketTest.tearDown(self)
182 def clientSetUp(self):
183 ThreadedTCPSocketTest.clientSetUp(self)
184 self.cli.connect((HOST, self.port))
185 self.serv_conn = self.cli
187 def clientTearDown(self):
188 self.serv_conn.close()
189 self.serv_conn = None
190 ThreadedTCPSocketTest.clientTearDown(self)
192 class SocketPairTest(unittest.TestCase, ThreadableTest):
194 def __init__(self, methodName='runTest'):
195 unittest.TestCase.__init__(self, methodName=methodName)
196 ThreadableTest.__init__(self)
198 def setUp(self):
199 self.serv, self.cli = socket.socketpair()
201 def tearDown(self):
202 self.serv.close()
203 self.serv = None
205 def clientSetUp(self):
206 pass
208 def clientTearDown(self):
209 self.cli.close()
210 self.cli = None
211 ThreadableTest.clientTearDown(self)
214 #######################################################################
215 ## Begin Tests
217 class GeneralModuleTests(unittest.TestCase):
219 def test_weakref(self):
220 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
221 p = proxy(s)
222 self.assertEqual(p.fileno(), s.fileno())
223 s.close()
224 s = None
225 try:
226 p.fileno()
227 except ReferenceError:
228 pass
229 else:
230 self.fail('Socket proxy still exists')
232 def testSocketError(self):
233 # Testing socket module exceptions
234 def raise_error(*args, **kwargs):
235 raise socket.error
236 def raise_herror(*args, **kwargs):
237 raise socket.herror
238 def raise_gaierror(*args, **kwargs):
239 raise socket.gaierror
240 self.failUnlessRaises(socket.error, raise_error,
241 "Error raising socket exception.")
242 self.failUnlessRaises(socket.error, raise_herror,
243 "Error raising socket exception.")
244 self.failUnlessRaises(socket.error, raise_gaierror,
245 "Error raising socket exception.")
247 def testCrucialConstants(self):
248 # Testing for mission critical constants
249 socket.AF_INET
250 socket.SOCK_STREAM
251 socket.SOCK_DGRAM
252 socket.SOCK_RAW
253 socket.SOCK_RDM
254 socket.SOCK_SEQPACKET
255 socket.SOL_SOCKET
256 socket.SO_REUSEADDR
258 def testHostnameRes(self):
259 # Testing hostname resolution mechanisms
260 hostname = socket.gethostname()
261 try:
262 ip = socket.gethostbyname(hostname)
263 except socket.error:
264 # Probably name lookup wasn't set up right; skip this test
265 return
266 self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
267 try:
268 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
269 except socket.error:
270 # Probably a similar problem as above; skip this test
271 return
272 all_host_names = [hostname, hname] + aliases
273 fqhn = socket.getfqdn(ip)
274 if not fqhn in all_host_names:
275 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
277 def testRefCountGetNameInfo(self):
278 # Testing reference count for getnameinfo
279 if hasattr(sys, "getrefcount"):
280 try:
281 # On some versions, this loses a reference
282 orig = sys.getrefcount(__name__)
283 socket.getnameinfo(__name__,0)
284 except SystemError:
285 if sys.getrefcount(__name__) <> orig:
286 self.fail("socket.getnameinfo loses a reference")
288 def testInterpreterCrash(self):
289 # Making sure getnameinfo doesn't crash the interpreter
290 try:
291 # On some versions, this crashes the interpreter.
292 socket.getnameinfo(('x', 0, 0, 0), 0)
293 except socket.error:
294 pass
296 def testNtoH(self):
297 # This just checks that htons etc. are their own inverse,
298 # when looking at the lower 16 or 32 bits.
299 sizes = {socket.htonl: 32, socket.ntohl: 32,
300 socket.htons: 16, socket.ntohs: 16}
301 for func, size in sizes.items():
302 mask = (1L<<size) - 1
303 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
304 self.assertEqual(i & mask, func(func(i&mask)) & mask)
306 swapped = func(mask)
307 self.assertEqual(swapped & mask, mask)
308 self.assertRaises(OverflowError, func, 1L<<34)
310 def testNtoHErrors(self):
311 good_values = [ 1, 2, 3, 1L, 2L, 3L ]
312 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
313 for k in good_values:
314 socket.ntohl(k)
315 socket.ntohs(k)
316 socket.htonl(k)
317 socket.htons(k)
318 for k in bad_values:
319 self.assertRaises(OverflowError, socket.ntohl, k)
320 self.assertRaises(OverflowError, socket.ntohs, k)
321 self.assertRaises(OverflowError, socket.htonl, k)
322 self.assertRaises(OverflowError, socket.htons, k)
324 def testGetServBy(self):
325 eq = self.assertEqual
326 # Find one service that exists, then check all the related interfaces.
327 # I've ordered this by protocols that have both a tcp and udp
328 # protocol, at least for modern Linuxes.
329 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
330 'freebsd7', 'freebsd8', 'darwin'):
331 # avoid the 'echo' service on this platform, as there is an
332 # assumption breaking non-standard port/protocol entry
333 services = ('daytime', 'qotd', 'domain')
334 else:
335 services = ('echo', 'daytime', 'domain')
336 for service in services:
337 try:
338 port = socket.getservbyname(service, 'tcp')
339 break
340 except socket.error:
341 pass
342 else:
343 raise socket.error
344 # Try same call with optional protocol omitted
345 port2 = socket.getservbyname(service)
346 eq(port, port2)
347 # Try udp, but don't barf it it doesn't exist
348 try:
349 udpport = socket.getservbyname(service, 'udp')
350 except socket.error:
351 udpport = None
352 else:
353 eq(udpport, port)
354 # Now make sure the lookup by port returns the same service name
355 eq(socket.getservbyport(port2), service)
356 eq(socket.getservbyport(port, 'tcp'), service)
357 if udpport is not None:
358 eq(socket.getservbyport(udpport, 'udp'), service)
359 # Make sure getservbyport does not accept out of range ports.
360 self.assertRaises(OverflowError, socket.getservbyport, -1)
361 self.assertRaises(OverflowError, socket.getservbyport, 65536)
363 def testDefaultTimeout(self):
364 # Testing default timeout
365 # The default timeout should initially be None
366 self.assertEqual(socket.getdefaulttimeout(), None)
367 s = socket.socket()
368 self.assertEqual(s.gettimeout(), None)
369 s.close()
371 # Set the default timeout to 10, and see if it propagates
372 socket.setdefaulttimeout(10)
373 self.assertEqual(socket.getdefaulttimeout(), 10)
374 s = socket.socket()
375 self.assertEqual(s.gettimeout(), 10)
376 s.close()
378 # Reset the default timeout to None, and see if it propagates
379 socket.setdefaulttimeout(None)
380 self.assertEqual(socket.getdefaulttimeout(), None)
381 s = socket.socket()
382 self.assertEqual(s.gettimeout(), None)
383 s.close()
385 # Check that setting it to an invalid value raises ValueError
386 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
388 # Check that setting it to an invalid type raises TypeError
389 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
391 def testIPv4_inet_aton_fourbytes(self):
392 if not hasattr(socket, 'inet_aton'):
393 return # No inet_aton, nothing to check
394 # Test that issue1008086 and issue767150 are fixed.
395 # It must return 4 bytes.
396 self.assertEquals('\x00'*4, socket.inet_aton('0.0.0.0'))
397 self.assertEquals('\xff'*4, socket.inet_aton('255.255.255.255'))
399 def testIPv4toString(self):
400 if not hasattr(socket, 'inet_pton'):
401 return # No inet_pton() on this platform
402 from socket import inet_aton as f, inet_pton, AF_INET
403 g = lambda a: inet_pton(AF_INET, a)
405 self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
406 self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
407 self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
408 self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
409 self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
411 self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
412 self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
413 self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
414 self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
416 def testIPv6toString(self):
417 if not hasattr(socket, 'inet_pton'):
418 return # No inet_pton() on this platform
419 try:
420 from socket import inet_pton, AF_INET6, has_ipv6
421 if not has_ipv6:
422 return
423 except ImportError:
424 return
425 f = lambda a: inet_pton(AF_INET6, a)
427 self.assertEquals('\x00' * 16, f('::'))
428 self.assertEquals('\x00' * 16, f('0::0'))
429 self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
430 self.assertEquals(
431 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
432 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
435 def testStringToIPv4(self):
436 if not hasattr(socket, 'inet_ntop'):
437 return # No inet_ntop() on this platform
438 from socket import inet_ntoa as f, inet_ntop, AF_INET
439 g = lambda a: inet_ntop(AF_INET, a)
441 self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
442 self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
443 self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
444 self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
446 self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
447 self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
448 self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
450 def testStringToIPv6(self):
451 if not hasattr(socket, 'inet_ntop'):
452 return # No inet_ntop() on this platform
453 try:
454 from socket import inet_ntop, AF_INET6, has_ipv6
455 if not has_ipv6:
456 return
457 except ImportError:
458 return
459 f = lambda a: inet_ntop(AF_INET6, a)
461 self.assertEquals('::', f('\x00' * 16))
462 self.assertEquals('::1', f('\x00' * 15 + '\x01'))
463 self.assertEquals(
464 'aef:b01:506:1001:ffff:9997:55:170',
465 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
468 # XXX The following don't test module-level functionality...
470 def _get_unused_port(self, bind_address='0.0.0.0'):
471 """Use a temporary socket to elicit an unused ephemeral port.
473 Args:
474 bind_address: Hostname or IP address to search for a port on.
476 Returns: A most likely to be unused port.
478 tempsock = socket.socket()
479 tempsock.bind((bind_address, 0))
480 host, port = tempsock.getsockname()
481 tempsock.close()
482 return port
484 def testSockName(self):
485 # Testing getsockname()
486 port = self._get_unused_port()
487 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
488 sock.bind(("0.0.0.0", port))
489 name = sock.getsockname()
490 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
491 # it reasonable to get the host's addr in addition to 0.0.0.0.
492 # At least for eCos. This is required for the S/390 to pass.
493 my_ip_addr = socket.gethostbyname(socket.gethostname())
494 self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
495 self.assertEqual(name[1], port)
497 def testGetSockOpt(self):
498 # Testing getsockopt()
499 # We know a socket should start without reuse==0
500 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
501 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
502 self.failIf(reuse != 0, "initial mode is reuse")
504 def testSetSockOpt(self):
505 # Testing setsockopt()
506 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
507 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
508 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
509 self.failIf(reuse == 0, "failed to set reuse mode")
511 def testSendAfterClose(self):
512 # testing send() after close() with timeout
513 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
514 sock.settimeout(1)
515 sock.close()
516 self.assertRaises(socket.error, sock.send, "spam")
518 def testNewAttributes(self):
519 # testing .family, .type and .protocol
520 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
521 self.assertEqual(sock.family, socket.AF_INET)
522 self.assertEqual(sock.type, socket.SOCK_STREAM)
523 self.assertEqual(sock.proto, 0)
524 sock.close()
526 def test_getsockaddrarg(self):
527 host = '0.0.0.0'
528 port = self._get_unused_port(bind_address=host)
529 big_port = port + 65536
530 neg_port = port - 65536
531 sock = socket.socket()
532 try:
533 self.assertRaises(OverflowError, sock.bind, (host, big_port))
534 self.assertRaises(OverflowError, sock.bind, (host, neg_port))
535 sock.bind((host, port))
536 finally:
537 sock.close()
539 def test_sock_ioctl(self):
540 if os.name != "nt":
541 return
542 self.assert_(hasattr(socket.socket, 'ioctl'))
543 self.assert_(hasattr(socket, 'SIO_RCVALL'))
544 self.assert_(hasattr(socket, 'RCVALL_ON'))
545 self.assert_(hasattr(socket, 'RCVALL_OFF'))
548 class BasicTCPTest(SocketConnectedTest):
550 def __init__(self, methodName='runTest'):
551 SocketConnectedTest.__init__(self, methodName=methodName)
553 def testRecv(self):
554 # Testing large receive over TCP
555 msg = self.cli_conn.recv(1024)
556 self.assertEqual(msg, MSG)
558 def _testRecv(self):
559 self.serv_conn.send(MSG)
561 def testOverFlowRecv(self):
562 # Testing receive in chunks over TCP
563 seg1 = self.cli_conn.recv(len(MSG) - 3)
564 seg2 = self.cli_conn.recv(1024)
565 msg = seg1 + seg2
566 self.assertEqual(msg, MSG)
568 def _testOverFlowRecv(self):
569 self.serv_conn.send(MSG)
571 def testRecvFrom(self):
572 # Testing large recvfrom() over TCP
573 msg, addr = self.cli_conn.recvfrom(1024)
574 self.assertEqual(msg, MSG)
576 def _testRecvFrom(self):
577 self.serv_conn.send(MSG)
579 def testOverFlowRecvFrom(self):
580 # Testing recvfrom() in chunks over TCP
581 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
582 seg2, addr = self.cli_conn.recvfrom(1024)
583 msg = seg1 + seg2
584 self.assertEqual(msg, MSG)
586 def _testOverFlowRecvFrom(self):
587 self.serv_conn.send(MSG)
589 def testSendAll(self):
590 # Testing sendall() with a 2048 byte string over TCP
591 msg = ''
592 while 1:
593 read = self.cli_conn.recv(1024)
594 if not read:
595 break
596 msg += read
597 self.assertEqual(msg, 'f' * 2048)
599 def _testSendAll(self):
600 big_chunk = 'f' * 2048
601 self.serv_conn.sendall(big_chunk)
603 def testFromFd(self):
604 # Testing fromfd()
605 if not hasattr(socket, "fromfd"):
606 return # On Windows, this doesn't exist
607 fd = self.cli_conn.fileno()
608 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
609 msg = sock.recv(1024)
610 self.assertEqual(msg, MSG)
612 def _testFromFd(self):
613 self.serv_conn.send(MSG)
615 def testShutdown(self):
616 # Testing shutdown()
617 msg = self.cli_conn.recv(1024)
618 self.assertEqual(msg, MSG)
619 # wait for _testShutdown to finish: on OS X, when the server
620 # closes the connection the client also becomes disconnected,
621 # and the client's shutdown call will fail. (Issue #4397.)
622 self.done.wait()
624 def _testShutdown(self):
625 self.serv_conn.send(MSG)
626 self.serv_conn.shutdown(2)
628 class BasicUDPTest(ThreadedUDPSocketTest):
630 def __init__(self, methodName='runTest'):
631 ThreadedUDPSocketTest.__init__(self, methodName=methodName)
633 def testSendtoAndRecv(self):
634 # Testing sendto() and Recv() over UDP
635 msg = self.serv.recv(len(MSG))
636 self.assertEqual(msg, MSG)
638 def _testSendtoAndRecv(self):
639 self.cli.sendto(MSG, 0, (HOST, self.port))
641 def testRecvFrom(self):
642 # Testing recvfrom() over UDP
643 msg, addr = self.serv.recvfrom(len(MSG))
644 self.assertEqual(msg, MSG)
646 def _testRecvFrom(self):
647 self.cli.sendto(MSG, 0, (HOST, self.port))
649 def testRecvFromNegative(self):
650 # Negative lengths passed to recvfrom should give ValueError.
651 self.assertRaises(ValueError, self.serv.recvfrom, -1)
653 def _testRecvFromNegative(self):
654 self.cli.sendto(MSG, 0, (HOST, self.port))
656 class TCPCloserTest(ThreadedTCPSocketTest):
658 def testClose(self):
659 conn, addr = self.serv.accept()
660 conn.close()
662 sd = self.cli
663 read, write, err = select.select([sd], [], [], 1.0)
664 self.assertEqual(read, [sd])
665 self.assertEqual(sd.recv(1), '')
667 def _testClose(self):
668 self.cli.connect((HOST, self.port))
669 time.sleep(1.0)
671 class BasicSocketPairTest(SocketPairTest):
673 def __init__(self, methodName='runTest'):
674 SocketPairTest.__init__(self, methodName=methodName)
676 def testRecv(self):
677 msg = self.serv.recv(1024)
678 self.assertEqual(msg, MSG)
680 def _testRecv(self):
681 self.cli.send(MSG)
683 def testSend(self):
684 self.serv.send(MSG)
686 def _testSend(self):
687 msg = self.cli.recv(1024)
688 self.assertEqual(msg, MSG)
690 class NonBlockingTCPTests(ThreadedTCPSocketTest):
692 def __init__(self, methodName='runTest'):
693 ThreadedTCPSocketTest.__init__(self, methodName=methodName)
695 def testSetBlocking(self):
696 # Testing whether set blocking works
697 self.serv.setblocking(0)
698 start = time.time()
699 try:
700 self.serv.accept()
701 except socket.error:
702 pass
703 end = time.time()
704 self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
706 def _testSetBlocking(self):
707 pass
709 def testAccept(self):
710 # Testing non-blocking accept
711 self.serv.setblocking(0)
712 try:
713 conn, addr = self.serv.accept()
714 except socket.error:
715 pass
716 else:
717 self.fail("Error trying to do non-blocking accept.")
718 read, write, err = select.select([self.serv], [], [])
719 if self.serv in read:
720 conn, addr = self.serv.accept()
721 else:
722 self.fail("Error trying to do accept after select.")
724 def _testAccept(self):
725 time.sleep(0.1)
726 self.cli.connect((HOST, self.port))
728 def testConnect(self):
729 # Testing non-blocking connect
730 conn, addr = self.serv.accept()
732 def _testConnect(self):
733 self.cli.settimeout(10)
734 self.cli.connect((HOST, self.port))
736 def testRecv(self):
737 # Testing non-blocking recv
738 conn, addr = self.serv.accept()
739 conn.setblocking(0)
740 try:
741 msg = conn.recv(len(MSG))
742 except socket.error:
743 pass
744 else:
745 self.fail("Error trying to do non-blocking recv.")
746 read, write, err = select.select([conn], [], [])
747 if conn in read:
748 msg = conn.recv(len(MSG))
749 self.assertEqual(msg, MSG)
750 else:
751 self.fail("Error during select call to non-blocking socket.")
753 def _testRecv(self):
754 self.cli.connect((HOST, self.port))
755 time.sleep(0.1)
756 self.cli.send(MSG)
758 class FileObjectClassTestCase(SocketConnectedTest):
760 bufsize = -1 # Use default buffer size
762 def __init__(self, methodName='runTest'):
763 SocketConnectedTest.__init__(self, methodName=methodName)
765 def setUp(self):
766 SocketConnectedTest.setUp(self)
767 self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
769 def tearDown(self):
770 self.serv_file.close()
771 self.assert_(self.serv_file.closed)
772 self.serv_file = None
773 SocketConnectedTest.tearDown(self)
775 def clientSetUp(self):
776 SocketConnectedTest.clientSetUp(self)
777 self.cli_file = self.serv_conn.makefile('wb')
779 def clientTearDown(self):
780 self.cli_file.close()
781 self.assert_(self.cli_file.closed)
782 self.cli_file = None
783 SocketConnectedTest.clientTearDown(self)
785 def testSmallRead(self):
786 # Performing small file read test
787 first_seg = self.serv_file.read(len(MSG)-3)
788 second_seg = self.serv_file.read(3)
789 msg = first_seg + second_seg
790 self.assertEqual(msg, MSG)
792 def _testSmallRead(self):
793 self.cli_file.write(MSG)
794 self.cli_file.flush()
796 def testFullRead(self):
797 # read until EOF
798 msg = self.serv_file.read()
799 self.assertEqual(msg, MSG)
801 def _testFullRead(self):
802 self.cli_file.write(MSG)
803 self.cli_file.close()
805 def testUnbufferedRead(self):
806 # Performing unbuffered file read test
807 buf = ''
808 while 1:
809 char = self.serv_file.read(1)
810 if not char:
811 break
812 buf += char
813 self.assertEqual(buf, MSG)
815 def _testUnbufferedRead(self):
816 self.cli_file.write(MSG)
817 self.cli_file.flush()
819 def testReadline(self):
820 # Performing file readline test
821 line = self.serv_file.readline()
822 self.assertEqual(line, MSG)
824 def _testReadline(self):
825 self.cli_file.write(MSG)
826 self.cli_file.flush()
828 def testReadlineAfterRead(self):
829 a_baloo_is = self.serv_file.read(len("A baloo is"))
830 self.assertEqual("A baloo is", a_baloo_is)
831 _a_bear = self.serv_file.read(len(" a bear"))
832 self.assertEqual(" a bear", _a_bear)
833 line = self.serv_file.readline()
834 self.assertEqual("\n", line)
835 line = self.serv_file.readline()
836 self.assertEqual("A BALOO IS A BEAR.\n", line)
837 line = self.serv_file.readline()
838 self.assertEqual(MSG, line)
840 def _testReadlineAfterRead(self):
841 self.cli_file.write("A baloo is a bear\n")
842 self.cli_file.write("A BALOO IS A BEAR.\n")
843 self.cli_file.write(MSG)
844 self.cli_file.flush()
846 def testReadlineAfterReadNoNewline(self):
847 end_of_ = self.serv_file.read(len("End Of "))
848 self.assertEqual("End Of ", end_of_)
849 line = self.serv_file.readline()
850 self.assertEqual("Line", line)
852 def _testReadlineAfterReadNoNewline(self):
853 self.cli_file.write("End Of Line")
855 def testClosedAttr(self):
856 self.assert_(not self.serv_file.closed)
858 def _testClosedAttr(self):
859 self.assert_(not self.cli_file.closed)
861 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
863 """Repeat the tests from FileObjectClassTestCase with bufsize==0.
865 In this case (and in this case only), it should be possible to
866 create a file object, read a line from it, create another file
867 object, read another line from it, without loss of data in the
868 first file object's buffer. Note that httplib relies on this
869 when reading multiple requests from the same socket."""
871 bufsize = 0 # Use unbuffered mode
873 def testUnbufferedReadline(self):
874 # Read a line, create a new file object, read another line with it
875 line = self.serv_file.readline() # first line
876 self.assertEqual(line, "A. " + MSG) # first line
877 self.serv_file = self.cli_conn.makefile('rb', 0)
878 line = self.serv_file.readline() # second line
879 self.assertEqual(line, "B. " + MSG) # second line
881 def _testUnbufferedReadline(self):
882 self.cli_file.write("A. " + MSG)
883 self.cli_file.write("B. " + MSG)
884 self.cli_file.flush()
886 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
888 bufsize = 1 # Default-buffered for reading; line-buffered for writing
891 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
893 bufsize = 2 # Exercise the buffering code
896 class NetworkConnectionTest(object):
897 """Prove network connection."""
898 def clientSetUp(self):
899 # We're inherited below by BasicTCPTest2, which also inherits
900 # BasicTCPTest, which defines self.port referenced below.
901 self.cli = socket.create_connection((HOST, self.port))
902 self.serv_conn = self.cli
904 class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
905 """Tests that NetworkConnection does not break existing TCP functionality.
908 class NetworkConnectionNoServer(unittest.TestCase):
909 def testWithoutServer(self):
910 port = test_support.find_unused_port()
911 self.failUnlessRaises(
912 socket.error,
913 lambda: socket.create_connection((HOST, port))
916 class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
918 def __init__(self, methodName='runTest'):
919 SocketTCPTest.__init__(self, methodName=methodName)
920 ThreadableTest.__init__(self)
922 def clientSetUp(self):
923 pass
925 def clientTearDown(self):
926 self.cli.close()
927 self.cli = None
928 ThreadableTest.clientTearDown(self)
930 def _justAccept(self):
931 conn, addr = self.serv.accept()
933 testFamily = _justAccept
934 def _testFamily(self):
935 self.cli = socket.create_connection((HOST, self.port), timeout=30)
936 self.assertEqual(self.cli.family, 2)
938 testTimeoutDefault = _justAccept
939 def _testTimeoutDefault(self):
940 # passing no explicit timeout uses socket's global default
941 self.assert_(socket.getdefaulttimeout() is None)
942 socket.setdefaulttimeout(42)
943 try:
944 self.cli = socket.create_connection((HOST, self.port))
945 finally:
946 socket.setdefaulttimeout(None)
947 self.assertEquals(self.cli.gettimeout(), 42)
949 testTimeoutNone = _justAccept
950 def _testTimeoutNone(self):
951 # None timeout means the same as sock.settimeout(None)
952 self.assert_(socket.getdefaulttimeout() is None)
953 socket.setdefaulttimeout(30)
954 try:
955 self.cli = socket.create_connection((HOST, self.port), timeout=None)
956 finally:
957 socket.setdefaulttimeout(None)
958 self.assertEqual(self.cli.gettimeout(), None)
960 testTimeoutValueNamed = _justAccept
961 def _testTimeoutValueNamed(self):
962 self.cli = socket.create_connection((HOST, self.port), timeout=30)
963 self.assertEqual(self.cli.gettimeout(), 30)
965 testTimeoutValueNonamed = _justAccept
966 def _testTimeoutValueNonamed(self):
967 self.cli = socket.create_connection((HOST, self.port), 30)
968 self.assertEqual(self.cli.gettimeout(), 30)
970 class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
972 def __init__(self, methodName='runTest'):
973 SocketTCPTest.__init__(self, methodName=methodName)
974 ThreadableTest.__init__(self)
976 def clientSetUp(self):
977 pass
979 def clientTearDown(self):
980 self.cli.close()
981 self.cli = None
982 ThreadableTest.clientTearDown(self)
984 def testInsideTimeout(self):
985 conn, addr = self.serv.accept()
986 time.sleep(3)
987 conn.send("done!")
988 testOutsideTimeout = testInsideTimeout
990 def _testInsideTimeout(self):
991 self.cli = sock = socket.create_connection((HOST, self.port))
992 data = sock.recv(5)
993 self.assertEqual(data, "done!")
995 def _testOutsideTimeout(self):
996 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
997 self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
1000 class Urllib2FileobjectTest(unittest.TestCase):
1002 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1003 # it close the socket if the close c'tor argument is true
1005 def testClose(self):
1006 class MockSocket:
1007 closed = False
1008 def flush(self): pass
1009 def close(self): self.closed = True
1011 # must not close unless we request it: the original use of _fileobject
1012 # by module socket requires that the underlying socket not be closed until
1013 # the _socketobject that created the _fileobject is closed
1014 s = MockSocket()
1015 f = socket._fileobject(s)
1016 f.close()
1017 self.assert_(not s.closed)
1019 s = MockSocket()
1020 f = socket._fileobject(s, close=True)
1021 f.close()
1022 self.assert_(s.closed)
1024 class TCPTimeoutTest(SocketTCPTest):
1026 def testTCPTimeout(self):
1027 def raise_timeout(*args, **kwargs):
1028 self.serv.settimeout(1.0)
1029 self.serv.accept()
1030 self.failUnlessRaises(socket.timeout, raise_timeout,
1031 "Error generating a timeout exception (TCP)")
1033 def testTimeoutZero(self):
1034 ok = False
1035 try:
1036 self.serv.settimeout(0.0)
1037 foo = self.serv.accept()
1038 except socket.timeout:
1039 self.fail("caught timeout instead of error (TCP)")
1040 except socket.error:
1041 ok = True
1042 except:
1043 self.fail("caught unexpected exception (TCP)")
1044 if not ok:
1045 self.fail("accept() returned success when we did not expect it")
1047 def testInterruptedTimeout(self):
1048 # XXX I don't know how to do this test on MSWindows or any other
1049 # plaform that doesn't support signal.alarm() or os.kill(), though
1050 # the bug should have existed on all platforms.
1051 if not hasattr(signal, "alarm"):
1052 return # can only test on *nix
1053 self.serv.settimeout(5.0) # must be longer than alarm
1054 class Alarm(Exception):
1055 pass
1056 def alarm_handler(signal, frame):
1057 raise Alarm
1058 old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
1059 try:
1060 signal.alarm(2) # POSIX allows alarm to be up to 1 second early
1061 try:
1062 foo = self.serv.accept()
1063 except socket.timeout:
1064 self.fail("caught timeout instead of Alarm")
1065 except Alarm:
1066 pass
1067 except:
1068 self.fail("caught other exception instead of Alarm:"
1069 " %s(%s):\n%s" %
1070 (sys.exc_info()[:2] + (traceback.format_exc(),)))
1071 else:
1072 self.fail("nothing caught")
1073 finally:
1074 signal.alarm(0) # shut off alarm
1075 except Alarm:
1076 self.fail("got Alarm in wrong place")
1077 finally:
1078 # no alarm can be pending. Safe to restore old handler.
1079 signal.signal(signal.SIGALRM, old_alarm)
1081 class UDPTimeoutTest(SocketTCPTest):
1083 def testUDPTimeout(self):
1084 def raise_timeout(*args, **kwargs):
1085 self.serv.settimeout(1.0)
1086 self.serv.recv(1024)
1087 self.failUnlessRaises(socket.timeout, raise_timeout,
1088 "Error generating a timeout exception (UDP)")
1090 def testTimeoutZero(self):
1091 ok = False
1092 try:
1093 self.serv.settimeout(0.0)
1094 foo = self.serv.recv(1024)
1095 except socket.timeout:
1096 self.fail("caught timeout instead of error (UDP)")
1097 except socket.error:
1098 ok = True
1099 except:
1100 self.fail("caught unexpected exception (UDP)")
1101 if not ok:
1102 self.fail("recv() returned success when we did not expect it")
1104 class TestExceptions(unittest.TestCase):
1106 def testExceptionTree(self):
1107 self.assert_(issubclass(socket.error, Exception))
1108 self.assert_(issubclass(socket.herror, socket.error))
1109 self.assert_(issubclass(socket.gaierror, socket.error))
1110 self.assert_(issubclass(socket.timeout, socket.error))
1112 class TestLinuxAbstractNamespace(unittest.TestCase):
1114 UNIX_PATH_MAX = 108
1116 def testLinuxAbstractNamespace(self):
1117 address = "\x00python-test-hello\x00\xff"
1118 s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1119 s1.bind(address)
1120 s1.listen(1)
1121 s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1122 s2.connect(s1.getsockname())
1123 s1.accept()
1124 self.assertEqual(s1.getsockname(), address)
1125 self.assertEqual(s2.getpeername(), address)
1127 def testMaxName(self):
1128 address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
1129 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1130 s.bind(address)
1131 self.assertEqual(s.getsockname(), address)
1133 def testNameOverflow(self):
1134 address = "\x00" + "h" * self.UNIX_PATH_MAX
1135 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1136 self.assertRaises(socket.error, s.bind, address)
1139 class BufferIOTest(SocketConnectedTest):
1141 Test the buffer versions of socket.recv() and socket.send().
1143 def __init__(self, methodName='runTest'):
1144 SocketConnectedTest.__init__(self, methodName=methodName)
1146 def testRecvInto(self):
1147 buf = array.array('c', ' '*1024)
1148 nbytes = self.cli_conn.recv_into(buf)
1149 self.assertEqual(nbytes, len(MSG))
1150 msg = buf.tostring()[:len(MSG)]
1151 self.assertEqual(msg, MSG)
1153 def _testRecvInto(self):
1154 buf = buffer(MSG)
1155 self.serv_conn.send(buf)
1157 def testRecvFromInto(self):
1158 buf = array.array('c', ' '*1024)
1159 nbytes, addr = self.cli_conn.recvfrom_into(buf)
1160 self.assertEqual(nbytes, len(MSG))
1161 msg = buf.tostring()[:len(MSG)]
1162 self.assertEqual(msg, MSG)
1164 def _testRecvFromInto(self):
1165 buf = buffer(MSG)
1166 self.serv_conn.send(buf)
1169 TIPC_STYPE = 2000
1170 TIPC_LOWER = 200
1171 TIPC_UPPER = 210
1173 def isTipcAvailable():
1174 """Check if the TIPC module is loaded
1176 The TIPC module is not loaded automatically on Ubuntu and probably
1177 other Linux distros.
1179 if not hasattr(socket, "AF_TIPC"):
1180 return False
1181 if not os.path.isfile("/proc/modules"):
1182 return False
1183 with open("/proc/modules") as f:
1184 for line in f:
1185 if line.startswith("tipc "):
1186 return True
1187 if test_support.verbose:
1188 print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1189 return False
1191 class TIPCTest (unittest.TestCase):
1192 def testRDM(self):
1193 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1194 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
1196 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1197 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1198 TIPC_LOWER, TIPC_UPPER)
1199 srv.bind(srvaddr)
1201 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1202 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1203 cli.sendto(MSG, sendaddr)
1205 msg, recvaddr = srv.recvfrom(1024)
1207 self.assertEqual(cli.getsockname(), recvaddr)
1208 self.assertEqual(msg, MSG)
1211 class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
1212 def __init__(self, methodName = 'runTest'):
1213 unittest.TestCase.__init__(self, methodName = methodName)
1214 ThreadableTest.__init__(self)
1216 def setUp(self):
1217 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1218 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1219 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
1220 TIPC_LOWER, TIPC_UPPER)
1221 self.srv.bind(srvaddr)
1222 self.srv.listen(5)
1223 self.serverExplicitReady()
1224 self.conn, self.connaddr = self.srv.accept()
1226 def clientSetUp(self):
1227 # The is a hittable race between serverExplicitReady() and the
1228 # accept() call; sleep a little while to avoid it, otherwise
1229 # we could get an exception
1230 time.sleep(0.1)
1231 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
1232 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
1233 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
1234 self.cli.connect(addr)
1235 self.cliaddr = self.cli.getsockname()
1237 def testStream(self):
1238 msg = self.conn.recv(1024)
1239 self.assertEqual(msg, MSG)
1240 self.assertEqual(self.cliaddr, self.connaddr)
1242 def _testStream(self):
1243 self.cli.send(MSG)
1244 self.cli.close()
1247 def test_main():
1248 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
1249 TestExceptions, BufferIOTest, BasicTCPTest2]
1250 if sys.platform != 'mac':
1251 tests.extend([ BasicUDPTest, UDPTimeoutTest ])
1253 tests.extend([
1254 NonBlockingTCPTests,
1255 FileObjectClassTestCase,
1256 UnbufferedFileObjectClassTestCase,
1257 LineBufferedFileObjectClassTestCase,
1258 SmallBufferedFileObjectClassTestCase,
1259 Urllib2FileobjectTest,
1260 NetworkConnectionNoServer,
1261 NetworkConnectionAttributesTest,
1262 NetworkConnectionBehaviourTest,
1264 if hasattr(socket, "socketpair"):
1265 tests.append(BasicSocketPairTest)
1266 if sys.platform == 'linux2':
1267 tests.append(TestLinuxAbstractNamespace)
1268 if isTipcAvailable():
1269 tests.append(TIPCTest)
1270 tests.append(TIPCThreadableTest)
1272 thread_info = test_support.threading_setup()
1273 test_support.run_unittest(*tests)
1274 test_support.threading_cleanup(*thread_info)
1276 if __name__ == "__main__":
1277 test_main()