VERSION: Disable GIT_SNAPSHOT for the 4.9.13 release.
[Samba.git] / python / samba / ntacls.py
blobff2e92679c16e4962831c88a8b117020c4820592
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2009-2010
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 from __future__ import print_function
20 """NT Acls."""
23 import os
24 import tarfile
25 import tempfile
26 import shutil
28 import samba.xattr_native, samba.xattr_tdb, samba.posix_eadb
29 from samba.samba3 import param as s3param
30 from samba.dcerpc import security, xattr, idmap
31 from samba.ndr import ndr_pack, ndr_unpack
32 from samba.samba3 import smbd
33 from samba.auth import admin_session
34 from samba import smb
# File attribute bits requested when listing entries over SMB.
# The volume attribute is deliberately not included.
SMB_FILE_ATTRIBUTE_FLAGS = (
    smb.FILE_ATTRIBUTE_SYSTEM
    | smb.FILE_ATTRIBUTE_DIRECTORY
    | smb.FILE_ATTRIBUTE_ARCHIVE
    | smb.FILE_ATTRIBUTE_HIDDEN
)


# Security-descriptor sections handled by this module:
# owner, group, DACL and SACL.
SECURITY_SECINFO_FLAGS = (
    security.SECINFO_OWNER
    | security.SECINFO_GROUP
    | security.SECINFO_DACL
    | security.SECINFO_SACL
)


# Access mask used when fetching an ACL over SMB.
# SEC_FLAG_SYSTEM_SECURITY is required, otherwise we get Access Denied.
SECURITY_SEC_FLAGS = (
    security.SEC_FLAG_SYSTEM_SECURITY
    | security.SEC_STD_READ_CONTROL
)
class XattrBackendError(Exception):
    """Generic error for problems with the xattr backend."""
    pass
def checkset_backend(lp, backend, eadbfile):
    """Select the xattr backend module and the database file it should use.

    Args:
        lp: loaded parameter object; only its ``get(name)`` method is used.
        backend: ``None`` (auto-detect from ``lp``), ``"native"``,
            ``"eadb"`` or ``"tdb"``.
        eadbfile: explicit database path for the ``"eadb"`` / ``"tdb"``
            backends, or ``None`` to derive a default path from ``lp``.

    Returns:
        A ``(backend_module, db_path)`` tuple. Both elements are ``None``
        when no database-backed xattr store applies (native xattrs).

    Raises:
        XattrBackendError: if ``backend`` is not one of the known choices.
    """
    if backend is None:
        # Auto-detect from the configuration: a configured xattr_tdb file
        # is preferred over a posix eadb.
        xattr_tdb = lp.get("xattr_tdb:file")
        if xattr_tdb is not None:
            return (samba.xattr_tdb, xattr_tdb)
        posix_eadb = lp.get("posix:eadb")
        if posix_eadb is not None:
            return (samba.posix_eadb, posix_eadb)
        return (None, None)

    if backend == "native":
        return (None, None)

    if backend == "eadb":
        if eadbfile is not None:
            return (samba.posix_eadb, eadbfile)
        # Default location: eadb.tdb inside the private directory.
        private_dir = lp.get("private dir")
        return (samba.posix_eadb,
                os.path.abspath(os.path.join(private_dir, "eadb.tdb")))

    if backend == "tdb":
        if eadbfile is not None:
            return (samba.xattr_tdb, eadbfile)
        # Default location: xattr.tdb inside the state directory.
        state_dir = lp.get("state directory")
        return (samba.xattr_tdb,
                os.path.abspath(os.path.join(state_dir, "xattr.tdb")))

    raise XattrBackendError("Invalid xattr backend choice %s" % backend)
def getdosinfo(lp, file):
    """Return the unpacked DOS attribute blob stored on ``file``.

    The ``xattr.XATTR_DOSATTRIB_NAME_S3`` attribute is read through the
    native xattr wrapper and unpacked as ``xattr.DOSATTRIB``. ``lp`` is
    accepted but not used. If the attribute cannot be read for any reason,
    ``None`` is returned.
    """
    try:
        raw = samba.xattr_native.wrap_getxattr(
            file, xattr.XATTR_DOSATTRIB_NAME_S3)
    except Exception:
        # Best effort: any failure while reading yields None.
        return None

    return ndr_unpack(xattr.DOSATTRIB, raw)
