The silencing of DeprecationWarning was not taking -3 into consideration. Since
[python.git] / Lib / test / test_ssl.py
blob5aca13528a35f62a1b96ac0dcd878e56fc615f1a
1 # Test the support for SSL and sockets
3 import sys
4 import unittest
5 from test import test_support
6 import asyncore
7 import socket
8 import select
9 import errno
10 import subprocess
11 import time
12 import os
13 import pprint
14 import urllib, urlparse
15 import shutil
16 import traceback
18 from BaseHTTPServer import HTTPServer
19 from SimpleHTTPServer import SimpleHTTPRequestHandler
# SSL support is optional on the tested platform; record whether the
# import failed so that a skip can be recognised as expected.
try:
    import ssl
except ImportError:
    skip_expected = True
else:
    skip_expected = False

# Host address taken from the shared test-support helpers.
HOST = test_support.HOST
# Certificate file paths; both start out unset (None) at import time.
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
def handle_error(prefix):
    """Report the exception currently being handled on stdout.

    The formatted traceback lines are joined with single spaces and
    written after *prefix*.  Nothing is written unless
    ``test_support.verbose`` is true.
    """
    tb_lines = traceback.format_exception(*sys.exc_info())
    if not test_support.verbose:
        return
    sys.stdout.write(prefix + ' '.join(tb_lines))
def testSimpleSSLwrap(self):
    # ssl.sslwrap_simple() is exercised twice: once with a socket.socket
    # object and once with the low-level ``_sock`` object underneath it.
    for use_raw_sock in (False, True):
        sock = socket.socket(socket.AF_INET)
        target = sock._sock if use_raw_sock else sock
        try:
            ssl.sslwrap_simple(target)
        except IOError as e:
            # A broken pipe raised while handshaking is tolerated (this
            # test doesn't care about that); anything else is a failure.
            # errno.EPIPE replaces the hard-coded value 32.
            if e.errno != errno.EPIPE:
                raise
class BasicTests(unittest.TestCase):
    """Tests of the ssl module that do not start a local server thread."""

    def testSSLconnect(self):
        # Needs the 'network' resource; silently does nothing without it.
        if not test_support.is_resource_enabled('network'):
            return
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # The message used to be raised with a literal "%s" in it;
                # interpolate the offending certificate.
                raise test_support.TestFailed(
                    "Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

    def testCrucialConstants(self):
        # Plain attribute lookups: each raises AttributeError if the
        # constant is missing from the ssl module.
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def testRAND(self):
        v = ssl.RAND_status()
        if test_support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, (v and "sufficient randomness") or
                                "insufficient randomness"))
        try:
            ssl.RAND_egd(1)
        except TypeError:
            pass
        else:
            # Previously this was only printed, so the test could never
            # fail on it; report it like every other failure in this file.
            raise test_support.TestFailed("didn't raise TypeError")
        ssl.RAND_add("this is a random string", 75.0)

    def testParseCert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    def testDERtoPEM(self):
        # PEM -> DER -> PEM -> DER must give the same DER bytes both times.
        # The file is closed deterministically instead of being left to
        # the garbage collector.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as cert_file:
            pem = cert_file.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        if d1 != d2:
            raise test_support.TestFailed(
                "PEM-to-DER or DER-to-PEM translation failed")
