# Extracted from the gitweb view of python/dscho.git: Lib/test/test_ssl.py
# (blob 813968ead6ff36758254638b4ac5250c91412eff)
1 # Test the support for SSL and sockets
3 import sys
4 import unittest
5 from test import test_support
6 import asyncore
7 import socket
8 import select
9 import time
10 import gc
11 import os
12 import errno
13 import pprint
14 import urllib, urlparse
15 import traceback
16 import weakref
18 from BaseHTTPServer import HTTPServer
19 from SimpleHTTPServer import SimpleHTTPRequestHandler
21 # Optionally test SSL support, if we have it in the tested platform
22 skip_expected = False
23 try:
24 import ssl
25 except ImportError:
26 skip_expected = True
28 HOST = test_support.HOST
29 CERTFILE = None
30 SVN_PYTHON_ORG_ROOT_CERT = None
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback.

    The traceback of the exception being handled is always formatted,
    but it is written to stdout only when ``test_support.verbose`` is set.
    """
    trace_lines = traceback.format_exception(*sys.exc_info())
    if not test_support.verbose:
        return
    sys.stdout.write(prefix + ' '.join(trace_lines))
class BasicTests(unittest.TestCase):
    """Checks of the ssl module API that do not start any of the local
    test servers defined in this file."""

    def test_sslwrap_simple(self):
        # A crude test for the legacy API
        try:
            ssl.sslwrap_simple(socket.socket(socket.AF_INET))
        except IOError as e:
            # A broken pipe when ssl_sock.do_handshake() runs is not what
            # this test cares about; any other error is a real failure.
            if e.errno != errno.EPIPE:
                raise
        try:
            ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
        except IOError as e:
            # Same tolerance for the raw ``_sock`` variant.
            if e.errno != errno.EPIPE:
                raise

    def test_constants(self):
        # Merely referencing each name checks that the module exports it.
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def test_random(self):
        v = ssl.RAND_status()
        if test_support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, (v and "sufficient randomness") or
                                "insufficient randomness"))
        # RAND_egd() must reject a non-string argument.  Use an assertion
        # so that a missing TypeError is reported as a test failure rather
        # than just printed.
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    def test_DER_to_PEM(self):
        # Round-trip PEM -> DER -> PEM -> DER and compare the DER forms.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        if not p2.startswith(ssl.PEM_HEADER + '\n'):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, (int, long))
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 2.0
        self.assertLess(n, 0x20000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 2)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 26)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by OpenSSL, the format might change
        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                        (s, t))

    def test_ciphers(self):
        if not test_support.is_resource_enabled('network'):
            # Report the test as skipped instead of silently passing.
            self.skipTest("the 'network' resource is not enabled")
        remote = ("svn.python.org", 443)
        for ciphers in ("ALL", "DEFAULT"):
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_NONE, ciphers=ciphers)
            try:
                s.connect(remote)
            finally:
                # Do not leak the connection, whatever the outcome.
                s.close()
        # Error checking occurs when connecting, because the SSL context
        # isn't created before.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
        try:
            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
                s.connect(remote)
        finally:
            s.close()

    @test_support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        wr = weakref.ref(ss)
        del ss
        self.assertEqual(wr(), None)
class NetworkedTests(unittest.TestCase):
    """Tests that connect to remote hosts (svn.python.org and
    sha2.hboeck.de) on port 443."""

    def test_connect(self):
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # Interpolate the unexpected certificate into the message.
                self.fail("Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        else:
            # Without this branch a successful, unverified connection
            # would go unnoticed.
            self.fail("Connecting with CERT_REQUIRED but no CA "
                      "certificates should have failed")
        finally:
            s.close()

        # this should succeed because we specify the root cert
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        try:
            s.connect(("svn.python.org", 443))
        finally:
            s.close()

    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    def test_makefile_close(self):
        # Issue #5238: creating a file-like object with makefile() shouldn't
        # delay closing the underlying "real socket" (here tested with its
        # file descriptor, hence skipping the test under Windows).
        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
        ss.connect(("svn.python.org", 443))
        fd = ss.fileno()
        f = ss.makefile()
        f.close()
        # The fd is still open
        os.read(fd, 0)
        # Closing the SSL socket should close the fd too
        ss.close()
        gc.collect()
        with self.assertRaises(OSError) as e:
            os.read(fd, 0)
        self.assertEqual(e.exception.errno, errno.EBADF)

    def test_non_blocking_handshake(self):
        s = socket.socket(socket.AF_INET)
        count = 0
        try:
            s.connect(("svn.python.org", 443))
            s.setblocking(False)
            s = ssl.wrap_socket(s,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
            while True:
                try:
                    count += 1
                    s.do_handshake()
                    break
                except ssl.SSLError as err:
                    # Wait until the socket is ready in the direction
                    # the handshake asked for, then retry.
                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                        select.select([s], [], [])
                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                        select.select([], [s], [])
                    else:
                        raise
        finally:
            # Closes the plain or the wrapped socket, whichever ``s`` is
            # bound to, even when the handshake fails.
            s.close()
        if test_support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def test_get_server_certificate(self):
        pem = ssl.get_server_certificate(("svn.python.org", 443))
        if not pem:
            self.fail("No server certificate on svn.python.org:443!")

        try:
            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
        except ssl.SSLError:
            # should fail: CERTFILE is not the CA for this host
            pass
        else:
            self.fail("Got server certificate %s for svn.python.org!" % pem)

        pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        if not pem:
            self.fail("No server certificate on svn.python.org:443!")
        if test_support.verbose:
            sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)

    def test_algorithms(self):
        # Issue #8484: all algorithms should be available when verifying a
        # certificate.
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # NOTE: https://sha256.tbs-internet.com is another possible test host
        remote = ("sha2.hboeck.de", 443)
        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=sha256_cert,)
        with test_support.transient_internet():
            try:
                s.connect(remote)
                if test_support.verbose:
                    sys.stdout.write("\nCipher with %r is %r\n" %
                                     (remote, s.cipher()))
                    sys.stdout.write("Certificate is:\n%s\n" %
                                     pprint.pformat(s.getpeercert()))
            finally:
                s.close()
271 try:
272 import threading
273 except ImportError:
274 _have_threads = False
275 else:
276 _have_threads = True
class ThreadedEchoServer(threading.Thread):
    """Daemon thread that accepts connections and hands each one to its
    own ConnectionHandler thread, which sends messages back lower-cased."""

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock):
            threading.Thread.__init__(self)
            self.daemon = True
            self.server = server
            self.running = False
            self.sslconn = None
            self.sock = connsock
            self.sock.setblocking(1)

        def show_conn_details(self):
            """Print peer certificate and cipher details when chatty."""
            chatty = test_support.verbose and self.server.chatty
            conn = self.sslconn
            if self.server.certreqs == ssl.CERT_REQUIRED:
                cert = conn.getpeercert()
                if chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = conn.getpeercert(True)
                if chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = conn.cipher()
            if chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")

        def wrap_conn(self):
            """Wrap the connection in SSL; return True on success.

            On ssl.SSLError the connection is closed, the whole server is
            stopped and False is returned.
            """
            server = self.server
            try:
                self.sslconn = ssl.wrap_socket(
                    self.sock, server_side=True,
                    certfile=server.certificate,
                    ssl_version=server.protocol,
                    ca_certs=server.cacerts,
                    cert_reqs=server.certreqs,
                    ciphers=server.ciphers)
            except ssl.SSLError:
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                if server.chatty:
                    handle_error("\n server: bad connection attempt from " +
                                 str(self.sock.getpeername()) + ":\n")
                self.close()
                self.running = False
                server.stop()
                return False
            return True

        def read(self):
            """Read from the SSL connection if there is one, else the socket."""
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            """Write through the SSL connection if there is one, else the socket."""
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            """Close the SSL connection if there is one, else the raw socket."""
            if not self.sslconn:
                self.sock._sock.close()
                return
            self.sslconn.close()

        def run(self):
            self.running = True
            server = self.server
            if not server.starttls_server:
                # Not a STARTTLS server: the connection must be encrypted
                # from the start.
                if isinstance(self.sock, ssl.SSLSocket):
                    self.sslconn = self.sock
                elif not self.wrap_conn():
                    return
                self.show_conn_details()
            while self.running:
                try:
                    msg = self.read()
                    if not msg:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                        continue
                    command = msg.strip()
                    chatty = test_support.verbose and server.connectionchatty
                    if command == 'over':
                        if chatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    if server.starttls_server and command == 'STARTTLS':
                        if chatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write("OK\n")
                        if not self.wrap_conn():
                            return
                    elif server.starttls_server and self.sslconn and command == 'ENDTLS':
                        if chatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write("OK\n")
                        self.sslconn.unwrap()
                        self.sslconn = None
                        if chatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    else:
                        # Plain message: echo it back lower-cased.
                        if chatty:
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
                                             % (repr(msg), ctype, repr(msg.lower()), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    server.stop()

    def __init__(self, certificate, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 wrap_accepting_socket=False, ciphers=None):
        threading.Thread.__init__(self)
        self.daemon = True
        self.certificate = certificate
        # Defaults that cannot be spelled in the signature.
        self.protocol = ssl.PROTOCOL_TLSv1 if ssl_version is None else ssl_version
        self.certreqs = ssl.CERT_NONE if certreqs is None else certreqs
        self.cacerts = cacerts
        self.ciphers = ciphers
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.flag = None
        self.active = False
        self.sock = socket.socket()
        if wrap_accepting_socket:
            # Wrap the listening socket itself, so that accept() returns
            # connections that are already SSL sockets.
            self.sock = ssl.wrap_socket(self.sock, server_side=True,
                                        certfile=self.certificate,
                                        cert_reqs=self.certreqs,
                                        ca_certs=self.cacerts,
                                        ssl_version=self.protocol,
                                        ciphers=self.ciphers)
            if test_support.verbose and self.chatty:
                sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
        self.port = test_support.bind_port(self.sock)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        listener = self.sock
        # Short timeout so that the loop notices stop() promptly.
        listener.settimeout(0.05)
        listener.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = listener.accept()
                if test_support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + str(connaddr) + '\n')
                self.ConnectionHandler(self, newconn).start()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        listener.close()

    def stop(self):
        """Ask the accept loop in run() to finish."""
        self.active = False
class AsyncoreEchoServer(threading.Thread):
    """Daemon thread that drives an asyncore-based SSL echo server."""

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection dispatcher: performs the SSL handshake
            lazily, then echoes received data lower-cased."""

            def __init__(self, conn, certfile):
                asyncore.dispatcher_with_send.__init__(self, conn)
                # Handshake is deferred; handle_read() drives it.
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                self._ssl_accepting = True

            def readable(self):
                if isinstance(self.socket, ssl.SSLSocket):
                    # Drain whatever the SSL layer reports as pending.
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                try:
                    self.socket.do_handshake()
                except ssl.SSLError as err:
                    code = err.args[0]
                    if code in (ssl.SSL_ERROR_WANT_READ,
                                ssl.SSL_ERROR_WANT_WRITE):
                        # Handshake not finished yet; retried on next event.
                        return
                    if code == ssl.SSL_ERROR_EOF:
                        return self.handle_close()
                    raise
                except socket.error as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if data and data.strip() != 'over':
                    self.send(data.lower())

            def handle_close(self):
                self.close()
                if test_support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = test_support.bind_port(self.socket)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if test_support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        threading.Thread.__init__(self)
        self.daemon = True
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            asyncore.loop(0.05)

    def stop(self):
        """Stop the polling loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
class SocketServerHTTPSServer(threading.Thread):
    """Daemon thread running an HTTPS file server rooted at the directory
    that contains CERTFILE."""

    class HTTPSServer(HTTPServer):
        """HTTPServer whose accepted connections are wrapped in SSL."""

        def __init__(self, server_address, RequestHandlerClass, certfile):
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
            # we assume the certfile contains both private key and certificate
            self.certfile = certfile
            self.allow_reuse_address = True

        def __str__(self):
            return ('<%s %s:%s>' %
                    (self.__class__.__name__,
                     self.server_name,
                     self.server_port))

        def get_request(self):
            # override this to wrap socket with SSL
            sock, addr = self.socket.accept()
            sslconn = ssl.wrap_socket(sock, server_side=True,
                                      certfile=self.certfile)
            return sslconn, addr

    class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
        # need to override translate_path to get a known root,
        # instead of using os.curdir, since the test could be
        # run from anywhere

        server_version = "TestHTTPS/1.0"

        # Directory that request paths are resolved against; assigned by
        # SocketServerHTTPSServer.__init__.
        root = None

        def translate_path(self, path):
            """Translate a /-separated PATH to the local filename syntax.

            Components that mean special things to the local file system
            (e.g. drive or directory names) are ignored. (XXX They should
            probably be diagnosed.)

            """
            # abandon query parameters
            path = urlparse.urlparse(path)[2]
            path = os.path.normpath(urllib.unquote(path))
            words = filter(None, path.split('/'))
            path = self.root
            for word in words:
                drive, word = os.path.splitdrive(word)
                head, word = os.path.split(word)
                # Skip only the "current" and "parent" directory markers.
                # Testing ``word in self.root`` was a substring check against
                # the root path and dropped any component that happened to
                # occur in it.
                if word in (os.curdir, os.pardir):
                    continue
                path = os.path.join(path, word)
            return path

        def log_message(self, format, *args):

            # we override this to suppress logging unless "verbose"

            if test_support.verbose:
                sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                                 (self.server.server_address,
                                  self.server.server_port,
                                  self.request.cipher(),
                                  self.log_date_time_string(),
                                  format%args))

    def __init__(self, certfile):
        self.flag = None
        self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
        # Port 0 asks for any free port; the chosen one is read back below.
        self.server = self.HTTPSServer(
            (HOST, 0), self.RootedHTTPRequestHandler, certfile)
        self.port = self.server.server_port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        if self.flag:
            self.flag.set()
        self.server.serve_forever(0.05)

    def stop(self):
        """Shut down the serve_forever() loop."""
        self.server.shutdown()
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.
    """
    started = threading.Event()
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False)
    server.start(started)
    # block until the server thread is listening
    started.wait()
    try:
        # the connection attempt is expected to raise
        try:
            client = ssl.wrap_socket(socket.socket(),
                                     certfile=certfile,
                                     ssl_version=ssl.PROTOCOL_TLSv1)
            client.connect((HOST, server.port))
        except ssl.SSLError as exc:
            if test_support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % exc.args[1])
        except socket.error as exc:
            if test_support.verbose:
                sys.stdout.write("\nsocket.error is %s\n" % exc.args[1])
        else:
            raise AssertionError("Use of invalid cert should have failed!")
    finally:
        server.stop()
        server.join()
