1 # -*- coding: utf-8 -*-
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
23 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN dsdb_schema_attributes
31 from ldb
import SCOPE_BASE
, LdbError
34 class SchemaAttributesTestCase(samba
.tests
.TestCase
):
def setUp(self):
    """Open the SAM database and cache values read from its root entry.

    Sets ``self.lp`` and ``self.samdb``, then stores the schema naming
    context, the default naming context and the forest functional level
    (as an int) on the test case for use by the individual tests.
    """
    super(SchemaAttributesTestCase, self).setUp()

    self.lp = samba.tests.env_loadparm()
    self.samdb = samba.tests.connect_samdb(self.lp.samdb_url())

    # An empty base DN with SCOPE_BASE addresses the directory's root
    # entry (the LDAP rootDSE); exactly one record is expected back.
    res = self.samdb.search(base="", expression="", scope=SCOPE_BASE,
                            attrs=["*"])
    # assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    self.assertEqual(len(res), 1)

    root = res[0]
    self.schema_dn = root["schemaNamingContext"][0]
    self.base_dn = root["defaultNamingContext"][0]
    self.forest_level = int(root["forestFunctionality"][0])
def tearDown(self):
    """Run the parent class's teardown; nothing extra is cleaned up here."""
    parent = super(SchemaAttributesTestCase, self)
    parent.tearDown()
52 def _ldap_schemaUpdateNow(self
):
59 self
.samdb
.modify_ldif(ldif
)
def _make_obj_names(self, prefix):
    """Derive a set of names for a new schema object from *prefix*.

    Returns a 3-tuple ``(name, ldap_name, dn)`` where *name* is the prefix
    followed by a timestamp string, *ldap_name* is *name* with every "-"
    removed, and *dn* places ``CN=<name>`` under ``self.schema_dn``.
    """
    # NOTE(review): "%s" is not among the directives documented for
    # time.strftime; it is passed through to the platform C library
    # (glibc renders it as seconds since the epoch) -- confirm this is
    # acceptable on every platform the tests run on.
    stamp = time.strftime("%s", time.gmtime())
    name = prefix + stamp
    ldap_name = name.replace("-", "")
    dn = "CN=%s,%s" % (name, self.schema_dn)
    return (name, ldap_name, dn)
67 def _make_attr_ldif(self
, attr_name
, attr_dn
, sub_oid
, extra
=None):
69 dn: """ + attr_dn
+ """
71 objectClass: attributeSchema
72 adminDescription: """ + attr_name
+ """
73 adminDisplayName: """ + attr_name
+ """
74 cn: """ + attr_name
+ """
75 attributeId: 1.3.6.1.4.1.7165.4.6.1.8.%d.""" % sub_oid
+ str(random
.randint(1, 100000)) + """
76 attributeSyntax: 2.5.5.12
def test_AddIndexedAttribute(self):
    """Add an attribute with SEARCH_FLAG_ATTINDEX and check both records.

    After the schema update the attribute's LDAP name must appear in
    @ATTRIBUTES with the single value "CASE_INSENSITIVE", and must be
    listed among the @IDXATTR values of @INDEXLIST.
    """
    # create names for an attribute to add
    (attr_name, attr_ldap_name, attr_dn) = \
        self._make_obj_names("schemaAttributes-Attr-")
    ldif = self._make_attr_ldif(
        attr_name, attr_dn, 1,
        "searchFlags: %d" % samba.dsdb.SEARCH_FLAG_ATTINDEX)

    # add the new attribute
    self.samdb.add_ldif(ldif)
    self._ldap_schemaUpdateNow()

    # the attribute must be recorded in @ATTRIBUTES
    attr_res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE)

    self.assertIn(attr_ldap_name, attr_res[0])
    # assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    self.assertEqual(len(attr_res[0][attr_ldap_name]), 1)
    self.assertEqual(str(attr_res[0][attr_ldap_name][0]),
                     "CASE_INSENSITIVE")

    # ... and, being indexed, must also be listed in @INDEXLIST
    idx_res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE)

    self.assertIn(attr_ldap_name,
                  [str(x) for x in idx_res[0]["@IDXATTR"]])
def test_AddUnIndexedAttribute(self):
    """Add an attribute without searchFlags and check both records.

    After the schema update the attribute's LDAP name must appear in
    @ATTRIBUTES with the single value "CASE_INSENSITIVE", but must NOT be
    listed among the @IDXATTR values of @INDEXLIST.
    """
    # create names for an attribute to add
    (attr_name, attr_ldap_name, attr_dn) = \
        self._make_obj_names("schemaAttributes-Attr-")
    ldif = self._make_attr_ldif(attr_name, attr_dn, 2)

    # add the new attribute
    self.samdb.add_ldif(ldif)
    self._ldap_schemaUpdateNow()

    # the attribute must be recorded in @ATTRIBUTES
    attr_res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE)

    self.assertIn(attr_ldap_name, attr_res[0])
    # assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    self.assertEqual(len(attr_res[0][attr_ldap_name]), 1)
    self.assertEqual(str(attr_res[0][attr_ldap_name][0]),
                     "CASE_INSENSITIVE")

    # ... but, having no index flag, must be absent from @INDEXLIST
    idx_res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE)

    self.assertNotIn(attr_ldap_name,
                     [str(x) for x in idx_res[0]["@IDXATTR"]])
