4 from test
import test_support
15 from weakref
import proxy
# Host name/address the client sockets in this file connect to; taken from
# the shared test_support configuration.
HOST = test_support.HOST
# Payload exchanged between the server and client halves of the tests.
MSG = 'Michael Gilfix was here\n'
28 class SocketTCPTest(unittest
.TestCase
):
31 self
.serv
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
32 self
.port
= test_support
.bind_port(self
.serv
)
39 class SocketUDPTest(unittest
.TestCase
):
42 self
.serv
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
43 self
.port
= test_support
.bind_port(self
.serv
)
50 """Threadable Test class
52 The ThreadableTest class makes it easy to create a threaded
53 client/server pair from an existing unit test. To create a
54 new threaded class from an existing unit test, use multiple
57 class NewClass (OldClass, ThreadableTest):
60 This class defines two new fixture functions with obvious
61 purposes for overriding:
66 Any new test functions within the class must then define
67 tests in pairs, where the test name is preceded with a
68 '_' to indicate the client portion of the test. Ex:
76 Any exceptions raised by the clients during their tests
77 are caught and transferred to the main thread to alert
78 the testing framework.
80 Note, the server setup function cannot call any blocking
81 functions that rely on the client thread during setup,
82 unless serverExplicitReady() is called just before
83 the blocking call (such as in setting up a client/server
84 connection and performing the accept() in setUp()).
88 # Swap the true setup function
89 self
.__setUp
= self
.setUp
90 self
.__tearDown
= self
.tearDown
91 self
.setUp
= self
._setUp
92 self
.tearDown
= self
._tearDown
def serverExplicitReady(self):
    """Tell the client thread that it may proceed right now.

    Meant for server-side setup code that is about to enter a blocking
    routine which depends on the client thread making progress.
    """
    ready_event = self.server_ready
    ready_event.set()
102 self
.server_ready
= threading
.Event()
103 self
.client_ready
= threading
.Event()
104 self
.done
= threading
.Event()
105 self
.queue
= Queue
.Queue(1)
107 # Do some munging to start the client test.
108 methodname
= self
.id()
109 i
= methodname
.rfind('.')
110 methodname
= methodname
[i
+1:]
111 test_method
= getattr(self
, '_' + methodname
)
112 self
.client_thread
= thread
.start_new_thread(
113 self
.clientRun
, (test_method
,))
116 if not self
.server_ready
.is_set():
117 self
.server_ready
.set()
118 self
.client_ready
.wait()
124 if not self
.queue
.empty():
125 msg
= self
.queue
.get()
128 def clientRun(self
, test_func
):
129 self
.server_ready
.wait()
130 self
.client_ready
.set()
132 with test_support
.check_py3k_warnings():
133 if not callable(test_func
):
134 raise TypeError("test_func must be a callable function.")
137 except Exception, strerror
:
138 self
.queue
.put(strerror
)
139 self
.clientTearDown()
def clientSetUp(self):
    """Client-side fixture hook; this base version always raises."""
    message = "clientSetUp must be implemented."
    raise NotImplementedError(message)
144 def clientTearDown(self
):
148 class ThreadedTCPSocketTest(SocketTCPTest
, ThreadableTest
):
def __init__(self, methodName='runTest'):
    """Initialise both base classes explicitly.

    methodName -- name of the test method; forwarded to SocketTCPTest.
    """
    # Only the SocketTCPTest initialiser is given the method name;
    # ThreadableTest.__init__ is called with the instance alone.
    SocketTCPTest.__init__(self, methodName=methodName)
    ThreadableTest.__init__(self)
def clientSetUp(self):
    """Create the client's IPv4 stream socket and store it as ``self.cli``."""
    family, kind = socket.AF_INET, socket.SOCK_STREAM
    self.cli = socket.socket(family, kind)
157 def clientTearDown(self
):
160 ThreadableTest
.clientTearDown(self
)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """SocketUDPTest fixture combined with the ThreadableTest machinery."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised by hand; only the test-case base
        # receives the method name.
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's IPv4 datagram socket as ``self.cli``."""
        family, kind = socket.AF_INET, socket.SOCK_DGRAM
        self.cli = socket.socket(family, kind)
171 class SocketConnectedTest(ThreadedTCPSocketTest
):
def __init__(self, methodName='runTest'):
    """Forward methodName to ThreadedTCPSocketTest.__init__."""
    ThreadedTCPSocketTest.__init__(self, methodName=methodName)
177 ThreadedTCPSocketTest
.setUp(self
)
178 # Indicate explicitly we're ready for the client thread to
179 # proceed and then perform the blocking call to accept
180 self
.serverExplicitReady()
181 conn
, addr
= self
.serv
.accept()
185 self
.cli_conn
.close()
187 ThreadedTCPSocketTest
.tearDown(self
)
def clientSetUp(self):
    """Run the base client setup, connect to (HOST, self.port) and expose
    the connected client socket as ``self.serv_conn``."""
    ThreadedTCPSocketTest.clientSetUp(self)
    server_address = (HOST, self.port)
    self.cli.connect(server_address)
    # serv_conn is just another name for the client socket.
    self.serv_conn = self.cli
def clientTearDown(self):
    """Close and drop ``self.serv_conn``, then run the base client teardown."""
    connection = self.serv_conn
    connection.close()
    self.serv_conn = None
    ThreadedTCPSocketTest.clientTearDown(self)
199 class SocketPairTest(unittest
.TestCase
, ThreadableTest
):
def __init__(self, methodName='runTest'):
    """Initialise both base classes explicitly.

    methodName -- name of the test method; forwarded to unittest.TestCase.
    """
    # unittest.TestCase gets the method name; ThreadableTest.__init__ is
    # called with the instance alone.
    unittest.TestCase.__init__(self, methodName=methodName)
    ThreadableTest.__init__(self)
