python: use python3 style super statements
[samba.git] / python / samba / samdb.py
blob9bbec435062f5a71dacb39f10f7f2f7ec27f9aac
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Matthias Dieter Wallnoefer 2009
5 # Based on the original in EJS:
6 # Copyright (C) Andrew Tridgell <tridge@samba.org> 2005
7 # Copyright (C) Giampaolo Lauria <lauria2@yahoo.com> 2011
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Convenience functions for using the SAM."""
25 import samba
26 import ldb
27 import time
28 import base64
29 import os
30 import re
31 from samba import dsdb, dsdb_dns
32 from samba.ndr import ndr_unpack, ndr_pack
33 from samba.dcerpc import drsblobs, misc
34 from samba.common import normalise_int32
35 from samba.common import get_bytes, cmp
36 from samba.dcerpc import security
37 from samba import is_ad_dc_built
38 import binascii
40 __docformat__ = "restructuredText"
def get_default_backend_store():
    """Return the name of the default backend store (``"tdb"``)."""
    default_store = "tdb"
    return default_store
class SamDBError(Exception):
    """Generic error raised by the SAM database helpers in this module."""
class SamDBNotFoundError(SamDBError):
    """Raised when a search for an object in the SAM database finds nothing."""
52 class SamDB(samba.Ldb):
53 """The SAM database."""
55 hash_oid_name = {}
56 hash_well_known = {}
class _CleanUpOnError:
    """Context manager that deletes ``dn`` from ``samdb`` if its body raises.

    Nothing is returned from ``__enter__`` and exceptions are never
    suppressed; the only effect is the delete on failure.
    """

    def __init__(self, samdb, dn):
        self.samdb, self.dn = samdb, dn

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        failed = exc_type is not None
        if failed:
            # We failed to modify the account. If we connected to the
            # database over LDAP, we don't have transactions, and so when
            # we call transaction_cancel(), the account will still exist in
            # a half-created state. We'll delete the account to ensure that
            # doesn't happen.
            self.samdb.delete(self.dn)

        # Returning False lets any in-flight exception propagate.
        return False
def __init__(self, url=None, lp=None, modules_dir=None, session_info=None,
             credentials=None, flags=ldb.FLG_DONT_CREATE_DB,
             options=None, global_schema=True,
             auto_connect=True, am_rodc=None):
    """Set up the SAM database handle.

    :param url: database URL; when None and ``lp`` is given, the URL comes
        from ``lp.samdb_url()``
    :param lp: loadparm context, stored on the instance as ``self.lp``
    :param modules_dir: passed through to the base class
    :param session_info: passed through to the base class
    :param credentials: passed through to the base class
    :param flags: passed through to the base class
    :param options: passed through to the base class
    :param global_schema: if true, call ``dsdb._dsdb_set_global_schema``
    :param auto_connect: if false, the URL is forced to None before the
        base class is initialised
    :param am_rodc: if not None, passed to ``dsdb._dsdb_set_am_rodc``
    """
    self.lp = lp

    if auto_connect:
        if url is None and lp is not None:
            url = lp.samdb_url()
    else:
        url = None
    self.url = url

    super().__init__(url=url, lp=lp, modules_dir=modules_dir,
                     session_info=session_info, credentials=credentials,
                     flags=flags, options=options)

    if global_schema:
        dsdb._dsdb_set_global_schema(self)

    if am_rodc is not None:
        dsdb._dsdb_set_am_rodc(self, am_rodc)
def connect(self, url=None, flags=0, options=None):
    """Connect to the database.

    When a loadparm context is set and ``url`` does not name an existing
    filesystem path, the URL is first resolved with
    ``self.lp.private_path()``. The final URL is stored in ``self.url``.

    :param url: database URL or path
    :param flags: passed through to the base class
    :param options: passed through to the base class
    """
    # NOTE(review): url defaults to None, but os.path.exists(None) raises
    # TypeError, so callers with an lp set effectively must pass a URL.
    lp = self.lp
    if lp is not None and not os.path.exists(url):
        url = lp.private_path(url)
    self.url = url

    super().connect(url=url, flags=flags, options=options)
def am_rodc(self):
    """Report whether we are an RODC (True if so)."""
    is_rodc = dsdb._am_rodc(self)
    return is_rodc
def am_pdc(self):
    """Report whether we are a PDC emulator (True if so)."""
    is_pdc = dsdb._am_pdc(self)
    return is_pdc
def domain_dn(self):
    """Return the domain DN (the default base DN) as a string."""
    base_dn = self.get_default_basedn()
    return str(base_dn)
def schema_dn(self):
    """Return the schema partition DN as a string."""
    schema_base = self.get_schema_basedn()
    return str(schema_base)
def disable_account(self, search_filter):
    """Disable an account by setting UF_ACCOUNTDISABLE.

    :param search_filter: LDAP filter to find the user (eg
        sAMAccountName=name)
    """
    self.toggle_userAccountFlags(search_filter,
                                 samba.dsdb.UF_ACCOUNTDISABLE,
                                 on=True)
def enable_account(self, search_filter):
    """Enable an account.

    Clears both UF_ACCOUNTDISABLE and UF_PASSWD_NOTREQD on the account.

    :param search_filter: LDAP filter to find the user (eg
        sAMAccountName=name)
    """
    flags_to_clear = (samba.dsdb.UF_ACCOUNTDISABLE |
                      samba.dsdb.UF_PASSWD_NOTREQD)
    self.toggle_userAccountFlags(search_filter, flags_to_clear, on=False)
def toggle_userAccountFlags(self, search_filter, flags, flags_str=None,
                            on=True, strict=False):
    """Set or clear bits in an account's userAccountControl attribute.

    :param search_filter: LDAP filter to find the user (eg
        sAMAccountName=name)
    :param flags: samba.dsdb.UF_* flags
    :param flags_str: text naming the flags; only used in the error
        messages raised in strict mode
    :param on: on=True (default) => set, on=False => unset
    :param strict: strict=False (default) ignore if no action is needed
        strict=True raises an Exception if any of the flags is already
        set (on=True) or none of them is set (on=False)
    :raises Exception: if no account matches, or in strict mode as above
    :raises SamDBError: if more than one account matches the filter
    """
    res = self.search(base=self.domain_dn(), scope=ldb.SCOPE_SUBTREE,
                      expression=search_filter, attrs=["userAccountControl"])
    if len(res) == 0:
        raise Exception("Unable to find account where '%s'" % search_filter)
    # An explicit check rather than an assert: asserts are stripped under
    # "python -O", which would let an ambiguous filter modify the first hit.
    if len(res) != 1:
        raise SamDBError("Account where '%s' is not unique" % search_filter)
    account_dn = res[0].dn

    old_uac = int(res[0]["userAccountControl"][0])
    if on:
        if strict and (old_uac & flags):
            error = "Account flag(s) '%s' already set" % flags_str
            raise Exception(error)

        new_uac = old_uac | flags
    else:
        if strict and not (old_uac & flags):
            error = "Account flag(s) '%s' already unset" % flags_str
            raise Exception(error)

        new_uac = old_uac & ~flags

    # Nothing to write if the flags are already in the requested state.
    if old_uac == new_uac:
        return

    # Delete the old value and add the new one, rather than a replace.
    mod = """
dn: %s
changetype: modify
delete: userAccountControl
userAccountControl: %u
add: userAccountControl
userAccountControl: %u
""" % (account_dn, old_uac, new_uac)
    self.modify_ldif(mod)
def force_password_change_at_next_login(self, search_filter):
    """Force a password change at next login by setting pwdLastSet to 0.

    :param search_filter: LDAP filter to find the user (eg
        sAMAccountName=name)
    :raises Exception: if no user matches the filter
    :raises SamDBError: if more than one user matches the filter
    """
    res = self.search(base=self.domain_dn(), scope=ldb.SCOPE_SUBTREE,
                      expression=search_filter, attrs=[])
    if len(res) == 0:
        raise Exception('Unable to find user "%s"' % search_filter)
    # An explicit check rather than an assert: asserts are stripped under
    # "python -O", which would let an ambiguous filter modify the first hit.
    if len(res) != 1:
        raise SamDBError('User "%s" is not unique' % search_filter)
    user_dn = res[0].dn

    mod = """
dn: %s
changetype: modify
replace: pwdLastSet
pwdLastSet: 0
""" % (user_dn)
    self.modify_ldif(mod)
def unlock_account(self, search_filter):
    """Unlock a user account by resetting lockoutTime to 0.

    The original author notes that this also resets badPwdCount to 0;
    this method itself only writes lockoutTime.

    :param search_filter: LDAP filter to find the user (e.g.
        sAMAccountName=username)
    :raises SamDBNotFoundError: if no user matches the filter
    :raises SamDBError: if more than one user matches the filter
    """
    matches = self.search(base=self.domain_dn(),
                          scope=ldb.SCOPE_SUBTREE,
                          expression=search_filter,
                          attrs=[])
    match_count = len(matches)
    if match_count == 0:
        raise SamDBNotFoundError('Unable to find user "%s"' % search_filter)
    if match_count != 1:
        raise SamDBError('User "%s" is not unique' % search_filter)
    user_dn = matches[0].dn

    ldif = """
