2 from datetime
import datetime
, timedelta
4 # Make a call to strptime before starting threads to
5 # prevent thread safety issues.
6 datetime
.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S")
10 from pyasn1
.codec
.der
import decoder
, encoder
11 from pyasn1
.type.univ
import Any
, ObjectIdentifier
, OctetString
12 from pyasn1
.type.char
import BMPString
, IA5String
, UTF8String
13 from pyasn1
.type.useful
import GeneralizedTime
14 from pyasn1_modules
.rfc2459
import (Certificate
, DirectoryString
,
15 SubjectAltName
, GeneralNames
,
17 from pyasn1_modules
.rfc2459
import id_ce_subjectAltName
as SUBJECT_ALT_NAME
18 from pyasn1_modules
.rfc2459
import id_at_commonName
as COMMON_NAME
20 XMPP_ADDR
= ObjectIdentifier('1.3.6.1.5.5.7.8.5')
21 SRV_NAME
= ObjectIdentifier('1.3.6.1.5.5.7.8.7')
28 log
= logging
.getLogger(__name__
)
31 class CertificateError(Exception):
36 encoding
= 'utf-16-be' if isinstance(data
, BMPString
) else 'utf-8'
37 return bytes(data
).decode(encoding
)
40 def extract_names(raw_cert
):
41 results
= {'CN': set(),
47 cert
= decoder
.decode(raw_cert
, asn1Spec
=Certificate())[0]
48 tbs
= cert
.getComponentByName('tbsCertificate')
49 subject
= tbs
.getComponentByName('subject')
50 extensions
= tbs
.getComponentByName('extensions') or []
52 # Extract the CommonName(s) from the cert.
56 oid
= name
.getComponentByName('type')
57 value
= name
.getComponentByName('value')
59 if oid
!= COMMON_NAME
:
62 value
= decoder
.decode(value
, asn1Spec
=DirectoryString())[0]
63 value
= decode_str(value
.getComponent())
64 results
['CN'].add(value
)
66 # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
67 for extension
in extensions
:
68 oid
= extension
.getComponentByName('extnID')
69 if oid
!= SUBJECT_ALT_NAME
:
72 value
= decoder
.decode(extension
.getComponentByName('extnValue'),
73 asn1Spec
=OctetString())[0]
74 sa_names
= decoder
.decode(value
, asn1Spec
=SubjectAltName())[0]
76 name_type
= name
.getName()
77 if name_type
== 'dNSName':
78 results
['DNS'].add(decode_str(name
.getComponent()))
79 if name_type
== 'uniformResourceIdentifier':
80 value
= decode_str(name
.getComponent())
81 if value
.startswith('xmpp:'):
82 results
['URI'].add(value
[5:])
83 elif name_type
== 'otherName':
84 name
= name
.getComponent()
86 oid
= name
.getComponentByName('type-id')
87 value
= name
.getComponentByName('value')
90 value
= decoder
.decode(value
, asn1Spec
=UTF8String())[0]
91 results
['XMPPAddr'].add(decode_str(value
))
93 value
= decoder
.decode(value
, asn1Spec
=IA5String())[0]
94 results
['SRV'].add(decode_str(value
))
99 def extract_dates(raw_cert
):
101 log
.warning("Could not find pyasn1 and pyasn1_modules. " + \
102 "SSL certificate expiration COULD NOT BE VERIFIED.")
105 cert
= decoder
.decode(raw_cert
, asn1Spec
=Certificate())[0]
106 tbs
= cert
.getComponentByName('tbsCertificate')
107 validity
= tbs
.getComponentByName('validity')
109 not_before
= validity
.getComponentByName('notBefore')
110 not_before
= str(not_before
.getComponent())
112 not_after
= validity
.getComponentByName('notAfter')
113 not_after
= str(not_after
.getComponent())
115 if isinstance(not_before
, GeneralizedTime
):
116 not_before
= datetime
.strptime(not_before
, '%Y%m%d%H%M%SZ')
118 not_before
= datetime
.strptime(not_before
, '%y%m%d%H%M%SZ')
120 if isinstance(not_after
, GeneralizedTime
):
121 not_after
= datetime
.strptime(not_after
, '%Y%m%d%H%M%SZ')
123 not_after
= datetime
.strptime(not_after
, '%y%m%d%H%M%SZ')
125 return not_before
, not_after
128 def get_ttl(raw_cert
):
129 not_before
, not_after
= extract_dates(raw_cert
)
130 if not_after
is None:
132 return not_after
- datetime
.utcnow()
135 def verify(expected
, raw_cert
):
137 log
.warning("Could not find pyasn1 and pyasn1_modules. " + \
138 "SSL certificate COULD NOT BE VERIFIED.")
141 not_before
, not_after
= extract_dates(raw_cert
)
142 cert_names
= extract_names(raw_cert
)
144 now
= datetime
.utcnow()
147 raise CertificateError(
148 'Certificate has not entered its valid date range.')
151 raise CertificateError(
152 'Certificate has expired.')
155 expected_wild
= expected
[expected
.index('.'):]
157 expected_wild
= expected
158 expected_srv
= '_xmpp-client.%s' % expected
160 for name
in cert_names
['XMPPAddr']:
163 for name
in cert_names
['SRV']:
164 if name
== expected_srv
or name
== expected
:
166 for name
in cert_names
['DNS']:
169 if name
.startswith('*'):
171 name_wild
= name
[name
.index('.'):]
174 if expected_wild
== name_wild
:
176 for name
in cert_names
['URI']:
179 for name
in cert_names
['CN']:
183 raise CertificateError(
184 'Could not match certficate against hostname: %s' % expected
)