class NetworkedTests(unittest.TestCase):
    """Tests that talk to svn.python.org:443 over the network."""

    def testConnect(self):
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            c = s.getpeercert()
            if c:
                # The message used to be raised with a literal "%s" in it;
                # interpolate the offending certificate.
                raise test_support.TestFailed(
                    "Peer cert %s shouldn't be here!" % pprint.pformat(c))
        finally:
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError:
            pass
        finally:
            s.close()

        # this should succeed because we specify the root cert
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        try:
            s.connect(("svn.python.org", 443))
        except ssl.SSLError as x:
            raise test_support.TestFailed("Unexpected exception %s" % x)
        finally:
            s.close()

    def testNonBlockingHandshake(self):
        s = socket.socket(socket.AF_INET)
        s.connect(("svn.python.org", 443))
        s.setblocking(False)
        s = ssl.wrap_socket(s,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        count = 0
        while True:
            try:
                count += 1
                s.do_handshake()
                break
            except ssl.SSLError as err:
                # Wait until the socket is ready in the direction the
                # handshake asked for, then try again.
                if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                    select.select([s], [], [])
                elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                    select.select([], [s], [])
                else:
                    raise
        s.close()
        if test_support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def testFetchServerCert(self):
        pem = ssl.get_server_certificate(("svn.python.org", 443))
        if not pem:
            raise test_support.TestFailed("No server certificate on svn.python.org:443!")

        try:
            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
        except ssl.SSLError:
            # should fail
            pass
        else:
            raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)

        pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        if not pem:
            raise test_support.TestFailed("No server certificate on svn.python.org:443!")
        if test_support.verbose:
            sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