dn: %s
changetype: modify
replace: lockoutTime
lockoutTime: 0
""" % (user_dn)
    self.modify_ldif(ldif)
def newgroup(self, groupname, groupou=None, grouptype=None,
             description=None, mailaddress=None, notes=None, sd=None,
             gidnumber=None, nisdomain=None):
    """Adds a new group with additional parameters

    :param groupname: Name of the new group
    :param groupou: Container for the new group, relative to the domain
        DN; when not given the well-known users container is used
    :param grouptype: Type of the new group
    :param description: Description of the new group
    :param mailaddress: Email address of the new group
    :param notes: Notes of the new group
    :param gidnumber: GID Number of the new group
    :param nisdomain: NIS Domain Name of the new group
    :param sd: security descriptor of the object
    """
    if groupou:
        parent = "%s,%s" % (groupou, self.domain_dn())
    else:
        parent = self.get_wellknown_dn(self.get_default_basedn(),
                                       dsdb.DS_GUID_USERS_CONTAINER)
    group_dn = "CN=%s,%s" % (groupname, parent)

    # The new group record. Note the reliance on the SAMLDB module which
    # fills in the default information
    ldbmessage = {"dn": group_dn,
                  "sAMAccountName": groupname,
                  "objectClass": "group"}

    if grouptype is not None:
        ldbmessage["groupType"] = normalise_int32(grouptype)

    # Plain pass-through attributes, in the order they are added.
    for attr, value in (("description", description),
                        ("mail", mailaddress),
                        ("info", notes)):
        if value is not None:
            ldbmessage[attr] = value

    if gidnumber is not None:
        ldbmessage["gidNumber"] = normalise_int32(gidnumber)

    if nisdomain is not None:
        ldbmessage.update(msSFU30Name=groupname, msSFU30NisDomain=nisdomain)

    if sd is not None:
        ldbmessage["nTSecurityDescriptor"] = ndr_pack(sd)

    self.add(ldbmessage)
def deletegroup(self, groupname):
    """Deletes a group

    The search and the delete run inside one transaction, which is
    cancelled if anything raises.

    :param groupname: Name of the target group
    :raises Exception: if no group with that sAMAccountName is found
    :raises SamDBError: if more than one group matches
    """
    groupfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (
        ldb.binary_encode(groupname),
        "CN=Group,CN=Schema,CN=Configuration",
        self.domain_dn())
    self.transaction_start()
    try:
        targetgroup = self.search(base=self.domain_dn(),
                                  scope=ldb.SCOPE_SUBTREE,
                                  expression=groupfilter, attrs=[])
        if len(targetgroup) == 0:
            raise Exception('Unable to find group "%s"' % groupname)
        # An explicit check rather than an assert: asserts are stripped
        # under "python -O", and deleting the wrong object is destructive.
        if len(targetgroup) != 1:
            raise SamDBError('Group "%s" is not unique' % groupname)
        self.delete(targetgroup[0].dn)
    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def group_member_filter(self, member, member_types):
    """Build an LDAP OR-filter that matches ``member`` as any given type.

    :param member: name of the member to look for
    :param member_types: list of object types to match; valid entries are
        'user', 'group', 'computer', 'serviceaccount', 'contact', or 'all'
        to select every type
    :return: filter string of the form ``(|<clause>...)``
    :raises Exception: if member_types contains an unknown type
    """
    all_member_types = ['user',
                        'group',
                        'computer',
                        'serviceaccount',
                        'contact',
                        ]

    if 'all' in member_types:
        member_types = all_member_types

    for member_type in member_types:
        if member_type not in all_member_types:
            raise Exception('Invalid group member type "%s". '
                            'Valid types are %s and all.' %
                            (member_type, ", ".join(all_member_types)))

    def machine_account_name():
        # computer and serviceaccount lookups use a sAMAccountName ending
        # in '$'. endswith() also copes with an empty member, where
        # member[-1] would raise IndexError.
        if member.endswith('$'):
            return member
        return "%s$" % member

    clauses = []
    if 'user' in member_types:
        clauses.append('(&(sAMAccountName=%s)(samAccountType=%d))' %
                       (ldb.binary_encode(member),
                        dsdb.ATYPE_NORMAL_ACCOUNT))
    if 'group' in member_types:
        clauses.append('(&(sAMAccountName=%s)'
                       '(objectClass=group)'
                       '(!(groupType:1.2.840.113556.1.4.803:=1)))' %
                       ldb.binary_encode(member))
    if 'computer' in member_types:
        clauses.append('(&(samAccountType=%d)'
                       '(!(objectCategory=msDS-ManagedServiceAccount))'
                       '(sAMAccountName=%s))' %
                       (dsdb.ATYPE_WORKSTATION_TRUST,
                        ldb.binary_encode(machine_account_name())))
    if 'serviceaccount' in member_types:
        clauses.append('(&(samAccountType=%d)'
                       '(objectCategory=msDS-ManagedServiceAccount)'
                       '(sAMAccountName=%s))' %
                       (dsdb.ATYPE_WORKSTATION_TRUST,
                        ldb.binary_encode(machine_account_name())))
    if 'contact' in member_types:
        clauses.append(
            '(&(objectCategory=Person)(!(objectSid=*))(name=%s))' %
            ldb.binary_encode(member))

    return "(|%s)" % "".join(clauses)
def add_remove_group_members(self, groupname, members,
                             add_members_operation=True,
                             member_types=None,
                             member_base_dn=None):
    """Adds or removes group members

    Each member may be given as a SID string, a DN, or a name. Names are
    looked up with the filter built by group_member_filter(). All changes
    are collected into one LDIF modify and applied inside a transaction,
    which is cancelled if anything raises.

    :param groupname: Name of the target group
    :param members: list of group members
    :param add_members_operation: Defines if its an add or remove
        operation
    :param member_types: object types considered when a member is looked
        up by name; defaults to user, group and computer
    :param member_base_dn: search base for looking up members by name;
        defaults to the domain DN
    """
    if member_types is None:
        member_types = ['user', 'group', 'computer']

    groupfilter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (
        ldb.binary_encode(groupname), "CN=Group,CN=Schema,CN=Configuration", self.domain_dn())

    self.transaction_start()
    try:
        targetgroup = self.search(base=self.domain_dn(), scope=ldb.SCOPE_SUBTREE,
                                  expression=groupfilter, attrs=['member'])
        if len(targetgroup) == 0:
            raise Exception('Unable to find group "%s"' % groupname)
        # NOTE(review): this assert is the only guard against several
        # matching groups and is stripped under "python -O".
        assert(len(targetgroup) == 1)

        modified = False

        # LDIF header; one add/delete stanza per changed member is
        # appended below.
        addtargettogroup = """
dn: %s
changetype: modify
""" % (str(targetgroup[0].dn))

        for member in members:
            targetmember_dn = None
            if member_base_dn is None:
                member_base_dn = self.domain_dn()

            # First try: treat the member as a SID string.
            try:
                membersid = security.dom_sid(member)
                targetmember_dn = "<SID=%s>" % str(membersid)
            except ValueError:
                pass

            # Second try: treat the member as a DN.
            if targetmember_dn is None:
                try:
                    member_dn = ldb.Dn(self, member)
                    if member_dn.get_linearized() == member_dn.extended_str(1):
                        full_member_dn = self.normalize_dn_in_domain(member_dn)
                    else:
                        full_member_dn = member_dn
                    targetmember_dn = full_member_dn.extended_str(1)
                except ValueError as e:
                    pass

            # Last resort: search for the member by name; the result must
            # be unique.
            if targetmember_dn is None:
                filter = self.group_member_filter(member, member_types)
                targetmember = self.search(base=member_base_dn,
                                           scope=ldb.SCOPE_SUBTREE,
                                           expression=filter,
                                           attrs=[])

                if len(targetmember) > 1:
                    targetmemberlist_str = ""
                    for msg in targetmember:
                        targetmemberlist_str += "%s\n" % msg.get("dn")
                    raise Exception('Found multiple results for "%s":\n%s' %
                                    (member, targetmemberlist_str))
                if len(targetmember) != 1:
                    raise Exception('Unable to find "%s". Operation cancelled.' % member)
                targetmember_dn = targetmember[0].dn.extended_str(1)

            # NOTE(review): get_bytes() presumably returns bytes (helper not
            # shown here) while the list on the right holds str values, so
            # this "not in" test may always be true and an existing member
            # would be added again -- confirm.
            if add_members_operation is True and (targetgroup[0].get('member') is None or get_bytes(targetmember_dn) not in [str(x) for x in targetgroup[0]['member']]):
                modified = True
                addtargettogroup += """add: member
