Bump to 1.3.1
[slixmpp.git] / sleekxmpp / xmlstream / cert.py
blob71146f36539ad66aaf16f5ca1d8d009ee1be2be6
1 import logging
2 from datetime import datetime, timedelta
4 # Make a call to strptime before starting threads to
5 # prevent thread safety issues.
6 datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S")
9 try:
10 from pyasn1.codec.der import decoder, encoder
11 from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
12 from pyasn1.type.char import BMPString, IA5String, UTF8String
13 from pyasn1.type.useful import GeneralizedTime
14 from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
15 SubjectAltName, GeneralNames,
16 GeneralName)
17 from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
18 from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
20 XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
21 SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
23 HAVE_PYASN1 = True
24 except ImportError:
25 HAVE_PYASN1 = False
28 log = logging.getLogger(__name__)
31 class CertificateError(Exception):
32 pass
35 def decode_str(data):
36 encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8'
37 return bytes(data).decode(encoding)
40 def extract_names(raw_cert):
41 results = {'CN': set(),
42 'DNS': set(),
43 'SRV': set(),
44 'URI': set(),
45 'XMPPAddr': set()}
47 cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0]
48 tbs = cert.getComponentByName('tbsCertificate')
49 subject = tbs.getComponentByName('subject')
50 extensions = tbs.getComponentByName('extensions') or []
52 # Extract the CommonName(s) from the cert.
53 for rdnss in subject:
54 for rdns in rdnss:
55 for name in rdns:
56 oid = name.getComponentByName('type')
57 value = name.getComponentByName('value')
59 if oid != COMMON_NAME:
60 continue
62 value = decoder.decode(value, asn1Spec=DirectoryString())[0]
63 value = decode_str(value.getComponent())
64 results['CN'].add(value)
66 # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
67 for extension in extensions:
68 oid = extension.getComponentByName('extnID')
69 if oid != SUBJECT_ALT_NAME:
70 continue
72 value = decoder.decode(extension.getComponentByName('extnValue'),
73 asn1Spec=OctetString())[0]
74 sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0]
75 for name in sa_names:
76 name_type = name.getName()
77 if name_type == 'dNSName':
78 results['DNS'].add(decode_str(name.getComponent()))
79 if name_type == 'uniformResourceIdentifier':
80 value = decode_str(name.getComponent())
81 if value.startswith('xmpp:'):
82 results['URI'].add(value[5:])
83 elif name_type == 'otherName':
84 name = name.getComponent()
86 oid = name.getComponentByName('type-id')
87 value = name.getComponentByName('value')
89 if oid == XMPP_ADDR:
90 value = decoder.decode(value, asn1Spec=UTF8String())[0]
91 results['XMPPAddr'].add(decode_str(value))
92 elif oid == SRV_NAME:
93 value = decoder.decode(value, asn1Spec=IA5String())[0]
94 results['SRV'].add(decode_str(value))
96 return results
99 def extract_dates(raw_cert):
100 if not HAVE_PYASN1:
101 log.warning("Could not find pyasn1 and pyasn1_modules. " + \
102 "SSL certificate expiration COULD NOT BE VERIFIED.")
103 return None, None
105 cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0]
106 tbs = cert.getComponentByName('tbsCertificate')
107 validity = tbs.getComponentByName('validity')
109 not_before = validity.getComponentByName('notBefore')
110 not_before = str(not_before.getComponent())
112 not_after = validity.getComponentByName('notAfter')
113 not_after = str(not_after.getComponent())
115 if isinstance(not_before, GeneralizedTime):
116 not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ')
117 else:
118 not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ')
120 if isinstance(not_after, GeneralizedTime):
121 not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
122 else:
123 not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ')
125 return not_before, not_after
128 def get_ttl(raw_cert):
129 not_before, not_after = extract_dates(raw_cert)
130 if not_after is None:
131 return None
132 return not_after - datetime.utcnow()
135 def verify(expected, raw_cert):
136 if not HAVE_PYASN1:
137 log.warning("Could not find pyasn1 and pyasn1_modules. " + \
138 "SSL certificate COULD NOT BE VERIFIED.")
139 return
141 not_before, not_after = extract_dates(raw_cert)
142 cert_names = extract_names(raw_cert)
144 now = datetime.utcnow()
146 if not_before > now:
147 raise CertificateError(
148 'Certificate has not entered its valid date range.')
150 if not_after <= now:
151 raise CertificateError(
152 'Certificate has expired.')
154 if '.' in expected:
155 expected_wild = expected[expected.index('.'):]
156 else:
157 expected_wild = expected
158 expected_srv = '_xmpp-client.%s' % expected
160 for name in cert_names['XMPPAddr']:
161 if name == expected:
162 return True
163 for name in cert_names['SRV']:
164 if name == expected_srv or name == expected:
165 return True
166 for name in cert_names['DNS']:
167 if name == expected:
168 return True
169 if name.startswith('*'):
170 if '.' in name:
171 name_wild = name[name.index('.'):]
172 else:
173 name_wild = name
174 if expected_wild == name_wild:
175 return True
176 for name in cert_names['URI']:
177 if name == expected:
178 return True
179 for name in cert_names['CN']:
180 if name == expected:
181 return True
183 raise CertificateError(
184 'Could not match certficate against hostname: %s' % expected)