194 try:
195 import threading
196 except ImportError:
197 _have_threads = False
198 else:
200 _have_threads = True
class ThreadedEchoServer(threading.Thread):
    """Echo server thread that starts one handler thread per connection.

    Each handler sends back the lower-cased form of whatever it reads.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock):
            self.server = server
            self.running = False
            self.sock = connsock
            self.sock.setblocking(1)
            # Stays None until the connection has been wrapped in SSL.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def show_conn_details(self):
            """Write certificate and cipher details when output is chatty."""
            conn = self.sslconn
            if self.server.certreqs == ssl.CERT_REQUIRED:
                cert = conn.getpeercert()
                if test_support.verbose and self.server.chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = conn.getpeercert(True)
                if test_support.verbose and self.server.chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = conn.cipher()
            if test_support.verbose and self.server.chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")

        def wrap_conn(self):
            """Wrap the connection in server-side SSL.

            Returns True on success.  On any exception the connection is
            closed and False is returned; unless the server expects bad
            connects, the handler and the server are stopped as well.
            """
            srv = self.server
            try:
                self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
                                               certfile=srv.certificate,
                                               ssl_version=srv.protocol,
                                               ca_certs=srv.cacerts,
                                               cert_reqs=srv.certreqs)
            except:
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " +
                                 str(self.sock.getpeername()) + ":\n")
                self.close()
                if not self.server.expect_bad_connects:
                    # here, we want to stop the server, because this shouldn't
                    # happen in the context of our test case
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()
                return False
            return True

        def read(self):
            """Read from the SSL connection if there is one, else the socket."""
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            """Write via the SSL connection if there is one, else the socket."""
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            """Close the SSL connection if there is one, else the raw socket."""
            if not self.sslconn:
                self.sock._sock.close()
            else:
                self.sslconn.close()

        def run(self):
            """Serve the connection until EOF, 'over', or an SSL error."""
            self.running = True
            if not self.server.starttls_server:
                if isinstance(self.sock, ssl.SSLSocket):
                    self.sslconn = self.sock
                elif not self.wrap_conn():
                    return
                self.show_conn_details()
            while self.running:
                try:
                    msg = self.read()
                    if not msg:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                        continue
                    noisy = test_support.verbose and self.server.connectionchatty
                    command = msg.strip()
                    starttls = self.server.starttls_server
                    if command == 'over':
                        if noisy:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif starttls and command == 'STARTTLS':
                        if noisy:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write("OK\n")
                        if not self.wrap_conn():
                            return
                    elif starttls and self.sslconn and command == 'ENDTLS':
                        if noisy:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write("OK\n")
                        self.sslconn.unwrap()
                        self.sslconn = None
                        if noisy:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    else:
                        if noisy:
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
                                             % (repr(msg), ctype, repr(msg.lower()), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()
                except:
                    handle_error('')

    def __init__(self, certificate, ssl_version=None,
                 certreqs=None, cacerts=None, expect_bad_connects=False,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 wrap_accepting_socket=False):
        # Defaults that need the ssl module are resolved here.
        if ssl_version is None:
            ssl_version = ssl.PROTOCOL_TLSv1
        if certreqs is None:
            certreqs = ssl.CERT_NONE
        self.certificate = certificate
        self.protocol = ssl_version
        self.certreqs = certreqs
        self.cacerts = cacerts
        self.expect_bad_connects = expect_bad_connects
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.flag = None
        if wrap_accepting_socket:
            # Wrap the listening socket itself rather than each accepted
            # connection.
            self.sock = ssl.wrap_socket(self.sock, server_side=True,
                                        certfile=self.certificate,
                                        cert_reqs=self.certreqs,
                                        ca_certs=self.cacerts,
                                        ssl_version=self.protocol)
            if test_support.verbose and self.chatty:
                sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock))
        self.port = test_support.bind_port(self.sock)
        self.active = False
        threading.Thread.__init__(self)
        self.daemon = True

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() clears the active flag."""
        # The accept timeout lets the loop re-check self.active regularly.
        self.sock.settimeout(0.5)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if test_support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + str(connaddr) + '\n')
                self.ConnectionHandler(self, newconn).start()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except:
                if self.chatty:
                    handle_error("Test server failure:\n")
        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to finish."""
        self.active = False
class AsyncoreEchoServer(threading.Thread):
    """Thread running an asyncore-based SSL echo server."""

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher that creates a handler per connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Dispatcher that echoes received data back lower-cased."""

            def __init__(self, conn, certfile):
                asyncore.dispatcher_with_send.__init__(self, conn)
                # Swap the dispatcher's socket for a server-side SSL wrapper.
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=True)

            def readable(self):
                # Handle whatever the SSL layer reports as pending before
                # telling asyncore that the channel is readable.
                sock = self.socket
                if isinstance(sock, ssl.SSLSocket):
                    while sock.pending() > 0:
                        self.handle_read_event()
                return True

            def handle_read(self):
                received = self.recv(1024)
                self.send(received.lower())

            def handle_close(self):
                self.close()
                if test_support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = test_support.bind_port(self.socket)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if test_support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Call asyncore.loop(1) repeatedly until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Every exception raised out of the loop is ignored here.
                pass

    def stop(self):
        """Clear the active flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
class SocketServerHTTPSServer(threading.Thread):
    """Thread running an HTTPS file server rooted at CERTFILE's directory."""

    class HTTPSServer(HTTPServer):
        """HTTPServer that wraps each accepted connection in SSL."""

        def __init__(self, server_address, RequestHandlerClass, certfile):
            HTTPServer.__init__(self, server_address, RequestHandlerClass)
            # we assume the certfile contains both private key and certificate
            self.certfile = certfile
            self.active = False
            self.active_lock = threading.Lock()
            # NOTE(review): this is assigned after HTTPServer.__init__ has
            # run, so presumably too late to affect the bind -- confirm.
            self.allow_reuse_address = True

        def __str__(self):
            return ('<%s %s:%s>' %
                    (self.__class__.__name__,
                     self.server_name,
                     self.server_port))

        def get_request(self):
            # override this to wrap socket with SSL
            sock, addr = self.socket.accept()
            sslconn = ssl.wrap_socket(sock, server_side=True,
                                      certfile=self.certfile)
            return sslconn, addr

        # The methods overridden below this are mainly so that we
        # can run it in a thread and be able to stop it from another
        # You probably wouldn't need them in other uses.

        def server_activate(self):
            # We want to run this in a thread for testing purposes,
            # so we override this to set timeout, so that we get
            # a chance to stop the server
            self.socket.settimeout(0.5)
            HTTPServer.server_activate(self)

        def serve_forever(self):
            # We want this to run in a thread, so we use a slightly
            # modified version of "forever".
            self.active = True
            while 1:
                try:
                    # We need to lock while handling the request.
                    # Another thread can close the socket after self.active
                    # has been checked and before the request is handled.
                    # This causes an exception when using the closed socket.
                    with self.active_lock:
                        if not self.active:
                            break
                        self.handle_request()
                except socket.timeout:
                    pass
                except KeyboardInterrupt:
                    self.server_close()
                    return
                except:
                    sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())))
                    break
                time.sleep(0.1)

        def server_close(self):
            # Again, we want this to run in a thread, so we need to override
            # close to clear the "active" flag, so that serve_forever() will
            # terminate.
            with self.active_lock:
                HTTPServer.server_close(self)
                self.active = False

    class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):

        # need to override translate_path to get a known root,
        # instead of using os.curdir, since the test could be
        # run from anywhere

        server_version = "TestHTTPS/1.0"

        root = None

        def translate_path(self, path):
            """Translate a /-separated PATH to the local filename syntax.

            Components that mean special things to the local file system
            (e.g. drive or directory names) are ignored.  (XXX They should
            probably be diagnosed.)

            """
            # abandon query parameters
            path = urlparse.urlparse(path)[2]
            path = os.path.normpath(urllib.unquote(path))
            words = path.split('/')
            words = filter(None, words)
            path = self.root
            for word in words:
                drive, word = os.path.splitdrive(word)
                head, word = os.path.split(word)
                # Skip "." and ".." components.  The previous check,
                # ``word in self.root``, was a substring test against the
                # root path string: it dropped any component that happened
                # to occur inside the root and only skipped ".." when the
                # root contained that text.
                if word in (os.curdir, os.pardir):
                    continue
                path = os.path.join(path, word)
            return path

        def log_message(self, format, *args):
            # we override this to suppress logging unless "verbose"
            if test_support.verbose:
                sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                                 (self.server.server_address,
                                  self.server.server_port,
                                  self.request.cipher(),
                                  self.log_date_time_string(),
                                  format%args))

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        # Files are served from the directory that holds CERTFILE.
        self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
        self.port = test_support.find_unused_port()
        self.server = self.HTTPSServer(
            (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Serve requests until the wrapped server's loop ends."""
        self.active = True
        if self.flag:
            self.flag.set()
        self.server.serve_forever()
        self.active = False

    def stop(self):
        """Clear the active flag and close the wrapped server."""
        self.active = False
        self.server.server_close()
def badCertTest(certfile):
    """Check that connecting with the client cert *certfile* fails.

    A ThreadedEchoServer requiring client certificates is started and a
    TLSv1 client presenting *certfile* connects to it.  An ssl.SSLError
    or socket.error counts as the expected outcome; a connection that
    succeeds raises test_support.TestFailed.  The server is always
    stopped and joined.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        s = ssl.wrap_socket(socket.socket(),
                            certfile=certfile,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
    except ssl.SSLError as x:
        # x.args[1] instead of x[1]: indexing the exception object itself
        # is deprecated and warns under -3.
        if test_support.verbose:
            sys.stdout.write("\nSSLError is %s\n" % x.args[1])
    except socket.error as x:
        if test_support.verbose:
            sys.stdout.write("\nsocket.error is %s\n" % x.args[1])
    else:
        raise test_support.TestFailed(
            "Use of invalid cert should have failed!")
    finally:
        server.stop()
        server.join()
def serverParamsTest(certfile, protocol, certreqs, cacertsfile,
                     client_certfile, client_protocol=None, indata="FOO\n",
                     chatty=True, connectionchatty=False,
                     wrap_accepting_socket=False):
    """Run one echo exchange against a freshly started ThreadedEchoServer.

    *indata* is sent as a str, a bytearray and a memoryview; each reply
    must equal ``indata.lower()``.  Connection errors and wrong replies
    raise test_support.TestFailed.  *client_protocol* defaults to
    *protocol*.  The server is always stopped and joined.
    """
    server = ThreadedEchoServer(certfile,
                                certreqs=certreqs,
                                ssl_version=protocol,
                                cacerts=cacertsfile,
                                chatty=chatty,
                                connectionchatty=connectionchatty,
                                wrap_accepting_socket=wrap_accepting_socket)
    started = threading.Event()
    server.start(started)
    # wait for it to start
    started.wait()
    # try to connect
    if client_protocol is None:
        client_protocol = protocol
    try:
        try:
            s = ssl.wrap_socket(socket.socket(),
                                certfile=client_certfile,
                                ca_certs=cacertsfile,
                                cert_reqs=certreqs,
                                ssl_version=client_protocol)
            s.connect((HOST, server.port))
        except ssl.SSLError as x:
            raise test_support.TestFailed("Unexpected SSL error: " + str(x))
        except Exception as x:
            raise test_support.TestFailed("Unexpected exception: " + str(x))
        payloads = [indata, bytearray(indata), memoryview(indata)]
        for arg in payloads:
            if connectionchatty and test_support.verbose:
                sys.stdout.write(
                    " client: sending %s...\n" % (repr(arg)))
            s.write(arg)
            outdata = s.read()
            if connectionchatty and test_support.verbose:
                sys.stdout.write(" client: read %s\n" % repr(outdata))
            if outdata != indata.lower():
                # Only the first 20 characters of each side are quoted.
                raise test_support.TestFailed(
                    "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
        s.write("over\n")
        if connectionchatty and test_support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
    finally:
        server.stop()
        server.join()
def tryProtocolCombo(server_protocol,
                     client_protocol,
                     expectedToWork,
                     certsreqs=None):
    """Check whether a client/server protocol pair behaves as expected.

    serverParamsTest() is run with CERTFILE on both sides.  If it fails
    although *expectedToWork* is true, the TestFailed is re-raised; if
    it succeeds although *expectedToWork* is false, a new TestFailed is
    raised.  *certsreqs* defaults to ssl.CERT_NONE.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE

    # Display name for the verbose line.  An unrecognised value falls back
    # to its str() form; the old if/elif chain left the name unbound then.
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names.get(certsreqs, str(certsreqs))
    if test_support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expectedToWork else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    try:
        serverParamsTest(CERTFILE, server_protocol, certsreqs,
                         CERTFILE, CERTFILE, client_protocol, chatty=False)
    except test_support.TestFailed:
        if expectedToWork:
            raise
    else:
        if not expectedToWork:
            raise test_support.TestFailed(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
723 class ThreadedTests(unittest.TestCase):
def testRudeShutdown(self):
    # Wrapping a socket whose peer has already gone away must raise
    # IOError.
    listener_ready = threading.Event()
    listener_gone = threading.Event()
    port = test_support.find_unused_port()

    # `listener` runs in a thread. It opens a socket listening on
    # PORT, and sits in an accept() until the main thread connects.
    # Then it rudely closes the socket, and sets Event `listener_gone`
    # to let the main thread know the socket is gone.
    def listener():
        srv = socket.socket()
        srv.bind((HOST, port))
        srv.listen(5)
        listener_ready.set()
        srv.accept()
        srv = None  # reclaim the socket object, which also closes it
        listener_gone.set()

    def connector():
        listener_ready.wait()
        client = socket.socket()
        client.connect((HOST, port))
        listener_gone.wait()
        try:
            ssl_sock = ssl.wrap_socket(client)
        except IOError:
            return
        raise test_support.TestFailed(
            'connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    connector()
    worker.join()
def testEcho(self):
    # TLSv1 echo exchange with CERT_NONE, using CERTFILE on both sides
    # and chatty output enabled.
    if test_support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLSv1
    serverParamsTest(CERTFILE, tls, ssl.CERT_NONE,
                     CERTFILE, CERTFILE, tls,
                     chatty=True, connectionchatty=True)
def testReadCert(self):
    # Connect to a local SSLv23 echo server with CERT_REQUIRED on the
    # client side and check the peer certificate that comes back.
    # The unused, never-closed socket the old version created is gone.
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                cacerts=CERTFILE,
                                chatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        try:
            s = ssl.wrap_socket(socket.socket(),
                                certfile=CERTFILE,
                                ca_certs=CERTFILE,
                                cert_reqs=ssl.CERT_REQUIRED,
                                ssl_version=ssl.PROTOCOL_SSLv23)
            s.connect((HOST, server.port))
        except ssl.SSLError as x:
            raise test_support.TestFailed(
                "Unexpected SSL error: " + str(x))
        except Exception as x:
            raise test_support.TestFailed(
                "Unexpected exception: " + str(x))
        else:
            if not s:
                raise test_support.TestFailed(
                    "Can't SSL-handshake with test server")
            cert = s.getpeercert()
            if not cert:
                raise test_support.TestFailed(
                    "Can't get peer certificate.")
            cipher = s.cipher()
            if test_support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            # "in" instead of dict.has_key(), which warns under -3.
            if 'subject' not in cert:
                raise test_support.TestFailed(
                    "No subject field in certificate: %s." %
                    pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                raise test_support.TestFailed(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            s.close()
    finally:
        server.stop()
        server.join()
def testNULLcert(self):
    # Presenting nullcert.pem (looked up next to this file) as the
    # client certificate must not produce a working connection.
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "nullcert.pem"))
def testMalformedCert(self):
    # Presenting badcert.pem (looked up next to this file) as the
    # client certificate must not produce a working connection.
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "badcert.pem"))
def testWrongCert(self):
    # Presenting wrongcert.pem (looked up next to this file) as the
    # client certificate must not produce a working connection.
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "wrongcert.pem"))
def testMalformedKey(self):
    # Presenting badkey.pem (looked up next to this file) as the
    # client certificate must not produce a working connection.
    here = os.path.dirname(__file__) or os.curdir
    badCertTest(os.path.join(here, "badkey.pem"))