member: %s
""" % (str(targetmember_dn))

            elif add_members_operation is False and (targetgroup[0].get('member') is not None and get_bytes(targetmember_dn) in targetgroup[0]['member']):
                modified = True
                addtargettogroup += """delete: member
member: %s
""" % (str(targetmember_dn))

        # Only touch the database when at least one stanza was appended.
        if modified is True:
            self.modify_ldif(addtargettogroup)

    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def prepare_attr_replace(self, msg, old, attr_name, value):
    """Queue a change of attribute ``attr_name`` on ``msg``.

    If the value is "" (empty), an empty element flagged FLAG_MOD_DELETE
    is added, but only when ``old`` contains the attribute. Otherwise the
    new value is added with FLAG_MOD_REPLACE. If the value is None, or
    ``old`` already holds the attribute with this value, nothing is
    changed.

    :param msg: Message receiving the MessageElement
    :param old: existing values, indexed by attribute name
    :param attr_name: name of the attribute to change
    :param value: new value, "" to remove, None to leave untouched
    """
    # skip unchanged attribute
    if value is None:
        return
    already_present = attr_name in old
    if already_present and str(value) == str(old[attr_name]):
        return

    # remove attribute
    if len(value) == 0:
        if already_present:
            removal = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, attr_name)
            msg.add(removal)
        return

    # change attribute
    replacement = ldb.MessageElement(value, ldb.FLAG_MOD_REPLACE, attr_name)
    msg.add(replacement)
def fullname_from_names(self, given_name=None, initials=None, surname=None,
                        old_attrs=None, fallback_default=""):
    """Combine name parts into a full name (for displayName or cn).

    A part that is None falls back to the matching entry of ``old_attrs``
    (keys "givenName", "initials", "sn") when one exists.

    :param given_name: new given name, or None to use the old one
    :param initials: new initials, or None to use the old ones
    :param surname: new surname, or None to use the old one
    :param old_attrs: previous attribute values, indexed by attribute name
    :param fallback_default: returned when the combined name is empty
    :return: the non-empty parts joined by a single space
    """
    previous = {} if old_attrs is None else old_attrs

    def pick(attr_name, new_value):
        # if the attribute is not specified, try to use the old one
        if new_value is None and attr_name in previous:
            return str(previous[attr_name])
        return new_value

    given = pick("givenName", given_name)
    middle = pick("initials", initials)
    family = pick("sn", surname)

    # non-empty initials always end with a '.'
    if middle and not middle.endswith('.'):
        middle += '.'

    # drop empty parts (None and '') and join the rest
    parts = [part for part in (given, middle, family) if part]
    fullname = ' '.join(parts)

    return fullname if fullname != '' else fallback_default
def newuser(self, username, password,
            force_password_change_at_next_login_req=False,
            useusernameascn=False, userou=None, surname=None, givenname=None,
            initials=None, profilepath=None, scriptpath=None, homedrive=None,
            homedirectory=None, jobtitle=None, department=None, company=None,
            description=None, mailaddress=None, internetaddress=None,
            telephonenumber=None, physicaldeliveryoffice=None, sd=None,
            setpassword=True, uidnumber=None, gidnumber=None, gecos=None,
            loginshell=None, uid=None, nisdomain=None, unixhome=None,
            smartcard_required=False):
    """Adds a new user with additional parameters

    :param username: Name of the new user
    :param password: Password for the new user
    :param force_password_change_at_next_login_req: Force password change
    :param useusernameascn: Use username as cn rather than firstname +
        initials + lastname
    :param userou: Object container (without domainDN postfix) for new user
    :param surname: Surname of the new user
    :param givenname: First name of the new user
    :param initials: Initials of the new user
    :param profilepath: Profile path of the new user
    :param scriptpath: Logon script path of the new user
    :param homedrive: Home drive of the new user
    :param homedirectory: Home directory of the new user
    :param jobtitle: Job title of the new user
    :param department: Department of the new user
    :param company: Company of the new user
    :param description: Description of the new user
    :param mailaddress: Email address of the new user
    :param internetaddress: Home page of the new user
    :param telephonenumber: Phone number of the new user
    :param physicaldeliveryoffice: Office location of the new user
    :param sd: security descriptor of the object
    :param setpassword: optionally disable password reset
    :param uidnumber: RFC2307 Unix numeric UID of the new user
    :param gidnumber: RFC2307 Unix primary GID of the new user
    :param gecos: RFC2307 Unix GECOS field of the new user
    :param loginshell: RFC2307 Unix login shell of the new user
    :param uid: RFC2307 Unix username of the new user
    :param nisdomain: RFC2307 Unix NIS domain of the new user
    :param unixhome: RFC2307 Unix home directory of the new user
    :param smartcard_required: set the UF_SMARTCARD_REQUIRED bit of the new user
    """
    displayname = self.fullname_from_names(given_name=givenname,
                                           initials=initials,
                                           surname=surname)

    # NOTE(review): the display name is used as CN only when
    # useusernameascn is None; with the default of False the CN is always
    # the username -- confirm this is intended.
    use_displayname = useusernameascn is None and displayname != ""
    cn = displayname if use_displayname else username

    if userou:
        parent = "%s,%s" % (userou, self.domain_dn())
    else:
        parent = self.get_wellknown_dn(self.get_default_basedn(),
                                       dsdb.DS_GUID_USERS_CONTAINER)
    user_dn = "CN=%s,%s" % (cn, parent)

    domain = ldb.Dn(self, self.domain_dn())
    dnsdomain = domain.canonical_str().replace("/", "")
    user_principal_name = "%s@%s" % (username, dnsdomain)

    # The new user record. Note the reliance on the SAMLDB module which
    # fills in the default information
    ldbmessage = {"dn": user_dn,
                  "sAMAccountName": username,
                  "userPrincipalName": user_principal_name,
                  "objectClass": "user"}

    if smartcard_required:
        ldbmessage["userAccountControl"] = str(dsdb.UF_NORMAL_ACCOUNT |
                                               dsdb.UF_SMARTCARD_REQUIRED)
        # No password is set when a smartcard is required.
        setpassword = False

    if surname is not None:
        ldbmessage["sn"] = surname

    if givenname is not None:
        ldbmessage["givenName"] = givenname

    if displayname != "":
        ldbmessage["displayName"] = displayname
        ldbmessage["name"] = displayname

    if initials is not None:
        ldbmessage["initials"] = '%s.' % initials

    # Plain pass-through attributes, in the order they are added.
    for attr, value in (("profilePath", profilepath),
                        ("scriptPath", scriptpath),
                        ("homeDrive", homedrive),
                        ("homeDirectory", homedirectory),
                        ("title", jobtitle),
                        ("department", department),
                        ("company", company),
                        ("description", description),
                        ("mail", mailaddress),
                        ("wWWHomePage", internetaddress),
                        ("telephoneNumber", telephonenumber),
                        ("physicalDeliveryOfficeName",
                         physicaldeliveryoffice)):
        if value is not None:
            ldbmessage[attr] = value

    if sd is not None:
        ldbmessage["nTSecurityDescriptor"] = ndr_pack(sd)

    # The RFC2307 attributes go into a second message that is applied as
    # a modify after the add.
    unix_values = (uid, uidnumber, gidnumber, gecos,
                   loginshell, nisdomain, unixhome)
    ldbmessage2 = None
    if any(value is not None for value in unix_values):
        ldbmessage2 = ldb.Message()
        ldbmessage2.dn = ldb.Dn(self, user_dn)
        for attr, value in (("uid", uid),
                            ("uidNumber", uidnumber),
                            ("gidNumber", gidnumber),
                            ("gecos", gecos),
                            ("loginShell", loginshell),
                            ("unixHomeDirectory", unixhome)):
            if value is not None:
                ldbmessage2[attr] = ldb.MessageElement(
                    str(value), ldb.FLAG_MOD_REPLACE, attr)
        if nisdomain is not None:
            ldbmessage2["msSFU30NisDomain"] = ldb.MessageElement(
                str(nisdomain), ldb.FLAG_MOD_REPLACE, 'msSFU30NisDomain')
            ldbmessage2["msSFU30Name"] = ldb.MessageElement(
                str(username), ldb.FLAG_MOD_REPLACE, 'msSFU30Name')
            ldbmessage2["unixUserPassword"] = ldb.MessageElement(
                'ABCD!efgh12345$67890', ldb.FLAG_MOD_REPLACE,
                'unixUserPassword')

    self.transaction_start()
    try:
        self.add(ldbmessage)

        # From here on a failure also deletes the freshly added account.
        with self._CleanUpOnError(self, user_dn):
            if ldbmessage2:
                self.modify(ldbmessage2)

            # Sets the password for it
            if setpassword:
                user_filter = ("(distinguishedName=%s)" %
                               ldb.binary_encode(user_dn))
                self.setpassword(user_filter,
                                 password,
                                 force_password_change_at_next_login_req)
    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def newcontact(self,
               fullcontactname=None,
               ou=None,
               surname=None,
               givenname=None,
               initials=None,
               displayname=None,
               jobtitle=None,
               department=None,
               company=None,
               description=None,
               mailaddress=None,
               internetaddress=None,
               telephonenumber=None,
               mobilenumber=None,
               physicaldeliveryoffice=None):
    """Adds a new contact with additional parameters

    :param fullcontactname: Optional full name of the new contact
    :param ou: Object container for new contact
    :param surname: Surname of the new contact
    :param givenname: First name of the new contact
    :param initials: Initials of the new contact
    :param displayname: displayName of the new contact
    :param jobtitle: Job title of the new contact
    :param department: Department of the new contact
    :param company: Company of the new contact
    :param description: Description of the new contact
    :param mailaddress: Email address of the new contact
    :param internetaddress: Home page of the new contact
    :param telephonenumber: Phone number of the new contact
    :param mobilenumber: Primary mobile number of the new contact
    :param physicaldeliveryoffice: Office location of the new contact
    :return: the CN used for the new contact
    :raises Exception: if neither a full name nor any name part is given
    """
    # Prepare the contact name like the RSAT, using the name parts.
    derived_name = self.fullname_from_names(given_name=givenname,
                                            initials=initials,
                                            surname=surname)

    # Use the specified fullcontactname instead of the previously prepared
    # contact name, if it is specified.
    # This is similar to the "Full name" value of the RSAT.
    if fullcontactname is not None:
        cn = fullcontactname
    elif derived_name == "":
        raise Exception('No name for contact specified')
    else:
        cn = derived_name

    contactcontainer_dn = self.domain_dn()
    if ou:
        contactcontainer_dn = self.normalize_dn_in_domain(ou)

    ldbmessage = {"dn": "CN=%s,%s" % (cn, contactcontainer_dn),
                  "objectClass": "contact"}

    # Optional attributes, in the order they are added. Initials always
    # get a trailing '.' appended.
    dotted_initials = None if initials is None else '%s.' % initials
    for attr, value in (("sn", surname),
                        ("givenName", givenname),
                        ("displayName", displayname),
                        ("initials", dotted_initials),
                        ("title", jobtitle),
                        ("department", department),
                        ("company", company),
                        ("description", description),
                        ("mail", mailaddress),
                        ("wWWHomePage", internetaddress),
                        ("telephoneNumber", telephonenumber),
                        ("mobile", mobilenumber),
                        ("physicalDeliveryOfficeName",
                         physicaldeliveryoffice)):
        if value is not None:
            ldbmessage[attr] = value

    self.add(ldbmessage)

    return cn
def newcomputer(self, computername, computerou=None, description=None,
                prepare_oldjoin=False, ip_address_list=None,
                service_principal_name_list=None):
    """Adds a new computer with additional parameters

    :param computername: Name of the new computer
    :param computerou: Object container for new computer
    :param description: Description of the new computer
    :param prepare_oldjoin: Preset computer password for oldjoin mechanism
    :param ip_address_list: ip address list for DNS A or AAAA record
    :param service_principal_name_list: string list of servicePrincipalName
    :raises Exception: if the name contains a '$' other than a trailing one
    """
    # A single trailing '$' is accepted and stripped for the CN.
    cn = re.sub(r"\$$", "", computername)
    if cn.count('$'):
        raise Exception('Illegal computername "%s"' % computername)
    samaccountname = "%s$" % cn

    computercontainer_dn = self.get_wellknown_dn(
        self.get_default_basedn(), dsdb.DS_GUID_COMPUTERS_CONTAINER)
    if computerou:
        computercontainer_dn = self.normalize_dn_in_domain(computerou)

    computer_dn = "CN=%s,%s" % (cn, computercontainer_dn)

    ldbmessage = {"dn": computer_dn,
                  "sAMAccountName": samaccountname,
                  "objectClass": "computer"}

    if description is not None:
        ldbmessage["description"] = description

    if service_principal_name_list:
        ldbmessage["servicePrincipalName"] = service_principal_name_list

    # The account is created disabled unless it is prepared for oldjoin.
    uac = dsdb.UF_WORKSTATION_TRUST_ACCOUNT
    if not prepare_oldjoin:
        uac |= dsdb.UF_ACCOUNTDISABLE
    ldbmessage["userAccountControl"] = str(uac)

    if ip_address_list:
        ldbmessage['dNSHostName'] = '{}.{}'.format(
            cn, self.domain_dns_name())

    self.transaction_start()
    try:
        self.add(ldbmessage)

        if prepare_oldjoin:
            # The preset password is the lower-cased computer name.
            password = cn.lower()
            with self._CleanUpOnError(self, computer_dn):
                computer_filter = ("(distinguishedName=%s)" %
                                   ldb.binary_encode(computer_dn))
                self.setpassword(computer_filter, password, False)
    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def deleteuser(self, username):
    """Deletes a user

    The search and the delete run inside one transaction, which is
    cancelled if anything raises.

    :param username: Name of the target user
    :raises Exception: if no user with that sAMAccountName is found
    :raises SamDBError: if more than one user matches
    """
    user_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (
        ldb.binary_encode(username),
        "CN=Person,CN=Schema,CN=Configuration",
        self.domain_dn())
    self.transaction_start()
    try:
        target = self.search(base=self.domain_dn(), scope=ldb.SCOPE_SUBTREE,
                             expression=user_filter, attrs=[])
        if len(target) == 0:
            raise Exception('Unable to find user "%s"' % username)
        # An explicit check rather than an assert: asserts are stripped
        # under "python -O", and deleting the wrong object is destructive.
        if len(target) != 1:
            raise SamDBError('User "%s" is not unique' % username)
        self.delete(target[0].dn)
    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def setpassword(self, search_filter, password,
                force_change_at_next_login=False, username=None):
    """Sets the password for a user

    The password is written as unicodePwd, wrapped in double quotes and
    encoded as UTF-16-LE. Afterwards the account is enabled via
    enable_account(). Everything runs inside one transaction.

    :param search_filter: LDAP filter to find the user (eg
        sAMAccountName=name)
    :param password: Password for the user (str, or UTF-8 encoded bytes)
    :param force_change_at_next_login: Force password change
    :param username: name used instead of the filter in the
        "Unable to find user" error message
    :raises Exception: if no user, or more than one user, matches
    """
    self.transaction_start()
    try:
        res = self.search(base=self.domain_dn(), scope=ldb.SCOPE_SUBTREE,
                          expression=search_filter, attrs=[])
        if len(res) == 0:
            raise Exception('Unable to find user "%s"' %
                            (username or search_filter))
        if len(res) > 1:
            raise Exception('Matched %u multiple users with filter "%s"' %
                            (len(res), search_filter))
        user_dn = res[0].dn

        pw = password if isinstance(password, str) else password.decode('utf-8')
        quoted_pw = ('"' + pw + '"').encode('utf-16-le')
        encoded_pw = base64.b64encode(quoted_pw).decode('utf-8')
        setpw = """