def getntacl(lp, file, backend=None, eadbfile=None, direct_db_access=True, service=None):
    """Fetch the NT security descriptor of ``file``.

    With ``direct_db_access`` the NTACL xattr is read directly (from the
    xattr database chosen by ``checkset_backend`` when there is one,
    otherwise from native xattrs) and unpacked. Without it, the request
    goes through ``smbd.get_nt_acl`` for ``service``.

    Returns:
        The security descriptor. For a directly read blob whose version is
        not 1, 2, 3 or 4, ``None`` is returned.
    """
    if not direct_db_access:
        return smbd.get_nt_acl(file, SECURITY_SECINFO_FLAGS, service=service)

    backend_obj, dbname = checkset_backend(lp, backend, eadbfile)
    if dbname is None:
        blob = samba.xattr_native.wrap_getxattr(file,
                                                xattr.XATTR_NTACL_NAME)
    else:
        try:
            blob = backend_obj.wrap_getxattr(dbname, file,
                                             xattr.XATTR_NTACL_NAME)
        except Exception:
            # FIXME: Don't catch all exceptions, just those related to
            # opening xattrdb
            print("Fail to open %s" % dbname)
            blob = samba.xattr_native.wrap_getxattr(file,
                                                    xattr.XATTR_NTACL_NAME)

    ntacl = ndr_unpack(xattr.NTACL, blob)
    version = ntacl.version
    if version == 1:
        # Version 1 carries the descriptor directly in ``info``.
        return ntacl.info
    if version in (2, 3, 4):
        # Later versions wrap the descriptor in ``info.sd``.
        return ntacl.info.sd
    return None
