2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
11 sys
.path
.insert(0, "bin/python")
13 samba
.ensure_external_module("testtools", "testtools")
14 samba
.ensure_external_module("subunit", "subunit/python")
16 import samba
.getopt
as options
18 from samba
.auth
import system_session
19 from ldb
import SCOPE_ONELEVEL
, SCOPE_BASE
, LdbError
20 from ldb
import ERR_NO_SUCH_OBJECT
21 from ldb
import ERR_UNWILLING_TO_PERFORM
22 from ldb
import ERR_CONSTRAINT_VIOLATION
23 from ldb
import Message
, MessageElement
, Dn
24 from ldb
import FLAG_MOD_REPLACE
25 from samba
.samdb
import SamDB
26 from samba
.dsdb
import DS_DOMAIN_FUNCTION_2003
27 from samba
.tests
import delete_force
29 from subunit
.run
import SubunitTestRunner
32 parser
= optparse
.OptionParser("ldap_schema.py [options] <host>")
33 sambaopts
= options
.SambaOptions(parser
)
34 parser
.add_option_group(sambaopts
)
35 parser
.add_option_group(options
.VersionOptions(parser
))
36 # use command line creds if available
37 credopts
= options
.CredentialsOptions(parser
)
38 parser
.add_option_group(credopts
)
39 opts
, args
= parser
.parse_args()
47 lp
= sambaopts
.get_loadparm()
48 creds
= credopts
.get_credentials(lp
)
51 class SchemaTests(samba
.tests
.TestCase
):
54 super(SchemaTests
, self
).setUp()
56 self
.base_dn
= ldb
.domain_dn()
57 self
.schema_dn
= ldb
.get_schema_basedn().get_linearized()
def test_generated_schema(self):
    """Testing we can read the generated schema via LDAP"""
    # Ask for the generated attributes explicitly by name; the checks
    # below run in the same order as the original assertions.
    expected_attrs = ("dITContentRules", "objectClasses", "attributeTypes")
    res = self.ldb.search("cn=aggregate," + self.schema_dn, scope=SCOPE_BASE,
                          attrs=["objectClasses", "attributeTypes", "dITContentRules"])
    # A base-scope search must yield exactly the aggregate entry itself.
    self.assertEqual(len(res), 1)
    aggregate = res[0]
    for attr_name in expected_attrs:
        # assertIn reports which attribute is missing on failure.
        self.assertIn(attr_name, aggregate)