def test_AddTwoIndexedAttributes(self):
    """Add two attributes with SEARCH_FLAG_ATTINDEX, one after the other.

    Each attribute is added and followed by its own schema update. Both
    LDAP names must then appear in @ATTRIBUTES with the single value
    "CASE_INSENSITIVE" and both must be listed in @INDEXLIST's @IDXATTR.
    """
    index_flag = "searchFlags: %d" % samba.dsdb.SEARCH_FLAG_ATTINDEX

    # create names for an attribute to add
    (attr_name, attr_ldap_name, attr_dn) = \
        self._make_obj_names("schemaAttributes-Attr-")
    ldif = self._make_attr_ldif(attr_name, attr_dn, 3, index_flag)

    # add the new attribute
    self.samdb.add_ldif(ldif)
    self._ldap_schemaUpdateNow()

    # create names for an attribute to add
    # NOTE(review): the same prefix is used for both attributes, so the
    # two names differ only if _make_obj_names returns a different suffix
    # on the second call -- confirm that is guaranteed.
    (attr_name2, attr_ldap_name2, attr_dn2) = \
        self._make_obj_names("schemaAttributes-Attr-")
    ldif = self._make_attr_ldif(attr_name2, attr_dn2, 4, index_flag)

    # add the new attribute
    self.samdb.add_ldif(ldif)
    self._ldap_schemaUpdateNow()

    # both attributes must be recorded in @ATTRIBUTES
    attr_res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE)

    self.assertIn(attr_ldap_name, attr_res[0])
    # assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    self.assertEqual(len(attr_res[0][attr_ldap_name]), 1)
    self.assertEqual(str(attr_res[0][attr_ldap_name][0]),
                     "CASE_INSENSITIVE")

    self.assertIn(attr_ldap_name2, attr_res[0])
    self.assertEqual(len(attr_res[0][attr_ldap_name2]), 1)
    self.assertEqual(str(attr_res[0][attr_ldap_name2][0]),
                     "CASE_INSENSITIVE")

    # ... and both must be listed in @INDEXLIST
    idx_res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE)

    # Build the list of indexed names once and reuse it for both checks.
    indexed = [str(x) for x in idx_res[0]["@IDXATTR"]]
    self.assertIn(attr_ldap_name, indexed)
    self.assertIn(attr_ldap_name2, indexed)
def test_modify_at_attributes(self):
    """Check that an extra value added to @ATTRIBUTES does not persist.

    The added "@TEST_EXTRA" value is readable straight after the modify.
    Once a second connection has started and committed a transaction, the
    same search is expected to return the record without that attribute.
    """
    m = {"dn": "@ATTRIBUTES",
         "@TEST_EXTRA": ["HIDDEN"]
         }

    msg = ldb.Message.from_dict(self.samdb, m, ldb.FLAG_MOD_ADD)
    self.samdb.modify(msg)

    res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE,
                            attrs=["@TEST_EXTRA"])
    # assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), "@ATTRIBUTES")
    self.assertEqual(len(res[0]), 1)
    self.assertIn("@TEST_EXTRA", res[0])
    self.assertEqual(len(res[0]["@TEST_EXTRA"]), 1)
    self.assertEqual(str(res[0]["@TEST_EXTRA"][0]), "HIDDEN")

    samdb2 = samba.tests.connect_samdb(self.lp.samdb_url())

    # We now only update the @ATTRIBUTES when a transaction happens
    # rather than making a read of the DB do writes.
    #
    # This avoids locking issues and is more expected

    samdb2.transaction_start()
    samdb2.transaction_commit()

    res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE,
                            attrs=["@TEST_EXTRA"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), "@ATTRIBUTES")
    self.assertEqual(len(res[0]), 0)
    self.assertNotIn("@TEST_EXTRA", res[0])
def test_modify_at_indexlist(self):
    """Check that an extra value added to @INDEXLIST does not persist.

    The added "@TEST_EXTRA" value is readable straight after the modify.
    Once a second connection has started and committed a transaction, the
    same search is expected to return the record without that attribute.
    """
    # NOTE(review): the "@TEST_EXTRA" entry below is reconstructed from
    # the assertions that follow (one value, equal to "1") -- confirm
    # against the original file.
    m = {"dn": "@INDEXLIST",
         "@TEST_EXTRA": ["1"]
         }

    msg = ldb.Message.from_dict(self.samdb, m, ldb.FLAG_MOD_ADD)
    self.samdb.modify(msg)

    res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE,
                            attrs=["@TEST_EXTRA"])
    # assertEqual: the assertEquals alias is deprecated and was removed
    # from unittest in Python 3.12.
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), "@INDEXLIST")
    self.assertEqual(len(res[0]), 1)
    self.assertIn("@TEST_EXTRA", res[0])
    self.assertEqual(len(res[0]["@TEST_EXTRA"]), 1)
    self.assertEqual(str(res[0]["@TEST_EXTRA"][0]), "1")

    samdb2 = samba.tests.connect_samdb(self.lp.samdb_url())

    # We now only update the @INDEXLIST when a transaction happens
    # rather than making a read of the DB do writes.
    #
    # This avoids locking issues and is more expected

    samdb2.transaction_start()
    samdb2.transaction_commit()

    res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE,
                            attrs=["@TEST_EXTRA"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), "@INDEXLIST")
    self.assertEqual(len(res[0]), 0)
    self.assertNotIn("@TEST_EXTRA", res[0])
241 def test_modify_fail_of_at_indexlist(self
):
242 m
= {"dn": "@INDEXLIST",
243 "@TEST_NOT_EXTRA": ["1"]
246 msg
= ldb
.Message
.from_dict(self
.samdb
, m
, ldb
.FLAG_MOD_DELETE
)
248 self
.samdb
.modify(msg
)
249 self
.fail("modify of @INDEXLIST with a failed constraint should fail")
250 except LdbError
as err
:
252 self
.assertEquals(enum
, ldb
.ERR_NO_SUCH_ATTRIBUTE
)