CVE-2018-1140 ldb: Add tests for search add and rename with a bad dn= DN
[Samba.git] / lib / ldb / tests / python / api.py
blob48fac887f1cb437d1eeb397d43994731095fc697
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info > (3, 0)

# Prefixes prepended to a database filename to form the URL handed to
# ldb.Ldb(); LdbBaseTest.flags() compares against MDB_PREFIX.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"
def tempdir():
    """Create a fresh temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set the directory is
    created below ``$SELFTEST_PREFIX/tmp``; otherwise tempfile's default
    location is used.
    """
    import tempfile
    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    parent = None
    if selftest_prefix is not None:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
class NoContextTests(TestCase):
    """Tests of module-level ldb helpers that need no database handle."""

    def test_valid_attr_name(self):
        """"foo" is a valid attribute name, "24foo" is not."""
        is_valid = ldb.valid_attr_name
        self.assertTrue(is_valid("foo"))
        self.assertFalse(is_valid("24foo"))

    def test_timestring(self):
        """timestring() renders seconds since the epoch as an LDAP time string."""
        expectations = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for text, seconds in expectations:
            self.assertEqual(text, ldb.timestring(seconds))

    def test_string_to_time(self):
        """string_to_time() is the inverse mapping for the same two samples."""
        expectations = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, text in expectations:
            self.assertEqual(seconds, ldb.string_to_time(text))

    def test_binary_encode(self):
        """binary_encode() round-trips and treats str and bytes input alike."""
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, raw)

        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
class LdbBaseTest(TestCase):
    """Shared base class supplying the database URL and open flags.

    A subclass may define ``prefix``; when it is absent or None, setUp()
    falls back to TDB_PREFIX.
    """

    def setUp(self):
        super(LdbBaseTest, self).setUp()
        # Covers both "attribute never set" and "attribute set to None".
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the backend prefix followed by the database filename."""
        prefix, path = self.prefix, self.filename
        return prefix + path

    def flags(self):
        """Return ldb.FLG_NOSYNC for the mdb prefix, 0 for anything else."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
73 class SimpleLdb(LdbBaseTest):
def setUp(self):
    """Create a temporary directory and open a new test.ldb inside it."""
    super(SimpleLdb, self).setUp()
    workdir = tempdir()
    self.testdir = workdir
    self.filename = os.path.join(workdir, "test.ldb")
    self.ldb = ldb.Ldb(self.url(), flags=self.flags())
def tearDown(self):
    """Remove the temporary test directory and drop the LDB handle."""
    shutil.rmtree(self.testdir)
    super(SimpleLdb, self).tearDown()
    # Ensure the LDB is closed now, so we close the FD
    del(self.ldb)
87 def test_connect(self):
88 ldb.Ldb(self.url(), flags=self.flags())
90 def test_connect_none(self):
91 ldb.Ldb()
93 def test_connect_later(self):
94 x = ldb.Ldb()
95 x.connect(self.url(), flags=self.flags())
97 def test_repr(self):
98 x = ldb.Ldb()
99 self.assertTrue(repr(x).startswith("<ldb connection"))
101 def test_set_create_perms(self):
102 x = ldb.Ldb()
103 x.set_create_perms(0o600)
105 def test_modules_none(self):
106 x = ldb.Ldb()
107 self.assertEqual([], x.modules())
109 def test_modules_tdb(self):
110 x = ldb.Ldb(self.url(), flags=self.flags())
111 self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
113 def test_firstmodule_none(self):
114 x = ldb.Ldb()
115 self.assertEqual(x.firstmodule, None)
117 def test_firstmodule_tdb(self):
118 x = ldb.Ldb(self.url(), flags=self.flags())
119 mod = x.firstmodule
120 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
122 def test_search(self):
123 l = ldb.Ldb(self.url(), flags=self.flags())
124 self.assertEqual(len(l.search()), 0)
126 def test_search_controls(self):
127 l = ldb.Ldb(self.url(), flags=self.flags())
128 self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
130 def test_search_attrs(self):
131 l = ldb.Ldb(self.url(), flags=self.flags())
132 self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
134 def test_search_string_dn(self):
135 l = ldb.Ldb(self.url(), flags=self.flags())
136 self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
def test_search_attr_string(self):
    """search() raises TypeError when attrs is a bare str or bytes value."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    for bad_attrs in ("dc", b"dc"):
        self.assertRaises(TypeError, db.search, attrs=bad_attrs)
def test_opaque(self):
    """A stored opaque can be fetched again; an unknown name yields None."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    db.set_opaque("my_opaque", db)
    stored = db.get_opaque("my_opaque")
    self.assertTrue(stored is not None)
    missing = db.get_opaque("unknown")
    self.assertEqual(None, missing)
149 def test_search_scope_base_empty_db(self):
150 l = ldb.Ldb(self.url(), flags=self.flags())
151 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
152 ldb.SCOPE_BASE)), 0)
154 def test_search_scope_onelevel_empty_db(self):
155 l = ldb.Ldb(self.url(), flags=self.flags())
156 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
157 ldb.SCOPE_ONELEVEL)), 0)
def test_delete(self):
    """Deleting dc=foo2 from an empty database raises LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        db.delete(ldb.Dn(db, "dc=foo2"))
def test_delete_w_unhandled_ctrl(self):
    """delete() with the search_options control raises; a plain delete works."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo1")
    entry["b"] = [b"a"]
    db.add(entry)
    with self.assertRaises(ldb.LdbError):
        db.delete(entry.dn, ["search_options:1:2"])
    # Without the control the same delete must go through.
    db.delete(entry.dn)
