gp: Template changes should invalidate cache
[Samba.git] / python / samba / gp / gp_cert_auto_enroll_ext.py
blob8233713e8ad807f4bb21b225a4af947710948e96
1 # gp_cert_auto_enroll_ext samba group policy
2 # Copyright (C) David Mulder <dmulder@suse.com> 2021
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 import os
18 import operator
19 import requests
20 from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
21 from samba import Ldb
22 from ldb import SCOPE_SUBTREE, SCOPE_BASE
23 from samba.auth import system_session
24 from samba.gp.gpclass import get_dc_hostname
25 import base64
26 from shutil import which
27 from subprocess import Popen, PIPE
28 import re
29 import json
30 from samba.gp.util.logging import log
31 import struct
32 try:
33 from cryptography.hazmat.primitives.serialization.pkcs7 import \
34 load_der_pkcs7_certificates
35 except ModuleNotFoundError:
36 def load_der_pkcs7_certificates(x): return []
37 log.error('python cryptography missing pkcs7 support. '
38 'Certificate chain parsing will fail')
39 from cryptography.hazmat.primitives.serialization import Encoding
40 from cryptography.x509 import load_der_x509_certificate
41 from cryptography.hazmat.backends import default_backend
42 from samba.common import get_string
44 cert_wrap = b"""
45 -----BEGIN CERTIFICATE-----
47 -----END CERTIFICATE-----"""
48 endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
49 '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'
51 global_trust_dirs = ['/etc/pki/trust/anchors', # SUSE
52 '/etc/pki/ca-trust/source/anchors', # RHEL/Fedora
53 '/usr/local/share/ca-certificates'] # Debian/Ubuntu
55 def octet_string_to_objectGUID(data):
56 """Convert an octet string to an objectGUID."""
57 return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0],
58 '%02x' % struct.unpack('<H', data[4:6])[0],
59 '%02x' % struct.unpack('<H', data[6:8])[0],
60 '%02x' % struct.unpack('>H', data[8:10])[0],
61 '%02x%02x' % struct.unpack('>HL', data[10:]))
64 def group_and_sort_end_point_information(end_point_information):
65 """Group and Sort End Point Information.
67 [MS-CAESO] 4.4.5.3.2.3
68 In this step autoenrollment processes the end point information by grouping
69 it by CEP ID and sorting in the order with which it will use the end point
70 to access the CEP information.
71 """
72 # Create groups of the CertificateEnrollmentPolicyEndPoint instances that
73 # have the same value of the EndPoint.PolicyID datum.
74 end_point_groups = {}
75 for e in end_point_information:
76 if e['PolicyID'] not in end_point_groups.keys():
77 end_point_groups[e['PolicyID']] = []
78 end_point_groups[e['PolicyID']].append(e)
80 # Sort each group by following these rules:
81 for end_point_group in end_point_groups.values():
82 # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending
83 # order based on the EndPoint.Cost value.
84 end_point_group.sort(key=lambda e: e['Cost'])
86 # For instances that have the same EndPoint.Cost:
87 cost_list = [e['Cost'] for e in end_point_group]
88 costs = set(cost_list)
89 for cost in costs:
90 i = cost_list.index(cost)
91 j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1
92 if i == j:
93 continue
95 # Sort those that have EndPoint.Authentication equal to Kerberos
96 # first. Then sort those that have EndPoint.Authentication equal to
97 # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint
98 # instances follow in an arbitrary order.
99 def sort_auth(e):
100 # 0x2 - Kerberos
101 if e['AuthFlags'] == 0x2:
102 return 0
103 # 0x1 - Anonymous
104 elif e['AuthFlags'] == 0x1:
105 return 1
106 else:
107 return 2
108 end_point_group[i:j+1] = sorted(end_point_group[i:j+1],
109 key=sort_auth)
110 return list(end_point_groups.values())
112 def obtain_end_point_information(entries):
113 """Obtain End Point Information.
115 [MS-CAESO] 4.4.5.3.2.2
116 In this step autoenrollment initializes the
117 CertificateEnrollmentPolicyEndPoints table.
119 end_point_information = {}
120 section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
121 for e in entries:
122 if not e.keyname.startswith(section):
123 continue
124 name = e.keyname.replace(section, '')
125 if name not in end_point_information.keys():
126 end_point_information[name] = {}
127 end_point_information[name][e.valuename] = e.data
128 for ca in end_point_information.values():
129 m = re.match(endpoint_re, ca['URL'])
130 if m:
131 name = '%s-CA' % m.group('server').replace('.', '-')
132 ca['name'] = name
133 ca['hostname'] = m.group('server')
134 ca['auth'] = m.group('auth')
135 elif ca['URL'].lower() != 'ldap:':
136 edata = { 'endpoint': ca['URL'] }
137 log.error('Failed to parse the endpoint', edata)
138 return {}
139 end_point_information = \
140 group_and_sort_end_point_information(end_point_information.values())
141 return end_point_information
143 def fetch_certification_authorities(ldb):
144 """Initialize CAs.
146 [MS-CAESO] 4.4.5.3.1.2
148 result = []
149 basedn = ldb.get_default_basedn()
150 # Autoenrollment MUST do an LDAP search for the CA information
151 # (pKIEnrollmentService) objects under the following container:
152 dn = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
153 attrs = ['cACertificate', 'cn', 'dNSHostName']
154 expr = '(objectClass=pKIEnrollmentService)'
155 res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
156 if len(res) == 0:
157 return result
158 for es in res:
159 data = { 'name': get_string(es['cn'][0]),
160 'hostname': get_string(es['dNSHostName'][0]),
161 'cACertificate': get_string(base64.b64encode(es['cACertificate'][0]))
163 result.append(data)
164 return result
166 def fetch_template_attrs(ldb, name, attrs=None):
167 if attrs is None:
168 attrs = ['msPKI-Minimal-Key-Size']
169 basedn = ldb.get_default_basedn()
170 dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
171 expr = '(cn=%s)' % name
172 res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
173 if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]:
174 return dict(res[0])
175 else:
176 return {'msPKI-Minimal-Key-Size': ['2048']}
178 def format_root_cert(cert):
179 return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert.encode(), 0, re.DOTALL)
181 def find_cepces_submit():
182 certmonger_dirs = [os.environ.get("PATH"), '/usr/lib/certmonger',
183 '/usr/libexec/certmonger']
184 return which('cepces-submit', path=':'.join(certmonger_dirs))
186 def get_supported_templates(server):
187 cepces_submit = find_cepces_submit()
188 if os.path.exists(cepces_submit):
189 env = os.environ
190 env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES'
191 p = Popen([cepces_submit, '--server=%s' % server, '--auth=Kerberos'],
192 env=env, stdout=PIPE, stderr=PIPE)
193 out, err = p.communicate()
194 if p.returncode != 0:
195 data = { 'Error': err.decode() }
196 log.error('Failed to fetch the list of supported templates.', data)
197 return out.strip().split()
198 return []
201 def getca(ca, url, trust_dir):
202 """Fetch Certificate Chain from the CA."""
203 root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
204 root_certs = []
206 try:
207 r = requests.get(url=url, params={'operation': 'GetCACert',
208 'message': 'CAIdentifier'})
209 except requests.exceptions.ConnectionError:
210 log.warn('Failed to establish a new connection')
211 r = None
212 if r is None or r.content == b'' or r.headers['Content-Type'] == 'text/html':
213 log.warn('Failed to fetch the root certificate chain.')
214 log.warn('The Network Device Enrollment Service is either not' +
215 ' installed or not configured.')
216 if 'cACertificate' in ca:
217 log.warn('Installing the server certificate only.')
218 try:
219 cert = load_der_x509_certificate(ca['cACertificate'])
220 except TypeError:
221 cert = load_der_x509_certificate(ca['cACertificate'],
222 default_backend())
223 cert_data = cert.public_bytes(Encoding.PEM)
224 with open(root_cert, 'wb') as w:
225 w.write(cert_data)
226 root_certs.append(root_cert)
227 return root_certs
229 if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
230 # Older versions of load_der_x509_certificate require a backend param
231 try:
232 cert = load_der_x509_certificate(r.content)
233 except TypeError:
234 cert = load_der_x509_certificate(r.content, default_backend())
235 cert_data = cert.public_bytes(Encoding.PEM)
236 with open(root_cert, 'wb') as w:
237 w.write(cert_data)
238 root_certs.append(root_cert)
239 elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
240 certs = load_der_pkcs7_certificates(r.content)
241 for i in range(0, len(certs)):
242 cert = certs[i].public_bytes(Encoding.PEM)
243 filename, extension = root_cert.rsplit('.', 1)
244 dest = '%s.%d.%s' % (filename, i, extension)
245 with open(dest, 'wb') as w:
246 w.write(cert)
247 root_certs.append(dest)
248 else:
249 log.warn('getca: Wrong (or missing) MIME content type')
251 return root_certs
254 def find_global_trust_dir():
255 """Return the global trust dir using known paths from various Linux distros."""
256 for trust_dir in global_trust_dirs:
257 if os.path.isdir(trust_dir):
258 return trust_dir
259 return global_trust_dirs[0]
261 def update_ca_command():
262 """Return the command to update the CA trust store."""
263 return which('update-ca-certificates') or which('update-ca-trust')
265 def changed(new_data, old_data):
266 """Return True if any key present in both dicts has changed."""
267 return any((new_data[k] != old_data[k] if k in old_data else False) \
268 for k in new_data.keys())
270 def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
271 """Install the root certificate chain."""
272 data = dict({'files': [], 'templates': []}, **ca)
273 url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
274 root_certs = getca(ca, url, trust_dir)
275 data['files'].extend(root_certs)
276 global_trust_dir = find_global_trust_dir()
277 for src in root_certs:
278 # Symlink the certs to global trust dir
279 dst = os.path.join(global_trust_dir, os.path.basename(src))
280 try:
281 os.symlink(src, dst)
282 data['files'].append(dst)
283 except PermissionError:
284 log.warn('Failed to symlink root certificate to the'
285 ' admin trust anchors')
286 except FileNotFoundError:
287 log.warn('Failed to symlink root certificate to the'
288 ' admin trust anchors.'
289 ' The directory was not found', global_trust_dir)
290 except FileExistsError:
291 # If we're simply downloading a renewed cert, the symlink
292 # already exists. Ignore the FileExistsError. Preserve the
293 # existing symlink in the unapply data.
294 data['files'].append(dst)
295 update = update_ca_command()
296 if update is not None:
297 Popen([update]).wait()
298 # Setup Certificate Auto Enrollment
299 getcert = which('getcert')
300 cepces_submit = find_cepces_submit()
301 if getcert is not None and os.path.exists(cepces_submit):
302 p = Popen([getcert, 'add-ca', '-c', ca['name'], '-e',
303 '%s --server=%s --auth=%s' % (cepces_submit,
304 ca['hostname'], auth)],
305 stdout=PIPE, stderr=PIPE)
306 out, err = p.communicate()
307 log.debug(out.decode())
308 if p.returncode != 0:
309 data = { 'Error': err.decode(), 'CA': ca['name'] }
310 log.error('Failed to add Certificate Authority', data)
311 supported_templates = get_supported_templates(ca['hostname'])
312 for template in supported_templates:
313 attrs = fetch_template_attrs(ldb, template)
314 nickname = '%s.%s' % (ca['name'], template.decode())
315 keyfile = os.path.join(private_dir, '%s.key' % nickname)
316 certfile = os.path.join(trust_dir, '%s.crt' % nickname)
317 p = Popen([getcert, 'request', '-c', ca['name'],
318 '-T', template.decode(),
319 '-I', nickname, '-k', keyfile, '-f', certfile,
320 '-g', attrs['msPKI-Minimal-Key-Size'][0]],
321 stdout=PIPE, stderr=PIPE)
322 out, err = p.communicate()
323 log.debug(out.decode())
324 if p.returncode != 0:
325 data = { 'Error': err.decode(), 'Certificate': nickname }
326 log.error('Failed to request certificate', data)
327 data['files'].extend([keyfile, certfile])
328 data['templates'].append(nickname)
329 if update is not None:
330 Popen([update]).wait()
331 else:
332 log.warn('certmonger and cepces must be installed for ' +
333 'certificate auto enrollment to work')
334 return json.dumps(data)
336 class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
337 def __str__(self):
338 return 'Cryptography\AutoEnrollment'
340 def unapply(self, guid, attribute, value):
341 ca_cn = base64.b64decode(attribute)
342 data = json.loads(value)
343 getcert = which('getcert')
344 if getcert is not None:
345 Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
346 for nickname in data['templates']:
347 Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
348 for f in data['files']:
349 if os.path.exists(f):
350 if os.path.exists(f):
351 os.unlink(f)
352 self.cache_remove_attribute(guid, attribute)
354 def apply(self, guid, ca, applier_func, *args, **kwargs):
355 attribute = base64.b64encode(ca['name'].encode()).decode()
356 # If the policy has changed, unapply, then apply new policy
357 old_val = self.cache_get_attribute_value(guid, attribute)
358 old_data = json.loads(old_val) if old_val is not None else {}
359 templates = ['%s.%s' % (ca['name'], t.decode()) for t in get_supported_templates(ca['hostname'])]
360 new_data = { 'templates': templates, **ca }
361 if changed(new_data, old_data) or self.cache_get_apply_state() == GPOSTATE.ENFORCE:
362 self.unapply(guid, attribute, old_val)
363 # If policy is already applied and unchanged, skip application
364 if old_val is not None and not changed(new_data, old_data) and \
365 self.cache_get_apply_state() != GPOSTATE.ENFORCE:
366 return
368 # Apply the policy and log the changes
369 data = applier_func(*args, **kwargs)
370 self.cache_add_attribute(guid, attribute, data)
372 def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
373 trust_dir=None, private_dir=None):
374 if trust_dir is None:
375 trust_dir = self.lp.cache_path('certs')
376 if private_dir is None:
377 private_dir = self.lp.private_path('certs')
378 if not os.path.exists(trust_dir):
379 os.mkdir(trust_dir, mode=0o755)
380 if not os.path.exists(private_dir):
381 os.mkdir(private_dir, mode=0o700)
383 for guid, settings in deleted_gpo_list:
384 if str(self) in settings:
385 for ca_cn_enc, data in settings[str(self)].items():
386 self.unapply(guid, ca_cn_enc, data)
388 for gpo in changed_gpo_list:
389 if gpo.file_sys_path:
390 section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
391 pol_file = 'MACHINE/Registry.pol'
392 path = os.path.join(gpo.file_sys_path, pol_file)
393 pol_conf = self.parse(path)
394 if not pol_conf:
395 continue
396 for e in pol_conf.entries:
397 if e.keyname == section and e.valuename == 'AEPolicy':
398 # This policy applies as specified in [MS-CAESO] 4.4.5.1
399 if e.data & 0x8000:
400 continue # The policy is disabled
401 enroll = e.data & 0x1 == 0x1
402 manage = e.data & 0x2 == 0x2
403 retrive_pending = e.data & 0x4 == 0x4
404 if enroll:
405 ca_names = self.__enroll(gpo.name,
406 pol_conf.entries,
407 trust_dir, private_dir)
409 # Cleanup any old CAs that have been removed
410 ca_attrs = [base64.b64encode(n.encode()).decode() \
411 for n in ca_names]
412 self.clean(gpo.name, keep=ca_attrs)
413 else:
414 # If enrollment has been disabled for this GPO,
415 # remove any existing policy
416 ca_attrs = \
417 self.cache_get_all_attribute_values(gpo.name)
418 self.clean(gpo.name, remove=ca_attrs)
420 def __read_cep_data(self, guid, ldb, end_point_information,
421 trust_dir, private_dir):
422 """Read CEP Data.
424 [MS-CAESO] 4.4.5.3.2.4
425 In this step autoenrollment initializes instances of the
426 CertificateEnrollmentPolicy by accessing end points associated with CEP
427 groups created in the previous step.
429 # For each group created in the previous step:
430 for end_point_group in end_point_information:
431 # Pick an arbitrary instance of the
432 # CertificateEnrollmentPolicyEndPoint from the group
433 e = end_point_group[0]
435 # If this instance does not have the AutoEnrollmentEnabled flag set
436 # in the EndPoint.Flags, continue with the next group.
437 if not e['Flags'] & 0x10:
438 continue
440 # If the current group contains a
441 # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
442 # equal to "LDAP":
443 if any([e['URL'] == 'LDAP:' for e in end_point_group]):
444 # Perform an LDAP search to read the value of the objectGuid
445 # attribute of the root object of the forest root domain NC. If
446 # any errors are encountered, continue with the next group.
447 res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
448 ['rootDomainNamingContext'])
449 if len(res) != 1:
450 continue
451 res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
452 SCOPE_BASE, '(objectClass=*)',
453 ['objectGUID'])
454 if len(res2) != 1:
455 continue
457 # Compare the value read in the previous step to the
458 # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
459 # instance. If the values do not match, continue with the next
460 # group.
461 objectGUID = '{%s}' % \
462 octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
463 if objectGUID != e['PolicyID']:
464 continue
466 # For each CertificateEnrollmentPolicyEndPoint instance for that
467 # group:
468 ca_names = []
469 for ca in end_point_group:
470 # If EndPoint.URI equals "LDAP":
471 if ca['URL'] == 'LDAP:':
472 # This is a basic configuration.
473 cas = fetch_certification_authorities(ldb)
474 for _ca in cas:
475 self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
476 private_dir)
477 ca_names.append(_ca['name'])
478 # If EndPoint.URI starts with "HTTPS//":
479 elif ca['URL'].lower().startswith('https://'):
480 self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
481 private_dir, auth=ca['auth'])
482 ca_names.append(ca['name'])
483 else:
484 edata = { 'endpoint': ca['URL'] }
485 log.error('Unrecognized endpoint', edata)
486 return ca_names
488 def __enroll(self, guid, entries, trust_dir, private_dir):
489 url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
490 ldb = Ldb(url=url, session_info=system_session(),
491 lp=self.lp, credentials=self.creds)
493 ca_names = []
494 end_point_information = obtain_end_point_information(entries)
495 if len(end_point_information) > 0:
496 ca_names.extend(self.__read_cep_data(guid, ldb,
497 end_point_information,
498 trust_dir, private_dir))
499 else:
500 cas = fetch_certification_authorities(ldb)
501 for ca in cas:
502 self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
503 private_dir)
504 ca_names.append(ca['name'])
505 return ca_names
507 def rsop(self, gpo):
508 output = {}
509 pol_file = 'MACHINE/Registry.pol'
510 section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
511 if gpo.file_sys_path:
512 path = os.path.join(gpo.file_sys_path, pol_file)
513 pol_conf = self.parse(path)
514 if not pol_conf:
515 return output
516 for e in pol_conf.entries:
517 if e.keyname == section and e.valuename == 'AEPolicy':
518 enroll = e.data & 0x1 == 0x1
519 if e.data & 0x8000 or not enroll:
520 continue
521 output['Auto Enrollment Policy'] = {}
522 url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
523 ldb = Ldb(url=url, session_info=system_session(),
524 lp=self.lp, credentials=self.creds)
525 end_point_information = \
526 obtain_end_point_information(pol_conf.entries)
527 cas = fetch_certification_authorities(ldb)
528 if len(end_point_information) > 0:
529 cas2 = [ep for sl in end_point_information for ep in sl]
530 if any([ca['URL'] == 'LDAP:' for ca in cas2]):
531 cas.extend(cas2)
532 else:
533 cas = cas2
534 for ca in cas:
535 if 'URL' in ca and ca['URL'] == 'LDAP:':
536 continue
537 policy = 'Auto Enrollment Policy'
538 cn = ca['name']
539 if policy not in output:
540 output[policy] = {}
541 output[policy][cn] = {}
542 if 'cACertificate' in ca:
543 output[policy][cn]['CA Certificate'] = \
544 format_root_cert(ca['cACertificate']).decode()
545 output[policy][cn]['Auto Enrollment Server'] = \
546 ca['hostname']
547 supported_templates = \
548 get_supported_templates(ca['hostname'])
549 output[policy][cn]['Templates'] = \
550 [t.decode() for t in supported_templates]
551 return output