1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba
.credentials
import Credentials
21 from samba
.samdb
import SamDB
22 from samba
.auth
import system_session
23 from samba
.tests
import TestCase
24 from samba
.ndr
import ndr_unpack
, ndr_pack
25 from samba
.dcerpc
import drsblobs
32 class DsdbTests(TestCase
):
35 super(DsdbTests
, self
).setUp()
36 self
.lp
= samba
.tests
.env_loadparm()
37 self
.creds
= Credentials()
38 self
.creds
.guess(self
.lp
)
39 self
.session
= system_session()
40 self
.samdb
= SamDB(session_info
=self
.session
,
41 credentials
=self
.creds
,
44 def test_get_oid_from_attrid(self
):
45 oid
= self
.samdb
.get_oid_from_attid(591614)
46 self
.assertEquals(oid
, "1.2.840.113556.1.4.1790")
48 def test_error_replpropertymetadata(self
):
49 res
= self
.samdb
.search(expression
="cn=Administrator",
50 scope
=ldb
.SCOPE_SUBTREE
,
51 attrs
=["replPropertyMetaData"])
52 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
53 str(res
[0]["replPropertyMetaData"]))
56 # Search for Description
58 old_version
= o
.version
59 o
.version
= o
.version
+ 1
60 replBlob
= ndr_pack(repl
)
63 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
64 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
66 def test_error_replpropertymetadata_nochange(self
):
67 res
= self
.samdb
.search(expression
="cn=Administrator",
68 scope
=ldb
.SCOPE_SUBTREE
,
69 attrs
=["replPropertyMetaData"])
70 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
71 str(res
[0]["replPropertyMetaData"]))
72 replBlob
= ndr_pack(repl
)
75 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
76 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
78 def test_error_replpropertymetadata_allow_sort(self
):
79 res
= self
.samdb
.search(expression
="cn=Administrator",
80 scope
=ldb
.SCOPE_SUBTREE
,
81 attrs
=["replPropertyMetaData"])
82 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
83 str(res
[0]["replPropertyMetaData"]))
84 replBlob
= ndr_pack(repl
)
87 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
88 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
90 def test_twoatt_replpropertymetadata(self
):
91 res
= self
.samdb
.search(expression
="cn=Administrator",
92 scope
=ldb
.SCOPE_SUBTREE
,
93 attrs
=["replPropertyMetaData", "uSNChanged"])
94 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
95 str(res
[0]["replPropertyMetaData"]))
98 # Search for Description
100 old_version
= o
.version
101 o
.version
= o
.version
+ 1
102 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
103 replBlob
= ndr_pack(repl
)
106 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
107 msg
["description"] = ldb
.MessageElement("new val", ldb
.FLAG_MOD_REPLACE
, "description")
108 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
110 def test_set_replpropertymetadata(self
):
111 res
= self
.samdb
.search(expression
="cn=Administrator",
112 scope
=ldb
.SCOPE_SUBTREE
,
113 attrs
=["replPropertyMetaData", "uSNChanged"])
114 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
115 str(res
[0]["replPropertyMetaData"]))
118 # Search for Description
120 old_version
= o
.version
121 o
.version
= o
.version
+ 1
122 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
123 o
.originating_usn
= long(str(res
[0]["uSNChanged"])) + 1
124 replBlob
= ndr_pack(repl
)
127 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
128 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
130 def test_ok_get_attribute_from_attid(self
):
131 self
.assertEquals(self
.samdb
.get_attribute_from_attid(13), "description")
133 def test_ko_get_attribute_from_attid(self
):
134 self
.assertEquals(self
.samdb
.get_attribute_from_attid(11979), None)
136 def test_get_attribute_replmetadata_version(self
):
137 res
= self
.samdb
.search(expression
="cn=Administrator",
138 scope
=ldb
.SCOPE_SUBTREE
,
140 self
.assertEquals(len(res
), 1)
142 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "unicodePwd"), 1)
144 def test_set_attribute_replmetadata_version(self
):
145 res
= self
.samdb
.search(expression
="cn=Administrator",
146 scope
=ldb
.SCOPE_SUBTREE
,
148 self
.assertEquals(len(res
), 1)
150 version
= self
.samdb
.get_attribute_replmetadata_version(dn
, "description")
151 self
.samdb
.set_attribute_replmetadata_version(dn
, "description", version
+ 2)
152 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "description"), version
+ 2)
154 def test_db_lock1(self
):
155 basedn
= self
.samdb
.get_default_basedn()
160 # In the child, close the main DB, re-open just one DB
163 self
.samdb
= SamDB(session_info
=self
.session
,
164 credentials
=self
.creds
,
167 self
.samdb
.transaction_start()
169 dn
= "cn=test_db_lock_user,cn=users," + str(basedn
)
172 "objectclass": "user",
174 self
.samdb
.delete(dn
)
176 # Obtain a write lock
177 self
.samdb
.transaction_prepare_commit()
178 os
.write(w1
, b
"prepared")
181 # Drop the write lock
182 self
.samdb
.transaction_cancel()
185 self
.assertEqual(os
.read(r1
, 8), b
"prepared")
189 # We need to hold this iterator open to hold the all-record lock.
190 res
= self
.samdb
.search_iterator()
192 # This should take at least 2 seconds because the transaction
193 # has a write lock on one backend db open
200 self
.assertGreater(end
- start
, 1.9)
202 (got_pid
, status
) = os
.waitpid(pid
, 0)
203 self
.assertEqual(got_pid
, pid
)
204 self
.assertTrue(os
.WIFEXITED(status
))
205 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
207 def test_db_lock2(self
):
208 basedn
= self
.samdb
.get_default_basedn()
214 # In the child, close the main DB, re-open
217 self
.samdb
= SamDB(session_info
=self
.session
,
218 credentials
=self
.creds
,
221 # We need to hold this iterator open to hold the all-record lock.
222 res
= self
.samdb
.search_iterator()
224 os
.write(w2
, b
"start")
225 if (os
.read(r1
, 7) != b
"started"):
229 if (os
.read(r1
, 5) != b
"added"):
232 # Wait 2 seconds to block prepare_commit() in the child.
233 os
.write(w2
, b
"prepare")
240 if (os
.read(r1
, 8) != b
"prepared"):
245 # We can start the transaction during the search
246 # because both just grab the all-record read lock.
247 self
.assertEqual(os
.read(r2
, 5), b
"start")
248 self
.samdb
.transaction_start()
249 os
.write(w1
, b
"started")
251 self
.assertEqual(os
.read(r2
, 3), b
"add")
252 dn
= "cn=test_db_lock_user,cn=users," + str(basedn
)
255 "objectclass": "user",
257 self
.samdb
.delete(dn
)
258 os
.write(w1
, b
"added")
260 # Obtain a write lock, this will block until
261 # the parent releases the read lock.
262 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
264 self
.samdb
.transaction_prepare_commit()
267 self
.assertGreater(end
- start
, 1.9)
271 os
.write(w1
, b
"prepared")
273 # Drop the write lock
274 self
.samdb
.transaction_cancel()
276 (got_pid
, status
) = os
.waitpid(pid
, 0)
277 self
.assertEqual(got_pid
, pid
)
278 self
.assertTrue(os
.WIFEXITED(status
))
279 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
281 def test_db_lock3(self
):
282 basedn
= self
.samdb
.get_default_basedn()
288 # In the child, close the main DB, re-open
291 self
.samdb
= SamDB(session_info
=self
.session
,
292 credentials
=self
.creds
,
295 # We need to hold this iterator open to hold the all-record lock.
296 res
= self
.samdb
.search_iterator()
298 os
.write(w2
, b
"start")
299 if (os
.read(r1
, 7) != b
"started"):
303 if (os
.read(r1
, 5) != b
"added"):
306 # Wait 2 seconds to block prepare_commit() in the child.
307 os
.write(w2
, b
"prepare")
314 if (os
.read(r1
, 8) != b
"prepared"):
319 # We can start the transaction during the search
320 # because both just grab the all-record read lock.
321 self
.assertEqual(os
.read(r2
, 5), b
"start")
322 self
.samdb
.transaction_start()
323 os
.write(w1
, b
"started")
325 self
.assertEqual(os
.read(r2
, 3), b
"add")
327 # This will end up in the top level db
328 dn
= "@DSDB_LOCK_TEST"
331 self
.samdb
.delete(dn
)
332 os
.write(w1
, b
"added")
334 # Obtain a write lock, this will block until
335 # the child releases the read lock.
336 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
338 self
.samdb
.transaction_prepare_commit()
340 self
.assertGreater(end
- start
, 1.9)
341 os
.write(w1
, b
"prepared")
343 # Drop the write lock
344 self
.samdb
.transaction_cancel()
346 (got_pid
, status
) = os
.waitpid(pid
, 0)
347 self
.assertTrue(os
.WIFEXITED(status
))
348 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
349 self
.assertEqual(got_pid
, pid
)
352 def test_full_db_lock1(self
):
353 basedn
= self
.samdb
.get_default_basedn()
354 backend_filename
= "%s.ldb" % basedn
.get_casefold()
355 backend_subpath
= os
.path
.join("sam.ldb.d",
357 backend_path
= self
.lp
.private_path(backend_subpath
)
362 # In the child, close the main DB, re-open just one DB
366 backenddb
= ldb
.Ldb(backend_path
)
369 backenddb
.transaction_start()
371 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
372 backenddb
.delete("@DSDB_LOCK_TEST")
374 # Obtain a write lock
375 backenddb
.transaction_prepare_commit()
376 os
.write(w1
, b
"prepared")
379 # Drop the write lock
380 backenddb
.transaction_cancel()
383 self
.assertEqual(os
.read(r1
, 8), b
"prepared")
387 # We need to hold this iterator open to hold the all-record lock.
388 res
= self
.samdb
.search_iterator()
390 # This should take at least 2 seconds because the transaction
391 # has a write lock on one backend db open
394 self
.assertGreater(end
- start
, 1.9)
400 (got_pid
, status
) = os
.waitpid(pid
, 0)
401 self
.assertEqual(got_pid
, pid
)
402 self
.assertTrue(os
.WIFEXITED(status
))
403 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
405 def test_full_db_lock2(self
):
406 basedn
= self
.samdb
.get_default_basedn()
407 backend_filename
= "%s.ldb" % basedn
.get_casefold()
408 backend_subpath
= os
.path
.join("sam.ldb.d",
410 backend_path
= self
.lp
.private_path(backend_subpath
)
417 # In the child, close the main DB, re-open
420 self
.samdb
= SamDB(session_info
=self
.session
,
421 credentials
=self
.creds
,
424 # We need to hold this iterator open to hold the all-record lock.
425 res
= self
.samdb
.search_iterator()
427 os
.write(w2
, b
"start")
428 if (os
.read(r1
, 7) != b
"started"):
431 if (os
.read(r1
, 5) != b
"added"):
434 # Wait 2 seconds to block prepare_commit() in the child.
435 os
.write(w2
, b
"prepare")
442 if (os
.read(r1
, 8) != b
"prepared"):
447 # In the parent, close the main DB, re-open just one DB
450 backenddb
= ldb
.Ldb(backend_path
)
452 # We can start the transaction during the search
453 # because both just grab the all-record read lock.
454 self
.assertEqual(os
.read(r2
, 5), b
"start")
455 backenddb
.transaction_start()
456 os
.write(w1
, b
"started")
458 self
.assertEqual(os
.read(r2
, 3), b
"add")
459 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
460 backenddb
.delete("@DSDB_LOCK_TEST")
461 os
.write(w1
, b
"added")
463 # Obtain a write lock, this will block until
464 # the child releases the read lock.
465 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
467 backenddb
.transaction_prepare_commit()
471 self
.assertGreater(end
- start
, 1.9)
475 os
.write(w1
, b
"prepared")
477 # Drop the write lock
478 backenddb
.transaction_cancel()
480 (got_pid
, status
) = os
.waitpid(pid
, 0)
481 self
.assertEqual(got_pid
, pid
)
482 self
.assertTrue(os
.WIFEXITED(status
))
483 self
.assertEqual(os
.WEXITSTATUS(status
), 0)