def test_contains(self):
    """``dn in ldb`` is false before the entry exists and true afterwards."""
    url = self.url()
    db = ldb.Ldb(url, flags=self.flags())
    self.assertNotIn(ldb.Dn(db, "dc=foo3"), db)

    # Open the database a second time before adding the entry.
    db = ldb.Ldb(url, flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo3")
    entry["b"] = ["a"]
    db.add(entry)
    try:
        self.assertIn(ldb.Dn(db, "dc=foo3"), db)
        self.assertNotIn(ldb.Dn(db, "dc=foo4"), db)
    finally:
        db.delete(entry.dn)
187 def test_get_config_basedn(self):
188 l = ldb.Ldb(self.url(), flags=self.flags())
189 self.assertEqual(None, l.get_config_basedn())
191 def test_get_root_basedn(self):
192 l = ldb.Ldb(self.url(), flags=self.flags())
193 self.assertEqual(None, l.get_root_basedn())
195 def test_get_schema_basedn(self):
196 l = ldb.Ldb(self.url(), flags=self.flags())
197 self.assertEqual(None, l.get_schema_basedn())
199 def test_get_default_basedn(self):
200 l = ldb.Ldb(self.url(), flags=self.flags())
201 self.assertEqual(None, l.get_default_basedn())
def test_add(self):
    """Adding a message with a bytes value makes it visible to search()."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo4")
    entry["bla"] = b"bla"
    before = db.search()
    self.assertEqual(len(before), 0)
    db.add(entry)
    try:
        after = db.search()
        self.assertEqual(len(after), 1)
    finally:
        db.delete(ldb.Dn(db, "dc=foo4"))
def test_search_iterator(self):
    """Exercise search_iterator(): abandon, full drain and resumed iteration."""
    db = ldb.Ldb(self.url(), flags=self.flags())

    def drain(iterator):
        # Consume the iterator completely, checking the type of each item.
        collected = []
        for item in iterator:
            self.assertTrue(isinstance(item, ldb.Message))
            collected.append(item)
        return collected

    def check_pair(found, one, two):
        # The two entries may come back in either order.
        self.assertEqual(len(found), 2)
        if found[0].dn == one.dn:
            self.assertEqual(found[0].dn, one.dn)
            self.assertEqual(found[1].dn, two.dn)
        else:
            self.assertEqual(found[0].dn, two.dn)
            self.assertEqual(found[1].dn, one.dn)

    # After abandon(), iterating, abandoning again and result() must
    # each raise RuntimeError.
    it = db.search_iterator()
    it.abandon()
    with self.assertRaises(RuntimeError):
        for _ in it:
            self.fail()
    with self.assertRaises(RuntimeError):
        it.abandon()
    with self.assertRaises(RuntimeError):
        it.result()

    # Empty database: nothing is yielded.
    it = db.search_iterator()
    yielded = drain(it)
    self.assertEqual(len(it.result()), 0)
    self.assertEqual(len(yielded), 0)

    first = ldb.Message()
    first.dn = ldb.Dn(db, "dc=foo4")
    first["bla"] = b"bla"
    db.add(first)
    try:
        it = db.search_iterator()
        found = drain(it)
        self.assertEqual(len(it.result()), 0)
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].dn, first.dn)

        second = ldb.Message()
        second.dn = ldb.Dn(db, "dc=foo5")
        second["bla"] = b"bla"
        db.add(second)

        it = db.search_iterator()
        found = drain(it)
        self.assertEqual(len(it.result()), 0)
        check_pair(found, first, second)

        # Take one message, stop, and confirm result() is refused while
        # the iteration is still unfinished.
        it = db.search_iterator()
        found = []
        for item in it:
            self.assertTrue(isinstance(item, ldb.Message))
            found.append(item)
            break
        with self.assertRaises(RuntimeError):
            it.result()
        # Resuming the same iterator yields the remaining message ...
        for item in it:
            self.assertTrue(isinstance(item, ldb.Message))
            found.append(item)
            break
        # ... after which it is exhausted.
        for item in it:
            self.fail()

        self.assertEqual(len(it.result()), 0)
        check_pair(found, first, second)
    finally:
        db.delete(ldb.Dn(db, "dc=foo4"))
        db.delete(ldb.Dn(db, "dc=foo5"))
315 def test_add_text(self):
316 l = ldb.Ldb(self.url(), flags=self.flags())
317 m = ldb.Message()
318 m.dn = ldb.Dn(l, "dc=foo4")
319 m["bla"] = "bla"
320 self.assertEqual(len(l.search()), 0)
321 l.add(m)
322 try:
323 self.assertEqual(len(l.search()), 1)
324 finally:
325 l.delete(ldb.Dn(l, "dc=foo4"))
def test_add_w_unhandled_ctrl(self):
    """add() with the search_options control raises LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo4")
    entry["bla"] = b"bla"
    self.assertEqual(len(db.search()), 0)
    with self.assertRaises(ldb.LdbError):
        db.add(entry, ["search_options:1:2"])
335 def test_add_dict(self):
336 l = ldb.Ldb(self.url(), flags=self.flags())
337 m = {"dn": ldb.Dn(l, "dc=foo5"),
338 "bla": b"bla"}
339 self.assertEqual(len(l.search()), 0)
340 l.add(m)
341 try:
342 self.assertEqual(len(l.search()), 1)
343 finally:
344 l.delete(ldb.Dn(l, "dc=foo5"))
346 def test_add_dict_text(self):
347 l = ldb.Ldb(self.url(), flags=self.flags())
348 m = {"dn": ldb.Dn(l, "dc=foo5"),
349 "bla": "bla"}
350 self.assertEqual(len(l.search()), 0)
351 l.add(m)
352 try:
353 self.assertEqual(len(l.search()), 1)
354 finally:
355 l.delete(ldb.Dn(l, "dc=foo5"))
357 def test_add_dict_string_dn(self):
358 l = ldb.Ldb(self.url(), flags=self.flags())
359 m = {"dn": "dc=foo6", "bla": b"bla"}
360 self.assertEqual(len(l.search()), 0)
361 l.add(m)
362 try:
363 self.assertEqual(len(l.search()), 1)
364 finally:
365 l.delete(ldb.Dn(l, "dc=foo6"))
367 def test_add_dict_bytes_dn(self):
368 l = ldb.Ldb(self.url(), flags=self.flags())
369 m = {"dn": b"dc=foo6", "bla": b"bla"}
370 self.assertEqual(len(l.search()), 0)
371 l.add(m)
372 try:
373 self.assertEqual(len(l.search()), 1)
374 finally:
375 l.delete(ldb.Dn(l, "dc=foo6"))
def test_rename(self):
    """Rename dc=foo7 to dc=bar using Dn objects; one entry remains."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo7")
    entry["bla"] = b"bla"
    self.assertEqual(len(db.search()), 0)
    db.add(entry)
    try:
        old_dn = ldb.Dn(db, "dc=foo7")
        new_dn = ldb.Dn(db, "dc=bar")
        db.rename(old_dn, new_dn)
        self.assertEqual(len(db.search()), 1)
    finally:
        db.delete(ldb.Dn(db, "dc=bar"))
def test_rename_string_dns(self):
    """Rename dc=foo8 to dc=bar passing both DNs as plain strings."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo8")
    entry["bla"] = b"bla"
    self.assertEqual(len(db.search()), 0)
    db.add(entry)
    self.assertEqual(len(db.search()), 1)
    try:
        db.rename("dc=foo8", "dc=bar")
        remaining = db.search()
        self.assertEqual(len(remaining), 1)
    finally:
        db.delete(ldb.Dn(db, "dc=bar"))
def test_rename_bad_string_dns(self):
    """rename() raises LdbError when either string DN is malformed."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=foo8")
    entry["bla"] = b"bla"
    entry["objectUUID"] = b"0123456789abcdef"
    self.assertEqual(len(db.search()), 0)
    db.add(entry)
    self.assertEqual(len(db.search()), 1)
    # First a malformed source DN, then a malformed destination DN.
    with self.assertRaises(ldb.LdbError):
        db.rename("dcXfoo8", "dc=bar")
    with self.assertRaises(ldb.LdbError):
        db.rename("dc=foo8", "dcXbar")
    db.delete(ldb.Dn(db, "dc=foo8"))
def test_empty_dn(self):
    """A message carrying only a DN comes back with just the DN keys."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    self.assertEqual(0, len(db.search()))
    entry = ldb.Message()
    entry.dn = ldb.Dn(db, "dc=empty")
    db.add(entry)

    found = db.search()
    self.assertEqual(1, len(found))
    self.assertEqual(set(["dn", "distinguishedName"]), set(found[0].keys()))

    found = db.search(entry.dn)
    self.assertEqual(1, len(found))
    self.assertEqual(set(["dn", "distinguishedName"]), set(found[0].keys()))

    # Asking only for an attribute the entry lacks yields an empty message.
    found = db.search(entry.dn, attrs=["blah"])
    self.assertEqual(1, len(found))
    self.assertEqual(0, len(found[0]))