def testProtocolSSL2(self):
    # SSLv2 server against each client protocol.
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv2
    # Same protocol on both sides, at every certificate requirement level
    # (None selects tryProtocolCombo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        tryProtocolCombo(server, ssl.PROTOCOL_SSLv2, True, reqs)
    # SSLv23 clients are expected to connect; SSLv3 and TLSv1 are not.
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv23, True)
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv3, False)
    tryProtocolCombo(server, ssl.PROTOCOL_TLSv1, False)
def testProtocolSSL23(self):
    # SSLv23 server against each client protocol.
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv23
    try:
        tryProtocolCombo(server, ssl.PROTOCOL_SSLv2, True)
    except test_support.TestFailed as x:
        # this fails on some older versions of OpenSSL (0.9.7l, for instance)
        if test_support.verbose:
            sys.stdout.write(
                " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                % str(x))
    # SSLv3, SSLv23 and TLSv1 clients must all connect, at every
    # certificate requirement level (None selects the default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        for client in (ssl.PROTOCOL_SSLv3,
                       ssl.PROTOCOL_SSLv23,
                       ssl.PROTOCOL_TLSv1):
            tryProtocolCombo(server, client, True, reqs)
def testProtocolSSL3(self):
    # SSLv3 server against each client protocol.
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv3
    # Same protocol on both sides, at every certificate requirement level
    # (None selects tryProtocolCombo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        tryProtocolCombo(server, ssl.PROTOCOL_SSLv3, True, reqs)
    # Every other client protocol is expected to be refused.
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv2, False)
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv23, False)
    tryProtocolCombo(server, ssl.PROTOCOL_TLSv1, False)
