1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba
.credentials
import Credentials
21 from samba
.samdb
import SamDB
22 from samba
.auth
import system_session
23 from samba
.tests
import TestCase
24 from samba
.ndr
import ndr_unpack
, ndr_pack
25 from samba
.dcerpc
import drsblobs
26 from samba
import dsdb
33 class DsdbTests(TestCase
):
36 super(DsdbTests
, self
).setUp()
37 self
.lp
= samba
.tests
.env_loadparm()
38 self
.creds
= Credentials()
39 self
.creds
.guess(self
.lp
)
40 self
.session
= system_session()
41 self
.samdb
= SamDB(session_info
=self
.session
,
42 credentials
=self
.creds
,
45 def test_get_oid_from_attrid(self
):
46 oid
= self
.samdb
.get_oid_from_attid(591614)
47 self
.assertEquals(oid
, "1.2.840.113556.1.4.1790")
49 def test_error_replpropertymetadata(self
):
50 res
= self
.samdb
.search(expression
="cn=Administrator",
51 scope
=ldb
.SCOPE_SUBTREE
,
52 attrs
=["replPropertyMetaData"])
53 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
54 str(res
[0]["replPropertyMetaData"]))
57 # Search for Description
59 old_version
= o
.version
60 o
.version
= o
.version
+ 1
61 replBlob
= ndr_pack(repl
)
64 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
65 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
67 def test_error_replpropertymetadata_nochange(self
):
68 res
= self
.samdb
.search(expression
="cn=Administrator",
69 scope
=ldb
.SCOPE_SUBTREE
,
70 attrs
=["replPropertyMetaData"])
71 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
72 str(res
[0]["replPropertyMetaData"]))
73 replBlob
= ndr_pack(repl
)
76 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
77 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
79 def test_error_replpropertymetadata_allow_sort(self
):
80 res
= self
.samdb
.search(expression
="cn=Administrator",
81 scope
=ldb
.SCOPE_SUBTREE
,
82 attrs
=["replPropertyMetaData"])
83 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
84 str(res
[0]["replPropertyMetaData"]))
85 replBlob
= ndr_pack(repl
)
88 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
89 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
91 def test_twoatt_replpropertymetadata(self
):
92 res
= self
.samdb
.search(expression
="cn=Administrator",
93 scope
=ldb
.SCOPE_SUBTREE
,
94 attrs
=["replPropertyMetaData", "uSNChanged"])
95 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
96 str(res
[0]["replPropertyMetaData"]))
99 # Search for Description
101 old_version
= o
.version
102 o
.version
= o
.version
+ 1
103 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
104 replBlob
= ndr_pack(repl
)
107 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
108 msg
["description"] = ldb
.MessageElement("new val", ldb
.FLAG_MOD_REPLACE
, "description")
109 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
111 def test_set_replpropertymetadata(self
):
112 res
= self
.samdb
.search(expression
="cn=Administrator",
113 scope
=ldb
.SCOPE_SUBTREE
,
114 attrs
=["replPropertyMetaData", "uSNChanged"])
115 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
116 str(res
[0]["replPropertyMetaData"]))
119 # Search for Description
121 old_version
= o
.version
122 o
.version
= o
.version
+ 1
123 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
124 o
.originating_usn
= long(str(res
[0]["uSNChanged"])) + 1
125 replBlob
= ndr_pack(repl
)
128 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
129 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
131 def test_ok_get_attribute_from_attid(self
):
132 self
.assertEquals(self
.samdb
.get_attribute_from_attid(13), "description")
134 def test_ko_get_attribute_from_attid(self
):
135 self
.assertEquals(self
.samdb
.get_attribute_from_attid(11979), None)
137 def test_get_attribute_replmetadata_version(self
):
138 res
= self
.samdb
.search(expression
="cn=Administrator",
139 scope
=ldb
.SCOPE_SUBTREE
,
141 self
.assertEquals(len(res
), 1)
143 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "unicodePwd"), 1)
145 def test_set_attribute_replmetadata_version(self
):
146 res
= self
.samdb
.search(expression
="cn=Administrator",
147 scope
=ldb
.SCOPE_SUBTREE
,
149 self
.assertEquals(len(res
), 1)
151 version
= self
.samdb
.get_attribute_replmetadata_version(dn
, "description")
152 self
.samdb
.set_attribute_replmetadata_version(dn
, "description", version
+ 2)
153 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "description"), version
+ 2)
155 def test_db_lock1(self
):
156 basedn
= self
.samdb
.get_default_basedn()
161 # In the child, close the main DB, re-open just one DB
164 self
.samdb
= SamDB(session_info
=self
.session
,
165 credentials
=self
.creds
,
168 self
.samdb
.transaction_start()
170 dn
= "cn=test_db_lock_user,cn=users," + str(basedn
)
173 "objectclass": "user",
175 self
.samdb
.delete(dn
)
177 # Obtain a write lock
178 self
.samdb
.transaction_prepare_commit()
179 os
.write(w1
, b
"prepared")
182 # Drop the write lock
183 self
.samdb
.transaction_cancel()
186 self
.assertEqual(os
.read(r1
, 8), b
"prepared")
190 # We need to hold this iterator open to hold the all-record lock.
191 res
= self
.samdb
.search_iterator()
193 # This should take at least 2 seconds because the transaction
194 # has a write lock on one backend db open
201 self
.assertGreater(end
- start
, 1.9)
203 (got_pid
, status
) = os
.waitpid(pid
, 0)
204 self
.assertEqual(got_pid
, pid
)
205 self
.assertTrue(os
.WIFEXITED(status
))
206 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
208 def test_db_lock2(self
):
209 basedn
= self
.samdb
.get_default_basedn()
215 # In the child, close the main DB, re-open
218 self
.samdb
= SamDB(session_info
=self
.session
,
219 credentials
=self
.creds
,
222 # We need to hold this iterator open to hold the all-record lock.
223 res
= self
.samdb
.search_iterator()
225 os
.write(w2
, b
"start")
226 if (os
.read(r1
, 7) != b
"started"):
230 if (os
.read(r1
, 5) != b
"added"):
233 # Wait 2 seconds to block prepare_commit() in the child.
234 os
.write(w2
, b
"prepare")
241 if (os
.read(r1
, 8) != b
"prepared"):
246 # We can start the transaction during the search
247 # because both just grab the all-record read lock.
248 self
.assertEqual(os
.read(r2
, 5), b
"start")
249 self
.samdb
.transaction_start()
250 os
.write(w1
, b
"started")
252 self
.assertEqual(os
.read(r2
, 3), b
"add")
253 dn
= "cn=test_db_lock_user,cn=users," + str(basedn
)
256 "objectclass": "user",
258 self
.samdb
.delete(dn
)
259 os
.write(w1
, b
"added")
261 # Obtain a write lock, this will block until
262 # the parent releases the read lock.
263 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
265 self
.samdb
.transaction_prepare_commit()
268 self
.assertGreater(end
- start
, 1.9)
272 os
.write(w1
, b
"prepared")
274 # Drop the write lock
275 self
.samdb
.transaction_cancel()
277 (got_pid
, status
) = os
.waitpid(pid
, 0)
278 self
.assertEqual(got_pid
, pid
)
279 self
.assertTrue(os
.WIFEXITED(status
))
280 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
282 def test_db_lock3(self
):
283 basedn
= self
.samdb
.get_default_basedn()
289 # In the child, close the main DB, re-open
292 self
.samdb
= SamDB(session_info
=self
.session
,
293 credentials
=self
.creds
,
296 # We need to hold this iterator open to hold the all-record lock.
297 res
= self
.samdb
.search_iterator()
299 os
.write(w2
, b
"start")
300 if (os
.read(r1
, 7) != b
"started"):
304 if (os
.read(r1
, 5) != b
"added"):
307 # Wait 2 seconds to block prepare_commit() in the child.
308 os
.write(w2
, b
"prepare")
315 if (os
.read(r1
, 8) != b
"prepared"):
320 # We can start the transaction during the search
321 # because both just grab the all-record read lock.
322 self
.assertEqual(os
.read(r2
, 5), b
"start")
323 self
.samdb
.transaction_start()
324 os
.write(w1
, b
"started")
326 self
.assertEqual(os
.read(r2
, 3), b
"add")
328 # This will end up in the top level db
329 dn
= "@DSDB_LOCK_TEST"
332 self
.samdb
.delete(dn
)
333 os
.write(w1
, b
"added")
335 # Obtain a write lock, this will block until
336 # the child releases the read lock.
337 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
339 self
.samdb
.transaction_prepare_commit()
341 self
.assertGreater(end
- start
, 1.9)
342 os
.write(w1
, b
"prepared")
344 # Drop the write lock
345 self
.samdb
.transaction_cancel()
347 (got_pid
, status
) = os
.waitpid(pid
, 0)
348 self
.assertTrue(os
.WIFEXITED(status
))
349 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
350 self
.assertEqual(got_pid
, pid
)
353 def _test_full_db_lock1(self
, backend_path
):
358 # In the child, close the main DB, re-open just one DB
362 backenddb
= ldb
.Ldb(backend_path
)
365 backenddb
.transaction_start()
367 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
368 backenddb
.delete("@DSDB_LOCK_TEST")
370 # Obtain a write lock
371 backenddb
.transaction_prepare_commit()
372 os
.write(w1
, b
"prepared")
375 # Drop the write lock
376 backenddb
.transaction_cancel()
379 self
.assertEqual(os
.read(r1
, 8), b
"prepared")
383 # We need to hold this iterator open to hold the all-record lock.
384 res
= self
.samdb
.search_iterator()
386 # This should take at least 2 seconds because the transaction
387 # has a write lock on one backend db open
390 self
.assertGreater(end
- start
, 1.9)
396 (got_pid
, status
) = os
.waitpid(pid
, 0)
397 self
.assertEqual(got_pid
, pid
)
398 self
.assertTrue(os
.WIFEXITED(status
))
399 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
401 def test_full_db_lock1(self
):
402 basedn
= self
.samdb
.get_default_basedn()
403 backend_filename
= "%s.ldb" % basedn
.get_casefold()
404 backend_subpath
= os
.path
.join("sam.ldb.d",
406 backend_path
= self
.lp
.private_path(backend_subpath
)
407 self
._test
_full
_db
_lock
1(backend_path
)
410 def test_full_db_lock1_config(self
):
411 basedn
= self
.samdb
.get_config_basedn()
412 backend_filename
= "%s.ldb" % basedn
.get_casefold()
413 backend_subpath
= os
.path
.join("sam.ldb.d",
415 backend_path
= self
.lp
.private_path(backend_subpath
)
416 self
._test
_full
_db
_lock
1(backend_path
)
419 def _test_full_db_lock2(self
, backend_path
):
426 # In the child, close the main DB, re-open
429 self
.samdb
= SamDB(session_info
=self
.session
,
430 credentials
=self
.creds
,
433 # We need to hold this iterator open to hold the all-record lock.
434 res
= self
.samdb
.search_iterator()
436 os
.write(w2
, b
"start")
437 if (os
.read(r1
, 7) != b
"started"):
440 if (os
.read(r1
, 5) != b
"added"):
443 # Wait 2 seconds to block prepare_commit() in the child.
444 os
.write(w2
, b
"prepare")
451 if (os
.read(r1
, 8) != b
"prepared"):
456 # In the parent, close the main DB, re-open just one DB
459 backenddb
= ldb
.Ldb(backend_path
)
461 # We can start the transaction during the search
462 # because both just grab the all-record read lock.
463 self
.assertEqual(os
.read(r2
, 5), b
"start")
464 backenddb
.transaction_start()
465 os
.write(w1
, b
"started")
467 self
.assertEqual(os
.read(r2
, 3), b
"add")
468 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
469 backenddb
.delete("@DSDB_LOCK_TEST")
470 os
.write(w1
, b
"added")
472 # Obtain a write lock, this will block until
473 # the child releases the read lock.
474 self
.assertEqual(os
.read(r2
, 7), b
"prepare")
476 backenddb
.transaction_prepare_commit()
480 self
.assertGreater(end
- start
, 1.9)
484 os
.write(w1
, b
"prepared")
486 # Drop the write lock
487 backenddb
.transaction_cancel()
489 (got_pid
, status
) = os
.waitpid(pid
, 0)
490 self
.assertEqual(got_pid
, pid
)
491 self
.assertTrue(os
.WIFEXITED(status
))
492 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
494 def test_full_db_lock2(self
):
495 basedn
= self
.samdb
.get_default_basedn()
496 backend_filename
= "%s.ldb" % basedn
.get_casefold()
497 backend_subpath
= os
.path
.join("sam.ldb.d",
499 backend_path
= self
.lp
.private_path(backend_subpath
)
500 self
._test
_full
_db
_lock
2(backend_path
)
502 def test_full_db_lock2_config(self
):
503 basedn
= self
.samdb
.get_config_basedn()
504 backend_filename
= "%s.ldb" % basedn
.get_casefold()
505 backend_subpath
= os
.path
.join("sam.ldb.d",
507 backend_path
= self
.lp
.private_path(backend_subpath
)
508 self
._test
_full
_db
_lock
2(backend_path
)
510 def test_no_error_on_invalid_control(self
):
512 res
= self
.samdb
.search(expression
="cn=Administrator",
513 scope
=ldb
.SCOPE_SUBTREE
,
514 attrs
=["replPropertyMetaData"],
515 controls
=["local_oid:%s:0"
516 % dsdb
.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED
])
517 except ldb
.LdbError
as e
:
518 self
.fail("Should have not raised an exception")
520 def test_error_on_invalid_critical_control(self
):
522 res
= self
.samdb
.search(expression
="cn=Administrator",
523 scope
=ldb
.SCOPE_SUBTREE
,
524 attrs
=["replPropertyMetaData"],
525 controls
=["local_oid:%s:1"
526 % dsdb
.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED
])
527 except ldb
.LdbError
as e
:
528 if e
[0] != ldb
.ERR_UNSUPPORTED_CRITICAL_EXTENSION
:
529 self
.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"