68 def test_generated_schema_is_operational(self
):
69 """Testing we don't get the generated schema via LDAP by default"""
70 res
= self
.ldb
.search("cn=aggregate,"+self
.schema_dn
, scope
=SCOPE_BASE
,
72 self
.assertEquals(len(res
), 1)
73 self
.assertFalse("dITContentRules" in res
[0])
74 self
.assertFalse("objectClasses" in res
[0])
75 self
.assertFalse("attributeTypes" in res
[0])
77 def test_schemaUpdateNow(self
):
78 """Testing schemaUpdateNow"""
79 attr_name
= "test-Attr" + time
.strftime("%s", time
.gmtime())
80 attr_ldap_display_name
= attr_name
.replace("-", "")
83 dn: CN=%s,%s""" % (attr_name
, self
.schema_dn
) + """
85 objectClass: attributeSchema
86 adminDescription: """ + attr_name
+ """
87 adminDisplayName: """ + attr_name
+ """
88 cn: """ + attr_name
+ """
89 attributeId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9940
90 attributeSyntax: 2.5.5.12
96 self
.ldb
.add_ldif(ldif
)
97 # We must do a schemaUpdateNow otherwise it's not 100% sure that the schema
98 # will contain the new attribute
105 self
.ldb
.modify_ldif(ldif
)
107 # Search for created attribute
109 res
= self
.ldb
.search("cn=%s,%s" % (attr_name
, self
.schema_dn
), scope
=SCOPE_BASE
, attrs
=["*"])
110 self
.assertEquals(len(res
), 1)
111 self
.assertEquals(res
[0]["lDAPDisplayName"][0], attr_ldap_display_name
)
112 self
.assertTrue("schemaIDGUID" in res
[0])
114 class_name
= "test-Class" + time
.strftime("%s", time
.gmtime())
115 class_ldap_display_name
= class_name
.replace("-", "")
117 # First try to create a class with a wrong "defaultObjectCategory"
119 dn: CN=%s,%s""" % (class_name
, self
.schema_dn
) + """
121 objectClass: classSchema
122 defaultObjectCategory: CN=_
123 adminDescription: """ + class_name
+ """
124 adminDisplayName: """ + class_name
+ """
125 cn: """ + class_name
+ """
126 governsId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9939
128 objectClassCategory: 1
129 subClassOf: organizationalPerson
132 systemMustContain: cn
133 systemMustContain: """ + attr_ldap_display_name
+ """
137 self
.ldb
.add_ldif(ldif
)
139 except LdbError
, (num
, _
):
140 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
143 dn: CN=%s,%s""" % (class_name
, self
.schema_dn
) + """
145 objectClass: classSchema
146 adminDescription: """ + class_name
+ """
147 adminDisplayName: """ + class_name
+ """
148 cn: """ + class_name
+ """
149 governsId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9939
151 objectClassCategory: 1
152 subClassOf: organizationalPerson
155 systemMustContain: cn
156 systemMustContain: """ + attr_ldap_display_name
+ """
159 self
.ldb
.add_ldif(ldif
)
161 # Search for created objectclass
163 res
= self
.ldb
.search("cn=%s,%s" % (class_name
, self
.schema_dn
), scope
=SCOPE_BASE
, attrs
=["*"])
164 self
.assertEquals(len(res
), 1)
165 self
.assertEquals(res
[0]["lDAPDisplayName"][0], class_ldap_display_name
)
166 self
.assertEquals(res
[0]["defaultObjectCategory"][0], res
[0]["distinguishedName"][0])
167 self
.assertTrue("schemaIDGUID" in res
[0])
175 self
.ldb
.modify_ldif(ldif
)
177 object_name
= "obj" + time
.strftime("%s", time
.gmtime())
180 dn: CN=%s,CN=Users,%s"""% (object_name
, self
.base_dn
) + """
181 objectClass: organizationalPerson
183 objectClass: """ + class_ldap_display_name
+ """
185 cn: """ + object_name
+ """
187 objectCategory: CN=%s,%s"""% (class_name
, self
.schema_dn
) + """
188 distinguishedName: CN=%s,CN=Users,%s"""% (object_name
, self
.base_dn
) + """
189 name: """ + object_name
+ """
190 """ + attr_ldap_display_name
+ """: test
192 self
.ldb
.add_ldif(ldif
)
194 # Search for created object
196 res
= self
.ldb
.search("cn=%s,cn=Users,%s" % (object_name
, self
.base_dn
), scope
=SCOPE_BASE
, attrs
=["*"])
197 self
.assertEquals(len(res
), 1)
199 delete_force(self
.ldb
, "cn=%s,cn=Users,%s" % (object_name
, self
.base_dn
))
201 def test_subClassOf(self
):
202 """ Testing usage of custom child schemaClass
205 class_name
= "my-Class" + time
.strftime("%s", time
.gmtime())
206 class_ldap_display_name
= class_name
.replace("-", "")
209 dn: CN=%s,%s""" % (class_name
, self
.schema_dn
) + """
211 objectClass: classSchema
212 adminDescription: """ + class_name
+ """
213 adminDisplayName: """ + class_name
+ """
214 cn: """ + class_name
+ """
215 governsId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9939
217 objectClassCategory: 1
218 subClassOf: organizationalUnit
222 self
.ldb
.add_ldif(ldif
)
224 # Search for created objectclass
226 res
= self
.ldb
.search("cn=%s,%s" % (class_name
, self
.schema_dn
), scope
=SCOPE_BASE
, attrs
=["*"])
227 self
.assertEquals(len(res
), 1)
228 self
.assertEquals(res
[0]["lDAPDisplayName"][0], class_ldap_display_name
)
229 self
.assertEquals(res
[0]["defaultObjectCategory"][0], res
[0]["distinguishedName"][0])
230 self
.assertTrue("schemaIDGUID" in res
[0])
238 self
.ldb
.modify_ldif(ldif
)
240 object_name
= "org" + time
.strftime("%s", time
.gmtime())
243 dn: OU=%s,%s""" % (object_name
, self
.base_dn
) + """
244 objectClass: """ + class_ldap_display_name
+ """
245 ou: """ + object_name
+ """
248 self
.ldb
.add_ldif(ldif
)
250 # Search for created object
252 res
= self
.ldb
.search("ou=%s,%s" % (object_name
, self
.base_dn
), scope
=SCOPE_BASE
, attrs
=["*"])
253 self
.assertEquals(len(res
), 1)
255 delete_force(self
.ldb
, "ou=%s,%s" % (object_name
, self
.base_dn
))
258 class SchemaTests_msDS_IntId(samba
.tests
.TestCase
):
261 super(SchemaTests_msDS_IntId
, self
).setUp()
263 res
= ldb
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["*"])
264 self
.assertEquals(len(res
), 1)
265 self
.schema_dn
= res
[0]["schemaNamingContext"][0]
266 self
.base_dn
= res
[0]["defaultNamingContext"][0]
267 self
.forest_level
= int(res
[0]["forestFunctionality"][0])
269 def _ldap_schemaUpdateNow(self
):
276 self
.ldb
.modify_ldif(ldif
)
def _make_obj_names(self, prefix):
    """Build time-stamped names for a schema object a test is about to add.

    :param prefix: string the generated common name starts with
    :return: tuple (name, ldap_name, dn) where ``name`` is the prefix
        followed by the current epoch second, ``ldap_name`` is ``name``
        with every "-" removed, and ``dn`` is "CN=<name>,<self.schema_dn>"
    """
    # Use time.time() instead of strftime("%s", gmtime()): "%s" is not a
    # directive Python documents (it is a platform extension), and it
    # re-interprets the UTC struct as local time.
    stamp = str(int(time.time()))
    class_name = prefix + stamp
    class_ldap_name = class_name.replace("-", "")
    class_dn = "CN=%s,%s" % (class_name, self.schema_dn)
    return (class_name, class_ldap_name, class_dn)
def _is_schema_base_object(self, ldb_msg):
    """Test systemFlags for SYSTEM_FLAG_SCHEMA_BASE_OBJECT (16)

    :param ldb_msg: message (any mapping works) that may hold "systemFlags"
    :return: True when bit 16 is set in the first "systemFlags" value;
        False when the bit is clear or the attribute is absent
    """
    if "systemFlags" not in ldb_msg:
        # Without the attribute there are no flags, so the bit is not set.
        return False
    flags = int(ldb_msg["systemFlags"][0])
    return (flags & 16) != 0
291 def _make_attr_ldif(self
, attr_name
, attr_dn
):
293 dn: """ + attr_dn
+ """
295 objectClass: attributeSchema
296 adminDescription: """ + attr_name
+ """
297 adminDisplayName: """ + attr_name
+ """
298 cn: """ + attr_name
+ """
299 attributeId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9940
300 attributeSyntax: 2.5.5.12
308 def test_msDS_IntId_on_attr(self
):
309 """Testing msDs-IntId creation for Attributes.
310 See MS-ADTS - 3.1.1.Attributes
312 This test should verify that:
313 - Creating attribute with 'msDS-IntId' fails with ERR_UNWILLING_TO_PERFORM
314 - Adding 'msDS-IntId' on existing attribute fails with ERR_CONSTRAINT_VIOLATION
315 - Creating attribute with 'msDS-IntId' set and FLAG_SCHEMA_BASE_OBJECT flag
316 set fails with ERR_UNWILLING_TO_PERFORM
317 - Attributes created with FLAG_SCHEMA_BASE_OBJECT not set have
318 'msDS-IntId' attribute added internally
321 # 1. Create attribute without systemFlags
322 # msDS-IntId should be created if forest functional
323 # level is >= DS_DOMAIN_FUNCTION_2003
324 # and missing otherwise
325 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("msDS-IntId-Attr-1-")
326 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
)
328 # try to add msDS-IntId during Attribute creation
329 ldif_fail
= ldif
+ "msDS-IntId: -1993108831\n"
331 self
.ldb
.add_ldif(ldif_fail
)
332 self
.fail("Adding attribute with preset msDS-IntId should fail")
333 except LdbError
, (num
, _
):
334 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
336 # add the new attribute and update schema
337 self
.ldb
.add_ldif(ldif
)
338 self
._ldap
_schemaUpdateNow
()
340 # Search for created attribute
342 res
= self
.ldb
.search(attr_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
343 self
.assertEquals(len(res
), 1)
344 self
.assertEquals(res
[0]["lDAPDisplayName"][0], attr_ldap_name
)
345 if self
.forest_level
>= DS_DOMAIN_FUNCTION_2003
:
346 if self
._is
_schema
_base
_object
(res
[0]):
347 self
.assertTrue("msDS-IntId" not in res
[0])
349 self
.assertTrue("msDS-IntId" in res
[0])
351 self
.assertTrue("msDS-IntId" not in res
[0])
354 msg
.dn
= Dn(self
.ldb
, attr_dn
)
355 msg
["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE
, "msDS-IntId")
358 self
.fail("Modifying msDS-IntId should return error")
359 except LdbError
, (num
, _
):
360 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
362 # 2. Create attribute with systemFlags = FLAG_SCHEMA_BASE_OBJECT
363 # msDS-IntId should be created if forest functional
364 # level is >= DS_DOMAIN_FUNCTION_2003
365 # and missing otherwise
366 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("msDS-IntId-Attr-2-")
367 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
)
368 ldif
+= "systemFlags: 16\n"
370 # try to add msDS-IntId during Attribute creation
371 ldif_fail
= ldif
+ "msDS-IntId: -1993108831\n"
373 self
.ldb
.add_ldif(ldif_fail
)
374 self
.fail("Adding attribute with preset msDS-IntId should fail")
375 except LdbError
, (num
, _
):
376 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
378 # add the new attribute and update schema
379 self
.ldb
.add_ldif(ldif
)
380 self
._ldap
_schemaUpdateNow
()
382 # Search for created attribute
384 res
= self
.ldb
.search(attr_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
385 self
.assertEquals(len(res
), 1)
386 self
.assertEquals(res
[0]["lDAPDisplayName"][0], attr_ldap_name
)
387 if self
.forest_level
>= DS_DOMAIN_FUNCTION_2003
:
388 if self
._is
_schema
_base
_object
(res
[0]):
389 self
.assertTrue("msDS-IntId" not in res
[0])
391 self
.assertTrue("msDS-IntId" in res
[0])
393 self
.assertTrue("msDS-IntId" not in res
[0])
396 msg
.dn
= Dn(self
.ldb
, attr_dn
)
397 msg
["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE
, "msDS-IntId")
400 self
.fail("Modifying msDS-IntId should return error")
401 except LdbError
, (num
, _
):
402 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
405 def _make_class_ldif(self
, class_dn
, class_name
):
407 dn: """ + class_dn
+ """
409 objectClass: classSchema
410 adminDescription: """ + class_name
+ """
411 adminDisplayName: """ + class_name
+ """
412 cn: """ + class_name
+ """
413 governsId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9939
415 objectClassCategory: 1
416 subClassOf: organizationalPerson
418 systemMustContain: cn
423 def test_msDS_IntId_on_class(self
):
424 """Testing msDs-IntId creation for Class
425 Reference: MS-ADTS - 3.1.1.2.4.8 Class classSchema"""
427 # 1. Create Class without systemFlags
428 # msDS-IntId should be created if forest functional
429 # level is >= DS_DOMAIN_FUNCTION_2003
430 # and missing otherwise
431 (class_name
, class_ldap_name
, class_dn
) = self
._make
_obj
_names
("msDS-IntId-Class-1-")
432 ldif
= self
._make
_class
_ldif
(class_dn
, class_name
)
434 # try to add msDS-IntId during Class creation
435 ldif_add
= ldif
+ "msDS-IntId: -1993108831\n"
436 self
.ldb
.add_ldif(ldif_add
)
437 self
._ldap
_schemaUpdateNow
()
439 res
= self
.ldb
.search(class_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
440 self
.assertEquals(len(res
), 1)
441 self
.assertEquals(res
[0]["msDS-IntId"][0], "-1993108831")
443 # add a new Class and update schema
444 (class_name
, class_ldap_name
, class_dn
) = self
._make
_obj
_names
("msDS-IntId-Class-2-")
445 ldif
= self
._make
_class
_ldif
(class_dn
, class_name
)
447 self
.ldb
.add_ldif(ldif
)
448 self
._ldap
_schemaUpdateNow
()
450 # Search for created Class
451 res
= self
.ldb
.search(class_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
452 self
.assertEquals(len(res
), 1)
453 self
.assertFalse("msDS-IntId" in res
[0])
456 msg
.dn
= Dn(self
.ldb
, class_dn
)
457 msg
["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE
, "msDS-IntId")
460 self
.fail("Modifying msDS-IntId should return error")
461 except LdbError
, (num
, _
):
462 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
464 # 2. Create Class with systemFlags = FLAG_SCHEMA_BASE_OBJECT
465 # msDS-IntId should be created if forest functional
466 # level is >= DS_DOMAIN_FUNCTION_2003
467 # and missing otherwise
468 (class_name
, class_ldap_name
, class_dn
) = self
._make
_obj
_names
("msDS-IntId-Class-3-")
469 ldif
= self
._make
_class
_ldif
(class_dn
, class_name
)
470 ldif
+= "systemFlags: 16\n"
472 # try to add msDS-IntId during Class creation
473 ldif_add
= ldif
+ "msDS-IntId: -1993108831\n"
474 self
.ldb
.add_ldif(ldif_add
)
476 res
= self
.ldb
.search(class_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
477 self
.assertEquals(len(res
), 1)
478 self
.assertEquals(res
[0]["msDS-IntId"][0], "-1993108831")
480 # add the new Class and update schema
481 (class_name
, class_ldap_name
, class_dn
) = self
._make
_obj
_names
("msDS-IntId-Class-4-")
482 ldif
= self
._make
_class
_ldif
(class_dn
, class_name
)
483 ldif
+= "systemFlags: 16\n"
485 self
.ldb
.add_ldif(ldif
)
486 self
._ldap
_schemaUpdateNow
()
488 # Search for created Class
489 res
= self
.ldb
.search(class_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
490 self
.assertEquals(len(res
), 1)
491 self
.assertFalse("msDS-IntId" in res
[0])
494 msg
.dn
= Dn(self
.ldb
, class_dn
)
495 msg
["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE
, "msDS-IntId")
498 self
.fail("Modifying msDS-IntId should return error")
499 except LdbError
, (num
, _
):
500 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
501 res
= self
.ldb
.search(class_dn
, scope
=SCOPE_BASE
, attrs
=["*"])
502 self
.assertEquals(len(res
), 1)
503 self
.assertFalse("msDS-IntId" in res
[0])
506 def test_verify_msDS_IntId(self
):
507 """Verify msDS-IntId exists only on attributes without FLAG_SCHEMA_BASE_OBJECT flag set"""
509 res
= self
.ldb
.search(self
.schema_dn
, scope
=SCOPE_ONELEVEL
,
510 expression
="objectClass=attributeSchema",
511 attrs
=["systemFlags", "msDS-IntId", "attributeID", "cn"])
512 self
.assertTrue(len(res
) > 1)
514 if self
.forest_level
>= DS_DOMAIN_FUNCTION_2003
:
515 if self
._is
_schema
_base
_object
(ldb_msg
):
516 self
.assertTrue("msDS-IntId" not in ldb_msg
)
518 # don't assert here: under w2k8 there are plenty of
519 # attributes that are not part of the Base Schema
520 # (SYSTEM_FLAG_SCHEMA_BASE_OBJECT flag not set) and yet
521 # have no msDS-IntId attribute set
522 #self.assertTrue("msDS-IntId" in ldb_msg, "msDS-IntId expected on: %s" % ldb_msg.dn)
523 if "msDS-IntId" not in ldb_msg
:
525 print "%3d warning: msDS-IntId expected on: %-30s %s" % (count
, ldb_msg
["attributeID"], ldb_msg
["cn"])
527 self
.assertTrue("msDS-IntId" not in ldb_msg
)
530 class SchemaTests_msDS_isRODC(samba
.tests
.TestCase
):
533 super(SchemaTests_msDS_isRODC
, self
).setUp()
535 res
= ldb
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["*"])
536 self
.assertEquals(len(res
), 1)
537 self
.base_dn
= res
[0]["defaultNamingContext"][0]
def test_objectClass_ntdsdsa(self):
    """Check that every nTDSDSA entry returned carries msDS-isRODC."""
    res = self.ldb.search(self.base_dn, expression="objectClass=nTDSDSA",
                          attrs=["msDS-isRODC"], controls=["search_options:1:2"])
    # Each returned entry is checked on its own.
    for ldb_msg in res:
        self.assertIn("msDS-isRODC", ldb_msg)
def test_objectClass_server(self):
    """Check msDS-isRODC on server entries that have an NTDS Settings child.

    For each server entry returned, the child "CN=NTDS Settings,<dn>" is
    looked up. If that lookup fails with ERR_NO_SUCH_OBJECT the entry is
    only reported; any other LdbError code fails the test. When the
    child exists, both its objectCategory and the server's msDS-isRODC
    must be present.
    """
    res = self.ldb.search(self.base_dn, expression="objectClass=server",
                          attrs=["msDS-isRODC"], controls=["search_options:1:2"])
    for ldb_msg in res:
        ntds_search_dn = "CN=NTDS Settings,%s" % ldb_msg['dn']
        try:
            res_check = self.ldb.search(ntds_search_dn, attrs=["objectCategory"])
        except LdbError as e:
            # "except ... as" works on Python 2.6+ and 3; the error
            # code is the first element of the exception arguments.
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)
            # Report the entry actually being examined, not res[0].
            print("Server entry %s doesn't have a NTDS settings object" % ldb_msg['dn'])
        else:
            self.assertIn("objectCategory", res_check[0])
            self.assertIn("msDS-isRODC", ldb_msg)
def test_objectClass_computer(self):
    """Check msDS-isRODC on computer entries that have serverReferenceBL.

    Entries without a serverReferenceBL attribute are only reported and
    otherwise skipped.
    """
    res = self.ldb.search(self.base_dn, expression="objectClass=computer",
                          attrs=["serverReferenceBL","msDS-isRODC"], controls=["search_options:1:2"])
    for ldb_msg in res:
        if "serverReferenceBL" not in ldb_msg:
            print("Computer entry %s doesn't have a serverReferenceBL attribute" % ldb_msg['dn'])
            continue
        self.assertIn("msDS-isRODC", ldb_msg)
568 if not "://" in host
:
569 if os
.path
.isfile(host
):
570 host
= "tdb://%s" % host
572 host
= "ldap://%s" % host
575 if host
.startswith("ldap://"):
576 # use the 'paged_searches' module when connecting remotely
577 ldb_options
= ["modules:paged_searches"]
579 ldb
= SamDB(host
, credentials
=creds
, session_info
=system_session(lp
), lp
=lp
, options
=ldb_options
)
581 runner
= SubunitTestRunner()
583 if not runner
.run(unittest
.makeSuite(SchemaTests
)).wasSuccessful():
585 if not runner
.run(unittest
.makeSuite(SchemaTests_msDS_IntId
)).wasSuccessful():
587 if not runner
.run(unittest
.makeSuite(SchemaTests_msDS_isRODC
)).wasSuccessful():