def setntacl(lp, file, sddl, domsid,
             backend=None, eadbfile=None,
             use_ntvfs=True, skip_invalid_chown=False,
             passdb=None, service=None, session_info=None):
    """
    A wrapper for smbd set_nt_acl api.

    Args:
        lp (LoadParam): load param from conf
        file (str): a path to file or dir
        sddl (str or security.descriptor): ntacl sddl string, or an
            already built security descriptor
        domsid (str or security.dom_sid): the domain SID
        backend (str): xattr backend choice, handed to checkset_backend
        eadbfile (str): xattr database path, handed to checkset_backend
        use_ntvfs (bool): if true, pack the descriptor into a version 1
            xattr.NTACL and store it as an xattr instead of calling
            smbd.set_nt_acl
        skip_invalid_chown (bool): only looked at when use_ntvfs is false;
            turns on the special handling of owner SIDs that do not
            resolve to a UID
        passdb: object whose sid_to_id() resolves SIDs; used only by the
            skip_invalid_chown handling
        service (str): name of share service, e.g.: sysvol
        session_info (auth_session_info): session info for authentication

    Note:
        Get `session_info` with `samba.auth.user_session`, do not use the
        `admin_session` api.

    Returns:
        None

    Raises:
        XattrBackendError: if the owner is domain admins and no UID can be
            found for the domain administrator.
    """

    # Normalise domsid so that both forms are available:
    # `sid` as a dom_sid object and `domsid` as a string.
    assert(isinstance(domsid, str) or isinstance(domsid, security.dom_sid))
    if isinstance(domsid, str):
        sid = security.dom_sid(domsid)
    elif isinstance(domsid, security.dom_sid):
        sid = domsid
        domsid = str(sid)

    # Likewise normalise sddl: `sd` is the descriptor object.
    assert(isinstance(sddl, str) or isinstance(sddl, security.descriptor))
    if isinstance(sddl, str):
        sd = security.descriptor.from_sddl(sddl, sid)
    elif isinstance(sddl, security.descriptor):
        sd = sddl
        sddl = sd.as_sddl(sid)

    if not use_ntvfs and skip_invalid_chown:
        # Check if the owner can be resolved as a UID
        (owner_id, owner_type) = passdb.sid_to_id(sd.owner_sid)
        if ((owner_type != idmap.ID_TYPE_UID) and (owner_type != idmap.ID_TYPE_BOTH)):
            # Check if this particular owner SID was domain admins,
            # because we special-case this as mapping to
            # 'administrator' instead.
            if sd.owner_sid == security.dom_sid("%s-%d" % (domsid, security.DOMAIN_RID_ADMINS)):
                administrator = security.dom_sid("%s-%d" % (domsid, security.DOMAIN_RID_ADMINISTRATOR))
                (admin_id, admin_type) = passdb.sid_to_id(administrator)

                # Confirm we have a UID for administrator
                if ((admin_type == idmap.ID_TYPE_UID) or (admin_type == idmap.ID_TYPE_BOTH)):

                    # Set it, changing the owner to 'administrator' rather than domain admins
                    # NOTE(review): sd2 is another name for sd, not a copy,
                    # so sd.owner_sid is changed as well and the xattr
                    # written below carries 'administrator' as owner -
                    # confirm this is intended.
                    sd2 = sd
                    sd2.owner_sid = administrator

                    smbd.set_nt_acl(
                        file, SECURITY_SECINFO_FLAGS, sd2,
                        service=service, session_info=session_info)

                    # and then set an NTVFS ACL (which does not set the posix ACL) to pretend the owner really was set
                    use_ntvfs = True
                else:
                    raise XattrBackendError("Unable to find UID for domain administrator %s, got id %d of type %d" % (administrator, admin_id, admin_type))
            else:
                # For all other owning users, reset the owner to root
                # and then set the ACL without changing the owner
                #
                # This won't work in test environments, as it tries a real (rather than xattr-based fake) chown

                os.chown(file, 0, 0)
                # NOTE(review): use_ntvfs is still false after this branch,
                # so the final smbd.set_nt_acl call at the end of the
                # function (with SECURITY_SECINFO_FLAGS, owner included)
                # runs too - confirm this is intended.
                smbd.set_nt_acl(
                    file,
                    security.SECINFO_GROUP |
                    security.SECINFO_DACL |
                    security.SECINFO_SACL,
                    sd, service=service, session_info=session_info)

    if use_ntvfs:
        # Store the descriptor as a version 1 NTACL xattr, in the xattr
        # database if one is selected, otherwise as a native xattr.
        (backend_obj, dbname) = checkset_backend(lp, backend, eadbfile)
        ntacl = xattr.NTACL()
        ntacl.version = 1
        ntacl.info = sd
        if dbname is not None:
            try:
                backend_obj.wrap_setxattr(dbname,
                                          file, xattr.XATTR_NTACL_NAME, ndr_pack(ntacl))
            except Exception:
                # FIXME: Don't catch all exceptions, just those related to opening
                # xattrdb
                print("Fail to open %s" % dbname)
                # Fall back to the native xattr when the database write fails.
                samba.xattr_native.wrap_setxattr(file, xattr.XATTR_NTACL_NAME,
                                                 ndr_pack(ntacl))
        else:
            samba.xattr_native.wrap_setxattr(file, xattr.XATTR_NTACL_NAME,
                                             ndr_pack(ntacl))
    else:
        smbd.set_nt_acl(
            file, SECURITY_SECINFO_FLAGS, sd,
            service=service, session_info=session_info)
