gpo: Decode base64 root cert before importing
[Samba.git] / python / samba / gp / gp_cert_auto_enroll_ext.py
blobb3ecdc5bf5e42d46cb0aae9b8653da9e8f4cdee6
1 # gp_cert_auto_enroll_ext samba group policy
2 # Copyright (C) David Mulder <dmulder@suse.com> 2021
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 import os
18 import operator
19 import requests
20 from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
21 from samba import Ldb
22 from ldb import SCOPE_SUBTREE, SCOPE_BASE
23 from samba.auth import system_session
24 from samba.gp.gpclass import get_dc_hostname
25 import base64
26 from shutil import which
27 from subprocess import Popen, PIPE
28 import re
29 import json
30 from samba.gp.util.logging import log
31 import struct
32 try:
33 from cryptography.hazmat.primitives.serialization.pkcs7 import \
34 load_der_pkcs7_certificates
35 except ModuleNotFoundError:
36 def load_der_pkcs7_certificates(x): return []
37 log.error('python cryptography missing pkcs7 support. '
38 'Certificate chain parsing will fail')
39 from cryptography.hazmat.primitives.serialization import Encoding
40 from cryptography.x509 import load_der_x509_certificate
41 from cryptography.hazmat.backends import default_backend
42 from samba.common import get_string
44 cert_wrap = b"""
45 -----BEGIN CERTIFICATE-----
47 -----END CERTIFICATE-----"""
48 endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
49 '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'
51 global_trust_dirs = ['/etc/pki/trust/anchors', # SUSE
52 '/etc/pki/ca-trust/source/anchors', # RHEL/Fedora
53 '/usr/local/share/ca-certificates'] # Debian/Ubuntu
55 def octet_string_to_objectGUID(data):
56 """Convert an octet string to an objectGUID."""
57 return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0],
58 '%02x' % struct.unpack('<H', data[4:6])[0],
59 '%02x' % struct.unpack('<H', data[6:8])[0],
60 '%02x' % struct.unpack('>H', data[8:10])[0],
61 '%02x%02x' % struct.unpack('>HL', data[10:]))
64 def group_and_sort_end_point_information(end_point_information):
65 """Group and Sort End Point Information.
67 [MS-CAESO] 4.4.5.3.2.3
68 In this step autoenrollment processes the end point information by grouping
69 it by CEP ID and sorting in the order with which it will use the end point
70 to access the CEP information.
71 """
72 # Create groups of the CertificateEnrollmentPolicyEndPoint instances that
73 # have the same value of the EndPoint.PolicyID datum.
74 end_point_groups = {}
75 for e in end_point_information:
76 if e['PolicyID'] not in end_point_groups.keys():
77 end_point_groups[e['PolicyID']] = []
78 end_point_groups[e['PolicyID']].append(e)
80 # Sort each group by following these rules:
81 for end_point_group in end_point_groups.values():
82 # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending
83 # order based on the EndPoint.Cost value.
84 end_point_group.sort(key=lambda e: e['Cost'])
86 # For instances that have the same EndPoint.Cost:
87 cost_list = [e['Cost'] for e in end_point_group]
88 costs = set(cost_list)
89 for cost in costs:
90 i = cost_list.index(cost)
91 j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1
92 if i == j:
93 continue
95 # Sort those that have EndPoint.Authentication equal to Kerberos
96 # first. Then sort those that have EndPoint.Authentication equal to
97 # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint
98 # instances follow in an arbitrary order.
99 def sort_auth(e):
100 # 0x2 - Kerberos
101 if e['AuthFlags'] == 0x2:
102 return 0
103 # 0x1 - Anonymous
104 elif e['AuthFlags'] == 0x1:
105 return 1
106 else:
107 return 2
108 end_point_group[i:j+1] = sorted(end_point_group[i:j+1],
109 key=sort_auth)
110 return list(end_point_groups.values())
112 def obtain_end_point_information(entries):
113 """Obtain End Point Information.
115 [MS-CAESO] 4.4.5.3.2.2
116 In this step autoenrollment initializes the
117 CertificateEnrollmentPolicyEndPoints table.
119 end_point_information = {}
120 section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
121 for e in entries:
122 if not e.keyname.startswith(section):
123 continue
124 name = e.keyname.replace(section, '')
125 if name not in end_point_information.keys():
126 end_point_information[name] = {}
127 end_point_information[name][e.valuename] = e.data
128 for ca in end_point_information.values():
129 m = re.match(endpoint_re, ca['URL'])
130 if m:
131 name = '%s-CA' % m.group('server').replace('.', '-')
132 ca['name'] = name
133 ca['hostname'] = m.group('server')
134 ca['auth'] = m.group('auth')
135 elif ca['URL'].lower() != 'ldap:':
136 edata = { 'endpoint': ca['URL'] }
137 log.error('Failed to parse the endpoint', edata)
138 return {}
139 end_point_information = \
140 group_and_sort_end_point_information(end_point_information.values())
141 return end_point_information
143 def fetch_certification_authorities(ldb):
144 """Initialize CAs.
146 [MS-CAESO] 4.4.5.3.1.2
148 result = []
149 basedn = ldb.get_default_basedn()
150 # Autoenrollment MUST do an LDAP search for the CA information
151 # (pKIEnrollmentService) objects under the following container:
152 dn = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
153 attrs = ['cACertificate', 'cn', 'dNSHostName']
154 expr = '(objectClass=pKIEnrollmentService)'
155 res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
156 if len(res) == 0:
157 return result
158 for es in res:
159 data = { 'name': get_string(es['cn'][0]),
160 'hostname': get_string(es['dNSHostName'][0]),
161 'cACertificate': get_string(base64.b64encode(es['cACertificate'][0]))
163 result.append(data)
164 return result
166 def fetch_template_attrs(ldb, name, attrs=None):
167 if attrs is None:
168 attrs = ['msPKI-Minimal-Key-Size']
169 basedn = ldb.get_default_basedn()
170 dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
171 expr = '(cn=%s)' % name
172 res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
173 if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]:
174 return dict(res[0])
175 else:
176 return {'msPKI-Minimal-Key-Size': ['2048']}
178 def format_root_cert(cert):
179 return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert.encode(), 0, re.DOTALL)
181 def find_cepces_submit():
182 certmonger_dirs = [os.environ.get("PATH"), '/usr/lib/certmonger',
183 '/usr/libexec/certmonger']
184 return which('cepces-submit', path=':'.join(certmonger_dirs))
186 def get_supported_templates(server):
187 cepces_submit = find_cepces_submit()
188 if not cepces_submit or not os.path.exists(cepces_submit):
189 log.error('Failed to find cepces-submit')
190 return []
192 env = os.environ
193 env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES'
194 p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'],
195 env=env, stdout=PIPE, stderr=PIPE)
196 out, err = p.communicate()
197 if p.returncode != 0:
198 data = {'Error': err.decode()}
199 log.error('Failed to fetch the list of supported templates.', data)
200 return out.strip().split()
203 def getca(ca, url, trust_dir):
204 """Fetch Certificate Chain from the CA."""
205 root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
206 root_certs = []
208 try:
209 r = requests.get(url=url, params={'operation': 'GetCACert',
210 'message': 'CAIdentifier'})
211 except requests.exceptions.ConnectionError:
212 log.warn('Failed to establish a new connection')
213 r = None
214 if r is None or r.content == b'' or r.headers['Content-Type'] == 'text/html':
215 log.warn('Failed to fetch the root certificate chain.')
216 log.warn('The Network Device Enrollment Service is either not' +
217 ' installed or not configured.')
218 if 'cACertificate' in ca:
219 log.warn('Installing the server certificate only.')
220 der_certificate = base64.b64decode(ca['cACertificate'])
221 try:
222 cert = load_der_x509_certificate(der_certificate)
223 except TypeError:
224 cert = load_der_x509_certificate(der_certificate,
225 default_backend())
226 cert_data = cert.public_bytes(Encoding.PEM)
227 with open(root_cert, 'wb') as w:
228 w.write(cert_data)
229 root_certs.append(root_cert)
230 return root_certs
232 if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
233 # Older versions of load_der_x509_certificate require a backend param
234 try:
235 cert = load_der_x509_certificate(r.content)
236 except TypeError:
237 cert = load_der_x509_certificate(r.content, default_backend())
238 cert_data = cert.public_bytes(Encoding.PEM)
239 with open(root_cert, 'wb') as w:
240 w.write(cert_data)
241 root_certs.append(root_cert)
242 elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
243 certs = load_der_pkcs7_certificates(r.content)
244 for i in range(0, len(certs)):
245 cert = certs[i].public_bytes(Encoding.PEM)
246 filename, extension = root_cert.rsplit('.', 1)
247 dest = '%s.%d.%s' % (filename, i, extension)
248 with open(dest, 'wb') as w:
249 w.write(cert)
250 root_certs.append(dest)
251 else:
252 log.warn('getca: Wrong (or missing) MIME content type')
254 return root_certs
257 def find_global_trust_dir():
258 """Return the global trust dir using known paths from various Linux distros."""
259 for trust_dir in global_trust_dirs:
260 if os.path.isdir(trust_dir):
261 return trust_dir
262 return global_trust_dirs[0]
264 def update_ca_command():
265 """Return the command to update the CA trust store."""
266 return which('update-ca-certificates') or which('update-ca-trust')
268 def changed(new_data, old_data):
269 """Return True if any key present in both dicts has changed."""
270 return any((new_data[k] != old_data[k] if k in old_data else False) \
271 for k in new_data.keys())
273 def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
274 """Install the root certificate chain."""
275 data = dict({'files': [], 'templates': []}, **ca)
276 url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
277 root_certs = getca(ca, url, trust_dir)
278 data['files'].extend(root_certs)
279 global_trust_dir = find_global_trust_dir()
280 for src in root_certs:
281 # Symlink the certs to global trust dir
282 dst = os.path.join(global_trust_dir, os.path.basename(src))
283 try:
284 os.symlink(src, dst)
285 data['files'].append(dst)
286 except PermissionError:
287 log.warn('Failed to symlink root certificate to the'
288 ' admin trust anchors')
289 except FileNotFoundError:
290 log.warn('Failed to symlink root certificate to the'
291 ' admin trust anchors.'
292 ' The directory was not found', global_trust_dir)
293 except FileExistsError:
294 # If we're simply downloading a renewed cert, the symlink
295 # already exists. Ignore the FileExistsError. Preserve the
296 # existing symlink in the unapply data.
297 data['files'].append(dst)
298 update = update_ca_command()
299 if update is not None:
300 Popen([update]).wait()
301 # Setup Certificate Auto Enrollment
302 getcert = which('getcert')
303 cepces_submit = find_cepces_submit()
304 if getcert is not None and os.path.exists(cepces_submit):
305 p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e',
306 '%s --server=%s --auth=%s' % (cepces_submit,
307 ca['hostname'], auth)],
308 stdout=PIPE, stderr=PIPE)
309 out, err = p.communicate()
310 log.debug(out.decode())
311 if p.returncode != 0:
312 data = { 'Error': err.decode(), 'CA': ca['name'] }
313 log.error('Failed to add Certificate Authority', data)
314 supported_templates = get_supported_templates(ca['hostname'])
315 for template in supported_templates:
316 attrs = fetch_template_attrs(ldb, template)
317 nickname = '%s.%s' % (ca['name'], template.decode())
318 keyfile = os.path.join(private_dir, '%s.key' % nickname)
319 certfile = os.path.join(trust_dir, '%s.crt' % nickname)
320 p = Popen([getcert, 'request', '-c', ca['name'],
321 '-T', template.decode(),
322 '-I', nickname, '-k', keyfile, '-f', certfile,
323 '-g', attrs['msPKI-Minimal-Key-Size'][0]],
324 stdout=PIPE, stderr=PIPE)
325 out, err = p.communicate()
326 log.debug(out.decode())
327 if p.returncode != 0:
328 data = { 'Error': err.decode(), 'Certificate': nickname }
329 log.error('Failed to request certificate', data)
330 data['files'].extend([keyfile, certfile])
331 data['templates'].append(nickname)
332 if update is not None:
333 Popen([update]).wait()
334 else:
335 log.warn('certmonger and cepces must be installed for ' +
336 'certificate auto enrollment to work')
337 return json.dumps(data)
339 class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
340 def __str__(self):
341 return r'Cryptography\AutoEnrollment'
343 def unapply(self, guid, attribute, value):
344 ca_cn = base64.b64decode(attribute)
345 data = json.loads(value)
346 getcert = which('getcert')
347 if getcert is not None:
348 Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
349 for nickname in data['templates']:
350 Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
351 for f in data['files']:
352 if os.path.exists(f):
353 if os.path.exists(f):
354 os.unlink(f)
355 self.cache_remove_attribute(guid, attribute)
357 def apply(self, guid, ca, applier_func, *args, **kwargs):
358 attribute = base64.b64encode(ca['name'].encode()).decode()
359 # If the policy has changed, unapply, then apply new policy
360 old_val = self.cache_get_attribute_value(guid, attribute)
361 old_data = json.loads(old_val) if old_val is not None else {}
362 templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])]
363 new_data = { 'templates': templates, **ca }
364 if changed(new_data, old_data) or self.cache_get_apply_state() == GPOSTATE.ENFORCE:
365 self.unapply(guid, attribute, old_val)
366 # If policy is already applied and unchanged, skip application
367 if old_val is not None and not changed(new_data, old_data) and \
368 self.cache_get_apply_state() != GPOSTATE.ENFORCE:
369 return
371 # Apply the policy and log the changes
372 data = applier_func(*args, **kwargs)
373 self.cache_add_attribute(guid, attribute, data)
375 def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
376 trust_dir=None, private_dir=None):
377 if trust_dir is None:
378 trust_dir = self.lp.cache_path('certs')
379 if private_dir is None:
380 private_dir = self.lp.private_path('certs')
381 if not os.path.exists(trust_dir):
382 os.mkdir(trust_dir, mode=0o755)
383 if not os.path.exists(private_dir):
384 os.mkdir(private_dir, mode=0o700)
386 for guid, settings in deleted_gpo_list:
387 if str(self) in settings:
388 for ca_cn_enc, data in settings[str(self)].items():
389 self.unapply(guid, ca_cn_enc, data)
391 for gpo in changed_gpo_list:
392 if gpo.file_sys_path:
393 section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
394 pol_file = 'MACHINE/Registry.pol'
395 path = os.path.join(gpo.file_sys_path, pol_file)
396 pol_conf = self.parse(path)
397 if not pol_conf:
398 continue
399 for e in pol_conf.entries:
400 if e.keyname == section and e.valuename == 'AEPolicy':
401 # This policy applies as specified in [MS-CAESO] 4.4.5.1
402 if e.data & 0x8000:
403 continue # The policy is disabled
404 enroll = e.data & 0x1 == 0x1
405 manage = e.data & 0x2 == 0x2
406 retrive_pending = e.data & 0x4 == 0x4
407 if enroll:
408 ca_names = self.__enroll(gpo.name,
409 pol_conf.entries,
410 trust_dir, private_dir)
412 # Cleanup any old CAs that have been removed
413 ca_attrs = [base64.b64encode(n.encode()).decode() \
414 for n in ca_names]
415 self.clean(gpo.name, keep=ca_attrs)
416 else:
417 # If enrollment has been disabled for this GPO,
418 # remove any existing policy
419 ca_attrs = \
420 self.cache_get_all_attribute_values(gpo.name)
421 self.clean(gpo.name, remove=list(ca_attrs.keys()))
423 def __read_cep_data(self, guid, ldb, end_point_information,
424 trust_dir, private_dir):
425 """Read CEP Data.
427 [MS-CAESO] 4.4.5.3.2.4
428 In this step autoenrollment initializes instances of the
429 CertificateEnrollmentPolicy by accessing end points associated with CEP
430 groups created in the previous step.
432 # For each group created in the previous step:
433 for end_point_group in end_point_information:
434 # Pick an arbitrary instance of the
435 # CertificateEnrollmentPolicyEndPoint from the group
436 e = end_point_group[0]
438 # If this instance does not have the AutoEnrollmentEnabled flag set
439 # in the EndPoint.Flags, continue with the next group.
440 if not e['Flags'] & 0x10:
441 continue
443 # If the current group contains a
444 # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
445 # equal to "LDAP":
446 if any([e['URL'] == 'LDAP:' for e in end_point_group]):
447 # Perform an LDAP search to read the value of the objectGuid
448 # attribute of the root object of the forest root domain NC. If
449 # any errors are encountered, continue with the next group.
450 res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
451 ['rootDomainNamingContext'])
452 if len(res) != 1:
453 continue
454 res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
455 SCOPE_BASE, '(objectClass=*)',
456 ['objectGUID'])
457 if len(res2) != 1:
458 continue
460 # Compare the value read in the previous step to the
461 # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
462 # instance. If the values do not match, continue with the next
463 # group.
464 objectGUID = '{%s}' % \
465 octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
466 if objectGUID != e['PolicyID']:
467 continue
469 # For each CertificateEnrollmentPolicyEndPoint instance for that
470 # group:
471 ca_names = []
472 for ca in end_point_group:
473 # If EndPoint.URI equals "LDAP":
474 if ca['URL'] == 'LDAP:':
475 # This is a basic configuration.
476 cas = fetch_certification_authorities(ldb)
477 for _ca in cas:
478 self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
479 private_dir)
480 ca_names.append(_ca['name'])
481 # If EndPoint.URI starts with "HTTPS//":
482 elif ca['URL'].lower().startswith('https://'):
483 self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
484 private_dir, auth=ca['auth'])
485 ca_names.append(ca['name'])
486 else:
487 edata = { 'endpoint': ca['URL'] }
488 log.error('Unrecognized endpoint', edata)
489 return ca_names
491 def __enroll(self, guid, entries, trust_dir, private_dir):
492 url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
493 ldb = Ldb(url=url, session_info=system_session(),
494 lp=self.lp, credentials=self.creds)
496 ca_names = []
497 end_point_information = obtain_end_point_information(entries)
498 if len(end_point_information) > 0:
499 ca_names.extend(self.__read_cep_data(guid, ldb,
500 end_point_information,
501 trust_dir, private_dir))
502 else:
503 cas = fetch_certification_authorities(ldb)
504 for ca in cas:
505 self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
506 private_dir)
507 ca_names.append(ca['name'])
508 return ca_names
510 def rsop(self, gpo):
511 output = {}
512 pol_file = 'MACHINE/Registry.pol'
513 section = r'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
514 if gpo.file_sys_path:
515 path = os.path.join(gpo.file_sys_path, pol_file)
516 pol_conf = self.parse(path)
517 if not pol_conf:
518 return output
519 for e in pol_conf.entries:
520 if e.keyname == section and e.valuename == 'AEPolicy':
521 enroll = e.data & 0x1 == 0x1
522 if e.data & 0x8000 or not enroll:
523 continue
524 output['Auto Enrollment Policy'] = {}
525 url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
526 ldb = Ldb(url=url, session_info=system_session(),
527 lp=self.lp, credentials=self.creds)
528 end_point_information = \
529 obtain_end_point_information(pol_conf.entries)
530 cas = fetch_certification_authorities(ldb)
531 if len(end_point_information) > 0:
532 cas2 = [ep for sl in end_point_information for ep in sl]
533 if any([ca['URL'] == 'LDAP:' for ca in cas2]):
534 cas.extend(cas2)
535 else:
536 cas = cas2
537 for ca in cas:
538 if 'URL' in ca and ca['URL'] == 'LDAP:':
539 continue
540 policy = 'Auto Enrollment Policy'
541 cn = ca['name']
542 if policy not in output:
543 output[policy] = {}
544 output[policy][cn] = {}
545 if 'cACertificate' in ca:
546 output[policy][cn]['CA Certificate'] = \
547 format_root_cert(ca['cACertificate']).decode()
548 output[policy][cn]['Auto Enrollment Server'] = \
549 ca['hostname']
550 supported_templates = \
551 get_supported_templates(ca['hostname'])
552 output[policy][cn]['Templates'] = \
553 [t.decode() for t in supported_templates]
554 return output