dn: %s
changetype: modify
replace: unicodePwd
unicodePwd:: %s
""" % (user_dn, encoded_pw)

        self.modify_ldif(setpw)

        if force_change_at_next_login:
            self.force_password_change_at_next_login(
                "(distinguishedName=" + str(user_dn) + ")")

        # modify the userAccountControl to remove the disabled bit
        self.enable_account(search_filter)
    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def setexpiry(self, search_filter, expiry_seconds, no_expiry_req=False):
    """Sets the account expiry for a user

    :param search_filter: LDAP filter to find the user (eg
        samaccountname=name)
    :param expiry_seconds: expiry time from now in seconds
    :param no_expiry_req: if set, then don't expire password: the 0x10000
        bit is set in userAccountControl and accountExpires is written
        as 0 instead of a computed time
    :raises Exception: if no user matches the filter
    :raises SamDBError: if more than one user matches the filter
    """
    self.transaction_start()
    try:
        res = self.search(base=self.domain_dn(), scope=ldb.SCOPE_SUBTREE,
                          expression=search_filter,
                          attrs=["userAccountControl", "accountExpires"])
        if len(res) == 0:
            raise Exception('Unable to find user "%s"' % search_filter)
        # An explicit check rather than an assert: asserts are stripped
        # under "python -O", which would let an ambiguous filter modify
        # the first hit.
        if len(res) != 1:
            raise SamDBError('User "%s" is not unique' % search_filter)
        user_dn = res[0].dn

        userAccountControl = int(res[0]["userAccountControl"][0])
        # 0x10000 -- presumably UF_DONT_EXPIRE_PASSWD; confirm against
        # the dsdb constants.
        if no_expiry_req:
            userAccountControl = userAccountControl | 0x10000
            accountExpires = 0
        else:
            userAccountControl = userAccountControl & ~0x10000
            accountExpires = samba.unix2nttime(
                expiry_seconds + int(time.time()))

        setexp = """
