2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # export DC_SERVER=target_dc_or_local_samdb_url
24 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
25 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/lib/ldb/tests/python" $SUBUNITRUN dsdb_schema_info -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
32 sys
.path
.insert(0, "bin/python")
34 samba
.ensure_external_module("testtools", "testtools")
36 from ldb
import SCOPE_BASE
, LdbError
39 import samba
.dcerpc
.drsuapi
40 from samba
.dcerpc
.drsblobs
import schemaInfoBlob
41 from samba
.ndr
import ndr_unpack
42 from samba
.dcerpc
.misc
import GUID
45 class SchemaInfoTestCase(samba
.tests
.TestCase
):
47 # static SamDB connection
51 super(SchemaInfoTestCase
, self
).setUp()
53 # connect SamDB if we haven't yet
54 if self
.sam_db
is None:
55 ldb_url
= samba
.tests
.env_get_var_value("DC_SERVER")
56 SchemaInfoTestCase
.sam_db
= samba
.tests
.connect_samdb(ldb_url
)
59 res
= self
.sam_db
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["*"])
60 self
.assertEquals(len(res
), 1)
61 self
.schema_dn
= res
[0]["schemaNamingContext"][0]
62 self
.base_dn
= res
[0]["defaultNamingContext"][0]
63 self
.forest_level
= int(res
[0]["forestFunctionality"][0])
65 # get DC invocation_id
66 self
.invocation_id
= GUID(self
.sam_db
.get_invocation_id())
69 super(SchemaInfoTestCase
, self
).tearDown()
71 def _getSchemaInfo(self
):
73 schema_info_data
= self
.sam_db
.searchone(attribute
="schemaInfo",
74 basedn
=self
.schema_dn
,
75 expression
="(objectClass=*)",
77 self
.assertEqual(len(schema_info_data
), 21)
78 schema_info
= ndr_unpack(schemaInfoBlob
, schema_info_data
)
79 self
.assertEqual(schema_info
.marker
, 0xFF)
81 # create default schemaInfo if
82 # attribute value is not created yet
83 schema_info
= schemaInfoBlob()
84 schema_info
.revision
= 0
85 schema_info
.invocation_id
= self
.invocation_id
88 def _checkSchemaInfo(self
, schi_before
, schi_after
):
89 self
.assertEqual(schi_before
.revision
+ 1, schi_after
.revision
)
90 self
.assertEqual(schi_before
.invocation_id
, schi_after
.invocation_id
)
91 self
.assertEqual(schi_after
.invocation_id
, self
.invocation_id
)
93 def _ldap_schemaUpdateNow(self
):
100 self
.sam_db
.modify_ldif(ldif
)
102 def _make_obj_names(self
, prefix
):
103 obj_name
= prefix
+ time
.strftime("%s", time
.gmtime())
104 obj_ldap_name
= obj_name
.replace("-", "")
105 obj_dn
= "CN=%s,%s" % (obj_name
, self
.schema_dn
)
106 return (obj_name
, obj_ldap_name
, obj_dn
)
108 def _make_attr_ldif(self
, attr_name
, attr_dn
):
110 dn: """ + attr_dn
+ """
112 objectClass: attributeSchema
113 adminDescription: """ + attr_name
+ """
114 adminDisplayName: """ + attr_name
+ """
115 cn: """ + attr_name
+ """
116 attributeId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9940
117 attributeSyntax: 2.5.5.12
125 def test_AddModifyAttribute(self
):
126 # get initial schemaInfo
127 schi_before
= self
._getSchemaInfo
()
129 # create names for an attribute to add
130 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("schemaInfo-Attr-")
131 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
)
133 # add the new attribute
134 self
.sam_db
.add_ldif(ldif
)
135 self
._ldap
_schemaUpdateNow
()
136 # compare resulting schemaInfo
137 schi_after
= self
._getSchemaInfo
()
138 self
._checkSchemaInfo
(schi_before
, schi_after
)
140 # rename the Attribute
141 attr_dn_new
= attr_dn
.replace(attr_name
, attr_name
+ "-NEW")
143 self
.sam_db
.rename(attr_dn
, attr_dn_new
)
144 except LdbError
, (num
, _
):
145 self
.fail("failed to change lDAPDisplayName for %s: %s" % (attr_name
, _
))
147 # compare resulting schemaInfo
148 schi_after
= self
._getSchemaInfo
()
149 self
._checkSchemaInfo
(schi_before
, schi_after
)
153 def _make_class_ldif(self
, class_name
, class_dn
):
155 dn: """ + class_dn
+ """
157 objectClass: classSchema
158 adminDescription: """ + class_name
+ """
159 adminDisplayName: """ + class_name
+ """
160 cn: """ + class_name
+ """
161 governsId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9939
163 objectClassCategory: 1
164 subClassOf: organizationalPerson
166 systemMustContain: cn
171 def test_AddModifyClass(self
):
172 # get initial schemaInfo
173 schi_before
= self
._getSchemaInfo
()
175 # create names for a Class to add
176 (class_name
, class_ldap_name
, class_dn
) = self
._make
_obj
_names
("schemaInfo-Class-")
177 ldif
= self
._make
_class
_ldif
(class_name
, class_dn
)
180 self
.sam_db
.add_ldif(ldif
)
181 self
._ldap
_schemaUpdateNow
()
182 # compare resulting schemaInfo
183 schi_after
= self
._getSchemaInfo
()
184 self
._checkSchemaInfo
(schi_before
, schi_after
)
187 class_dn_new
= class_dn
.replace(class_name
, class_name
+ "-NEW")
189 self
.sam_db
.rename(class_dn
, class_dn_new
)
190 except LdbError
, (num
, _
):
191 self
.fail("failed to change lDAPDisplayName for %s: %s" % (class_name
, _
))
193 # compare resulting schemaInfo
194 schi_after
= self
._getSchemaInfo
()
195 self
._checkSchemaInfo
(schi_before
, schi_after
)