1 # Test the support for SSL and sockets
5 from test
import test_support
14 import urllib
, urlparse
18 from BaseHTTPServer
import HTTPServer
19 from SimpleHTTPServer
import SimpleHTTPRequestHandler
21 # Optionally test SSL support, if we have it in the tested platform
28 HOST
= test_support
.HOST
30 SVN_PYTHON_ORG_ROOT_CERT
= None
32 def handle_error(prefix
):
33 exc_format
= ' '.join(traceback
.format_exception(*sys
.exc_info()))
34 if test_support
.verbose
:
35 sys
.stdout
.write(prefix
+ exc_format
)
38 class BasicTests(unittest
.TestCase
):
40 def test_sslwrap_simple(self
):
41 # A crude test for the legacy API
43 ssl
.sslwrap_simple(socket
.socket(socket
.AF_INET
))
45 if e
.errno
== 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
50 ssl
.sslwrap_simple(socket
.socket(socket
.AF_INET
)._sock
)
52 if e
.errno
== 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
57 def test_constants(self
):
66 def test_random(self
):
68 if test_support
.verbose
:
69 sys
.stdout
.write("\n RAND_status is %d (%s)\n"
70 % (v
, (v
and "sufficient randomness") or
71 "insufficient randomness"))
77 print "didn't raise TypeError"
78 ssl
.RAND_add("this is a random string", 75.0)
80 def test_parse_cert(self
):
81 # note that this uses an 'unofficial' function in _ssl.c,
82 # provided solely for this test, to exercise the certificate
84 p
= ssl
._ssl
._test
_decode
_cert
(CERTFILE
, False)
85 if test_support
.verbose
:
86 sys
.stdout
.write("\n" + pprint
.pformat(p
) + "\n")
88 def test_DER_to_PEM(self
):
89 with
open(SVN_PYTHON_ORG_ROOT_CERT
, 'r') as f
:
91 d1
= ssl
.PEM_cert_to_DER_cert(pem
)
92 p2
= ssl
.DER_cert_to_PEM_cert(d1
)
93 d2
= ssl
.PEM_cert_to_DER_cert(p2
)
94 self
.assertEqual(d1
, d2
)
95 if not p2
.startswith(ssl
.PEM_HEADER
+ '\n'):
96 self
.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2
)
97 if not p2
.endswith('\n' + ssl
.PEM_FOOTER
+ '\n'):
98 self
.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2
)
100 def test_openssl_version(self
):
101 n
= ssl
.OPENSSL_VERSION_NUMBER
102 t
= ssl
.OPENSSL_VERSION_INFO
103 s
= ssl
.OPENSSL_VERSION
104 self
.assertIsInstance(n
, (int, long))
105 self
.assertIsInstance(t
, tuple)
106 self
.assertIsInstance(s
, str)
107 # Some sanity checks follow
109 self
.assertGreaterEqual(n
, 0x900000)
111 self
.assertLess(n
, 0x20000000)
112 major
, minor
, fix
, patch
, status
= t
113 self
.assertGreaterEqual(major
, 0)
114 self
.assertLess(major
, 2)
115 self
.assertGreaterEqual(minor
, 0)
116 self
.assertLess(minor
, 256)
117 self
.assertGreaterEqual(fix
, 0)
118 self
.assertLess(fix
, 256)
119 self
.assertGreaterEqual(patch
, 0)
120 self
.assertLessEqual(patch
, 26)
121 self
.assertGreaterEqual(status
, 0)
122 self
.assertLessEqual(status
, 15)
123 # Version string as returned by OpenSSL, the format might change
124 self
.assertTrue(s
.startswith("OpenSSL {:d}.{:d}.{:d}".format(major
, minor
, fix
)),
127 def test_ciphers(self
):
128 if not test_support
.is_resource_enabled('network'):
130 remote
= ("svn.python.org", 443)
131 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
132 cert_reqs
=ssl
.CERT_NONE
, ciphers
="ALL")
134 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
135 cert_reqs
=ssl
.CERT_NONE
, ciphers
="DEFAULT")
137 # Error checking occurs when connecting, because the SSL context
138 # isn't created before.
139 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
140 cert_reqs
=ssl
.CERT_NONE
, ciphers
="^$:,;?*'dorothyx")
141 with self
.assertRaisesRegexp(ssl
.SSLError
, "No cipher can be selected"):
144 @test_support.cpython_only
145 def test_refcycle(self
):
146 # Issue #7943: an SSL object doesn't create reference cycles with
148 s
= socket
.socket(socket
.AF_INET
)
149 ss
= ssl
.wrap_socket(s
)
152 self
.assertEqual(wr(), None)
155 class NetworkedTests(unittest
.TestCase
):
157 def test_connect(self
):
158 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
159 cert_reqs
=ssl
.CERT_NONE
)
160 s
.connect(("svn.python.org", 443))
163 self
.fail("Peer cert %s shouldn't be here!")
166 # this should fail because we have no verification certs
167 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
168 cert_reqs
=ssl
.CERT_REQUIRED
)
170 s
.connect(("svn.python.org", 443))
176 # this should succeed because we specify the root cert
177 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
178 cert_reqs
=ssl
.CERT_REQUIRED
,
179 ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
181 s
.connect(("svn.python.org", 443))
185 @unittest.skipIf(os
.name
== "nt", "Can't use a socket as a file under Windows")
186 def test_makefile_close(self
):
187 # Issue #5238: creating a file-like object with makefile() shouldn't
188 # delay closing the underlying "real socket" (here tested with its
189 # file descriptor, hence skipping the test under Windows).
190 ss
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
))
191 ss
.connect(("svn.python.org", 443))
195 # The fd is still open
197 # Closing the SSL socket should close the fd too
200 with self
.assertRaises(OSError) as e
:
202 self
.assertEqual(e
.exception
.errno
, errno
.EBADF
)
204 def test_non_blocking_handshake(self
):
205 s
= socket
.socket(socket
.AF_INET
)
206 s
.connect(("svn.python.org", 443))
208 s
= ssl
.wrap_socket(s
,
209 cert_reqs
=ssl
.CERT_NONE
,
210 do_handshake_on_connect
=False)
217 except ssl
.SSLError
, err
:
218 if err
.args
[0] == ssl
.SSL_ERROR_WANT_READ
:
219 select
.select([s
], [], [])
220 elif err
.args
[0] == ssl
.SSL_ERROR_WANT_WRITE
:
221 select
.select([], [s
], [])
225 if test_support
.verbose
:
226 sys
.stdout
.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count
)
228 def test_get_server_certificate(self
):
229 pem
= ssl
.get_server_certificate(("svn.python.org", 443))
231 self
.fail("No server certificate on svn.python.org:443!")
234 pem
= ssl
.get_server_certificate(("svn.python.org", 443), ca_certs
=CERTFILE
)
239 self
.fail("Got server certificate %s for svn.python.org!" % pem
)
241 pem
= ssl
.get_server_certificate(("svn.python.org", 443), ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
243 self
.fail("No server certificate on svn.python.org:443!")
244 if test_support
.verbose
:
245 sys
.stdout
.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem
)
247 def test_algorithms(self
):
248 # Issue #8484: all algorithms should be available when verifying a
250 # SHA256 was added in OpenSSL 0.9.8
251 if ssl
.OPENSSL_VERSION_INFO
< (0, 9, 8, 0, 15):
252 self
.skipTest("SHA256 not available on %r" % ssl
.OPENSSL_VERSION
)
253 # NOTE: https://sha256.tbs-internet.com is another possible test host
254 remote
= ("sha2.hboeck.de", 443)
255 sha256_cert
= os
.path
.join(os
.path
.dirname(__file__
), "sha256.pem")
256 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
257 cert_reqs
=ssl
.CERT_REQUIRED
,
258 ca_certs
=sha256_cert
,)
259 with test_support
.transient_internet():
262 if test_support
.verbose
:
263 sys
.stdout
.write("\nCipher with %r is %r\n" %
264 (remote
, s
.cipher()))
265 sys
.stdout
.write("Certificate is:\n%s\n" %
266 pprint
.pformat(s
.getpeercert()))
274 _have_threads
= False
278 class ThreadedEchoServer(threading
.Thread
):
280 class ConnectionHandler(threading
.Thread
):
282 """A mildly complicated class, because we want it to work both
283 with and without the SSL wrapper around the socket connection, so
284 that we can test the STARTTLS functionality."""
286 def __init__(self
, server
, connsock
):
290 self
.sock
.setblocking(1)
292 threading
.Thread
.__init
__(self
)
295 def show_conn_details(self
):
296 if self
.server
.certreqs
== ssl
.CERT_REQUIRED
:
297 cert
= self
.sslconn
.getpeercert()
298 if test_support
.verbose
and self
.server
.chatty
:
299 sys
.stdout
.write(" client cert is " + pprint
.pformat(cert
) + "\n")
300 cert_binary
= self
.sslconn
.getpeercert(True)
301 if test_support
.verbose
and self
.server
.chatty
:
302 sys
.stdout
.write(" cert binary is " + str(len(cert_binary
)) + " bytes\n")
303 cipher
= self
.sslconn
.cipher()
304 if test_support
.verbose
and self
.server
.chatty
:
305 sys
.stdout
.write(" server: connection cipher is now " + str(cipher
) + "\n")
309 self
.sslconn
= ssl
.wrap_socket(self
.sock
, server_side
=True,
310 certfile
=self
.server
.certificate
,
311 ssl_version
=self
.server
.protocol
,
312 ca_certs
=self
.server
.cacerts
,
313 cert_reqs
=self
.server
.certreqs
,
314 ciphers
=self
.server
.ciphers
)
316 # XXX Various errors can have happened here, for example
317 # a mismatching protocol version, an invalid certificate,
318 # or a low-level bug. This should be made more discriminating.
319 if self
.server
.chatty
:
320 handle_error("\n server: bad connection attempt from " +
321 str(self
.sock
.getpeername()) + ":\n")
331 return self
.sslconn
.read()
333 return self
.sock
.recv(1024)
335 def write(self
, bytes
):
337 return self
.sslconn
.write(bytes
)
339 return self
.sock
.send(bytes
)
345 self
.sock
._sock
.close()
349 if not self
.server
.starttls_server
:
350 if isinstance(self
.sock
, ssl
.SSLSocket
):
351 self
.sslconn
= self
.sock
352 elif not self
.wrap_conn():
354 self
.show_conn_details()
359 # eof, so quit this handler
362 elif msg
.strip() == 'over':
363 if test_support
.verbose
and self
.server
.connectionchatty
:
364 sys
.stdout
.write(" server: client closed connection\n")
367 elif self
.server
.starttls_server
and msg
.strip() == 'STARTTLS':
368 if test_support
.verbose
and self
.server
.connectionchatty
:
369 sys
.stdout
.write(" server: read STARTTLS from client, sending OK...\n")
371 if not self
.wrap_conn():
373 elif self
.server
.starttls_server
and self
.sslconn
and msg
.strip() == 'ENDTLS':
374 if test_support
.verbose
and self
.server
.connectionchatty
:
375 sys
.stdout
.write(" server: read ENDTLS from client, sending OK...\n")
377 self
.sslconn
.unwrap()
379 if test_support
.verbose
and self
.server
.connectionchatty
:
380 sys
.stdout
.write(" server: connection is now unencrypted...\n")
382 if (test_support
.verbose
and
383 self
.server
.connectionchatty
):
384 ctype
= (self
.sslconn
and "encrypted") or "unencrypted"
385 sys
.stdout
.write(" server: read %s (%s), sending back %s (%s)...\n"
386 % (repr(msg
), ctype
, repr(msg
.lower()), ctype
))
387 self
.write(msg
.lower())
389 if self
.server
.chatty
:
390 handle_error("Test server failure:\n")
393 # normally, we'd just stop here, but for the test
394 # harness, we want to stop the server
397 def __init__(self
, certificate
, ssl_version
=None,
398 certreqs
=None, cacerts
=None,
399 chatty
=True, connectionchatty
=False, starttls_server
=False,
400 wrap_accepting_socket
=False, ciphers
=None):
402 if ssl_version
is None:
403 ssl_version
= ssl
.PROTOCOL_TLSv1
405 certreqs
= ssl
.CERT_NONE
406 self
.certificate
= certificate
407 self
.protocol
= ssl_version
408 self
.certreqs
= certreqs
409 self
.cacerts
= cacerts
410 self
.ciphers
= ciphers
412 self
.connectionchatty
= connectionchatty
413 self
.starttls_server
= starttls_server
414 self
.sock
= socket
.socket()
416 if wrap_accepting_socket
:
417 self
.sock
= ssl
.wrap_socket(self
.sock
, server_side
=True,
418 certfile
=self
.certificate
,
419 cert_reqs
= self
.certreqs
,
420 ca_certs
= self
.cacerts
,
421 ssl_version
= self
.protocol
,
422 ciphers
= self
.ciphers
)
423 if test_support
.verbose
and self
.chatty
:
424 sys
.stdout
.write(' server: wrapped server socket as %s\n' % str(self
.sock
))
425 self
.port
= test_support
.bind_port(self
.sock
)
427 threading
.Thread
.__init
__(self
)
430 def start(self
, flag
=None):
432 threading
.Thread
.start(self
)
435 self
.sock
.settimeout(0.05)
443 newconn
, connaddr
= self
.sock
.accept()
444 if test_support
.verbose
and self
.chatty
:
445 sys
.stdout
.write(' server: new connection from '
446 + str(connaddr
) + '\n')
447 handler
= self
.ConnectionHandler(self
, newconn
)
449 except socket
.timeout
:
451 except KeyboardInterrupt:
458 class AsyncoreEchoServer(threading
.Thread
):
460 class EchoServer(asyncore
.dispatcher
):
462 class ConnectionHandler(asyncore
.dispatcher_with_send
):
464 def __init__(self
, conn
, certfile
):
465 asyncore
.dispatcher_with_send
.__init
__(self
, conn
)
466 self
.socket
= ssl
.wrap_socket(conn
, server_side
=True,
468 do_handshake_on_connect
=False)
469 self
._ssl
_accepting
= True
472 if isinstance(self
.socket
, ssl
.SSLSocket
):
473 while self
.socket
.pending() > 0:
474 self
.handle_read_event()
477 def _do_ssl_handshake(self
):
479 self
.socket
.do_handshake()
480 except ssl
.SSLError
, err
:
481 if err
.args
[0] in (ssl
.SSL_ERROR_WANT_READ
,
482 ssl
.SSL_ERROR_WANT_WRITE
):
484 elif err
.args
[0] == ssl
.SSL_ERROR_EOF
:
485 return self
.handle_close()
487 except socket
.error
, err
:
488 if err
.args
[0] == errno
.ECONNABORTED
:
489 return self
.handle_close()
491 self
._ssl
_accepting
= False
493 def handle_read(self
):
494 if self
._ssl
_accepting
:
495 self
._do
_ssl
_handshake
()
497 data
= self
.recv(1024)
498 if data
and data
.strip() != 'over':
499 self
.send(data
.lower())
501 def handle_close(self
):
503 if test_support
.verbose
:
504 sys
.stdout
.write(" server: closed connection %s\n" % self
.socket
)
506 def handle_error(self
):
509 def __init__(self
, certfile
):
510 self
.certfile
= certfile
511 asyncore
.dispatcher
.__init
__(self
)
512 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
513 self
.port
= test_support
.bind_port(self
.socket
)
516 def handle_accept(self
):
517 sock_obj
, addr
= self
.accept()
518 if test_support
.verbose
:
519 sys
.stdout
.write(" server: new connection from %s:%s\n" %addr
)
520 self
.ConnectionHandler(sock_obj
, self
.certfile
)
522 def handle_error(self
):
525 def __init__(self
, certfile
):
528 self
.server
= self
.EchoServer(certfile
)
529 self
.port
= self
.server
.port
530 threading
.Thread
.__init
__(self
)
534 return "<%s %s>" % (self
.__class
__.__name
__, self
.server
)
536 def start(self
, flag
=None):
538 threading
.Thread
.start(self
)
551 class SocketServerHTTPSServer(threading
.Thread
):
553 class HTTPSServer(HTTPServer
):
555 def __init__(self
, server_address
, RequestHandlerClass
, certfile
):
556 HTTPServer
.__init
__(self
, server_address
, RequestHandlerClass
)
557 # we assume the certfile contains both private key and certificate
558 self
.certfile
= certfile
559 self
.allow_reuse_address
= True
562 return ('<%s %s:%s>' %
563 (self
.__class
__.__name
__,
567 def get_request(self
):
568 # override this to wrap socket with SSL
569 sock
, addr
= self
.socket
.accept()
570 sslconn
= ssl
.wrap_socket(sock
, server_side
=True,
571 certfile
=self
.certfile
)
574 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler
):
575 # need to override translate_path to get a known root,
576 # instead of using os.curdir, since the test could be
579 server_version
= "TestHTTPS/1.0"
583 def translate_path(self
, path
):
584 """Translate a /-separated PATH to the local filename syntax.
586 Components that mean special things to the local file system
587 (e.g. drive or directory names) are ignored. (XXX They should
588 probably be diagnosed.)
591 # abandon query parameters
592 path
= urlparse
.urlparse(path
)[2]
593 path
= os
.path
.normpath(urllib
.unquote(path
))
594 words
= path
.split('/')
595 words
= filter(None, words
)
598 drive
, word
= os
.path
.splitdrive(word
)
599 head
, word
= os
.path
.split(word
)
600 if word
in self
.root
: continue
601 path
= os
.path
.join(path
, word
)
604 def log_message(self
, format
, *args
):
606 # we override this to suppress logging unless "verbose"
608 if test_support
.verbose
:
609 sys
.stdout
.write(" server (%s:%d %s):\n [%s] %s\n" %
610 (self
.server
.server_address
,
611 self
.server
.server_port
,
612 self
.request
.cipher(),
613 self
.log_date_time_string(),
617 def __init__(self
, certfile
):
619 self
.RootedHTTPRequestHandler
.root
= os
.path
.split(CERTFILE
)[0]
620 self
.server
= self
.HTTPSServer(
621 (HOST
, 0), self
.RootedHTTPRequestHandler
, certfile
)
622 self
.port
= self
.server
.server_port
623 threading
.Thread
.__init
__(self
)
627 return "<%s %s>" % (self
.__class
__.__name
__, self
.server
)
629 def start(self
, flag
=None):
631 threading
.Thread
.start(self
)
636 self
.server
.serve_forever(0.05)
639 self
.server
.shutdown()
642 def bad_cert_test(certfile
):
644 Launch a server with CERT_REQUIRED, and check that trying to
645 connect to it with the given client certificate fails.
647 server
= ThreadedEchoServer(CERTFILE
,
648 certreqs
=ssl
.CERT_REQUIRED
,
649 cacerts
=CERTFILE
, chatty
=False)
650 flag
= threading
.Event()
652 # wait for it to start
657 s
= ssl
.wrap_socket(socket
.socket(),
659 ssl_version
=ssl
.PROTOCOL_TLSv1
)
660 s
.connect((HOST
, server
.port
))
661 except ssl
.SSLError
, x
:
662 if test_support
.verbose
:
663 sys
.stdout
.write("\nSSLError is %s\n" % x
[1])
664 except socket
.error
, x
:
665 if test_support
.verbose
:
666 sys
.stdout
.write("\nsocket.error is %s\n" % x
[1])
668 raise AssertionError("Use of invalid cert should have failed!")
673 def server_params_test(certfile
, protocol
, certreqs
, cacertsfile
,
674 client_certfile
, client_protocol
=None, indata
="FOO\n",
675 ciphers
=None, chatty
=True, connectionchatty
=False,
676 wrap_accepting_socket
=False):
678 Launch a server, connect a client to it and try various reads
681 server
= ThreadedEchoServer(certfile
,
683 ssl_version
=protocol
,
687 connectionchatty
=connectionchatty
,
688 wrap_accepting_socket
=wrap_accepting_socket
)
689 flag
= threading
.Event()
691 # wait for it to start
694 if client_protocol
is None:
695 client_protocol
= protocol
697 s
= ssl
.wrap_socket(socket
.socket(),
698 certfile
=client_certfile
,
699 ca_certs
=cacertsfile
,
702 ssl_version
=client_protocol
)
703 s
.connect((HOST
, server
.port
))
704 for arg
in [indata
, bytearray(indata
), memoryview(indata
)]:
706 if test_support
.verbose
:
708 " client: sending %s...\n" % (repr(arg
)))
712 if test_support
.verbose
:
713 sys
.stdout
.write(" client: read %s\n" % repr(outdata
))
714 if outdata
!= indata
.lower():
715 raise AssertionError(
716 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
717 % (outdata
[:min(len(outdata
),20)], len(outdata
),
718 indata
[:min(len(indata
),20)].lower(), len(indata
)))
721 if test_support
.verbose
:
722 sys
.stdout
.write(" client: closing connection.\n")
728 def try_protocol_combo(server_protocol
,
732 if certsreqs
is None:
733 certsreqs
= ssl
.CERT_NONE
735 ssl
.CERT_NONE
: "CERT_NONE",
736 ssl
.CERT_OPTIONAL
: "CERT_OPTIONAL",
737 ssl
.CERT_REQUIRED
: "CERT_REQUIRED",
739 if test_support
.verbose
:
740 formatstr
= (expect_success
and " %s->%s %s\n") or " {%s->%s} %s\n"
741 sys
.stdout
.write(formatstr
%
742 (ssl
.get_protocol_name(client_protocol
),
743 ssl
.get_protocol_name(server_protocol
),
746 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
747 # will send an SSLv3 hello (rather than SSLv2) starting from
748 # OpenSSL 1.0.0 (see issue #8322).
749 server_params_test(CERTFILE
, server_protocol
, certsreqs
,
750 CERTFILE
, CERTFILE
, client_protocol
,
751 ciphers
="ALL", chatty
=False)
752 # Protocol mismatch can result in either an SSLError, or a
753 # "Connection reset by peer" error.
757 except socket
.error
as e
:
758 if expect_success
or e
.errno
!= errno
.ECONNRESET
:
761 if not expect_success
:
762 raise AssertionError(
763 "Client protocol %s succeeded with server protocol %s!"
764 % (ssl
.get_protocol_name(client_protocol
),
765 ssl
.get_protocol_name(server_protocol
)))
768 class ThreadedTests(unittest
.TestCase
):
770 def test_rude_shutdown(self
):
771 """A brutal shutdown of an SSL server should raise an IOError
772 in the client when attempting handshake.
774 listener_ready
= threading
.Event()
775 listener_gone
= threading
.Event()
778 port
= test_support
.bind_port(s
, HOST
)
780 # `listener` runs in a thread. It sits in an accept() until
781 # the main thread connects. Then it rudely closes the socket,
782 # and sets Event `listener_gone` to let the main thread know
783 # the socket is gone.
792 listener_ready
.wait()
794 c
.connect((HOST
, port
))
797 ssl_sock
= ssl
.wrap_socket(c
)
801 self
.fail('connecting to closed SSL socket should have failed')
803 t
= threading
.Thread(target
=listener
)
811 """Basic test of an SSL client connecting to a server"""
812 if test_support
.verbose
:
813 sys
.stdout
.write("\n")
814 server_params_test(CERTFILE
, ssl
.PROTOCOL_TLSv1
, ssl
.CERT_NONE
,
815 CERTFILE
, CERTFILE
, ssl
.PROTOCOL_TLSv1
,
816 chatty
=True, connectionchatty
=True)
818 def test_getpeercert(self
):
819 if test_support
.verbose
:
820 sys
.stdout
.write("\n")
822 server
= ThreadedEchoServer(CERTFILE
,
823 certreqs
=ssl
.CERT_NONE
,
824 ssl_version
=ssl
.PROTOCOL_SSLv23
,
827 flag
= threading
.Event()
829 # wait for it to start
833 s
= ssl
.wrap_socket(socket
.socket(),
836 cert_reqs
=ssl
.CERT_REQUIRED
,
837 ssl_version
=ssl
.PROTOCOL_SSLv23
)
838 s
.connect((HOST
, server
.port
))
839 cert
= s
.getpeercert()
840 self
.assertTrue(cert
, "Can't get peer certificate.")
842 if test_support
.verbose
:
843 sys
.stdout
.write(pprint
.pformat(cert
) + '\n')
844 sys
.stdout
.write("Connection cipher is " + str(cipher
) + '.\n')
845 if 'subject' not in cert
:
846 self
.fail("No subject field in certificate: %s." %
847 pprint
.pformat(cert
))
848 if ((('organizationName', 'Python Software Foundation'),)
849 not in cert
['subject']):
851 "Missing or invalid 'organizationName' field in certificate subject; "
852 "should be 'Python Software Foundation'.")
858 def test_empty_cert(self
):
859 """Connecting with an empty cert file"""
860 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
862 def test_malformed_cert(self
):
863 """Connecting with a badly formatted certificate (syntax error)"""
864 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
866 def test_nonexisting_cert(self
):
867 """Connecting with a non-existing cert file"""
868 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
870 def test_malformed_key(self
):
871 """Connecting with a badly formatted key (syntax error)"""
872 bad_cert_test(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
875 def test_protocol_sslv2(self
):
876 """Connecting to an SSLv2 server with various client options"""
877 if test_support
.verbose
:
878 sys
.stdout
.write("\n")
879 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True)
880 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True, ssl
.CERT_OPTIONAL
)
881 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True, ssl
.CERT_REQUIRED
)
882 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv23
, True)
883 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv3
, False)
884 try_protocol_combo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_TLSv1
, False)
886 def test_protocol_sslv23(self
):
887 """Connecting to an SSLv23 server with various client options"""
888 if test_support
.verbose
:
889 sys
.stdout
.write("\n")
891 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv2
, True)
892 except (ssl
.SSLError
, socket
.error
), x
:
893 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
894 if test_support
.verbose
:
896 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
898 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True)
899 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True)
900 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True)
902 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_OPTIONAL
)
903 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True, ssl
.CERT_OPTIONAL
)
904 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_OPTIONAL
)
906 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_REQUIRED
)
907 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True, ssl
.CERT_REQUIRED
)
908 try_protocol_combo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_REQUIRED
)
910 def test_protocol_sslv3(self
):
911 """Connecting to an SSLv3 server with various client options"""
912 if test_support
.verbose
:
913 sys
.stdout
.write("\n")
914 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True)
915 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_OPTIONAL
)
916 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_REQUIRED
)
917 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv2
, False)
918 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv23
, False)
919 try_protocol_combo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_TLSv1
, False)
921 def test_protocol_tlsv1(self
):
922 """Connecting to a TLSv1 server with various client options"""
923 if test_support
.verbose
:
924 sys
.stdout
.write("\n")
925 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True)
926 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_OPTIONAL
)
927 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_REQUIRED
)
928 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv2
, False)
929 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv3
, False)
930 try_protocol_combo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv23
, False)
932 def test_starttls(self
):
933 """Switching from clear text to encrypted and back again."""
934 msgs
= ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
936 server
= ThreadedEchoServer(CERTFILE
,
937 ssl_version
=ssl
.PROTOCOL_TLSv1
,
938 starttls_server
=True,
940 connectionchatty
=True)
941 flag
= threading
.Event()
943 # wait for it to start
950 s
.connect((HOST
, server
.port
))
951 if test_support
.verbose
:
952 sys
.stdout
.write("\n")
954 if test_support
.verbose
:
956 " client: sending %s...\n" % repr(indata
))
959 outdata
= conn
.read()
962 outdata
= s
.recv(1024)
963 if (indata
== "STARTTLS" and
964 outdata
.strip().lower().startswith("ok")):
965 # STARTTLS ok, switch to secure mode
966 if test_support
.verbose
:
968 " client: read %s from server, starting TLS...\n"
970 conn
= ssl
.wrap_socket(s
, ssl_version
=ssl
.PROTOCOL_TLSv1
)
972 elif (indata
== "ENDTLS" and
973 outdata
.strip().lower().startswith("ok")):
974 # ENDTLS ok, switch back to clear text
975 if test_support
.verbose
:
977 " client: read %s from server, ending TLS...\n"
982 if test_support
.verbose
:
984 " client: read %s from server\n" % repr(outdata
))
985 if test_support
.verbose
:
986 sys
.stdout
.write(" client: closing connection.\n")
996 def test_socketserver(self
):
997 """Using a SocketServer to create and manage SSL connections."""
998 server
= SocketServerHTTPSServer(CERTFILE
)
999 flag
= threading
.Event()
1001 # wait for it to start
1005 if test_support
.verbose
:
1006 sys
.stdout
.write('\n')
1007 with
open(CERTFILE
, 'rb') as f
:
1010 # now fetch the same data from the HTTPS server
1011 url
= 'https://127.0.0.1:%d/%s' % (
1012 server
.port
, os
.path
.split(CERTFILE
)[1])
1013 with test_support
.check_py3k_warnings():
1014 f
= urllib
.urlopen(url
)
1015 dlen
= f
.info().getheader("content-length")
1016 if dlen
and (int(dlen
) > 0):
1017 d2
= f
.read(int(dlen
))
1018 if test_support
.verbose
:
1020 " client: read %d bytes from remote server '%s'\n"
1021 % (len(d2
), server
))
1023 self
.assertEqual(d1
, d2
)
1028 def test_wrapped_accept(self
):
1029 """Check the accept() method on SSL sockets."""
1030 if test_support
.verbose
:
1031 sys
.stdout
.write("\n")
1032 server_params_test(CERTFILE
, ssl
.PROTOCOL_SSLv23
, ssl
.CERT_REQUIRED
,
1033 CERTFILE
, CERTFILE
, ssl
.PROTOCOL_SSLv23
,
1034 chatty
=True, connectionchatty
=True,
1035 wrap_accepting_socket
=True)
1037 def test_asyncore_server(self
):
1038 """Check the example asyncore integration."""
1039 indata
= "TEST MESSAGE of mixed case\n"
1041 if test_support
.verbose
:
1042 sys
.stdout
.write("\n")
1043 server
= AsyncoreEchoServer(CERTFILE
)
1044 flag
= threading
.Event()
1046 # wait for it to start
1050 s
= ssl
.wrap_socket(socket
.socket())
1051 s
.connect(('127.0.0.1', server
.port
))
1052 if test_support
.verbose
:
1054 " client: sending %s...\n" % (repr(indata
)))
1057 if test_support
.verbose
:
1058 sys
.stdout
.write(" client: read %s\n" % repr(outdata
))
1059 if outdata
!= indata
.lower():
1061 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1062 % (outdata
[:min(len(outdata
),20)], len(outdata
),
1063 indata
[:min(len(indata
),20)].lower(), len(indata
)))
1065 if test_support
.verbose
:
1066 sys
.stdout
.write(" client: closing connection.\n")
1070 # wait for server thread to end
1073 def test_recv_send(self
):
1074 """Test recv(), send() and friends."""
1075 if test_support
.verbose
:
1076 sys
.stdout
.write("\n")
1078 server
= ThreadedEchoServer(CERTFILE
,
1079 certreqs
=ssl
.CERT_NONE
,
1080 ssl_version
=ssl
.PROTOCOL_TLSv1
,
1083 connectionchatty
=False)
1084 flag
= threading
.Event()
1086 # wait for it to start
1089 s
= ssl
.wrap_socket(socket
.socket(),
1093 cert_reqs
=ssl
.CERT_NONE
,
1094 ssl_version
=ssl
.PROTOCOL_TLSv1
)
1095 s
.connect((HOST
, server
.port
))
1097 # helper methods for standardising recv* method signatures
1099 b
= bytearray("\0"*100)
1100 count
= s
.recv_into(b
)
1103 def _recvfrom_into():
1104 b
= bytearray("\0"*100)
1105 count
, addr
= s
.recvfrom_into(b
)
1108 # (name, method, whether to expect success, *args)
1110 ('send', s
.send
, True, []),
1111 ('sendto', s
.sendto
, False, ["some.address"]),
1112 ('sendall', s
.sendall
, True, []),
1115 ('recv', s
.recv
, True, []),
1116 ('recvfrom', s
.recvfrom
, False, ["some.address"]),
1117 ('recv_into', _recv_into
, True, []),
1118 ('recvfrom_into', _recvfrom_into
, False, []),
1120 data_prefix
= u
"PREFIX_"
1122 for meth_name
, send_meth
, expect_success
, args
in send_methods
:
1123 indata
= data_prefix
+ meth_name
1125 send_meth(indata
.encode('ASCII', 'strict'), *args
)
1127 outdata
= outdata
.decode('ASCII', 'strict')
1128 if outdata
!= indata
.lower():
1130 "While sending with <<%s>> bad data "
1131 "<<%r>> (%d) received; "
1132 "expected <<%r>> (%d)\n" % (
1133 meth_name
, outdata
[:20], len(outdata
),
1134 indata
[:20], len(indata
)
1137 except ValueError as e
:
1140 "Failed to send with method <<%s>>; "
1141 "expected to succeed.\n" % (meth_name
,)
1143 if not str(e
).startswith(meth_name
):
1145 "Method <<%s>> failed with unexpected "
1146 "exception message: %s\n" % (
1151 for meth_name
, recv_meth
, expect_success
, args
in recv_methods
:
1152 indata
= data_prefix
+ meth_name
1154 s
.send(indata
.encode('ASCII', 'strict'))
1155 outdata
= recv_meth(*args
)
1156 outdata
= outdata
.decode('ASCII', 'strict')
1157 if outdata
!= indata
.lower():
1159 "While receiving with <<%s>> bad data "
1160 "<<%r>> (%d) received; "
1161 "expected <<%r>> (%d)\n" % (
1162 meth_name
, outdata
[:20], len(outdata
),
1163 indata
[:20], len(indata
)
1166 except ValueError as e
:
1169 "Failed to receive with method <<%s>>; "
1170 "expected to succeed.\n" % (meth_name
,)
1172 if not str(e
).startswith(meth_name
):
1174 "Method <<%s>> failed with unexpected "
1175 "exception message: %s\n" % (
1182 s
.write("over\n".encode("ASCII", "strict"))
1188 def test_handshake_timeout(self
):
1189 # Issue #5103: SSL handshake must respect the socket timeout
1190 server
= socket
.socket(socket
.AF_INET
)
1192 port
= test_support
.bind_port(server
)
1193 started
= threading
.Event()
1201 r
, w
, e
= select
.select([server
], [], [], 0.1)
1203 # Let the socket hang around rather than having
1204 # it closed by garbage collection.
1205 conns
.append(server
.accept()[0])
1207 t
= threading
.Thread(target
=serve
)
1213 c
= socket
.socket(socket
.AF_INET
)
1215 c
.connect((host
, port
))
1216 # Will attempt handshake and time out
1217 self
.assertRaisesRegexp(ssl
.SSLError
, "timed out",
1222 c
= socket
.socket(socket
.AF_INET
)
1224 c
= ssl
.wrap_socket(c
)
1225 # Will attempt handshake and time out
1226 self
.assertRaisesRegexp(ssl
.SSLError
, "timed out",
1227 c
.connect
, (host
, port
))
1236 def test_main(verbose
=False):
1238 raise unittest
.SkipTest("No SSL support")
1240 global CERTFILE
, SVN_PYTHON_ORG_ROOT_CERT
1241 CERTFILE
= os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
1243 SVN_PYTHON_ORG_ROOT_CERT
= os
.path
.join(
1244 os
.path
.dirname(__file__
) or os
.curdir
,
1245 "https_svn_python_org_root.pem")
1247 if (not os
.path
.exists(CERTFILE
) or
1248 not os
.path
.exists(SVN_PYTHON_ORG_ROOT_CERT
)):
1249 raise test_support
.TestFailed("Can't read certificate files!")
1251 tests
= [BasicTests
]
1253 if test_support
.is_resource_enabled('network'):
1254 tests
.append(NetworkedTests
)
1257 thread_info
= test_support
.threading_setup()
1258 if thread_info
and test_support
.is_resource_enabled('network'):
1259 tests
.append(ThreadedTests
)
1262 test_support
.run_unittest(*tests
)
1265 test_support
.threading_cleanup(*thread_info
)
1267 if __name__
== "__main__":