def ldapmask2filemask(ldm):
    """Translate the access mask of a DS ACE into a file ACE access mask.

    The standard rights (bits 0x001F0000) are carried over unchanged.
    The DS-specific rights are mapped as follows:

    * read property AND list contents -> file read/list/execute rights
    * write property                  -> file write/append rights
    * create child                    -> add file / add subdirectory
    * delete child                    -> delete child

    All other bits of ``ldm`` are dropped.
    """
    # Directory-service rights that take part in the mapping.
    RIGHT_DS_CREATE_CHILD = 0x00000001
    RIGHT_DS_DELETE_CHILD = 0x00000002
    RIGHT_DS_LIST_CONTENTS = 0x00000004
    RIGHT_DS_READ_PROPERTY = 0x00000010
    RIGHT_DS_WRITE_PROPERTY = 0x00000020

    # File rights that can appear in the result.
    FILE_READ_DATA = 0x0001
    FILE_LIST_DIRECTORY = 0x0001
    FILE_WRITE_DATA = 0x0002
    FILE_ADD_FILE = 0x0002
    FILE_APPEND_DATA = 0x0004
    FILE_ADD_SUBDIRECTORY = 0x0004
    FILE_READ_EA = 0x0008
    FILE_WRITE_EA = 0x0010
    FILE_EXECUTE = 0x0020
    FILE_DELETE_CHILD = 0x0040
    FILE_READ_ATTRIBUTES = 0x0080
    FILE_WRITE_ATTRIBUTES = 0x0100
    SYNCHRONIZE = 0x00100000
    STANDARD_RIGHTS_ALL = 0x001F0000

    read_rights = (SYNCHRONIZE | FILE_LIST_DIRECTORY |
                   FILE_READ_ATTRIBUTES | FILE_READ_EA |
                   FILE_READ_DATA | FILE_EXECUTE)
    write_rights = (SYNCHRONIZE | FILE_WRITE_DATA |
                    FILE_APPEND_DATA | FILE_WRITE_EA |
                    FILE_WRITE_ATTRIBUTES | FILE_ADD_FILE |
                    FILE_ADD_SUBDIRECTORY)
    create_rights = FILE_ADD_SUBDIRECTORY | FILE_ADD_FILE

    result = ldm & STANDARD_RIGHTS_ALL

    # Reading needs both DS rights to be present at the same time.
    read_needed = RIGHT_DS_READ_PROPERTY | RIGHT_DS_LIST_CONTENTS
    if (ldm & read_needed) == read_needed:
        result |= read_rights

    if ldm & RIGHT_DS_WRITE_PROPERTY:
        result |= write_rights

    if ldm & RIGHT_DS_CREATE_CHILD:
        result |= create_rights

    if ldm & RIGHT_DS_DELETE_CHILD:
        result |= FILE_DELETE_CHILD

    return result
def dsacl2fsacl(dssddl, sid, as_sddl=True):
    """
    Convert the SDDL representation of a DS ACL into an ACL adapted for
    files. It's used for Policy object provision.

    Owner, group, type and revision are copied from the DS descriptor.
    Each DACL entry is copied with object- and container-inherit flags
    added and its access mask run through ldapmask2filemask. Entries
    whose type has a bit in common with SEC_ACE_TYPE_ACCESS_ALLOWED_OBJECT
    are skipped, as are entries whose trustee is SID_BUILTIN_PREW2K.

    Returns the SDDL string of the new descriptor, or the descriptor
    object itself when as_sddl is false.
    """
    ds_descr = security.descriptor.from_sddl(dssddl, sid)

    fs_descr = security.descriptor()
    fs_descr.owner_sid = ds_descr.owner_sid
    fs_descr.group_sid = ds_descr.group_sid
    fs_descr.type = ds_descr.type
    fs_descr.revision = ds_descr.revision

    for ace in ds_descr.dacl.aces:
        if ace.type & security.SEC_ACE_TYPE_ACCESS_ALLOWED_OBJECT:
            continue
        trustee = str(ace.trustee)
        if trustee == security.SID_BUILTIN_PREW2K:
            continue

        inherit = (security.SEC_ACE_FLAG_OBJECT_INHERIT |
                   security.SEC_ACE_FLAG_CONTAINER_INHERIT)
        if trustee == security.SID_CREATOR_OWNER:
            # For Creator/Owner the IO flag is set, as this ACE only makes
            # sense for child objects
            inherit |= security.SEC_ACE_FLAG_INHERIT_ONLY
        ace.flags = ace.flags | inherit
        ace.access_mask = ldapmask2filemask(ace.access_mask)
        fs_descr.dacl_add(ace)

    if as_sddl:
        return fs_descr.as_sddl(sid)
    return fs_descr