def test_modify_delete(self):
    """A FLAG_MOD_DELETE modify removes the "bla" attribute from the entry."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    original = ldb.Message()
    original.dn = ldb.Dn(db, "dc=modifydelete")
    original["bla"] = [b"1234"]
    db.add(original)
    stored = db.search(original.dn)[0]
    self.assertEqual([b"1234"], list(stored["bla"]))
    try:
        change = ldb.Message()
        change.dn = ldb.Dn(db, "dc=modifydelete")
        change["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
        self.assertEqual(ldb.FLAG_MOD_DELETE, change["bla"].flags())
        db.modify(change)

        found = db.search(change.dn)
        self.assertEqual(1, len(found))
        self.assertEqual(set(["dn", "distinguishedName"]), set(found[0].keys()))

        found = db.search(change.dn, attrs=["bla"])
        self.assertEqual(1, len(found))
        self.assertEqual(0, len(found[0]))
    finally:
        db.delete(ldb.Dn(db, "dc=modifydelete"))
457 def test_modify_delete_text(self):
458 l = ldb.Ldb(self.url(), flags=self.flags())
459 m = ldb.Message()
460 m.dn = ldb.Dn(l, "dc=modifydelete")
461 m.text["bla"] = ["1234"]
462 l.add(m)
463 rm = l.search(m.dn)[0]
464 self.assertEqual(["1234"], list(rm.text["bla"]))
465 try:
466 m = ldb.Message()
467 m.dn = ldb.Dn(l, "dc=modifydelete")
468 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
469 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
470 l.modify(m)
471 rm = l.search(m.dn)
472 self.assertEqual(1, len(rm))
473 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
474 rm = l.search(m.dn, attrs=["bla"])
475 self.assertEqual(1, len(rm))
476 self.assertEqual(0, len(rm[0]))
477 finally:
478 l.delete(ldb.Dn(l, "dc=modifydelete"))
480 def test_modify_add(self):
481 l = ldb.Ldb(self.url(), flags=self.flags())
482 m = ldb.Message()
483 m.dn = ldb.Dn(l, "dc=add")
484 m["bla"] = [b"1234"]
485 l.add(m)
486 try:
487 m = ldb.Message()
488 m.dn = ldb.Dn(l, "dc=add")
489 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
490 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
491 l.modify(m)
492 rm = l.search(m.dn)[0]
493 self.assertEqual(2, len(rm))
494 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
495 finally:
496 l.delete(ldb.Dn(l, "dc=add"))
498 def test_modify_add_text(self):
499 l = ldb.Ldb(self.url(), flags=self.flags())
500 m = ldb.Message()
501 m.dn = ldb.Dn(l, "dc=add")
502 m.text["bla"] = ["1234"]
503 l.add(m)
504 try:
505 m = ldb.Message()
506 m.dn = ldb.Dn(l, "dc=add")
507 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
508 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
509 l.modify(m)
510 rm = l.search(m.dn)[0]
511 self.assertEqual(2, len(rm))
512 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
513 finally:
514 l.delete(ldb.Dn(l, "dc=add"))
516 def test_modify_replace(self):
517 l = ldb.Ldb(self.url(), flags=self.flags())
518 m = ldb.Message()
519 m.dn = ldb.Dn(l, "dc=modify2")
520 m["bla"] = [b"1234", b"456"]
521 l.add(m)
522 try:
523 m = ldb.Message()
524 m.dn = ldb.Dn(l, "dc=modify2")
525 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
526 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
527 l.modify(m)
528 rm = l.search(m.dn)[0]
529 self.assertEqual(2, len(rm))
530 self.assertEqual([b"789"], list(rm["bla"]))
531 rm = l.search(m.dn, attrs=["bla"])[0]
532 self.assertEqual(1, len(rm))
533 finally:
534 l.delete(ldb.Dn(l, "dc=modify2"))
536 def test_modify_replace_text(self):
537 l = ldb.Ldb(self.url(), flags=self.flags())
538 m = ldb.Message()
539 m.dn = ldb.Dn(l, "dc=modify2")
540 m.text["bla"] = ["1234", "456"]
541 l.add(m)
542 try:
543 m = ldb.Message()
544 m.dn = ldb.Dn(l, "dc=modify2")
545 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
546 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
547 l.modify(m)
548 rm = l.search(m.dn)[0]
549 self.assertEqual(2, len(rm))
550 self.assertEqual(["789"], list(rm.text["bla"]))
551 rm = l.search(m.dn, attrs=["bla"])[0]
552 self.assertEqual(1, len(rm))
553 finally:
554 l.delete(ldb.Dn(l, "dc=modify2"))
def test_modify_flags_change(self):
    """set_flags() on an element changes how the following modify behaves."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    original = ldb.Message()
    original.dn = ldb.Dn(db, "dc=add")
    original["bla"] = [b"1234"]
    db.add(original)
    try:
        change = ldb.Message()
        change.dn = ldb.Dn(db, "dc=add")
        change["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.assertEqual(ldb.FLAG_MOD_ADD, change["bla"].flags())
        db.modify(change)
        stored = db.search(change.dn)[0]
        self.assertEqual(2, len(stored))
        self.assertEqual([b"1234", b"456"], list(stored["bla"]))

        # Build the element as an ADD again, then flip it to DELETE
        # before submitting the modify.
        change["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        change["bla"].set_flags(ldb.FLAG_MOD_DELETE)
        db.modify(change)
        stored = db.search(change.dn, attrs=["bla"])[0]
        self.assertEqual(1, len(stored))
        self.assertEqual([b"1234"], list(stored["bla"]))
    finally:
        db.delete(ldb.Dn(db, "dc=add"))
582 def test_modify_flags_change_text(self):
583 l = ldb.Ldb(self.url(), flags=self.flags())
584 m = ldb.Message()
585 m.dn = ldb.Dn(l, "dc=add")
586 m.text["bla"] = ["1234"]
587 l.add(m)
588 try:
589 m = ldb.Message()
590 m.dn = ldb.Dn(l, "dc=add")
591 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
592 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
593 l.modify(m)
594 rm = l.search(m.dn)[0]
595 self.assertEqual(2, len(rm))
596 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
598 # Now create another modify, but switch the flags before we do it
599 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
600 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
601 l.modify(m)
602 rm = l.search(m.dn, attrs=["bla"])[0]
603 self.assertEqual(1, len(rm))
604 self.assertEqual(["1234"], list(rm.text["bla"]))
605 finally:
606 l.delete(ldb.Dn(l, "dc=add"))
def test_transaction_commit(self):
    """An add inside a committed transaction can be deleted afterwards."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    db.transaction_start()
    entry = ldb.Message(ldb.Dn(db, "dc=foo9"))
    entry["foo"] = [b"bar"]
    db.add(entry)
    db.transaction_commit()
    # The delete only succeeds if the entry is present after the commit.
    db.delete(entry.dn)
def test_transaction_cancel(self):
    """An add inside a cancelled transaction leaves no entry behind."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    db.transaction_start()
    entry = ldb.Message(ldb.Dn(db, "dc=foo10"))
    entry["foo"] = [b"bar"]
    db.add(entry)
    db.transaction_cancel()
    leftovers = db.search(ldb.Dn(db, "dc=foo10"))
    self.assertEqual(0, len(leftovers))
def test_set_debug(self):
    """set_debug() accepts a two-argument Python callable."""
    def ignore_report(level, text):
        return None

    db = ldb.Ldb(self.url(), flags=self.flags())
    db.set_debug(ignore_report)
def test_zero_byte_string(self):
    r"""Testing we do not get trapped in the \0 byte in a property string."""
    # The docstring is raw so "\0" stays as two visible characters rather
    # than embedding a NUL byte in the description.
    l = ldb.Ldb(self.url(), flags=self.flags())
    l.add({
        "dn": b"dc=somedn",
        "objectclass": b"user",
        "cN": b"LDAPtestUSER",
        "givenname": b"ldap",
        "displayname": b"foo\0bar",
    })
    res = l.search(expression="(dn=dc=somedn)")
    # The stored value must come back whole, including the bytes after
    # the embedded zero byte.
    self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
    """A malformed filter expression raises LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        db.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])
649 class SearchTests(LdbBaseTest):
def tearDown(self):
    """Remove the temporary test directory and drop the LDB handle."""
    shutil.rmtree(self.testdir)
    super(SearchTests, self).tearDown()

    # Ensure the LDB is closed now, so we close the FD
    del(self.l)
def setUp(self):
    """Open search_test.ldb and load the fixture records used by the tests.

    The database is opened with the rdn_name module (and with full scans
    disabled when the class defines IDXCHECK). If the class defines
    ``index`` that record is added first, then DC is declared
    case-insensitive, then the base DN and 24 OU entries are added.
    """
    super(SearchTests, self).setUp()
    self.testdir = tempdir()
    self.filename = os.path.join(self.testdir, "search_test.ldb")
    options = ["modules:rdn_name"]
    if hasattr(self, 'IDXCHECK'):
        options.append("disable_full_db_scan_for_self_test:1")
    self.l = ldb.Ldb(self.url(),
                     flags=self.flags(),
                     options=options)
    try:
        self.l.add(self.index)
    except AttributeError:
        pass

    self.l.add({"dn": "@ATTRIBUTES",
                "DC": "CASE_INSENSITIVE"})

    # Note that we can't use the name objectGUID here, as we
    # want to stay clear of the objectGUID handler in LDB and
    # instead use just the 16 bytes raw, which we just keep
    # to printable chars here for ease of handling.

    self.l.add({"dn": "DC=SAMBA,DC=ORG",
                "name": b"samba.org",
                "objectUUID": b"0123456789abcddf"})

    # (dn, name, x, y, objectUUID), added in this order.
    children = (
        ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", "z", "a", b"0123456789abcde1"),
        ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", "z", "a", b"0123456789abcde2"),
        ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", "y", "a", b"0123456789abcde3"),
        ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", "y", "a", b"0123456789abcde4"),
        ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", "y", "a", b"0123456789abcde5"),
        ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", "y", "a", b"0123456789abcde6"),
        ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", "y", "a", b"0123456789abcde7"),
        ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", "y", "a", b"0123456789abcde8"),
        ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", "y", "a", b"0123456789abcde9"),
        ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", "y", "a", b"0123456789abcde0"),
        ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", "y", "a", b"0123456789abcdea"),
        ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", "y", "a", b"0123456789abcdeb"),
        ("OU=OU11,DC=SAMBA,DC=ORG", b"OU #10", "y", "a", b"0123456789abcdec"),
        ("OU=OU12,DC=SAMBA,DC=ORG", b"OU #10", "y", "b", b"0123456789abcded"),
        ("OU=OU13,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcdee"),
        ("OU=OU14,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd01"),
        ("OU=OU15,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd02"),
        ("OU=OU16,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd03"),
        ("OU=OU17,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd04"),
        ("OU=OU18,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd05"),
        ("OU=OU19,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd06"),
        ("OU=OU20,DC=SAMBA,DC=ORG", b"OU #10", "x", "b", b"0123456789abcd07"),
        ("OU=OU21,DC=SAMBA,DC=ORG", b"OU #10", "x", "c", b"0123456789abcd08"),
        ("OU=OU22,DC=SAMBA,DC=ORG", b"OU #10", "x", "c", b"0123456789abcd09"),
    )
    for dn, name, x_value, y_value, uuid in children:
        self.l.add({"dn": dn,
                    "name": name,
                    "x": x_value, "y": y_value,
                    "objectUUID": uuid})
781 def test_base(self):
782 """Testing a search"""
784 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
785 scope=ldb.SCOPE_BASE)
786 self.assertEqual(len(res11), 1)
788 def test_base_lower(self):
789 """Testing a search"""
791 res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
792 scope=ldb.SCOPE_BASE)
793 self.assertEqual(len(res11), 1)
795 def test_base_or(self):
796 """Testing a search"""
798 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
799 scope=ldb.SCOPE_BASE,
800 expression="(|(ou=ou11)(ou=ou12))")
801 self.assertEqual(len(res11), 1)
803 def test_base_or2(self):
804 """Testing a search"""
806 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
807 scope=ldb.SCOPE_BASE,
808 expression="(|(x=y)(y=b))")
809 self.assertEqual(len(res11), 1)
811 def test_base_and(self):
812 """Testing a search"""
814 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
815 scope=ldb.SCOPE_BASE,
816 expression="(&(ou=ou11)(ou=ou12))")
817 self.assertEqual(len(res11), 0)
819 def test_base_and2(self):
820 """Testing a search"""
822 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
823 scope=ldb.SCOPE_BASE,
824 expression="(&(x=y)(y=a))")
825 self.assertEqual(len(res11), 1)
827 def test_base_false(self):
828 """Testing a search"""
830 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
831 scope=ldb.SCOPE_BASE,
832 expression="(|(ou=ou13)(ou=ou12))")
833 self.assertEqual(len(res11), 0)
835 def test_check_base_false(self):
836 """Testing a search"""
837 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
838 scope=ldb.SCOPE_BASE,
839 expression="(|(ou=ou13)(ou=ou12))")
840 self.assertEqual(len(res11), 0)
def test_check_base_error(self):
    """With checkBaseOnSearch set, a search on a missing base DN must fail."""
    options_record = {"dn": "@OPTIONS",
                      "checkBaseOnSearch": b"TRUE"}
    try:
        self.l.add(options_record)
    except ldb.LdbError as err:
        # The only acceptable failure is that @OPTIONS already exists;
        # in that case apply the same record through modify().
        self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        replacement = ldb.Message.from_dict(self.l, options_record)
        self.l.modify(replacement)

    try:
        self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
                      scope=ldb.SCOPE_BASE,
                      expression="(|(ou=ou13)(ou=ou12))")
        self.fail("Should have failed on missing base")
    except ldb.LdbError as err:
        code = err.args[0]
        self.assertEqual(code, ldb.ERR_NO_SUCH_OBJECT)
864 def test_subtree_and(self):
865 """Testing a search"""
867 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
868 scope=ldb.SCOPE_SUBTREE,
869 expression="(&(ou=ou11)(ou=ou12))")
870 self.assertEqual(len(res11), 0)
872 def test_subtree_and2(self):
873 """Testing a search"""
875 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
876 scope=ldb.SCOPE_SUBTREE,
877 expression="(&(x=y)(|(y=b)(y=c)))")
878 self.assertEqual(len(res11), 1)
880 def test_subtree_and2_lower(self):
881 """Testing a search"""
883 res11 = self.l.search(base="DC=samba,DC=org",
884 scope=ldb.SCOPE_SUBTREE,
885 expression="(&(x=y)(|(y=b)(y=c)))")
886 self.assertEqual(len(res11), 1)
888 def test_subtree_or(self):
889 """Testing a search"""
891 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
892 scope=ldb.SCOPE_SUBTREE,
893 expression="(|(ou=ou11)(ou=ou12))")
894 self.assertEqual(len(res11), 2)
896 def test_subtree_or2(self):
897 """Testing a search"""
899 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
900 scope=ldb.SCOPE_SUBTREE,
901 expression="(|(x=y)(y=b))")
902 self.assertEqual(len(res11), 20)
904 def test_subtree_or3(self):
905 """Testing a search"""
907 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
908 scope=ldb.SCOPE_SUBTREE,
909 expression="(|(x=y)(y=b)(y=c))")
910 self.assertEqual(len(res11), 22)
912 def test_one_and(self):
913 """Testing a search"""
915 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
916 scope=ldb.SCOPE_ONELEVEL,
917 expression="(&(ou=ou11)(ou=ou12))")
918 self.assertEqual(len(res11), 0)
920 def test_one_and2(self):
921 """Testing a search"""
923 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
924 scope=ldb.SCOPE_ONELEVEL,
925 expression="(&(x=y)(y=b))")
926 self.assertEqual(len(res11), 1)
928 def test_one_or(self):
929 """Testing a search"""
931 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
932 scope=ldb.SCOPE_ONELEVEL,
933 expression="(|(ou=ou11)(ou=ou12))")
934 self.assertEqual(len(res11), 2)
936 def test_one_or2(self):
937 """Testing a search"""
939 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
940 scope=ldb.SCOPE_ONELEVEL,
941 expression="(|(x=y)(y=b))")
942 self.assertEqual(len(res11), 20)
944 def test_one_or2_lower(self):
945 """Testing a search"""
947 res11 = self.l.search(base="DC=samba,DC=org",
948 scope=ldb.SCOPE_ONELEVEL,
949 expression="(|(x=y)(y=b))")
950 self.assertEqual(len(res11), 20)
def test_one_unindexable(self):
    """One-level substring search on ``y``: 9 matches, or a refused full scan.

    When the class defines IDX and IDXCHECK but not IDXONE the search is
    expected to raise; any LdbError raised must be
    ERR_INAPPROPRIATE_MATCHING with the "FULL SEARCH disabled" message.
    """
    try:
        res11 = self.l.search(base="DC=samba,DC=org",
                              scope=ldb.SCOPE_ONELEVEL,
                              expression="(y=b*)")
        if hasattr(self, 'IDX') and \
           not hasattr(self, 'IDXONE') and \
           hasattr(self, 'IDXCHECK'):
            self.fail("Should have failed as un-indexed search")

        self.assertEqual(len(res11), 9)

    except ldb.LdbError as err:
        enum = err.args[0]
        estr = err.args[1]
        self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
        # assertIn(member, container): the expected phrase must occur in
        # the reported error string, not the other way round.
        self.assertIn("ldb FULL SEARCH disabled", estr)
def test_one_unindexable_presence(self):
    """One-level presence search on ``y``: 24 matches, or a refused full scan.

    When the class defines IDX and IDXCHECK but not IDXONE the search is
    expected to raise; any LdbError raised must be
    ERR_INAPPROPRIATE_MATCHING with the "FULL SEARCH disabled" message.
    """
    try:
        res11 = self.l.search(base="DC=samba,DC=org",
                              scope=ldb.SCOPE_ONELEVEL,
                              expression="(y=*)")
        if hasattr(self, 'IDX') and \
           not hasattr(self, 'IDXONE') and \
           hasattr(self, 'IDXCHECK'):
            self.fail("Should have failed as un-indexed search")

        self.assertEqual(len(res11), 24)

    except ldb.LdbError as err:
        enum = err.args[0]
        estr = err.args[1]
        self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
        # assertIn(member, container): the expected phrase must occur in
        # the reported error string, not the other way round.
        self.assertIn("ldb FULL SEARCH disabled", estr)
993 def test_subtree_and_or(self):
994 """Testing a search"""
996 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
997 scope=ldb.SCOPE_SUBTREE,
998 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
999 self.assertEqual(len(res11), 0)
1001 def test_subtree_and_or2(self):
1002 """Testing a search"""
1004 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1005 scope=ldb.SCOPE_SUBTREE,
1006 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1007 self.assertEqual(len(res11), 0)
1009 def test_subtree_and_or3(self):
1010 """Testing a search"""
1012 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1013 scope=ldb.SCOPE_SUBTREE,
1014 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1015 self.assertEqual(len(res11), 2)
1017 def test_subtree_and_or4(self):
1018 """Testing a search"""
1020 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1021 scope=ldb.SCOPE_SUBTREE,
1022 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1023 self.assertEqual(len(res11), 2)
1025 def test_subtree_and_or5(self):
1026 """Testing a search"""
1028 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1029 scope=ldb.SCOPE_SUBTREE,
1030 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1031 self.assertEqual(len(res11), 1)
1033 def test_subtree_or_and(self):
1034 """Testing a search"""
1036 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1037 scope=ldb.SCOPE_SUBTREE,
1038 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1039 self.assertEqual(len(res11), 10)
1041 def test_subtree_large_and_unique(self):
1042 """Testing a search"""
1044 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1045 scope=ldb.SCOPE_SUBTREE,
1046 expression="(&(ou=ou10)(y=a))")
1047 self.assertEqual(len(res11), 1)
1049 def test_subtree_and_none(self):
1050 """Testing a search"""
1052 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1053 scope=ldb.SCOPE_SUBTREE,
1054 expression="(&(ou=ouX)(y=a))")
1055 self.assertEqual(len(res11), 0)
1057 def test_subtree_and_idx_record(self):
1058 """Testing a search against the index record"""
1060 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1061 scope=ldb.SCOPE_SUBTREE,
1062 expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1063 self.assertEqual(len(res11), 0)
1065 def test_subtree_and_idxone_record(self):
1066 """Testing a search against the index record"""
1068 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1069 scope=ldb.SCOPE_SUBTREE,
1070 expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1071 self.assertEqual(len(res11), 0)
def test_subtree_unindexable(self):
    """Subtree substring search on ``y``: 9 matches, or a refused full scan.

    When the class defines both IDX and IDXCHECK the search is expected
    to raise; any LdbError raised must be ERR_INAPPROPRIATE_MATCHING with
    the "FULL SEARCH disabled" message.
    """
    try:
        res11 = self.l.search(base="DC=samba,DC=org",
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(y=b*)")
        if hasattr(self, 'IDX') and \
           hasattr(self, 'IDXCHECK'):
            self.fail("Should have failed as un-indexed search")

        self.assertEqual(len(res11), 9)

    except ldb.LdbError as err:
        enum = err.args[0]
        estr = err.args[1]
        self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
        # assertIn(member, container): the expected phrase must occur in
        # the reported error string, not the other way round.
        self.assertIn("ldb FULL SEARCH disabled", estr)
def test_subtree_unindexable_presence(self):
    """Subtree presence search on ``y``: 24 matches, or a refused full scan.

    When the class defines both IDX and IDXCHECK the search is expected
    to raise; any LdbError raised must be ERR_INAPPROPRIATE_MATCHING with
    the "FULL SEARCH disabled" message.
    """
    try:
        res11 = self.l.search(base="DC=samba,DC=org",
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(y=*)")
        if hasattr(self, 'IDX') and \
           hasattr(self, 'IDXCHECK'):
            self.fail("Should have failed as un-indexed search")

        self.assertEqual(len(res11), 24)

    except ldb.LdbError as err:
        enum = err.args[0]
        estr = err.args[1]
        self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
        # assertIn(member, container): the expected phrase must occur in
        # the reported error string, not the other way round.
        self.assertIn("ldb FULL SEARCH disabled", estr)
def test_dn_filter_one(self):
    """Check a dn= filter when the scope is SCOPE_ONELEVEL.

    One record is expected, except that zero records are expected when
    disallowDNFilter and IDX are both set and, in addition, either
    IDXGUID is set or IDXONE is not.

    This should be made more consistent, but for now the current
    behaviour is locked in.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_ONELEVEL,
        expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)",
    )

    filter_disallowed = (hasattr(self, 'disallowDNFilter') and
                         hasattr(self, 'IDX'))
    guid_or_no_onelevel = (hasattr(self, 'IDXGUID') or
                           not hasattr(self, 'IDXONE'))
    expected = 0 if (filter_disallowed and guid_or_no_onelevel) else 1
    self.assertEqual(len(found), expected)
