Fix the socket tests so they can be run concurrently. Backport candidate
[python.git] / Lib / test / test_socket.py
blobe0aa58cc1cc84eea53807517c633448f018b1026
1 #!/usr/bin/env python
3 import unittest
4 from test import test_support
6 import socket
7 import select
8 import time
9 import thread, threading
10 import Queue
11 import sys
12 import array
13 from weakref import proxy
# Preferred port for the server sockets.  The setUp() fixtures below replace
# this global with whatever port test_support.bind_port() returns.
PORT = 50007
# Host used both for binding the server sockets and for client connects.
HOST = 'localhost'
# Payload exchanged between the server and client halves of the tests.
MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
    """Fixture that exposes a listening TCP server socket as self.serv."""

    def setUp(self):
        """Create the server socket, bind it and start listening."""
        global PORT
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = server
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Remember the port reported by bind_port(); the client halves of
        # the tests connect to (HOST, PORT).
        PORT = test_support.bind_port(server, HOST, PORT)
        server.listen(1)

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        server = self.serv
        server.close()
        self.serv = None
class SocketUDPTest(unittest.TestCase):
    """Fixture that exposes a bound UDP server socket as self.serv."""

    def setUp(self):
        """Create the datagram socket and bind it."""
        global PORT
        server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = server
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Remember the port reported by bind_port(); the client halves of
        # the tests send to (HOST, PORT).
        PORT = test_support.bind_port(server, HOST, PORT)

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        server = self.serv
        server.close()
        self.serv = None