class SMBHelper:
    """
    Convenience wrapper around an SMB connection.

    smb_path: a path whose separator is "\\" and not "/"
    """

    def __init__(self, smb_conn, dom_sid):
        self.dom_sid = dom_sid
        self.smb_conn = smb_conn

    def get_acl(self, smb_path, as_sddl=False):
        """
        Fetch the security descriptor of smb_path over the connection.

        Returns the descriptor object, or its SDDL string (relative to
        the domain SID) when as_sddl is true.
        """
        assert '/' not in smb_path

        descriptor = self.smb_conn.get_acl(
            smb_path, SECURITY_SECINFO_FLAGS, SECURITY_SEC_FLAGS)

        if as_sddl:
            return descriptor.as_sddl(self.dom_sid)
        return descriptor

    def list(self, smb_path=''):
        """
        Return the entries directly inside smb_path (not recursive).
        """
        assert '/' not in smb_path
        return self.smb_conn.list(smb_path, attribs=SMB_FILE_ATTRIBUTE_FLAGS)

    def is_dir(self, attrib):
        """
        Tell whether attrib has the directory bit set.

        attrib is the 'attrib' value of an entry returned by list().
        """
        return (attrib & smb.FILE_ATTRIBUTE_DIRECTORY) != 0

    def join(self, root, name):
        """
        Join root and name with '\\'; an empty root yields name alone.
        """
        if not root:
            return name
        return root + '\\' + name

    def loadfile(self, smb_path):
        """
        Return the content of the file at smb_path.
        """
        assert '/' not in smb_path
        return self.smb_conn.loadfile(smb_path)

    def create_tree(self, tree, smb_path=''):
        """
        Create the files and directories described by tree under smb_path.

        tree maps a name either to a dict (a directory, created if it is
        missing and then filled recursively) or to file content.
        """
        for name, content in tree.items():
            target = self.join(smb_path, name)
            if not isinstance(content, dict):
                # a file
                self.smb_conn.savefile(target, content)
                continue
            # a dir
            if not self.smb_conn.chkpath(target):
                self.smb_conn.mkdir(target)
            self.create_tree(content, smb_path=target)

    def get_tree(self, smb_path=''):
        """
        Read the tree below smb_path via the smb connection.

        Returns a dict mapping each name to a nested dict (directory) or
        to the file content. Each entry returned by list() is a dict
        with at least the keys 'name' and 'attrib', e.g.:

            {'attrib': 16, 'mtime': 1528848309, 'name': 'dir1',
             'short_name': 'dir1', 'size': 0}
        """
        result = {}
        for entry in self.list(smb_path):
            entry_name = entry['name']
            entry_path = self.join(smb_path, entry_name)
            if self.is_dir(entry['attrib']):
                node = self.get_tree(smb_path=entry_path)
            else:
                node = self.loadfile(entry_path)
            result[entry_name] = node
        return result

    def get_ntacls(self, smb_path=''):
        """
        Collect the ntacl of every file below smb_path via the smb conn.

        Returns a dict mapping the full smb path of each file to its SDDL
        string. Directories are descended into but get no entry of their
        own.
        """
        collected = {}
        for entry in self.list(smb_path):
            entry_path = self.join(smb_path, entry['name'])
            if self.is_dir(entry['attrib']):
                collected.update(self.get_ntacls(smb_path=entry_path))
                continue
            descriptor = self.get_acl(entry_path)
            collected[entry_path] = descriptor.as_sddl(self.dom_sid)
        return collected

    def delete_tree(self):
        """
        Remove every entry found at the top level of the connection.
        """
        for entry in self.list():
            entry_name = entry['name']
            if self.is_dir(entry['attrib']):
                self.smb_conn.deltree(entry_name)
            else:
                self.smb_conn.unlink(entry_name)
class NtaclsHelper:
    """
    Get and set NT ACLs of local paths, using the configuration loaded
    from smb_conf_path and the given share service and domain SID.
    """

    def __init__(self, service, smb_conf_path, dom_sid):
        self.service = service
        self.dom_sid = dom_sid

        # this is important to help smbd find services.
        self.lp = s3param.get_context()
        self.lp.load(smb_conf_path)

        server_services = self.lp.get("server services")
        self.use_ntvfs = "smb" in server_services

    def getntacl(self, path, as_sddl=False, direct_db_access=None):
        """
        Return the security descriptor of path, or its SDDL string when
        as_sddl is true. When direct_db_access is None it defaults to
        self.use_ntvfs.
        """
        if direct_db_access is None:
            direct_db_access = self.use_ntvfs

        descriptor = getntacl(
            self.lp, path,
            direct_db_access=direct_db_access,
            service=self.service)

        if as_sddl:
            return descriptor.as_sddl(self.dom_sid)
        return descriptor

    def setntacl(self, path, ntacl_sd):
        """
        Apply ntacl_sd (a descriptor object or an SDDL string) to path.
        """
        return setntacl(self.lp, path, ntacl_sd, self.dom_sid)