def test_dn_filter_subtree(self):
    """Check a dn= filter when the scope is SCOPE_SUBTREE.

    One record is expected, or none when both disallowDNFilter and IDX
    are set on the instance.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_SUBTREE,
        expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)",
    )
    filter_disallowed = (hasattr(self, 'disallowDNFilter') and
                         hasattr(self, 'IDX'))
    expected = 0 if filter_disallowed else 1
    self.assertEqual(len(found), expected)
def test_dn_filter_base(self):
    """Check that a dn= filter (incorrectly) works under SCOPE_BASE."""
    found = self.l.search(
        base="OU=OU1,DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_BASE,
        expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)",
    )

    # This ought to be fixed at some point, but doing so is not trivial.
    self.assertEqual(len(found), 1)
def test_distinguishedName_filter_one(self):
    """Check a distinguishedName= filter when the scope is SCOPE_ONELEVEL.

    A single record is expected.  This should be made more consistent,
    but for now the current behaviour is locked in.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_ONELEVEL,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)",
    )
    self.assertEqual(len(found), 1)
def test_distinguishedName_filter_subtree(self):
    """Check a distinguishedName= filter when the scope is SCOPE_SUBTREE.

    A single record is expected.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_SUBTREE,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)",
    )
    self.assertEqual(len(found), 1)
def test_distinguishedName_filter_base(self):
    """Check that a distinguishedName= filter (incorrectly) works
    under SCOPE_BASE."""
    found = self.l.search(
        base="OU=OU1,DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_BASE,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)",
    )

    # This ought to be fixed at some point, but doing so is not trivial.
    self.assertEqual(len(found), 1)
def test_bad_dn_filter_base(self):
    """Check a dn= filter holding an invalid DN under SCOPE_BASE.

    The search itself must work, but return zero results.
    """
    found = self.l.search(
        base="OU=OU1,DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_BASE,
        expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)",
    )

    # This ought to be fixed at some point, but doing so is not trivial.
    self.assertEqual(len(found), 0)
def test_bad_dn_filter_one(self):
    """Check a dn= filter holding an invalid DN under SCOPE_ONELEVEL.

    The search must succeed but return zero results.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_ONELEVEL,
        expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)",
    )
    self.assertEqual(len(found), 0)
