2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # export DC_SERVER=target_dc_or_local_samdb_url
24 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
25 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/lib/ldb/tests/python" $SUBUNITRUN dsdb_schema_info -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
32 sys
.path
.insert(0, "bin/python")
35 from ldb
import SCOPE_BASE
, LdbError
38 import samba
.dcerpc
.drsuapi
39 from samba
.dcerpc
.drsblobs
import schemaInfoBlob
40 from samba
.ndr
import ndr_unpack
41 from samba
.dcerpc
.misc
import GUID
44 class SchemaInfoTestCase(samba
.tests
.TestCase
):
46 # static SamDB connection
50 super(SchemaInfoTestCase
, self
).setUp()
52 # connect SamDB if we haven't yet
53 if self
.sam_db
is None:
54 ldb_url
= "ldap://%s" % samba
.tests
.env_get_var_value("DC_SERVER")
55 SchemaInfoTestCase
.sam_db
= samba
.tests
.connect_samdb(ldb_url
)
58 res
= self
.sam_db
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["*"])
59 self
.assertEquals(len(res
), 1)
60 self
.schema_dn
= res
[0]["schemaNamingContext"][0]
61 self
.base_dn
= res
[0]["defaultNamingContext"][0]
62 self
.forest_level
= int(res
[0]["forestFunctionality"][0])
64 # get DC invocation_id
65 self
.invocation_id
= GUID(self
.sam_db
.get_invocation_id())
68 super(SchemaInfoTestCase
, self
).tearDown()
70 def _getSchemaInfo(self
):
72 schema_info_data
= self
.sam_db
.searchone(attribute
="schemaInfo",
73 basedn
=self
.schema_dn
,
74 expression
="(objectClass=*)",
76 self
.assertEqual(len(schema_info_data
), 21)
77 schema_info
= ndr_unpack(schemaInfoBlob
, schema_info_data
)
78 self
.assertEqual(schema_info
.marker
, 0xFF)
80 # create default schemaInfo if
81 # attribute value is not created yet
82 schema_info
= schemaInfoBlob()
83 schema_info
.revision
= 0
84 schema_info
.invocation_id
= self
.invocation_id
87 def _checkSchemaInfo(self
, schi_before
, schi_after
):
88 self
.assertEqual(schi_before
.revision
+ 1, schi_after
.revision
)
89 self
.assertEqual(schi_before
.invocation_id
, schi_after
.invocation_id
)
90 self
.assertEqual(schi_after
.invocation_id
, self
.invocation_id
)
92 def _ldap_schemaUpdateNow(self
):
99 self
.sam_db
.modify_ldif(ldif
)
101 def _make_obj_names(self
, prefix
):
102 obj_name
= prefix
+ time
.strftime("%s", time
.gmtime())
103 obj_ldap_name
= obj_name
.replace("-", "")
104 obj_dn
= "CN=%s,%s" % (obj_name
, self
.schema_dn
)
105 return (obj_name
, obj_ldap_name
, obj_dn
)
107 def _make_attr_ldif(self
, attr_name
, attr_dn
):
109 dn: """ + attr_dn
+ """
111 objectClass: attributeSchema
112 adminDescription: """ + attr_name
+ """
113 adminDisplayName: """ + attr_name
+ """
114 cn: """ + attr_name
+ """
115 attributeId: 1.2.840.""" + str(random
.randint(1,100000)) + """.1.5.9940
116 attributeSyntax: 2.5.5.12
124 def test_AddModifyAttribute(self
):
125 # get initial schemaInfo
126 schi_before
= self
._getSchemaInfo
()
128 # create names for an attribute to add
129 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("schemaInfo-Attr-")
130 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
)
132 # add the new attribute
133 self
.sam_db
.add_ldif(ldif
)
134 self
._ldap
_schemaUpdateNow
()
135 # compare resulting schemaInfo
136 schi_after
= self
._getSchemaInfo
()
137 self
._checkSchemaInfo
(schi_before
, schi_after
)
139 # rename the Attribute
140 attr_dn_new
= attr_dn
.replace(attr_name
, attr_name
+ "-NEW")
142 self
.sam_db
.rename(attr_dn
, attr_dn_new
)
143 except LdbError
, (num
, _
):
144 self
.fail("failed to change lDAPDisplayName for %s: %s" % (attr_name
, _
))
146 # compare resulting schemaInfo
147 schi_after
= self
._getSchemaInfo
()
148 self
._checkSchemaInfo
(schi_before
, schi_after
)
152 def _make_class_ldif(self
, class_name
, class_dn
):
154 dn: """ + class_dn
+ """
156 objectClass: classSchema
157 adminDescription: """ + class_name
+ """
158 adminDisplayName: """ + class_name
+ """
159 cn: """ + class_name
+ """
160 governsId: 1.3.6.1.4.1.7165.4.6.2.""" + str(random
.randint(1,100000)) + """
162 objectClassCategory: 1
163 subClassOf: organizationalPerson
165 systemMustContain: cn
170 def test_AddModifyClass(self
):
171 # get initial schemaInfo
172 schi_before
= self
._getSchemaInfo
()
174 # create names for a Class to add
175 (class_name
, class_ldap_name
, class_dn
) = self
._make
_obj
_names
("schemaInfo-Class-")
176 ldif
= self
._make
_class
_ldif
(class_name
, class_dn
)
179 self
.sam_db
.add_ldif(ldif
)
180 self
._ldap
_schemaUpdateNow
()
181 # compare resulting schemaInfo
182 schi_after
= self
._getSchemaInfo
()
183 self
._checkSchemaInfo
(schi_before
, schi_after
)
186 class_dn_new
= class_dn
.replace(class_name
, class_name
+ "-NEW")
188 self
.sam_db
.rename(class_dn
, class_dn_new
)
189 except LdbError
, (num
, _
):
190 self
.fail("failed to change lDAPDisplayName for %s: %s" % (class_name
, _
))
192 # compare resulting schemaInfo
193 schi_after
= self
._getSchemaInfo
()
194 self
._checkSchemaInfo
(schi_before
, schi_after
)