def _create_ntacl_file(dst, ntacl_sddl_str):
    """Write ntacl_sddl_str into the sidecar file named dst + '.NTACL'."""
    ntacl_path = dst + '.NTACL'
    with open(ntacl_path, 'w') as ntacl_file:
        ntacl_file.write(ntacl_sddl_str)
def _read_ntacl_file(src):
    """Return the whole content of the sidecar file named src + '.NTACL'."""
    ntacl_path = src + '.NTACL'
    with open(ntacl_path, 'r') as ntacl_file:
        content = ntacl_file.read()
    return content
def backup_online(smb_conn, dest_tarfile_path, dom_sid):
    """
    Backup all files and dirs with ntacl for the service behind smb_conn.

    1. Create a temp dir as container dir
    2. Backup all files with dir structure into container dir
    3. Generate file.NTACL files for each file and dir in container dir
    4. Create a tar file from container dir (without top level folder)
    5. Delete container dir

    Args:
        smb_conn: SMB connection to the share to back up
        dest_tarfile_path (str): path of the gzip-compressed tar to write
        dom_sid (str or security.dom_sid): the domain SID

    The container dir is removed even when the backup fails part-way.
    """

    if isinstance(dom_sid, str):
        dom_sid = security.dom_sid(dom_sid)

    smb_helper = SMBHelper(smb_conn, dom_sid)

    remotedir = ''  # root dir

    localdir = tempfile.mkdtemp()
    try:
        # Two stacks kept in lockstep: a remote dir still to visit and the
        # local dir that mirrors it.
        r_dirs = [remotedir]
        l_dirs = [localdir]

        while r_dirs:
            r_dir = r_dirs.pop()
            l_dir = l_dirs.pop()

            for e in smb_helper.list(smb_path=r_dir):
                r_name = smb_helper.join(r_dir, e['name'])
                l_name = os.path.join(l_dir, e['name'])

                if smb_helper.is_dir(e['attrib']):
                    r_dirs.append(r_name)
                    l_dirs.append(l_name)
                    os.mkdir(l_name)
                else:
                    data = smb_helper.loadfile(r_name)
                    with open(l_name, 'wb') as f:
                        f.write(data)

                # get ntacl for this entry and save alongside
                ntacl_sddl_str = smb_helper.get_acl(r_name, as_sddl=True)
                _create_ntacl_file(l_name, ntacl_sddl_str)

        # add everything in the container dir without a top folder
        with tarfile.open(name=dest_tarfile_path, mode='w:gz') as tar:
            for name in os.listdir(localdir):
                path = os.path.join(localdir, name)
                tar.add(path, arcname=name)
    finally:
        # Always drop the container dir, also when an error interrupted
        # the backup, so that no temp data is left behind.
        shutil.rmtree(localdir)