def test_bad_dn_filter_subtree(self):
    """Check a dn= filter holding an invalid DN under SCOPE_SUBTREE.

    The search must succeed but return zero results.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_SUBTREE,
        expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)",
    )
    self.assertEqual(len(found), 0)
def test_bad_distinguishedName_filter_base(self):
    """Check a distinguishedName= filter holding an invalid DN under
    SCOPE_BASE.

    The search itself must work, but return zero results.
    """
    found = self.l.search(
        base="OU=OU1,DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_BASE,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)",
    )

    # This ought to be fixed at some point, but doing so is not trivial.
    self.assertEqual(len(found), 0)
def test_bad_distinguishedName_filter_one(self):
    """Check a distinguishedName= filter holding an invalid DN under
    SCOPE_ONELEVEL.

    The search must succeed but return zero results.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_ONELEVEL,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)",
    )
    self.assertEqual(len(found), 0)
def test_bad_distinguishedName_filter_subtree(self):
    """Check a distinguishedName= filter holding an invalid DN under
    SCOPE_SUBTREE.

    The search must succeed but return zero results.
    """
    found = self.l.search(
        base="DC=SAMBA,DC=ORG",
        scope=ldb.SCOPE_SUBTREE,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)",
    )
    self.assertEqual(len(found), 0)
class IndexedSearchTests(SearchTests):
    """Run the inherited searches with an attribute index in place,
    to make sure the index does not break anything."""

    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        # Index the x, y and ou attributes.
        index_record = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou"],
        }
        self.l.add(index_record)
        # Marker that tests look up with hasattr().
        self.IDX = True