dn: %s
changetype: modify
replace: userAccountControl
userAccountControl: %u
replace: accountExpires
accountExpires: %u
""" % (user_dn, userAccountControl, accountExpires)

        self.modify_ldif(setexp)
    except:
        self.transaction_cancel()
        raise
    else:
        self.transaction_commit()
def set_domain_sid(self, sid):
    """Change the domain SID used by this LDB.

    :param sid: The new domain sid to use.
    """
    dsdb._samdb_set_domain_sid(self, sid)

def get_domain_sid(self):
    """Read the domain SID used by this LDB."""
    current_sid = dsdb._samdb_get_domain_sid(self)
    return current_sid

# Attribute-style access to the getter/setter pair above.
domain_sid = property(fget=get_domain_sid, fset=set_domain_sid,
                      doc="SID for the domain")
def set_invocation_id(self, invocation_id):
    """Set the invocation id for this SamDB handle.

    :param invocation_id: GUID of the invocation id.
    """
    dsdb._dsdb_set_ntds_invocation_id(self, invocation_id)

def get_invocation_id(self):
    """Get the invocation id of this SamDB handle."""
    current_id = dsdb._samdb_ntds_invocation_id(self)
    return current_id

# Attribute-style access to the getter/setter pair above.
invocation_id = property(fget=get_invocation_id, fset=set_invocation_id,
                         doc="Invocation ID GUID")
def get_oid_from_attid(self, attid):
    """Look up the OID for ``attid`` via dsdb._dsdb_get_oid_from_attid."""
    oid = dsdb._dsdb_get_oid_from_attid(self, attid)
    return oid
def get_attid_from_lDAPDisplayName(self, ldap_display_name,
                                   is_schema_nc=False):
    """Return the attribute ID for an LDAP attribute, as an integer as
    found in DRSUAPI.

    :param ldap_display_name: lDAPDisplayName of the attribute
    :param is_schema_nc: passed through to the dsdb lookup
    """
    attid = dsdb._dsdb_get_attid_from_lDAPDisplayName(
        self, ldap_display_name, is_schema_nc)
    return attid
def get_syntax_oid_from_lDAPDisplayName(self, ldap_display_name):
    """Return the syntax OID for an LDAP attribute, as a string."""
    syntax_oid = dsdb._dsdb_get_syntax_oid_from_lDAPDisplayName(
        self, ldap_display_name)
    return syntax_oid
def get_systemFlags_from_lDAPDisplayName(self, ldap_display_name):
    """Return the systemFlags for an LDAP attribute, as an integer."""
    system_flags = dsdb._dsdb_get_systemFlags_from_lDAPDisplayName(
        self, ldap_display_name)
    return system_flags
def get_linkId_from_lDAPDisplayName(self, ldap_display_name):
    """Return the linkID for an LDAP attribute, as an integer."""
    link_id = dsdb._dsdb_get_linkId_from_lDAPDisplayName(
        self, ldap_display_name)
    return link_id
def get_lDAPDisplayName_by_attid(self, attid):
    """Return the lDAPDisplayName for an integer DRS attribute ID.

    :param attid: The DRS attribute id to resolve
    """
    display_name = dsdb._dsdb_get_lDAPDisplayName_by_attid(self, attid)
    return display_name
def get_backlink_from_lDAPDisplayName(self, ldap_display_name):
    """Return the attribute name of the backlink that corresponds to a
    forward link attribute.

    If there is no backlink return None.

    :param ldap_display_name: lDAPDisplayName of the forward link attribute
    """
    backlink = dsdb._dsdb_get_backlink_from_lDAPDisplayName(
        self, ldap_display_name)
    return backlink
def set_ntds_settings_dn(self, ntds_settings_dn):
    """Set the NTDS Settings DN, as would be returned on the
    dsServiceName rootDSE attribute.

    This allows the DN to be set before the database fully exists.

    :param ntds_settings_dn: The new DN to use
    """
    dsdb._samdb_set_ntds_settings_dn(self, ntds_settings_dn)
def get_ntds_GUID(self):
    """Return the NTDS objectGUID.

    Delegates to ``dsdb._samdb_ntds_objectGUID``.
    """
    ntds_guid = dsdb._samdb_ntds_objectGUID(self)
    return ntds_guid
def get_timestr(self):
    """Return the current time as a generalized time string.

    The value is the ``currentTime`` attribute read with a base-scope
    search on the empty DN.
    """
    rootdse = self.search(base="",
                          scope=ldb.SCOPE_BASE,
                          attrs=["currentTime"])
    current_time = rootdse[0]["currentTime"][0]
    return str(current_time)
def get_time(self):
    """Return the current time as UNIX time.

    Converts the string from ``get_timestr()`` with ``ldb.string_to_time``.
    """
    timestr = self.get_timestr()
    return ldb.string_to_time(timestr)
def get_nttime(self):
    """Return the current time as NT time.

    Converts the result of ``get_time()`` with ``samba.unix2nttime``.
    """
    unix_time = self.get_time()
    return samba.unix2nttime(unix_time)
def server_site_name(self):
    """Return the server site name.

    Delegates to ``dsdb._samdb_server_site_name``.
    """
    site_name = dsdb._samdb_server_site_name(self)
    return site_name
def host_dns_name(self):
    """Return the DNS name of this host.

    The value is the ``dNSHostName`` attribute read with a base-scope
    search on the empty DN.
    """
    rootdse = self.search(base='', scope=ldb.SCOPE_BASE,
                          attrs=['dNSHostName'])
    host_name = rootdse[0]['dNSHostName'][0]
    return str(host_name)
def domain_dns_name(self):
    """Return the DNS name of the domain root.

    This is the part of the default base DN's canonical string that
    precedes the first '/'.
    """
    canonical = self.get_default_basedn().canonical_str()
    dns_name, _sep, _rest = canonical.partition('/')
    return dns_name
def domain_netbios_name(self):
    """Return the NetBIOS name of the domain root.

    Looks one level below the partitions DN for a crossRef object whose
    ncName and dnsroot match this domain and which has a nETBIOSName.

    :return: the decoded nETBIOSName value, or None if the search
        returned no entry
    """
    domain_dn = self.get_default_basedn()
    dns_name = self.domain_dns_name()
    crossref_filter = "(&(objectClass=crossRef)(nETBIOSName=*)(ncName=%s)(dnsroot=%s))" % (domain_dn, dns_name)
    matches = self.search(self.get_partitions_dn(),
                          scope=ldb.SCOPE_ONELEVEL,
                          expression=crossref_filter)
    try:
        return matches[0]["nETBIOSName"][0].decode()
    except IndexError:
        # No matching crossRef entry.
        return None
def forest_dns_name(self):
    """Return the DNS name of the forest root.

    This is the part of the root base DN's canonical string that
    precedes the first '/'.
    """
    canonical = self.get_root_basedn().canonical_str()
    dns_name, _sep, _rest = canonical.partition('/')
    return dns_name
def load_partition_usn(self, base_dn):
    """Load the USN information of a partition.

    Delegates to ``dsdb._dsdb_load_partition_usn``.

    :param base_dn: base DN of the partition
    """
    usn_info = dsdb._dsdb_load_partition_usn(self, base_dn)
    return usn_info
def set_schema(self, schema, write_indices_and_attributes=True):
    """Set the schema of this handle from a schema object.

    :param schema: object whose ``ldb`` attribute is the connection to
        take the schema from
    :param write_indices_and_attributes: forwarded to
        ``set_schema_from_ldb``
    """
    source_ldb = schema.ldb
    self.set_schema_from_ldb(
        source_ldb,
        write_indices_and_attributes=write_indices_and_attributes)
def set_schema_from_ldb(self, ldb_conn, write_indices_and_attributes=True):
    """Set the schema of this handle from another LDB connection.

    Delegates to ``dsdb._dsdb_set_schema_from_ldb``.

    :param ldb_conn: connection to take the schema from
    :param write_indices_and_attributes: forwarded unchanged to the
        dsdb helper
    """
    dsdb._dsdb_set_schema_from_ldb(self, ldb_conn,
                                   write_indices_and_attributes)
def set_schema_update_now(self):
    """Apply an LDIF modify that adds ``schemaUpdateNow: 1``."""
    # The record is addressed with an empty "dn:" line.
    # NOTE(review): presumably that targets the rootDSE - confirm.
    update_now_ldif = """