def backup_offline(src_service_path, dest_tarfile_path, samdb_conn, smb_conf_path):
    """
    Backup files and ntacls to a tarfile for a service

    Args:
        src_service_path (str): local directory of the service; its last
            path component is used as the service name
        dest_tarfile_path (str): path of the gzip-compressed tar to write
        samdb_conn: object whose get_domain_sid() gives the domain SID
        smb_conf_path (str): path of the smb.conf to load

    The temp dir used to stage the backup is removed even when the backup
    fails part-way.
    """
    service = src_service_path.rstrip('/').rsplit('/', 1)[-1]
    tempdir = tempfile.mkdtemp()
    try:
        dom_sid_str = samdb_conn.get_domain_sid()
        dom_sid = security.dom_sid(dom_sid_str)

        ntacls_helper = NtaclsHelper(service, smb_conf_path, dom_sid)

        for dirpath, dirnames, filenames in os.walk(src_service_path):
            # each dir only cares about its direct children
            rel_dirpath = os.path.relpath(dirpath, start=src_service_path)
            dst_dirpath = os.path.join(tempdir, rel_dirpath)

            # create sub dirs and NTACL file
            for dirname in dirnames:
                src = os.path.join(dirpath, dirname)
                dst = os.path.join(dst_dirpath, dirname)
                # mkdir with metadata
                smbd.mkdir(dst, service)
                ntacl_sddl_str = ntacls_helper.getntacl(src, as_sddl=True)
                _create_ntacl_file(dst, ntacl_sddl_str)

            # create files and NTACL file, then copy data
            for filename in filenames:
                src = os.path.join(dirpath, filename)
                dst = os.path.join(dst_dirpath, filename)
                # create an empty file with metadata
                smbd.create_file(dst, service)
                ntacl_sddl_str = ntacls_helper.getntacl(src, as_sddl=True)
                _create_ntacl_file(dst, ntacl_sddl_str)

                # now put data in, streamed so that a large file is never
                # held in memory as a whole
                with open(src, 'rb') as src_file:
                    with open(dst, 'wb') as dst_file:
                        shutil.copyfileobj(src_file, dst_file)

        # add all files in tempdir to tarfile without a top folder
        with tarfile.open(name=dest_tarfile_path, mode='w:gz') as tar:
            for name in os.listdir(tempdir):
                path = os.path.join(tempdir, name)
                tar.add(path, arcname=name)
    finally:
        # Always drop the staging dir, also when an error interrupted the
        # backup.
        shutil.rmtree(tempdir)
def backup_restore(src_tarfile_path, dst_service_path, samdb_conn, smb_conf_path):
    """
    Restore files and ntacls from a tarfile to a service

    Args:
        src_tarfile_path (str): tar file to restore from; every entry is
            expected to come with an '<entry>.NTACL' file holding its SDDL
        dst_service_path (str): local directory of the service; its last
            path component is used as the service name
        samdb_conn: object whose get_domain_sid() gives the domain SID
        smb_conf_path (str): path of the smb.conf to load

    The temp dir the archive is unpacked into is removed even when the
    restore fails part-way.
    """
    service = dst_service_path.rstrip('/').rsplit('/', 1)[-1]
    tempdir = tempfile.mkdtemp()  # src files
    try:
        dom_sid_str = samdb_conn.get_domain_sid()
        dom_sid = security.dom_sid(dom_sid_str)

        ntacls_helper = NtaclsHelper(service, smb_conf_path, dom_sid)

        # NOTE(review): extractall() trusts the member names stored in the
        # archive; only tar files from a trusted source should be restored.
        with tarfile.open(src_tarfile_path) as f:
            f.extractall(path=tempdir)
            # e.g.: /tmp/tmpRNystY/{dir1,dir1.NTACL,...file1,file1.NTACL}

        for dirpath, dirnames, filenames in os.walk(tempdir):
            rel_dirpath = os.path.relpath(dirpath, start=tempdir)
            dst_dirpath = os.path.normpath(
                os.path.join(dst_service_path, rel_dirpath))

            for dirname in dirnames:
                if dirname.endswith('.NTACL'):
                    continue
                src = os.path.join(dirpath, dirname)
                dst = os.path.join(dst_dirpath, dirname)
                if not os.path.isdir(dst):
                    # dst must be absolute path for smbd API
                    smbd.mkdir(dst, service)
                ntacl_sddl_str = _read_ntacl_file(src)
                ntacls_helper.setntacl(dst, ntacl_sddl_str)

            for filename in filenames:
                if filename.endswith('.NTACL'):
                    continue
                src = os.path.join(dirpath, filename)
                dst = os.path.join(dst_dirpath, filename)
                if not os.path.isfile(dst):
                    # dst must be absolute path for smbd API
                    smbd.create_file(dst, service)
                ntacl_sddl_str = _read_ntacl_file(src)
                ntacls_helper.setntacl(dst, ntacl_sddl_str)

                # now put data in, streamed so that a large file is never
                # held in memory as a whole
                with open(src, 'rb') as src_file:
                    with open(dst, 'wb') as dst_file:
                        shutil.copyfileobj(src_file, dst_file)
    finally:
        # Always drop the unpacked copy, also when an error interrupted
        # the restore.
        shutil.rmtree(tempdir)