def server_params_test(certfile, protocol, certreqs, cacertsfile,
                       client_certfile, client_protocol=None, indata="FOO\n",
                       ciphers=None, chatty=True, connectionchatty=False,
                       wrap_accepting_socket=False):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as a str, a bytearray and a memoryview; each reply
    must equal ``indata.lower()``, otherwise AssertionError is raised.
    When *client_protocol* is None the client uses *protocol*.
    """
    server = ThreadedEchoServer(certfile,
                                certreqs=certreqs,
                                ssl_version=protocol,
                                cacerts=cacertsfile,
                                ciphers=ciphers,
                                chatty=chatty,
                                connectionchatty=connectionchatty,
                                wrap_accepting_socket=wrap_accepting_socket)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    if client_protocol is None:
        client_protocol = protocol
    try:
        s = ssl.wrap_socket(socket.socket(),
                            certfile=client_certfile,
                            ca_certs=cacertsfile,
                            ciphers=ciphers,
                            cert_reqs=certreqs,
                            ssl_version=client_protocol)
        try:
            s.connect((HOST, server.port))
            for arg in [indata, bytearray(indata), memoryview(indata)]:
                if connectionchatty:
                    if test_support.verbose:
                        sys.stdout.write(
                            " client: sending %s...\n" % (repr(arg)))
                s.write(arg)
                outdata = s.read()
                if connectionchatty:
                    if test_support.verbose:
                        sys.stdout.write(" client: read %s\n" % repr(outdata))
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                        % (outdata[:min(len(outdata),20)], len(outdata),
                           indata[:min(len(indata),20)].lower(), len(indata)))
            s.write("over\n")
            if connectionchatty:
                if test_support.verbose:
                    sys.stdout.write(" client: closing connection.\n")
        finally:
            # Close the client on failures too, not only after a
            # successful exchange, so that no socket is leaked.
            s.close()
    finally:
        server.stop()
        server.join()
def try_protocol_combo(server_protocol,
                       client_protocol,
                       expect_success,
                       certsreqs=None):
    """Run server_params_test() for one server/client protocol pair.

    Raises AssertionError when the pair works although *expect_success*
    is false, and re-raises the connection error when it fails although
    *expect_success* is true.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    client_name = ssl.get_protocol_name(client_protocol)
    server_name = ssl.get_protocol_name(server_protocol)
    if test_support.verbose:
        # Pairs expected to fail are shown in braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (client_name, server_name, certtype))
    try:
        # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
        # will send an SSLv3 hello (rather than SSLv2) starting from
        # OpenSSL 1.0.0 (see issue #8322).
        server_params_test(CERTFILE, server_protocol, certsreqs,
                           CERTFILE, CERTFILE, client_protocol,
                           ciphers="ALL", chatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (client_name, server_name))