206 self
.serv
, self
.cli
= socket
.socketpair()
212 def clientSetUp(self
):
215 def clientTearDown(self
):
218 ThreadableTest
.clientTearDown(self
)
221 #######################################################################
224 class GeneralModuleTests(unittest
.TestCase
):
226 def test_weakref(self
):
227 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
229 self
.assertEqual(p
.fileno(), s
.fileno())
234 except ReferenceError:
237 self
.fail('Socket proxy still exists')
239 def testSocketError(self
):
240 # Testing socket module exceptions
241 def raise_error(*args
, **kwargs
):
243 def raise_herror(*args
, **kwargs
):
245 def raise_gaierror(*args
, **kwargs
):
246 raise socket
.gaierror
247 self
.assertRaises(socket
.error
, raise_error
,
248 "Error raising socket exception.")
249 self
.assertRaises(socket
.error
, raise_herror
,
250 "Error raising socket exception.")
251 self
.assertRaises(socket
.error
, raise_gaierror
,
252 "Error raising socket exception.")
254 def testCrucialConstants(self
):
255 # Testing for mission critical constants
261 socket
.SOCK_SEQPACKET
265 def testHostnameRes(self
):
266 # Testing hostname resolution mechanisms
267 hostname
= socket
.gethostname()
269 ip
= socket
.gethostbyname(hostname
)
271 # Probably name lookup wasn't set up right; skip this test
273 self
.assertTrue(ip
.find('.') >= 0, "Error resolving host to ip.")
275 hname
, aliases
, ipaddrs
= socket
.gethostbyaddr(ip
)
277 # Probably a similar problem as above; skip this test
279 all_host_names
= [hostname
, hname
] + aliases
280 fqhn
= socket
.getfqdn(ip
)
281 if not fqhn
in all_host_names
:
282 self
.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn
, repr(all_host_names
)))
284 def testRefCountGetNameInfo(self
):
285 # Testing reference count for getnameinfo
286 if hasattr(sys
, "getrefcount"):
288 # On some versions, this loses a reference
289 orig
= sys
.getrefcount(__name__
)
290 socket
.getnameinfo(__name__
,0)
292 self
.assertEqual(sys
.getrefcount(__name__
), orig
,
293 "socket.getnameinfo loses a reference")
295 def testInterpreterCrash(self
):
296 # Making sure getnameinfo doesn't crash the interpreter
298 # On some versions, this crashes the interpreter.
299 socket
.getnameinfo(('x', 0, 0, 0), 0)
304 # This just checks that htons etc. are their own inverse,
305 # when looking at the lower 16 or 32 bits.
306 sizes
= {socket
.htonl
: 32, socket
.ntohl
: 32,
307 socket
.htons
: 16, socket
.ntohs
: 16}
308 for func
, size
in sizes
.items():
309 mask
= (1L<<size
) - 1
310 for i
in (0, 1, 0xffff, ~
0xffff, 2, 0x01234567, 0x76543210):
311 self
.assertEqual(i
& mask
, func(func(i
&mask
)) & mask
)
314 self
.assertEqual(swapped
& mask
, mask
)
315 self
.assertRaises(OverflowError, func
, 1L<<34)
317 def testNtoHErrors(self
):
318 good_values
= [ 1, 2, 3, 1L, 2L, 3L ]
319 bad_values
= [ -1, -2, -3, -1L, -2L, -3L ]
320 for k
in good_values
:
326 self
.assertRaises(OverflowError, socket
.ntohl
, k
)
327 self
.assertRaises(OverflowError, socket
.ntohs
, k
)
328 self
.assertRaises(OverflowError, socket
.htonl
, k
)
329 self
.assertRaises(OverflowError, socket
.htons
, k
)
331 def testGetServBy(self
):
332 eq
= self
.assertEqual
333 # Find one service that exists, then check all the related interfaces.
334 # I've ordered this by protocols that have both a tcp and udp
335 # protocol, at least for modern Linuxes.
336 if sys
.platform
in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
337 'freebsd7', 'freebsd8', 'darwin'):
338 # avoid the 'echo' service on this platform, as there is an
339 # assumption breaking non-standard port/protocol entry
340 services
= ('daytime', 'qotd', 'domain')
342 services
= ('echo', 'daytime', 'domain')
343 for service
in services
:
345 port
= socket
.getservbyname(service
, 'tcp')
351 # Try same call with optional protocol omitted
352 port2
= socket
.getservbyname(service
)
354 # Try udp, but don't barf if it doesn't exist
356 udpport
= socket
.getservbyname(service
, 'udp')
361 # Now make sure the lookup by port returns the same service name
362 eq(socket
.getservbyport(port2
), service
)
363 eq(socket
.getservbyport(port
, 'tcp'), service
)
364 if udpport
is not None:
365 eq(socket
.getservbyport(udpport
, 'udp'), service
)
366 # Make sure getservbyport does not accept out of range ports.
367 self
.assertRaises(OverflowError, socket
.getservbyport
, -1)
368 self
.assertRaises(OverflowError, socket
.getservbyport
, 65536)
370 def testDefaultTimeout(self
):
371 # Testing default timeout
372 # The default timeout should initially be None
373 self
.assertEqual(socket
.getdefaulttimeout(), None)
375 self
.assertEqual(s
.gettimeout(), None)
378 # Set the default timeout to 10, and see if it propagates
379 socket
.setdefaulttimeout(10)
380 self
.assertEqual(socket
.getdefaulttimeout(), 10)
382 self
.assertEqual(s
.gettimeout(), 10)
385 # Reset the default timeout to None, and see if it propagates
386 socket
.setdefaulttimeout(None)
387 self
.assertEqual(socket
.getdefaulttimeout(), None)
389 self
.assertEqual(s
.gettimeout(), None)
392 # Check that setting it to an invalid value raises ValueError
393 self
.assertRaises(ValueError, socket
.setdefaulttimeout
, -1)
395 # Check that setting it to an invalid type raises TypeError
396 self
.assertRaises(TypeError, socket
.setdefaulttimeout
, "spam")
398 def testIPv4_inet_aton_fourbytes(self
):
399 if not hasattr(socket
, 'inet_aton'):
400 return # No inet_aton, nothing to check
401 # Test that issue1008086 and issue767150 are fixed.
402 # It must return 4 bytes.
403 self
.assertEquals('\x00'*4, socket
.inet_aton('0.0.0.0'))
404 self
.assertEquals('\xff'*4, socket
.inet_aton('255.255.255.255'))
406 def testIPv4toString(self
):
407 if not hasattr(socket
, 'inet_pton'):
408 return # No inet_pton() on this platform
409 from socket
import inet_aton
as f
, inet_pton
, AF_INET
410 g
= lambda a
: inet_pton(AF_INET
, a
)
412 self
.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
413 self
.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
414 self
.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
415 self
.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
416 self
.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
418 self
.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
419 self
.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
420 self
.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
421 self
.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
423 def testIPv6toString(self
):
424 if not hasattr(socket
, 'inet_pton'):
425 return # No inet_pton() on this platform
427 from socket
import inet_pton
, AF_INET6
, has_ipv6
432 f
= lambda a
: inet_pton(AF_INET6
, a
)
434 self
.assertEquals('\x00' * 16, f('::'))
435 self
.assertEquals('\x00' * 16, f('0::0'))
436 self
.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
438 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
439 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
442 def testStringToIPv4(self
):
443 if not hasattr(socket
, 'inet_ntop'):
444 return # No inet_ntop() on this platform
445 from socket
import inet_ntoa
as f
, inet_ntop
, AF_INET
446 g
= lambda a
: inet_ntop(AF_INET
, a
)
448 self
.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
449 self
.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
450 self
.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
451 self
.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
453 self
.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
454 self
.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
455 self
.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
457 def testStringToIPv6(self
):
458 if not hasattr(socket
, 'inet_ntop'):
459 return # No inet_ntop() on this platform
461 from socket
import inet_ntop
, AF_INET6
, has_ipv6
466 f
= lambda a
: inet_ntop(AF_INET6
, a
)
468 self
.assertEquals('::', f('\x00' * 16))
469 self
.assertEquals('::1', f('\x00' * 15 + '\x01'))
471 'aef:b01:506:1001:ffff:9997:55:170',
472 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
475 # XXX The following don't test module-level functionality...
477 def _get_unused_port(self
, bind_address
='0.0.0.0'):
478 """Use a temporary socket to elicit an unused ephemeral port.
481 bind_address: Hostname or IP address to search for a port on.
483 Returns: A most likely to be unused port.
485 tempsock
= socket
.socket()
486 tempsock
.bind((bind_address
, 0))
487 host
, port
= tempsock
.getsockname()
def testSockName(self):
    """getsockname() reports the address and port the socket was bound to."""
    # Testing getsockname()
    port = self._get_unused_port()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Close the socket even if one of the assertions below fails; the
    # original never closed it.
    self.addCleanup(sock.close)
    sock.bind(("0.0.0.0", port))
    name = sock.getsockname()
    # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
    # it reasonable to get the host's addr in addition to 0.0.0.0.
    # At least for eCos.  This is required for the S/390 to pass.
    my_ip_addr = socket.gethostbyname(socket.gethostname())
    self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
    self.assertEqual(name[1], port)
504 def testGetSockOpt(self
):
505 # Testing getsockopt()
506 # We know a socket should start without reuse==0
507 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
508 reuse
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
)
509 self
.assertFalse(reuse
!= 0, "initial mode is reuse")
511 def testSetSockOpt(self
):
512 # Testing setsockopt()
513 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
514 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
515 reuse
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
)
516 self
.assertFalse(reuse
== 0, "failed to set reuse mode")
518 def testSendAfterClose(self
):
519 # testing send() after close() with timeout
520 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
523 self
.assertRaises(socket
.error
, sock
.send
, "spam")
525 def testNewAttributes(self
):
526 # testing .family, .type and .protocol
527 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
528 self
.assertEqual(sock
.family
, socket
.AF_INET
)
529 self
.assertEqual(sock
.type, socket
.SOCK_STREAM
)
530 self
.assertEqual(sock
.proto
, 0)
533 def test_getsockaddrarg(self
):
535 port
= self
._get
_unused
_port
(bind_address
=host
)
536 big_port
= port
+ 65536
537 neg_port
= port
- 65536
538 sock
= socket
.socket()
540 self
.assertRaises(OverflowError, sock
.bind
, (host
, big_port
))
541 self
.assertRaises(OverflowError, sock
.bind
, (host
, neg_port
))
542 sock
.bind((host
, port
))
546 def test_sock_ioctl(self
):
549 self
.assertTrue(hasattr(socket
.socket
, 'ioctl'))
550 self
.assertTrue(hasattr(socket
, 'SIO_RCVALL'))
551 self
.assertTrue(hasattr(socket
, 'RCVALL_ON'))
552 self
.assertTrue(hasattr(socket
, 'RCVALL_OFF'))
553 self
.assertTrue(hasattr(socket
, 'SIO_KEEPALIVE_VALS'))
555 self
.assertRaises(ValueError, s
.ioctl
, -1, None)
556 s
.ioctl(socket
.SIO_KEEPALIVE_VALS
, (1, 100, 100))
559 @unittest.skipUnless(thread
, 'Threading required for this test.')
560 class BasicTCPTest(SocketConnectedTest
):
def __init__(self, methodName='runTest'):
    """Forward methodName to SocketConnectedTest.__init__."""
    SocketConnectedTest.__init__(self, methodName=methodName)
566 # Testing large receive over TCP
567 msg
= self
.cli_conn
.recv(1024)
568 self
.assertEqual(msg
, MSG
)
571 self
.serv_conn
.send(MSG
)
573 def testOverFlowRecv(self
):
574 # Testing receive in chunks over TCP
575 seg1
= self
.cli_conn
.recv(len(MSG
) - 3)
576 seg2
= self
.cli_conn
.recv(1024)
578 self
.assertEqual(msg
, MSG
)
def _testOverFlowRecv(self):
    """Client half: send MSG over the connected socket."""
    connection = self.serv_conn
    connection.send(MSG)
def testRecvFrom(self):
    """Server half: one recvfrom() of up to 1024 bytes must yield MSG."""
    # Testing large recvfrom() over TCP
    received = self.cli_conn.recvfrom(1024)
    msg, addr = received
    self.assertEqual(msg, MSG)
def _testRecvFrom(self):
    """Client half: send MSG over the connected socket."""
    connection = self.serv_conn
    connection.send(MSG)
591 def testOverFlowRecvFrom(self
):
592 # Testing recvfrom() in chunks over TCP
593 seg1
, addr
= self
.cli_conn
.recvfrom(len(MSG
)-3)
594 seg2
, addr
= self
.cli_conn
.recvfrom(1024)
596 self
.assertEqual(msg
, MSG
)
def _testOverFlowRecvFrom(self):
    """Client half: send MSG over the connected socket."""
    connection = self.serv_conn
    connection.send(MSG)
601 def testSendAll(self
):
602 # Testing sendall() with a 2048 byte string over TCP
605 read
= self
.cli_conn
.recv(1024)
609 self
.assertEqual(msg
, 'f' * 2048)
611 def _testSendAll(self
):
612 big_chunk
= 'f' * 2048
613 self
.serv_conn
.sendall(big_chunk
)
615 def testFromFd(self
):
617 if not hasattr(socket
, "fromfd"):
618 return # On Windows, this doesn't exist
619 fd
= self
.cli_conn
.fileno()
620 sock
= socket
.fromfd(fd
, socket
.AF_INET
, socket
.SOCK_STREAM
)
621 msg
= sock
.recv(1024)
622 self
.assertEqual(msg
, MSG
)
def _testFromFd(self):
    """Client half: send MSG over the connected socket."""
    connection = self.serv_conn
    connection.send(MSG)
627 def testShutdown(self
):
629 msg
= self
.cli_conn
.recv(1024)
630 self
.assertEqual(msg
, MSG
)
631 # wait for _testShutdown to finish: on OS X, when the server
632 # closes the connection the client also becomes disconnected,
633 # and the client's shutdown call will fail. (Issue #4397.)
def _testShutdown(self):
    """Client half: send MSG, then call shutdown(2) on the connection."""
    connection = self.serv_conn
    connection.send(MSG)
    connection.shutdown(2)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    """UDP tests: each ``testX`` runs on the server socket and is paired
    with a ``_testX`` client half that sends MSG to (HOST, self.port)."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        reply = self.serv.recvfrom(len(MSG))
        msg, addr = reply
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        receive = self.serv.recvfrom
        self.assertRaises(ValueError, receive, -1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
669 @unittest.skipUnless(thread
, 'Threading required for this test.')
670 class TCPCloserTest(ThreadedTCPSocketTest
):
673 conn
, addr
= self
.serv
.accept()
677 read
, write
, err
= select
.select([sd
], [], [], 1.0)
678 self
.assertEqual(read
, [sd
])
679 self
.assertEqual(sd
.recv(1), '')
681 def _testClose(self
):
682 self
.cli
.connect((HOST
, self
.port
))
685 @unittest.skipUnless(thread
, 'Threading required for this test.')
686 class BasicSocketPairTest(SocketPairTest
):
def __init__(self, methodName='runTest'):
    """Forward methodName to SocketPairTest.__init__."""
    SocketPairTest.__init__(self, methodName=methodName)
692 msg
= self
.serv
.recv(1024)
693 self
.assertEqual(msg
, MSG
)
702 msg
= self
.cli
.recv(1024)
703 self
.assertEqual(msg
, MSG
)
705 @unittest.skipUnless(thread
, 'Threading required for this test.')
706 class NonBlockingTCPTests(ThreadedTCPSocketTest
):
def __init__(self, methodName='runTest'):
    """Forward methodName to ThreadedTCPSocketTest.__init__."""
    ThreadedTCPSocketTest.__init__(self, methodName=methodName)
711 def testSetBlocking(self
):
712 # Testing whether set blocking works
713 self
.serv
.setblocking(0)
720 self
.assertTrue((end
- start
) < 1.0, "Error setting non-blocking mode.")
722 def _testSetBlocking(self
):
725 def testAccept(self
):
726 # Testing non-blocking accept
727 self
.serv
.setblocking(0)
729 conn
, addr
= self
.serv
.accept()
733 self
.fail("Error trying to do non-blocking accept.")
734 read
, write
, err
= select
.select([self
.serv
], [], [])
735 if self
.serv
in read
:
736 conn
, addr
= self
.serv
.accept()
738 self
.fail("Error trying to do accept after select.")
740 def _testAccept(self
):
742 self
.cli
.connect((HOST
, self
.port
))
744 def testConnect(self
):
745 # Testing non-blocking connect
746 conn
, addr
= self
.serv
.accept()
def _testConnect(self):
    """Client half: connect to (HOST, self.port) with a 10 second timeout set."""
    client = self.cli
    client.settimeout(10)
    server_address = (HOST, self.port)
    client.connect(server_address)
753 # Testing non-blocking recv
754 conn
, addr
= self
.serv
.accept()
757 msg
= conn
.recv(len(MSG
))
761 self
.fail("Error trying to do non-blocking recv.")
762 read
, write
, err
= select
.select([conn
], [], [])
764 msg
= conn
.recv(len(MSG
))
765 self
.assertEqual(msg
, MSG
)
767 self
.fail("Error during select call to non-blocking socket.")
770 self
.cli
.connect((HOST
, self
.port
))
774 @unittest.skipUnless(thread
, 'Threading required for this test.')
775 class FileObjectClassTestCase(SocketConnectedTest
):
777 bufsize
= -1 # Use default buffer size
def __init__(self, methodName='runTest'):
    """Forward methodName to SocketConnectedTest.__init__."""
    SocketConnectedTest.__init__(self, methodName=methodName)
783 SocketConnectedTest
.setUp(self
)
784 self
.serv_file
= self
.cli_conn
.makefile('rb', self
.bufsize
)
787 self
.serv_file
.close()
788 self
.assertTrue(self
.serv_file
.closed
)
789 self
.serv_file
= None
790 SocketConnectedTest
.tearDown(self
)
def clientSetUp(self):
    """Run the base client setup, then wrap ``self.serv_conn`` in a 'wb'
    file object stored as ``self.cli_file``."""
    SocketConnectedTest.clientSetUp(self)
    connection = self.serv_conn
    self.cli_file = connection.makefile('wb')
796 def clientTearDown(self
):
797 self
.cli_file
.close()
798 self
.assertTrue(self
.cli_file
.closed
)
800 SocketConnectedTest
.clientTearDown(self
)
def testSmallRead(self):
    """Read MSG in two pieces (all but the last 3 bytes, then 3 bytes)."""
    # Performing small file read test
    tail_len = 3
    head = self.serv_file.read(len(MSG) - tail_len)
    tail = self.serv_file.read(tail_len)
    self.assertEqual(head + tail, MSG)
def _testSmallRead(self):
    """Client half: write MSG to the client file object and flush it."""
    writer = self.cli_file
    writer.write(MSG)
    writer.flush()
813 def testFullRead(self
):
815 msg
= self
.serv_file
.read()
816 self
.assertEqual(msg
, MSG
)
def _testFullRead(self):
    """Client half: write MSG, then close the client file object."""
    writer = self.cli_file
    writer.write(MSG)
    writer.close()
822 def testUnbufferedRead(self
):
823 # Performing unbuffered file read test
826 char
= self
.serv_file
.read(1)
830 self
.assertEqual(buf
, MSG
)
def _testUnbufferedRead(self):
    """Client half: write MSG to the client file object and flush it."""
    writer = self.cli_file
    writer.write(MSG)
    writer.flush()
def testReadline(self):
    """A single readline() on the server file object must return MSG."""
    # Performing file readline test
    reader = self.serv_file
    self.assertEqual(reader.readline(), MSG)
def _testReadline(self):
    """Client half: write MSG to the client file object and flush it."""
    writer = self.cli_file
    writer.write(MSG)
    writer.flush()
def testReadlineAfterRead(self):
    """Two sized read() calls followed by three readline() calls must
    return the data in order."""
    # Fixed-size reads consume the start of the first line...
    for fragment in ("A baloo is", " a bear"):
        self.assertEqual(fragment, self.serv_file.read(len(fragment)))
    # ...then readline() returns the rest of it and the following lines.
    for expected_line in ("\n", "A BALOO IS A BEAR.\n", MSG):
        self.assertEqual(expected_line, self.serv_file.readline())
def _testReadlineAfterRead(self):
    """Client half: write two text lines and MSG, then flush."""
    writer = self.cli_file
    for chunk in ("A baloo is a bear\n", "A BALOO IS A BEAR.\n", MSG):
        writer.write(chunk)
    writer.flush()
863 def testReadlineAfterReadNoNewline(self
):
864 end_of_
= self
.serv_file
.read(len("End Of "))
865 self
.assertEqual("End Of ", end_of_
)
866 line
= self
.serv_file
.readline()
867 self
.assertEqual("Line", line
)
869 def _testReadlineAfterReadNoNewline(self
):
870 self
.cli_file
.write("End Of Line")
872 def testClosedAttr(self
):
873 self
.assertTrue(not self
.serv_file
.closed
)
875 def _testClosedAttr(self
):
876 self
.assertTrue(not self
.cli_file
.closed
)
879 class FileObjectInterruptedTestCase(unittest
.TestCase
):
880 """Test that the file object correctly handles EINTR internally."""
882 class MockSocket(object):
883 def __init__(self
, recv_funcs
=()):
884 # A generator that returns callables that we'll call for each
886 self
._recv
_step
= iter(recv_funcs
)
def recv(self, size):
    """Return the result of calling the next queued recv callable.

    size -- accepted for signature compatibility with socket.recv();
            it is not used.

    Raises StopIteration once the queued callables are exhausted.
    """
    # The next() builtin (available since Python 2.6) works on any
    # iterator, unlike calling the iterator's .next() method directly.
    return next(self._recv_step)()
893 raise socket
.error(errno
.EINTR
)
895 def _test_readline(self
, size
=-1, **kwargs
):
896 mock_sock
= self
.MockSocket(recv_funcs
=[
897 lambda : "This is the first line\nAnd the sec",
899 lambda : "ond line is here\n",
902 fo
= socket
._fileobject
(mock_sock
, **kwargs
)
903 self
.assertEquals(fo
.readline(size
), "This is the first line\n")
904 self
.assertEquals(fo
.readline(size
), "And the second line is here\n")
906 def _test_read(self
, size
=-1, **kwargs
):
907 mock_sock
= self
.MockSocket(recv_funcs
=[
908 lambda : "This is the first line\nAnd the sec",
910 lambda : "ond line is here\n",
913 fo
= socket
._fileobject
(mock_sock
, **kwargs
)
914 self
.assertEquals(fo
.read(size
), "This is the first line\n"
915 "And the second line is here\n")
917 def test_default(self
):
918 self
._test
_readline
()
919 self
._test
_readline
(size
=100)
921 self
._test
_read
(size
=100)
923 def test_with_1k_buffer(self
):
924 self
._test
_readline
(bufsize
=1024)
925 self
._test
_readline
(size
=100, bufsize
=1024)
926 self
._test
_read
(bufsize
=1024)
927 self
._test
_read
(size
=100, bufsize
=1024)
929 def _test_readline_no_buffer(self
, size
=-1):
930 mock_sock
= self
.MockSocket(recv_funcs
=[
938 fo
= socket
._fileobject
(mock_sock
, bufsize
=0)
939 self
.assertEquals(fo
.readline(size
), "aa\n")
940 self
.assertEquals(fo
.readline(size
), "BBbb")
942 def test_no_buffer(self
):
943 self
._test
_readline
_no
_buffer
()
944 self
._test
_readline
_no
_buffer
(size
=4)
945 self
._test
_read
(bufsize
=0)
946 self
._test
_read
(size
=100, bufsize
=0)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first = self.serv_file.readline()
        self.assertEqual(first, "A. " + MSG)
        # Replace the file object with a fresh unbuffered one.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second = self.serv_file.readline()
        self.assertEqual(second, "B. " + MSG)

    def _testUnbufferedReadline(self):
        # Client half: two labelled copies of MSG, then one flush.
        for label in ("A. ", "B. "):
            self.cli_file.write(label + MSG)
        self.cli_file.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Re-runs the inherited FileObjectClassTestCase tests with bufsize == 1.

    bufsize = 1 # Default-buffered for reading; line-buffered for writing
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Re-runs the inherited FileObjectClassTestCase tests with bufsize == 2.

    bufsize = 2 # Exercise the buffering code
class NetworkConnectionTest(object):
    """Prove network connection."""
    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        server_address = (HOST, self.port)
        self.cli = socket.create_connection(server_address)
        # serv_conn is just another name for the client socket.
        self.serv_conn = self.cli
992 class BasicTCPTest2(NetworkConnectionTest
, BasicTCPTest
):
993 """Tests that NetworkConnection does not break existing TCP functionality.
996 class NetworkConnectionNoServer(unittest
.TestCase
):
997 def testWithoutServer(self
):
998 port
= test_support
.find_unused_port()
1001 lambda: socket
.create_connection((HOST
, port
))
1004 @unittest.skipUnless(thread
, 'Threading required for this test.')
1005 class NetworkConnectionAttributesTest(SocketTCPTest
, ThreadableTest
):
def __init__(self, methodName='runTest'):
    """Initialise both base classes explicitly.

    methodName -- name of the test method; forwarded to SocketTCPTest.
    """
    # Only the SocketTCPTest initialiser is given the method name;
    # ThreadableTest.__init__ is called with the instance alone.
    SocketTCPTest.__init__(self, methodName=methodName)
    ThreadableTest.__init__(self)
def clientSetUp(self):
    """Pick a port via test_support.find_unused_port() and store it as
    ``self.source_port``."""
    unused_port = test_support.find_unused_port()
    self.source_port = unused_port
1014 def clientTearDown(self
):
1017 ThreadableTest
.clientTearDown(self
)
1019 def _justAccept(self
):
1020 conn
, addr
= self
.serv
.accept()
1022 testFamily
= _justAccept
def _testFamily(self):
    """Client half: a connection made by create_connection() to
    (HOST, self.port) must be an AF_INET socket."""
    self.cli = socket.create_connection((HOST, self.port), timeout=30)
    # Compare against the named constant rather than the literal 2.
    self.assertEqual(self.cli.family, socket.AF_INET)
1027 testSourceAddress
= _justAccept
1028 def _testSourceAddress(self
):
1029 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=30,
1030 source_address
=('', self
.source_port
))
1031 self
.assertEqual(self
.cli
.getsockname()[1], self
.source_port
)
1032 # The port number being used is sufficient to show that the bind()
1035 testTimeoutDefault
= _justAccept
1036 def _testTimeoutDefault(self
):
1037 # passing no explicit timeout uses socket's global default
1038 self
.assertTrue(socket
.getdefaulttimeout() is None)
1039 socket
.setdefaulttimeout(42)
1041 self
.cli
= socket
.create_connection((HOST
, self
.port
))
1043 socket
.setdefaulttimeout(None)
1044 self
.assertEquals(self
.cli
.gettimeout(), 42)
1046 testTimeoutNone
= _justAccept
1047 def _testTimeoutNone(self
):
1048 # None timeout means the same as sock.settimeout(None)
1049 self
.assertTrue(socket
.getdefaulttimeout() is None)
1050 socket
.setdefaulttimeout(30)
1052 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=None)
1054 socket
.setdefaulttimeout(None)
1055 self
.assertEqual(self
.cli
.gettimeout(), None)
1057 testTimeoutValueNamed
= _justAccept
def _testTimeoutValueNamed(self):
    """Client half: a timeout passed by keyword must be set on the socket."""
    expected_timeout = 30
    server_address = (HOST, self.port)
    self.cli = socket.create_connection(server_address, timeout=expected_timeout)
    self.assertEqual(self.cli.gettimeout(), expected_timeout)
1062 testTimeoutValueNonamed
= _justAccept
def _testTimeoutValueNonamed(self):
    """Client half: a timeout passed positionally must be set on the socket."""
    expected_timeout = 30
    server_address = (HOST, self.port)
    self.cli = socket.create_connection(server_address, expected_timeout)
    self.assertEqual(self.cli.gettimeout(), expected_timeout)
1067 @unittest.skipUnless(thread
, 'Threading required for this test.')
1068 class NetworkConnectionBehaviourTest(SocketTCPTest
, ThreadableTest
):
def __init__(self, methodName='runTest'):
    """Initialise both base classes explicitly.

    methodName -- name of the test method; forwarded to SocketTCPTest.
    """
    # Only the SocketTCPTest initialiser is given the method name;
    # ThreadableTest.__init__ is called with the instance alone.
    SocketTCPTest.__init__(self, methodName=methodName)
    ThreadableTest.__init__(self)
1074 def clientSetUp(self
):
1077 def clientTearDown(self
):
1080 ThreadableTest
.clientTearDown(self
)
1082 def testInsideTimeout(self
):
1083 conn
, addr
= self
.serv
.accept()
1086 testOutsideTimeout
= testInsideTimeout
1088 def _testInsideTimeout(self
):
1089 self
.cli
= sock
= socket
.create_connection((HOST
, self
.port
))
1091 self
.assertEqual(data
, "done!")
def _testOutsideTimeout(self):
    """Client half: connect with a 1 second timeout and expect recv(5)
    to raise socket.timeout."""
    sock = socket.create_connection((HOST, self.port), timeout=1)
    self.cli = sock
    # assertRaises forwards the trailing argument to sock.recv.
    self.assertRaises(socket.timeout, sock.recv, 5)
1098 class Urllib2FileobjectTest(unittest
.TestCase
):
1100 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1101 # it close the socket if the close c'tor argument is true
1103 def testClose(self
):
1106 def flush(self
): pass
1107 def close(self
): self
.closed
= True
1109 # must not close unless we request it: the original use of _fileobject
1110 # by module socket requires that the underlying socket not be closed until
1111 # the _socketobject that created the _fileobject is closed
1113 f
= socket
._fileobject
(s
)
1115 self
.assertTrue(not s
.closed
)
1118 f
= socket
._fileobject
(s
, close
=True)
1120 self
.assertTrue(s
.closed
)
1122 class TCPTimeoutTest(SocketTCPTest
):
1124 def testTCPTimeout(self
):
1125 def raise_timeout(*args
, **kwargs
):
1126 self
.serv
.settimeout(1.0)
1128 self
.assertRaises(socket
.timeout
, raise_timeout
,
1129 "Error generating a timeout exception (TCP)")
1131 def testTimeoutZero(self
):
1134 self
.serv
.settimeout(0.0)
1135 foo
= self
.serv
.accept()
1136 except socket
.timeout
:
1137 self
.fail("caught timeout instead of error (TCP)")
1138 except socket
.error
:
1141 self
.fail("caught unexpected exception (TCP)")
1143 self
.fail("accept() returned success when we did not expect it")
1145 def testInterruptedTimeout(self
):
1146 # XXX I don't know how to do this test on MSWindows or any other
1147 # platform that doesn't support signal.alarm() or os.kill(), though
1148 # the bug should have existed on all platforms.
1149 if not hasattr(signal
, "alarm"):
1150 return # can only test on *nix
1151 self
.serv
.settimeout(5.0) # must be longer than alarm
1152 class Alarm(Exception):
1154 def alarm_handler(signal
, frame
):
1156 old_alarm
= signal
.signal(signal
.SIGALRM
, alarm_handler
)
1158 signal
.alarm(2) # POSIX allows alarm to be up to 1 second early
1160 foo
= self
.serv
.accept()
1161 except socket
.timeout
:
1162 self
.fail("caught timeout instead of Alarm")
1166 self
.fail("caught other exception instead of Alarm:"
1168 (sys
.exc_info()[:2] + (traceback
.format_exc(),)))
1170 self
.fail("nothing caught")
1172 signal
.alarm(0) # shut off alarm
1174 self
.fail("got Alarm in wrong place")
1176 # no alarm can be pending. Safe to restore old handler.
1177 signal
.signal(signal
.SIGALRM
, old_alarm
)
1179 class UDPTimeoutTest(SocketTCPTest
):
1181 def testUDPTimeout(self
):
1182 def raise_timeout(*args
, **kwargs
):
1183 self
.serv
.settimeout(1.0)
1184 self
.serv
.recv(1024)
1185 self
.assertRaises(socket
.timeout
, raise_timeout
,
1186 "Error generating a timeout exception (UDP)")
1188 def testTimeoutZero(self
):
1191 self
.serv
.settimeout(0.0)
1192 foo
= self
.serv
.recv(1024)
1193 except socket
.timeout
:
1194 self
.fail("caught timeout instead of error (UDP)")
1195 except socket
.error
:
1198 self
.fail("caught unexpected exception (UDP)")
1200 self
.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
    """Checks the inheritance layout of the socket exception classes."""

    def testExceptionTree(self):
        # The root of the family must itself be an ordinary Exception ...
        self.assertTrue(issubclass(socket.error, Exception))
        # ... and each specialised exception must derive from that root.
        for derived in (socket.herror, socket.gaierror, socket.timeout):
            self.assertTrue(issubclass(derived, socket.error))
1210 class TestLinuxAbstractNamespace(unittest
.TestCase
):
1214 def testLinuxAbstractNamespace(self
):
1215 address
= "\x00python-test-hello\x00\xff"
1216 s1
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1219 s2
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1220 s2
.connect(s1
.getsockname())
1222 self
.assertEqual(s1
.getsockname(), address
)
1223 self
.assertEqual(s2
.getpeername(), address
)
1225 def testMaxName(self
):
1226 address
= "\x00" + "h" * (self
.UNIX_PATH_MAX
- 1)
1227 s
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1229 self
.assertEqual(s
.getsockname(), address
)
1231 def testNameOverflow(self
):
1232 address
= "\x00" + "h" * self
.UNIX_PATH_MAX
1233 s
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1234 self
.assertRaises(socket
.error
, s
.bind
, address
)
1237 @unittest.skipUnless(thread
, 'Threading required for this test.')
1238 class BufferIOTest(SocketConnectedTest
):
1240 Test the buffer versions of socket.recv() and socket.send().
def __init__(self, methodName='runTest'):
    """Delegate construction to SocketConnectedTest with the test name."""
    SocketConnectedTest.__init__(self, methodName=methodName)
def testRecvIntoArray(self):
    # Receive directly into a 1024-item character array, then check
    # both the reported byte count and the filled prefix against MSG.
    expected_len = len(MSG)
    target = array.array('c', ' ' * 1024)
    received = self.cli_conn.recv_into(target)
    self.assertEqual(received, expected_len)
    self.assertEqual(target.tostring()[:expected_len], MSG)
1252 def _testRecvIntoArray(self
):
1253 with test_support
.check_py3k_warnings():
1255 self
.serv_conn
.send(buf
)
def testRecvIntoBytearray(self):
    # Receive directly into a 1024-byte bytearray, then check both the
    # reported byte count and the filled prefix against MSG.
    expected_len = len(MSG)
    target = bytearray(1024)
    received = self.cli_conn.recv_into(target)
    self.assertEqual(received, expected_len)
    self.assertEqual(target[:expected_len], MSG)
1264 _testRecvIntoBytearray
= _testRecvIntoArray
def testRecvIntoMemoryview(self):
    # Receive through a memoryview wrapped around a 1024-byte bytearray;
    # the data is then read back from the underlying bytearray.
    expected_len = len(MSG)
    target = bytearray(1024)
    received = self.cli_conn.recv_into(memoryview(target))
    self.assertEqual(received, expected_len)
    self.assertEqual(target[:expected_len], MSG)
1273 _testRecvIntoMemoryview
= _testRecvIntoArray
def testRecvFromIntoArray(self):
    # recvfrom_into() variant of the array test: the call yields a
    # (byte count, address) pair, of which only the count is checked.
    expected_len = len(MSG)
    target = array.array('c', ' ' * 1024)
    received, _sender = self.cli_conn.recvfrom_into(target)
    self.assertEqual(received, expected_len)
    self.assertEqual(target.tostring()[:expected_len], MSG)
1282 def _testRecvFromIntoArray(self
):
1283 with test_support
.check_py3k_warnings():
1285 self
.serv_conn
.send(buf
)
def testRecvFromIntoBytearray(self):
    # recvfrom_into() variant of the bytearray test: the call yields a
    # (byte count, address) pair, of which only the count is checked.
    expected_len = len(MSG)
    target = bytearray(1024)
    received, _sender = self.cli_conn.recvfrom_into(target)
    self.assertEqual(received, expected_len)
    self.assertEqual(target[:expected_len], MSG)
1294 _testRecvFromIntoBytearray
= _testRecvFromIntoArray
def testRecvFromIntoMemoryview(self):
    # recvfrom_into() through a memoryview over a 1024-byte bytearray;
    # the data is then read back from the underlying bytearray.
    expected_len = len(MSG)
    target = bytearray(1024)
    received, _sender = self.cli_conn.recvfrom_into(memoryview(target))
    self.assertEqual(received, expected_len)
    self.assertEqual(target[:expected_len], MSG)
1303 _testRecvFromIntoMemoryview
= _testRecvFromIntoArray
1310 def isTipcAvailable():
1311 """Check if the TIPC module is loaded
1313 The TIPC module is not loaded automatically on Ubuntu and probably
1314 other Linux distros.
1316 if not hasattr(socket
, "AF_TIPC"):
1318 if not os
.path
.isfile("/proc/modules"):
1320 with
open("/proc/modules") as f
:
1322 if line
.startswith("tipc "):
1324 if test_support
.verbose
:
1325 print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1328 class TIPCTest (unittest
.TestCase
):
1330 srv
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_RDM
)
1331 cli
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_RDM
)
1333 srv
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1334 srvaddr
= (socket
.TIPC_ADDR_NAMESEQ
, TIPC_STYPE
,
1335 TIPC_LOWER
, TIPC_UPPER
)
1338 sendaddr
= (socket
.TIPC_ADDR_NAME
, TIPC_STYPE
,
1339 TIPC_LOWER
+ (TIPC_UPPER
- TIPC_LOWER
) / 2, 0)
1340 cli
.sendto(MSG
, sendaddr
)
1342 msg
, recvaddr
= srv
.recvfrom(1024)
1344 self
.assertEqual(cli
.getsockname(), recvaddr
)
1345 self
.assertEqual(msg
, MSG
)
1348 class TIPCThreadableTest (unittest
.TestCase
, ThreadableTest
):
def __init__(self, methodName='runTest'):
    """Initialise the TestCase base and the ThreadableTest mixin in turn."""
    # TestCase receives the test method name; the mixin takes no arguments.
    unittest.TestCase.__init__(self, methodName=methodName)
    ThreadableTest.__init__(self)
1354 self
.srv
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_STREAM
)
1355 self
.srv
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1356 srvaddr
= (socket
.TIPC_ADDR_NAMESEQ
, TIPC_STYPE
,
1357 TIPC_LOWER
, TIPC_UPPER
)
1358 self
.srv
.bind(srvaddr
)
1360 self
.serverExplicitReady()
1361 self
.conn
, self
.connaddr
= self
.srv
.accept()
1363 def clientSetUp(self
):
1364 # There is a hittable race between serverExplicitReady() and the
1365 # accept() call; sleep a little while to avoid it, otherwise
1366 # we could get an exception
1368 self
.cli
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_STREAM
)
1369 addr
= (socket
.TIPC_ADDR_NAME
, TIPC_STYPE
,
1370 TIPC_LOWER
+ (TIPC_UPPER
- TIPC_LOWER
) / 2, 0)
1371 self
.cli
.connect(addr
)
1372 self
.cliaddr
= self
.cli
.getsockname()
def testStream(self):
    # Server half: the payload read from the accepted connection must
    # equal MSG, and the peer address returned by accept() must match
    # the socket name the client recorded for itself.
    payload = self.conn.recv(1024)
    self.assertEqual(payload, MSG)
    self.assertEqual(self.cliaddr, self.connaddr)
1379 def _testStream(self
):
1385 tests
= [GeneralModuleTests
, BasicTCPTest
, TCPCloserTest
, TCPTimeoutTest
,
1386 TestExceptions
, BufferIOTest
, BasicTCPTest2
, BasicUDPTest
,
1390 NonBlockingTCPTests
,
1391 FileObjectClassTestCase
,
1392 FileObjectInterruptedTestCase
,
1393 UnbufferedFileObjectClassTestCase
,
1394 LineBufferedFileObjectClassTestCase
,
1395 SmallBufferedFileObjectClassTestCase
,
1396 Urllib2FileobjectTest
,
1397 NetworkConnectionNoServer
,
1398 NetworkConnectionAttributesTest
,
1399 NetworkConnectionBehaviourTest
,
1401 if hasattr(socket
, "socketpair"):
1402 tests
.append(BasicSocketPairTest
)
1403 if sys
.platform
== 'linux2':
1404 tests
.append(TestLinuxAbstractNamespace
)
1405 if isTipcAvailable():
1406 tests
.append(TIPCTest
)
1407 tests
.append(TIPCThreadableTest
)
1409 thread_info
= test_support
.threading_setup()
1410 test_support
.run_unittest(*tests
)
1411 test_support
.threading_cleanup(*thread_info
)
1413 if __name__
== "__main__":