1 # Test the support for SSL and sockets
5 from test
import test_support
14 import urllib
, urlparse
18 from BaseHTTPServer
import HTTPServer
19 from SimpleHTTPServer
import SimpleHTTPRequestHandler
21 # Optionally test SSL support, if we have it in the tested platform
28 HOST
= test_support
.HOST
30 SVN_PYTHON_ORG_ROOT_CERT
= None
32 def handle_error(prefix
):
33 exc_format
= ' '.join(traceback
.format_exception(*sys
.exc_info()))
34 if test_support
.verbose
:
35 sys
.stdout
.write(prefix
+ exc_format
)
37 def testSimpleSSLwrap(self
):
39 ssl
.sslwrap_simple(socket
.socket(socket
.AF_INET
))
41 if e
.errno
== 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
46 ssl
.sslwrap_simple(socket
.socket(socket
.AF_INET
)._sock
)
48 if e
.errno
== 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
53 class BasicTests(unittest
.TestCase
):
55 def testSSLconnect(self
):
56 if not test_support
.is_resource_enabled('network'):
58 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
59 cert_reqs
=ssl
.CERT_NONE
)
60 s
.connect(("svn.python.org", 443))
63 raise test_support
.TestFailed("Peer cert %s shouldn't be here!")
66 # this should fail because we have no verification certs
67 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
68 cert_reqs
=ssl
.CERT_REQUIRED
)
70 s
.connect(("svn.python.org", 443))
76 def testCrucialConstants(self
):
87 if test_support
.verbose
:
88 sys
.stdout
.write("\n RAND_status is %d (%s)\n"
89 % (v
, (v
and "sufficient randomness") or
90 "insufficient randomness"))
96 print "didn't raise TypeError"
97 ssl
.RAND_add("this is a random string", 75.0)
99 def testParseCert(self
):
100 # note that this uses an 'unofficial' function in _ssl.c,
101 # provided solely for this test, to exercise the certificate
103 p
= ssl
._ssl
._test
_decode
_cert
(CERTFILE
, False)
104 if test_support
.verbose
:
105 sys
.stdout
.write("\n" + pprint
.pformat(p
) + "\n")
107 def testDERtoPEM(self
):
109 pem
= open(SVN_PYTHON_ORG_ROOT_CERT
, 'r').read()
110 d1
= ssl
.PEM_cert_to_DER_cert(pem
)
111 p2
= ssl
.DER_cert_to_PEM_cert(d1
)
112 d2
= ssl
.PEM_cert_to_DER_cert(p2
)
114 raise test_support
.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
116 class NetworkedTests(unittest
.TestCase
):
118 def testConnect(self
):
119 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
120 cert_reqs
=ssl
.CERT_NONE
)
121 s
.connect(("svn.python.org", 443))
124 raise test_support
.TestFailed("Peer cert %s shouldn't be here!")
127 # this should fail because we have no verification certs
128 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
129 cert_reqs
=ssl
.CERT_REQUIRED
)
131 s
.connect(("svn.python.org", 443))
137 # this should succeed because we specify the root cert
138 s
= ssl
.wrap_socket(socket
.socket(socket
.AF_INET
),
139 cert_reqs
=ssl
.CERT_REQUIRED
,
140 ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
142 s
.connect(("svn.python.org", 443))
143 except ssl
.SSLError
, x
:
144 raise test_support
.TestFailed("Unexpected exception %s" % x
)
149 def testNonBlockingHandshake(self
):
150 s
= socket
.socket(socket
.AF_INET
)
151 s
.connect(("svn.python.org", 443))
153 s
= ssl
.wrap_socket(s
,
154 cert_reqs
=ssl
.CERT_NONE
,
155 do_handshake_on_connect
=False)
162 except ssl
.SSLError
, err
:
163 if err
.args
[0] == ssl
.SSL_ERROR_WANT_READ
:
164 select
.select([s
], [], [])
165 elif err
.args
[0] == ssl
.SSL_ERROR_WANT_WRITE
:
166 select
.select([], [s
], [])
170 if test_support
.verbose
:
171 sys
.stdout
.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count
)
173 def testFetchServerCert(self
):
175 pem
= ssl
.get_server_certificate(("svn.python.org", 443))
177 raise test_support
.TestFailed("No server certificate on svn.python.org:443!")
180 pem
= ssl
.get_server_certificate(("svn.python.org", 443), ca_certs
=CERTFILE
)
185 raise test_support
.TestFailed("Got server certificate %s for svn.python.org!" % pem
)
187 pem
= ssl
.get_server_certificate(("svn.python.org", 443), ca_certs
=SVN_PYTHON_ORG_ROOT_CERT
)
189 raise test_support
.TestFailed("No server certificate on svn.python.org:443!")
190 if test_support
.verbose
:
191 sys
.stdout
.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem
)
197 _have_threads
= False
202 class ThreadedEchoServer(threading
.Thread
):
204 class ConnectionHandler(threading
.Thread
):
206 """A mildly complicated class, because we want it to work both
207 with and without the SSL wrapper around the socket connection, so
208 that we can test the STARTTLS functionality."""
210 def __init__(self
, server
, connsock
):
214 self
.sock
.setblocking(1)
216 threading
.Thread
.__init
__(self
)
219 def show_conn_details(self
):
220 if self
.server
.certreqs
== ssl
.CERT_REQUIRED
:
221 cert
= self
.sslconn
.getpeercert()
222 if test_support
.verbose
and self
.server
.chatty
:
223 sys
.stdout
.write(" client cert is " + pprint
.pformat(cert
) + "\n")
224 cert_binary
= self
.sslconn
.getpeercert(True)
225 if test_support
.verbose
and self
.server
.chatty
:
226 sys
.stdout
.write(" cert binary is " + str(len(cert_binary
)) + " bytes\n")
227 cipher
= self
.sslconn
.cipher()
228 if test_support
.verbose
and self
.server
.chatty
:
229 sys
.stdout
.write(" server: connection cipher is now " + str(cipher
) + "\n")
231 def wrap_conn (self
):
233 self
.sslconn
= ssl
.wrap_socket(self
.sock
, server_side
=True,
234 certfile
=self
.server
.certificate
,
235 ssl_version
=self
.server
.protocol
,
236 ca_certs
=self
.server
.cacerts
,
237 cert_reqs
=self
.server
.certreqs
)
239 if self
.server
.chatty
:
240 handle_error("\n server: bad connection attempt from " +
241 str(self
.sock
.getpeername()) + ":\n")
243 if not self
.server
.expect_bad_connects
:
244 # here, we want to stop the server, because this shouldn't
245 # happen in the context of our test case
247 # normally, we'd just stop here, but for the test
248 # harness, we want to stop the server
257 return self
.sslconn
.read()
259 return self
.sock
.recv(1024)
261 def write(self
, bytes
):
263 return self
.sslconn
.write(bytes
)
265 return self
.sock
.send(bytes
)
271 self
.sock
._sock
.close()
275 if not self
.server
.starttls_server
:
276 if isinstance(self
.sock
, ssl
.SSLSocket
):
277 self
.sslconn
= self
.sock
278 elif not self
.wrap_conn():
280 self
.show_conn_details()
285 # eof, so quit this handler
288 elif msg
.strip() == 'over':
289 if test_support
.verbose
and self
.server
.connectionchatty
:
290 sys
.stdout
.write(" server: client closed connection\n")
293 elif self
.server
.starttls_server
and msg
.strip() == 'STARTTLS':
294 if test_support
.verbose
and self
.server
.connectionchatty
:
295 sys
.stdout
.write(" server: read STARTTLS from client, sending OK...\n")
297 if not self
.wrap_conn():
299 elif self
.server
.starttls_server
and self
.sslconn
and msg
.strip() == 'ENDTLS':
300 if test_support
.verbose
and self
.server
.connectionchatty
:
301 sys
.stdout
.write(" server: read ENDTLS from client, sending OK...\n")
303 self
.sslconn
.unwrap()
305 if test_support
.verbose
and self
.server
.connectionchatty
:
306 sys
.stdout
.write(" server: connection is now unencrypted...\n")
308 if (test_support
.verbose
and
309 self
.server
.connectionchatty
):
310 ctype
= (self
.sslconn
and "encrypted") or "unencrypted"
311 sys
.stdout
.write(" server: read %s (%s), sending back %s (%s)...\n"
312 % (repr(msg
), ctype
, repr(msg
.lower()), ctype
))
313 self
.write(msg
.lower())
315 if self
.server
.chatty
:
316 handle_error("Test server failure:\n")
319 # normally, we'd just stop here, but for the test
320 # harness, we want to stop the server
325 def __init__(self
, certificate
, ssl_version
=None,
326 certreqs
=None, cacerts
=None, expect_bad_connects
=False,
327 chatty
=True, connectionchatty
=False, starttls_server
=False,
328 wrap_accepting_socket
=False):
330 if ssl_version
is None:
331 ssl_version
= ssl
.PROTOCOL_TLSv1
333 certreqs
= ssl
.CERT_NONE
334 self
.certificate
= certificate
335 self
.protocol
= ssl_version
336 self
.certreqs
= certreqs
337 self
.cacerts
= cacerts
338 self
.expect_bad_connects
= expect_bad_connects
340 self
.connectionchatty
= connectionchatty
341 self
.starttls_server
= starttls_server
342 self
.sock
= socket
.socket()
344 if wrap_accepting_socket
:
345 self
.sock
= ssl
.wrap_socket(self
.sock
, server_side
=True,
346 certfile
=self
.certificate
,
347 cert_reqs
= self
.certreqs
,
348 ca_certs
= self
.cacerts
,
349 ssl_version
= self
.protocol
)
350 if test_support
.verbose
and self
.chatty
:
351 sys
.stdout
.write(' server: wrapped server socket as %s\n' % str(self
.sock
))
352 self
.port
= test_support
.bind_port(self
.sock
)
354 threading
.Thread
.__init
__(self
)
357 def start (self
, flag
=None):
359 threading
.Thread
.start(self
)
362 self
.sock
.settimeout(0.5)
370 newconn
, connaddr
= self
.sock
.accept()
371 if test_support
.verbose
and self
.chatty
:
372 sys
.stdout
.write(' server: new connection from '
373 + str(connaddr
) + '\n')
374 handler
= self
.ConnectionHandler(self
, newconn
)
376 except socket
.timeout
:
378 except KeyboardInterrupt:
382 handle_error("Test server failure:\n")
388 class AsyncoreEchoServer(threading
.Thread
):
390 class EchoServer (asyncore
.dispatcher
):
392 class ConnectionHandler (asyncore
.dispatcher_with_send
):
394 def __init__(self
, conn
, certfile
):
395 asyncore
.dispatcher_with_send
.__init
__(self
, conn
)
396 self
.socket
= ssl
.wrap_socket(conn
, server_side
=True,
398 do_handshake_on_connect
=True)
401 if isinstance(self
.socket
, ssl
.SSLSocket
):
402 while self
.socket
.pending() > 0:
403 self
.handle_read_event()
406 def handle_read(self
):
407 data
= self
.recv(1024)
408 self
.send(data
.lower())
410 def handle_close(self
):
412 if test_support
.verbose
:
413 sys
.stdout
.write(" server: closed connection %s\n" % self
.socket
)
415 def handle_error(self
):
418 def __init__(self
, certfile
):
419 self
.certfile
= certfile
420 asyncore
.dispatcher
.__init
__(self
)
421 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
422 self
.port
= test_support
.bind_port(self
.socket
)
425 def handle_accept(self
):
426 sock_obj
, addr
= self
.accept()
427 if test_support
.verbose
:
428 sys
.stdout
.write(" server: new connection from %s:%s\n" %addr
)
429 self
.ConnectionHandler(sock_obj
, self
.certfile
)
431 def handle_error(self
):
434 def __init__(self
, certfile
):
437 self
.server
= self
.EchoServer(certfile
)
438 self
.port
= self
.server
.port
439 threading
.Thread
.__init
__(self
)
443 return "<%s %s>" % (self
.__class
__.__name
__, self
.server
)
445 def start (self
, flag
=None):
447 threading
.Thread
.start(self
)
463 class SocketServerHTTPSServer(threading
.Thread
):
465 class HTTPSServer(HTTPServer
):
467 def __init__(self
, server_address
, RequestHandlerClass
, certfile
):
469 HTTPServer
.__init
__(self
, server_address
, RequestHandlerClass
)
470 # we assume the certfile contains both private key and certificate
471 self
.certfile
= certfile
473 self
.active_lock
= threading
.Lock()
474 self
.allow_reuse_address
= True
477 return ('<%s %s:%s>' %
478 (self
.__class
__.__name
__,
482 def get_request (self
):
483 # override this to wrap socket with SSL
484 sock
, addr
= self
.socket
.accept()
485 sslconn
= ssl
.wrap_socket(sock
, server_side
=True,
486 certfile
=self
.certfile
)
489 # The methods overridden below this are mainly so that we
490 # can run it in a thread and be able to stop it from another
491 # You probably wouldn't need them in other uses.
493 def server_activate(self
):
494 # We want to run this in a thread for testing purposes,
495 # so we override this to set timeout, so that we get
496 # a chance to stop the server
497 self
.socket
.settimeout(0.5)
498 HTTPServer
.server_activate(self
)
500 def serve_forever(self
):
501 # We want this to run in a thread, so we use a slightly
502 # modified version of "forever".
506 # We need to lock while handling the request.
507 # Another thread can close the socket after self.active
508 # has been checked and before the request is handled.
509 # This causes an exception when using the closed socket.
510 with self
.active_lock
:
513 self
.handle_request()
514 except socket
.timeout
:
516 except KeyboardInterrupt:
520 sys
.stdout
.write(''.join(traceback
.format_exception(*sys
.exc_info())))
524 def server_close(self
):
525 # Again, we want this to run in a thread, so we need to override
526 # close to clear the "active" flag, so that serve_forever() will
528 with self
.active_lock
:
529 HTTPServer
.server_close(self
)
532 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler
):
534 # need to override translate_path to get a known root,
535 # instead of using os.curdir, since the test could be
538 server_version
= "TestHTTPS/1.0"
542 def translate_path(self
, path
):
543 """Translate a /-separated PATH to the local filename syntax.
545 Components that mean special things to the local file system
546 (e.g. drive or directory names) are ignored. (XXX They should
547 probably be diagnosed.)
550 # abandon query parameters
551 path
= urlparse
.urlparse(path
)[2]
552 path
= os
.path
.normpath(urllib
.unquote(path
))
553 words
= path
.split('/')
554 words
= filter(None, words
)
557 drive
, word
= os
.path
.splitdrive(word
)
558 head
, word
= os
.path
.split(word
)
559 if word
in self
.root
: continue
560 path
= os
.path
.join(path
, word
)
563 def log_message(self
, format
, *args
):
565 # we override this to suppress logging unless "verbose"
567 if test_support
.verbose
:
568 sys
.stdout
.write(" server (%s:%d %s):\n [%s] %s\n" %
569 (self
.server
.server_address
,
570 self
.server
.server_port
,
571 self
.request
.cipher(),
572 self
.log_date_time_string(),
576 def __init__(self
, certfile
):
579 self
.RootedHTTPRequestHandler
.root
= os
.path
.split(CERTFILE
)[0]
580 self
.port
= test_support
.find_unused_port()
581 self
.server
= self
.HTTPSServer(
582 (HOST
, self
.port
), self
.RootedHTTPRequestHandler
, certfile
)
583 threading
.Thread
.__init
__(self
)
587 return "<%s %s>" % (self
.__class
__.__name
__, self
.server
)
589 def start (self
, flag
=None):
591 threading
.Thread
.start(self
)
597 self
.server
.serve_forever()
602 self
.server
.server_close()
605 def badCertTest (certfile
):
606 server
= ThreadedEchoServer(CERTFILE
,
607 certreqs
=ssl
.CERT_REQUIRED
,
608 cacerts
=CERTFILE
, chatty
=False)
609 flag
= threading
.Event()
611 # wait for it to start
616 s
= ssl
.wrap_socket(socket
.socket(),
618 ssl_version
=ssl
.PROTOCOL_TLSv1
)
619 s
.connect((HOST
, server
.port
))
620 except ssl
.SSLError
, x
:
621 if test_support
.verbose
:
622 sys
.stdout
.write("\nSSLError is %s\n" % x
[1])
623 except socket
.error
, x
:
624 if test_support
.verbose
:
625 sys
.stdout
.write("\nsocket.error is %s\n" % x
[1])
627 raise test_support
.TestFailed(
628 "Use of invalid cert should have failed!")
633 def serverParamsTest (certfile
, protocol
, certreqs
, cacertsfile
,
634 client_certfile
, client_protocol
=None, indata
="FOO\n",
635 chatty
=True, connectionchatty
=False,
636 wrap_accepting_socket
=False):
638 server
= ThreadedEchoServer(certfile
,
640 ssl_version
=protocol
,
643 connectionchatty
=connectionchatty
,
644 wrap_accepting_socket
=wrap_accepting_socket
)
645 flag
= threading
.Event()
647 # wait for it to start
650 if client_protocol
is None:
651 client_protocol
= protocol
654 s
= ssl
.wrap_socket(socket
.socket(),
655 certfile
=client_certfile
,
656 ca_certs
=cacertsfile
,
658 ssl_version
=client_protocol
)
659 s
.connect((HOST
, server
.port
))
660 except ssl
.SSLError
, x
:
661 raise test_support
.TestFailed("Unexpected SSL error: " + str(x
))
663 raise test_support
.TestFailed("Unexpected exception: " + str(x
))
665 for arg
in [indata
, bytearray(indata
), memoryview(indata
)]:
667 if test_support
.verbose
:
669 " client: sending %s...\n" % (repr(arg
)))
673 if test_support
.verbose
:
674 sys
.stdout
.write(" client: read %s\n" % repr(outdata
))
675 if outdata
!= indata
.lower():
676 raise test_support
.TestFailed(
677 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
678 % (outdata
[:min(len(outdata
),20)], len(outdata
),
679 indata
[:min(len(indata
),20)].lower(), len(indata
)))
682 if test_support
.verbose
:
683 sys
.stdout
.write(" client: closing connection.\n")
689 def tryProtocolCombo (server_protocol
,
694 if certsreqs
is None:
695 certsreqs
= ssl
.CERT_NONE
697 if certsreqs
== ssl
.CERT_NONE
:
698 certtype
= "CERT_NONE"
699 elif certsreqs
== ssl
.CERT_OPTIONAL
:
700 certtype
= "CERT_OPTIONAL"
701 elif certsreqs
== ssl
.CERT_REQUIRED
:
702 certtype
= "CERT_REQUIRED"
703 if test_support
.verbose
:
704 formatstr
= (expectedToWork
and " %s->%s %s\n") or " {%s->%s} %s\n"
705 sys
.stdout
.write(formatstr
%
706 (ssl
.get_protocol_name(client_protocol
),
707 ssl
.get_protocol_name(server_protocol
),
710 serverParamsTest(CERTFILE
, server_protocol
, certsreqs
,
711 CERTFILE
, CERTFILE
, client_protocol
, chatty
=False)
712 except test_support
.TestFailed
:
716 if not expectedToWork
:
717 raise test_support
.TestFailed(
718 "Client protocol %s succeeded with server protocol %s!"
719 % (ssl
.get_protocol_name(client_protocol
),
720 ssl
.get_protocol_name(server_protocol
)))
723 class ThreadedTests(unittest
.TestCase
):
725 def testRudeShutdown(self
):
727 listener_ready
= threading
.Event()
728 listener_gone
= threading
.Event()
729 port
= test_support
.find_unused_port()
731 # `listener` runs in a thread. It opens a socket listening on
732 # PORT, and sits in an accept() until the main thread connects.
733 # Then it rudely closes the socket, and sets Event `listener_gone`
734 # to let the main thread know the socket is gone.
741 s
= None # reclaim the socket object, which also closes it
745 listener_ready
.wait()
747 s
.connect((HOST
, port
))
750 ssl_sock
= ssl
.wrap_socket(s
)
754 raise test_support
.TestFailed(
755 'connecting to closed SSL socket should have failed')
757 t
= threading
.Thread(target
=listener
)
764 if test_support
.verbose
:
765 sys
.stdout
.write("\n")
766 serverParamsTest(CERTFILE
, ssl
.PROTOCOL_TLSv1
, ssl
.CERT_NONE
,
767 CERTFILE
, CERTFILE
, ssl
.PROTOCOL_TLSv1
,
768 chatty
=True, connectionchatty
=True)
770 def testReadCert(self
):
772 if test_support
.verbose
:
773 sys
.stdout
.write("\n")
775 server
= ThreadedEchoServer(CERTFILE
,
776 certreqs
=ssl
.CERT_NONE
,
777 ssl_version
=ssl
.PROTOCOL_SSLv23
,
780 flag
= threading
.Event()
782 # wait for it to start
787 s
= ssl
.wrap_socket(socket
.socket(),
790 cert_reqs
=ssl
.CERT_REQUIRED
,
791 ssl_version
=ssl
.PROTOCOL_SSLv23
)
792 s
.connect((HOST
, server
.port
))
793 except ssl
.SSLError
, x
:
794 raise test_support
.TestFailed(
795 "Unexpected SSL error: " + str(x
))
797 raise test_support
.TestFailed(
798 "Unexpected exception: " + str(x
))
801 raise test_support
.TestFailed(
802 "Can't SSL-handshake with test server")
803 cert
= s
.getpeercert()
805 raise test_support
.TestFailed(
806 "Can't get peer certificate.")
808 if test_support
.verbose
:
809 sys
.stdout
.write(pprint
.pformat(cert
) + '\n')
810 sys
.stdout
.write("Connection cipher is " + str(cipher
) + '.\n')
811 if not cert
.has_key('subject'):
812 raise test_support
.TestFailed(
813 "No subject field in certificate: %s." %
814 pprint
.pformat(cert
))
815 if ((('organizationName', 'Python Software Foundation'),)
816 not in cert
['subject']):
817 raise test_support
.TestFailed(
818 "Missing or invalid 'organizationName' field in certificate subject; "
819 "should be 'Python Software Foundation'.")
825 def testNULLcert(self
):
826 badCertTest(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
828 def testMalformedCert(self
):
829 badCertTest(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
831 def testWrongCert(self
):
832 badCertTest(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
834 def testMalformedKey(self
):
835 badCertTest(os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
838 def testProtocolSSL2(self
):
839 if test_support
.verbose
:
840 sys
.stdout
.write("\n")
841 tryProtocolCombo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True)
842 tryProtocolCombo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True, ssl
.CERT_OPTIONAL
)
843 tryProtocolCombo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv2
, True, ssl
.CERT_REQUIRED
)
844 tryProtocolCombo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv23
, True)
845 tryProtocolCombo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_SSLv3
, False)
846 tryProtocolCombo(ssl
.PROTOCOL_SSLv2
, ssl
.PROTOCOL_TLSv1
, False)
848 def testProtocolSSL23(self
):
849 if test_support
.verbose
:
850 sys
.stdout
.write("\n")
852 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv2
, True)
853 except test_support
.TestFailed
, x
:
854 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
855 if test_support
.verbose
:
857 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
859 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True)
860 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True)
861 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True)
863 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_OPTIONAL
)
864 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True, ssl
.CERT_OPTIONAL
)
865 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_OPTIONAL
)
867 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_REQUIRED
)
868 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_SSLv23
, True, ssl
.CERT_REQUIRED
)
869 tryProtocolCombo(ssl
.PROTOCOL_SSLv23
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_REQUIRED
)
871 def testProtocolSSL3(self
):
872 if test_support
.verbose
:
873 sys
.stdout
.write("\n")
874 tryProtocolCombo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True)
875 tryProtocolCombo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_OPTIONAL
)
876 tryProtocolCombo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv3
, True, ssl
.CERT_REQUIRED
)
877 tryProtocolCombo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv2
, False)
878 tryProtocolCombo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_SSLv23
, False)
879 tryProtocolCombo(ssl
.PROTOCOL_SSLv3
, ssl
.PROTOCOL_TLSv1
, False)
881 def testProtocolTLS1(self
):
882 if test_support
.verbose
:
883 sys
.stdout
.write("\n")
884 tryProtocolCombo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True)
885 tryProtocolCombo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_OPTIONAL
)
886 tryProtocolCombo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_TLSv1
, True, ssl
.CERT_REQUIRED
)
887 tryProtocolCombo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv2
, False)
888 tryProtocolCombo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv3
, False)
889 tryProtocolCombo(ssl
.PROTOCOL_TLSv1
, ssl
.PROTOCOL_SSLv23
, False)
891 def testSTARTTLS (self
):
893 msgs
= ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
895 server
= ThreadedEchoServer(CERTFILE
,
896 ssl_version
=ssl
.PROTOCOL_TLSv1
,
897 starttls_server
=True,
899 connectionchatty
=True)
900 flag
= threading
.Event()
902 # wait for it to start
910 s
.connect((HOST
, server
.port
))
912 raise test_support
.TestFailed("Unexpected exception: " + str(x
))
914 if test_support
.verbose
:
915 sys
.stdout
.write("\n")
917 if test_support
.verbose
:
919 " client: sending %s...\n" % repr(indata
))
922 outdata
= conn
.read()
925 outdata
= s
.recv(1024)
926 if (indata
== "STARTTLS" and
927 outdata
.strip().lower().startswith("ok")):
928 if test_support
.verbose
:
930 " client: read %s from server, starting TLS...\n"
932 conn
= ssl
.wrap_socket(s
, ssl_version
=ssl
.PROTOCOL_TLSv1
)
934 elif (indata
== "ENDTLS" and
935 outdata
.strip().lower().startswith("ok")):
936 if test_support
.verbose
:
938 " client: read %s from server, ending TLS...\n"
943 if test_support
.verbose
:
945 " client: read %s from server\n" % repr(outdata
))
946 if test_support
.verbose
:
947 sys
.stdout
.write(" client: closing connection.\n")
957 def testSocketServer(self
):
959 server
= SocketServerHTTPSServer(CERTFILE
)
960 flag
= threading
.Event()
962 # wait for it to start
966 if test_support
.verbose
:
967 sys
.stdout
.write('\n')
968 d1
= open(CERTFILE
, 'rb').read()
970 # now fetch the same data from the HTTPS server
971 url
= 'https://127.0.0.1:%d/%s' % (
972 server
.port
, os
.path
.split(CERTFILE
)[1])
973 f
= urllib
.urlopen(url
)
974 dlen
= f
.info().getheader("content-length")
975 if dlen
and (int(dlen
) > 0):
976 d2
= f
.read(int(dlen
))
977 if test_support
.verbose
:
979 " client: read %d bytes from remote server '%s'\n"
983 msg
= ''.join(traceback
.format_exception(*sys
.exc_info()))
984 if test_support
.verbose
:
985 sys
.stdout
.write('\n' + msg
)
986 raise test_support
.TestFailed(msg
)
989 raise test_support
.TestFailed(
990 "Couldn't fetch data from HTTPS server")
995 def testWrappedAccept (self
):
997 if test_support
.verbose
:
998 sys
.stdout
.write("\n")
999 serverParamsTest(CERTFILE
, ssl
.PROTOCOL_SSLv23
, ssl
.CERT_REQUIRED
,
1000 CERTFILE
, CERTFILE
, ssl
.PROTOCOL_SSLv23
,
1001 chatty
=True, connectionchatty
=True,
1002 wrap_accepting_socket
=True)
1005 def testAsyncoreServer (self
):
1007 indata
= "TEST MESSAGE of mixed case\n"
1009 if test_support
.verbose
:
1010 sys
.stdout
.write("\n")
1011 server
= AsyncoreEchoServer(CERTFILE
)
1012 flag
= threading
.Event()
1014 # wait for it to start
1019 s
= ssl
.wrap_socket(socket
.socket())
1020 s
.connect(('127.0.0.1', server
.port
))
1021 except ssl
.SSLError
, x
:
1022 raise test_support
.TestFailed("Unexpected SSL error: " + str(x
))
1023 except Exception, x
:
1024 raise test_support
.TestFailed("Unexpected exception: " + str(x
))
1026 if test_support
.verbose
:
1028 " client: sending %s...\n" % (repr(indata
)))
1031 if test_support
.verbose
:
1032 sys
.stdout
.write(" client: read %s\n" % repr(outdata
))
1033 if outdata
!= indata
.lower():
1034 raise test_support
.TestFailed(
1035 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1036 % (outdata
[:min(len(outdata
),20)], len(outdata
),
1037 indata
[:min(len(indata
),20)].lower(), len(indata
)))
1039 if test_support
.verbose
:
1040 sys
.stdout
.write(" client: closing connection.\n")
1044 # wait for server thread to end
1048 def testAllRecvAndSendMethods(self
):
1050 if test_support
.verbose
:
1051 sys
.stdout
.write("\n")
1053 server
= ThreadedEchoServer(CERTFILE
,
1054 certreqs
=ssl
.CERT_NONE
,
1055 ssl_version
=ssl
.PROTOCOL_TLSv1
,
1058 connectionchatty
=False)
1059 flag
= threading
.Event()
1061 # wait for it to start
1065 s
= ssl
.wrap_socket(socket
.socket(),
1069 cert_reqs
=ssl
.CERT_NONE
,
1070 ssl_version
=ssl
.PROTOCOL_TLSv1
)
1071 s
.connect((HOST
, server
.port
))
1072 except ssl
.SSLError
as x
:
1073 raise support
.TestFailed("Unexpected SSL error: " + str(x
))
1074 except Exception as x
:
1075 raise support
.TestFailed("Unexpected exception: " + str(x
))
1077 # helper methods for standardising recv* method signatures
1079 b
= bytearray("\0"*100)
1080 count
= s
.recv_into(b
)
1083 def _recvfrom_into():
1084 b
= bytearray("\0"*100)
1085 count
, addr
= s
.recvfrom_into(b
)
1088 # (name, method, whether to expect success, *args)
1090 ('send', s
.send
, True, []),
1091 ('sendto', s
.sendto
, False, ["some.address"]),
1092 ('sendall', s
.sendall
, True, []),
1095 ('recv', s
.recv
, True, []),
1096 ('recvfrom', s
.recvfrom
, False, ["some.address"]),
1097 ('recv_into', _recv_into
, True, []),
1098 ('recvfrom_into', _recvfrom_into
, False, []),
1100 data_prefix
= u
"PREFIX_"
1102 for meth_name
, send_meth
, expect_success
, args
in send_methods
:
1103 indata
= data_prefix
+ meth_name
1105 send_meth(indata
.encode('ASCII', 'strict'), *args
)
1107 outdata
= outdata
.decode('ASCII', 'strict')
1108 if outdata
!= indata
.lower():
1109 raise support
.TestFailed(
1110 "While sending with <<%s>> bad data "
1111 "<<%r>> (%d) received; "
1112 "expected <<%r>> (%d)\n" % (
1113 meth_name
, outdata
[:20], len(outdata
),
1114 indata
[:20], len(indata
)
1117 except ValueError as e
:
1119 raise support
.TestFailed(
1120 "Failed to send with method <<%s>>; "
1121 "expected to succeed.\n" % (meth_name
,)
1123 if not str(e
).startswith(meth_name
):
1124 raise support
.TestFailed(
1125 "Method <<%s>> failed with unexpected "
1126 "exception message: %s\n" % (
1131 for meth_name
, recv_meth
, expect_success
, args
in recv_methods
:
1132 indata
= data_prefix
+ meth_name
1134 s
.send(indata
.encode('ASCII', 'strict'))
1135 outdata
= recv_meth(*args
)
1136 outdata
= outdata
.decode('ASCII', 'strict')
1137 if outdata
!= indata
.lower():
1138 raise support
.TestFailed(
1139 "While receiving with <<%s>> bad data "
1140 "<<%r>> (%d) received; "
1141 "expected <<%r>> (%d)\n" % (
1142 meth_name
, outdata
[:20], len(outdata
),
1143 indata
[:20], len(indata
)
1146 except ValueError as e
:
1148 raise support
.TestFailed(
1149 "Failed to receive with method <<%s>>; "
1150 "expected to succeed.\n" % (meth_name
,)
1152 if not str(e
).startswith(meth_name
):
1153 raise support
.TestFailed(
1154 "Method <<%s>> failed with unexpected "
1155 "exception message: %s\n" % (
1162 s
.write("over\n".encode("ASCII", "strict"))
1169 def test_main(verbose
=False):
1171 raise unittest
.SkipTest("No SSL support")
1173 global CERTFILE
, SVN_PYTHON_ORG_ROOT_CERT
1174 CERTFILE
= os
.path
.join(os
.path
.dirname(__file__
) or os
.curdir
,
1176 SVN_PYTHON_ORG_ROOT_CERT
= os
.path
.join(
1177 os
.path
.dirname(__file__
) or os
.curdir
,
1178 "https_svn_python_org_root.pem")
1180 if (not os
.path
.exists(CERTFILE
) or
1181 not os
.path
.exists(SVN_PYTHON_ORG_ROOT_CERT
)):
1182 raise test_support
.TestFailed("Can't read certificate files!")
1184 TESTPORT
= test_support
.find_unused_port()
1186 raise test_support
.TestFailed("Can't find open port to test servers on!")
1188 tests
= [BasicTests
]
1190 if test_support
.is_resource_enabled('network'):
1191 tests
.append(NetworkedTests
)
1194 thread_info
= test_support
.threading_setup()
1195 if thread_info
and test_support
.is_resource_enabled('network'):
1196 tests
.append(ThreadedTests
)
1198 test_support
.run_unittest(*tests
)
1201 test_support
.threading_cleanup(*thread_info
)
1203 if __name__
== "__main__":