44 class ThreadableTest:
45 """Threadable Test class
47 The ThreadableTest class makes it easy to create a threaded
48 client/server pair from an existing unit test. To create a
49 new threaded class from an existing unit test, use multiple
50 inheritance:
52 class NewClass (OldClass, ThreadableTest):
53 pass
55 This class defines two new fixture functions with obvious
56 purposes for overriding:
58 clientSetUp ()
59 clientTearDown ()
61 Any new test functions within the class must then define
62 tests in pairs, where the test name is preceeded with a
63 '_' to indicate the client portion of the test. Ex:
65 def testFoo(self):
66 # Server portion
68 def _testFoo(self):
69 # Client portion
71 Any exceptions raised by the clients during their tests
72 are caught and transferred to the main thread to alert
73 the testing framework.
75 Note, the server setup function cannot call any blocking
76 functions that rely on the client thread during setup,
77 unless serverExplicityReady() is called just before
78 the blocking call (such as in setting up a client/server
79 connection and performing the accept() in setUp().
80 """
82 def __init__(self):
83 # Swap the true setup function
84 self.__setUp = self.setUp
85 self.__tearDown = self.tearDown
86 self.setUp = self._setUp
87 self.tearDown = self._tearDown
89 def serverExplicitReady(self):
90 """This method allows the server to explicitly indicate that
91 it wants the client thread to proceed. This is useful if the
92 server is about to execute a blocking routine that is
93 dependent upon the client thread during its setup routine."""
94 self.server_ready.set()
96 def _setUp(self):
97 self.server_ready = threading.Event()
98 self.client_ready = threading.Event()
99 self.done = threading.Event()
100 self.queue = Queue.Queue(1)
102 # Do some munging to start the client test.
103 methodname = self.id()
104 i = methodname.rfind('.')
105 methodname = methodname[i+1:]
106 test_method = getattr(self, '_' + methodname)
107 self.client_thread = thread.start_new_thread(
108 self.clientRun, (test_method,))
110 self.__setUp()
111 if not self.server_ready.isSet():
112 self.server_ready.set()
113 self.client_ready.wait()
115 def _tearDown(self):
116 self.__tearDown()
117 self.done.wait()
119 if not self.queue.empty():
120 msg = self.queue.get()
121 self.fail(msg)
123 def clientRun(self, test_func):
124 self.server_ready.wait()
125 self.client_ready.set()
126 self.clientSetUp()
127 if not callable(test_func):
128 raise TypeError, "test_func must be a callable function"
129 try:
130 test_func()
131 except Exception, strerror:
132 self.queue.put(strerror)
133 self.clientTearDown()
135 def clientSetUp(self):
136 raise NotImplementedError, "clientSetUp must be implemented."
138 def clientTearDown(self):
139 self.done.set()
140 thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture plus a client thread that owns self.cli."""

    def __init__(self, methodName='runTest'):
        # Initialise both bases explicitly; ThreadableTest.__init__ must run
        # last because it wraps the setUp/tearDown found on the instance.
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the (still unconnected) client TCP socket."""
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.cli = client

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture plus a client thread that owns self.cli."""

    def __init__(self, methodName='runTest'):
        # ThreadableTest.__init__ must run last because it wraps the
        # setUp/tearDown found on the instance.
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client datagram socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        """Close the client socket, then finish the client thread.

        Mirrors ThreadedTCPSocketTest.clientTearDown so the datagram
        socket created in clientSetUp() is released explicitly.
        """
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Fixture with an established TCP connection.

    The server thread sees the accepted connection as self.cli_conn; the
    client thread sees its connected socket as self.serv_conn.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # accept() blocks until the client connects, so release the client
        # thread explicitly before making the call.
        self.serverExplicitReady()
        accepted = self.serv.accept()
        self.cli_conn = accepted[0]

    def tearDown(self):
        connection = self.cli_conn
        connection.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        target = (HOST, PORT)
        self.cli.connect(target)
        # The client's own socket is its connection to the server.
        self.serv_conn = self.cli

    def clientTearDown(self):
        connection = self.serv_conn
        connection.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Fixture built on socket.socketpair(): self.serv and self.cli are the
    two ends of the pair."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        """Create both ends of the pair in the server thread."""
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        """Close the server end."""
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        """Nothing to do: setUp() already created the client end."""
        pass

    def clientTearDown(self):
        """Close the client end, then finish the client thread."""
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
215 #######################################################################
216 ## Begin Tests
218 class GeneralModuleTests(unittest.TestCase):
220 def test_weakref(self):
221 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
222 p = proxy(s)
223 self.assertEqual(p.fileno(), s.fileno())
224 s.close()
225 s = None
226 try:
227 p.fileno()
228 except ReferenceError:
229 pass
230 else:
231 self.fail('Socket proxy still exists')
233 def testSocketError(self):
234 # Testing socket module exceptions
235 def raise_error(*args, **kwargs):
236 raise socket.error
237 def raise_herror(*args, **kwargs):
238 raise socket.herror
239 def raise_gaierror(*args, **kwargs):
240 raise socket.gaierror
241 self.failUnlessRaises(socket.error, raise_error,
242 "Error raising socket exception.")
243 self.failUnlessRaises(socket.error, raise_herror,
244 "Error raising socket exception.")
245 self.failUnlessRaises(socket.error, raise_gaierror,
246 "Error raising socket exception.")
248 def testCrucialConstants(self):
249 # Testing for mission critical constants
250 socket.AF_INET
251 socket.SOCK_STREAM
252 socket.SOCK_DGRAM
253 socket.SOCK_RAW
254 socket.SOCK_RDM
255 socket.SOCK_SEQPACKET
256 socket.SOL_SOCKET
257 socket.SO_REUSEADDR
259 def testHostnameRes(self):
260 # Testing hostname resolution mechanisms
261 hostname = socket.gethostname()
262 try:
263 ip = socket.gethostbyname(hostname)
264 except socket.error:
265 # Probably name lookup wasn't set up right; skip this test
266 return
267 self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
268 try:
269 hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
270 except socket.error:
271 # Probably a similar problem as above; skip this test
272 return
273 all_host_names = [hostname, hname] + aliases
274 fqhn = socket.getfqdn(ip)
275 if not fqhn in all_host_names:
276 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
278 def testRefCountGetNameInfo(self):
279 # Testing reference count for getnameinfo
280 import sys
281 if hasattr(sys, "getrefcount"):
282 try:
283 # On some versions, this loses a reference
284 orig = sys.getrefcount(__name__)
285 socket.getnameinfo(__name__,0)
286 except SystemError:
287 if sys.getrefcount(__name__) <> orig:
288 self.fail("socket.getnameinfo loses a reference")
290 def testInterpreterCrash(self):
291 # Making sure getnameinfo doesn't crash the interpreter
292 try:
293 # On some versions, this crashes the interpreter.
294 socket.getnameinfo(('x', 0, 0, 0), 0)
295 except socket.error:
296 pass
298 def testNtoH(self):
299 # This just checks that htons etc. are their own inverse,
300 # when looking at the lower 16 or 32 bits.
301 sizes = {socket.htonl: 32, socket.ntohl: 32,
302 socket.htons: 16, socket.ntohs: 16}
303 for func, size in sizes.items():
304 mask = (1L<<size) - 1
305 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
306 self.assertEqual(i & mask, func(func(i&mask)) & mask)
308 swapped = func(mask)
309 self.assertEqual(swapped & mask, mask)
310 self.assertRaises(OverflowError, func, 1L<<34)
312 def testGetServBy(self):
313 eq = self.assertEqual
314 # Find one service that exists, then check all the related interfaces.
315 # I've ordered this by protocols that have both a tcp and udp
316 # protocol, at least for modern Linuxes.
317 if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
318 'freebsd7', 'darwin'):
319 # avoid the 'echo' service on this platform, as there is an
320 # assumption breaking non-standard port/protocol entry
321 services = ('daytime', 'qotd', 'domain')
322 else:
323 services = ('echo', 'daytime', 'domain')
324 for service in services:
325 try:
326 port = socket.getservbyname(service, 'tcp')
327 break
328 except socket.error:
329 pass
330 else:
331 raise socket.error
332 # Try same call with optional protocol omitted
333 port2 = socket.getservbyname(service)
334 eq(port, port2)
335 # Try udp, but don't barf it it doesn't exist
336 try:
337 udpport = socket.getservbyname(service, 'udp')
338 except socket.error:
339 udpport = None
340 else:
341 eq(udpport, port)
342 # Now make sure the lookup by port returns the same service name
343 eq(socket.getservbyport(port2), service)
344 eq(socket.getservbyport(port, 'tcp'), service)
345 if udpport is not None:
346 eq(socket.getservbyport(udpport, 'udp'), service)
348 def testDefaultTimeout(self):
349 # Testing default timeout
350 # The default timeout should initially be None
351 self.assertEqual(socket.getdefaulttimeout(), None)
352 s = socket.socket()
353 self.assertEqual(s.gettimeout(), None)
354 s.close()
356 # Set the default timeout to 10, and see if it propagates
357 socket.setdefaulttimeout(10)
358 self.assertEqual(socket.getdefaulttimeout(), 10)
359 s = socket.socket()
360 self.assertEqual(s.gettimeout(), 10)
361 s.close()
363 # Reset the default timeout to None, and see if it propagates
364 socket.setdefaulttimeout(None)
365 self.assertEqual(socket.getdefaulttimeout(), None)
366 s = socket.socket()
367 self.assertEqual(s.gettimeout(), None)
368 s.close()
370 # Check that setting it to an invalid value raises ValueError
371 self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
373 # Check that setting it to an invalid type raises TypeError
374 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
376 def testIPv4toString(self):
377 if not hasattr(socket, 'inet_pton'):
378 return # No inet_pton() on this platform
379 from socket import inet_aton as f, inet_pton, AF_INET
380 g = lambda a: inet_pton(AF_INET, a)
382 self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
383 self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
384 self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
385 self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
386 self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
388 self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
389 self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
390 self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
391 self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
393 def testIPv6toString(self):
394 if not hasattr(socket, 'inet_pton'):
395 return # No inet_pton() on this platform
396 try:
397 from socket import inet_pton, AF_INET6, has_ipv6
398 if not has_ipv6:
399 return
400 except ImportError:
401 return
402 f = lambda a: inet_pton(AF_INET6, a)
404 self.assertEquals('\x00' * 16, f('::'))
405 self.assertEquals('\x00' * 16, f('0::0'))
406 self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
407 self.assertEquals(
408 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
409 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
412 def testStringToIPv4(self):
413 if not hasattr(socket, 'inet_ntop'):
414 return # No inet_ntop() on this platform
415 from socket import inet_ntoa as f, inet_ntop, AF_INET
416 g = lambda a: inet_ntop(AF_INET, a)
418 self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
419 self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
420 self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
421 self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
423 self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
424 self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
425 self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
427 def testStringToIPv6(self):
428 if not hasattr(socket, 'inet_ntop'):
429 return # No inet_ntop() on this platform
430 try:
431 from socket import inet_ntop, AF_INET6, has_ipv6
432 if not has_ipv6:
433 return
434 except ImportError:
435 return
436 f = lambda a: inet_ntop(AF_INET6, a)
438 self.assertEquals('::', f('\x00' * 16))
439 self.assertEquals('::1', f('\x00' * 15 + '\x01'))
440 self.assertEquals(
441 'aef:b01:506:1001:ffff:9997:55:170',
442 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
445 # XXX The following don't test module-level functionality...
447 def testSockName(self):
448 # Testing getsockname()
449 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
450 sock.bind(("0.0.0.0", PORT+1))
451 name = sock.getsockname()
452 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
453 # it reasonable to get the host's addr in addition to 0.0.0.0.
454 # At least for eCos. This is required for the S/390 to pass.
455 my_ip_addr = socket.gethostbyname(socket.gethostname())
456 self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
457 self.assertEqual(name[1], PORT+1)
459 def testGetSockOpt(self):
460 # Testing getsockopt()
461 # We know a socket should start without reuse==0
462 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
463 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
464 self.failIf(reuse != 0, "initial mode is reuse")
466 def testSetSockOpt(self):
467 # Testing setsockopt()
468 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
469 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
470 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
471 self.failIf(reuse == 0, "failed to set reuse mode")
473 def testSendAfterClose(self):
474 # testing send() after close() with timeout
475 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
476 sock.settimeout(1)
477 sock.close()
478 self.assertRaises(socket.error, sock.send, "spam")
480 def testNewAttributes(self):
481 # testing .family, .type and .protocol
482 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
483 self.assertEqual(sock.family, socket.AF_INET)
484 self.assertEqual(sock.type, socket.SOCK_STREAM)
485 self.assertEqual(sock.proto, 0)
486 sock.close()
class BasicTCPTest(SocketConnectedTest):
    """Basic send/receive tests over an established TCP connection.

    Each testXxx method is the server half (reading from self.cli_conn);
    the matching _testXxx method is the client half (writing to
    self.serv_conn).
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        # Read until recv() returns an empty string, i.e. until the client
        # side has closed its end of the connection.
        msg = ''
        while 1:
            read = self.cli_conn.recv(1024)
            if not read:
                break
            msg += read
        self.assertEqual(msg, 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        # Testing fromfd()
        if not hasattr(socket, "fromfd"):
            return # On Windows, this doesn't exist
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        try:
            msg = sock.recv(1024)
        finally:
            # fromfd() duplicates the descriptor, so the new socket object
            # has to be closed on its own; tearDown() only closes cli_conn.
            sock.close()
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
class BasicUDPTest(ThreadedUDPSocketTest):
    """Datagram tests: the client thread sends MSG to (HOST, PORT) and the
    server half receives it on self.serv."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, PORT)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        result = self.serv.recvfrom(len(MSG))
        received, sender = result
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, PORT)
        self.cli.sendto(MSG, 0, destination)
class BasicSocketPairTest(SocketPairTest):
    """Send MSG across a socketpair() in each direction."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Server end receives what the client end sends.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        # Server end sends; the client half checks the payload.
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Tests of non-blocking accept/connect/recv on TCP sockets.

    Connections accepted by the server halves are closed explicitly; the
    fixture's tearDown() only closes the listening socket.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            # The client half never connects, so a non-blocking accept()
            # is expected to return immediately with an error.
            self.serv.accept()
        except socket.error:
            pass
        end = time.time()
        self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            conn.close()
            self.fail("Error trying to do non-blocking accept.")
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            conn.close()
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        # Delay the connect so the server's first accept() finds nothing.
        time.sleep(0.1)
        self.cli.connect((HOST, PORT))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, PORT))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        try:
            conn.setblocking(0)
            try:
                msg = conn.recv(len(MSG))
            except socket.error:
                pass
            else:
                self.fail("Error trying to do non-blocking recv.")
            read, write, err = select.select([conn], [], [])
            if conn in read:
                msg = conn.recv(len(MSG))
                self.assertEqual(msg, MSG)
            else:
                self.fail("Error during select call to non-blocking socket.")
        finally:
            conn.close()

    def _testRecv(self):
        self.cli.connect((HOST, PORT))
        # Delay the send so the server's first recv() finds no data.
        time.sleep(0.1)
        self.cli.send(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests of the file objects returned by socket.makefile().

    The server half reads from self.serv_file (opened 'rb' with the class
    attribute bufsize); the client half writes to self.cli_file ('wb').
    """

    bufsize = -1 # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        reader = self.cli_conn.makefile('rb', self.bufsize)
        self.serv_file = reader

    def tearDown(self):
        reader = self.serv_file
        reader.close()
        self.assert_(reader.closed)
        self.serv_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        writer = self.serv_conn.makefile('wb')
        self.cli_file = writer

    def clientTearDown(self):
        writer = self.cli_file
        writer.close()
        self.assert_(writer.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test
        head = self.serv_file.read(len(MSG)-3)
        tail = self.serv_file.read(3)
        self.assertEqual(head + tail, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        everything = self.serv_file.read()
        self.assertEqual(everything, MSG)

    def _testFullRead(self):
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        # Read one character at a time until an empty read signals EOF.
        buf = ''
        char = self.serv_file.read(1)
        while char:
            buf += char
            char = self.serv_file.read(1)
        self.assertEqual(buf, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # Performing file readline test
        first_line = self.serv_file.readline()
        self.assertEqual(first_line, MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testClosedAttr(self):
        still_closed = self.serv_file.closed
        self.assert_(not still_closed)

    def _testClosedAttr(self):
        still_closed = self.cli_file.closed
        self.assert_(not still_closed)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        expected_first = "A. " + MSG
        expected_second = "B. " + MSG
        first = self.serv_file.readline()
        self.assertEqual(first, expected_first)
        # tearDown() closes whatever self.serv_file refers to, so the
        # replacement file object is stored back on the instance.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second = self.serv_file.readline()
        self.assertEqual(second, expected_second)

    def _testUnbufferedReadline(self):
        for prefix in ("A. ", "B. "):
            self.cli_file.write(prefix + MSG)
        self.cli_file.flush()
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    bufsize = 1 # Default-buffered for reading; line-buffered for writing
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2 # Exercise the buffering code
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on a listening TCP socket that no
    client ever connects to."""

    def testTCPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        # NOTE: failUnlessRaises() forwards the trailing string to
        # raise_timeout() as a positional argument; it is not used as a
        # failure message.
        self.failUnlessRaises(socket.timeout, raise_timeout,
                              "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # With a zero timeout accept() must fail with a plain socket.error
        # rather than socket.timeout.  socket.timeout is checked first
        # because it is itself a subclass of socket.error.
        try:
            self.serv.settimeout(0.0)
            unexpected = self.serv.accept()
        except socket.timeout:
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            pass
        except:
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a bound UDP socket that nobody
    sends to.

    Derives from SocketUDPTest so that self.serv really is a datagram
    socket; the TCP fixture would hand these tests a listening stream
    socket instead.
    """

    def testUDPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        # NOTE: failUnlessRaises() forwards the trailing string to
        # raise_timeout() as a positional argument; it is not used as a
        # failure message.
        self.failUnlessRaises(socket.timeout, raise_timeout,
                              "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # With a zero timeout recv() must fail with a plain socket.error
        # rather than socket.timeout.  socket.timeout is checked first
        # because it is itself a subclass of socket.error.
        ok = False
        try:
            self.serv.settimeout(0.0)
            foo = self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except:
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
    """Checks the inheritance tree of the socket module's exceptions."""

    def testExceptionTree(self):
        # socket.error is the root and derives from Exception ...
        self.assert_(issubclass(socket.error, Exception))
        # ... and every more specific exception derives from socket.error.
        for derived in (socket.herror, socket.gaierror, socket.timeout):
            self.assert_(issubclass(derived, socket.error))
class TestLinuxAbstractNamespace(unittest.TestCase):
    """Tests of AF_UNIX addresses that start with a NUL byte.

    test_main() only schedules this class when sys.platform is 'linux2'.
    Every socket created here is closed explicitly so that the bound
    names are released as soon as each test finishes.
    """

    # Upper bound on the address length exercised by testMaxName and
    # testNameOverflow.
    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        address = "\x00python-test-hello\x00\xff"
        s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s1.bind(address)
            s1.listen(1)
            s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s2.connect(s1.getsockname())
                conn, peer = s1.accept()
                try:
                    self.assertEqual(s1.getsockname(), address)
                    self.assertEqual(s2.getpeername(), address)
                finally:
                    conn.close()
            finally:
                s2.close()
        finally:
            s1.close()

    def testMaxName(self):
        # Longest accepted name: NUL plus UNIX_PATH_MAX - 1 characters.
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.bind(address)
            self.assertEqual(s.getsockname(), address)
        finally:
            s.close()

    def testNameOverflow(self):
        # One character longer than testMaxName's address must be refused.
        address = "\x00" + "h" * self.UNIX_PATH_MAX
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.assertRaises(socket.error, s.bind, address)
        finally:
            s.close()
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvInto(self):
        # Receive into a pre-sized character array and compare the prefix
        # that was actually filled.
        target = array.array('c', ' '*1024)
        count = self.cli_conn.recv_into(target)
        self.assertEqual(count, len(MSG))
        received = target.tostring()[:len(MSG)]
        self.assertEqual(received, MSG)

    def _testRecvInto(self):
        payload = buffer(MSG)
        self.serv_conn.send(payload)

    def testRecvFromInto(self):
        # Same as testRecvInto, but through recvfrom_into().
        target = array.array('c', ' '*1024)
        result = self.cli_conn.recvfrom_into(target)
        count, sender = result
        self.assertEqual(count, len(MSG))
        received = target.tostring()[:len(MSG)]
        self.assertEqual(received, MSG)

    def _testRecvFromInto(self):
        payload = buffer(MSG)
        self.serv_conn.send(payload)
def test_main():
    """Collect the test classes that apply to this platform and run them
    through test_support.run_unittest()."""
    suite_classes = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest,
                     TestExceptions, BufferIOTest]
    # The UDP tests are left out when sys.platform is 'mac'.
    if sys.platform != 'mac':
        suite_classes += [BasicUDPTest, UDPTimeoutTest]

    suite_classes += [
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
    ]
    # Optional tests that depend on platform support.
    if hasattr(socket, "socketpair"):
        suite_classes.append(BasicSocketPairTest)
    if sys.platform == 'linux2':
        suite_classes.append(TestLinuxAbstractNamespace)
    test_support.run_unittest(*suite_classes)
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()