4 from test
import test_support
9 import thread
, threading
16 from weakref
import proxy
19 HOST
= test_support
.HOST
20 MSG
= 'Michael Gilfix was here\n'
22 class SocketTCPTest(unittest
.TestCase
):
25 self
.serv
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
26 self
.port
= test_support
.bind_port(self
.serv
)
33 class SocketUDPTest(unittest
.TestCase
):
36 self
.serv
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
37 self
.port
= test_support
.bind_port(self
.serv
)
44 """Threadable Test class
46 The ThreadableTest class makes it easy to create a threaded
47 client/server pair from an existing unit test. To create a
48 new threaded class from an existing unit test, use multiple
51 class NewClass (OldClass, ThreadableTest):
54 This class defines two new fixture functions with obvious
55 purposes for overriding:
60 Any new test functions within the class must then define
61 tests in pairs, where the test name is preceeded with a
62 '_' to indicate the client portion of the test. Ex:
70 Any exceptions raised by the clients during their tests
71 are caught and transferred to the main thread to alert
72 the testing framework.
74 Note, the server setup function cannot call any blocking
75 functions that rely on the client thread during setup,
76 unless serverExplicitReady() is called just before
77 the blocking call (such as in setting up a client/server
78 connection and performing the accept() in setUp().
82 # Swap the true setup function
83 self
.__setUp
= self
.setUp
84 self
.__tearDown
= self
.tearDown
85 self
.setUp
= self
._setUp
86 self
.tearDown
= self
._tearDown
88 def serverExplicitReady(self
):
89 """This method allows the server to explicitly indicate that
90 it wants the client thread to proceed. This is useful if the
91 server is about to execute a blocking routine that is
92 dependent upon the client thread during its setup routine."""
93 self
.server_ready
.set()
96 self
.server_ready
= threading
.Event()
97 self
.client_ready
= threading
.Event()
98 self
.done
= threading
.Event()
99 self
.queue
= Queue
.Queue(1)
101 # Do some munging to start the client test.
102 methodname
= self
.id()
103 i
= methodname
.rfind('.')
104 methodname
= methodname
[i
+1:]
105 test_method
= getattr(self
, '_' + methodname
)
106 self
.client_thread
= thread
.start_new_thread(
107 self
.clientRun
, (test_method
,))
110 if not self
.server_ready
.is_set():
111 self
.server_ready
.set()
112 self
.client_ready
.wait()
118 if not self
.queue
.empty():
119 msg
= self
.queue
.get()
122 def clientRun(self
, test_func
):
123 self
.server_ready
.wait()
124 self
.client_ready
.set()
126 if not callable(test_func
):
127 raise TypeError, "test_func must be a callable function"
130 except Exception, strerror
:
131 self
.queue
.put(strerror
)
132 self
.clientTearDown()
134 def clientSetUp(self
):
135 raise NotImplementedError, "clientSetUp must be implemented."
137 def clientTearDown(self
):
141 class ThreadedTCPSocketTest(SocketTCPTest
, ThreadableTest
):
143 def __init__(self
, methodName
='runTest'):
144 SocketTCPTest
.__init
__(self
, methodName
=methodName
)
145 ThreadableTest
.__init
__(self
)
147 def clientSetUp(self
):
148 self
.cli
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
150 def clientTearDown(self
):
153 ThreadableTest
.clientTearDown(self
)
155 class ThreadedUDPSocketTest(SocketUDPTest
, ThreadableTest
):
157 def __init__(self
, methodName
='runTest'):
158 SocketUDPTest
.__init
__(self
, methodName
=methodName
)
159 ThreadableTest
.__init
__(self
)
161 def clientSetUp(self
):
162 self
.cli
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
164 class SocketConnectedTest(ThreadedTCPSocketTest
):
166 def __init__(self
, methodName
='runTest'):
167 ThreadedTCPSocketTest
.__init
__(self
, methodName
=methodName
)
170 ThreadedTCPSocketTest
.setUp(self
)
171 # Indicate explicitly we're ready for the client thread to
172 # proceed and then perform the blocking call to accept
173 self
.serverExplicitReady()
174 conn
, addr
= self
.serv
.accept()
178 self
.cli_conn
.close()
180 ThreadedTCPSocketTest
.tearDown(self
)
182 def clientSetUp(self
):
183 ThreadedTCPSocketTest
.clientSetUp(self
)
184 self
.cli
.connect((HOST
, self
.port
))
185 self
.serv_conn
= self
.cli
187 def clientTearDown(self
):
188 self
.serv_conn
.close()
189 self
.serv_conn
= None
190 ThreadedTCPSocketTest
.clientTearDown(self
)
192 class SocketPairTest(unittest
.TestCase
, ThreadableTest
):
194 def __init__(self
, methodName
='runTest'):
195 unittest
.TestCase
.__init
__(self
, methodName
=methodName
)
196 ThreadableTest
.__init
__(self
)
199 self
.serv
, self
.cli
= socket
.socketpair()
205 def clientSetUp(self
):
208 def clientTearDown(self
):
211 ThreadableTest
.clientTearDown(self
)
214 #######################################################################
217 class GeneralModuleTests(unittest
.TestCase
):
219 def test_weakref(self
):
220 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
222 self
.assertEqual(p
.fileno(), s
.fileno())
227 except ReferenceError:
230 self
.fail('Socket proxy still exists')
232 def testSocketError(self
):
233 # Testing socket module exceptions
234 def raise_error(*args
, **kwargs
):
236 def raise_herror(*args
, **kwargs
):
238 def raise_gaierror(*args
, **kwargs
):
239 raise socket
.gaierror
240 self
.assertRaises(socket
.error
, raise_error
,
241 "Error raising socket exception.")
242 self
.assertRaises(socket
.error
, raise_herror
,
243 "Error raising socket exception.")
244 self
.assertRaises(socket
.error
, raise_gaierror
,
245 "Error raising socket exception.")
247 def testCrucialConstants(self
):
248 # Testing for mission critical constants
254 socket
.SOCK_SEQPACKET
258 def testHostnameRes(self
):
259 # Testing hostname resolution mechanisms
260 hostname
= socket
.gethostname()
262 ip
= socket
.gethostbyname(hostname
)
264 # Probably name lookup wasn't set up right; skip this test
266 self
.assertTrue(ip
.find('.') >= 0, "Error resolving host to ip.")
268 hname
, aliases
, ipaddrs
= socket
.gethostbyaddr(ip
)
270 # Probably a similar problem as above; skip this test
272 all_host_names
= [hostname
, hname
] + aliases
273 fqhn
= socket
.getfqdn(ip
)
274 if not fqhn
in all_host_names
:
275 self
.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn
, repr(all_host_names
)))
277 def testRefCountGetNameInfo(self
):
278 # Testing reference count for getnameinfo
279 if hasattr(sys
, "getrefcount"):
281 # On some versions, this loses a reference
282 orig
= sys
.getrefcount(__name__
)
283 socket
.getnameinfo(__name__
,0)
285 if sys
.getrefcount(__name__
) <> orig
:
286 self
.fail("socket.getnameinfo loses a reference")
288 def testInterpreterCrash(self
):
289 # Making sure getnameinfo doesn't crash the interpreter
291 # On some versions, this crashes the interpreter.
292 socket
.getnameinfo(('x', 0, 0, 0), 0)
297 # This just checks that htons etc. are their own inverse,
298 # when looking at the lower 16 or 32 bits.
299 sizes
= {socket
.htonl
: 32, socket
.ntohl
: 32,
300 socket
.htons
: 16, socket
.ntohs
: 16}
301 for func
, size
in sizes
.items():
302 mask
= (1L<<size
) - 1
303 for i
in (0, 1, 0xffff, ~
0xffff, 2, 0x01234567, 0x76543210):
304 self
.assertEqual(i
& mask
, func(func(i
&mask
)) & mask
)
307 self
.assertEqual(swapped
& mask
, mask
)
308 self
.assertRaises(OverflowError, func
, 1L<<34)
310 def testNtoHErrors(self
):
311 good_values
= [ 1, 2, 3, 1L, 2L, 3L ]
312 bad_values
= [ -1, -2, -3, -1L, -2L, -3L ]
313 for k
in good_values
:
319 self
.assertRaises(OverflowError, socket
.ntohl
, k
)
320 self
.assertRaises(OverflowError, socket
.ntohs
, k
)
321 self
.assertRaises(OverflowError, socket
.htonl
, k
)
322 self
.assertRaises(OverflowError, socket
.htons
, k
)
324 def testGetServBy(self
):
325 eq
= self
.assertEqual
326 # Find one service that exists, then check all the related interfaces.
327 # I've ordered this by protocols that have both a tcp and udp
328 # protocol, at least for modern Linuxes.
329 if sys
.platform
in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
330 'freebsd7', 'freebsd8', 'darwin'):
331 # avoid the 'echo' service on this platform, as there is an
332 # assumption breaking non-standard port/protocol entry
333 services
= ('daytime', 'qotd', 'domain')
335 services
= ('echo', 'daytime', 'domain')
336 for service
in services
:
338 port
= socket
.getservbyname(service
, 'tcp')
344 # Try same call with optional protocol omitted
345 port2
= socket
.getservbyname(service
)
347 # Try udp, but don't barf it it doesn't exist
349 udpport
= socket
.getservbyname(service
, 'udp')
354 # Now make sure the lookup by port returns the same service name
355 eq(socket
.getservbyport(port2
), service
)
356 eq(socket
.getservbyport(port
, 'tcp'), service
)
357 if udpport
is not None:
358 eq(socket
.getservbyport(udpport
, 'udp'), service
)
359 # Make sure getservbyport does not accept out of range ports.
360 self
.assertRaises(OverflowError, socket
.getservbyport
, -1)
361 self
.assertRaises(OverflowError, socket
.getservbyport
, 65536)
363 def testDefaultTimeout(self
):
364 # Testing default timeout
365 # The default timeout should initially be None
366 self
.assertEqual(socket
.getdefaulttimeout(), None)
368 self
.assertEqual(s
.gettimeout(), None)
371 # Set the default timeout to 10, and see if it propagates
372 socket
.setdefaulttimeout(10)
373 self
.assertEqual(socket
.getdefaulttimeout(), 10)
375 self
.assertEqual(s
.gettimeout(), 10)
378 # Reset the default timeout to None, and see if it propagates
379 socket
.setdefaulttimeout(None)
380 self
.assertEqual(socket
.getdefaulttimeout(), None)
382 self
.assertEqual(s
.gettimeout(), None)
385 # Check that setting it to an invalid value raises ValueError
386 self
.assertRaises(ValueError, socket
.setdefaulttimeout
, -1)
388 # Check that setting it to an invalid type raises TypeError
389 self
.assertRaises(TypeError, socket
.setdefaulttimeout
, "spam")
391 def testIPv4_inet_aton_fourbytes(self
):
392 if not hasattr(socket
, 'inet_aton'):
393 return # No inet_aton, nothing to check
394 # Test that issue1008086 and issue767150 are fixed.
395 # It must return 4 bytes.
396 self
.assertEquals('\x00'*4, socket
.inet_aton('0.0.0.0'))
397 self
.assertEquals('\xff'*4, socket
.inet_aton('255.255.255.255'))
399 def testIPv4toString(self
):
400 if not hasattr(socket
, 'inet_pton'):
401 return # No inet_pton() on this platform
402 from socket
import inet_aton
as f
, inet_pton
, AF_INET
403 g
= lambda a
: inet_pton(AF_INET
, a
)
405 self
.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
406 self
.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
407 self
.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
408 self
.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
409 self
.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
411 self
.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
412 self
.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
413 self
.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
414 self
.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
416 def testIPv6toString(self
):
417 if not hasattr(socket
, 'inet_pton'):
418 return # No inet_pton() on this platform
420 from socket
import inet_pton
, AF_INET6
, has_ipv6
425 f
= lambda a
: inet_pton(AF_INET6
, a
)
427 self
.assertEquals('\x00' * 16, f('::'))
428 self
.assertEquals('\x00' * 16, f('0::0'))
429 self
.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
431 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
432 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
435 def testStringToIPv4(self
):
436 if not hasattr(socket
, 'inet_ntop'):
437 return # No inet_ntop() on this platform
438 from socket
import inet_ntoa
as f
, inet_ntop
, AF_INET
439 g
= lambda a
: inet_ntop(AF_INET
, a
)
441 self
.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
442 self
.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
443 self
.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
444 self
.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
446 self
.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
447 self
.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
448 self
.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
450 def testStringToIPv6(self
):
451 if not hasattr(socket
, 'inet_ntop'):
452 return # No inet_ntop() on this platform
454 from socket
import inet_ntop
, AF_INET6
, has_ipv6
459 f
= lambda a
: inet_ntop(AF_INET6
, a
)
461 self
.assertEquals('::', f('\x00' * 16))
462 self
.assertEquals('::1', f('\x00' * 15 + '\x01'))
464 'aef:b01:506:1001:ffff:9997:55:170',
465 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
468 # XXX The following don't test module-level functionality...
470 def _get_unused_port(self
, bind_address
='0.0.0.0'):
471 """Use a temporary socket to elicit an unused ephemeral port.
474 bind_address: Hostname or IP address to search for a port on.
476 Returns: A most likely to be unused port.
478 tempsock
= socket
.socket()
479 tempsock
.bind((bind_address
, 0))
480 host
, port
= tempsock
.getsockname()
484 def testSockName(self
):
485 # Testing getsockname()
486 port
= self
._get
_unused
_port
()
487 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
488 sock
.bind(("0.0.0.0", port
))
489 name
= sock
.getsockname()
490 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
491 # it reasonable to get the host's addr in addition to 0.0.0.0.
492 # At least for eCos. This is required for the S/390 to pass.
493 my_ip_addr
= socket
.gethostbyname(socket
.gethostname())
494 self
.assertTrue(name
[0] in ("0.0.0.0", my_ip_addr
), '%s invalid' % name
[0])
495 self
.assertEqual(name
[1], port
)
497 def testGetSockOpt(self
):
498 # Testing getsockopt()
499 # We know a socket should start without reuse==0
500 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
501 reuse
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
)
502 self
.assertFalse(reuse
!= 0, "initial mode is reuse")
504 def testSetSockOpt(self
):
505 # Testing setsockopt()
506 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
507 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
508 reuse
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
)
509 self
.assertFalse(reuse
== 0, "failed to set reuse mode")
511 def testSendAfterClose(self
):
512 # testing send() after close() with timeout
513 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
516 self
.assertRaises(socket
.error
, sock
.send
, "spam")
518 def testNewAttributes(self
):
519 # testing .family, .type and .protocol
520 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
521 self
.assertEqual(sock
.family
, socket
.AF_INET
)
522 self
.assertEqual(sock
.type, socket
.SOCK_STREAM
)
523 self
.assertEqual(sock
.proto
, 0)
526 def test_getsockaddrarg(self
):
528 port
= self
._get
_unused
_port
(bind_address
=host
)
529 big_port
= port
+ 65536
530 neg_port
= port
- 65536
531 sock
= socket
.socket()
533 self
.assertRaises(OverflowError, sock
.bind
, (host
, big_port
))
534 self
.assertRaises(OverflowError, sock
.bind
, (host
, neg_port
))
535 sock
.bind((host
, port
))
539 def test_sock_ioctl(self
):
542 self
.assertTrue(hasattr(socket
.socket
, 'ioctl'))
543 self
.assertTrue(hasattr(socket
, 'SIO_RCVALL'))
544 self
.assertTrue(hasattr(socket
, 'RCVALL_ON'))
545 self
.assertTrue(hasattr(socket
, 'RCVALL_OFF'))
546 self
.assertTrue(hasattr(socket
, 'SIO_KEEPALIVE_VALS'))
548 self
.assertRaises(ValueError, s
.ioctl
, -1, None)
549 s
.ioctl(socket
.SIO_KEEPALIVE_VALS
, (1, 100, 100))
552 class BasicTCPTest(SocketConnectedTest
):
554 def __init__(self
, methodName
='runTest'):
555 SocketConnectedTest
.__init
__(self
, methodName
=methodName
)
558 # Testing large receive over TCP
559 msg
= self
.cli_conn
.recv(1024)
560 self
.assertEqual(msg
, MSG
)
563 self
.serv_conn
.send(MSG
)
565 def testOverFlowRecv(self
):
566 # Testing receive in chunks over TCP
567 seg1
= self
.cli_conn
.recv(len(MSG
) - 3)
568 seg2
= self
.cli_conn
.recv(1024)
570 self
.assertEqual(msg
, MSG
)
572 def _testOverFlowRecv(self
):
573 self
.serv_conn
.send(MSG
)
575 def testRecvFrom(self
):
576 # Testing large recvfrom() over TCP
577 msg
, addr
= self
.cli_conn
.recvfrom(1024)
578 self
.assertEqual(msg
, MSG
)
580 def _testRecvFrom(self
):
581 self
.serv_conn
.send(MSG
)
583 def testOverFlowRecvFrom(self
):
584 # Testing recvfrom() in chunks over TCP
585 seg1
, addr
= self
.cli_conn
.recvfrom(len(MSG
)-3)
586 seg2
, addr
= self
.cli_conn
.recvfrom(1024)
588 self
.assertEqual(msg
, MSG
)
590 def _testOverFlowRecvFrom(self
):
591 self
.serv_conn
.send(MSG
)
593 def testSendAll(self
):
594 # Testing sendall() with a 2048 byte string over TCP
597 read
= self
.cli_conn
.recv(1024)
601 self
.assertEqual(msg
, 'f' * 2048)
603 def _testSendAll(self
):
604 big_chunk
= 'f' * 2048
605 self
.serv_conn
.sendall(big_chunk
)
607 def testFromFd(self
):
609 if not hasattr(socket
, "fromfd"):
610 return # On Windows, this doesn't exist
611 fd
= self
.cli_conn
.fileno()
612 sock
= socket
.fromfd(fd
, socket
.AF_INET
, socket
.SOCK_STREAM
)
613 msg
= sock
.recv(1024)
614 self
.assertEqual(msg
, MSG
)
616 def _testFromFd(self
):
617 self
.serv_conn
.send(MSG
)
619 def testShutdown(self
):
621 msg
= self
.cli_conn
.recv(1024)
622 self
.assertEqual(msg
, MSG
)
623 # wait for _testShutdown to finish: on OS X, when the server
624 # closes the connection the client also becomes disconnected,
625 # and the client's shutdown call will fail. (Issue #4397.)
628 def _testShutdown(self
):
629 self
.serv_conn
.send(MSG
)
630 self
.serv_conn
.shutdown(2)
632 class BasicUDPTest(ThreadedUDPSocketTest
):
634 def __init__(self
, methodName
='runTest'):
635 ThreadedUDPSocketTest
.__init
__(self
, methodName
=methodName
)
637 def testSendtoAndRecv(self
):
638 # Testing sendto() and Recv() over UDP
639 msg
= self
.serv
.recv(len(MSG
))
640 self
.assertEqual(msg
, MSG
)
642 def _testSendtoAndRecv(self
):
643 self
.cli
.sendto(MSG
, 0, (HOST
, self
.port
))
645 def testRecvFrom(self
):
646 # Testing recvfrom() over UDP
647 msg
, addr
= self
.serv
.recvfrom(len(MSG
))
648 self
.assertEqual(msg
, MSG
)
650 def _testRecvFrom(self
):
651 self
.cli
.sendto(MSG
, 0, (HOST
, self
.port
))
653 def testRecvFromNegative(self
):
654 # Negative lengths passed to recvfrom should give ValueError.
655 self
.assertRaises(ValueError, self
.serv
.recvfrom
, -1)
657 def _testRecvFromNegative(self
):
658 self
.cli
.sendto(MSG
, 0, (HOST
, self
.port
))
660 class TCPCloserTest(ThreadedTCPSocketTest
):
663 conn
, addr
= self
.serv
.accept()
667 read
, write
, err
= select
.select([sd
], [], [], 1.0)
668 self
.assertEqual(read
, [sd
])
669 self
.assertEqual(sd
.recv(1), '')
671 def _testClose(self
):
672 self
.cli
.connect((HOST
, self
.port
))
675 class BasicSocketPairTest(SocketPairTest
):
677 def __init__(self
, methodName
='runTest'):
678 SocketPairTest
.__init
__(self
, methodName
=methodName
)
681 msg
= self
.serv
.recv(1024)
682 self
.assertEqual(msg
, MSG
)
691 msg
= self
.cli
.recv(1024)
692 self
.assertEqual(msg
, MSG
)
694 class NonBlockingTCPTests(ThreadedTCPSocketTest
):
696 def __init__(self
, methodName
='runTest'):
697 ThreadedTCPSocketTest
.__init
__(self
, methodName
=methodName
)
699 def testSetBlocking(self
):
700 # Testing whether set blocking works
701 self
.serv
.setblocking(0)
708 self
.assertTrue((end
- start
) < 1.0, "Error setting non-blocking mode.")
710 def _testSetBlocking(self
):
713 def testAccept(self
):
714 # Testing non-blocking accept
715 self
.serv
.setblocking(0)
717 conn
, addr
= self
.serv
.accept()
721 self
.fail("Error trying to do non-blocking accept.")
722 read
, write
, err
= select
.select([self
.serv
], [], [])
723 if self
.serv
in read
:
724 conn
, addr
= self
.serv
.accept()
726 self
.fail("Error trying to do accept after select.")
728 def _testAccept(self
):
730 self
.cli
.connect((HOST
, self
.port
))
732 def testConnect(self
):
733 # Testing non-blocking connect
734 conn
, addr
= self
.serv
.accept()
736 def _testConnect(self
):
737 self
.cli
.settimeout(10)
738 self
.cli
.connect((HOST
, self
.port
))
741 # Testing non-blocking recv
742 conn
, addr
= self
.serv
.accept()
745 msg
= conn
.recv(len(MSG
))
749 self
.fail("Error trying to do non-blocking recv.")
750 read
, write
, err
= select
.select([conn
], [], [])
752 msg
= conn
.recv(len(MSG
))
753 self
.assertEqual(msg
, MSG
)
755 self
.fail("Error during select call to non-blocking socket.")
758 self
.cli
.connect((HOST
, self
.port
))
762 class FileObjectClassTestCase(SocketConnectedTest
):
764 bufsize
= -1 # Use default buffer size
766 def __init__(self
, methodName
='runTest'):
767 SocketConnectedTest
.__init
__(self
, methodName
=methodName
)
770 SocketConnectedTest
.setUp(self
)
771 self
.serv_file
= self
.cli_conn
.makefile('rb', self
.bufsize
)
774 self
.serv_file
.close()
775 self
.assertTrue(self
.serv_file
.closed
)
776 self
.serv_file
= None
777 SocketConnectedTest
.tearDown(self
)
779 def clientSetUp(self
):
780 SocketConnectedTest
.clientSetUp(self
)
781 self
.cli_file
= self
.serv_conn
.makefile('wb')
783 def clientTearDown(self
):
784 self
.cli_file
.close()
785 self
.assertTrue(self
.cli_file
.closed
)
787 SocketConnectedTest
.clientTearDown(self
)
789 def testSmallRead(self
):
790 # Performing small file read test
791 first_seg
= self
.serv_file
.read(len(MSG
)-3)
792 second_seg
= self
.serv_file
.read(3)
793 msg
= first_seg
+ second_seg
794 self
.assertEqual(msg
, MSG
)
796 def _testSmallRead(self
):
797 self
.cli_file
.write(MSG
)
798 self
.cli_file
.flush()
800 def testFullRead(self
):
802 msg
= self
.serv_file
.read()
803 self
.assertEqual(msg
, MSG
)
805 def _testFullRead(self
):
806 self
.cli_file
.write(MSG
)
807 self
.cli_file
.close()
809 def testUnbufferedRead(self
):
810 # Performing unbuffered file read test
813 char
= self
.serv_file
.read(1)
817 self
.assertEqual(buf
, MSG
)
819 def _testUnbufferedRead(self
):
820 self
.cli_file
.write(MSG
)
821 self
.cli_file
.flush()
823 def testReadline(self
):
824 # Performing file readline test
825 line
= self
.serv_file
.readline()
826 self
.assertEqual(line
, MSG
)
828 def _testReadline(self
):
829 self
.cli_file
.write(MSG
)
830 self
.cli_file
.flush()
832 def testReadlineAfterRead(self
):
833 a_baloo_is
= self
.serv_file
.read(len("A baloo is"))
834 self
.assertEqual("A baloo is", a_baloo_is
)
835 _a_bear
= self
.serv_file
.read(len(" a bear"))
836 self
.assertEqual(" a bear", _a_bear
)
837 line
= self
.serv_file
.readline()
838 self
.assertEqual("\n", line
)
839 line
= self
.serv_file
.readline()
840 self
.assertEqual("A BALOO IS A BEAR.\n", line
)
841 line
= self
.serv_file
.readline()
842 self
.assertEqual(MSG
, line
)
844 def _testReadlineAfterRead(self
):
845 self
.cli_file
.write("A baloo is a bear\n")
846 self
.cli_file
.write("A BALOO IS A BEAR.\n")
847 self
.cli_file
.write(MSG
)
848 self
.cli_file
.flush()
850 def testReadlineAfterReadNoNewline(self
):
851 end_of_
= self
.serv_file
.read(len("End Of "))
852 self
.assertEqual("End Of ", end_of_
)
853 line
= self
.serv_file
.readline()
854 self
.assertEqual("Line", line
)
856 def _testReadlineAfterReadNoNewline(self
):
857 self
.cli_file
.write("End Of Line")
859 def testClosedAttr(self
):
860 self
.assertTrue(not self
.serv_file
.closed
)
862 def _testClosedAttr(self
):
863 self
.assertTrue(not self
.cli_file
.closed
)
866 class FileObjectInterruptedTestCase(unittest
.TestCase
):
867 """Test that the file object correctly handles EINTR internally."""
869 class MockSocket(object):
870 def __init__(self
, recv_funcs
=()):
871 # A generator that returns callables that we'll call for each
873 self
._recv
_step
= iter(recv_funcs
)
875 def recv(self
, size
):
876 return self
._recv
_step
.next()()
880 raise socket
.error(errno
.EINTR
)
882 def _test_readline(self
, size
=-1, **kwargs
):
883 mock_sock
= self
.MockSocket(recv_funcs
=[
884 lambda : "This is the first line\nAnd the sec",
886 lambda : "ond line is here\n",
889 fo
= socket
._fileobject
(mock_sock
, **kwargs
)
890 self
.assertEquals(fo
.readline(size
), "This is the first line\n")
891 self
.assertEquals(fo
.readline(size
), "And the second line is here\n")
893 def _test_read(self
, size
=-1, **kwargs
):
894 mock_sock
= self
.MockSocket(recv_funcs
=[
895 lambda : "This is the first line\nAnd the sec",
897 lambda : "ond line is here\n",
900 fo
= socket
._fileobject
(mock_sock
, **kwargs
)
901 self
.assertEquals(fo
.read(size
), "This is the first line\n"
902 "And the second line is here\n")
904 def test_default(self
):
905 self
._test
_readline
()
906 self
._test
_readline
(size
=100)
908 self
._test
_read
(size
=100)
910 def test_with_1k_buffer(self
):
911 self
._test
_readline
(bufsize
=1024)
912 self
._test
_readline
(size
=100, bufsize
=1024)
913 self
._test
_read
(bufsize
=1024)
914 self
._test
_read
(size
=100, bufsize
=1024)
916 def _test_readline_no_buffer(self
, size
=-1):
917 mock_sock
= self
.MockSocket(recv_funcs
=[
925 fo
= socket
._fileobject
(mock_sock
, bufsize
=0)
926 self
.assertEquals(fo
.readline(size
), "aa\n")
927 self
.assertEquals(fo
.readline(size
), "BBbb")
929 def test_no_buffer(self
):
930 self
._test
_readline
_no
_buffer
()
931 self
._test
_readline
_no
_buffer
(size
=4)
932 self
._test
_read
(bufsize
=0)
933 self
._test
_read
(size
=100, bufsize
=0)
936 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase
):
938 """Repeat the tests from FileObjectClassTestCase with bufsize==0.
940 In this case (and in this case only), it should be possible to
941 create a file object, read a line from it, create another file
942 object, read another line from it, without loss of data in the
943 first file object's buffer. Note that httplib relies on this
944 when reading multiple requests from the same socket."""
946 bufsize
= 0 # Use unbuffered mode
948 def testUnbufferedReadline(self
):
949 # Read a line, create a new file object, read another line with it
950 line
= self
.serv_file
.readline() # first line
951 self
.assertEqual(line
, "A. " + MSG
) # first line
952 self
.serv_file
= self
.cli_conn
.makefile('rb', 0)
953 line
= self
.serv_file
.readline() # second line
954 self
.assertEqual(line
, "B. " + MSG
) # second line
956 def _testUnbufferedReadline(self
):
957 self
.cli_file
.write("A. " + MSG
)
958 self
.cli_file
.write("B. " + MSG
)
959 self
.cli_file
.flush()
961 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase
):
963 bufsize
= 1 # Default-buffered for reading; line-buffered for writing
966 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase
):
968 bufsize
= 2 # Exercise the buffering code
971 class NetworkConnectionTest(object):
972 """Prove network connection."""
973 def clientSetUp(self
):
974 # We're inherited below by BasicTCPTest2, which also inherits
975 # BasicTCPTest, which defines self.port referenced below.
976 self
.cli
= socket
.create_connection((HOST
, self
.port
))
977 self
.serv_conn
= self
.cli
979 class BasicTCPTest2(NetworkConnectionTest
, BasicTCPTest
):
980 """Tests that NetworkConnection does not break existing TCP functionality.
983 class NetworkConnectionNoServer(unittest
.TestCase
):
984 def testWithoutServer(self
):
985 port
= test_support
.find_unused_port()
988 lambda: socket
.create_connection((HOST
, port
))
991 class NetworkConnectionAttributesTest(SocketTCPTest
, ThreadableTest
):
993 def __init__(self
, methodName
='runTest'):
994 SocketTCPTest
.__init
__(self
, methodName
=methodName
)
995 ThreadableTest
.__init
__(self
)
997 def clientSetUp(self
):
998 self
.source_port
= test_support
.find_unused_port()
1000 def clientTearDown(self
):
1003 ThreadableTest
.clientTearDown(self
)
1005 def _justAccept(self
):
1006 conn
, addr
= self
.serv
.accept()
1008 testFamily
= _justAccept
1009 def _testFamily(self
):
1010 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=30)
1011 self
.assertEqual(self
.cli
.family
, 2)
1013 testSourceAddress
= _justAccept
1014 def _testSourceAddress(self
):
1015 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=30,
1016 source_address
=('', self
.source_port
))
1017 self
.assertEqual(self
.cli
.getsockname()[1], self
.source_port
)
1018 # The port number being used is sufficient to show that the bind()
1021 testTimeoutDefault
= _justAccept
1022 def _testTimeoutDefault(self
):
1023 # passing no explicit timeout uses socket's global default
1024 self
.assertTrue(socket
.getdefaulttimeout() is None)
1025 socket
.setdefaulttimeout(42)
1027 self
.cli
= socket
.create_connection((HOST
, self
.port
))
1029 socket
.setdefaulttimeout(None)
1030 self
.assertEquals(self
.cli
.gettimeout(), 42)
1032 testTimeoutNone
= _justAccept
1033 def _testTimeoutNone(self
):
1034 # None timeout means the same as sock.settimeout(None)
1035 self
.assertTrue(socket
.getdefaulttimeout() is None)
1036 socket
.setdefaulttimeout(30)
1038 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=None)
1040 socket
.setdefaulttimeout(None)
1041 self
.assertEqual(self
.cli
.gettimeout(), None)
1043 testTimeoutValueNamed
= _justAccept
1044 def _testTimeoutValueNamed(self
):
1045 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=30)
1046 self
.assertEqual(self
.cli
.gettimeout(), 30)
1048 testTimeoutValueNonamed
= _justAccept
1049 def _testTimeoutValueNonamed(self
):
1050 self
.cli
= socket
.create_connection((HOST
, self
.port
), 30)
1051 self
.assertEqual(self
.cli
.gettimeout(), 30)
1053 class NetworkConnectionBehaviourTest(SocketTCPTest
, ThreadableTest
):
1055 def __init__(self
, methodName
='runTest'):
1056 SocketTCPTest
.__init
__(self
, methodName
=methodName
)
1057 ThreadableTest
.__init
__(self
)
1059 def clientSetUp(self
):
1062 def clientTearDown(self
):
1065 ThreadableTest
.clientTearDown(self
)
1067 def testInsideTimeout(self
):
1068 conn
, addr
= self
.serv
.accept()
1071 testOutsideTimeout
= testInsideTimeout
1073 def _testInsideTimeout(self
):
1074 self
.cli
= sock
= socket
.create_connection((HOST
, self
.port
))
1076 self
.assertEqual(data
, "done!")
1078 def _testOutsideTimeout(self
):
1079 self
.cli
= sock
= socket
.create_connection((HOST
, self
.port
), timeout
=1)
1080 self
.assertRaises(socket
.timeout
, lambda: sock
.recv(5))
1083 class Urllib2FileobjectTest(unittest
.TestCase
):
1085 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1086 # it close the socket if the close c'tor argument is true
1088 def testClose(self
):
1091 def flush(self
): pass
1092 def close(self
): self
.closed
= True
1094 # must not close unless we request it: the original use of _fileobject
1095 # by module socket requires that the underlying socket not be closed until
1096 # the _socketobject that created the _fileobject is closed
1098 f
= socket
._fileobject
(s
)
1100 self
.assertTrue(not s
.closed
)
1103 f
= socket
._fileobject
(s
, close
=True)
1105 self
.assertTrue(s
.closed
)
1107 class TCPTimeoutTest(SocketTCPTest
):
1109 def testTCPTimeout(self
):
1110 def raise_timeout(*args
, **kwargs
):
1111 self
.serv
.settimeout(1.0)
1113 self
.assertRaises(socket
.timeout
, raise_timeout
,
1114 "Error generating a timeout exception (TCP)")
1116 def testTimeoutZero(self
):
1119 self
.serv
.settimeout(0.0)
1120 foo
= self
.serv
.accept()
1121 except socket
.timeout
:
1122 self
.fail("caught timeout instead of error (TCP)")
1123 except socket
.error
:
1126 self
.fail("caught unexpected exception (TCP)")
1128 self
.fail("accept() returned success when we did not expect it")
1130 def testInterruptedTimeout(self
):
1131 # XXX I don't know how to do this test on MSWindows or any other
1132 # plaform that doesn't support signal.alarm() or os.kill(), though
1133 # the bug should have existed on all platforms.
1134 if not hasattr(signal
, "alarm"):
1135 return # can only test on *nix
1136 self
.serv
.settimeout(5.0) # must be longer than alarm
1137 class Alarm(Exception):
1139 def alarm_handler(signal
, frame
):
1141 old_alarm
= signal
.signal(signal
.SIGALRM
, alarm_handler
)
1143 signal
.alarm(2) # POSIX allows alarm to be up to 1 second early
1145 foo
= self
.serv
.accept()
1146 except socket
.timeout
:
1147 self
.fail("caught timeout instead of Alarm")
1151 self
.fail("caught other exception instead of Alarm:"
1153 (sys
.exc_info()[:2] + (traceback
.format_exc(),)))
1155 self
.fail("nothing caught")
1157 signal
.alarm(0) # shut off alarm
1159 self
.fail("got Alarm in wrong place")
1161 # no alarm can be pending. Safe to restore old handler.
1162 signal
.signal(signal
.SIGALRM
, old_alarm
)
1164 class UDPTimeoutTest(SocketTCPTest
):
1166 def testUDPTimeout(self
):
1167 def raise_timeout(*args
, **kwargs
):
1168 self
.serv
.settimeout(1.0)
1169 self
.serv
.recv(1024)
1170 self
.assertRaises(socket
.timeout
, raise_timeout
,
1171 "Error generating a timeout exception (UDP)")
1173 def testTimeoutZero(self
):
1176 self
.serv
.settimeout(0.0)
1177 foo
= self
.serv
.recv(1024)
1178 except socket
.timeout
:
1179 self
.fail("caught timeout instead of error (UDP)")
1180 except socket
.error
:
1183 self
.fail("caught unexpected exception (UDP)")
1185 self
.fail("recv() returned success when we did not expect it")
1187 class TestExceptions(unittest
.TestCase
):
1189 def testExceptionTree(self
):
1190 self
.assertTrue(issubclass(socket
.error
, Exception))
1191 self
.assertTrue(issubclass(socket
.herror
, socket
.error
))
1192 self
.assertTrue(issubclass(socket
.gaierror
, socket
.error
))
1193 self
.assertTrue(issubclass(socket
.timeout
, socket
.error
))
1195 class TestLinuxAbstractNamespace(unittest
.TestCase
):
1199 def testLinuxAbstractNamespace(self
):
1200 address
= "\x00python-test-hello\x00\xff"
1201 s1
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1204 s2
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1205 s2
.connect(s1
.getsockname())
1207 self
.assertEqual(s1
.getsockname(), address
)
1208 self
.assertEqual(s2
.getpeername(), address
)
1210 def testMaxName(self
):
1211 address
= "\x00" + "h" * (self
.UNIX_PATH_MAX
- 1)
1212 s
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1214 self
.assertEqual(s
.getsockname(), address
)
1216 def testNameOverflow(self
):
1217 address
= "\x00" + "h" * self
.UNIX_PATH_MAX
1218 s
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1219 self
.assertRaises(socket
.error
, s
.bind
, address
)
1222 class BufferIOTest(SocketConnectedTest
):
1224 Test the buffer versions of socket.recv() and socket.send().
1226 def __init__(self
, methodName
='runTest'):
1227 SocketConnectedTest
.__init
__(self
, methodName
=methodName
)
1229 def testRecvInto(self
):
1230 buf
= array
.array('c', ' '*1024)
1231 nbytes
= self
.cli_conn
.recv_into(buf
)
1232 self
.assertEqual(nbytes
, len(MSG
))
1233 msg
= buf
.tostring()[:len(MSG
)]
1234 self
.assertEqual(msg
, MSG
)
1236 def _testRecvInto(self
):
1238 self
.serv_conn
.send(buf
)
1240 def testRecvFromInto(self
):
1241 buf
= array
.array('c', ' '*1024)
1242 nbytes
, addr
= self
.cli_conn
.recvfrom_into(buf
)
1243 self
.assertEqual(nbytes
, len(MSG
))
1244 msg
= buf
.tostring()[:len(MSG
)]
1245 self
.assertEqual(msg
, MSG
)
1247 def _testRecvFromInto(self
):
1249 self
.serv_conn
.send(buf
)
1256 def isTipcAvailable():
1257 """Check if the TIPC module is loaded
1259 The TIPC module is not loaded automatically on Ubuntu and probably
1260 other Linux distros.
1262 if not hasattr(socket
, "AF_TIPC"):
1264 if not os
.path
.isfile("/proc/modules"):
1266 with
open("/proc/modules") as f
:
1268 if line
.startswith("tipc "):
1270 if test_support
.verbose
:
1271 print "TIPC module is not loaded, please 'sudo modprobe tipc'"
1274 class TIPCTest (unittest
.TestCase
):
1276 srv
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_RDM
)
1277 cli
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_RDM
)
1279 srv
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1280 srvaddr
= (socket
.TIPC_ADDR_NAMESEQ
, TIPC_STYPE
,
1281 TIPC_LOWER
, TIPC_UPPER
)
1284 sendaddr
= (socket
.TIPC_ADDR_NAME
, TIPC_STYPE
,
1285 TIPC_LOWER
+ (TIPC_UPPER
- TIPC_LOWER
) / 2, 0)
1286 cli
.sendto(MSG
, sendaddr
)
1288 msg
, recvaddr
= srv
.recvfrom(1024)
1290 self
.assertEqual(cli
.getsockname(), recvaddr
)
1291 self
.assertEqual(msg
, MSG
)
1294 class TIPCThreadableTest (unittest
.TestCase
, ThreadableTest
):
1295 def __init__(self
, methodName
= 'runTest'):
1296 unittest
.TestCase
.__init
__(self
, methodName
= methodName
)
1297 ThreadableTest
.__init
__(self
)
1300 self
.srv
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_STREAM
)
1301 self
.srv
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
1302 srvaddr
= (socket
.TIPC_ADDR_NAMESEQ
, TIPC_STYPE
,
1303 TIPC_LOWER
, TIPC_UPPER
)
1304 self
.srv
.bind(srvaddr
)
1306 self
.serverExplicitReady()
1307 self
.conn
, self
.connaddr
= self
.srv
.accept()
1309 def clientSetUp(self
):
1310 # The is a hittable race between serverExplicitReady() and the
1311 # accept() call; sleep a little while to avoid it, otherwise
1312 # we could get an exception
1314 self
.cli
= socket
.socket(socket
.AF_TIPC
, socket
.SOCK_STREAM
)
1315 addr
= (socket
.TIPC_ADDR_NAME
, TIPC_STYPE
,
1316 TIPC_LOWER
+ (TIPC_UPPER
- TIPC_LOWER
) / 2, 0)
1317 self
.cli
.connect(addr
)
1318 self
.cliaddr
= self
.cli
.getsockname()
1320 def testStream(self
):
1321 msg
= self
.conn
.recv(1024)
1322 self
.assertEqual(msg
, MSG
)
1323 self
.assertEqual(self
.cliaddr
, self
.connaddr
)
1325 def _testStream(self
):
1331 tests
= [GeneralModuleTests
, BasicTCPTest
, TCPCloserTest
, TCPTimeoutTest
,
1332 TestExceptions
, BufferIOTest
, BasicTCPTest2
]
1333 if sys
.platform
!= 'mac':
1334 tests
.extend([ BasicUDPTest
, UDPTimeoutTest
])
1337 NonBlockingTCPTests
,
1338 FileObjectClassTestCase
,
1339 FileObjectInterruptedTestCase
,
1340 UnbufferedFileObjectClassTestCase
,
1341 LineBufferedFileObjectClassTestCase
,
1342 SmallBufferedFileObjectClassTestCase
,
1343 Urllib2FileobjectTest
,
1344 NetworkConnectionNoServer
,
1345 NetworkConnectionAttributesTest
,
1346 NetworkConnectionBehaviourTest
,
1348 if hasattr(socket
, "socketpair"):
1349 tests
.append(BasicSocketPairTest
)
1350 if sys
.platform
== 'linux2':
1351 tests
.append(TestLinuxAbstractNamespace
)
1352 if isTipcAvailable():
1353 tests
.append(TIPCTest
)
1354 tests
.append(TIPCThreadableTest
)
1356 thread_info
= test_support
.threading_setup()
1357 test_support
.run_unittest(*tests
)
1358 test_support
.threading_cleanup(*thread_info
)
1360 if __name__
== "__main__":