class IndexedCheckSearchTests(IndexedSearchTests):
    """Run the indexed searches again with the IDXCHECK marker set
    (full scan disabled)."""

    def setUp(self):
        # The marker is set before the parent setUp runs.
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
class IndexedSearchDnFilterTests(SearchTests):
    """Run the inherited searches with an attribute index in place and
    the disallowDNFilter option turned on."""

    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()

        # Turn on disallowDNFilter in the database and flag it for the tests.
        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        # Index the x, y and ou attributes.
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        self.IDX = True
class IndexedAndOneLevelSearchTests(SearchTests):
    """Run the inherited searches with an index that includes @IDXONE,
    to make sure the index does not break anything."""

    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou"],
            "@IDXONE": [b"1"],
        }
        self.l.add(index_record)
        # Markers that tests look up with hasattr().
        self.IDX = True
        self.IDXONE = True
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """Run the @IDXONE-indexed searches again with the IDXCHECK marker
    set (full scan disabled)."""

    def setUp(self):
        # The marker is set before the parent setUp runs.
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Run the inherited searches with an index that includes @IDXONE
    and the disallowDNFilter option turned on."""

    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        # Turn on disallowDNFilter in the database and flag it for the tests.
        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        self.IDX = True
        self.IDXONE = True
class GUIDIndexedSearchTests(SearchTests):
    """Run the inherited searches with a GUID-keyed index in place,
    to make sure the index does not break anything."""

    def setUp(self):
        super(GUIDIndexedSearchTests, self).setUp()

        index_record = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou"],
            "@IDXGUID": [b"objectUUID"],
            "@IDX_DN_GUID": [b"GUID"],
        }
        self.l.add(index_record)
        # NOTE(review): IDXONE is flagged although the record above has no
        # @IDXONE key, and IDX is not flagged - confirm this is intended.
        self.IDXGUID = True
        self.IDXONE = True
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Run the inherited searches with a GUID-keyed index in place and
    the disallowDNFilter option turned on."""

    def setUp(self):
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        # Turn on disallowDNFilter in the database and flag it for the tests.
        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.l.add(index_record)
        self.IDX = True
        self.IDXGUID = True
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Run the inherited searches with a GUID-keyed index that includes
    @IDXONE; disallowDNFilter is turned on as well."""

    def setUp(self):
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        # Turn on disallowDNFilter in the database and flag it for the tests.
        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.l.add(index_record)
        self.IDX = True
        self.IDXGUID = True
        self.IDXONE = True
class AddModifyTests(LdbBaseTest):
    """Exercise add, delete and rename on a freshly created database.

    setUp creates the database with the rdn_name module, one
    DC=SAMBA,DC=ORG record and an @ATTRIBUTES record that marks
    objectUUID as UNIQUE_INDEX.
    """

    def tearDown(self):
        """Remove the temporary directory and drop the database handle."""
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def setUp(self):
        """Create the database in a new temporary directory."""
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])
        self.l.add({"dn": "DC=SAMBA,DC=ORG",
                    "name": b"samba.org",
                    "objectUUID": b"0123456789abcdef"})
        self.l.add({"dn": "@ATTRIBUTES",
                    "objectUUID": "UNIQUE_INDEX"})

    def _add_admins(self, dn, object_uuid):
        """Add the standard "Admins" test record at dn with object_uuid."""
        self.l.add({"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": object_uuid})

    def _expect_ldb_error(self, expected_code, failure_msg, operation, *args):
        """Call operation(*args) and require an LdbError with expected_code.

        The test fails with failure_msg if no LdbError is raised.
        """
        try:
            operation(*args)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], expected_code)
        else:
            self.fail(failure_msg)

    def test_add_dup(self):
        """A second add at the same DN must report ERR_ENTRY_ALREADY_EXISTS."""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._expect_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed adding dupliate entry",
                               self._add_admins,
                               "OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_bad(self):
        """Adding under a malformed DN must report ERR_INVALID_DN_SYNTAX."""
        self._expect_ldb_error(ldb.ERR_INVALID_DN_SYNTAX,
                               "Should have failed adding entry with "
                               "invalid DN",
                               self._add_admins,
                               "BAD,DC=SAMBA,DC=ORG", b"0123456789abcde1")

    def test_add_del_add(self):
        """A DN can be added again, with a new objectUUID, once deleted."""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_add(self):
        """A DN can be added again once its record was renamed away."""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_fail_move_move(self):
        """A rename onto an existing DN fails; after the target is moved
        away the same rename works and both records are found by
        objectUUID under their new DNs."""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")
        self._expect_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed on duplicate DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")

        res2 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde1)")
        self.assertEqual(len(res2), 1)
        self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")

        res3 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde2)")
        self.assertEqual(len(res3), 1)
        self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        """Renaming a record that does not exist reports ERR_NO_SUCH_OBJECT."""
        self._expect_ldb_error(ldb.ERR_NO_SUCH_OBJECT,
                               "Should have failed on missing",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_missing2(self):
        """Renaming a missing record onto an existing DN still reports
        ERR_NO_SUCH_OBJECT."""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._expect_ldb_error(ldb.ERR_NO_SUCH_OBJECT,
                               "Should have failed on missing",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_bad(self):
        """Renaming from a malformed DN reports ERR_INVALID_DN_SYNTAX."""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._expect_ldb_error(ldb.ERR_INVALID_DN_SYNTAX,
                               "Should have failed on invalid DN",
                               self.l.rename,
                               "OUXDUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_bad2(self):
        """Renaming to a malformed DN reports ERR_INVALID_DN_SYNTAX."""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        # The failure text names the condition actually under test (an
        # invalid target DN), matching test_move_bad.
        self._expect_ldb_error(ldb.ERR_INVALID_DN_SYNTAX,
                               "Should have failed on invalid DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OUXDUP2,DC=SAMBA,DC=ORG")

    def test_move_fail_move_add(self):
        """After a refused rename, moving the target away frees its DN
        for a new add."""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")
        self._expect_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed on duplicate DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde3")
class IndexedAddModifyTests(AddModifyTests):
    """Re-run the add/modify tests with an @INDEXLIST record in place.

    The index covers x, y, ou and objectUUID and sets @IDXONE.
    """

    def setUp(self):
        super(IndexedAddModifyTests, self).setUp()
        index_record = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
            "@IDXONE": [b"1"],
        }
        self.l.add(index_record)

    def test_duplicate_GUID(self):
        """Re-using the objectUUID of the DC=SAMBA,DC=ORG record must be
        refused with ERR_CONSTRAINT_VIOLATION."""
        clash = {"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                 "name": b"Admins",
                 "x": "z", "y": "a",
                 "objectUUID": b"0123456789abcdef"}
        try:
            self.l.add(clash)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID(self):
        """Adding the very same DN and objectUUID twice must be refused
        with ERR_ENTRY_ALREADY_EXISTS."""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"})
        repeat = {"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                  "name": b"Admins",
                  "x": "z", "y": "a",
                  "objectUUID": b"a123456789abcdef"}
        try:
            self.l.add(repeat)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID2(self):
        """A refused duplicate-DN add must not leave its objectUUID behind
        in the index."""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"abc3456789abcdef"})
        same_dn = {"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                   "name": b"Admins",
                   "x": "z", "y": "a",
                   "objectUUID": b"aaa3456789abcdef"}
        try:
            self.l.add(same_dn)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate DN")

        # The objectUUID of the refused record must still be free to use.
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"aaa3456789abcdef"})

    def test_add_dup_guid_add(self):
        """A DN refused because of a duplicate objectUUID can be added
        afterwards with a fresh objectUUID."""
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde1"})
        same_guid = {"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                     "name": b"Admins",
                     "x": "z", "y": "a",
                     "objectUUID": b"0123456789abcde1"}
        try:
            self.l.add(same_guid)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde2"})
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Re-run the indexed add/modify tests after replacing the
    @INDEXLIST record with one that configures the GUID index."""

    def setUp(self):
        super(GUIDIndexedAddModifyTests, self).setUp()
        guid_index = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou"],
            "@IDXONE": [b"1"],
            "@IDXGUID": [b"objectUUID"],
            "@IDX_DN_GUID": [b"GUID"],
        }
        # The parent setUp already added @INDEXLIST, so replace its values.
        replacement = ldb.Message.from_dict(self.l, guid_index,
                                            ldb.FLAG_MOD_REPLACE)
        self.l.modify(replacement)
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside a transaction."""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Each test body runs with this transaction open.
        self.l.transaction_start()

    def tearDown(self):
        # Commit what the test did before the parent cleans up.
        self.l.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside a transaction."""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Each test body runs with this transaction open.
        self.l.transaction_start()

    def tearDown(self):
        # Commit what the test did before the parent cleans up.
        self.l.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