768 class ThreadedTests(unittest.TestCase):
770 def test_rude_shutdown(self):
771 """A brutal shutdown of an SSL server should raise an IOError
772 in the client when attempting handshake.
774 listener_ready = threading.Event()
775 listener_gone = threading.Event()
777 s = socket.socket()
778 port = test_support.bind_port(s, HOST)
780 # `listener` runs in a thread. It sits in an accept() until
781 # the main thread connects. Then it rudely closes the socket,
782 # and sets Event `listener_gone` to let the main thread know
783 # the socket is gone.
784 def listener():
785 s.listen(5)
786 listener_ready.set()
787 s.accept()
788 s.close()
789 listener_gone.set()
791 def connector():
792 listener_ready.wait()
793 c = socket.socket()
794 c.connect((HOST, port))
795 listener_gone.wait()
796 try:
797 ssl_sock = ssl.wrap_socket(c)
798 except IOError:
799 pass
800 else:
801 self.fail('connecting to closed SSL socket should have failed')
803 t = threading.Thread(target=listener)
804 t.start()
805 try:
806 connector()
807 finally:
808 t.join()
810 def test_echo(self):
811 """Basic test of an SSL client connecting to a server"""
812 if test_support.verbose:
813 sys.stdout.write("\n")
814 server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
815 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
816 chatty=True, connectionchatty=True)
818 def test_getpeercert(self):
819 if test_support.verbose:
820 sys.stdout.write("\n")
821 s2 = socket.socket()
822 server = ThreadedEchoServer(CERTFILE,
823 certreqs=ssl.CERT_NONE,
824 ssl_version=ssl.PROTOCOL_SSLv23,
825 cacerts=CERTFILE,
826 chatty=False)
827 flag = threading.Event()
828 server.start(flag)
829 # wait for it to start
830 flag.wait()
831 # try to connect
832 try:
833 s = ssl.wrap_socket(socket.socket(),
834 certfile=CERTFILE,
835 ca_certs=CERTFILE,
836 cert_reqs=ssl.CERT_REQUIRED,
837 ssl_version=ssl.PROTOCOL_SSLv23)
838 s.connect((HOST, server.port))
839 cert = s.getpeercert()
840 self.assertTrue(cert, "Can't get peer certificate.")
841 cipher = s.cipher()
842 if test_support.verbose:
843 sys.stdout.write(pprint.pformat(cert) + '\n')
844 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
845 if 'subject' not in cert:
846 self.fail("No subject field in certificate: %s." %
847 pprint.pformat(cert))
848 if ((('organizationName', 'Python Software Foundation'),)
849 not in cert['subject']):
850 self.fail(
851 "Missing or invalid 'organizationName' field in certificate subject; "
852 "should be 'Python Software Foundation'.")
853 s.close()
854 finally:
855 server.stop()
856 server.join()
858 def test_empty_cert(self):
859 """Connecting with an empty cert file"""
860 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
861 "nullcert.pem"))
862 def test_malformed_cert(self):
863 """Connecting with a badly formatted certificate (syntax error)"""
864 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
865 "badcert.pem"))
866 def test_nonexisting_cert(self):
867 """Connecting with a non-existing cert file"""
868 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
869 "wrongcert.pem"))
870 def test_malformed_key(self):
871 """Connecting with a badly formatted key (syntax error)"""
872 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
873 "badkey.pem"))
875 def test_protocol_sslv2(self):
876 """Connecting to an SSLv2 server with various client options"""
877 if test_support.verbose:
878 sys.stdout.write("\n")
879 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
880 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
881 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
882 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
883 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
884 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
886 def test_protocol_sslv23(self):
887 """Connecting to an SSLv23 server with various client options"""
888 if test_support.verbose:
889 sys.stdout.write("\n")
890 try:
891 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
892 except (ssl.SSLError, socket.error), x:
893 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
894 if test_support.verbose:
895 sys.stdout.write(
896 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
897 % str(x))
898 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
899 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
900 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
902 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
903 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
904 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
906 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
907 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
908 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
910 def test_protocol_sslv3(self):
911 """Connecting to an SSLv3 server with various client options"""
912 if test_support.verbose:
913 sys.stdout.write("\n")
914 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
915 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
916 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
917 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
918 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
919 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
921 def test_protocol_tlsv1(self):
922 """Connecting to a TLSv1 server with various client options"""
923 if test_support.verbose:
924 sys.stdout.write("\n")
925 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
926 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
927 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
928 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
929 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
930 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
932 def test_starttls(self):
933 """Switching from clear text to encrypted and back again."""
934 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
936 server = ThreadedEchoServer(CERTFILE,
937 ssl_version=ssl.PROTOCOL_TLSv1,
938 starttls_server=True,
939 chatty=True,
940 connectionchatty=True)
941 flag = threading.Event()
942 server.start(flag)
943 # wait for it to start
944 flag.wait()
945 # try to connect
946 wrapped = False
947 try:
948 s = socket.socket()
949 s.setblocking(1)
950 s.connect((HOST, server.port))
951 if test_support.verbose:
952 sys.stdout.write("\n")
953 for indata in msgs:
954 if test_support.verbose:
955 sys.stdout.write(
956 " client: sending %s...\n" % repr(indata))
957 if wrapped:
958 conn.write(indata)
959 outdata = conn.read()
960 else:
961 s.send(indata)
962 outdata = s.recv(1024)
963 if (indata == "STARTTLS" and
964 outdata.strip().lower().startswith("ok")):
965 # STARTTLS ok, switch to secure mode
966 if test_support.verbose:
967 sys.stdout.write(
968 " client: read %s from server, starting TLS...\n"
969 % repr(outdata))
970 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
971 wrapped = True
972 elif (indata == "ENDTLS" and
973 outdata.strip().lower().startswith("ok")):
974 # ENDTLS ok, switch back to clear text
975 if test_support.verbose:
976 sys.stdout.write(
977 " client: read %s from server, ending TLS...\n"
978 % repr(outdata))
979 s = conn.unwrap()
980 wrapped = False
981 else:
982 if test_support.verbose:
983 sys.stdout.write(
984 " client: read %s from server\n" % repr(outdata))
985 if test_support.verbose:
986 sys.stdout.write(" client: closing connection.\n")
987 if wrapped:
988 conn.write("over\n")
989 else:
990 s.send("over\n")
991 s.close()
992 finally:
993 server.stop()
994 server.join()
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections.

    Starts a SocketServerHTTPSServer built from CERTFILE, requests the
    file named like CERTFILE's base name from it over HTTPS, and checks
    that the bytes received equal the bytes read directly from CERTFILE.
    """
    server = SocketServerHTTPSServer(CERTFILE)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        if test_support.verbose:
            sys.stdout.write('\n')
        with open(CERTFILE, 'rb') as f:
            d1 = f.read()
        d2 = ''
        # now fetch the same data from the HTTPS server
        url = 'https://127.0.0.1:%d/%s' % (
            server.port, os.path.split(CERTFILE)[1])
        with test_support.check_py3k_warnings():
            response = urllib.urlopen(url)
        try:
            dlen = response.info().getheader("content-length")
            # Only read a body when the server announced a positive length;
            # otherwise d2 stays empty and the comparison below fails.
            if dlen and (int(dlen) > 0):
                d2 = response.read(int(dlen))
                if test_support.verbose:
                    sys.stdout.write(
                        " client: read %d bytes from remote server '%s'\n"
                        % (len(d2), server))
        finally:
            # Close the response even when reading or parsing fails, so
            # the connection is not left open until garbage collection.
            response.close()
        self.assertEqual(d1, d2)
    finally:
        server.stop()
        server.join()
