python:tests: Create a test user for the dsdb test
[Samba.git] / python / samba / tests / dsdb.py
blob493ea25db53112fe450e049d01a771e74a022ce1
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.tests import delete_force
25 from samba.ndr import ndr_unpack, ndr_pack
26 from samba.dcerpc import drsblobs
27 from samba import dsdb
28 import ldb
29 import os
30 import samba
31 import gc
32 import time
34 class DsdbTests(TestCase):
36 def setUp(self):
37 super(DsdbTests, self).setUp()
38 self.lp = samba.tests.env_loadparm()
39 self.creds = Credentials()
40 self.creds.guess(self.lp)
41 self.session = system_session()
42 self.samdb = SamDB(session_info=self.session,
43 credentials=self.creds,
44 lp=self.lp)
46 # Create a test user
47 user_name = "samdb-testuser"
48 user_pass = samba.generate_random_password(32, 32)
49 user_description = "Test user for dsdb test"
51 base_dn = self.samdb.domain_dn()
53 self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
54 delete_force(self.samdb, self.account_dn)
55 self.samdb.newuser(username=user_name,
56 password=user_pass,
57 description=user_description)
59 def test_get_oid_from_attrid(self):
60 oid = self.samdb.get_oid_from_attid(591614)
61 self.assertEquals(oid, "1.2.840.113556.1.4.1790")
63 def test_error_replpropertymetadata(self):
64 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
65 base=self.account_dn,
66 attrs=["replPropertyMetaData"])
67 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
68 str(res[0]["replPropertyMetaData"]))
69 ctr = repl.ctr
70 for o in ctr.array:
71 # Search for Description
72 if o.attid == 13:
73 old_version = o.version
74 o.version = o.version + 1
75 replBlob = ndr_pack(repl)
76 msg = ldb.Message()
77 msg.dn = res[0].dn
78 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
79 self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
81 def test_error_replpropertymetadata_nochange(self):
82 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
83 base=self.account_dn,
84 attrs=["replPropertyMetaData"])
85 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
86 str(res[0]["replPropertyMetaData"]))
87 replBlob = ndr_pack(repl)
88 msg = ldb.Message()
89 msg.dn = res[0].dn
90 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
91 self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
93 def test_error_replpropertymetadata_allow_sort(self):
94 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
95 base=self.account_dn,
96 attrs=["replPropertyMetaData"])
97 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
98 str(res[0]["replPropertyMetaData"]))
99 replBlob = ndr_pack(repl)
100 msg = ldb.Message()
101 msg.dn = res[0].dn
102 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
103 self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
105 def test_twoatt_replpropertymetadata(self):
106 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
107 base=self.account_dn,
108 attrs=["replPropertyMetaData", "uSNChanged"])
109 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
110 str(res[0]["replPropertyMetaData"]))
111 ctr = repl.ctr
112 for o in ctr.array:
113 # Search for Description
114 if o.attid == 13:
115 old_version = o.version
116 o.version = o.version + 1
117 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
118 replBlob = ndr_pack(repl)
119 msg = ldb.Message()
120 msg.dn = res[0].dn
121 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
122 msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
123 self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
125 def test_set_replpropertymetadata(self):
126 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
127 base=self.account_dn,
128 attrs=["replPropertyMetaData", "uSNChanged"])
129 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
130 str(res[0]["replPropertyMetaData"]))
131 ctr = repl.ctr
132 for o in ctr.array:
133 # Search for Description
134 if o.attid == 13:
135 old_version = o.version
136 o.version = o.version + 1
137 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
138 o.originating_usn = long(str(res[0]["uSNChanged"])) + 1
139 replBlob = ndr_pack(repl)
140 msg = ldb.Message()
141 msg.dn = res[0].dn
142 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
143 self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
145 def test_ok_get_attribute_from_attid(self):
146 self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
148 def test_ko_get_attribute_from_attid(self):
149 self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
151 def test_get_attribute_replmetadata_version(self):
152 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
153 base=self.account_dn,
154 attrs=["dn"])
155 self.assertEquals(len(res), 1)
156 dn = str(res[0].dn)
157 self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
159 def test_set_attribute_replmetadata_version(self):
160 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
161 base=self.account_dn,
162 attrs=["dn"])
163 self.assertEquals(len(res), 1)
164 dn = str(res[0].dn)
165 version = self.samdb.get_attribute_replmetadata_version(dn, "description")
166 self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
167 self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
169 def test_db_lock1(self):
170 basedn = self.samdb.get_default_basedn()
171 (r1, w1) = os.pipe()
173 pid = os.fork()
174 if pid == 0:
175 # In the child, close the main DB, re-open just one DB
176 del(self.samdb)
177 gc.collect()
178 self.samdb = SamDB(session_info=self.session,
179 credentials=self.creds,
180 lp=self.lp)
182 self.samdb.transaction_start()
184 dn = "cn=test_db_lock_user,cn=users," + str(basedn)
185 self.samdb.add({
186 "dn": dn,
187 "objectclass": "user",
189 self.samdb.delete(dn)
191 # Obtain a write lock
192 self.samdb.transaction_prepare_commit()
193 os.write(w1, b"prepared")
194 time.sleep(2)
196 # Drop the write lock
197 self.samdb.transaction_cancel()
198 os._exit(0)
200 self.assertEqual(os.read(r1, 8), b"prepared")
202 start = time.time()
204 # We need to hold this iterator open to hold the all-record lock.
205 res = self.samdb.search_iterator()
207 # This should take at least 2 seconds because the transaction
208 # has a write lock on one backend db open
210 # Release the locks
211 for l in res:
212 pass
214 end = time.time()
215 self.assertGreater(end - start, 1.9)
217 (got_pid, status) = os.waitpid(pid, 0)
218 self.assertEqual(got_pid, pid)
219 self.assertTrue(os.WIFEXITED(status))
220 self.assertEqual(os.WEXITSTATUS(status), 0)
222 def test_db_lock2(self):
223 basedn = self.samdb.get_default_basedn()
224 (r1, w1) = os.pipe()
225 (r2, w2) = os.pipe()
227 pid = os.fork()
228 if pid == 0:
229 # In the child, close the main DB, re-open
230 del(self.samdb)
231 gc.collect()
232 self.samdb = SamDB(session_info=self.session,
233 credentials=self.creds,
234 lp=self.lp)
236 # We need to hold this iterator open to hold the all-record lock.
237 res = self.samdb.search_iterator()
239 os.write(w2, b"start")
240 if (os.read(r1, 7) != b"started"):
241 os._exit(1)
243 os.write(w2, b"add")
244 if (os.read(r1, 5) != b"added"):
245 os._exit(2)
247 # Wait 2 seconds to block prepare_commit() in the child.
248 os.write(w2, b"prepare")
249 time.sleep(2)
251 # Release the locks
252 for l in res:
253 pass
255 if (os.read(r1, 8) != b"prepared"):
256 os._exit(3)
258 os._exit(0)
260 # We can start the transaction during the search
261 # because both just grab the all-record read lock.
262 self.assertEqual(os.read(r2, 5), b"start")
263 self.samdb.transaction_start()
264 os.write(w1, b"started")
266 self.assertEqual(os.read(r2, 3), b"add")
267 dn = "cn=test_db_lock_user,cn=users," + str(basedn)
268 self.samdb.add({
269 "dn": dn,
270 "objectclass": "user",
272 self.samdb.delete(dn)
273 os.write(w1, b"added")
275 # Obtain a write lock, this will block until
276 # the parent releases the read lock.
277 self.assertEqual(os.read(r2, 7), b"prepare")
278 start = time.time()
279 self.samdb.transaction_prepare_commit()
280 end = time.time()
281 try:
282 self.assertGreater(end - start, 1.9)
283 except:
284 raise
285 finally:
286 os.write(w1, b"prepared")
288 # Drop the write lock
289 self.samdb.transaction_cancel()
291 (got_pid, status) = os.waitpid(pid, 0)
292 self.assertEqual(got_pid, pid)
293 self.assertTrue(os.WIFEXITED(status))
294 self.assertEqual(os.WEXITSTATUS(status), 0)
296 def test_db_lock3(self):
297 basedn = self.samdb.get_default_basedn()
298 (r1, w1) = os.pipe()
299 (r2, w2) = os.pipe()
301 pid = os.fork()
302 if pid == 0:
303 # In the child, close the main DB, re-open
304 del(self.samdb)
305 gc.collect()
306 self.samdb = SamDB(session_info=self.session,
307 credentials=self.creds,
308 lp=self.lp)
310 # We need to hold this iterator open to hold the all-record lock.
311 res = self.samdb.search_iterator()
313 os.write(w2, b"start")
314 if (os.read(r1, 7) != b"started"):
315 os._exit(1)
317 os.write(w2, b"add")
318 if (os.read(r1, 5) != b"added"):
319 os._exit(2)
321 # Wait 2 seconds to block prepare_commit() in the child.
322 os.write(w2, b"prepare")
323 time.sleep(2)
325 # Release the locks
326 for l in res:
327 pass
329 if (os.read(r1, 8) != b"prepared"):
330 os._exit(3)
332 os._exit(0)
334 # We can start the transaction during the search
335 # because both just grab the all-record read lock.
336 self.assertEqual(os.read(r2, 5), b"start")
337 self.samdb.transaction_start()
338 os.write(w1, b"started")
340 self.assertEqual(os.read(r2, 3), b"add")
342 # This will end up in the top level db
343 dn = "@DSDB_LOCK_TEST"
344 self.samdb.add({
345 "dn": dn})
346 self.samdb.delete(dn)
347 os.write(w1, b"added")
349 # Obtain a write lock, this will block until
350 # the child releases the read lock.
351 self.assertEqual(os.read(r2, 7), b"prepare")
352 start = time.time()
353 self.samdb.transaction_prepare_commit()
354 end = time.time()
355 self.assertGreater(end - start, 1.9)
356 os.write(w1, b"prepared")
358 # Drop the write lock
359 self.samdb.transaction_cancel()
361 (got_pid, status) = os.waitpid(pid, 0)
362 self.assertTrue(os.WIFEXITED(status))
363 self.assertEqual(os.WEXITSTATUS(status), 0)
364 self.assertEqual(got_pid, pid)
367 def _test_full_db_lock1(self, backend_path):
368 (r1, w1) = os.pipe()
370 pid = os.fork()
371 if pid == 0:
372 # In the child, close the main DB, re-open just one DB
373 del(self.samdb)
374 gc.collect()
376 backenddb = ldb.Ldb(backend_path)
379 backenddb.transaction_start()
381 backenddb.add({"dn":"@DSDB_LOCK_TEST"})
382 backenddb.delete("@DSDB_LOCK_TEST")
384 # Obtain a write lock
385 backenddb.transaction_prepare_commit()
386 os.write(w1, b"prepared")
387 time.sleep(2)
389 # Drop the write lock
390 backenddb.transaction_cancel()
391 os._exit(0)
393 self.assertEqual(os.read(r1, 8), b"prepared")
395 start = time.time()
397 # We need to hold this iterator open to hold the all-record lock.
398 res = self.samdb.search_iterator()
400 # This should take at least 2 seconds because the transaction
401 # has a write lock on one backend db open
403 end = time.time()
404 self.assertGreater(end - start, 1.9)
406 # Release the locks
407 for l in res:
408 pass
410 (got_pid, status) = os.waitpid(pid, 0)
411 self.assertEqual(got_pid, pid)
412 self.assertTrue(os.WIFEXITED(status))
413 self.assertEqual(os.WEXITSTATUS(status), 0)
415 def test_full_db_lock1(self):
416 basedn = self.samdb.get_default_basedn()
417 backend_filename = "%s.ldb" % basedn.get_casefold()
418 backend_subpath = os.path.join("sam.ldb.d",
419 backend_filename)
420 backend_path = self.lp.private_path(backend_subpath)
421 self._test_full_db_lock1(backend_path)
424 def test_full_db_lock1_config(self):
425 basedn = self.samdb.get_config_basedn()
426 backend_filename = "%s.ldb" % basedn.get_casefold()
427 backend_subpath = os.path.join("sam.ldb.d",
428 backend_filename)
429 backend_path = self.lp.private_path(backend_subpath)
430 self._test_full_db_lock1(backend_path)
433 def _test_full_db_lock2(self, backend_path):
434 (r1, w1) = os.pipe()
435 (r2, w2) = os.pipe()
437 pid = os.fork()
438 if pid == 0:
440 # In the child, close the main DB, re-open
441 del(self.samdb)
442 gc.collect()
443 self.samdb = SamDB(session_info=self.session,
444 credentials=self.creds,
445 lp=self.lp)
447 # We need to hold this iterator open to hold the all-record lock.
448 res = self.samdb.search_iterator()
450 os.write(w2, b"start")
451 if (os.read(r1, 7) != b"started"):
452 os._exit(1)
453 os.write(w2, b"add")
454 if (os.read(r1, 5) != b"added"):
455 os._exit(2)
457 # Wait 2 seconds to block prepare_commit() in the child.
458 os.write(w2, b"prepare")
459 time.sleep(2)
461 # Release the locks
462 for l in res:
463 pass
465 if (os.read(r1, 8) != b"prepared"):
466 os._exit(3)
468 os._exit(0)
470 # In the parent, close the main DB, re-open just one DB
471 del(self.samdb)
472 gc.collect()
473 backenddb = ldb.Ldb(backend_path)
475 # We can start the transaction during the search
476 # because both just grab the all-record read lock.
477 self.assertEqual(os.read(r2, 5), b"start")
478 backenddb.transaction_start()
479 os.write(w1, b"started")
481 self.assertEqual(os.read(r2, 3), b"add")
482 backenddb.add({"dn":"@DSDB_LOCK_TEST"})
483 backenddb.delete("@DSDB_LOCK_TEST")
484 os.write(w1, b"added")
486 # Obtain a write lock, this will block until
487 # the child releases the read lock.
488 self.assertEqual(os.read(r2, 7), b"prepare")
489 start = time.time()
490 backenddb.transaction_prepare_commit()
491 end = time.time()
493 try:
494 self.assertGreater(end - start, 1.9)
495 except:
496 raise
497 finally:
498 os.write(w1, b"prepared")
500 # Drop the write lock
501 backenddb.transaction_cancel()
503 (got_pid, status) = os.waitpid(pid, 0)
504 self.assertEqual(got_pid, pid)
505 self.assertTrue(os.WIFEXITED(status))
506 self.assertEqual(os.WEXITSTATUS(status), 0)
508 def test_full_db_lock2(self):
509 basedn = self.samdb.get_default_basedn()
510 backend_filename = "%s.ldb" % basedn.get_casefold()
511 backend_subpath = os.path.join("sam.ldb.d",
512 backend_filename)
513 backend_path = self.lp.private_path(backend_subpath)
514 self._test_full_db_lock2(backend_path)
516 def test_full_db_lock2_config(self):
517 basedn = self.samdb.get_config_basedn()
518 backend_filename = "%s.ldb" % basedn.get_casefold()
519 backend_subpath = os.path.join("sam.ldb.d",
520 backend_filename)
521 backend_path = self.lp.private_path(backend_subpath)
522 self._test_full_db_lock2(backend_path)
524 def test_no_error_on_invalid_control(self):
525 try:
526 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
527 base=self.account_dn,
528 attrs=["replPropertyMetaData"],
529 controls=["local_oid:%s:0"
530 % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
531 except ldb.LdbError as e:
532 self.fail("Should have not raised an exception")
534 def test_error_on_invalid_critical_control(self):
535 try:
536 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
537 base=self.account_dn,
538 attrs=["replPropertyMetaData"],
539 controls=["local_oid:%s:1"
540 % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
541 except ldb.LdbError as e:
542 if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
543 self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
544 % e[1])