dsdb: Add new test adding a record to the top level sam.ldb file
[Samba.git] / python / samba / tests / dsdb.py
blobcfe19093b9c7086a8c1878d2bb5bb5bab495121c
# Unix SMB/CIFS implementation. Tests for dsdb
# Copyright (C) Matthieu Patou <mat@matws.net> 2010
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
18 """Tests for samba.dsdb."""
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.ndr import ndr_unpack, ndr_pack
25 from samba.dcerpc import drsblobs
26 import ldb
27 import os
28 import samba
29 import gc
30 import time
class DsdbTests(TestCase):
    """Tests for SamDB helpers, replPropertyMetaData writes and DB locking.

    The locking tests fork a child process and synchronise with it over
    pipes; each side holds a lock for two seconds and the other side
    asserts that it was blocked for (almost) that long.
    """

    # attid searched for in the replPropertyMetaData array;
    # test_ok_get_attribute_from_attid asserts that it maps to "description".
    DESCRIPTION_ATTID = 13

    # Control sent with every direct replPropertyMetaData modify below.
    # NOTE(review): presumably DSDB_CONTROL_CHANGEREPLMETADATA_OID - confirm
    # against the dsdb control definitions.
    REPLMETADATA_CONTROL = "local_oid:1.3.6.1.4.1.7165.4.3.14:0"

    # Additional control used only by the "allow_sort" test.
    # NOTE(review): presumably the "resort" variant of the control above -
    # confirm against the dsdb control definitions.
    REPLMETADATA_RESORT_CONTROL = "local_oid:1.3.6.1.4.1.7165.4.3.25:0"

    # Dummy record added and deleted directly in a database file by the
    # locking tests.
    LOCK_TEST_DN = "@DSDB_LOCK_TEST"

    def setUp(self):
        """Open a SamDB using the test environment's loadparm and credentials."""
        super(DsdbTests, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()
        self.samdb = self._open_samdb()

    # ------------------------------------------------------------------
    # Private helpers (not collected as tests)
    # ------------------------------------------------------------------

    def _open_samdb(self):
        """Return a new SamDB opened with the session/creds/lp from setUp()."""
        return SamDB(session_info=self.session,
                     credentials=self.creds,
                     lp=self.lp)

    def _reopen_samdb(self):
        """Drop the current SamDB handle and open a fresh one.

        Used in forked children so that they do not share the parent's
        database handle.
        """
        del self.samdb
        gc.collect()
        self.samdb = self._open_samdb()

    def _search_administrator(self, attrs):
        """Subtree-search for cn=Administrator, returning only *attrs*."""
        return self.samdb.search(expression="cn=Administrator",
                                 scope=ldb.SCOPE_SUBTREE,
                                 attrs=attrs)

    def _unpack_repl_metadata(self, record):
        """Unpack the replPropertyMetaData attribute of a search result."""
        return ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                          str(record["replPropertyMetaData"]))

    def _repl_metadata_replace_msg(self, dn, repl):
        """Build a message replacing replPropertyMetaData on *dn* with *repl*."""
        repl_blob = ndr_pack(repl)
        msg = ldb.Message()
        msg.dn = dn
        msg["replPropertyMetaData"] = ldb.MessageElement(
            repl_blob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
        return msg

    def _default_backend_path(self):
        """Return the path of the sam.ldb.d file named after the default base DN.

        Must be called while self.samdb is still open.
        """
        basedn = self.samdb.get_default_basedn()
        backend_filename = "%s.ldb" % basedn.get_casefold()
        backend_subpath = os.path.join("sam.ldb.d", backend_filename)
        return self.lp.private_path(backend_subpath)

    def _assert_child_exited_cleanly(self, pid):
        """Reap child *pid* and assert that it exited normally with status 0."""
        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(os.WEXITSTATUS(status), 0)

    def _child_hold_read_lock(self, r1, w2):
        """Child side of the two-pipe locking handshake; never returns.

        Re-opens the SamDB, opens a search iterator, then walks the parent
        through "start" -> "add" -> "prepare" on *w2*, waiting for the
        parent's "started" / "added" / "prepared" replies on *r1*.  The
        iterator is kept open for two seconds after "prepare" is sent.

        Exits with status 0 on success, or 1/2/3 identifying which reply
        from the parent was not the expected one.
        """
        self._reopen_samdb()

        # We need to hold this iterator open to hold the all-record lock.
        res = self.samdb.search_iterator()

        os.write(w2, b"start")
        if os.read(r1, 7) != b"started":
            os._exit(1)

        os.write(w2, b"add")
        if os.read(r1, 5) != b"added":
            os._exit(2)

        # Wait 2 seconds to block prepare_commit() in the parent.
        os.write(w2, b"prepare")
        time.sleep(2)

        # Release the locks
        for _ in res:
            pass

        if os.read(r1, 8) != b"prepared":
            os._exit(3)

        os._exit(0)

    # ------------------------------------------------------------------
    # attid / OID lookups
    # ------------------------------------------------------------------

    def test_get_oid_from_attrid(self):
        """get_oid_from_attid() maps attid 591614 to its OID string."""
        oid = self.samdb.get_oid_from_attid(591614)
        self.assertEqual(oid, "1.2.840.113556.1.4.1790")

    def test_ok_get_attribute_from_attid(self):
        """A known attid resolves to its attribute name."""
        self.assertEqual(
            self.samdb.get_attribute_from_attid(self.DESCRIPTION_ATTID),
            "description")

    def test_ko_get_attribute_from_attid(self):
        """An attid with no matching attribute resolves to None."""
        self.assertEqual(self.samdb.get_attribute_from_attid(11979), None)

    # ------------------------------------------------------------------
    # Direct replPropertyMetaData modification
    # ------------------------------------------------------------------

    def test_error_replpropertymetadata(self):
        """Bumping only the version of the description entry is rejected."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_repl_metadata(res[0])
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self.DESCRIPTION_ATTID:
                o.version = o.version + 1
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self.REPLMETADATA_CONTROL])

    def test_error_replpropertymetadata_nochange(self):
        """Writing back an unmodified blob with the single control is rejected."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_repl_metadata(res[0])
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self.REPLMETADATA_CONTROL])

    def test_error_replpropertymetadata_allow_sort(self):
        """Writing back an unmodified blob succeeds when both controls are sent."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_repl_metadata(res[0])
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.samdb.modify(msg, [self.REPLMETADATA_CONTROL,
                                self.REPLMETADATA_RESORT_CONTROL])

    def test_twoatt_replpropertymetadata(self):
        """Replacing replPropertyMetaData together with description is rejected."""
        res = self._search_administrator(["replPropertyMetaData",
                                          "uSNChanged"])
        repl = self._unpack_repl_metadata(res[0])
        # int() rather than long(): same value on Python 2, valid on Python 3.
        next_usn = int(str(res[0]["uSNChanged"])) + 1
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self.DESCRIPTION_ATTID:
                o.version = o.version + 1
                o.local_usn = next_usn
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        msg["description"] = ldb.MessageElement(
            "new val", ldb.FLAG_MOD_REPLACE, "description")
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self.REPLMETADATA_CONTROL])

    def test_set_replpropertymetadata(self):
        """A bump of version, local_usn and originating_usn is accepted."""
        res = self._search_administrator(["replPropertyMetaData",
                                          "uSNChanged"])
        repl = self._unpack_repl_metadata(res[0])
        # int() rather than long(): same value on Python 2, valid on Python 3.
        next_usn = int(str(res[0]["uSNChanged"])) + 1
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self.DESCRIPTION_ATTID:
                o.version = o.version + 1
                o.local_usn = next_usn
                o.originating_usn = next_usn
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.samdb.modify(msg, [self.REPLMETADATA_CONTROL])

    # ------------------------------------------------------------------
    # replmetadata version helpers
    # ------------------------------------------------------------------

    def test_get_attribute_replmetadata_version(self):
        """The Administrator's unicodePwd metadata version is expected to be 1."""
        res = self._search_administrator(["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        self.assertEqual(
            self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"),
            1)

    def test_set_attribute_replmetadata_version(self):
        """A version written with the setter is read back by the getter."""
        res = self._search_administrator(["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        version = self.samdb.get_attribute_replmetadata_version(
            dn, "description")
        self.samdb.set_attribute_replmetadata_version(
            dn, "description", version + 2)
        self.assertEqual(
            self.samdb.get_attribute_replmetadata_version(dn, "description"),
            version + 2)

    # ------------------------------------------------------------------
    # Locking tests
    # ------------------------------------------------------------------

    def test_db_lock1(self):
        """A full search waits for a child's prepared transaction.

        The child adds and deletes a user inside a transaction, calls
        transaction_prepare_commit(), signals the parent, and cancels the
        transaction two seconds later.  The parent times opening and
        draining a search iterator.
        """
        basedn = self.samdb.get_default_basedn()
        (r1, w1) = os.pipe()

        pid = os.fork()
        if pid == 0:
            # In the child, close the main DB and re-open it
            self._reopen_samdb()

            self.samdb.transaction_start()

            dn = "cn=test_db_lock_user,cn=users," + str(basedn)
            self.samdb.add({
                "dn": dn,
                "objectclass": "user",
            })
            self.samdb.delete(dn)

            # Obtain a write lock
            self.samdb.transaction_prepare_commit()
            os.write(w1, b"prepared")
            time.sleep(2)

            # Drop the write lock
            self.samdb.transaction_cancel()
            os._exit(0)

        self.assertEqual(os.read(r1, 8), b"prepared")

        start = time.time()

        # We need to hold this iterator open to hold the all-record lock.
        res = self.samdb.search_iterator()

        # This should take at least 2 seconds because the transaction
        # has a write lock on one backend db open

        # Release the locks
        for _ in res:
            pass

        end = time.time()
        self.assertGreater(end - start, 1.9)

        self._assert_child_exited_cleanly(pid)

    def test_db_lock2(self):
        """prepare_commit() after a user add/delete waits for a child's search.

        The child holds a search iterator open (see _child_hold_read_lock);
        the parent starts a transaction, adds and deletes a user, and times
        transaction_prepare_commit().
        """
        basedn = self.samdb.get_default_basedn()
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()

        pid = os.fork()
        if pid == 0:
            self._child_hold_read_lock(r1, w2)

        # We can start the transaction during the search
        # because both just grab the all-record read lock.
        self.assertEqual(os.read(r2, 5), b"start")
        self.samdb.transaction_start()
        os.write(w1, b"started")

        self.assertEqual(os.read(r2, 3), b"add")
        dn = "cn=test_db_lock_user,cn=users," + str(basedn)
        self.samdb.add({
            "dn": dn,
            "objectclass": "user",
        })
        self.samdb.delete(dn)
        os.write(w1, b"added")

        # Obtain a write lock, this will block until
        # the child releases the read lock.
        self.assertEqual(os.read(r2, 7), b"prepare")
        start = time.time()
        self.samdb.transaction_prepare_commit()
        end = time.time()
        try:
            self.assertGreater(end - start, 1.9)
        finally:
            # Always unblock the child, even when the timing check fails.
            os.write(w1, b"prepared")

        # Drop the write lock
        self.samdb.transaction_cancel()

        self._assert_child_exited_cleanly(pid)

    def test_db_lock3(self):
        """prepare_commit() after a top-level add/delete waits for a child's search.

        Same handshake as test_db_lock2, but the record added and deleted
        is "@DSDB_LOCK_TEST", which ends up in the top level db.
        """
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()

        pid = os.fork()
        if pid == 0:
            self._child_hold_read_lock(r1, w2)

        # We can start the transaction during the search
        # because both just grab the all-record read lock.
        self.assertEqual(os.read(r2, 5), b"start")
        self.samdb.transaction_start()
        os.write(w1, b"started")

        self.assertEqual(os.read(r2, 3), b"add")

        # This will end up in the top level db
        dn = self.LOCK_TEST_DN
        self.samdb.add({
            "dn": dn})
        self.samdb.delete(dn)
        os.write(w1, b"added")

        # Obtain a write lock, this will block until
        # the child releases the read lock.
        self.assertEqual(os.read(r2, 7), b"prepare")
        start = time.time()
        self.samdb.transaction_prepare_commit()
        end = time.time()
        try:
            self.assertGreater(end - start, 1.9)
        finally:
            # Always unblock the child, even when the timing check fails;
            # otherwise it would wait on the pipe forever.
            os.write(w1, b"prepared")

        # Drop the write lock
        self.samdb.transaction_cancel()

        self._assert_child_exited_cleanly(pid)

    def test_full_db_lock1(self):
        """Opening a search on the full DB waits for a backend-file write lock.

        The child opens only the backend .ldb file, adds and deletes a
        dummy record in a transaction, calls transaction_prepare_commit(),
        signals the parent, and cancels two seconds later.  The parent
        times the search_iterator() call itself.
        """
        backend_path = self._default_backend_path()
        (r1, w1) = os.pipe()

        pid = os.fork()
        if pid == 0:
            # In the child, close the main DB, re-open just one DB
            del self.samdb
            gc.collect()

            backenddb = ldb.Ldb(backend_path)

            backenddb.transaction_start()

            backenddb.add({"dn": self.LOCK_TEST_DN})
            backenddb.delete(self.LOCK_TEST_DN)

            # Obtain a write lock
            backenddb.transaction_prepare_commit()
            os.write(w1, b"prepared")
            time.sleep(2)

            # Drop the write lock
            backenddb.transaction_cancel()
            os._exit(0)

        self.assertEqual(os.read(r1, 8), b"prepared")

        start = time.time()

        # We need to hold this iterator open to hold the all-record lock.
        res = self.samdb.search_iterator()

        # This should take at least 2 seconds because the transaction
        # has a write lock on one backend db open

        end = time.time()
        self.assertGreater(end - start, 1.9)

        # Release the locks
        for _ in res:
            pass

        self._assert_child_exited_cleanly(pid)

    def test_full_db_lock2(self):
        """prepare_commit() on a backend file waits for a child's full-DB search.

        The child holds a search iterator open on the full SamDB (see
        _child_hold_read_lock); the parent opens only the backend .ldb
        file, adds and deletes a dummy record in a transaction, and times
        transaction_prepare_commit().
        """
        backend_path = self._default_backend_path()
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()

        pid = os.fork()
        if pid == 0:
            self._child_hold_read_lock(r1, w2)

        # In the parent, close the main DB, re-open just one DB
        del self.samdb
        gc.collect()
        backenddb = ldb.Ldb(backend_path)

        # We can start the transaction during the search
        # because both just grab the all-record read lock.
        self.assertEqual(os.read(r2, 5), b"start")
        backenddb.transaction_start()
        os.write(w1, b"started")

        self.assertEqual(os.read(r2, 3), b"add")
        backenddb.add({"dn": self.LOCK_TEST_DN})
        backenddb.delete(self.LOCK_TEST_DN)
        os.write(w1, b"added")

        # Obtain a write lock, this will block until
        # the child releases the read lock.
        self.assertEqual(os.read(r2, 7), b"prepare")
        start = time.time()
        backenddb.transaction_prepare_commit()
        end = time.time()

        try:
            self.assertGreater(end - start, 1.9)
        finally:
            # Always unblock the child, even when the timing check fails.
            os.write(w1, b"prepared")

        # Drop the write lock
        backenddb.transaction_cancel()

        self._assert_child_exited_cleanly(pid)