def testProtocolTLS1(self):
    # TLSv1 server against each client protocol.
    if test_support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_TLSv1
    # Same protocol on both sides, at every certificate requirement level
    # (None selects tryProtocolCombo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        tryProtocolCombo(server, ssl.PROTOCOL_TLSv1, True, reqs)
    # Every other client protocol is expected to be refused.
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv2, False)
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv3, False)
    tryProtocolCombo(server, ssl.PROTOCOL_SSLv23, False)
def testSTARTTLS(self):
    # Send a fixed message sequence to a STARTTLS-capable echo server,
    # switching to TLS after "STARTTLS" and back to plain text after
    # "ENDTLS" whenever the server answers with something starting "ok".
    msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    started = threading.Event()
    server.start(started)
    # wait for it to start
    started.wait()
    # try to connect
    wrapped = False
    try:
        try:
            s = socket.socket()
            s.setblocking(1)
            s.connect((HOST, server.port))
        except Exception as x:
            raise test_support.TestFailed("Unexpected exception: " + str(x))
        if test_support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if test_support.verbose:
                sys.stdout.write(
                    " client: sending %s...\n" % repr(indata))
            # Talk through the TLS wrapper while one is active.
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            if (indata == "STARTTLS" and
                    outdata.strip().lower().startswith("ok")):
                if test_support.verbose:
                    sys.stdout.write(
                        " client: read %s from server, starting TLS...\n"
                        % repr(outdata))
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif (indata == "ENDTLS" and
                    outdata.strip().lower().startswith("ok")):
                if test_support.verbose:
                    sys.stdout.write(
                        " client: read %s from server, ending TLS...\n"
                        % repr(outdata))
                s = conn.unwrap()
                wrapped = False
            elif test_support.verbose:
                sys.stdout.write(
                    " client: read %s from server\n" % repr(outdata))
        if test_support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        if wrapped:
            conn.write("over\n")
        else:
            s.send("over\n")
        s.close()
    finally:
        server.stop()
        server.join()
