move sections
[python/dscho.git] / Lib / test / test_socket.py
blob430a6474ed1555a5cb3005a3815daa48698f1889
1 #!/usr/bin/env python
3 import unittest
4 from test import test_support
6 import errno
7 import socket
8 import select
9 import time
10 import traceback
11 import Queue
12 import sys
13 import os
14 import array
15 from weakref import proxy
16 import signal
18 try:
19 import thread
20 import threading
21 except ImportError:
22 thread = None
23 threading = None
25 HOST = test_support.HOST
26 MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening TCP server socket.

    ``setUp`` stores the socket in ``self.serv`` and the port returned by
    ``test_support.bind_port`` in ``self.port``.
    """

    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = test_support.bind_port(listener)
        listener.listen(1)

    def tearDown(self):
        # Close the listener, then drop the reference to it.
        listener = self.serv
        listener.close()
        self.serv = None
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound UDP server socket.

    ``setUp`` stores the socket in ``self.serv`` and the port returned by
    ``test_support.bind_port`` in ``self.port``.
    """

    def setUp(self):
        dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = dgram_sock
        self.port = test_support.bind_port(dgram_sock)

    def tearDown(self):
        # Close the server socket, then drop the reference to it.
        dgram_sock = self.serv
        dgram_sock.close()
        self.serv = None
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: keep the real fixtures under
        # private names and install the thread-aware wrappers instead.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real setUp()."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        # Holds at most one exception raised by the client test method.
        self.queue = Queue.Queue(1)

        # Do some munging to start the client test: the client half of
        # test "testFoo" is the method named "_testFoo".
        methodname = self.id().rsplit('.', 1)[-1]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        # The real setUp() may already have released the client through
        # serverExplicitReady(); otherwise release it now.
        if not self.server_ready.is_set():
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client, report its error."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread: run clientSetUp(), then test_func().

        An exception raised by test_func is put on self.queue so that
        _tearDown() can fail the test in the main thread.
        """
        self.server_ready.wait()
        self.client_ready.set()
        self.clientSetUp()
        with test_support.check_py3k_warnings():
            if not callable(test_func):
                raise TypeError("test_func must be a callable function.")
        try:
            test_func()
        except Exception as strerror:
            self.queue.put(strerror)
        self.clientTearDown()

    def clientSetUp(self):
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        # Signal _tearDown() that the client is finished, then end the
        # client thread.
        self.done.set()
        thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread.

    The client thread creates an unconnected TCP socket in ``self.cli``.
    """

    def __init__(self, methodName='runTest'):
        # Both bases need explicit initialisation; ThreadableTest swaps
        # in its own setUp/tearDown wrappers.
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread.

    The client thread creates a UDP socket in ``self.cli`` and closes it
    again in ``clientTearDown``.
    """

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        # Close the client socket (mirrors ThreadedTCPSocketTest) so it
        # is not leaked, then let ThreadableTest finish the thread.
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Fixture with an established client/server TCP connection.

    Server side: ``self.cli_conn`` is the accepted connection.
    Client side: ``self.serv_conn`` is the connected ``self.cli`` socket.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client connects, so the client
        # thread must be released explicitly before calling it.
        self.serverExplicitReady()
        accepted, _peer = self.serv.accept()
        self.cli_conn = accepted

    def tearDown(self):
        accepted = self.cli_conn
        accepted.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        server_address = (HOST, self.port)
        self.cli.connect(server_address)
        self.serv_conn = self.cli

    def clientTearDown(self):
        connection = self.serv_conn
        connection.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Fixture built on socket.socketpair().

    ``self.serv`` is used by the main thread and ``self.cli`` by the
    client thread.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        # Both ends already exist after setUp(); nothing to prepare.
        pass

    def clientTearDown(self):
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
221 #######################################################################
222 ## Begin Tests
224 class GeneralModuleTests(unittest.TestCase):
226 def test_weakref(self):
227 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
228 p = proxy(s)
229 self.assertEqual(p.fileno(), s.fileno())
230 s.close()
231 s = None
232 try:
233 p.fileno()
234 except ReferenceError:
235 pass
236 else:
237 self.fail('Socket proxy still exists')
239 def testSocketError(self):
240 # Testing socket module exceptions
241 def raise_error(*args, **kwargs):
242 raise socket.error
243 def raise_herror(*args, **kwargs):
244 raise socket.herror
245 def raise_gaierror(*args, **kwargs):
246 raise socket.gaierror
247 self.assertRaises(socket.error, raise_error,
248 "Error raising socket exception.")
249 self.assertRaises(socket.error, raise_herror,
250 "Error raising socket exception.")
251 self.assertRaises(socket.error, raise_gaierror,
252 "Error raising socket exception.")
254 def testCrucialConstants(self):
255 # Testing for mission critical constants
256 socket.AF_INET
257 socket.SOCK_STREAM
258 socket.SOCK_DGRAM
259 socket.SOCK_RAW
260 socket.SOCK_RDM
261 socket.SOCK_SEQPACKET
262 socket.SOL_SOCKET
263 socket.SO_REUSEADDR
265 def testHostnameRes(self):
266 # Testing hostname resolution mechanisms
267 hostname = socket.gethostname()
268 try:
269 ip = socket.gethostbyname(hostname)
270 except socket.error:
271 # Probably name lookup wasn't set up right; skip this test
272 return
273 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
274 try:
275 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
276 except socket.error:
277 # Probably a similar problem as above; skip this test
278 return
279 all_host_names = [hostname, hname] + aliases
280 fqhn = socket.getfqdn(ip)
281 if not fqhn in all_host_names:
282 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
284 def testRefCountGetNameInfo(self):
285 # Testing reference count for getnameinfo
286 if hasattr(sys, "getrefcount"):
287 try:
288 # On some versions, this loses a reference
289 orig = sys.getrefcount(__name__)
290 socket.getnameinfo(__name__,0)
291 except TypeError:
292 self.assertEqual(sys.getrefcount(__name__), orig,
293 "socket.getnameinfo loses a reference")
295 def testInterpreterCrash(self):
296 # Making sure getnameinfo doesn't crash the interpreter
297 try:
298 # On some versions, this crashes the interpreter.
299 socket.getnameinfo(('x', 0, 0, 0), 0)
300 except socket.error:
301 pass
303 def testNtoH(self):
304 # This just checks that htons etc. are their own inverse,
305 # when looking at the lower 16 or 32 bits.
306 sizes = {socket.htonl: 32, socket.ntohl: 32,
307 socket.htons: 16, socket.ntohs: 16}
308 for func, size in sizes.items():
309 mask = (1L<<size) - 1
310 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
311 self.assertEqual(i & mask, func(func(i&mask)) & mask)
313 swapped = func(mask)
314 self.assertEqual(swapped & mask, mask)
315 self.assertRaises(OverflowError, func, 1L<<34)
317 def testNtoHErrors(self):
318 good_values = [ 1, 2, 3, 1L, 2L, 3L ]
319 bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
320 for k in good_values:
321 socket.ntohl(k)
322 socket.ntohs(k)
323 socket.htonl(k)
324 socket.htons(k)
325 for k in bad_values:
326 self.assertRaises(OverflowError, socket.ntohl, k)
327 self.assertRaises(OverflowError, socket.ntohs, k)
328 self.assertRaises(OverflowError, socket.htonl, k)
329 self.assertRaises(OverflowError, socket.htons, k)
331 def testGetServBy(self):
332 eq = self.assertEqual
333 # Find one service that exists, then check all the related interfaces.
334 # I've ordered this by protocols that have both a tcp and udp
335 # protocol, at least for modern Linuxes.
336 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
337 'freebsd7', 'freebsd8', 'darwin'):
338 # avoid the 'echo' service on this platform, as there is an
339 # assumption breaking non-standard port/protocol entry
340 services = ('daytime', 'qotd', 'domain')
341 else:
342 services = ('echo', 'daytime', 'domain')
343 for service in services:
344 try:
345 port = socket.getservbyname(service, 'tcp')
346 break
347 except socket.error:
348 pass
349 else:
350 raise socket.error
351 # Try same call with optional protocol omitted
352 port2 = socket.getservbyname(service)
353 eq(port, port2)
354 # Try udp, but don't barf it it doesn't exist
355 try:
356 udpport = socket.getservbyname(service, 'udp')
357 except socket.error:
358 udpport = None
359 else:
360 eq(udpport, port)
361 # Now make sure the lookup by port returns the same service name
362 eq(socket.getservbyport(port2), service)
363 eq(socket.getservbyport(port, 'tcp'), service)
364 if udpport is not None:
365 eq(socket.getservbyport(udpport, 'udp'), service)
366 # Make sure getservbyport does not accept out of range ports.
367 self.assertRaises(OverflowError, socket.getservbyport, -1)
368 self.assertRaises(OverflowError, socket.getservbyport, 65536)
370 def testDefaultTimeout(self):
371 # Testing default timeout
372 # The default timeout should initially be None
373 self.assertEqual(socket.getdefaulttimeout(), None)
374 s = socket.socket()
375 self.assertEqual(s.gettimeout(), None)
376 s.close()
378 # Set the default timeout to 10, and see if it propagates
379 socket.setdefaulttimeout(10)
380 self.assertEqual(socket.getdefaulttimeout(), 10)
381 s = socket.socket()
382 self.assertEqual(s.gettimeout(), 10)
383 s.close()
385 # Reset the default timeout to None, and see if it propagates
386 socket.setdefaulttimeout(None)
387 self.assertEqual(socket.getdefaulttimeout(), None)
388 s = socket.socket()
389 self.assertEqual(s.gettimeout(), None)
390 s.close()
392 # Check that setting it to an invalid value raises ValueError
393 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
395 # Check that setting it to an invalid type raises TypeError
396 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
398 def testIPv4_inet_aton_fourbytes(self):
399 if not hasattr(socket, 'inet_aton'):
400 return # No inet_aton, nothing to check
401 # Test that issue1008086 and issue767150 are fixed.
402 # It must return 4 bytes.
403 self.assertEquals('\x00'*4, socket.inet_aton('0.0.0.0'))
404 self.assertEquals('\xff'*4, socket.inet_aton('255.255.255.255'))
406 def testIPv4toString(self):
407 if not hasattr(socket, 'inet_pton'):
408 return # No inet_pton() on this platform
409 from socket import inet_aton as f, inet_pton, AF_INET
410 g = lambda a: inet_pton(AF_INET, a)
412 self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
413 self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
414 self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
415 self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
416 self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
418 self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
419 self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
420 self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
421 self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
423 def testIPv6toString(self):
424 if not hasattr(socket, 'inet_pton'):
425 return # No inet_pton() on this platform
426 try:
427 from socket import inet_pton, AF_INET6, has_ipv6
428 if not has_ipv6:
429 return
430 except ImportError:
431 return
432 f = lambda a: inet_pton(AF_INET6, a)
434 self.assertEquals('\x00' * 16, f('::'))
435 self.assertEquals('\x00' * 16, f('0::0'))
436 self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
437 self.assertEquals(
438 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
439 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
442 def testStringToIPv4(self):
443 if not hasattr(socket, 'inet_ntop'):
444 return # No inet_ntop() on this platform
445 from socket import inet_ntoa as f, inet_ntop, AF_INET
446 g = lambda a: inet_ntop(AF_INET, a)
448 self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
449 self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
450 self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
451 self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
453 self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
454 self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
455 self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
457 def testStringToIPv6(self):
458 if not hasattr(socket, 'inet_ntop'):
459 return # No inet_ntop() on this platform
460 try:
461 from socket import inet_ntop, AF_INET6, has_ipv6
462 if not has_ipv6:
463 return
464 except ImportError:
465 return
466 f = lambda a: inet_ntop(AF_INET6, a)
468 self.assertEquals('::', f('\x00' * 16))
469 self.assertEquals('::1', f('\x00' * 15 + '\x01'))
470 self.assertEquals(
471 'aef:b01:506:1001:ffff:9997:55:170',
472 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
475 # XXX The following don't test module-level functionality...
477 def _get_unused_port(self, bind_address='0.0.0.0'):
478 """Use a temporary socket to elicit an unused ephemeral port.
480 Args:
481 bind_address: Hostname or IP address to search for a port on.
483 Returns: A most likely to be unused port.
485 tempsock = socket.socket()
486 tempsock.bind((bind_address, 0))
487 host, port = tempsock.getsockname()
488 tempsock.close()
489 return port
491 def testSockName(self):
492 # Testing getsockname()
493 port = self._get_unused_port()
494 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
495 sock.bind(("0.0.0.0", port))
496 name = sock.getsockname()
497 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
498 # it reasonable to get the host's addr in addition to 0.0.0.0.
499 # At least for eCos. This is required for the S/390 to pass.
500 my_ip_addr = socket.gethostbyname(socket.gethostname())
501 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
502 self.assertEqual(name[1], port)
504 def testGetSockOpt(self):
505 # Testing getsockopt()
506 # We know a socket should start without reuse==0
507 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
508 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
509 self.assertFalse(reuse != 0, "initial mode is reuse")
511 def testSetSockOpt(self):
512 # Testing setsockopt()
513 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
514 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
515 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
516 self.assertFalse(reuse == 0, "failed to set reuse mode")
518 def testSendAfterClose(self):
519 # testing send() after close() with timeout
520 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
521 sock.settimeout(1)
522 sock.close()
523 self.assertRaises(socket.error, sock.send, "spam")
525 def testNewAttributes(self):
526 # testing .family, .type and .protocol
527 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
528 self.assertEqual(sock.family, socket.AF_INET)
529 self.assertEqual(sock.type, socket.SOCK_STREAM)
530 self.assertEqual(sock.proto, 0)
531 sock.close()
533 def test_getsockaddrarg(self):
534 host = '0.0.0.0'
535 port = self._get_unused_port(bind_address=host)
536 big_port = port + 65536
537 neg_port = port - 65536
538 sock = socket.socket()
539 try:
540 self.assertRaises(OverflowError, sock.bind, (host, big_port))
541 self.assertRaises(OverflowError, sock.bind, (host, neg_port))
542 sock.bind((host, port))
543 finally:
544 sock.close()
546 def test_sock_ioctl(self):
547 if os.name != "nt":
548 return
549 self.assertTrue(hasattr(socket.socket, 'ioctl'))
550 self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
551 self.assertTrue(hasattr(socket, 'RCVALL_ON'))
552 self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
553 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
554 s = socket.socket()
555 self.assertRaises(ValueError, s.ioctl, -1, None)
556 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
    """Basic send/recv tests over a connected TCP pair.

    Each ``testX`` method runs in the main thread on ``self.cli_conn``;
    the matching ``_testX`` runs in the client thread on ``self.serv_conn``.
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        msg = ''
        while 1:
            read = self.cli_conn.recv(1024)
            if not read:
                # Empty read: the client has closed its end.
                break
            msg += read
        self.assertEqual(msg, 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        # Testing fromfd()
        if not hasattr(socket, "fromfd"):
            return  # On Windows, this doesn't exist
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        # The socket object returned by fromfd() must be closed on its own.
        self.addCleanup(sock.close)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    """Datagram tests: the client thread sends MSG, the server receives it."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, _sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        recvfrom = self.serv.recvfrom
        self.assertRaises(ValueError, recvfrom, -1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
    """Check that a client sees end-of-file when the server closes."""

    def testClose(self):
        accepted, _peer = self.serv.accept()
        accepted.close()

        client = self.cli
        # The closed connection should make the client socket readable
        # within the one second select() timeout.
        readable, _writable, _errored = select.select([client], [], [], 1.0)
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), '')

    def _testClose(self):
        server_address = (HOST, self.port)
        self.cli.connect(server_address)
        time.sleep(1.0)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    """Send MSG in each direction across a socketpair()."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Server end receives what the client end sent.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        # Client end receives what the server end sent.
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Tests of non-blocking accept/connect/recv on TCP sockets.

    Connections accepted by the server side are registered for cleanup
    so that they are closed once the test has finished.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            self.serv.accept()
        except socket.error:
            pass
        end = time.time()
        self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking accept.")
        # The client sleeps briefly before connecting; wait for it.
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            self.addCleanup(conn.close)
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        # The client sends after a short sleep; wait until data arrives.
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")

    def _testRecv(self):
        self.cli.connect((HOST, self.port))
        time.sleep(0.1)
        self.cli.send(MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests of the file objects returned by socket.makefile().

    The server side reads through ``self.serv_file`` and the client side
    writes through ``self.cli_file``. Subclasses override ``bufsize``.
    """

    bufsize = -1  # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)

    def tearDown(self):
        reader = self.serv_file
        reader.close()
        self.assertTrue(reader.closed)
        self.serv_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.cli_file = self.serv_conn.makefile('wb')

    def clientTearDown(self):
        writer = self.cli_file
        writer.close()
        self.assertTrue(writer.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test
        head = self.serv_file.read(len(MSG) - 3)
        tail = self.serv_file.read(3)
        self.assertEqual(head + tail, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        everything = self.serv_file.read()
        self.assertEqual(everything, MSG)

    def _testFullRead(self):
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        collected = ''
        while True:
            char = self.serv_file.read(1)
            if not char:
                break
            collected += char
        self.assertEqual(collected, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # Performing file readline test
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterRead(self):
        # Mix fixed-size reads with readline() on the same stream.
        a_baloo_is = self.serv_file.read(len("A baloo is"))
        self.assertEqual("A baloo is", a_baloo_is)
        _a_bear = self.serv_file.read(len(" a bear"))
        self.assertEqual(" a bear", _a_bear)
        rest_of_first = self.serv_file.readline()
        self.assertEqual("\n", rest_of_first)
        second = self.serv_file.readline()
        self.assertEqual("A BALOO IS A BEAR.\n", second)
        third = self.serv_file.readline()
        self.assertEqual(MSG, third)

    def _testReadlineAfterRead(self):
        self.cli_file.write("A baloo is a bear\n")
        self.cli_file.write("A BALOO IS A BEAR.\n")
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterReadNoNewline(self):
        end_of_ = self.serv_file.read(len("End Of "))
        self.assertEqual("End Of ", end_of_)
        remainder = self.serv_file.readline()
        self.assertEqual("Line", remainder)

    def _testReadlineAfterReadNoNewline(self):
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        self.assertTrue(not self.serv_file.closed)

    def _testClosedAttr(self):
        self.assertTrue(not self.cli_file.closed)
class FileObjectInterruptedTestCase(unittest.TestCase):
    """Test that the file object correctly handles EINTR internally."""

    class MockSocket(object):
        """Socket stand-in whose recv() results are scripted by callables."""

        def __init__(self, recv_funcs=()):
            # A generator that returns callables that we'll call for each
            # call to recv().
            self._recv_step = iter(recv_funcs)

        def recv(self, size):
            # Take the next scripted callable and return its result (or
            # let it raise).
            return next(self._recv_step)()

    @staticmethod
    def _raise_eintr():
        raise socket.error(errno.EINTR)

    def _test_readline(self, size=-1, **kwargs):
        """Read two lines while an EINTR is raised between the chunks."""
        mock_sock = self.MockSocket(recv_funcs=[
                lambda : "This is the first line\nAnd the sec",
                self._raise_eintr,
                lambda : "ond line is here\n",
                lambda : "",
            ])
        fo = socket._fileobject(mock_sock, **kwargs)
        self.assertEqual(fo.readline(size), "This is the first line\n")
        self.assertEqual(fo.readline(size), "And the second line is here\n")

    def _test_read(self, size=-1, **kwargs):
        """Read everything while an EINTR is raised between the chunks."""
        mock_sock = self.MockSocket(recv_funcs=[
                lambda : "This is the first line\nAnd the sec",
                self._raise_eintr,
                lambda : "ond line is here\n",
                lambda : "",
            ])
        fo = socket._fileobject(mock_sock, **kwargs)
        self.assertEqual(fo.read(size), "This is the first line\n"
                         "And the second line is here\n")

    def test_default(self):
        self._test_readline()
        self._test_readline(size=100)
        self._test_read()
        self._test_read(size=100)

    def test_with_1k_buffer(self):
        self._test_readline(bufsize=1024)
        self._test_readline(size=100, bufsize=1024)
        self._test_read(bufsize=1024)
        self._test_read(size=100, bufsize=1024)

    def _test_readline_no_buffer(self, size=-1):
        """readline() with bufsize=0 and an EINTR in the second line."""
        mock_sock = self.MockSocket(recv_funcs=[
                lambda : "aa",
                lambda : "\n",
                lambda : "BB",
                self._raise_eintr,
                lambda : "bb",
                lambda : "",
            ])
        fo = socket._fileobject(mock_sock, bufsize=0)
        self.assertEqual(fo.readline(size), "aa\n")
        self.assertEqual(fo.readline(size), "BBbb")

    def test_no_buffer(self):
        self._test_readline_no_buffer()
        self._test_readline_no_buffer(size=4)
        self._test_read(bufsize=0)
        self._test_read(size=100, bufsize=0)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0  # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, "A. " + MSG)
        # A second, independent file object on the same connection.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second_line = self.serv_file.readline()
        self.assertEqual(second_line, "B. " + MSG)

    def _testUnbufferedReadline(self):
        writer = self.cli_file
        writer.write("A. " + MSG)
        writer.write("B. " + MSG)
        writer.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    # Default-buffered for reading; line-buffered for writing
    bufsize = 1
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    # Exercise the buffering code
    bufsize = 2
class NetworkConnectionTest(object):
    """Prove network connection."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        server_address = (HOST, self.port)
        self.cli = socket.create_connection(server_address)
        self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.

    Re-runs the BasicTCPTest methods with the client connection made by
    NetworkConnectionTest.clientSetUp (socket.create_connection).
    """
class NetworkConnectionNoServer(unittest.TestCase):
    """create_connection() to a port with no listener must fail."""

    def testWithoutServer(self):
        unused_port = test_support.find_unused_port()
        # Nothing is listening on the port, so socket.error is expected.
        self.assertRaises(socket.error,
                          socket.create_connection, (HOST, unused_port))
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    """Check attributes of sockets returned by socket.create_connection().

    The server half of every test just accepts the connection; the
    assertions are made by the client half (the ``_test*`` methods).
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.source_port = test_support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        conn, addr = self.serv.accept()
        # Close the accepted connection once the test is over.
        self.addCleanup(conn.close)

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
        finally:
            # Always restore the global default for later tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
        finally:
            # Always restore the global default for later tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), None)

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.assertEqual(self.cli.gettimeout(), 30)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Check how the timeout passed to create_connection() affects recv().

    The server accepts the client, waits three seconds and then sends
    ``"done!"``.  A client without a timeout receives the data; a client
    with a one second timeout must get socket.timeout instead.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        conn, addr = self.serv.accept()
        # Make sure the accepted connection is closed even if the send
        # below fails.
        self.addCleanup(conn.close)
        time.sleep(3)
        conn.send("done!")
    # Both client scenarios talk to the same slow server.
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout on the client: recv() blocks until the data arrives.
        self.cli = sock = socket.create_connection((HOST, self.port))
        data = sock.recv(5)
        self.assertEqual(data, "done!")

    def _testOutsideTimeout(self):
        # One second timeout is shorter than the server's three second
        # delay, so recv() must time out.
        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
        self.assertRaises(socket.timeout, sock.recv, 5)
1098 class Urllib2FileobjectTest(unittest.TestCase):
1100 # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
1101 # it close the socket if the close c'tor argument is true
1103 def testClose(self):
1104 class MockSocket:
1105 closed = False
1106 def flush(self): pass
1107 def close(self): self.closed = True
1109 # must not close unless we request it: the original use of _fileobject
1110 # by module socket requires that the underlying socket not be closed until
1111 # the _socketobject that created the _fileobject is closed
1112 s = MockSocket()
1113 f = socket._fileobject(s)
1114 f.close()
1115 self.assertTrue(not s.closed)
1117 s = MockSocket()
1118 f = socket._fileobject(s, close=True)
1119 f.close()
1120 self.assertTrue(s.closed)
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the listening TCP server socket."""

    def testTCPTimeout(self):
        # Nobody connects, so accept() with a positive timeout must raise
        # socket.timeout ("Error generating a timeout exception (TCP)"
        # otherwise).  The old code passed that text to assertRaises as a
        # positional argument of the callable, where it was never used.
        self.serv.settimeout(1.0)
        with self.assertRaises(socket.timeout):
            self.serv.accept()

    def testTimeoutZero(self):
        # A zero timeout makes the socket non-blocking: accept() must fail
        # with a plain socket.error, not with socket.timeout.
        self.serv.settimeout(0.0)
        try:
            self.serv.accept()
        except socket.timeout:
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            pass
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt
            # and SystemExit still propagate.
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            # Report a skip instead of silently passing.
            self.skipTest("signal.alarm() is required")  # can only test on *nix
        self.serv.settimeout(5.0)   # must be longer than alarm

        class Alarm(Exception):
            pass

        def alarm_handler(signum, frame):
            raise Alarm

        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a bound UDP socket.

    The base class must be SocketUDPTest: with SocketTCPTest these tests
    exercised recv() on a listening TCP socket and never touched UDP.
    """

    def testUDPTimeout(self):
        # No datagram is ever sent, so recv() with a positive timeout must
        # raise socket.timeout ("Error generating a timeout exception
        # (UDP)" otherwise).
        self.serv.settimeout(1.0)
        with self.assertRaises(socket.timeout):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # A zero timeout makes the socket non-blocking: recv() must fail
        # with a plain socket.error, not with socket.timeout.
        self.serv.settimeout(0.0)
        try:
            self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            pass
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt
            # and SystemExit still propagate.
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
    """Verify the inheritance tree of the socket module's exceptions."""

    def testExceptionTree(self):
        # socket.error is the root and must itself be a regular Exception.
        self.assertTrue(issubclass(socket.error, Exception))
        # Every specialised socket exception derives from socket.error.
        for exc_class in (socket.herror, socket.gaierror, socket.timeout):
            self.assertTrue(issubclass(exc_class, socket.error))
class TestLinuxAbstractNamespace(unittest.TestCase):
    """Tests for AF_UNIX addresses that start with a NUL byte.

    Every socket created here is registered with addCleanup() so it is
    closed even when an assertion fails.
    """

    # Maximum address length used by the boundary tests below.
    UNIX_PATH_MAX = 108

    def _unixSocket(self):
        # Create an AF_UNIX stream socket that is closed at test cleanup.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        return sock

    def testLinuxAbstractNamespace(self):
        address = "\x00python-test-hello\x00\xff"
        s1 = self._unixSocket()
        s1.bind(address)
        s1.listen(1)
        s2 = self._unixSocket()
        s2.connect(s1.getsockname())
        # Keep hold of the accepted connection so it can be closed too.
        conn, _ = s1.accept()
        self.addCleanup(conn.close)
        self.assertEqual(s1.getsockname(), address)
        self.assertEqual(s2.getpeername(), address)

    def testMaxName(self):
        # Address of exactly UNIX_PATH_MAX bytes must be accepted.
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
        s = self._unixSocket()
        s.bind(address)
        self.assertEqual(s.getsockname(), address)

    def testNameOverflow(self):
        # One byte more than UNIX_PATH_MAX must be rejected by bind().
        address = "\x00" + "h" * self.UNIX_PATH_MAX
        s = self._unixSocket()
        self.assertRaises(socket.error, s.bind, address)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    # -- recv_into() ------------------------------------------------------

    def testRecvIntoArray(self):
        target = array.array('c', ' ' * 1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tostring()[:len(MSG)], MSG)

    def _testRecvIntoArray(self):
        # Client side: send MSG wrapped in a buffer object.
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        target = bytearray(1024)
        received = self.cli_conn.recv_into(memoryview(target))
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    # -- recvfrom_into() --------------------------------------------------

    def testRecvFromIntoArray(self):
        target = array.array('c', ' ' * 1024)
        received, sender = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tostring()[:len(MSG)], MSG)

    def _testRecvFromIntoArray(self):
        # Client side: send MSG wrapped in a buffer object.
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        received, sender = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        received, sender = self.cli_conn.recvfrom_into(memoryview(target))
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray
# Address components shared by the TIPC tests: the second field of every
# TIPC address tuple they build, and the lower/upper bounds the servers
# bind with (clients connect to the midpoint of that range).
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
def isTipcAvailable():
    """Check if the TIPC module is loaded

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros.

    Returns True only if the socket module exposes AF_TIPC and a line
    starting with "tipc " is found in /proc/modules; returns False
    otherwise, including when /proc/modules is missing or unreadable.
    """
    if not hasattr(socket, "AF_TIPC"):
        return False
    if not os.path.isfile("/proc/modules"):
        return False
    try:
        f = open("/proc/modules")
    except (IOError, OSError) as e:
        # It's ok if the file vanished since the check above, is a
        # directory, or if we lack the permission to read it: in all those
        # cases we simply cannot tell, so report TIPC as unavailable.
        if e.errno in (errno.ENOENT, errno.EISDIR, errno.EACCES):
            return False
        raise
    with f:
        for line in f:
            if line.startswith("tipc "):
                return True
    if test_support.verbose:
        # Parenthesised single argument prints identically under the
        # print statement and the print function.
        print("TIPC module is not loaded, please 'sudo modprobe tipc'")
    return False
class TIPCTest(unittest.TestCase):
    """Send one message between two AF_TIPC SOCK_RDM sockets."""

    def testRDM(self):
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        # Close both sockets even if an assertion below fails.
        self.addCleanup(srv.close)
        self.addCleanup(cli.close)

        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                   TIPC_LOWER, TIPC_UPPER)
        srv.bind(srvaddr)

        # Address the midpoint of the range the server bound; use floor
        # division so the value stays an integer.
        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                    TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        cli.sendto(MSG, sendaddr)

        msg, recvaddr = srv.recvfrom(1024)

        # The sender address seen by the server must be the client's own.
        self.assertEqual(cli.getsockname(), recvaddr)
        self.assertEqual(msg, MSG)
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """Threaded client/server test over AF_TIPC SOCK_STREAM sockets.

    All sockets are registered with addCleanup() so that the listening,
    accepted and client sockets are closed when a test finishes.
    """

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                   TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen(5)
        # Tell the client thread it may connect before blocking in accept().
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.cli.close)
        # Connect to the midpoint of the range the server bound; use floor
        # division so the value stays an integer.
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        # Server side: receive the message and check who sent it.
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        # Client side: send the message and hang up.
        self.cli.send(MSG)
        self.cli.close()
def test_main():
    """Collect the test classes that apply to this platform and run them.

    Optional suites are added only when the feature they need is present
    (socket.socketpair, a Linux platform, a loaded TIPC kernel module).
    """
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
             UDPTimeoutTest]

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ])
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    # sys.platform may be 'linux2', 'linux3' or 'linux' depending on the
    # interpreter and kernel versions, so match on the prefix.
    if sys.platform.startswith('linux'):
        tests.append(TestLinuxAbstractNamespace)
    if isTipcAvailable():
        tests.append(TIPCTest)
        tests.append(TIPCThreadableTest)

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when run_unittest() raises.
        test_support.threading_cleanup(*thread_info)
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()