def test_wrapped_accept(self):
    """Check the accept() method on SSL sockets.

    Runs server_params_test with SSLv23 on both ends, a required peer
    certificate, and the wrap_accepting_socket option switched on.
    """
    if test_support.verbose:
        sys.stdout.write("\n")
    protocol = ssl.PROTOCOL_SSLv23
    options = dict(chatty=True,
                   connectionchatty=True,
                   wrap_accepting_socket=True)
    server_params_test(CERTFILE, protocol, ssl.CERT_REQUIRED,
                       CERTFILE, CERTFILE, protocol,
                       **options)
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Starts an AsyncoreEchoServer, sends one mixed-case message over an
    SSL-wrapped client socket and fails unless the reply equals the
    lower-cased message.  The client then sends "over\n" and closes.
    """
    indata = "TEST MESSAGE of mixed case\n"

    if test_support.verbose:
        sys.stdout.write("\n")
    server = AsyncoreEchoServer(CERTFILE)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        s = ssl.wrap_socket(socket.socket())
        try:
            s.connect(('127.0.0.1', server.port))
            if test_support.verbose:
                sys.stdout.write(
                    " client: sending %s...\n" % (repr(indata)))
            s.write(indata)
            outdata = s.read()
            if test_support.verbose:
                sys.stdout.write(" client: read %s\n" % repr(outdata))
            if outdata != indata.lower():
                # Slicing already clamps to the string length, so a plain
                # [:20] is enough to keep the message short.
                self.fail(
                    "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write("over\n")
            if test_support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        finally:
            # Close the client socket on failure paths too, not only
            # after a successful exchange.
            s.close()
    finally:
        server.stop()
        # wait for server thread to end
        server.join()
def test_recv_send(self):
    """Test recv(), send() and friends.

    Connects an SSL client socket to a ThreadedEchoServer and drives each
    send*/recv* method in turn.  Methods flagged as expected to succeed
    must round-trip the payload (the reply is compared with the
    lower-cased payload).  Methods flagged as expected to fail must raise
    a ValueError whose message starts with the method name.
    """
    if test_support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    # The client is created and connected inside the try block so that a
    # failure here still stops and joins the server thread.
    try:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        try:
            s.connect((HOST, server.port))

            # helper methods for standardising recv* method signatures
            def _recv_into():
                b = bytearray("\0"*100)
                count = s.recv_into(b)
                return b[:count]

            def _recvfrom_into():
                b = bytearray("\0"*100)
                count, addr = s.recvfrom_into(b)
                return b[:count]

            # (name, method, whether to expect success, *args)
            send_methods = [
                ('send', s.send, True, []),
                ('sendto', s.sendto, False, ["some.address"]),
                ('sendall', s.sendall, True, []),
            ]
            recv_methods = [
                ('recv', s.recv, True, []),
                ('recvfrom', s.recvfrom, False, ["some.address"]),
                ('recv_into', _recv_into, True, []),
                ('recvfrom_into', _recvfrom_into, False, []),
            ]
            data_prefix = u"PREFIX_"

            for meth_name, send_meth, expect_success, args in send_methods:
                indata = data_prefix + meth_name
                expected = indata.lower()
                try:
                    send_meth(indata.encode('ASCII', 'strict'), *args)
                    outdata = s.read()
                    outdata = outdata.decode('ASCII', 'strict')
                    if outdata != expected:
                        # Report the value actually compared against
                        # (the lower-cased payload), not the raw payload.
                        self.fail(
                            "While sending with <<%s>> bad data "
                            "<<%r>> (%d) received; "
                            "expected <<%r>> (%d)\n" % (
                                meth_name, outdata[:20], len(outdata),
                                expected[:20], len(expected)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to send with method <<%s>>; "
                            "expected to succeed.\n" % (meth_name,)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<%s>> failed with unexpected "
                            "exception message: %s\n" % (
                                meth_name, e
                            )
                        )

            for meth_name, recv_meth, expect_success, args in recv_methods:
                indata = data_prefix + meth_name
                expected = indata.lower()
                try:
                    s.send(indata.encode('ASCII', 'strict'))
                    outdata = recv_meth(*args)
                    outdata = outdata.decode('ASCII', 'strict')
                    if outdata != expected:
                        self.fail(
                            "While receiving with <<%s>> bad data "
                            "<<%r>> (%d) received; "
                            "expected <<%r>> (%d)\n" % (
                                meth_name, outdata[:20], len(outdata),
                                expected[:20], len(expected)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to receive with method <<%s>>; "
                            "expected to succeed.\n" % (meth_name,)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<%s>> failed with unexpected "
                            "exception message: %s\n" % (
                                meth_name, e
                            )
                        )
                    # consume data
                    # The payload was sent before the receive call was
                    # rejected, so its reply is still pending; drain it
                    # before the next method is exercised.
                    s.read()

            s.write("over\n".encode("ASCII", "strict"))
        finally:
            # Close the client socket on every path, including failures.
            s.close()
    finally:
        server.stop()
        server.join()
def test_handshake_timeout(self):
    """Check that the SSL handshake honours the socket timeout.

    A plain TCP listener accepts connections but never speaks SSL, so a
    client handshake can only end by timing out.  Both ways of reaching
    the handshake are covered: wrapping an already connected socket, and
    connecting an already wrapped socket.
    """
    # Issue #5103: SSL handshake must respect the socket timeout
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = test_support.bind_port(server)
    started = threading.Event()
    # Read by serve() through the closure; rebinding it in the finally
    # clause below is what makes the server loop exit.
    finish = False

    def serve():
        server.listen(5)
        started.set()
        conns = []
        try:
            while not finish:
                r, w, e = select.select([server], [], [], 0.1)
                if server in r:
                    # Let the socket hang around rather than having
                    # it closed by garbage collection.
                    conns.append(server.accept()[0])
        finally:
            # Release the accepted sockets explicitly once the loop is
            # over instead of leaving them to garbage collection.
            for conn in conns:
                conn.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        # Case 1: connect first, then wrap.  The socket is created before
        # the try block so the finally clause always has one to close.
        c = socket.socket(socket.AF_INET)
        try:
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    ssl.wrap_socket, c)
        finally:
            c.close()
        # Case 2: wrap first, then connect.
        c = socket.socket(socket.AF_INET)
        try:
            c.settimeout(0.2)
            c = ssl.wrap_socket(c)
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    c.connect, (host, port))
        finally:
            c.close()
    finally:
        finish = True
        t.join()
        server.close()
def test_main(verbose=False):
    """Locate the certificate files and run the SSL test classes.

    Raises unittest.SkipTest when the ssl module could not be imported,
    and test_support.TestFailed when either certificate file is missing.
    NetworkedTests and ThreadedTests are only added when the 'network'
    resource is enabled; ThreadedTests additionally needs thread support.
    The ``verbose`` argument is accepted but not used by this function.
    """
    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT

    if skip_expected:
        raise unittest.SkipTest("No SSL support")

    # Both certificates are looked up next to this module.
    cert_dir = os.path.dirname(__file__) or os.curdir
    CERTFILE = os.path.join(cert_dir, "keycert.pem")
    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
        cert_dir, "https_svn_python_org_root.pem")

    have_certs = (os.path.exists(CERTFILE) and
                  os.path.exists(SVN_PYTHON_ORG_ROOT_CERT))
    if not have_certs:
        raise test_support.TestFailed("Can't read certificate files!")

    selected = [BasicTests]

    if test_support.is_resource_enabled('network'):
        selected.append(NetworkedTests)

    if _have_threads:
        thread_info = test_support.threading_setup()
        if thread_info and test_support.is_resource_enabled('network'):
            selected.append(ThreadedTests)

    try:
        test_support.run_unittest(*selected)
    finally:
        # thread_info only exists when threading_setup() ran above.
        if _have_threads:
            test_support.threading_cleanup(*thread_info)
# Run the whole test module when this file is executed as a script.
if __name__ == "__main__":
    test_main()