dn:
changetype: modify
add: schemaUpdateNow
schemaUpdateNow: 1
"""
    self.modify_ldif(update_now_ldif)
def dsdb_DsReplicaAttribute(self, ldb, ldap_display_name, ldif_elements):
    """Convert a list of attribute values to a DRSUAPI DsReplicaAttribute.

    Note that the ``ldb`` parameter, not ``self``, is what gets passed to
    the dsdb helper.

    :param ldb: database handle handed to the dsdb helper
    :param ldap_display_name: lDAPDisplayName of the attribute
    :param ldif_elements: the attribute values to convert
    """
    replica_attribute = dsdb._dsdb_DsReplicaAttribute(
        ldb, ldap_display_name, ldif_elements)
    return replica_attribute
def dsdb_normalise_attributes(self, ldb, ldap_display_name, ldif_elements):
    """Normalise a list of attribute values.

    Note that the ``ldb`` parameter, not ``self``, is what gets passed to
    the dsdb helper.

    :param ldb: database handle handed to the dsdb helper
    :param ldap_display_name: lDAPDisplayName of the attribute
    :param ldif_elements: the attribute values to normalise
    """
    normalised = dsdb._dsdb_normalise_attributes(
        ldb, ldap_display_name, ldif_elements)
    return normalised
def get_attribute_from_attid(self, attid):
    """Get from an attid the associated attribute.

    The OID -> display name table (``hash_oid_name``) is filled on first
    use via ``_populate_oid_attid()``.

    :param attid: The attribute id for searched attribute
    :return: The name of the attribute associated with this id, or None
        if the attid's OID is not in the table
    """
    if not self.hash_oid_name:
        self._populate_oid_attid()
    # Resolve the OID once and let dict.get supply the None fallback,
    # rather than resolving it twice for a membership test plus a lookup.
    return self.hash_oid_name.get(self.get_oid_from_attid(attid))
1112 def _populate_oid_attid(self):
1113 """Populate the hash hash_oid_name.
1115 This hash contains the oid of the attribute as a key and
1116 its display name as a value
1118 self.hash_oid_name = {}
1119 res = self.search(expression="objectClass=attributeSchema",
1120 controls=["search_options:1:2"],
1121 attrs=["attributeID",
1122 "lDAPDisplayName"])
1123 if len(res) > 0:
1124 for e in res:
1125 strDisplay = str(e.get("lDAPDisplayName"))
1126 self.hash_oid_name[str(e.get("attributeID"))] = strDisplay
def get_attribute_replmetadata_version(self, dn, att):
    """Get the version field from the replPropertyMetaData for
    the given attribute.

    :param dn: The DN of the object whose metadata is inspected
    :param att: The name of the attribute (compared case-insensitively)
    :return: The value of the version field in the replPropertyMetaData
        for the given attribute. None if no object matched or the
        attribute has no entry in the metadata.
    """
    res = self.search(expression="distinguishedName=%s" % dn,
                      scope=ldb.SCOPE_SUBTREE,
                      controls=["search_options:1:2"],
                      attrs=["replPropertyMetaData"])
    if len(res) == 0:
        return None

    raw_blob = res[0]["replPropertyMetaData"][0]
    repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, raw_blob)
    ctr = repl.ctr
    if not self.hash_oid_name:
        self._populate_oid_attid()
    for meta in ctr.array:
        # Map the entry's attid to a display name and compare with `att`.
        known_name = self.hash_oid_name.get(
            self.get_oid_from_attid(meta.attid))
        if known_name is not None and att.lower() == known_name.lower():
            return meta.version
    return None
def set_attribute_replmetadata_version(self, dn, att, value,
                                       addifnotexist=False):
    """Set the version field of an attribute's replPropertyMetaData entry.

    Every metadata entry whose attribute name matches `att`
    (case-insensitively) gets `value` as version, the current time as
    originating change time, this handle's invocation id, and a freshly
    allocated sequence number as originating and local USN.

    :param dn: The DN of the object whose metadata is modified
    :param att: The name of the attribute
    :param value: The new version value
    :param addifnotexist: if no entry matched and the metadata array is
        not empty, append a new entry (with a hard-coded attid, see below)
    :return: None
    """
    res = self.search(expression="distinguishedName=%s" % dn,
                      scope=ldb.SCOPE_SUBTREE,
                      controls=["search_options:1:2"],
                      attrs=["replPropertyMetaData"])
    if len(res) == 0:
        return None

    repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                      res[0]["replPropertyMetaData"][0])
    ctr = repl.ctr
    now = samba.unix2nttime(int(time.time()))

    def _stamp(meta):
        # Allocate a new sequence number first, then record the change.
        seq = self.sequence_number(ldb.SEQ_NEXT)
        meta.version = value
        meta.originating_change_time = now
        meta.originating_invocation_id = misc.GUID(self.get_invocation_id())
        meta.originating_usn = seq
        meta.local_usn = seq

    found = False
    if not self.hash_oid_name:
        self._populate_oid_attid()
    for meta in ctr.array:
        known_name = self.hash_oid_name.get(
            self.get_oid_from_attid(meta.attid))
        if known_name is None or att.lower() != known_name.lower():
            continue
        found = True
        _stamp(meta)

    if not found and addifnotexist and len(ctr.array) > 0:
        new_meta = drsblobs.replPropertyMetaData1()
        # The attid is hard-coded and does not depend on `att`.
        # NOTE(review): 589914 (0x9005A) looks like unicodePwd - confirm.
        new_meta.attid = 589914
        # The resolved OID is not used; the call is kept as in the original.
        self.get_oid_from_attid(new_meta.attid)
        _stamp(new_meta)
        found = True
        # Read the array into a local list, extend it and assign it back.
        entries = ctr.array
        entries.append(new_meta)
        ctr.count = ctr.count + 1
        ctr.array = entries

    if found:
        msg = ldb.Message()
        msg.dn = res[0].dn
        msg["replPropertyMetaData"] = \
            ldb.MessageElement(ndr_pack(repl),
                               ldb.FLAG_MOD_REPLACE,
                               "replPropertyMetaData")
        # NOTE(review): presumably this control permits replacing
        # replPropertyMetaData directly - confirm.
        self.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
def write_prefixes_from_schema(self):
    """Write the prefixes of the loaded schema to this database.

    Delegates to ``dsdb._dsdb_write_prefixes_from_schema_to_ldb``.
    """
    dsdb._dsdb_write_prefixes_from_schema_to_ldb(self)
def get_partitions_dn(self):
    """Return the partitions DN.

    Delegates to ``dsdb._dsdb_get_partitions_dn``.
    """
    partitions_dn = dsdb._dsdb_get_partitions_dn(self)
    return partitions_dn
def get_nc_root(self, dn):
    """Return the naming context root for a DN.

    Delegates to ``dsdb._dsdb_get_nc_root``.

    :param dn: the DN to look up
    """
    nc_root = dsdb._dsdb_get_nc_root(self, dn)
    return nc_root
def get_wellknown_dn(self, nc_root, wkguid):
    """Return the well-known DN for a GUID inside a naming context.

    Results are cached in ``hash_well_known``, keyed by ``str(nc_root)``
    and then by `wkguid`; ``dsdb._dsdb_get_wellknown_dn`` is only
    consulted on a cache miss. A miss that yields None is not cached.

    :param nc_root: the naming context root
    :param wkguid: the well-known GUID to resolve
    :return: the DN, or None if the dsdb helper found none
    """
    nc_key = str(nc_root)
    per_nc = self.hash_well_known.get(nc_key)
    if per_nc is not None:
        cached = per_nc.get(wkguid)
        if cached is not None:
            return cached

    found = dsdb._dsdb_get_wellknown_dn(self, nc_root, wkguid)
    if found is None:
        return None
    if per_nc is None:
        per_nc = self.hash_well_known[nc_key] = {}
    per_nc[wkguid] = found
    return found
def set_minPwdAge(self, value):
    """Replace the minPwdAge attribute on the domain DN.

    :param value: new value; anything that is not bytes is converted
        with str() and encoded as UTF-8
    """
    encoded = value if isinstance(value, bytes) else str(value).encode('utf8')
    msg = ldb.Message()
    msg.dn = ldb.Dn(self, self.domain_dn())
    msg["minPwdAge"] = ldb.MessageElement(encoded, ldb.FLAG_MOD_REPLACE, "minPwdAge")
    self.modify(msg)
def get_minPwdAge(self):
    """Read the minPwdAge attribute from the domain DN.

    :return: the first value as an int, or None if the search returned
        nothing or the attribute is absent
    """
    res = self.search(self.domain_dn(), scope=ldb.SCOPE_BASE, attrs=["minPwdAge"])
    if len(res) == 0 or "minPwdAge" not in res[0]:
        return None
    return int(res[0]["minPwdAge"][0])