class BadIndexTests(LdbBaseTest):
    """Check that searches keep working after awkward @ATTRIBUTES changes.

    If the instance has an IDXGUID attribute when setUp runs, the
    @INDEXLIST record also configures the GUID index.
    """

    def setUp(self):
        """Create an indexed database in a new temporary directory."""
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def _add_three(self, y_values):
        """Add records x=x, x=y and x=z, giving each the matching y value."""
        names = ("x", "y", "z")
        for position, (name, y_value) in enumerate(zip(names, y_values)):
            uuid = ("0123456789abcde%d" % (position + 1)).encode("ascii")
            self.ldb.add({"dn": "x=%s,dc=samba,dc=org" % name,
                          "objectUUID": uuid,
                          "y": y_value})

    def test_unique(self):
        """A refused UNIQUE_INDEX change must leave the index working."""
        self._add_three(["1", "1", "1"])

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Marking y as UNIQUE_INDEX must be refused, because three
        # records already share the same y value.
        self.assertRaises(ldb.LdbError, self.ldb.add,
                          {"dn": "@ATTRIBUTES",
                           "y": "UNIQUE_INDEX"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        """Inside a transaction the UNIQUE_INDEX change must make the
        commit fail, and the index must still work afterwards."""
        self._add_three(["1", "1", "1"])

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result:
        # whether the add itself raises is deliberately not asserted.
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
        else:
            self.fail()

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        """Switching y to CASE_INSENSITIVE must leave searches working."""
        self._add_three(["a", "A", ["a", "A"]])

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        """As test_casefold, with the @ATTRIBUTES add done in a transaction."""
        self._add_three(["a", "A", ["a", "A"]])

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def tearDown(self):
        """Remove the temporary directory and drop the database handle."""
        # Same cleanup order as AddModifyTests.tearDown; without it every
        # test leaves its temporary directory behind.
        shutil.rmtree(self.testdir)
        super(BadIndexTests, self).tearDown()

        # Drop the handle so the database file descriptor is released.
        del self.ldb
class GUIDBadIndexTests(BadIndexTests):
    """Run the bad-index tests in GUID index mode."""

    def setUp(self):
        # Must be set first: the parent setUp checks for this marker when
        # it writes the @INDEXLIST record.
        self.IDXGUID = True

        super(GUIDBadIndexTests, self).setUp()
class DnTests(TestCase):
    """Tests for ldb.Dn, run against an Ldb created without a URL."""

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del(self.ldb)

    def test_set_dn_invalid(self):
        """Assigning a plain string to Message.dn raises TypeError."""
        x = ldb.Message()
        with self.assertRaises(TypeError):
            x.dn = "astring"

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold_mixed(self):
        """Casefold a DN whose expected form keeps the 'bar' value as is.

        This test used to be named test_get_casefold, exactly like a
        later method of this class, so the later definition replaced it
        and it never ran.  It now has a name of its own.
        """
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", x.parent().__str__())

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertIsNone(x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add_child_str(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base_str(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x)-1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        # Element 1 of each parsed item is the message itself.
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertIsInstance(msg[1], ldb.Message)
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertIsNone(dn.get_component_name(2))
        self.assertIsNone(dn.get_component_name(-1))

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range indexes are checked on get_component_value itself;
        # these two lines previously called get_component_name, which
        # test_get_component_name already covers.
        self.assertIsNone(dn.get_component_value(2))
        self.assertIsNone(dn.get_component_value(-1))

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # Index 2 is past the last component; the DN must stay untouched.
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertIsNone(dn.get_extended_component("TEST"))

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    def test_get_casefold(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
2067 class LdbMsgTests(TestCase):
def setUp(self):
    """Give each test a freshly constructed ldb.Message in self.msg."""
    super(LdbMsgTests, self).setUp()
    self.msg = ldb.Message()
def test_init_dn(self):
    """A Dn passed to the Message constructor is exposed as .dn."""
    initial_dn = ldb.Dn(ldb.Ldb(), "dc=foo27")
    self.msg = ldb.Message(initial_dn)
    self.assertEqual("dc=foo27", str(self.msg.dn))
def test_iter_items(self):
    """items() has no entries at first and one once a dn is assigned."""
    self.assertEqual(0, len(self.msg.items()))
    assigned_dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
    self.msg.dn = assigned_dn
    self.assertEqual(1, len(self.msg.items()))
def test_repr(self):
    """Check repr() of a message and of its .text view.

    Under PY3 either ordering of the 'dn' and 'dc' entries is accepted;
    otherwise one exact string is required.
    """
    self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
    self.msg["dc"] = b"foo"
    if not PY3:
        self.assertEqual(
            repr(self.msg),
            "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
        self.assertEqual(
            repr(self.msg.text),
            "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
        return

    accepted = [
        "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
        "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
    ]
    self.assertIn(repr(self.msg), accepted)

    accepted_text = [
        "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
        "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
    ]
    self.assertIn(repr(self.msg.text), accepted_text)
2102 def test_len(self):
2103 self.assertEqual(0, len(self.msg))
2105 def test_notpresent(self):
2106 self.assertRaises(KeyError, lambda: self.msg["foo"])
2108 def test_del(self):
2109 del self.msg["foo"]
2111 def test_add(self):
2112 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2114 def test_add_text(self):
2115 self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2117 def test_elements_empty(self):
2118 self.assertEqual([], self.msg.elements())
2120 def test_elements(self):
2121 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2122 self.msg.add(el)
2123 self.assertEqual([el], self.msg.elements())
2124 self.assertEqual([el.text], self.msg.text.elements())
2126 def test_add_value(self):
2127 self.assertEqual(0, len(self.msg))
2128 self.msg["foo"] = [b"foo"]
2129 self.assertEqual(1, len(self.msg))
2131 def test_add_value_text(self):
2132 self.assertEqual(0, len(self.msg))
2133 self.msg["foo"] = ["foo"]
2134 self.assertEqual(1, len(self.msg))
2136 def test_add_value_multiple(self):
2137 self.assertEqual(0, len(self.msg))
2138 self.msg["foo"] = [b"foo", b"bla"]
2139 self.assertEqual(1, len(self.msg))
2140 self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2142 def test_add_value_multiple_text(self):
2143 self.assertEqual(0, len(self.msg))
2144 self.msg["foo"] = ["foo", "bla"]
2145 self.assertEqual(1, len(self.msg))
2146 self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2148 def test_set_value(self):
2149 self.msg["foo"] = [b"fool"]
2150 self.assertEqual([b"fool"], list(self.msg["foo"]))
2151 self.msg["foo"] = [b"bar"]
2152 self.assertEqual([b"bar"], list(self.msg["foo"]))
2154 def test_set_value_text(self):
2155 self.msg["foo"] = ["fool"]
2156 self.assertEqual(["fool"], list(self.msg.text["foo"]))
2157 self.msg["foo"] = ["bar"]
2158 self.assertEqual(["bar"], list(self.msg.text["foo"]))
2160 def test_keys(self):
2161 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2162 self.msg["foo"] = [b"bla"]
2163 self.msg["bar"] = [b"bla"]
2164 self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
2166 def test_keys_text(self):
2167 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2168 self.msg["foo"] = ["bla"]
2169 self.msg["bar"] = ["bla"]
2170 self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
2172 def test_dn(self):
2173 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2174 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2176 def test_get_dn(self):
2177 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2178 self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2180 def test_dn_text(self):
2181 self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2182 self.assertEqual("@BASEINFO", str(self.msg.dn))
2183 self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2185 def test_get_dn_text(self):
2186 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2187 self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2188 self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2190 def test_get_invalid(self):
2191 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2192 self.assertRaises(TypeError, self.msg.get, 42)
2194 def test_get_other(self):
2195 self.msg["foo"] = [b"bar"]
2196 self.assertEqual(b"bar", self.msg.get("foo")[0])
2197 self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2198 self.assertEqual(None, self.msg.get("foo", idx=1))
2199 self.assertEqual("", self.msg.get("foo", default='', idx=1))
2201 def test_get_other_text(self):
2202 self.msg["foo"] = ["bar"]
2203 self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2204 self.assertEqual("bar", self.msg.text.get("foo")[0])
2205 self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2206 self.assertEqual(None, self.msg.get("foo", idx=1))
2207 self.assertEqual("", self.msg.get("foo", default='', idx=1))
2209 def test_get_default(self):
2210 self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2211 self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2213 def test_get_default_text(self):
2214 self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2215 self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2217 def test_get_unknown(self):
2218 self.assertEqual(None, self.msg.get("lalalala"))
2220 def test_get_unknown_text(self):
2221 self.assertEqual(None, self.msg.text.get("lalalala"))
2223 def test_msg_diff(self):
2224 l = ldb.Ldb()
2225 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2226 msg1 = next(msgs)[1]
2227 msg2 = next(msgs)[1]
2228 msgdiff = l.msg_diff(msg1, msg2)
2229 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2230 self.assertRaises(KeyError, lambda: msgdiff["foo"])
2231 self.assertEqual(1, len(msgdiff))
2233 def test_equal_empty(self):
2234 msg1 = ldb.Message()
2235 msg2 = ldb.Message()
2236 self.assertEqual(msg1, msg2)
2238 def test_equal_simplel(self):
2239 db = ldb.Ldb()
2240 msg1 = ldb.Message()
2241 msg1.dn = ldb.Dn(db, "foo=bar")
2242 msg2 = ldb.Message()
2243 msg2.dn = ldb.Dn(db, "foo=bar")
2244 self.assertEqual(msg1, msg2)
2245 msg1['foo'] = b'bar'
2246 msg2['foo'] = b'bar'
2247 self.assertEqual(msg1, msg2)
2248 msg2['foo'] = b'blie'
2249 self.assertNotEqual(msg1, msg2)
2250 msg2['foo'] = b'blie'
2252 def test_from_dict(self):
2253 rec = {"dn": "dc=fromdict",
2254 "a1": [b"a1-val1", b"a1-val1"]}
2255 l = ldb.Ldb()
2256 # check different types of input Flags
2257 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2258 m = ldb.Message.from_dict(l, rec, flags)
2259 self.assertEqual(rec["a1"], list(m["a1"]))
2260 self.assertEqual(flags, m["a1"].flags())
2261 # check input params
2262 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2263 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2264 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2265 # Message.from_dict expects dictionary with 'dn'
2266 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2267 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2269 def test_from_dict_text(self):
2270 rec = {"dn": "dc=fromdict",
2271 "a1": ["a1-val1", "a1-val1"]}
2272 l = ldb.Ldb()
2273 # check different types of input Flags
2274 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2275 m = ldb.Message.from_dict(l, rec, flags)
2276 self.assertEqual(rec["a1"], list(m.text["a1"]))
2277 self.assertEqual(flags, m.text["a1"].flags())
2278 # check input params
2279 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2280 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2281 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2282 # Message.from_dict expects dictionary with 'dn'
2283 err_rec = {"a1": ["a1-val1", "a1-val1"]}
2284 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2286 def test_copy_add_message_element(self):
2287 m = ldb.Message()
2288 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2289 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2290 mto = ldb.Message()
2291 mto["1"] = m["1"]
2292 mto["2"] = m["2"]
2293 self.assertEqual(mto["1"], m["1"])
2294 self.assertEqual(mto["2"], m["2"])
2295 mto = ldb.Message()
2296 mto.add(m["1"])
2297 mto.add(m["2"])
2298 self.assertEqual(mto["1"], m["1"])
2299 self.assertEqual(mto["2"], m["2"])
2301 def test_copy_add_message_element_text(self):
2302 m = ldb.Message()
2303 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2304 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2305 mto = ldb.Message()
2306 mto["1"] = m["1"]
2307 mto["2"] = m["2"]
2308 self.assertEqual(mto["1"], m.text["1"])
2309 self.assertEqual(mto["2"], m.text["2"])
2310 mto = ldb.Message()
2311 mto.add(m["1"])
2312 mto.add(m["2"])
2313 self.assertEqual(mto.text["1"], m.text["1"])
2314 self.assertEqual(mto.text["2"], m.text["2"])
2315 self.assertEqual(mto["1"], m["1"])
2316 self.assertEqual(mto["2"], m["2"])
class MessageElementTests(TestCase):
    """Exercise ldb.MessageElement: comparison, iteration, indexing, repr."""

    def test_cmp_element(self):
        foo_a = ldb.MessageElement([b"foo"])
        foo_b = ldb.MessageElement([b"foo"])
        other = ldb.MessageElement([b"bzr"])
        self.assertEqual(foo_a, foo_b)
        self.assertNotEqual(foo_a, other)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from the same text.
        from_bytes = ldb.MessageElement([b"foo"])
        from_text = ldb.MessageElement(["foo"])
        self.assertEqual(from_bytes, from_text)

    def test_create_iterable(self):
        # Iterating yields bytes; iterating the .text view yields text.
        element = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(element))
        self.assertEqual(["foo"], list(element.text))

    def test_repr(self):
        single = ldb.MessageElement([b"foo"])
        if not PY3:
            self.assertEqual("MessageElement(['foo'])", repr(single))
            self.assertEqual("MessageElement(['foo']).text", repr(single.text))
        else:
            self.assertEqual("MessageElement([b'foo'])", repr(single))
            self.assertEqual("MessageElement([b'foo']).text", repr(single.text))

        pair = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(pair))
        if not PY3:
            self.assertEqual("MessageElement(['foo','bla'])", repr(pair))
            self.assertEqual("MessageElement(['foo','bla']).text", repr(pair.text))
        else:
            self.assertEqual("MessageElement([b'foo',b'bla'])", repr(pair))
            self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(pair.text))

    def test_get_item(self):
        element = ldb.MessageElement([b"foo", b"bar"])
        # Positive and negative indexes are both supported.
        for index, expected in ((0, b"foo"), (1, b"bar"), (-1, b"bar")):
            self.assertEqual(expected, element[index])
        with self.assertRaises(IndexError):
            element[45]

    def test_get_item_text(self):
        element = ldb.MessageElement(["foo", "bar"])
        for index, expected in ((0, "foo"), (1, "bar"), (-1, "bar")):
            self.assertEqual(expected, element.text[index])
        # The out-of-range lookup is made on the element itself, not on
        # its .text view.
        with self.assertRaises(IndexError):
            element[45]

    def test_len(self):
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(element))

    def test_eq(self):
        # Elements compare equal only while they hold the same values.
        first = ldb.MessageElement([b"foo", b"bar"])
        second = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(second, first)
        first = ldb.MessageElement([b"foo"])
        self.assertNotEqual(second, first)
        second = ldb.MessageElement([b"foo"])
        self.assertEqual(second, first)

    def test_extended(self):
        # The flags and name given to the constructor do not appear in
        # the repr.
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if not PY3:
            self.assertEqual("MessageElement(['456'])", repr(element))
            self.assertEqual("MessageElement(['456']).text", repr(element.text))
        else:
            self.assertEqual("MessageElement([b'456'])", repr(element))
            self.assertEqual("MessageElement([b'456']).text", repr(element.text))

    def test_bad_text(self):
        # Bytes that cannot be decoded make the text view's item lookup
        # raise UnicodeDecodeError.
        element = ldb.MessageElement(b'\xba\xdd')
        text_view = element.text
        with self.assertRaises(UnicodeDecodeError):
            text_view[0]
class ModuleTests(TestCase):
    """Tests for registering Python classes as ldb modules."""

    def setUp(self):
        super(ModuleTests, self).setUp()
        # Each test gets its own scratch directory and database file.
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # Chain up to the parent tearDown(), mirroring the setUp() chain.
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        # A class that only carries a name can be registered.
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records every ExampleModule construction.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        db = ldb.Ldb(self.filename)
        db.add({"dn": "@MODULES", "@LIST": "bla"})
        # Listing the module in @MODULES does not construct it yet ...
        self.assertEqual([], ops)
        # ... but opening the database again does, exactly once.
        db = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
class LdbResultTests(LdbBaseTest):
    """Tests for the results of Ldb.search() and Ldb.search_iterator(),
    run against a small database populated in setUp()."""

    def setUp(self):
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        # 13 records in total: the base, two named OUs and OU1..OU10.
        # test_iter_msgs_count relies on this number.
        self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
        self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
        for i in range(1, 11):
            self.l.add({"dn": "OU=OU%d,DC=SAMBA,DC=ORG" % i,
                        "name": ("OU #%d" % i).encode("ascii")})

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def test_return_type(self):
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        msgs = res.msgs

    def test_get_controls(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        controls = res.controls

    def test_get_referals(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        referals = res.referals

    def test_iter_msgs(self):
        found = False
        for l in self.l.search().msgs:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects has been added to the DC=SAMBA, DC=ORG
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        # Only checks that the controls of a result are iterable.
        res = self.l.search().controls
        it = iter(res)

    def test_create_control(self):
        # An unknown control name is rejected; "relax:1" is accepted and
        # parsed as critical.
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        # Only checks that the referals of a result are iterable.
        res = self.l.search().referals
        it = iter(res)

    def test_search_sequence_msgs(self):
        found = False
        res = self.l.search().msgs

        # Deliberately index-based: this exercises len() and item access
        # on the message list rather than iteration.
        for i in range(0, len(res)):
            l = res[i]
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        found = False
        res = self.l.search()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        found = False
        res = self.l.search_iterator()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction
    def test_search_against_trans(self):
        # NOTE(review): found11 is never set in this test, so the
        # assertFalse(found11) near the end always passes.
        found11 = False

        # r1/w1 carries child -> parent messages, r2/w2 parent -> child.
        (r1, w1) = os.pipe()

        (r2, w2) = os.pipe()

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                # os._exit() skips the normal stdio flush, so flush
                # explicitly or the message above may be lost.
                sys.stdout.flush()
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1 carries child -> parent messages, r2/w2 parent -> child.
        (r1, w1) = os.pipe()

        (r2, w2) = os.pipe()

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(res)
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                # os._exit() skips the normal stdio flush, so flush
                # explicitly or the message above may be lost.
                sys.stdout.flush()
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
class BadTypeTests(TestCase):
    """Check that wrongly-typed arguments are rejected with TypeError."""

    def test_control(self):
        db = ldb.Ldb()
        # A string in place of the Ldb object.
        self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
        # A valid Ldb object, so that it is the non-string control data
        # that gets rejected.
        self.assertRaises(TypeError, ldb.Control, db, 1234)

    def test_modify(self):
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        m = ldb.Message(dn)
        self.assertRaises(TypeError, db.modify, '<bad type>')
        self.assertRaises(TypeError, db.modify, m, '<bad type>')

    def test_add(self):
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        m = ldb.Message(dn)
        self.assertRaises(TypeError, db.add, '<bad type>')
        self.assertRaises(TypeError, db.add, m, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite its name this test calls add(), not
        # delete().  Confirm that delete() raises TypeError for these
        # arguments before switching the calls over.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        self.assertRaises(TypeError, db.add, '<bad type>')
        self.assertRaises(TypeError, db.add, dn, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite its name this test calls add(), not
        # rename().  Confirm that rename() raises TypeError for these
        # arguments before switching the calls over.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        self.assertRaises(TypeError, db.add, '<bad type>', dn)
        self.assertRaises(TypeError, db.add, dn, '<bad type>')
        self.assertRaises(TypeError, db.add, dn, dn, '<bad type>')

    def test_search(self):
        # Each keyword argument of search() is given a value of the
        # wrong type in turn.
        db = ldb.Ldb()
        self.assertRaises(TypeError, db.search, base=1234)
        self.assertRaises(TypeError, db.search, scope='<bad type>')
        self.assertRaises(TypeError, db.search, expression=1234)
        self.assertRaises(TypeError, db.search, attrs='<bad type>')
        self.assertRaises(TypeError, db.search, controls='<bad type>')
class VersionTests(TestCase):
    """Check the version attribute exported by the ldb module."""

    def test_version(self):
        # ldb.__version__ must be a str.
        version = ldb.__version__
        self.assertTrue(isinstance(version, str))
if __name__ == '__main__':
    # Run every test in this file when it is executed as a script.
    # unittest.main is the same object as unittest.TestProgram.
    import unittest
    unittest.main()