tests/ntacls: unblock failing gitlab pipelines because test_setntacl_forcenative
[Samba.git] / python / samba / ntacls.py
blob24af056d2a4be8d06770d1bfd19216067ccb0af7
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2009-2010
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """NT Acls."""
21 import os
22 import tempfile
23 import shutil
25 import samba.xattr_native
26 import samba.xattr_tdb
27 import samba.posix_eadb
28 from samba.samba3 import param as s3param
29 from samba.dcerpc import security, xattr, idmap
30 from samba.ndr import ndr_pack, ndr_unpack
31 from samba.samba3 import smbd
32 from samba.samba3 import libsmb_samba_internal as libsmb
33 from samba.logger import get_samba_logger
34 from samba import NTSTATUSError
35 from samba.auth_util import system_session_unix
36 from samba import safe_tarfile as tarfile
39 # don't include volumes
40 SMB_FILE_ATTRIBUTE_FLAGS = libsmb.FILE_ATTRIBUTE_SYSTEM | \
41 libsmb.FILE_ATTRIBUTE_DIRECTORY | \
42 libsmb.FILE_ATTRIBUTE_ARCHIVE | \
43 libsmb.FILE_ATTRIBUTE_HIDDEN
46 SECURITY_SECINFO_FLAGS = security.SECINFO_OWNER | \
47 security.SECINFO_GROUP | \
48 security.SECINFO_DACL | \
49 security.SECINFO_SACL
51 class XattrBackendError(Exception):
52 """A generic xattr backend error."""
55 def checkset_backend(lp, backend, eadbfile):
56 """return the path to the eadb, or None"""
57 if backend is None:
58 xattr_tdb = lp.get("xattr_tdb:file")
59 if xattr_tdb is not None:
60 return (samba.xattr_tdb, lp.get("xattr_tdb:file"))
61 posix_eadb = lp.get("posix:eadb")
62 if posix_eadb is not None:
63 return (samba.posix_eadb, lp.get("posix:eadb"))
64 return (None, None)
65 elif backend == "native":
66 return (None, None)
67 elif backend == "eadb":
68 if eadbfile is not None:
69 return (samba.posix_eadb, eadbfile)
70 else:
71 return (samba.posix_eadb, os.path.abspath(os.path.join(lp.get("private dir"), "eadb.tdb")))
72 elif backend == "tdb":
73 if eadbfile is not None:
74 return (samba.xattr_tdb, eadbfile)
75 else:
76 state_dir = lp.get("state directory")
77 db_path = os.path.abspath(os.path.join(state_dir, "xattr.tdb"))
78 return (samba.xattr_tdb, db_path)
79 else:
80 raise XattrBackendError("Invalid xattr backend choice %s" % backend)
83 def getdosinfo(lp, file):
84 try:
85 attribute = samba.xattr_native.wrap_getxattr(file,
86 xattr.XATTR_DOSATTRIB_NAME_S3)
87 except Exception:
88 return
90 return ndr_unpack(xattr.DOSATTRIB, attribute)
93 def getntacl(lp,
94 file,
95 session_info,
96 backend=None,
97 eadbfile=None,
98 direct_db_access=True,
99 service=None):
100 if direct_db_access:
101 (backend_obj, dbname) = checkset_backend(lp, backend, eadbfile)
102 if dbname is not None:
103 try:
104 attribute = backend_obj.wrap_getxattr(dbname, file,
105 xattr.XATTR_NTACL_NAME)
106 except Exception:
107 # FIXME: Don't catch all exceptions, just those related to opening
108 # xattrdb
109 print("Fail to open %s" % dbname)
110 attribute = samba.xattr_native.wrap_getxattr(file,
111 xattr.XATTR_NTACL_NAME)
112 else:
113 attribute = samba.xattr_native.wrap_getxattr(file,
114 xattr.XATTR_NTACL_NAME)
115 ntacl = ndr_unpack(xattr.NTACL, attribute)
116 if ntacl.version == 1:
117 return ntacl.info
118 elif ntacl.version == 2:
119 return ntacl.info.sd
120 elif ntacl.version == 3:
121 return ntacl.info.sd
122 elif ntacl.version == 4:
123 return ntacl.info.sd
124 else:
125 return smbd.get_nt_acl(file,
126 SECURITY_SECINFO_FLAGS,
127 session_info,
128 service=service)
131 def setntacl(lp, file, sddl, domsid, session_info,
132 backend=None, eadbfile=None,
133 use_ntvfs=True, skip_invalid_chown=False,
134 passdb=None, service=None):
136 A wrapper for smbd set_nt_acl api.
138 Args:
139 lp (LoadParam): load param from conf
140 file (str): a path to file or dir
141 sddl (str): ntacl sddl string
142 service (str): name of share service, e.g.: sysvol
143 session_info (auth_session_info): session info for authentication
145 Note:
146 Get `session_info` with `samba.auth.user_session`, do not use the
147 `admin_session` api.
149 Returns:
150 None
153 assert(isinstance(domsid, str) or isinstance(domsid, security.dom_sid))
154 if isinstance(domsid, str):
155 sid = security.dom_sid(domsid)
156 elif isinstance(domsid, security.dom_sid):
157 sid = domsid
158 domsid = str(sid)
160 assert(isinstance(sddl, str) or isinstance(sddl, security.descriptor))
161 if isinstance(sddl, str):
162 sd = security.descriptor.from_sddl(sddl, sid)
163 elif isinstance(sddl, security.descriptor):
164 sd = sddl
165 sddl = sd.as_sddl(sid)
167 if not use_ntvfs and skip_invalid_chown:
168 # Check if the owner can be resolved as a UID
169 (owner_id, owner_type) = passdb.sid_to_id(sd.owner_sid)
170 if ((owner_type != idmap.ID_TYPE_UID) and (owner_type != idmap.ID_TYPE_BOTH)):
171 # Check if this particular owner SID was domain admins,
172 # because we special-case this as mapping to
173 # 'administrator' instead.
174 if sd.owner_sid == security.dom_sid("%s-%d" % (domsid, security.DOMAIN_RID_ADMINS)):
175 administrator = security.dom_sid("%s-%d" % (domsid, security.DOMAIN_RID_ADMINISTRATOR))
176 (admin_id, admin_type) = passdb.sid_to_id(administrator)
178 # Confirm we have a UID for administrator
179 if ((admin_type == idmap.ID_TYPE_UID) or (admin_type == idmap.ID_TYPE_BOTH)):
181 # Set it, changing the owner to 'administrator' rather than domain admins
182 sd2 = sd
183 sd2.owner_sid = administrator
185 smbd.set_nt_acl(
186 file, SECURITY_SECINFO_FLAGS, sd2,
187 session_info,
188 service=service)
190 # and then set an NTVFS ACL (which does not set the posix ACL) to pretend the owner really was set
191 use_ntvfs = True
192 else:
193 raise XattrBackendError("Unable to find UID for domain administrator %s, got id %d of type %d" % (administrator, admin_id, admin_type))
194 else:
195 # For all other owning users, reset the owner to root
196 # and then set the ACL without changing the owner
198 # This won't work in test environments, as it tries a real (rather than xattr-based fake) chown
200 os.chown(file, 0, 0)
201 smbd.set_nt_acl(
202 file,
203 security.SECINFO_GROUP |
204 security.SECINFO_DACL |
205 security.SECINFO_SACL,
207 session_info,
208 service=service)
210 if use_ntvfs:
211 (backend_obj, dbname) = checkset_backend(lp, backend, eadbfile)
212 ntacl = xattr.NTACL()
213 ntacl.version = 1
214 ntacl.info = sd
215 if dbname is not None:
216 try:
217 backend_obj.wrap_setxattr(dbname,
218 file, xattr.XATTR_NTACL_NAME, ndr_pack(ntacl))
219 except Exception:
220 # FIXME: Don't catch all exceptions, just those related to opening
221 # xattrdb
222 print("Fail to open %s" % dbname)
223 samba.xattr_native.wrap_setxattr(file, xattr.XATTR_NTACL_NAME,
224 ndr_pack(ntacl))
225 else:
226 samba.xattr_native.wrap_setxattr(file, xattr.XATTR_NTACL_NAME,
227 ndr_pack(ntacl))
228 else:
229 smbd.set_nt_acl(
230 file, SECURITY_SECINFO_FLAGS, sd,
231 service=service, session_info=session_info)
234 def ldapmask2filemask(ldm):
235 """Takes the access mask of a DS ACE and transform them in a File ACE mask.
237 RIGHT_DS_CREATE_CHILD = 0x00000001
238 RIGHT_DS_DELETE_CHILD = 0x00000002
239 RIGHT_DS_LIST_CONTENTS = 0x00000004
240 ACTRL_DS_SELF = 0x00000008
241 RIGHT_DS_READ_PROPERTY = 0x00000010
242 RIGHT_DS_WRITE_PROPERTY = 0x00000020
243 RIGHT_DS_DELETE_TREE = 0x00000040
244 RIGHT_DS_LIST_OBJECT = 0x00000080
245 RIGHT_DS_CONTROL_ACCESS = 0x00000100
246 FILE_READ_DATA = 0x0001
247 FILE_LIST_DIRECTORY = 0x0001
248 FILE_WRITE_DATA = 0x0002
249 FILE_ADD_FILE = 0x0002
250 FILE_APPEND_DATA = 0x0004
251 FILE_ADD_SUBDIRECTORY = 0x0004
252 FILE_CREATE_PIPE_INSTANCE = 0x0004
253 FILE_READ_EA = 0x0008
254 FILE_WRITE_EA = 0x0010
255 FILE_EXECUTE = 0x0020
256 FILE_TRAVERSE = 0x0020
257 FILE_DELETE_CHILD = 0x0040
258 FILE_READ_ATTRIBUTES = 0x0080
259 FILE_WRITE_ATTRIBUTES = 0x0100
260 DELETE = 0x00010000
261 READ_CONTROL = 0x00020000
262 WRITE_DAC = 0x00040000
263 WRITE_OWNER = 0x00080000
264 SYNCHRONIZE = 0x00100000
265 STANDARD_RIGHTS_ALL = 0x001F0000
267 filemask = ldm & STANDARD_RIGHTS_ALL
269 if (ldm & RIGHT_DS_READ_PROPERTY) and (ldm & RIGHT_DS_LIST_CONTENTS):
270 filemask = filemask | (SYNCHRONIZE | FILE_LIST_DIRECTORY |
271 FILE_READ_ATTRIBUTES | FILE_READ_EA |
272 FILE_READ_DATA | FILE_EXECUTE)
274 if ldm & RIGHT_DS_WRITE_PROPERTY:
275 filemask = filemask | (SYNCHRONIZE | FILE_WRITE_DATA |
276 FILE_APPEND_DATA | FILE_WRITE_EA |
277 FILE_WRITE_ATTRIBUTES | FILE_ADD_FILE |
278 FILE_ADD_SUBDIRECTORY)
280 if ldm & RIGHT_DS_CREATE_CHILD:
281 filemask = filemask | (FILE_ADD_SUBDIRECTORY | FILE_ADD_FILE)
283 if ldm & RIGHT_DS_DELETE_CHILD:
284 filemask = filemask | FILE_DELETE_CHILD
286 return filemask
289 def dsacl2fsacl(dssddl, sid, as_sddl=True):
292 This function takes an the SDDL representation of a DS
293 ACL and return the SDDL representation of this ACL adapted
294 for files. It's used for Policy object provision
296 ref = security.descriptor.from_sddl(dssddl, sid)
297 fdescr = security.descriptor()
298 fdescr.owner_sid = ref.owner_sid
299 fdescr.group_sid = ref.group_sid
300 fdescr.type = ref.type
301 fdescr.revision = ref.revision
302 aces = ref.dacl.aces
303 for i in range(0, len(aces)):
304 ace = aces[i]
305 if ace.type in (security.SEC_ACE_TYPE_ACCESS_ALLOWED_OBJECT,
306 security.SEC_ACE_TYPE_ACCESS_ALLOWED) and str(ace.trustee) != security.SID_BUILTIN_PREW2K:
307 # if fdescr.type & security.SEC_DESC_DACL_AUTO_INHERITED:
308 ace.flags = ace.flags | security.SEC_ACE_FLAG_OBJECT_INHERIT | security.SEC_ACE_FLAG_CONTAINER_INHERIT
309 if str(ace.trustee) == security.SID_CREATOR_OWNER:
310 # For Creator/Owner the IO flag is set as this ACE has only a sense for child objects
311 ace.flags = ace.flags | security.SEC_ACE_FLAG_INHERIT_ONLY
312 ace.access_mask = ldapmask2filemask(ace.access_mask)
313 fdescr.dacl_add(ace)
315 if not as_sddl:
316 return fdescr
318 return fdescr.as_sddl(sid)
321 class SMBHelper:
323 A wrapper class for SMB connection
325 smb_path: path with separator "\\" other than "/"
328 def __init__(self, smb_conn, dom_sid):
329 self.smb_conn = smb_conn
330 self.dom_sid = dom_sid
332 def get_acl(self, smb_path, as_sddl=False,
333 sinfo=None, access_mask=None):
334 assert '/' not in smb_path
336 ntacl_sd = self.smb_conn.get_acl(smb_path,
337 sinfo=sinfo,
338 access_mask=access_mask)
340 return ntacl_sd.as_sddl(self.dom_sid) if as_sddl else ntacl_sd
342 def set_acl(self, smb_path, ntacl_sd,
343 sinfo=None, access_mask=None):
344 assert '/' not in smb_path
346 assert(isinstance(ntacl_sd, str) or isinstance(ntacl_sd, security.descriptor))
347 if isinstance(ntacl_sd, str):
348 tmp_desc = security.descriptor.from_sddl(ntacl_sd, self.domain_sid)
349 elif isinstance(ntacl_sd, security.descriptor):
350 tmp_desc = ntacl_sd
352 self.smb_conn.set_acl(smb_path, tmp_desc,
353 sinfo=sinfo,
354 access_mask=access_mask)
356 def list(self, smb_path=''):
358 List file and dir base names in smb_path without recursive.
360 assert '/' not in smb_path
361 return self.smb_conn.list(smb_path, attribs=SMB_FILE_ATTRIBUTE_FLAGS)
363 def is_dir(self, attrib):
365 Check whether the attrib value is a directory.
367 attrib is from list method.
369 return bool(attrib & libsmb.FILE_ATTRIBUTE_DIRECTORY)
371 def join(self, root, name):
373 Join path with '\\'
375 return root + '\\' + name if root else name
377 def loadfile(self, smb_path):
378 assert '/' not in smb_path
379 return self.smb_conn.loadfile(smb_path)
381 def create_tree(self, tree, smb_path=''):
383 Create files as defined in tree
385 for name, content in tree.items():
386 fullname = self.join(smb_path, name)
387 if isinstance(content, dict): # a dir
388 if not self.smb_conn.chkpath(fullname):
389 self.smb_conn.mkdir(fullname)
390 self.create_tree(content, smb_path=fullname)
391 else: # a file
392 self.smb_conn.savefile(fullname, content)
394 def get_tree(self, smb_path=''):
396 Get the tree structure via smb conn
398 self.smb_conn.list example:
402 'attrib': 16,
403 'mtime': 1528848309,
404 'name': 'dir1',
405 'short_name': 'dir1',
406 'size': 0L
407 }, {
408 'attrib': 32,
409 'mtime': 1528848309,
410 'name': 'file0.txt',
411 'short_name': 'file0.txt',
412 'size': 10L
416 tree = {}
417 for item in self.list(smb_path):
418 name = item['name']
419 fullname = self.join(smb_path, name)
420 if self.is_dir(item['attrib']):
421 tree[name] = self.get_tree(smb_path=fullname)
422 else:
423 tree[name] = self.loadfile(fullname)
424 return tree
426 def get_ntacls(self, smb_path=''):
428 Get ntacl for each file and dir via smb conn
430 ntacls = {}
431 for item in self.list(smb_path):
432 name = item['name']
433 fullname = self.join(smb_path, name)
434 if self.is_dir(item['attrib']):
435 ntacls.update(self.get_ntacls(smb_path=fullname))
436 else:
437 ntacl_sd = self.get_acl(fullname)
438 ntacls[fullname] = ntacl_sd.as_sddl(self.dom_sid)
439 return ntacls
441 def delete_tree(self):
442 for item in self.list():
443 name = item['name']
444 if self.is_dir(item['attrib']):
445 self.smb_conn.deltree(name)
446 else:
447 self.smb_conn.unlink(name)
450 class NtaclsHelper:
452 def __init__(self, service, smb_conf_path, dom_sid):
453 self.service = service
454 self.dom_sid = dom_sid
456 # this is important to help smbd find services.
457 self.lp = s3param.get_context()
458 self.lp.load(smb_conf_path)
460 self.use_ntvfs = "smb" in self.lp.get("server services")
462 def getntacl(self, path, session_info, as_sddl=False, direct_db_access=None):
463 if direct_db_access is None:
464 direct_db_access = self.use_ntvfs
466 ntacl_sd = getntacl(
467 self.lp, path, session_info,
468 direct_db_access=direct_db_access,
469 service=self.service)
471 return ntacl_sd.as_sddl(self.dom_sid) if as_sddl else ntacl_sd
473 def setntacl(self, path, ntacl_sd, session_info):
474 # ntacl_sd can be obj or str
475 return setntacl(self.lp, path, ntacl_sd, self.dom_sid, session_info,
476 use_ntvfs=self.use_ntvfs)
479 def _create_ntacl_file(dst, ntacl_sddl_str):
480 with open(dst + '.NTACL', 'w') as f:
481 f.write(ntacl_sddl_str)
484 def _read_ntacl_file(src):
485 ntacl_file = src + '.NTACL'
487 if not os.path.exists(ntacl_file):
488 return None
490 with open(ntacl_file, 'r') as f:
491 return f.read()
494 def backup_online(smb_conn, dest_tarfile_path, dom_sid):
496 Backup all files and dirs with ntacl for the serive behind smb_conn.
498 1. Create a temp dir as container dir
499 2. Backup all files with dir structure into container dir
500 3. Generate file.NTACL files for each file and dir in container dir
501 4. Create a tar file from container dir(without top level folder)
502 5. Delete container dir
505 logger = get_samba_logger()
507 if isinstance(dom_sid, str):
508 dom_sid = security.dom_sid(dom_sid)
510 smb_helper = SMBHelper(smb_conn, dom_sid)
512 remotedir = '' # root dir
514 localdir = tempfile.mkdtemp()
516 r_dirs = [remotedir]
517 l_dirs = [localdir]
519 while r_dirs:
520 r_dir = r_dirs.pop()
521 l_dir = l_dirs.pop()
523 for e in smb_helper.list(smb_path=r_dir):
524 r_name = smb_helper.join(r_dir, e['name'])
525 l_name = os.path.join(l_dir, e['name'])
527 if smb_helper.is_dir(e['attrib']):
528 r_dirs.append(r_name)
529 l_dirs.append(l_name)
530 os.mkdir(l_name)
531 else:
532 data = smb_helper.loadfile(r_name)
533 with open(l_name, 'wb') as f:
534 f.write(data)
536 # get ntacl for this entry and save alongside
537 try:
538 ntacl_sddl_str = smb_helper.get_acl(r_name, as_sddl=True)
539 _create_ntacl_file(l_name, ntacl_sddl_str)
540 except NTSTATUSError as e:
541 logger.error('Failed to get the ntacl for %s: %s' %
542 (r_name, e.args[1]))
543 logger.warning('The permissions for %s may not be' % r_name +
544 ' restored correctly')
546 with tarfile.open(name=dest_tarfile_path, mode='w:gz') as tar:
547 for name in os.listdir(localdir):
548 path = os.path.join(localdir, name)
549 tar.add(path, arcname=name)
551 shutil.rmtree(localdir)
554 def backup_offline(src_service_path, dest_tarfile_path, smb_conf_path, dom_sid):
556 Backup files and ntacls to a tarfile for a service
558 service = src_service_path.rstrip('/').rsplit('/', 1)[-1]
559 tempdir = tempfile.mkdtemp()
560 session_info = system_session_unix()
562 ntacls_helper = NtaclsHelper(service, smb_conf_path, dom_sid)
564 for dirpath, dirnames, filenames in os.walk(src_service_path):
565 # each dir only cares about its direct children
566 rel_dirpath = os.path.relpath(dirpath, start=src_service_path)
567 dst_dirpath = os.path.join(tempdir, rel_dirpath)
569 # create sub dirs and NTACL file
570 for dirname in dirnames:
571 src = os.path.join(dirpath, dirname)
572 dst = os.path.join(dst_dirpath, dirname)
573 # mkdir with metadata
574 smbd.mkdir(dst, session_info, service)
575 ntacl_sddl_str = ntacls_helper.getntacl(src, session_info, as_sddl=True)
576 _create_ntacl_file(dst, ntacl_sddl_str)
578 # create files and NTACL file, then copy data
579 for filename in filenames:
580 src = os.path.join(dirpath, filename)
581 dst = os.path.join(dst_dirpath, filename)
582 # create an empty file with metadata
583 smbd.create_file(dst, session_info, service)
584 ntacl_sddl_str = ntacls_helper.getntacl(src, session_info, as_sddl=True)
585 _create_ntacl_file(dst, ntacl_sddl_str)
587 # now put data in
588 with open(src, 'rb') as src_file:
589 data = src_file.read()
590 with open(dst, 'wb') as dst_file:
591 dst_file.write(data)
593 # add all files in tempdir to tarfile without a top folder
594 with tarfile.open(name=dest_tarfile_path, mode='w:gz') as tar:
595 for name in os.listdir(tempdir):
596 path = os.path.join(tempdir, name)
597 tar.add(path, arcname=name)
599 shutil.rmtree(tempdir)
602 def backup_restore(src_tarfile_path, dst_service_path, samdb_conn, smb_conf_path):
604 Restore files and ntacls from a tarfile to a service
606 logger = get_samba_logger()
607 service = dst_service_path.rstrip('/').rsplit('/', 1)[-1]
608 tempdir = tempfile.mkdtemp() # src files
610 dom_sid_str = samdb_conn.get_domain_sid()
611 dom_sid = security.dom_sid(dom_sid_str)
613 ntacls_helper = NtaclsHelper(service, smb_conf_path, dom_sid)
614 session_info = system_session_unix()
616 with tarfile.open(src_tarfile_path) as f:
617 f.extractall(path=tempdir)
618 # e.g.: /tmp/tmpRNystY/{dir1,dir1.NTACL,...file1,file1.NTACL}
620 for dirpath, dirnames, filenames in os.walk(tempdir):
621 rel_dirpath = os.path.relpath(dirpath, start=tempdir)
622 dst_dirpath = os.path.normpath(
623 os.path.join(dst_service_path, rel_dirpath))
625 for dirname in dirnames:
626 if not dirname.endswith('.NTACL'):
627 src = os.path.join(dirpath, dirname)
628 dst = os.path.join(dst_dirpath, dirname)
629 if not os.path.isdir(dst):
630 # dst must be absolute path for smbd API
631 smbd.mkdir(dst, session_info, service)
633 ntacl_sddl_str = _read_ntacl_file(src)
634 if ntacl_sddl_str:
635 ntacls_helper.setntacl(dst, ntacl_sddl_str, session_info)
636 else:
637 logger.warning(
638 'Failed to restore ntacl for directory %s.' % dst
639 + ' Please check the permissions are correct')
641 for filename in filenames:
642 if not filename.endswith('.NTACL'):
643 src = os.path.join(dirpath, filename)
644 dst = os.path.join(dst_dirpath, filename)
645 if not os.path.isfile(dst):
646 # dst must be absolute path for smbd API
647 smbd.create_file(dst, session_info, service)
649 ntacl_sddl_str = _read_ntacl_file(src)
650 if ntacl_sddl_str:
651 ntacls_helper.setntacl(dst, ntacl_sddl_str, session_info)
652 else:
653 logger.warning('Failed to restore ntacl for file %s.' % dst
654 + ' Please check the permissions are correct')
656 # now put data in
657 with open(src, 'rb') as src_file:
658 data = src_file.read()
659 with open(dst, 'wb') as dst_file:
660 dst_file.write(data)
662 shutil.rmtree(tempdir)