def set_maxPwdAge(self, value):
    """Replace the maxPwdAge attribute on the domain DN.

    :param value: new value; anything that is not bytes is converted
        with str() and encoded as UTF-8
    """
    encoded = value if isinstance(value, bytes) else str(value).encode('utf8')
    msg = ldb.Message()
    msg.dn = ldb.Dn(self, self.domain_dn())
    msg["maxPwdAge"] = ldb.MessageElement(encoded, ldb.FLAG_MOD_REPLACE, "maxPwdAge")
    self.modify(msg)
def get_maxPwdAge(self):
    """Read the maxPwdAge attribute from the domain DN.

    :return: the first value as an int, or None if the search returned
        nothing or the attribute is absent
    """
    res = self.search(self.domain_dn(), scope=ldb.SCOPE_BASE, attrs=["maxPwdAge"])
    if len(res) == 0 or "maxPwdAge" not in res[0]:
        return None
    return int(res[0]["maxPwdAge"][0])
def set_minPwdLength(self, value):
    """Replace the minPwdLength attribute on the domain DN.

    :param value: new value; anything that is not bytes is converted
        with str() and encoded as UTF-8
    """
    encoded = value if isinstance(value, bytes) else str(value).encode('utf8')
    msg = ldb.Message()
    msg.dn = ldb.Dn(self, self.domain_dn())
    msg["minPwdLength"] = ldb.MessageElement(encoded, ldb.FLAG_MOD_REPLACE, "minPwdLength")
    self.modify(msg)
def get_minPwdLength(self):
    """Read the minPwdLength attribute from the domain DN.

    :return: the first value as an int, or None if the search returned
        nothing or the attribute is absent
    """
    res = self.search(self.domain_dn(), scope=ldb.SCOPE_BASE, attrs=["minPwdLength"])
    if len(res) == 0 or "minPwdLength" not in res[0]:
        return None
    return int(res[0]["minPwdLength"][0])
def set_pwdProperties(self, value):
    """Replace the pwdProperties attribute on the domain DN.

    :param value: new value; anything that is not bytes is converted
        with str() and encoded as UTF-8
    """
    encoded = value if isinstance(value, bytes) else str(value).encode('utf8')
    msg = ldb.Message()
    msg.dn = ldb.Dn(self, self.domain_dn())
    msg["pwdProperties"] = ldb.MessageElement(encoded, ldb.FLAG_MOD_REPLACE, "pwdProperties")
    self.modify(msg)
def get_pwdProperties(self):
    """Read the pwdProperties attribute from the domain DN.

    :return: the first value as an int, or None if the search returned
        nothing or the attribute is absent
    """
    res = self.search(self.domain_dn(), scope=ldb.SCOPE_BASE, attrs=["pwdProperties"])
    if len(res) == 0 or "pwdProperties" not in res[0]:
        return None
    return int(res[0]["pwdProperties"][0])
def set_dsheuristics(self, dsheuristics):
    """Set or clear dSHeuristics on the Directory Service object.

    The object lives at CN=Directory Service,CN=Windows NT,CN=Services
    under the configuration base DN.

    :param dsheuristics: the new value, or None to delete the attribute
    """
    msg = ldb.Message()
    msg.dn = ldb.Dn(self, "CN=Directory Service,CN=Windows NT,CN=Services,%s"
                    % self.get_config_basedn().get_linearized())
    if dsheuristics is None:
        # Nothing to store: remove the attribute altogether.
        element = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
                                     "dSHeuristics")
    else:
        element = ldb.MessageElement(dsheuristics,
                                     ldb.FLAG_MOD_REPLACE,
                                     "dSHeuristics")
    msg["dSHeuristics"] = element
    self.modify(msg)
def get_dsheuristics(self):
    """Read dSHeuristics from the Directory Service object.

    The object lives at CN=Directory Service,CN=Windows NT,CN=Services
    under the configuration base DN.

    :return: the first dSHeuristics value as returned by the search, or
        None if the search returned nothing or the attribute is absent
    """
    res = self.search("CN=Directory Service,CN=Windows NT,CN=Services,%s"
                      % self.get_config_basedn().get_linearized(),
                      scope=ldb.SCOPE_BASE, attrs=["dSHeuristics"])
    if len(res) != 0 and "dSHeuristics" in res[0]:
        return res[0]["dSHeuristics"][0]
    return None
def create_ou(self, ou_dn, description=None, name=None, sd=None):
    """Creates an organizationalUnit object

    :param ou_dn: dn of the new object
    :param description: description attribute
    :param name: name attribute
    :param sd: security descriptor of the object, can be
        an SDDL string or security.descriptor type
    """
    m = {"dn": ou_dn,
         "objectClass": "organizationalUnit"}

    if description:
        m["description"] = description
    if name:
        m["name"] = name

    if sd:
        if isinstance(sd, str):
            # An SDDL string cannot be NDR-packed directly; turn it into
            # a security.descriptor relative to this domain's SID first.
            domain_sid = security.dom_sid(self.get_domain_sid())
            sd = security.descriptor.from_sddl(sd, domain_sid)
        m["nTSecurityDescriptor"] = ndr_pack(sd)
    self.add(m)
def sequence_number(self, seq_type):
    """Returns the value of the sequence number according to the
    requested type.

    The base-class lookup runs inside its own transaction: it is
    cancelled if the lookup raises and committed otherwise.

    :param seq_type: type of sequence number
    """
    self.transaction_start()
    try:
        seq = super().sequence_number(seq_type)
    except BaseException:
        # Roll back on any failure, then let the error propagate.
        self.transaction_cancel()
        raise
    self.transaction_commit()
    return seq
def get_dsServiceName(self):
    """Return the NTDS DN from the rootDSE.

    The value is the ``dsServiceName`` attribute read with a base-scope
    search on the empty DN.
    """
    rootdse = self.search(base="", scope=ldb.SCOPE_BASE,
                          attrs=["dsServiceName"])
    service_name = rootdse[0]["dsServiceName"][0]
    return str(service_name)
def get_serverName(self):
    """Return the server DN from the rootDSE.

    The value is the ``serverName`` attribute read with a base-scope
    search on the empty DN.
    """
    rootdse = self.search(base="", scope=ldb.SCOPE_BASE,
                          attrs=["serverName"])
    server_name = rootdse[0]["serverName"][0]
    return str(server_name)
def dns_lookup(self, dns_name, dns_partition=None):
    """Do a DNS lookup in the database, returns the NDR database structures.

    :param dns_name: the DNS name to look up
    :param dns_partition: passed on as a keyword argument only when it is
        not None
    """
    extra = {} if dns_partition is None else {"dns_partition": dns_partition}
    return dsdb_dns.lookup(self, dns_name, **extra)
def dns_extract(self, el):
    """Return the NDR database structures from a dnsRecord element.

    :param el: the dnsRecord element handed to ``dsdb_dns.extract``
    """
    records = dsdb_dns.extract(self, el)
    return records
def dns_replace(self, dns_name, new_records):
    """Do a DNS modification on the database, setting the NDR database
    structures on a DNS name.

    :param dns_name: the DNS name to modify
    :param new_records: the records to store
    """
    result = dsdb_dns.replace(self, dns_name, new_records)
    return result
def dns_replace_by_dn(self, dn, new_records):
    """Do a DNS modification on the database, setting the NDR database
    structures on a LDB DN.

    This routine is important because if the last record on the DN
    is removed, this routine will put a tombstone in the record.

    :param dn: the LDB DN to modify
    :param new_records: the records to store
    """
    result = dsdb_dns.replace_by_dn(self, dn, new_records)
    return result
def garbage_collect_tombstones(self, dn, current_time,
                               tombstone_lifetime=None):
    """garbage_collect_tombstones(lp, samdb, [dn], current_time, tombstone_lifetime)
    -> (num_objects_expunged, num_links_expunged)

    :param tombstone_lifetime: only handed to the dsdb helper when it is
        not None
    :raises SamDBError: if the AD DC was not built
    """
    if not is_ad_dc_built():
        raise SamDBError('Cannot garbage collect tombstones: '
                         'AD DC was not built')

    helper_args = [self, dn, current_time]
    if tombstone_lifetime is not None:
        helper_args.append(tombstone_lifetime)
    return dsdb._dsdb_garbage_collect_tombstones(*helper_args)
def create_own_rid_set(self):
    """Create a RID set for this DSA.

    Delegates to ``dsdb._dsdb_create_own_rid_set``.
    """
    result = dsdb._dsdb_create_own_rid_set(self)
    return result
def allocate_rid(self):
    """Return a new RID from the RID Pool on this DSA.

    Delegates to ``dsdb._dsdb_allocate_rid``.
    """
    rid = dsdb._dsdb_allocate_rid(self)
    return rid
