4 from test
import test_support
9 import thread
, threading
16 from weakref
import proxy
19 HOST
= test_support
.HOST
20 MSG
= 'Michael Gilfix was here\n'
22 class SocketTCPTest(unittest
.TestCase
):
25 self
.serv
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
26 self
.port
= test_support
.bind_port(self
.serv
)
33 class SocketUDPTest(unittest
.TestCase
):
36 self
.serv
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
37 self
.port
= test_support
.bind_port(self
.serv
)
44 """Threadable Test class
46 The ThreadableTest class makes it easy to create a threaded
47 client/server pair from an existing unit test. To create a
48 new threaded class from an existing unit test, use multiple
51 class NewClass (OldClass, ThreadableTest):
54 This class defines two new fixture functions with obvious
55 purposes for overriding:
60 Any new test functions within the class must then define
61 tests in pairs, where the test name is preceeded with a
62 '_' to indicate the client portion of the test. Ex:
70 Any exceptions raised by the clients during their tests
71 are caught and transferred to the main thread to alert
72 the testing framework.
74 Note, the server setup function cannot call any blocking
75 functions that rely on the client thread during setup,
76 unless serverExplicitReady() is called just before
77 the blocking call (such as in setting up a client/server
78 connection and performing the accept() in setUp().
82 # Swap the true setup function
83 self
.__setUp
= self
.setUp
84 self
.__tearDown
= self
.tearDown
85 self
.setUp
= self
._setUp
86 self
.tearDown
= self
._tearDown
88 def serverExplicitReady(self
):
89 """This method allows the server to explicitly indicate that
90 it wants the client thread to proceed. This is useful if the
91 server is about to execute a blocking routine that is
92 dependent upon the client thread during its setup routine."""
93 self
.server_ready
.set()
96 self
.server_ready
= threading
.Event()
97 self
.client_ready
= threading
.Event()
98 self
.done
= threading
.Event()
99 self
.queue
= Queue
.Queue(1)
101 # Do some munging to start the client test.
102 methodname
= self
.id()
103 i
= methodname
.rfind('.')
104 methodname
= methodname
[i
+1:]
105 test_method
= getattr(self
, '_' + methodname
)
106 self
.client_thread
= thread
.start_new_thread(
107 self
.clientRun
, (test_method
,))
110 if not self
.server_ready
.is_set():
111 self
.server_ready
.set()
112 self
.client_ready
.wait()
118 if not self
.queue
.empty():
119 msg
= self
.queue
.get()
122 def clientRun(self
, test_func
):
123 self
.server_ready
.wait()
124 self
.client_ready
.set()
126 if not callable(test_func
):
127 raise TypeError, "test_func must be a callable function"
130 except Exception, strerror
:
131 self
.queue
.put(strerror
)
132 self
.clientTearDown()
134 def clientSetUp(self
):
135 raise NotImplementedError, "clientSetUp must be implemented."
137 def clientTearDown(self
):
141 class ThreadedTCPSocketTest(SocketTCPTest
, ThreadableTest
):
143 def __init__(self
, methodName
='runTest'):
144 SocketTCPTest
.__init
__(self
, methodName
=methodName
)
145 ThreadableTest
.__init
__(self
)
147 def clientSetUp(self
):
148 self
.cli
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
150 def clientTearDown(self
):
153 ThreadableTest
.clientTearDown(self
)
155 class ThreadedUDPSocketTest(SocketUDPTest
, ThreadableTest
):
157 def __init__(self
, methodName
='runTest'):
158 SocketUDPTest
.__init
__(self
, methodName
=methodName
)
159 ThreadableTest
.__init
__(self
)
161 def clientSetUp(self
):
162 self
.cli
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
164 class SocketConnectedTest(ThreadedTCPSocketTest
):
166 def __init__(self
, methodName
='runTest'):
167 ThreadedTCPSocketTest
.__init
__(self
, methodName
=methodName
)
170 ThreadedTCPSocketTest
.setUp(self
)
171 # Indicate explicitly we're ready for the client thread to
172 # proceed and then perform the blocking call to accept
173 self
.serverExplicitReady()
174 conn
, addr
= self
.serv
.accept()
178 self
.cli_conn
.close()
180 ThreadedTCPSocketTest
.tearDown(self
)
182 def clientSetUp(self
):
183 ThreadedTCPSocketTest
.clientSetUp(self
)
184 self
.cli
.connect((HOST
, self
.port
))
185 self
.serv_conn
= self
.cli
187 def clientTearDown(self
):
188 self
.serv_conn
.close()
189 self
.serv_conn
= None
190 ThreadedTCPSocketTest
.clientTearDown(self
)
192 class SocketPairTest(unittest
.TestCase
, ThreadableTest
):
194 def __init__(self
, methodName
='runTest'):
195 unittest
.TestCase
.__init
__(self
, methodName
=methodName
)
196 ThreadableTest
.__init
__(self
)
199 self
.serv
, self
.cli
= socket
.socketpair()
205 def clientSetUp(self
):
208 def clientTearDown(self
):
211 ThreadableTest
.clientTearDown(self
)
214 #######################################################################
217 class GeneralModuleTests(unittest
.TestCase
):
219 def test_weakref(self
):
220 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
222 self
.assertEqual(p
.fileno(), s
.fileno())
227 except ReferenceError:
230 self
.fail('Socket proxy still exists')
232 def testSocketError(self
):
233 # Testing socket module exceptions
234 def raise_error(*args
, **kwargs
):
236 def raise_herror(*args
, **kwargs
):
238 def raise_gaierror(*args
, **kwargs
):
239 raise socket
.gaierror
240 self
.assertRaises(socket
.error
, raise_error
,
241 "Error raising socket exception.")
242 self
.assertRaises(socket
.error
, raise_herror
,
243 "Error raising socket exception.")
244 self
.assertRaises(socket
.error
, raise_gaierror
,
245 "Error raising socket exception.")
247 def testCrucialConstants(self
):
248 # Testing for mission critical constants
254 socket
.SOCK_SEQPACKET
258 def testHostnameRes(self
):
259 # Testing hostname resolution mechanisms
260 hostname
= socket
.gethostname()
262 ip
= socket
.gethostbyname(hostname
)
264 # Probably name lookup wasn't set up right; skip this test
266 self
.assertTrue(ip
.find('.') >= 0, "Error resolving host to ip.")
268 hname
, aliases
, ipaddrs
= socket
.gethostbyaddr(ip
)
270 # Probably a similar problem as above; skip this test
272 all_host_names
= [hostname
, hname
] + aliases
273 fqhn
= socket
.getfqdn(ip
)
274 if not fqhn
in all_host_names
:
275 self
.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn
, repr(all_host_names
)))
277 def testRefCountGetNameInfo(self
):
278 # Testing reference count for getnameinfo
279 if hasattr(sys
, "getrefcount"):
281 # On some versions, this loses a reference
282 orig
= sys
.getrefcount(__name__
)
283 socket
.getnameinfo(__name__
,0)
285 if sys
.getrefcount(__name__
) <> orig
:
286 self
.fail("socket.getnameinfo loses a reference")
288 def testInterpreterCrash(self
):
289 # Making sure getnameinfo doesn't crash the interpreter
291 # On some versions, this crashes the interpreter.
292 socket
.getnameinfo(('x', 0, 0, 0), 0)
297 # This just checks that htons etc. are their own inverse,
298 # when looking at the lower 16 or 32 bits.
299 sizes
= {socket
.htonl
: 32, socket
.ntohl
: 32,
300 socket
.htons
: 16, socket
.ntohs
: 16}
301 for func
, size
in sizes
.items():
302 mask
= (1L<<size
) - 1
303 for i
in (0, 1, 0xffff, ~
0xffff, 2, 0x01234567, 0x76543210):
304 self
.assertEqual(i
& mask
, func(func(i
&mask
)) & mask
)
307 self
.assertEqual(swapped
& mask
, mask
)
308 self
.assertRaises(OverflowError, func
, 1L<<34)
310 def testNtoHErrors(self
):
311 good_values
= [ 1, 2, 3, 1L, 2L, 3L ]
312 bad_values
= [ -1, -2, -3, -1L, -2L, -3L ]
313 for k
in good_values
:
319 self
.assertRaises(OverflowError, socket
.ntohl
, k
)
320 self
.assertRaises(OverflowError, socket
.ntohs
, k
)
321 self
.assertRaises(OverflowError, socket
.htonl
, k
)
322 self
.assertRaises(OverflowError, socket
.htons
, k
)
324 def testGetServBy(self
):
325 eq
= self
.assertEqual
326 # Find one service that exists, then check all the related interfaces.
327 # I've ordered this by protocols that have both a tcp and udp
328 # protocol, at least for modern Linuxes.
329 if sys
.platform
in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
330 'freebsd7', 'freebsd8', 'darwin'):
331 # avoid the 'echo' service on this platform, as there is an
332 # assumption breaking non-standard port/protocol entry
333 services
= ('daytime', 'qotd', 'domain')
335 services
= ('echo', 'daytime', 'domain')
336 for service
in services
:
338 port
= socket
.getservbyname(service
, 'tcp')
344 # Try same call with optional protocol omitted
345 port2
= socket
.getservbyname(service
)
347 # Try udp, but don't barf it it doesn't exist
349 udpport
= socket
.getservbyname(service
, 'udp')
354 # Now make sure the lookup by port returns the same service name
355 eq(socket
.getservbyport(port2
), service
)
356 eq(socket
.getservbyport(port
, 'tcp'), service
)
357 if udpport
is not None:
358 eq(socket
.getservbyport(udpport
, 'udp'), service
)
359 # Make sure getservbyport does not accept out of range ports.
360 self
.assertRaises(OverflowError, socket
.getservbyport
, -1)
361 self
.assertRaises(OverflowError, socket
.getservbyport
, 65536)
363 def testDefaultTimeout(self
):
364 # Testing default timeout
365 # The default timeout should initially be None
366 self
.assertEqual(socket
.getdefaulttimeout(), None)
368 self
.assertEqual(s
.gettimeout(), None)
371 # Set the default timeout to 10, and see if it propagates
372 socket
.setdefaulttimeout(10)
373 self
.assertEqual(socket
.getdefaulttimeout(), 10)
375 self
.assertEqual(s
.gettimeout(), 10)
378 # Reset the default timeout to None, and see if it propagates
379 socket
.setdefaulttimeout(None)
380 self
.assertEqual(socket
.getdefaulttimeout(), None)
382 self
.assertEqual(s
.gettimeout(), None)
385 # Check that setting it to an invalid value raises ValueError
386 self
.assertRaises(ValueError, socket
.setdefaulttimeout
, -1)
388 # Check that setting it to an invalid type raises TypeError
389 self
.assertRaises(TypeError, socket
.setdefaulttimeout
, "spam")
391 def testIPv4_inet_aton_fourbytes(self
):
392 if not hasattr(socket
, 'inet_aton'):
393 return # No inet_aton, nothing to check
394 # Test that issue1008086 and issue767150 are fixed.
395 # It must return 4 bytes.
396 self
.assertEquals('\x00'*4, socket
.inet_aton('0.0.0.0'))
397 self
.assertEquals('\xff'*4, socket
.inet_aton('255.255.255.255'))
399 def testIPv4toString(self
):
400 if not hasattr(socket
, 'inet_pton'):
401 return # No inet_pton() on this platform
402 from socket
import inet_aton
as f
, inet_pton
, AF_INET
403 g
= lambda a
: inet_pton(AF_INET
, a
)
405 self
.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
406 self
.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
407 self
.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
408 self
.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
409 self
.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
411 self
.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
412 self
.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
413 self
.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
414 self
.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
416 def testIPv6toString(self
):
417 if not hasattr(socket
, 'inet_pton'):
418 return # No inet_pton() on this platform
420 from socket
import inet_pton
, AF_INET6
, has_ipv6
425 f
= lambda a
: inet_pton(AF_INET6
, a
)
427 self
.assertEquals('\x00' * 16, f('::'))
428 self
.assertEquals('\x00' * 16, f('0::0'))
429 self
.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
431 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
432 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
435 def testStringToIPv4(self
):
436 if not hasattr(socket
, 'inet_ntop'):
437 return # No inet_ntop() on this platform
438 from socket
import inet_ntoa
as f
, inet_ntop
, AF_INET
439 g
= lambda a
: inet_ntop(AF_INET
, a
)
441 self
.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
442 self
.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
443 self
.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
444 self
.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
446 self
.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
447 self
.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
448 self
.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
450 def testStringToIPv6(self
):
451 if not hasattr(socket
, 'inet_ntop'):
452 return # No inet_ntop() on this platform
454 from socket
import inet_ntop
, AF_INET6
, has_ipv6
459 f
= lambda a
: inet_ntop(AF_INET6
, a
)
461 self
.assertEquals('::', f('\x00' * 16))
462 self
.assertEquals('::1', f('\x00' * 15 + '\x01'))
464 'aef:b01:506:1001:ffff:9997:55:170',
465 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
468 # XXX The following don't test module-level functionality...
470 def _get_unused_port(self
, bind_address
='0.0.0.0'):
471 """Use a temporary socket to elicit an unused ephemeral port.
474 bind_address: Hostname or IP address to search for a port on.
476 Returns: A most likely to be unused port.
478 tempsock
= socket
.socket()
479 tempsock
.bind((bind_address
, 0))
480 host
, port
= tempsock
.getsockname()
484 def testSockName(self
):
485 # Testing getsockname()
486 port
= self
._get
_unused
_port
()
487 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
488 sock
.bind(("0.0.0.0", port
))
489 name
= sock
.getsockname()
490 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
491 # it reasonable to get the host's addr in addition to 0.0.0.0.
492 # At least for eCos. This is required for the S/390 to pass.
493 my_ip_addr
= socket
.gethostbyname(socket
.gethostname())
494 self
.assertTrue(name
[0] in ("0.0.0.0", my_ip_addr
), '%s invalid' % name
[0])
495 self
.assertEqual(name
[1], port
)
497 def testGetSockOpt(self
):
498 # Testing getsockopt()
499 # We know a socket should start without reuse==0
500 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
501 reuse
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
)
502 self
.assertFalse(reuse
!= 0, "initial mode is reuse")
504 def testSetSockOpt(self
):
505 # Testing setsockopt()
506 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
507 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
508 reuse
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
)
509 self
.assertFalse(reuse
== 0, "failed to set reuse mode")
511 def testSendAfterClose(self
):
512 # testing send() after close() with timeout
513 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
516 self
.assertRaises(socket
.error
, sock
.send
, "spam")
518 def testNewAttributes(self
):
519 # testing .family, .type and .protocol
520 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
521 self
.assertEqual(sock
.family
, socket
.AF_INET
)
522 self
.assertEqual(sock
.type, socket
.SOCK_STREAM
)
523 self
.assertEqual(sock
.proto
, 0)
526 def test_getsockaddrarg(self
):
528 port
= self
._get
_unused
_port
(bind_address
=host
)
529 big_port
= port
+ 65536
530 neg_port
= port
- 65536
531 sock
= socket
.socket()
533 self
.assertRaises(OverflowError, sock
.bind
, (host
, big_port
))
534 self
.assertRaises(OverflowError, sock
.bind
, (host
, neg_port
))
535 sock
.bind((host
, port
))
539 def test_sock_ioctl(self
):
542 self
.assertTrue(hasattr(socket
.socket
, 'ioctl'))
543 self
.assertTrue(hasattr(socket
, 'SIO_RCVALL'))
544 self
.assertTrue(hasattr(socket
, 'RCVALL_ON'))
545 self
.assertTrue(hasattr(socket
, 'RCVALL_OFF'))
546 self
.assertTrue(hasattr(socket
, 'SIO_KEEPALIVE_VALS'))
548 self
.assertRaises(ValueError, s
.ioctl
, -1, None)
549 s
.ioctl(socket
.SIO_KEEPALIVE_VALS
, (1, 100, 100))
552 class BasicTCPTest(SocketConnectedTest
):
554 def __init__(self
, methodName
='runTest'):
555 SocketConnectedTest
.__init
__(self
, methodName
=methodName
)
558 # Testing large receive over TCP
559 msg
= self
.cli_conn
.recv(1024)
560 self
.assertEqual(msg
, MSG
)
563 self
.serv_conn
.send(MSG
)
565 def testOverFlowRecv(self
):
566 # Testing receive in chunks over TCP
567 seg1
= self
.cli_conn
.recv(len(MSG
) - 3)
568 seg2
= self
.cli_conn
.recv(1024)
570 self
.assertEqual(msg
, MSG
)
572 def _testOverFlowRecv(self
):
573 self
.serv_conn
.send(MSG
)
575 def testRecvFrom(self
):
576 # Testing large recvfrom() over TCP
577 msg
, addr
= self
.cli_conn
.recvfrom(1024)
578 self
.assertEqual(msg
, MSG
)
580 def _testRecvFrom(self
):
581 self
.serv_conn
.send(MSG
)
583 def testOverFlowRecvFrom(self
):
584 # Testing recvfrom() in chunks over TCP
585 seg1
, addr
= self
.cli_conn
.recvfrom(len(MSG
)-3)
586 seg2
, addr
= self
.cli_conn
.recvfrom(1024)
588 self
.assertEqual(msg
, MSG
)
590 def _testOverFlowRecvFrom(self
):
591 self
.serv_conn
.send(MSG
)
593 def testSendAll(self
):
594 # Testing sendall() with a 2048 byte string over TCP
597 read
= self
.cli_conn
.recv(1024)
601 self
.assertEqual(msg
, 'f' * 2048)
603 def _testSendAll(self
):
604 big_chunk
= 'f' * 2048
605 self
.serv_conn
.sendall(big_chunk
)
607 def testFromFd(self
):
609 if not hasattr(socket
, "fromfd"):
610 return # On Windows, this doesn't exist
611 fd
= self
.cli_conn
.fileno()
612 sock
= socket
.fromfd(fd
, socket
.AF_INET
, socket
.SOCK_STREAM
)
613 msg
= sock
.recv(1024)
614 self
.assertEqual(msg
, MSG
)
616 def _testFromFd(self
):
617 self
.serv_conn
.send(MSG
)
619 def testShutdown(self
):
621 msg
= self
.cli_conn
.recv(1024)
622 self
.assertEqual(msg
, MSG
)
623 # wait for _testShutdown to finish: on OS X, when the server
624 # closes the connection the client also becomes disconnected,
625 # and the client's shutdown call will fail. (Issue #4397.)
628 def _testShutdown(self
):
629 self
.serv_conn
.send(MSG
)
630 self
.serv_conn
.shutdown(2)
632 class BasicUDPTest(ThreadedUDPSocketTest
):
634 def __init__(self
, methodName
='runTest'):
635 ThreadedUDPSocketTest
.__init
__(self
, methodName
=methodName
)
637 def testSendtoAndRecv(self
):
638 # Testing sendto() and Recv() over UDP
639 msg
= self
.serv
.recv(len(MSG
))
640 self
.assertEqual(msg
, MSG
)
642 def _testSendtoAndRecv(self
):
643 self
.cli
.sendto(MSG
, 0, (HOST
, self
.port
))
645 def testRecvFrom(self
):
646 # Testing recvfrom() over UDP
647 msg
, addr
= self
.serv
.recvfrom(len(MSG
))
648 self
.assertEqual(msg
, MSG
)
650 def _testRecvFrom(self
):
651 self
.cli
.sendto(MSG
, 0, (HOST
, self
.port
))
653 def testRecvFromNegative(self
):
654 # Negative lengths passed to recvfrom should give ValueError.
655 self
.assertRaises(ValueError, self
.serv
.recvfrom
, -1)
657 def _testRecvFromNegative(self
):
658 self
.cli
.sendto(MSG
, 0, (HOST
, self
.port
))
660 class TCPCloserTest(ThreadedTCPSocketTest
):
663 conn
, addr
= self
.serv
.accept()
667 read
, write
, err
= select
.select([sd
], [], [], 1.0)
668 self
.assertEqual(read
, [sd
])
669 self
.assertEqual(sd
.recv(1), '')
671 def _testClose(self
):
672 self
.cli
.connect((HOST
, self
.port
))
675 class BasicSocketPairTest(SocketPairTest
):
677 def __init__(self
, methodName
='runTest'):
678 SocketPairTest
.__init
__(self
, methodName
=methodName
)
681 msg
= self
.serv
.recv(1024)
682 self
.assertEqual(msg
, MSG
)
691 msg
= self
.cli
.recv(1024)
692 self
.assertEqual(msg
, MSG
)
694 class NonBlockingTCPTests(ThreadedTCPSocketTest
):
696 def __init__(self
, methodName
='runTest'):
697 ThreadedTCPSocketTest
.__init
__(self
, methodName
=methodName
)
699 def testSetBlocking(self
):
700 # Testing whether set blocking works
701 self
.serv
.setblocking(0)
708 self
.assertTrue((end
- start
) < 1.0, "Error setting non-blocking mode.")
710 def _testSetBlocking(self
):
713 def testAccept(self
):
714 # Testing non-blocking accept
715 self
.serv
.setblocking(0)
717 conn
, addr
= self
.serv
.accept()
721 self
.fail("Error trying to do non-blocking accept.")
722 read
, write
, err
= select
.select([self
.serv
], [], [])
723 if self
.serv
in read
:
724 conn
, addr
= self
.serv
.accept()
726 self
.fail("Error trying to do accept after select.")
728 def _testAccept(self
):
730 self
.cli
.connect((HOST
, self
.port
))
732 def testConnect(self
):
733 # Testing non-blocking connect
734 conn
, addr
= self
.serv
.accept()
736 def _testConnect(self
):
737 self
.cli
.settimeout(10)
738 self
.cli
.connect((HOST
, self
.port
))
741 # Testing non-blocking recv
742 conn
, addr
= self
.serv
.accept()
745 msg
= conn
.recv(len(MSG
))
749 self
.fail("Error trying to do non-blocking recv.")
750 read
, write
, err
= select
.select([conn
], [], [])
752 msg
= conn
.recv(len(MSG
))
753 self
.assertEqual(msg
, MSG
)
755 self
.fail("Error during select call to non-blocking socket.")
758 self
.cli
.connect((HOST
, self
.port
))
762 class FileObjectClassTestCase(SocketConnectedTest
):
764 bufsize
= -1 # Use default buffer size
766 def __init__(self
, methodName
='runTest'):
767 SocketConnectedTest
.__init
__(self
, methodName
=methodName
)
770 SocketConnectedTest
.setUp(self
)
771 self
.serv_file
= self
.cli_conn
.makefile('rb', self
.bufsize
)
774 self
.serv_file
.close()
775 self
.assertTrue(self
.serv_file
.closed
)
776 self
.serv_file
= None
777 SocketConnectedTest
.tearDown(self
)
779 def clientSetUp(self
):
780 SocketConnectedTest
.clientSetUp(self
)
781 self
.cli_file
= self
.serv_conn
.makefile('wb')
783 def clientTearDown(self
):
784 self
.cli_file
.close()
785 self
.assertTrue(self
.cli_file
.closed
)
787 SocketConnectedTest
.clientTearDown(self
)
789 def testSmallRead(self
):
790 # Performing small file read test
791 first_seg
= self
.serv_file
.read(len(MSG
)-3)
792 second_seg
= self
.serv_file
.read(3)
793 msg
= first_seg
+ second_seg
794 self
.assertEqual(msg
, MSG
)
796 def _testSmallRead(self
):
797 self
.cli_file
.write(MSG
)
798 self
.cli_file
.flush()
800 def testFullRead(self
):
802 msg
= self
.serv_file
.read()
803 self
.assertEqual(msg
, MSG
)
805 def _testFullRead(self
):
806 self
.cli_file
.write(MSG
)
807 self
.cli_file
.close()
809 def testUnbufferedRead(self
):
810 # Performing unbuffered file read test
813 char
= self
.serv_file
.read(1)
817 self
.assertEqual(buf
, MSG
)
819 def _testUnbufferedRead(self
):
820 self
.cli_file
.write(MSG
)
821 self
.cli_file
.flush()
823 def testReadline(self
):
824 # Performing file readline test
825 line
= self
.serv_file
.readline()
826 self
.assertEqual(line
, MSG
)
828 def _testReadline(self
):
829 self
.cli_file
.write(MSG
)
830 self
.cli_file
.flush()
832 def testReadlineAfterRead(self
):
833 a_baloo_is
= self
.serv_file
.read(len("A baloo is"))
834 self
.assertEqual("A baloo is", a_baloo_is
)
835 _a_bear
= self
.serv_file
.read(len(" a bear"))
836 self
.assertEqual(" a bear", _a_bear
)
837 line
= self
.serv_file
.readline()
838 self
.assertEqual("\n", line
)
839 line
= self
.serv_file
.readline()
840 self
.assertEqual("A BALOO IS A BEAR.\n", line
)
841 line
= self
.serv_file
.readline()
842 self
.assertEqual(MSG
, line
)
844 def _testReadlineAfterRead(self
):
845 self
.cli_file
.write("A baloo is a bear\n")
846 self
.cli_file
.write("A BALOO IS A BEAR.\n")
847 self
.cli_file
.write(MSG
)
848 self
.cli_file
.flush()
850 def testReadlineAfterReadNoNewline(self
):
851 end_of_
= self
.serv_file
.read(len("End Of "))
852 self
.assertEqual("End Of ", end_of_
)
853 line
= self
.serv_file
.readline()
854 self
.assertEqual("Line", line
)
856 def _testReadlineAfterReadNoNewline(self
):
857 self
.cli_file
.write("End Of Line")
859 def testClosedAttr(self
):
860 self
.assertTrue(not self
.serv_file
.closed
)
862 def _testClosedAttr(self
):
863 self
.assertTrue(not self
.cli_file
.closed
)
866 class FileObjectInterruptedTestCase(unittest
.TestCase
):
867 """Test that the file object correctly handles EINTR internally."""
869 class MockSocket(object):
870 def __init__(self
, recv_funcs
=()):
871 # A generator that returns callables that we'll call for each
873 self
._recv
_step
= iter(recv_funcs
)
875 def recv(self
, size
):
876 return self
._recv
_step
.next()()
880 raise socket
.error(errno
.EINTR
)
882 def _test_readline(self
, size
=-1, **kwargs
):
883 mock_sock
= self
.MockSocket(recv_funcs
=[
884 lambda : "This is the first line\nAnd the sec",
886 lambda : "ond line is here\n",
889 fo
= socket
._fileobject
(mock_sock
, **kwargs
)
890 self
.assertEquals(fo
.readline(size
), "This is the first line\n")
891 self
.assertEquals(fo
.readline(size
), "And the second line is here\n")
893 def _test_read(self
, size
=-1, **kwargs
):
894 mock_sock
= self
.MockSocket(recv_funcs
=[
895 lambda : "This is the first line\nAnd the sec",
897 lambda : "ond line is here\n",
900 fo
= socket
._fileobject
(mock_sock
, **kwargs
)
901 self
.assertEquals(fo
.read(size
), "This is the first line\n"
902 "And the second line is here\n")
904 def test_default(self
):
905 self
._test
_readline
()
906 self
._test
_readline
(size
=100)
908 self
._test
_read
(size
=100)
910 def test_with_1k_buffer(self
):
911 self
._test
_readline
(bufsize
=1024)
912 self
._test
_readline
(size
=100, bufsize
=1024)
913 self
._test
_read
(bufsize
=1024)
914 self
._test
_read
(size
=100, bufsize
=1024)
916 def _test_readline_no_buffer(self
, size
=-1):
917 mock_sock
= self
.MockSocket(recv_funcs
=[
925 fo
= socket
._fileobject
(mock_sock
, bufsize
=0)
926 self
.assertEquals(fo
.readline(size
), "aa\n")
927 self
.assertEquals(fo
.readline(size
), "BBbb")
929 def test_no_buffer(self
):
930 self
._test
_readline
_no
_buffer
()
931 self
._test
_readline
_no
_buffer
(size
=4)
932 self
._test
_read
(bufsize
=0)
933 self
._test
_read
(size
=100, bufsize
=0)
936 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase
):
938 """Repeat the tests from FileObjectClassTestCase with bufsize==0.
940 In this case (and in this case only), it should be possible to
941 create a file object, read a line from it, create another file
942 object, read another line from it, without loss of data in the
943 first file object's buffer. Note that httplib relies on this
944 when reading multiple requests from the same socket."""
946 bufsize
= 0 # Use unbuffered mode
948 def testUnbufferedReadline(self
):
949 # Read a line, create a new file object, read another line with it
950 line
= self
.serv_file
.readline() # first line
951 self
.assertEqual(line
, "A. " + MSG
) # first line
952 self
.serv_file
= self
.cli_conn
.makefile('rb', 0)
953 line
= self
.serv_file
.readline() # second line
954 self
.assertEqual(line
, "B. " + MSG
) # second line
956 def _testUnbufferedReadline(self
):
957 self
.cli_file
.write("A. " + MSG
)
958 self
.cli_file
.write("B. " + MSG
)
959 self
.cli_file
.flush()
961 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase
):
963 bufsize
= 1 # Default-buffered for reading; line-buffered for writing
966 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase
):
968 bufsize
= 2 # Exercise the buffering code
971 class NetworkConnectionTest(object):
972 """Prove network connection."""
973 def clientSetUp(self
):
974 # We're inherited below by BasicTCPTest2, which also inherits
975 # BasicTCPTest, which defines self.port referenced below.
976 self
.cli
= socket
.create_connection((HOST
, self
.port
))
977 self
.serv_conn
= self
.cli
979 class BasicTCPTest2(NetworkConnectionTest
, BasicTCPTest
):
980 """Tests that NetworkConnection does not break existing TCP functionality.
983 class NetworkConnectionNoServer(unittest
.TestCase
):
984 def testWithoutServer(self
):
985 port
= test_support
.find_unused_port()
988 lambda: socket
.create_connection((HOST
, port
))
991 class NetworkConnectionAttributesTest(SocketTCPTest
, ThreadableTest
):
993 def __init__(self
, methodName
='runTest'):
994 SocketTCPTest
.__init
__(self
, methodName
=methodName
)
995 ThreadableTest
.__init
__(self
)
997 def clientSetUp(self
):
1000 def clientTearDown(self
):
1003 ThreadableTest
.clientTearDown(self
)
1005 def _justAccept(self
):
1006 conn
, addr
= self
.serv
.accept()
1008 testFamily
= _justAccept
1009 def _testFamily(self
):
1010 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=30)
1011 self
.assertEqual(self
.cli
.family
, 2)
1013 testTimeoutDefault
= _justAccept
1014 def _testTimeoutDefault(self
):
1015 # passing no explicit timeout uses socket's global default
1016 self
.assertTrue(socket
.getdefaulttimeout() is None)
1017 socket
.setdefaulttimeout(42)
1019 self
.cli
= socket
.create_connection((HOST
, self
.port
))
1021 socket
.setdefaulttimeout(None)
1022 self
.assertEquals(self
.cli
.gettimeout(), 42)
1024 testTimeoutNone
= _justAccept
1025 def _testTimeoutNone(self
):
1026 # None timeout means the same as sock.settimeout(None)
1027 self
.assertTrue(socket
.getdefaulttimeout() is None)
1028 socket
.setdefaulttimeout(30)
1030 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=None)
1032 socket
.setdefaulttimeout(None)
1033 self
.assertEqual(self
.cli
.gettimeout(), None)
1035 testTimeoutValueNamed
= _justAccept
1036 def _testTimeoutValueNamed(self
):
1037 self
.cli
= socket
.create_connection((HOST
, self
.port
), timeout
=30)
1038 self
.assertEqual(self
.cli
.gettimeout(), 30)
1040 testTimeoutValueNonamed
= _justAccept
1041 def _testTimeoutValueNonamed(self
):
1042 self
.cli
= socket
.create_connection((HOST
, self
.port
), 30)
1043 self
.assertEqual(self
.cli
.gettimeout(), 30)
1045 class NetworkConnectionBehaviourTest(SocketTCPTest
, ThreadableTest
):
1047 def __init__(self
, methodName
='runTest'):
1048 SocketTCPTest
.__init
__(self
, methodName
=methodName
)
1049 ThreadableTest
.__init
__(self
)
1051 def clientSetUp(self
):
1054 def clientTearDown(self
):
1057 ThreadableTest
.clientTearDown(self
)
1059 def testInsideTimeout(self
):
1060 conn
, addr
= self
.serv
.accept()
1063 testOutsideTimeout
= testInsideTimeout
1065 def _testInsideTimeout(self
):
1066 self
.cli
= sock
= socket
.create_connection((HOST
, self
.port
))
1068 self
.assertEqual(data
, "done!")
1070 def _testOutsideTimeout(self
):
1071 self
.cli
= sock
= socket
.create_connection((HOST
, self
.port
), timeout
=1)
1072 self
.assertRaises(socket
.timeout
, lambda: sock
.recv(5))
1075 class Urllib2FileobjectTest(unittest
.TestCase
):
1077 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1078 # it close the socket if the close c'tor argument is true
1080 def testClose(self
):
1083 def flush(self
): pass
1084 def close(self
): self
.closed
= True
1086 # must not close unless we request it: the original use of _fileobject
1087 # by module socket requires that the underlying socket not be closed until
1088 # the _socketobject that created the _fileobject is closed
1090 f
= socket
._fileobject
(s
)
1092 self
.assertTrue(not s
.closed
)
1095 f
= socket
._fileobject
(s
, close
=True)
1097 self
.assertTrue(s
.closed
)
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the TCP server socket self.serv."""

    def testTCPTimeout(self):
        """accept() with a 1 second timeout must raise socket.timeout."""
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        # NOTE: assertRaises forwards the trailing string to raise_timeout
        # as a positional argument (hence *args); it is not used as a
        # failure message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        """accept() with a zero timeout must fail with a plain socket.error."""
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except socket.timeout:
            # Must be tested before socket.error so that a timeout is
            # reported as the wrong outcome.
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            pass  # the expected outcome
        except Exception:
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        """A SIGALRM during a timed accept() must surface as Alarm."""
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            return                  # can only test on *nix
        self.serv.settimeout(5.0)   # must be longer than alarm

        class Alarm(Exception):
            pass

        # The first parameter is deliberately not called "signal", so the
        # signal module is not shadowed inside the handler.
        def alarm_handler(signum, frame):
            raise Alarm

        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass  # the expected outcome
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on the UDP server socket self.serv.

    Derives from SocketUDPTest so that self.serv really is a SOCK_DGRAM
    socket; deriving from the TCP fixture would run these checks against
    a stream socket instead.
    """

    def testUDPTimeout(self):
        """recv() with a 1 second timeout must raise socket.timeout."""
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        # NOTE: assertRaises forwards the trailing string to raise_timeout
        # as a positional argument (hence *args); it is not used as a
        # failure message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        """recv() with a zero timeout must fail with a plain socket.error."""
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            # Must be tested before socket.error so that a timeout is
            # reported as the wrong outcome.
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            pass  # the expected outcome
        except Exception:
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
    """Checks on the exception hierarchy exported by the socket module."""

    def testExceptionTree(self):
        """socket.error is an Exception and the base of the other errors."""
        self.assertTrue(issubclass(socket.error, Exception))
        for exc in (socket.herror, socket.gaierror, socket.timeout):
            # Name the offending class so a failure is self-explanatory.
            self.assertTrue(issubclass(exc, socket.error),
                            "%r is not a subclass of socket.error" % (exc,))
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX sockets bound to names in the Linux abstract namespace.

    Abstract names start with a NUL byte.  The literals carry a ``b``
    prefix, which leaves their value unchanged on Python 2.
    """

    # Size in bytes of sun_path in Linux's struct sockaddr_un (see unix(7)).
    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        """An abstract name round-trips through getsockname/getpeername."""
        address = b"\x00python-test-hello\x00\xff"
        s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s1.bind(address)
            s1.listen(1)
            s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s2.connect(s1.getsockname())
                conn, _ = s1.accept()
                try:
                    self.assertEqual(s1.getsockname(), address)
                    self.assertEqual(s2.getpeername(), address)
                finally:
                    conn.close()
            finally:
                s2.close()
        finally:
            s1.close()

    def testMaxName(self):
        """A name of exactly UNIX_PATH_MAX bytes can be bound and read back."""
        address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.bind(address)
            self.assertEqual(s.getsockname(), address)
        finally:
            s.close()

    def testNameOverflow(self):
        """A name one byte longer than UNIX_PATH_MAX is rejected by bind()."""
        address = b"\x00" + b"h" * self.UNIX_PATH_MAX
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.assertRaises(socket.error, s.bind, address)
        finally:
            s.close()
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def _checkReceived(self, buf, nbytes):
        # Shared assertions: exactly len(MSG) bytes arrived and the start
        # of the receive buffer now holds MSG.
        self.assertEqual(nbytes, len(MSG))
        msg = buf.tostring()[:len(MSG)]
        self.assertEqual(msg, MSG)

    def testRecvInto(self):
        """recv_into() fills a caller-supplied array and returns the count."""
        buf = array.array('c', ' '*1024)
        nbytes = self.cli_conn.recv_into(buf)
        self._checkReceived(buf, nbytes)

    def _testRecvInto(self):
        # Client half: send MSG through a buffer object.
        buf = buffer(MSG)
        self.serv_conn.send(buf)

    def testRecvFromInto(self):
        """recvfrom_into() fills the array and returns (count, address)."""
        buf = array.array('c', ' '*1024)
        nbytes, _ = self.cli_conn.recvfrom_into(buf)
        self._checkReceived(buf, nbytes)

    def _testRecvFromInto(self):
        # Client half: send MSG through a buffer object.
        buf = buffer(MSG)
        self.serv_conn.send(buf)
def isTipcAvailable():
    """Check if the TIPC module is loaded

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros.

    Returns True only when the socket module exposes AF_TIPC and
    /proc/modules lists a "tipc" entry; otherwise returns False.
    """
    if not hasattr(socket, "AF_TIPC"):
        return False
    if not os.path.isfile("/proc/modules"):
        return False
    with open("/proc/modules") as f:
        # The trailing space in "tipc " keeps longer module names that
        # merely start with "tipc" from matching.
        if any(line.startswith("tipc ") for line in f):
            return True
    if test_support.verbose:
        print("TIPC module is not loaded, please 'sudo modprobe tipc'")
    return False
class TIPCTest (unittest.TestCase):
    """Single-threaded datagram exchange over TIPC SOCK_RDM sockets."""

    def testRDM(self):
        """A message sent to a name inside the bound range reaches the server."""
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        try:
            srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # The server binds to the whole [TIPC_LOWER, TIPC_UPPER] range.
            srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                       TIPC_LOWER, TIPC_UPPER)
            srv.bind(srvaddr)

            # The client addresses the instance in the middle of that
            # range; // keeps the instance number an integer.
            sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                        TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
            cli.sendto(MSG, sendaddr)

            msg, recvaddr = srv.recvfrom(1024)

            self.assertEqual(cli.getsockname(), recvaddr)
            self.assertEqual(msg, MSG)
        finally:
            # Release both sockets even when an assertion fails.
            cli.close()
            srv.close()
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
    """Threaded client/server exchange over a TIPC SOCK_STREAM connection."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        """Server side: bind, listen and accept the client's connection."""
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                   TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen(5)
        # accept() blocks until the client connects, so the client thread
        # has to be released before calling it.
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()

    def clientSetUp(self):
        """Client side: connect to the middle of the server's name range."""
        # Imported here so this method does not rely on a module-level
        # import of time.
        import time
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        # // keeps the instance number an integer.
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        """Server half: receive MSG and check the peer address."""
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        # Client half: send MSG, then close the connection.
        self.cli.send(MSG)
        self.cli.close()
def test_main():
    """Collect the test classes that apply to this platform and run them."""
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2]
    if sys.platform != 'mac':
        tests.extend([BasicUDPTest, UDPTimeoutTest])

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ])
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    # startswith() matches every Linux value of sys.platform, not only
    # the exact string 'linux2'.
    if sys.platform.startswith('linux'):
        tests.append(TestLinuxAbstractNamespace)
    if isTipcAvailable():
        tests.append(TIPCTest)
        tests.append(TIPCThreadableTest)

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when run_unittest raises.
        test_support.threading_cleanup(*thread_info)
1352 if __name__
== "__main__":