1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba
.credentials
import Credentials
21 from samba
.samdb
import SamDB
22 from samba
.auth
import system_session
23 from samba
.tests
import TestCase
24 from samba
.tests
import delete_force
25 from samba
.ndr
import ndr_unpack
, ndr_pack
26 from samba
.dcerpc
import drsblobs
27 from samba
import dsdb
34 class DsdbTests(TestCase
):
37 super(DsdbTests
, self
).setUp()
38 self
.lp
= samba
.tests
.env_loadparm()
39 self
.creds
= Credentials()
40 self
.creds
.guess(self
.lp
)
41 self
.session
= system_session()
42 self
.samdb
= SamDB(session_info
=self
.session
,
43 credentials
=self
.creds
,
47 user_name
= "samdb-testuser"
48 user_pass
= samba
.generate_random_password(32, 32)
49 user_description
= "Test user for dsdb test"
51 base_dn
= self
.samdb
.domain_dn()
53 self
.account_dn
= "cn=" + user_name
+ ",cn=Users," + base_dn
54 delete_force(self
.samdb
, self
.account_dn
)
55 self
.samdb
.newuser(username
=user_name
,
57 description
=user_description
)
59 def test_get_oid_from_attrid(self
):
60 oid
= self
.samdb
.get_oid_from_attid(591614)
61 self
.assertEquals(oid
, "1.2.840.113556.1.4.1790")
63 def test_error_replpropertymetadata(self
):
64 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
66 attrs
=["replPropertyMetaData"])
67 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
68 str(res
[0]["replPropertyMetaData"]))
71 # Search for Description
73 old_version
= o
.version
74 o
.version
= o
.version
+ 1
75 replBlob
= ndr_pack(repl
)
78 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
79 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
81 def test_error_replpropertymetadata_nochange(self
):
82 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
84 attrs
=["replPropertyMetaData"])
85 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
86 str(res
[0]["replPropertyMetaData"]))
87 replBlob
= ndr_pack(repl
)
90 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
91 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
93 def test_error_replpropertymetadata_allow_sort(self
):
94 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
96 attrs
=["replPropertyMetaData"])
97 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
98 str(res
[0]["replPropertyMetaData"]))
99 replBlob
= ndr_pack(repl
)
102 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
103 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
105 def test_twoatt_replpropertymetadata(self
):
106 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
107 base
=self
.account_dn
,
108 attrs
=["replPropertyMetaData", "uSNChanged"])
109 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
110 str(res
[0]["replPropertyMetaData"]))
113 # Search for Description
115 old_version
= o
.version
116 o
.version
= o
.version
+ 1
117 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
118 replBlob
= ndr_pack(repl
)
121 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
122 msg
["description"] = ldb
.MessageElement("new val", ldb
.FLAG_MOD_REPLACE
, "description")
123 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
125 def test_set_replpropertymetadata(self
):
126 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
127 base
=self
.account_dn
,
128 attrs
=["replPropertyMetaData", "uSNChanged"])
129 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
130 str(res
[0]["replPropertyMetaData"]))
133 # Search for Description
135 old_version
= o
.version
136 o
.version
= o
.version
+ 1
137 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
138 o
.originating_usn
= long(str(res
[0]["uSNChanged"])) + 1
139 replBlob
= ndr_pack(repl
)
142 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
143 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
145 def test_ok_get_attribute_from_attid(self
):
146 self
.assertEquals(self
.samdb
.get_attribute_from_attid(13), "description")
148 def test_ko_get_attribute_from_attid(self
):
149 self
.assertEquals(self
.samdb
.get_attribute_from_attid(11979), None)
151 def test_get_attribute_replmetadata_version(self
):
152 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
153 base
=self
.account_dn
,
155 self
.assertEquals(len(res
), 1)
157 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "unicodePwd"), 2)
159 def test_set_attribute_replmetadata_version(self
):
160 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
161 base
=self
.account_dn
,
163 self
.assertEquals(len(res
), 1)
165 version
= self
.samdb
.get_attribute_replmetadata_version(dn
, "description")
166 self
.samdb
.set_attribute_replmetadata_version(dn
, "description", version
+ 2)
167 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "description"), version
+ 2)
169 def test_db_lock1(self
):
170 basedn
= self
.samdb
.get_default_basedn()
175 # In the child, close the main DB, re-open just one DB
178 self
.samdb
= SamDB(session_info
=self
.session
,
179 credentials
=self
.creds
,
182 self
.samdb
.transaction_start()
184 dn
= "cn=test_db_lock_user,cn=users," + str(basedn
)
187 "objectclass": "user",
189 self
.samdb
.delete(dn
)
191 # Obtain a write lock
192 self
.samdb
.transaction_prepare_commit()
193 os
.write(w1
, b
"prepared")
196 # Drop the write lock
197 self
.samdb
.transaction_cancel()
200 self
.assertEqual(os
.read(r1
, 8), b
"prepared")
204 # We need to hold this iterator open to hold the all-record lock.
205 res
= self
.samdb
.search_iterator()
207 # This should take at least 2 seconds because the transaction
208 # has a write lock on one backend db open
215 self
.assertGreater(end
- start
, 1.9)
217 (got_pid
, status
) = os
.waitpid(pid
, 0)
218 self
.assertEqual(got_pid
, pid
)
219 self
.assertTrue(os
.WIFEXITED(status
))
220 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
222 def test_db_lock2(self
):
223 basedn
= self
.samdb
.get_default_basedn()
229 # In the child, close the main DB, re-open
232 self
.samdb
= SamDB(session_info
=self
.session
,
233 credentials
=self
.creds
,
236 # We need to hold this iterator open to hold the all-record lock.
237 res
= self
.samdb
.search_iterator()
239 os
.write(w2
, b
"start")
240 if (os
.read(r1
, 7) != b
"started"):
244 if (os
.read(r1
, 5) != b
"added"):
247 # Wait 2 seconds to block prepare_commit() in the child.
248 os
.write(w2
, b
"prepare")
255 if (os
.read(r1
, 8) != b
"prepared"):
260 # We can start the transaction during the search
261 # because both just grab the all-record read lock.
262 self
.assertEqual(os
.read(r2
, 5), b
"start")
263 self
.samdb
.transaction_start()
264 os
.write(w1
, b
"started")
266 self
.assertEqual(os
.read(r2
, 3), b
"add")
267 dn
= "cn=test_db_lock_user,cn=users," + str(basedn
)
270 "objectclass": "user",
272 self
.samdb
.delete(dn
)
273 os
.write(w1
, b
"added")
275 # Obtain a write lock, this will block until
276 # the parent releases the read lock.
277 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
279 self
.samdb
.transaction_prepare_commit()
282 self
.assertGreater(end
- start
, 1.9)
286 os
.write(w1
, b
"prepared")
288 # Drop the write lock
289 self
.samdb
.transaction_cancel()
291 (got_pid
, status
) = os
.waitpid(pid
, 0)
292 self
.assertEqual(got_pid
, pid
)
293 self
.assertTrue(os
.WIFEXITED(status
))
294 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
296 def test_db_lock3(self
):
297 basedn
= self
.samdb
.get_default_basedn()
303 # In the child, close the main DB, re-open
306 self
.samdb
= SamDB(session_info
=self
.session
,
307 credentials
=self
.creds
,
310 # We need to hold this iterator open to hold the all-record lock.
311 res
= self
.samdb
.search_iterator()
313 os
.write(w2
, b
"start")
314 if (os
.read(r1
, 7) != b
"started"):
318 if (os
.read(r1
, 5) != b
"added"):
321 # Wait 2 seconds to block prepare_commit() in the child.
322 os
.write(w2
, b
"prepare")
329 if (os
.read(r1
, 8) != b
"prepared"):
334 # We can start the transaction during the search
335 # because both just grab the all-record read lock.
336 self
.assertEqual(os
.read(r2
, 5), b
"start")
337 self
.samdb
.transaction_start()
338 os
.write(w1
, b
"started")
340 self
.assertEqual(os
.read(r2
, 3), b
"add")
342 # This will end up in the top level db
343 dn
= "@DSDB_LOCK_TEST"
346 self
.samdb
.delete(dn
)
347 os
.write(w1
, b
"added")
349 # Obtain a write lock, this will block until
350 # the child releases the read lock.
351 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
353 self
.samdb
.transaction_prepare_commit()
355 self
.assertGreater(end
- start
, 1.9)
356 os
.write(w1
, b
"prepared")
358 # Drop the write lock
359 self
.samdb
.transaction_cancel()
361 (got_pid
, status
) = os
.waitpid(pid
, 0)
362 self
.assertTrue(os
.WIFEXITED(status
))
363 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
364 self
.assertEqual(got_pid
, pid
)
367 def _test_full_db_lock1(self
, backend_path
):
372 # In the child, close the main DB, re-open just one DB
376 backenddb
= ldb
.Ldb(backend_path
)
379 backenddb
.transaction_start()
381 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
382 backenddb
.delete("@DSDB_LOCK_TEST")
384 # Obtain a write lock
385 backenddb
.transaction_prepare_commit()
386 os
.write(w1
, b
"prepared")
389 # Drop the write lock
390 backenddb
.transaction_cancel()
393 self
.assertEqual(os
.read(r1
, 8), b
"prepared")
397 # We need to hold this iterator open to hold the all-record lock.
398 res
= self
.samdb
.search_iterator()
400 # This should take at least 2 seconds because the transaction
401 # has a write lock on one backend db open
404 self
.assertGreater(end
- start
, 1.9)
410 (got_pid
, status
) = os
.waitpid(pid
, 0)
411 self
.assertEqual(got_pid
, pid
)
412 self
.assertTrue(os
.WIFEXITED(status
))
413 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
415 def test_full_db_lock1(self
):
416 basedn
= self
.samdb
.get_default_basedn()
417 backend_filename
= "%s.ldb" % basedn
.get_casefold()
418 backend_subpath
= os
.path
.join("sam.ldb.d",
420 backend_path
= self
.lp
.private_path(backend_subpath
)
421 self
._test
_full
_db
_lock
1(backend_path
)
424 def test_full_db_lock1_config(self
):
425 basedn
= self
.samdb
.get_config_basedn()
426 backend_filename
= "%s.ldb" % basedn
.get_casefold()
427 backend_subpath
= os
.path
.join("sam.ldb.d",
429 backend_path
= self
.lp
.private_path(backend_subpath
)
430 self
._test
_full
_db
_lock
1(backend_path
)
433 def _test_full_db_lock2(self
, backend_path
):
440 # In the child, close the main DB, re-open
443 self
.samdb
= SamDB(session_info
=self
.session
,
444 credentials
=self
.creds
,
447 # We need to hold this iterator open to hold the all-record lock.
448 res
= self
.samdb
.search_iterator()
450 os
.write(w2
, b
"start")
451 if (os
.read(r1
, 7) != b
"started"):
454 if (os
.read(r1
, 5) != b
"added"):
457 # Wait 2 seconds to block prepare_commit() in the child.
458 os
.write(w2
, b
"prepare")
465 if (os
.read(r1
, 8) != b
"prepared"):
470 # In the parent, close the main DB, re-open just one DB
473 backenddb
= ldb
.Ldb(backend_path
)
475 # We can start the transaction during the search
476 # because both just grab the all-record read lock.
477 self
.assertEqual(os
.read(r2
, 5), b
"start")
478 backenddb
.transaction_start()
479 os
.write(w1
, b
"started")
481 self
.assertEqual(os
.read(r2
, 3), b
"add")
482 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
483 backenddb
.delete("@DSDB_LOCK_TEST")
484 os
.write(w1
, b
"added")
486 # Obtain a write lock, this will block until
487 # the child releases the read lock.
488 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
490 backenddb
.transaction_prepare_commit()
494 self
.assertGreater(end
- start
, 1.9)
498 os
.write(w1
, b
"prepared")
500 # Drop the write lock
501 backenddb
.transaction_cancel()
503 (got_pid
, status
) = os
.waitpid(pid
, 0)
504 self
.assertEqual(got_pid
, pid
)
505 self
.assertTrue(os
.WIFEXITED(status
))
506 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
508 def test_full_db_lock2(self
):
509 basedn
= self
.samdb
.get_default_basedn()
510 backend_filename
= "%s.ldb" % basedn
.get_casefold()
511 backend_subpath
= os
.path
.join("sam.ldb.d",
513 backend_path
= self
.lp
.private_path(backend_subpath
)
514 self
._test
_full
_db
_lock
2(backend_path
)
516 def test_full_db_lock2_config(self
):
517 basedn
= self
.samdb
.get_config_basedn()
518 backend_filename
= "%s.ldb" % basedn
.get_casefold()
519 backend_subpath
= os
.path
.join("sam.ldb.d",
521 backend_path
= self
.lp
.private_path(backend_subpath
)
522 self
._test
_full
_db
_lock
2(backend_path
)
524 def test_no_error_on_invalid_control(self
):
526 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
527 base
=self
.account_dn
,
528 attrs
=["replPropertyMetaData"],
529 controls
=["local_oid:%s:0"
530 % dsdb
.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED
])
531 except ldb
.LdbError
as e
:
532 self
.fail("Should have not raised an exception")
534 def test_error_on_invalid_critical_control(self
):
536 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
537 base
=self
.account_dn
,
538 attrs
=["replPropertyMetaData"],
539 controls
=["local_oid:%s:1"
540 % dsdb
.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED
])
541 except ldb
.LdbError
as e
:
542 if e
[0] != ldb
.ERR_UNSUPPORTED_CRITICAL_EXTENSION
:
543 self
.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"