def next_free_rid(self):
    """Return the next free RID from the RID Pool on this DSA.

    This is the low bound reported by ``free_rid_bounds()``.

    :note: This function is not intended for general use, and care must be
        taken if it is used to generate objectSIDs. The returned RID is not
        formally reserved for use, creating the possibility of duplicate
        objectSIDs.
    """
    lowest_free, _highest_free = self.free_rid_bounds()
    return lowest_free
def free_rid_bounds(self):
    """Return the low and high bounds (inclusive) of RIDs that are
    available for use in this DSA's current RID pool.

    :note: This function is not intended for general use, and care must be
        taken if it is used to generate objectSIDs. The returned range of
        RIDs is not formally reserved for use, creating the possibility of
        duplicate objectSIDs.

    :return: tuple (next usable RID, highest RID of the pool in use)
    :raises ldb.LdbError: if a reference attribute is missing, the RID
        Set is unusable, or the pools are exhausted
    """
    # Follow serverName -> serverReference -> rIDSetReferences to find
    # the DN of this server's RID Set.
    server_name_dn = ldb.Dn(self, self.get_serverName())
    res = self.search(base=server_name_dn,
                      scope=ldb.SCOPE_BASE,
                      attrs=["serverReference"])
    try:
        server_ref = res[0]["serverReference"]
    except KeyError:
        raise ldb.LdbError(
            ldb.ERR_NO_SUCH_ATTRIBUTE,
            "No RID Set DN - "
            "Cannot find attribute serverReference of %s "
            "to calculate reference dn" % server_name_dn) from None
    server_ref_dn = ldb.Dn(self, server_ref[0].decode("utf-8"))

    res = self.search(base=server_ref_dn,
                      scope=ldb.SCOPE_BASE,
                      attrs=["rIDSetReferences"])
    try:
        rid_set_refs = res[0]["rIDSetReferences"]
    except KeyError:
        raise ldb.LdbError(
            ldb.ERR_NO_SUCH_ATTRIBUTE,
            "No RID Set DN - "
            "Cannot find attribute rIDSetReferences of %s "
            "to calculate reference dn" % server_ref_dn) from None
    rid_set_dn = ldb.Dn(self, rid_set_refs[0].decode("utf-8"))

    # Get the alloc pools and next RID of this RID Set
    res = self.search(base=rid_set_dn,
                      scope=ldb.SCOPE_BASE,
                      attrs=["rIDAllocationPool",
                             "rIDPreviousAllocationPool",
                             "rIDNextRID"])
    rid_set = res[0]

    uint32_max = 2**32 - 1
    uint64_max = 2**64 - 1

    def _int_attr(attr_name, when_missing):
        # A missing attribute is represented by the given sentinel.
        try:
            return int(rid_set[attr_name][0])
        except KeyError:
            return when_missing

    alloc_pool = _int_attr("rIDAllocationPool", uint64_max)
    if alloc_pool == uint64_max:
        raise ldb.LdbError(ldb.ERR_OPERATIONS_ERROR,
                           "Bad RID Set %s" % rid_set_dn)

    prev_pool = _int_attr("rIDPreviousAllocationPool", uint64_max)
    next_rid = _int_attr("rIDNextRID", uint32_max)

    # If we never used a pool, set up our first pool
    if prev_pool == uint64_max or next_rid == uint32_max:
        prev_pool = alloc_pool
        next_rid = prev_pool & uint32_max
    else:
        next_rid += 1

    # A pool packs its low bound in the lower 32 bits and its high bound
    # in the bits above. Now check if our current pool is still usable.
    prev_pool_lo = prev_pool & uint32_max
    prev_pool_hi = prev_pool >> 32
    if next_rid > prev_pool_hi:
        # We need a new pool, check if we already have a new one
        # Otherwise we return an error code.
        if alloc_pool == prev_pool:
            raise ldb.LdbError(ldb.ERR_OPERATIONS_ERROR,
                               "RID pools out of RIDs")

        # Now use the new pool
        prev_pool = alloc_pool
        prev_pool_lo = prev_pool & uint32_max
        prev_pool_hi = prev_pool >> 32
        next_rid = prev_pool_lo

    if not (prev_pool_lo <= next_rid <= prev_pool_hi):
        raise ldb.LdbError(ldb.ERR_OPERATIONS_ERROR,
                           "Bad RID chosen %d from range %d-%d" %
                           (next_rid, prev_pool_lo, prev_pool_hi))

    return next_rid, prev_pool_hi
def normalize_dn_in_domain(self, dn):
    """Return a new DN expanded by adding the domain DN.

    If the dn is already a child of the domain DN, just
    return it as-is.

    :param dn: relative dn, as a string or an ldb.Dn
    :return: an ldb.Dn below the domain DN
    """
    domain_dn = ldb.Dn(self, self.domain_dn())

    # Always work on a fresh Dn built from the string form.
    dn_text = str(dn) if isinstance(dn, ldb.Dn) else dn
    full_dn = ldb.Dn(self, dn_text)
    if full_dn.is_child_of(domain_dn):
        return full_dn
    full_dn.add_base(domain_dn)
    return full_dn
class dsdb_Dn(object):
    """A DN that may carry a binary or string prefix.

    For the binary-DN and string-DN syntaxes the input looks like
    ``<tag>:<len>:<payload>:<dn>``; the part up to and including the
    third colon is kept in ``prefix``, the payload in ``binary`` and the
    remainder in ``dnstring``. For any other syntax ``prefix`` and
    ``binary`` are empty strings.

    Instances define ``__eq__`` but no ``__hash__`` and are therefore
    unhashable.
    """

    def __init__(self, samdb, dnstring, syntax_oid=None):
        """Create a dsdb_Dn.

        :param samdb: database handle used to build the ldb.Dn
        :param dnstring: the DN, optionally with a B:/S: prefix
        :param syntax_oid: syntax OID of the attribute; auto-detected from
            the start of `dnstring` when None
        :raises RuntimeError: if a prefixed DN has fewer than four
            colon-separated fields
        """
        if syntax_oid is None:
            # auto-detect based on string
            if dnstring.startswith("B:"):
                syntax_oid = dsdb.DSDB_SYNTAX_BINARY_DN
            elif dnstring.startswith("S:"):
                syntax_oid = dsdb.DSDB_SYNTAX_STRING_DN
            else:
                syntax_oid = dsdb.DSDB_SYNTAX_OR_NAME
        if syntax_oid in [dsdb.DSDB_SYNTAX_BINARY_DN, dsdb.DSDB_SYNTAX_STRING_DN]:
            # it is a binary DN
            colons = dnstring.split(':')
            if len(colons) < 4:
                raise RuntimeError("Invalid DN %s" % dnstring)
            # tag + ':' + length digits + ':' + payload + ':'
            prefix_len = 4 + len(colons[1]) + int(colons[1])
            self.prefix = dnstring[0:prefix_len]
            self.binary = self.prefix[3 + len(colons[1]):-1]
            self.dnstring = dnstring[prefix_len:]
        else:
            self.dnstring = dnstring
            self.prefix = ''
            self.binary = ''
        self.dn = ldb.Dn(samdb, self.dnstring)

    def __str__(self):
        return self.prefix + str(self.dn.extended_str(mode=1))

    def __cmp__(self, other):
        """Compare dsdb_Dn values similar to parsed_dn_compare().

        The GUID extended components are compared first, then the binary
        parts. `other` must be a dsdb_Dn.
        """
        guid1 = self.dn.get_extended_component("GUID")
        guid2 = other.dn.get_extended_component("GUID")

        v = cmp(guid1, guid2)
        if v != 0:
            return v
        return cmp(self.binary, other.binary)

    # In Python3, __cmp__ is replaced by these 6 methods. Each returns
    # NotImplemented for operands that are not dsdb_Dn, so that e.g.
    # `x == None` is False instead of raising AttributeError.
    def __eq__(self, other):
        if not isinstance(other, dsdb_Dn):
            return NotImplemented
        return self.__cmp__(other) == 0

    def __ne__(self, other):
        if not isinstance(other, dsdb_Dn):
            return NotImplemented
        return self.__cmp__(other) != 0

    def __lt__(self, other):
        if not isinstance(other, dsdb_Dn):
            return NotImplemented
        return self.__cmp__(other) < 0

    def __le__(self, other):
        if not isinstance(other, dsdb_Dn):
            return NotImplemented
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        if not isinstance(other, dsdb_Dn):
            return NotImplemented
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        if not isinstance(other, dsdb_Dn):
            return NotImplemented
        return self.__cmp__(other) >= 0

    def get_binary_integer(self):
        """Return binary part of a dsdb_Dn as an integer, or None.

        The binary part is parsed as hexadecimal; None is returned when
        the DN has no prefix.
        """
        if self.prefix == '':
            return None
        return int(self.binary, 16)

    def get_bytes(self):
        """Return the hex-encoded binary part as a byte string."""
        return binascii.unhexlify(self.binary)