def testSocketServer(self):
    # Fetch CERTFILE through the local HTTPS server and compare the
    # bytes received with the file's contents on disk.
    server = SocketServerHTTPSServer(CERTFILE)
    started = threading.Event()
    server.start(started)
    # wait for it to start
    started.wait()
    # try to connect
    try:
        if test_support.verbose:
            sys.stdout.write('\n')
        d1 = open(CERTFILE, 'rb').read()
        d2 = ''
        # now fetch the same data from the HTTPS server
        filename = os.path.split(CERTFILE)[1]
        url = 'https://127.0.0.1:%d/%s' % (server.port, filename)
        f = urllib.urlopen(url)
        dlen = f.info().getheader("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if test_support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
        f.close()
    except:
        # Any exception at all is turned into a TestFailed that carries
        # the formatted traceback.
        msg = ''.join(traceback.format_exception(*sys.exc_info()))
        if test_support.verbose:
            sys.stdout.write('\n' + msg)
        raise test_support.TestFailed(msg)
    else:
        if not (d1 == d2):
            raise test_support.TestFailed(
                "Couldn't fetch data from HTTPS server")
    finally:
        server.stop()
        server.join()
def testWrappedAccept(self):
    # SSLv23 echo exchange with CERT_REQUIRED where the server wraps its
    # accepting socket (wrap_accepting_socket=True).
    if test_support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    serverParamsTest(CERTFILE, sslv23, ssl.CERT_REQUIRED,
                     CERTFILE, CERTFILE, sslv23,
                     chatty=True, connectionchatty=True,
                     wrap_accepting_socket=True)
def testAsyncoreServer(self):
    # Send one mixed-case message to the asyncore echo server and expect
    # its lower-cased form back.
    indata = "TEST MESSAGE of mixed case\n"

    if test_support.verbose:
        sys.stdout.write("\n")
    server = AsyncoreEchoServer(CERTFILE)
    started = threading.Event()
    server.start(started)
    # wait for it to start
    started.wait()
    # try to connect
    try:
        try:
            s = ssl.wrap_socket(socket.socket())
            s.connect(('127.0.0.1', server.port))
        except ssl.SSLError as x:
            raise test_support.TestFailed("Unexpected SSL error: " + str(x))
        except Exception as x:
            raise test_support.TestFailed("Unexpected exception: " + str(x))
        if test_support.verbose:
            sys.stdout.write(
                " client: sending %s...\n" % (repr(indata)))
        s.write(indata)
        outdata = s.read()
        if test_support.verbose:
            sys.stdout.write(" client: read %s\n" % repr(outdata))
        if outdata != indata.lower():
            # Only the first 20 characters of each side are quoted.
            raise test_support.TestFailed(
                "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write("over\n")
        if test_support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
    finally:
        server.stop()
        # wait for server thread to end
        server.join()
def testAllRecvAndSendMethods(self):
    # Exercise every send*/recv* method of an SSL-wrapped client socket
    # against a ThreadedEchoServer.
    #
    # Each method is tagged with whether it is expected to succeed.  A
    # successful round trip must return the lower-cased payload; a
    # ValueError is accepted only for methods tagged as failing, and its
    # message must start with the method's name.
    #
    # Failures are raised as test_support.TestFailed.  (This method
    # previously referred to an undefined name "support", so any failing
    # check surfaced as a NameError instead.)

    if test_support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    flag = threading.Event()
    server.start(flag)
    # wait for it to start
    flag.wait()
    # try to connect
    try:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
    except ssl.SSLError as x:
        raise test_support.TestFailed("Unexpected SSL error: " + str(x))
    except Exception as x:
        raise test_support.TestFailed("Unexpected exception: " + str(x))
    else:
        # helper methods for standardising recv* method signatures:
        # both take no arguments and return the bytes actually received
        def _recv_into():
            b = bytearray("\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray("\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = u"PREFIX_"

        for meth_name, send_meth, expect_success, args in send_methods:
            indata = data_prefix + meth_name
            try:
                send_meth(indata.encode('ASCII', 'strict'), *args)
                outdata = s.read()
                outdata = outdata.decode('ASCII', 'strict')
                if outdata != indata.lower():
                    raise test_support.TestFailed(
                        "While sending with <<%s>> bad data "
                        "<<%r>> (%d) received; "
                        "expected <<%r>> (%d)\n" % (
                            meth_name, outdata[:20], len(outdata),
                            indata[:20], len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    raise test_support.TestFailed(
                        "Failed to send with method <<%s>>; "
                        "expected to succeed.\n" % (meth_name,)
                    )
                if not str(e).startswith(meth_name):
                    raise test_support.TestFailed(
                        "Method <<%s>> failed with unexpected "
                        "exception message: %s\n" % (
                            meth_name, e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = data_prefix + meth_name
            try:
                s.send(indata.encode('ASCII', 'strict'))
                outdata = recv_meth(*args)
                outdata = outdata.decode('ASCII', 'strict')
                if outdata != indata.lower():
                    raise test_support.TestFailed(
                        "While receiving with <<%s>> bad data "
                        "<<%r>> (%d) received; "
                        "expected <<%r>> (%d)\n" % (
                            meth_name, outdata[:20], len(outdata),
                            indata[:20], len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    raise test_support.TestFailed(
                        "Failed to receive with method <<%s>>; "
                        "expected to succeed.\n" % (meth_name,)
                    )
                if not str(e).startswith(meth_name):
                    raise test_support.TestFailed(
                        "Method <<%s>> failed with unexpected "
                        "exception message: %s\n" % (
                            meth_name, e
                        )
                    )
                # consume data: the payload was already sent before the
                # receive method raised, so read it off the connection
                # before the next iteration
                s.read()

        s.write("over\n".encode("ASCII", "strict"))
        s.close()
    finally:
        server.stop()
        server.join()
def test_main(verbose=False):
    # Locate the certificate files next to this module, decide which
    # test classes can run in this environment, and run them.
    if skip_expected:
        raise unittest.SkipTest("No SSL support")

    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
    here = os.path.dirname(__file__) or os.curdir
    CERTFILE = os.path.join(here, "keycert.pem")
    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
        here, "https_svn_python_org_root.pem")

    if not (os.path.exists(CERTFILE) and
            os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
        raise test_support.TestFailed("Can't read certificate files!")

    # The port itself is not used below; this only checks that one can
    # be found.
    free_port = test_support.find_unused_port()
    if not free_port:
        raise test_support.TestFailed("Can't find open port to test servers on!")

    tests = [BasicTests]

    if test_support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = test_support.threading_setup()
        # ThreadedTests is added only when threading_setup() returned a
        # truthy value and the 'network' resource is enabled.
        if thread_info and test_support.is_resource_enabled('network'):
            tests.append(ThreadedTests)

    test_support.run_unittest(*tests)

    if _have_threads:
        test_support.threading_cleanup(*thread_info)
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()