s4-torture: fix type of enum in various places
[Samba.git] / python / samba / tests / dsdb.py
bloba9f569b6c674958bc38725796f1a4f96fd4220d0
# Unix SMB/CIFS implementation. Tests for dsdb
# Copyright (C) Matthieu Patou <mat@matws.net> 2010
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.ndr import ndr_unpack, ndr_pack
25 from samba.dcerpc import drsblobs
26 from samba import dsdb
27 import ldb
28 import os
29 import samba
30 import gc
31 import time
class DsdbTests(TestCase):
    """Tests for samba.dsdb, run against a locally opened SamDB.

    Every test uses the connection created in setUp().  The
    replPropertyMetaData tests operate on the object matched by
    "cn=Administrator"; the locking tests fork a child process and
    coordinate with it over pipes.
    """

    # attid that get_attribute_from_attid() maps to "description"
    # (asserted in test_ok_get_attribute_from_attid).
    _DESCRIPTION_ATTID = 13

    # Control strings passed to samdb.modify() when replPropertyMetaData
    # is written directly.
    # NOTE(review): presumably the dsdb "change replmetadata" control and
    # its "resort" companion -- confirm against the dsdb control list.
    _REPLMD_CONTROL = "local_oid:1.3.6.1.4.1.7165.4.3.14:0"
    _REPLMD_RESORT_CONTROL = "local_oid:1.3.6.1.4.1.7165.4.3.25:0"

    def setUp(self):
        """Load the test smb.conf, guess credentials and open the SamDB."""
        super(DsdbTests, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()
        self.samdb = self._open_samdb()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _open_samdb(self):
        """Return a new SamDB opened with this test's session/creds/lp."""
        return SamDB(session_info=self.session,
                     credentials=self.creds,
                     lp=self.lp)

    def _search_administrator(self, attrs):
        """Subtree-search for cn=Administrator, requesting *attrs*."""
        return self.samdb.search(expression="cn=Administrator",
                                 scope=ldb.SCOPE_SUBTREE,
                                 attrs=attrs)

    def _unpack_metadata(self, entry):
        """Unpack the replPropertyMetaData blob of a search result entry."""
        # NOTE(review): str() on the message element yields the raw blob
        # on Python 2; confirm this still returns bytes if the module is
        # ever run under Python 3.
        return ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                          str(entry["replPropertyMetaData"]))

    def _metadata_replace_msg(self, dn, repl):
        """Build a message replacing replPropertyMetaData on *dn*.

        *repl* is the (possibly modified) unpacked blob; it is re-packed
        with ndr_pack() here.
        """
        msg = ldb.Message()
        msg.dn = dn
        msg["replPropertyMetaData"] = ldb.MessageElement(
            ndr_pack(repl), ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
        return msg

    def _pipe(self):
        """Create a pipe whose two descriptors are closed on test cleanup.

        Returns the (read_fd, write_fd) tuple from os.pipe().
        """
        fds = os.pipe()
        for fd in fds:
            self.addCleanup(os.close, fd)
        return fds

    def _backend_path(self, basedn):
        """Return the private path of the sam.ldb.d backend for *basedn*."""
        backend_filename = "%s.ldb" % basedn.get_casefold()
        backend_subpath = os.path.join("sam.ldb.d", backend_filename)
        return self.lp.private_path(backend_subpath)

    def _assert_child_succeeded(self, pid):
        """Wait for child *pid* and assert it exited normally with status 0."""
        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(os.WEXITSTATUS(status), 0)

    def _run_read_lock_child(self, r1, w2):
        """Child half of the two-pipe lock handshake; never returns.

        The child re-opens the SamDB, holds a search iterator open (and
        with it the all-record lock) and walks through the
        start/add/prepare handshake with the parent.  It reads the
        parent's replies from *r1* and writes its own steps to *w2*.
        Exit status is 0 on success, or 1/2/3 naming the handshake step
        whose reply from the parent did not match.
        """
        # In the child, close the main DB, re-open
        del self.samdb
        gc.collect()
        self.samdb = self._open_samdb()

        # We need to hold this iterator open to hold the all-record lock.
        res = self.samdb.search_iterator()

        os.write(w2, b"start")
        if os.read(r1, 7) != b"started":
            os._exit(1)

        os.write(w2, b"add")
        if os.read(r1, 5) != b"added":
            os._exit(2)

        # Wait 2 seconds to block prepare_commit() in the parent.
        os.write(w2, b"prepare")
        time.sleep(2)

        # Release the locks
        for _ in res:
            pass

        if os.read(r1, 8) != b"prepared":
            os._exit(3)

        os._exit(0)

    def _timed_prepare_commit(self, db, r2, w1):
        """Run db.transaction_prepare_commit() and check that it blocked.

        Waits for the child's "prepare" message on *r2*, times the
        prepare-commit and asserts it took more than 1.9 seconds.  The
        "prepared" reply is always written to *w1*, even when the timing
        assertion fails, so the child is never left waiting for it.
        """
        # Obtain a write lock, this will block until
        # the child releases the read lock.
        self.assertEqual(os.read(r2, 7), b"prepare")
        start = time.time()
        db.transaction_prepare_commit()
        end = time.time()
        try:
            self.assertGreater(end - start, 1.9)
        finally:
            os.write(w1, b"prepared")

    # ------------------------------------------------------------------
    # attid / replPropertyMetaData tests
    # ------------------------------------------------------------------

    def test_get_oid_from_attrid(self):
        """get_oid_from_attid() maps attid 591614 to its OID."""
        oid = self.samdb.get_oid_from_attid(591614)
        self.assertEqual(oid, "1.2.840.113556.1.4.1790")

    def test_error_replpropertymetadata(self):
        """Bumping only the description version must be rejected."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_metadata(res[0])
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self._DESCRIPTION_ATTID:
                o.version = o.version + 1
        msg = self._metadata_replace_msg(res[0].dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self._REPLMD_CONTROL])

    def test_error_replpropertymetadata_nochange(self):
        """Writing back an unmodified blob with one control must fail."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_metadata(res[0])
        msg = self._metadata_replace_msg(res[0].dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self._REPLMD_CONTROL])

    def test_error_replpropertymetadata_allow_sort(self):
        """Writing back an unmodified blob succeeds with both controls."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_metadata(res[0])
        msg = self._metadata_replace_msg(res[0].dn, repl)
        self.samdb.modify(msg, [self._REPLMD_CONTROL,
                                self._REPLMD_RESORT_CONTROL])

    def test_twoatt_replpropertymetadata(self):
        """Replacing the metadata together with description must fail."""
        res = self._search_administrator(["replPropertyMetaData",
                                          "uSNChanged"])
        repl = self._unpack_metadata(res[0])
        # int() instead of long(): valid on Python 2 and Python 3.
        next_usn = int(str(res[0]["uSNChanged"])) + 1
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self._DESCRIPTION_ATTID:
                o.version = o.version + 1
                o.local_usn = next_usn
        msg = self._metadata_replace_msg(res[0].dn, repl)
        msg["description"] = ldb.MessageElement("new val",
                                                ldb.FLAG_MOD_REPLACE,
                                                "description")
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self._REPLMD_CONTROL])

    def test_set_replpropertymetadata(self):
        """A version bump with both USNs advanced is accepted."""
        res = self._search_administrator(["replPropertyMetaData",
                                          "uSNChanged"])
        repl = self._unpack_metadata(res[0])
        # int() instead of long(): valid on Python 2 and Python 3.
        next_usn = int(str(res[0]["uSNChanged"])) + 1
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self._DESCRIPTION_ATTID:
                o.version = o.version + 1
                o.local_usn = next_usn
                o.originating_usn = next_usn
        msg = self._metadata_replace_msg(res[0].dn, repl)
        self.samdb.modify(msg, [self._REPLMD_CONTROL])

    def test_ok_get_attribute_from_attid(self):
        """attid 13 resolves to the "description" attribute."""
        self.assertEqual(self.samdb.get_attribute_from_attid(13),
                         "description")

    def test_ko_get_attribute_from_attid(self):
        """An attid with no matching attribute resolves to None."""
        self.assertEqual(self.samdb.get_attribute_from_attid(11979), None)

    def test_get_attribute_replmetadata_version(self):
        """unicodePwd on the Administrator object is at version 1."""
        res = self._search_administrator(["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        self.assertEqual(
            self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"),
            1)

    def test_set_attribute_replmetadata_version(self):
        """A version written for description can be read back."""
        res = self._search_administrator(["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        version = self.samdb.get_attribute_replmetadata_version(
            dn, "description")
        self.samdb.set_attribute_replmetadata_version(dn, "description",
                                                      version + 2)
        self.assertEqual(
            self.samdb.get_attribute_replmetadata_version(dn, "description"),
            version + 2)

    # ------------------------------------------------------------------
    # Locking tests (parent and forked child coordinate over pipes)
    # ------------------------------------------------------------------

    def test_db_lock1(self):
        """A search must wait while a child holds a prepared transaction."""
        basedn = self.samdb.get_default_basedn()
        (r1, w1) = self._pipe()

        pid = os.fork()
        if pid == 0:
            # In the child, close the main DB, re-open
            del self.samdb
            gc.collect()
            self.samdb = self._open_samdb()

            self.samdb.transaction_start()

            dn = "cn=test_db_lock_user,cn=users," + str(basedn)
            self.samdb.add({
                "dn": dn,
                "objectclass": "user",
            })
            self.samdb.delete(dn)

            # Obtain a write lock
            self.samdb.transaction_prepare_commit()
            os.write(w1, b"prepared")
            time.sleep(2)

            # Drop the write lock
            self.samdb.transaction_cancel()
            os._exit(0)

        self.assertEqual(os.read(r1, 8), b"prepared")

        start = time.time()

        # We need to hold this iterator open to hold the all-record lock.
        res = self.samdb.search_iterator()

        # This should take at least 2 seconds because the transaction
        # has a write lock on one backend db open

        # Release the locks
        for _ in res:
            pass

        end = time.time()
        self.assertGreater(end - start, 1.9)

        self._assert_child_succeeded(pid)

    def test_db_lock2(self):
        """prepare_commit on a user add waits for the child's read lock."""
        basedn = self.samdb.get_default_basedn()
        (r1, w1) = self._pipe()
        (r2, w2) = self._pipe()

        pid = os.fork()
        if pid == 0:
            self._run_read_lock_child(r1, w2)

        # We can start the transaction during the search
        # because both just grab the all-record read lock.
        self.assertEqual(os.read(r2, 5), b"start")
        self.samdb.transaction_start()
        os.write(w1, b"started")

        self.assertEqual(os.read(r2, 3), b"add")
        dn = "cn=test_db_lock_user,cn=users," + str(basedn)
        self.samdb.add({
            "dn": dn,
            "objectclass": "user",
        })
        self.samdb.delete(dn)
        os.write(w1, b"added")

        self._timed_prepare_commit(self.samdb, r2, w1)

        # Drop the write lock
        self.samdb.transaction_cancel()

        self._assert_child_succeeded(pid)

    def test_db_lock3(self):
        """Like test_db_lock2, but the change lands in the top level db."""
        (r1, w1) = self._pipe()
        (r2, w2) = self._pipe()

        pid = os.fork()
        if pid == 0:
            self._run_read_lock_child(r1, w2)

        # We can start the transaction during the search
        # because both just grab the all-record read lock.
        self.assertEqual(os.read(r2, 5), b"start")
        self.samdb.transaction_start()
        os.write(w1, b"started")

        self.assertEqual(os.read(r2, 3), b"add")

        # This will end up in the top level db
        dn = "@DSDB_LOCK_TEST"
        self.samdb.add({
            "dn": dn})
        self.samdb.delete(dn)
        os.write(w1, b"added")

        self._timed_prepare_commit(self.samdb, r2, w1)

        # Drop the write lock
        self.samdb.transaction_cancel()

        self._assert_child_succeeded(pid)

    def _test_full_db_lock1(self, backend_path):
        """Opening a search must wait on a child's backend write lock.

        The child opens only the backend file at *backend_path* and holds
        a prepared transaction on it for two seconds; the parent checks
        that search_iterator() on the full SamDB took that long to return.
        """
        (r1, w1) = self._pipe()

        pid = os.fork()
        if pid == 0:
            # In the child, close the main DB, re-open just one DB
            del self.samdb
            gc.collect()

            backenddb = ldb.Ldb(backend_path)

            backenddb.transaction_start()

            backenddb.add({"dn": "@DSDB_LOCK_TEST"})
            backenddb.delete("@DSDB_LOCK_TEST")

            # Obtain a write lock
            backenddb.transaction_prepare_commit()
            os.write(w1, b"prepared")
            time.sleep(2)

            # Drop the write lock
            backenddb.transaction_cancel()
            os._exit(0)

        self.assertEqual(os.read(r1, 8), b"prepared")

        start = time.time()

        # We need to hold this iterator open to hold the all-record lock.
        res = self.samdb.search_iterator()

        # This should take at least 2 seconds because the transaction
        # has a write lock on one backend db open

        end = time.time()
        self.assertGreater(end - start, 1.9)

        # Release the locks
        for _ in res:
            pass

        self._assert_child_succeeded(pid)

    def test_full_db_lock1(self):
        """Run _test_full_db_lock1 on the default-basedn backend."""
        basedn = self.samdb.get_default_basedn()
        self._test_full_db_lock1(self._backend_path(basedn))

    def test_full_db_lock1_config(self):
        """Run _test_full_db_lock1 on the config-basedn backend."""
        basedn = self.samdb.get_config_basedn()
        self._test_full_db_lock1(self._backend_path(basedn))

    def _test_full_db_lock2(self, backend_path):
        """A backend prepare_commit must wait for the child's read lock.

        The child holds a search iterator open on the full SamDB while
        the parent, working only on the backend file at *backend_path*,
        checks that transaction_prepare_commit() blocked for about two
        seconds.
        """
        (r1, w1) = self._pipe()
        (r2, w2) = self._pipe()

        pid = os.fork()
        if pid == 0:
            self._run_read_lock_child(r1, w2)

        # In the parent, close the main DB, re-open just one DB
        del self.samdb
        gc.collect()
        backenddb = ldb.Ldb(backend_path)

        # We can start the transaction during the search
        # because both just grab the all-record read lock.
        self.assertEqual(os.read(r2, 5), b"start")
        backenddb.transaction_start()
        os.write(w1, b"started")

        self.assertEqual(os.read(r2, 3), b"add")
        backenddb.add({"dn": "@DSDB_LOCK_TEST"})
        backenddb.delete("@DSDB_LOCK_TEST")
        os.write(w1, b"added")

        self._timed_prepare_commit(backenddb, r2, w1)

        # Drop the write lock
        backenddb.transaction_cancel()

        self._assert_child_succeeded(pid)

    def test_full_db_lock2(self):
        """Run _test_full_db_lock2 on the default-basedn backend."""
        basedn = self.samdb.get_default_basedn()
        self._test_full_db_lock2(self._backend_path(basedn))

    def test_full_db_lock2_config(self):
        """Run _test_full_db_lock2 on the config-basedn backend."""
        basedn = self.samdb.get_config_basedn()
        self._test_full_db_lock2(self._backend_path(basedn))

    # ------------------------------------------------------------------
    # Unknown-control handling
    # ------------------------------------------------------------------

    def test_no_error_on_invalid_control(self):
        """A non-critical unimplemented control must not fail the search."""
        try:
            self.samdb.search(
                expression="cn=Administrator",
                scope=ldb.SCOPE_SUBTREE,
                attrs=["replPropertyMetaData"],
                controls=["local_oid:%s:0"
                          % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
        except ldb.LdbError:
            self.fail("Should have not raised an exception")

    def test_error_on_invalid_critical_control(self):
        """A critical unimplemented control must fail the search.

        The only accepted outcome is an LdbError carrying
        ERR_UNSUPPORTED_CRITICAL_EXTENSION; any other error code, or no
        error at all, fails the test.
        """
        try:
            self.samdb.search(
                expression="cn=Administrator",
                scope=ldb.SCOPE_SUBTREE,
                attrs=["replPropertyMetaData"],
                controls=["local_oid:%s:1"
                          % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
        except ldb.LdbError as e:
            # Exceptions are not subscriptable on Python 3; go via args.
            if e.args[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
                self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
                          % e.args[1])
        else:
            self.fail("Should have raised ERR_UNSUPPORTED_CRITICAL_EXTENSION")