ldb:tests: add a test for dotted i uppercase
[Samba.git] / lib / ldb / tests / python / api.py
blobbdd69f0773418aa4b04e9f150ef04fd01da1f692
1 #!/usr/bin/env python3
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
5 import os
6 from unittest import TestCase
7 import sys
8 sys.path.insert(0, "bin/python")
9 import gc
10 import time
11 import ldb
12 import shutil
13 import errno
16 TDB_PREFIX = "tdb://"
17 MDB_PREFIX = "mdb://"
19 MDB_INDEX_OBJ = {
20 "dn": "@INDEXLIST",
21 "@IDXONE": [b"1"],
22 "@IDXGUID": [b"objectUUID"],
23 "@IDX_DN_GUID": [b"GUID"]
27 def tempdir():
28 import tempfile
29 try:
30 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
31 except KeyError:
32 dir_prefix = None
33 return tempfile.mkdtemp(dir=dir_prefix)
36 class NoContextTests(TestCase):
38 def test_valid_attr_name(self):
39 self.assertTrue(ldb.valid_attr_name("foo"))
40 self.assertFalse(ldb.valid_attr_name("24foo"))
42 def test_timestring(self):
43 self.assertEqual("19700101000000.0Z", ldb.timestring(0))
44 self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
46 self.assertEqual("00000101000000.0Z", ldb.timestring(-62167219200))
47 self.assertEqual("99991231235959.0Z", ldb.timestring(253402300799))
49 # should result with OSError EOVERFLOW from gmtime()
50 with self.assertRaises(OSError) as err:
51 ldb.timestring(-62167219201)
52 self.assertEqual(err.exception.errno, errno.EOVERFLOW)
53 with self.assertRaises(OSError) as err:
54 ldb.timestring(253402300800)
55 self.assertEqual(err.exception.errno, errno.EOVERFLOW)
56 with self.assertRaises(OSError) as err:
57 ldb.timestring(0x7fffffffffffffff)
58 self.assertEqual(err.exception.errno, errno.EOVERFLOW)
60 def test_string_to_time(self):
61 self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
62 self.assertEqual(-1, ldb.string_to_time("19691231235959.0Z"))
63 self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
65 self.assertEqual(-62167219200, ldb.string_to_time("00000101000000.0Z"))
66 self.assertEqual(253402300799, ldb.string_to_time("99991231235959.0Z"))
68 def test_binary_encode(self):
69 encoded = ldb.binary_encode(b'test\\x')
70 decoded = ldb.binary_decode(encoded)
71 self.assertEqual(decoded, b'test\\x')
73 encoded2 = ldb.binary_encode('test\\x')
74 self.assertEqual(encoded2, encoded)
77 class LdbBaseTest(TestCase):
78 def setUp(self):
79 super(LdbBaseTest, self).setUp()
80 try:
81 if self.prefix is None:
82 self.prefix = TDB_PREFIX
83 except AttributeError:
84 self.prefix = TDB_PREFIX
86 def tearDown(self):
87 super(LdbBaseTest, self).tearDown()
89 def url(self):
90 return self.prefix + self.filename
92 def flags(self):
93 if self.prefix == MDB_PREFIX:
94 return ldb.FLG_NOSYNC
95 else:
96 return 0
99 class SimpleLdb(LdbBaseTest):
101 def setUp(self):
102 super(SimpleLdb, self).setUp()
103 self.testdir = tempdir()
104 self.filename = os.path.join(self.testdir, "test.ldb")
105 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
106 try:
107 self.ldb.add(self.index)
108 except AttributeError:
109 pass
111 def tearDown(self):
112 shutil.rmtree(self.testdir)
113 super(SimpleLdb, self).tearDown()
114 # Ensure the LDB is closed now, so we close the FD
115 del(self.ldb)
117 def test_connect(self):
118 ldb.Ldb(self.url(), flags=self.flags())
120 def test_connect_none(self):
121 ldb.Ldb()
123 def test_connect_later(self):
124 x = ldb.Ldb()
125 x.connect(self.url(), flags=self.flags())
127 def test_connect_twice(self):
128 url = self.url()
129 x = ldb.Ldb(url)
130 with self.assertRaises(ldb.LdbError):
131 x.connect(url, flags=self.flags())
133 def test_connect_twice_later(self):
134 url = self.url()
135 flags = self.flags()
136 x = ldb.Ldb()
137 x.connect(url, flags)
138 with self.assertRaises(ldb.LdbError):
139 x.connect(url, flags)
141 def test_connect_and_disconnect(self):
142 url = self.url()
143 flags = self.flags()
144 x = ldb.Ldb()
145 x.connect(url, flags)
146 x.disconnect()
147 x.connect(url, flags)
148 x.disconnect()
150 def test_repr(self):
151 x = ldb.Ldb()
152 self.assertTrue(repr(x).startswith("<ldb connection"))
154 def test_set_create_perms(self):
155 x = ldb.Ldb()
156 x.set_create_perms(0o600)
158 def test_search(self):
159 l = ldb.Ldb(self.url(), flags=self.flags())
160 self.assertEqual(len(l.search()), 0)
162 def test_search_controls(self):
163 l = ldb.Ldb(self.url(), flags=self.flags())
164 self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
166 def test_utf8_ldb_Dn(self):
167 l = ldb.Ldb(self.url(), flags=self.flags())
168 dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
170 def test_utf8_encoded_ldb_Dn(self):
171 l = ldb.Ldb(self.url(), flags=self.flags())
172 dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
173 try:
174 dn = ldb.Dn(l, dn_encoded_utf8)
175 except UnicodeDecodeError as e:
176 raise
177 except TypeError as te:
178 p3errors = ["argument 2 must be str, not bytes",
179 "Can't convert 'bytes' object to str implicitly"]
180 self.assertIn(str(te), p3errors)
182 def test_search_attrs(self):
183 l = ldb.Ldb(self.url(), flags=self.flags())
184 self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
186 def test_search_string_dn(self):
187 l = ldb.Ldb(self.url(), flags=self.flags())
188 self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
190 def test_search_attr_string(self):
191 l = ldb.Ldb(self.url(), flags=self.flags())
192 self.assertRaises(TypeError, l.search, attrs="dc")
193 self.assertRaises(TypeError, l.search, attrs=b"dc")
195 def test_opaque(self):
196 l = ldb.Ldb(self.url(), flags=self.flags())
197 l.set_opaque("my_opaque", True)
198 self.assertTrue(l.get_opaque("my_opaque") is not None)
199 self.assertEqual(None, l.get_opaque("unknown"))
201 def test_opaque_bool(self):
202 """Test that we can set boolean opaque values."""
204 db = ldb.Ldb(self.url(), flags=self.flags())
205 name = "my_opaque"
207 db.set_opaque(name, False)
208 self.assertEqual(False, db.get_opaque(name))
210 db.set_opaque(name, True)
211 self.assertEqual(True, db.get_opaque(name))
213 def test_opaque_int(self):
214 """Test that we can set (positive) integer opaque values."""
216 db = ldb.Ldb(self.url(), flags=self.flags())
217 name = "my_opaque"
219 db.set_opaque(name, 0)
220 self.assertEqual(0, db.get_opaque(name))
222 db.set_opaque(name, 12345678)
223 self.assertEqual(12345678, db.get_opaque(name))
225 # Negative values can’t be set.
226 self.assertRaises(OverflowError, db.set_opaque, name, -99999)
228 def test_opaque_string(self):
229 """Test that we can set string opaque values."""
231 db = ldb.Ldb(self.url(), flags=self.flags())
232 name = "my_opaque"
234 db.set_opaque(name, "")
235 self.assertEqual("", db.get_opaque(name))
237 db.set_opaque(name, "foo bar")
238 self.assertEqual("foo bar", db.get_opaque(name))
240 def test_opaque_none(self):
241 """Test that we can set an opaque to None to effectively unset it."""
243 db = ldb.Ldb(self.url(), flags=self.flags())
244 name = "my_opaque"
246 # An opaque that has not been set is the same as None.
247 self.assertIsNone(db.get_opaque(name))
249 # Give the opaque a value.
250 db.set_opaque(name, 3)
251 self.assertEqual(3, db.get_opaque(name))
253 # Test that we can set the opaque to None to unset it.
254 db.set_opaque(name, None)
255 self.assertIsNone(db.get_opaque(name))
257 def test_opaque_unsupported(self):
258 """Test that trying to set unsupported values raises an error."""
260 db = ldb.Ldb(self.url(), flags=self.flags())
261 name = "my_opaque"
263 self.assertRaises(ValueError, db.set_opaque, name, [])
264 self.assertRaises(ValueError, db.set_opaque, name, ())
265 self.assertRaises(ValueError, db.set_opaque, name, 3.14)
266 self.assertRaises(ValueError, db.set_opaque, name, 3+2j)
267 self.assertRaises(ValueError, db.set_opaque, name, b'foo')
269 def test_search_scope_base_empty_db(self):
270 l = ldb.Ldb(self.url(), flags=self.flags())
271 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
272 ldb.SCOPE_BASE)), 0)
274 def test_search_scope_onelevel_empty_db(self):
275 l = ldb.Ldb(self.url(), flags=self.flags())
276 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
277 ldb.SCOPE_ONELEVEL)), 0)
279 def test_delete(self):
280 l = ldb.Ldb(self.url(), flags=self.flags())
281 self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
283 def test_delete_w_unhandled_ctrl(self):
284 l = ldb.Ldb(self.url(), flags=self.flags())
285 m = ldb.Message()
286 m.dn = ldb.Dn(l, "dc=foo1")
287 m["b"] = [b"a"]
288 m["objectUUID"] = b"0123456789abcdef"
289 l.add(m)
290 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
291 l.delete(m.dn)
293 def test_contains(self):
294 name = self.url()
295 l = ldb.Ldb(name, flags=self.flags())
296 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
297 l = ldb.Ldb(name, flags=self.flags())
298 m = ldb.Message()
299 m.dn = ldb.Dn(l, "dc=foo3")
300 m["b"] = ["a"]
301 m["objectUUID"] = b"0123456789abcdef"
302 l.add(m)
303 try:
304 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
305 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
306 finally:
307 l.delete(m.dn)
309 def test_get_config_basedn(self):
310 l = ldb.Ldb(self.url(), flags=self.flags())
311 self.assertEqual(None, l.get_config_basedn())
313 def test_get_root_basedn(self):
314 l = ldb.Ldb(self.url(), flags=self.flags())
315 self.assertEqual(None, l.get_root_basedn())
317 def test_get_schema_basedn(self):
318 l = ldb.Ldb(self.url(), flags=self.flags())
319 self.assertEqual(None, l.get_schema_basedn())
321 def test_get_default_basedn(self):
322 l = ldb.Ldb(self.url(), flags=self.flags())
323 self.assertEqual(None, l.get_default_basedn())
325 def test_add(self):
326 l = ldb.Ldb(self.url(), flags=self.flags())
327 m = ldb.Message()
328 m.dn = ldb.Dn(l, "dc=foo4")
329 m["bla"] = b"bla"
330 m["objectUUID"] = b"0123456789abcdef"
331 self.assertEqual(len(l.search()), 0)
332 l.add(m)
333 try:
334 self.assertEqual(len(l.search()), 1)
335 finally:
336 l.delete(ldb.Dn(l, "dc=foo4"))
338 def test_search_iterator(self):
339 l = ldb.Ldb(self.url(), flags=self.flags())
340 s = l.search_iterator()
341 s.abandon()
342 try:
343 for me in s:
344 self.fail()
345 self.fail()
346 except RuntimeError as re:
347 pass
348 try:
349 s.abandon()
350 self.fail()
351 except RuntimeError as re:
352 pass
353 try:
354 s.result()
355 self.fail()
356 except RuntimeError as re:
357 pass
359 s = l.search_iterator()
360 count = 0
361 for me in s:
362 self.assertTrue(isinstance(me, ldb.Message))
363 count += 1
364 r = s.result()
365 self.assertEqual(len(r), 0)
366 self.assertEqual(count, 0)
368 m1 = ldb.Message()
369 m1.dn = ldb.Dn(l, "dc=foo4")
370 m1["bla"] = b"bla"
371 m1["objectUUID"] = b"0123456789abcdef"
372 l.add(m1)
373 try:
374 s = l.search_iterator()
375 msgs = []
376 for me in s:
377 self.assertTrue(isinstance(me, ldb.Message))
378 count += 1
379 msgs.append(me)
380 r = s.result()
381 self.assertEqual(len(r), 0)
382 self.assertEqual(len(msgs), 1)
383 self.assertEqual(msgs[0].dn, m1.dn)
385 m2 = ldb.Message()
386 m2.dn = ldb.Dn(l, "dc=foo5")
387 m2["bla"] = b"bla"
388 m2["objectUUID"] = b"0123456789abcdee"
389 l.add(m2)
391 s = l.search_iterator()
392 msgs = []
393 for me in s:
394 self.assertTrue(isinstance(me, ldb.Message))
395 count += 1
396 msgs.append(me)
397 r = s.result()
398 self.assertEqual(len(r), 0)
399 self.assertEqual(len(msgs), 2)
400 if msgs[0].dn == m1.dn:
401 self.assertEqual(msgs[0].dn, m1.dn)
402 self.assertEqual(msgs[1].dn, m2.dn)
403 else:
404 self.assertEqual(msgs[0].dn, m2.dn)
405 self.assertEqual(msgs[1].dn, m1.dn)
407 s = l.search_iterator()
408 msgs = []
409 for me in s:
410 self.assertTrue(isinstance(me, ldb.Message))
411 count += 1
412 msgs.append(me)
413 break
414 try:
415 s.result()
416 self.fail()
417 except RuntimeError as re:
418 pass
419 for me in s:
420 self.assertTrue(isinstance(me, ldb.Message))
421 count += 1
422 msgs.append(me)
423 break
424 for me in s:
425 self.fail()
427 r = s.result()
428 self.assertEqual(len(r), 0)
429 self.assertEqual(len(msgs), 2)
430 if msgs[0].dn == m1.dn:
431 self.assertEqual(msgs[0].dn, m1.dn)
432 self.assertEqual(msgs[1].dn, m2.dn)
433 else:
434 self.assertEqual(msgs[0].dn, m2.dn)
435 self.assertEqual(msgs[1].dn, m1.dn)
436 finally:
437 l.delete(ldb.Dn(l, "dc=foo4"))
438 l.delete(ldb.Dn(l, "dc=foo5"))
440 def test_add_text(self):
441 l = ldb.Ldb(self.url(), flags=self.flags())
442 m = ldb.Message()
443 m.dn = ldb.Dn(l, "dc=foo4")
444 m["bla"] = "bla"
445 m["objectUUID"] = b"0123456789abcdef"
446 self.assertEqual(len(l.search()), 0)
447 l.add(m)
448 try:
449 self.assertEqual(len(l.search()), 1)
450 finally:
451 l.delete(ldb.Dn(l, "dc=foo4"))
453 def test_add_w_unhandled_ctrl(self):
454 l = ldb.Ldb(self.url(), flags=self.flags())
455 m = ldb.Message()
456 m.dn = ldb.Dn(l, "dc=foo4")
457 m["bla"] = b"bla"
458 self.assertEqual(len(l.search()), 0)
459 self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
461 def test_add_dict(self):
462 l = ldb.Ldb(self.url(), flags=self.flags())
463 m = {"dn": ldb.Dn(l, "dc=foo5"),
464 "bla": b"bla",
465 "objectUUID": b"0123456789abcdef"}
466 self.assertEqual(len(l.search()), 0)
467 l.add(m)
468 try:
469 self.assertEqual(len(l.search()), 1)
470 finally:
471 l.delete(ldb.Dn(l, "dc=foo5"))
473 def test_add_dict_text(self):
474 l = ldb.Ldb(self.url(), flags=self.flags())
475 m = {"dn": ldb.Dn(l, "dc=foo5"),
476 "bla": "bla",
477 "objectUUID": b"0123456789abcdef"}
478 self.assertEqual(len(l.search()), 0)
479 l.add(m)
480 try:
481 self.assertEqual(len(l.search()), 1)
482 finally:
483 l.delete(ldb.Dn(l, "dc=foo5"))
485 def test_add_dict_string_dn(self):
486 l = ldb.Ldb(self.url(), flags=self.flags())
487 m = {"dn": "dc=foo6", "bla": b"bla",
488 "objectUUID": b"0123456789abcdef"}
489 self.assertEqual(len(l.search()), 0)
490 l.add(m)
491 try:
492 self.assertEqual(len(l.search()), 1)
493 finally:
494 l.delete(ldb.Dn(l, "dc=foo6"))
496 def test_add_dict_bytes_dn(self):
497 l = ldb.Ldb(self.url(), flags=self.flags())
498 m = {"dn": b"dc=foo6", "bla": b"bla",
499 "objectUUID": b"0123456789abcdef"}
500 self.assertEqual(len(l.search()), 0)
501 l.add(m)
502 try:
503 self.assertEqual(len(l.search()), 1)
504 finally:
505 l.delete(ldb.Dn(l, "dc=foo6"))
507 def test_rename(self):
508 l = ldb.Ldb(self.url(), flags=self.flags())
509 m = ldb.Message()
510 m.dn = ldb.Dn(l, "dc=foo7")
511 m["bla"] = b"bla"
512 m["objectUUID"] = b"0123456789abcdef"
513 self.assertEqual(len(l.search()), 0)
514 l.add(m)
515 try:
516 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
517 self.assertEqual(len(l.search()), 1)
518 finally:
519 l.delete(ldb.Dn(l, "dc=bar"))
521 def test_rename_string_dns(self):
522 l = ldb.Ldb(self.url(), flags=self.flags())
523 m = ldb.Message()
524 m.dn = ldb.Dn(l, "dc=foo8")
525 m["bla"] = b"bla"
526 m["objectUUID"] = b"0123456789abcdef"
527 self.assertEqual(len(l.search()), 0)
528 l.add(m)
529 self.assertEqual(len(l.search()), 1)
530 try:
531 l.rename("dc=foo8", "dc=bar")
532 self.assertEqual(len(l.search()), 1)
533 finally:
534 l.delete(ldb.Dn(l, "dc=bar"))
536 def test_rename_bad_string_dns(self):
537 l = ldb.Ldb(self.url(), flags=self.flags())
538 m = ldb.Message()
539 m.dn = ldb.Dn(l, "dc=foo8")
540 m["bla"] = b"bla"
541 m["objectUUID"] = b"0123456789abcdef"
542 self.assertEqual(len(l.search()), 0)
543 l.add(m)
544 self.assertEqual(len(l.search()), 1)
545 self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
546 self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
547 l.delete(ldb.Dn(l, "dc=foo8"))
549 def test_empty_dn(self):
550 l = ldb.Ldb(self.url(), flags=self.flags())
551 self.assertEqual(0, len(l.search()))
552 m = ldb.Message()
553 m.dn = ldb.Dn(l, "dc=empty")
554 m["objectUUID"] = b"0123456789abcdef"
555 l.add(m)
556 rm = l.search()
557 self.assertEqual(1, len(rm))
558 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
559 set(rm[0].keys()))
561 rm = l.search(m.dn)
562 self.assertEqual(1, len(rm))
563 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
564 set(rm[0].keys()))
565 rm = l.search(m.dn, attrs=["blah"])
566 self.assertEqual(1, len(rm))
567 self.assertEqual(0, len(rm[0]))
569 def test_modify_delete(self):
570 l = ldb.Ldb(self.url(), flags=self.flags())
571 m = ldb.Message()
572 m.dn = ldb.Dn(l, "dc=modifydelete")
573 m["bla"] = [b"1234"]
574 m["objectUUID"] = b"0123456789abcdef"
575 l.add(m)
576 rm = l.search(m.dn)[0]
577 self.assertEqual([b"1234"], list(rm["bla"]))
578 try:
579 m = ldb.Message()
580 m.dn = ldb.Dn(l, "dc=modifydelete")
581 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
582 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
583 l.modify(m)
584 rm = l.search(m.dn)
585 self.assertEqual(1, len(rm))
586 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
587 set(rm[0].keys()))
588 rm = l.search(m.dn, attrs=["bla"])
589 self.assertEqual(1, len(rm))
590 self.assertEqual(0, len(rm[0]))
591 finally:
592 l.delete(ldb.Dn(l, "dc=modifydelete"))
594 def test_modify_delete_text(self):
595 l = ldb.Ldb(self.url(), flags=self.flags())
596 m = ldb.Message()
597 m.dn = ldb.Dn(l, "dc=modifydelete")
598 m.text["bla"] = ["1234"]
599 m["objectUUID"] = b"0123456789abcdef"
600 l.add(m)
601 rm = l.search(m.dn)[0]
602 self.assertEqual(["1234"], list(rm.text["bla"]))
603 try:
604 m = ldb.Message()
605 m.dn = ldb.Dn(l, "dc=modifydelete")
606 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
607 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
608 l.modify(m)
609 rm = l.search(m.dn)
610 self.assertEqual(1, len(rm))
611 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
612 set(rm[0].keys()))
613 rm = l.search(m.dn, attrs=["bla"])
614 self.assertEqual(1, len(rm))
615 self.assertEqual(0, len(rm[0]))
616 finally:
617 l.delete(ldb.Dn(l, "dc=modifydelete"))
619 def test_modify_add(self):
620 l = ldb.Ldb(self.url(), flags=self.flags())
621 m = ldb.Message()
622 m.dn = ldb.Dn(l, "dc=add")
623 m["bla"] = [b"1234"]
624 m["objectUUID"] = b"0123456789abcdef"
625 l.add(m)
626 try:
627 m = ldb.Message()
628 m.dn = ldb.Dn(l, "dc=add")
629 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
630 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
631 l.modify(m)
632 rm = l.search(m.dn)[0]
633 self.assertEqual(3, len(rm))
634 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
635 finally:
636 l.delete(ldb.Dn(l, "dc=add"))
638 def test_modify_add_text(self):
639 l = ldb.Ldb(self.url(), flags=self.flags())
640 m = ldb.Message()
641 m.dn = ldb.Dn(l, "dc=add")
642 m.text["bla"] = ["1234"]
643 m["objectUUID"] = b"0123456789abcdef"
644 l.add(m)
645 try:
646 m = ldb.Message()
647 m.dn = ldb.Dn(l, "dc=add")
648 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
649 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
650 l.modify(m)
651 rm = l.search(m.dn)[0]
652 self.assertEqual(3, len(rm))
653 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
654 finally:
655 l.delete(ldb.Dn(l, "dc=add"))
657 def test_modify_replace(self):
658 l = ldb.Ldb(self.url(), flags=self.flags())
659 m = ldb.Message()
660 m.dn = ldb.Dn(l, "dc=modify2")
661 m["bla"] = [b"1234", b"456"]
662 m["objectUUID"] = b"0123456789abcdef"
663 l.add(m)
664 try:
665 m = ldb.Message()
666 m.dn = ldb.Dn(l, "dc=modify2")
667 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
668 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
669 l.modify(m)
670 rm = l.search(m.dn)[0]
671 self.assertEqual(3, len(rm))
672 self.assertEqual([b"789"], list(rm["bla"]))
673 rm = l.search(m.dn, attrs=["bla"])[0]
674 self.assertEqual(1, len(rm))
675 finally:
676 l.delete(ldb.Dn(l, "dc=modify2"))
678 def test_modify_replace_text(self):
679 l = ldb.Ldb(self.url(), flags=self.flags())
680 m = ldb.Message()
681 m.dn = ldb.Dn(l, "dc=modify2")
682 m.text["bla"] = ["1234", "456"]
683 m["objectUUID"] = b"0123456789abcdef"
684 l.add(m)
685 try:
686 m = ldb.Message()
687 m.dn = ldb.Dn(l, "dc=modify2")
688 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
689 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
690 l.modify(m)
691 rm = l.search(m.dn)[0]
692 self.assertEqual(3, len(rm))
693 self.assertEqual(["789"], list(rm.text["bla"]))
694 rm = l.search(m.dn, attrs=["bla"])[0]
695 self.assertEqual(1, len(rm))
696 finally:
697 l.delete(ldb.Dn(l, "dc=modify2"))
699 def test_modify_flags_change(self):
700 l = ldb.Ldb(self.url(), flags=self.flags())
701 m = ldb.Message()
702 m.dn = ldb.Dn(l, "dc=add")
703 m["bla"] = [b"1234"]
704 m["objectUUID"] = b"0123456789abcdef"
705 l.add(m)
706 try:
707 m = ldb.Message()
708 m.dn = ldb.Dn(l, "dc=add")
709 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
710 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
711 l.modify(m)
712 rm = l.search(m.dn)[0]
713 self.assertEqual(3, len(rm))
714 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
716 # Now create another modify, but switch the flags before we do it
717 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
718 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
719 l.modify(m)
720 rm = l.search(m.dn, attrs=["bla"])[0]
721 self.assertEqual(1, len(rm))
722 self.assertEqual([b"1234"], list(rm["bla"]))
723 finally:
724 l.delete(ldb.Dn(l, "dc=add"))
726 def test_modify_flags_change_text(self):
727 l = ldb.Ldb(self.url(), flags=self.flags())
728 m = ldb.Message()
729 m.dn = ldb.Dn(l, "dc=add")
730 m.text["bla"] = ["1234"]
731 m["objectUUID"] = b"0123456789abcdef"
732 l.add(m)
733 try:
734 m = ldb.Message()
735 m.dn = ldb.Dn(l, "dc=add")
736 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
737 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
738 l.modify(m)
739 rm = l.search(m.dn)[0]
740 self.assertEqual(3, len(rm))
741 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
743 # Now create another modify, but switch the flags before we do it
744 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
745 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
746 l.modify(m)
747 rm = l.search(m.dn, attrs=["bla"])[0]
748 self.assertEqual(1, len(rm))
749 self.assertEqual(["1234"], list(rm.text["bla"]))
750 finally:
751 l.delete(ldb.Dn(l, "dc=add"))
753 def test_transaction_commit(self):
754 l = ldb.Ldb(self.url(), flags=self.flags())
755 l.transaction_start()
756 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
757 m["foo"] = [b"bar"]
758 m["objectUUID"] = b"0123456789abcdef"
759 l.add(m)
760 l.transaction_commit()
761 l.delete(m.dn)
763 def test_transaction_cancel(self):
764 l = ldb.Ldb(self.url(), flags=self.flags())
765 l.transaction_start()
766 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
767 m["foo"] = [b"bar"]
768 m["objectUUID"] = b"0123456789abcdee"
769 l.add(m)
770 l.transaction_cancel()
771 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
773 def test_set_debug(self):
774 def my_report_fn(level, text):
775 pass
776 l = ldb.Ldb(self.url(), flags=self.flags())
777 l.set_debug(my_report_fn)
779 def test_zero_byte_string(self):
780 """Testing we do not get trapped in the \0 byte in a property string."""
781 l = ldb.Ldb(self.url(), flags=self.flags())
782 l.add({
783 "dn": b"dc=somedn",
784 "objectclass": b"user",
785 "cN": b"LDAPtestUSER",
786 "givenname": b"ldap",
787 "displayname": b"foo\0bar",
788 "objectUUID": b"0123456789abcdef"
790 res = l.search(expression="(dn=dc=somedn)")
791 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
793 def test_no_crash_broken_expr(self):
794 l = ldb.Ldb(self.url(), flags=self.flags())
795 self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
797 # Run the SimpleLdb tests against an lmdb backend
800 class SimpleLdbLmdb(SimpleLdb):
802 def setUp(self):
803 if os.environ.get('HAVE_LMDB', '1') == '0':
804 self.skipTest("No lmdb backend")
805 self.prefix = MDB_PREFIX
806 self.index = MDB_INDEX_OBJ
807 super(SimpleLdbLmdb, self).setUp()
809 def tearDown(self):
810 super(SimpleLdbLmdb, self).tearDown()
813 class SimpleLdbNoLmdb(LdbBaseTest):
815 def setUp(self):
816 if os.environ.get('HAVE_LMDB', '1') != '0':
817 self.skipTest("lmdb backend enabled")
818 self.prefix = MDB_PREFIX
819 self.index = MDB_INDEX_OBJ
820 super(SimpleLdbNoLmdb, self).setUp()
822 def tearDown(self):
823 super(SimpleLdbNoLmdb, self).tearDown()
825 def test_lmdb_disabled(self):
826 self.testdir = tempdir()
827 self.filename = os.path.join(self.testdir, "test.ldb")
828 try:
829 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
830 self.fail("Should have failed on missing LMDB")
831 except ldb.LdbError as err:
832 enum = err.args[0]
833 self.assertEqual(enum, ldb.ERR_OTHER)
836 class SearchTests(LdbBaseTest):
837 def tearDown(self):
838 shutil.rmtree(self.testdir)
839 super(SearchTests, self).tearDown()
841 # Ensure the LDB is closed now, so we close the FD
842 del(self.l)
844 def setUp(self):
845 super(SearchTests, self).setUp()
846 self.testdir = tempdir()
847 self.filename = os.path.join(self.testdir, "search_test.ldb")
848 options = ["modules:rdn_name"]
849 if hasattr(self, 'IDXCHECK'):
850 options.append("disable_full_db_scan_for_self_test:1")
851 self.l = ldb.Ldb(self.url(),
852 flags=self.flags(),
853 options=options)
854 try:
855 self.l.add(self.index)
856 except AttributeError:
857 pass
859 self.l.add({"dn": "@ATTRIBUTES",
860 "DC": "CASE_INSENSITIVE"})
862 # Note that we can't use the name objectGUID here, as we
863 # want to stay clear of the objectGUID handler in LDB and
864 # instead use just the 16 bytes raw, which we just keep
865 # to printable chars here for ease of handling.
867 self.l.add({"dn": "DC=ORG",
868 "name": b"org",
869 "objectUUID": b"0000000000abcdef"})
870 self.l.add({"dn": "DC=EXAMPLE,DC=ORG",
871 "name": b"org",
872 "objectUUID": b"0000000001abcdef"})
873 self.l.add({"dn": "OU=OU1,DC=EXAMPLE,DC=ORG",
874 "name": b"OU #1",
875 "x": "y", "y": "a",
876 "objectUUID": b"0023456789abcde3"})
877 self.l.add({"dn": "OU=OU2,DC=EXAMPLE,DC=ORG",
878 "name": b"OU #2",
879 "x": "y", "y": "a",
880 "objectUUID": b"0023456789abcde4"})
881 self.l.add({"dn": "OU=OU3,DC=EXAMPLE,DC=ORG",
882 "name": b"OU #3",
883 "x": "y", "y": "a",
884 "objectUUID": b"0023456789abcde5"})
885 self.l.add({"dn": "OU=OU4,DC=EXAMPLE,DC=ORG",
886 "name": b"OU #4",
887 "x": "z", "y": "b",
888 "objectUUID": b"0023456789abcde6"})
889 self.l.add({"dn": "OU=OU5,DC=EXAMPLE,DC=ORG",
890 "name": b"OU #5",
891 "x": "y", "y": "a",
892 "objectUUID": b"0023456789abcde7"})
893 self.l.add({"dn": "OU=OU6,DC=EXAMPLE,DC=ORG",
894 "name": b"OU #6",
895 "x": "y", "y": "a",
896 "objectUUID": b"0023456789abcde8"})
897 self.l.add({"dn": "OU=OU7,DC=EXAMPLE,DC=ORG",
898 "name": b"OU #7",
899 "x": "y", "y": "c",
900 "objectUUID": b"0023456789abcde9"})
901 self.l.add({"dn": "OU=OU8,DC=EXAMPLE,DC=ORG",
902 "name": b"OU #8",
903 "x": "y", "y": "b",
904 "objectUUID": b"0023456789abcde0"})
905 self.l.add({"dn": "OU=OU9,DC=EXAMPLE,DC=ORG",
906 "name": b"OU #9",
907 "x": "y", "y": "a",
908 "objectUUID": b"0023456789abcdea"})
910 self.l.add({"dn": "DC=EXAMPLE,DC=COM",
911 "name": b"org",
912 "objectUUID": b"0000000011abcdef"})
914 self.l.add({"dn": "DC=EXAMPLE,DC=NET",
915 "name": b"org",
916 "objectUUID": b"0000000021abcdef"})
918 self.l.add({"dn": "OU=UNIQUE,DC=EXAMPLE,DC=NET",
919 "objectUUID": b"0000000022abcdef"})
921 self.l.add({"dn": "DC=SAMBA,DC=ORG",
922 "name": b"samba.org",
923 "objectUUID": b"0123456789abcdef"})
924 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
925 "name": b"Admins",
926 "x": "z", "y": "a",
927 "objectUUID": b"0123456789abcde1"})
928 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
929 "name": b"Users",
930 "x": "z", "y": "a",
931 "objectUUID": b"0123456789abcde2"})
932 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
933 "name": b"OU #1",
934 "x": "y", "y": "a",
935 "objectUUID": b"0123456789abcde3"})
936 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
937 "name": b"OU #2",
938 "x": "y", "y": "a",
939 "objectUUID": b"0123456789abcde4"})
940 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
941 "name": b"OU #3",
942 "x": "y", "y": "a",
943 "objectUUID": b"0123456789abcde5"})
944 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
945 "name": b"OU #4",
946 "x": "y", "y": "a",
947 "objectUUID": b"0123456789abcde6"})
948 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
949 "name": b"OU #5",
950 "x": "y", "y": "a",
951 "objectUUID": b"0123456789abcde7"})
952 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
953 "name": b"OU #6",
954 "x": "y", "y": "a",
955 "objectUUID": b"0123456789abcde8"})
956 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
957 "name": b"OU #7",
958 "x": "y", "y": "a",
959 "objectUUID": b"0123456789abcde9"})
960 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
961 "name": b"OU #8",
962 "x": "y", "y": "a",
963 "objectUUID": b"0123456789abcde0"})
964 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
965 "name": b"OU #9",
966 "x": "y", "y": "a",
967 "objectUUID": b"0123456789abcdea"})
968 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
969 "name": b"OU #10",
970 "x": "y", "y": "a",
971 "objectUUID": b"0123456789abcdeb"})
972 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
973 "name": b"OU #10",
974 "x": "y", "y": "a",
975 "objectUUID": b"0123456789abcdec"})
976 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
977 "name": b"OU #10",
978 "x": "y", "y": "b",
979 "objectUUID": b"0123456789abcded"})
980 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
981 "name": b"OU #10",
982 "x": "x", "y": "b",
983 "objectUUID": b"0123456789abcdee"})
984 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
985 "name": b"OU #10",
986 "x": "x", "y": "b",
987 "objectUUID": b"0123456789abcd01"})
988 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
989 "name": b"OU #10",
990 "x": "x", "y": "b",
991 "objectUUID": b"0123456789abcd02"})
992 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
993 "name": b"OU #10",
994 "x": "x", "y": "b",
995 "objectUUID": b"0123456789abcd03"})
996 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
997 "name": b"OU #10",
998 "x": "x", "y": "b",
999 "objectUUID": b"0123456789abcd04"})
1000 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
1001 "name": b"OU #10",
1002 "x": "x", "y": "b",
1003 "objectUUID": b"0123456789abcd05"})
1004 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
1005 "name": b"OU #10",
1006 "x": "x", "y": "b",
1007 "objectUUID": b"0123456789abcd06"})
1008 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
1009 "name": b"OU #10",
1010 "x": "x", "y": "b",
1011 "objectUUID": b"0123456789abcd07"})
1012 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
1013 "name": b"OU #10",
1014 "x": "x", "y": "c",
1015 "objectUUID": b"0123456789abcd08"})
1016 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
1017 "name": b"OU #10",
1018 "x": "x", "y": "c",
1019 "objectUUID": b"0123456789abcd09"})
1021 def test_base(self):
1022 """Testing a search"""
1024 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1025 scope=ldb.SCOPE_BASE)
1026 self.assertEqual(len(res11), 1)
1028 def test_base_lower(self):
1029 """Testing a search"""
1031 res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
1032 scope=ldb.SCOPE_BASE)
1033 self.assertEqual(len(res11), 1)
1035 def test_base_or(self):
1036 """Testing a search"""
1038 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1039 scope=ldb.SCOPE_BASE,
1040 expression="(|(ou=ou11)(ou=ou12))")
1041 self.assertEqual(len(res11), 1)
1043 def test_base_or2(self):
1044 """Testing a search"""
1046 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1047 scope=ldb.SCOPE_BASE,
1048 expression="(|(x=y)(y=b))")
1049 self.assertEqual(len(res11), 1)
1051 def test_base_and(self):
1052 """Testing a search"""
1054 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1055 scope=ldb.SCOPE_BASE,
1056 expression="(&(ou=ou11)(ou=ou12))")
1057 self.assertEqual(len(res11), 0)
1059 def test_base_and2(self):
1060 """Testing a search"""
1062 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1063 scope=ldb.SCOPE_BASE,
1064 expression="(&(x=y)(y=a))")
1065 self.assertEqual(len(res11), 1)
1067 def test_base_false(self):
1068 """Testing a search"""
1070 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1071 scope=ldb.SCOPE_BASE,
1072 expression="(|(ou=ou13)(ou=ou12))")
1073 self.assertEqual(len(res11), 0)
1075 def test_check_base_false(self):
1076 """Testing a search"""
1077 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1078 scope=ldb.SCOPE_BASE,
1079 expression="(|(ou=ou13)(ou=ou12))")
1080 self.assertEqual(len(res11), 0)
1082 def test_check_base_error(self):
1083 """Testing a search"""
1084 checkbaseonsearch = {"dn": "@OPTIONS",
1085 "checkBaseOnSearch": b"TRUE"}
1086 try:
1087 self.l.add(checkbaseonsearch)
1088 except ldb.LdbError as err:
1089 enum = err.args[0]
1090 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1091 m = ldb.Message.from_dict(self.l,
1092 checkbaseonsearch)
1093 self.l.modify(m)
1095 try:
1096 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
1097 scope=ldb.SCOPE_BASE,
1098 expression="(|(ou=ou13)(ou=ou12))")
1099 self.fail("Should have failed on missing base")
1100 except ldb.LdbError as err:
1101 enum = err.args[0]
1102 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1104 def test_subtree(self):
1105 """Testing a search"""
1107 try:
1108 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1109 scope=ldb.SCOPE_SUBTREE)
1110 if hasattr(self, 'IDXCHECK'):
1111 self.fail()
1112 except ldb.LdbError as err:
1113 enum = err.args[0]
1114 estr = err.args[1]
1115 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1116 self.assertIn(estr, "ldb FULL SEARCH disabled")
1117 else:
1118 self.assertEqual(len(res11), 25)
1120 def test_subtree2(self):
1121 """Testing a search"""
1123 try:
1124 res11 = self.l.search(base="DC=ORG",
1125 scope=ldb.SCOPE_SUBTREE)
1126 if hasattr(self, 'IDXCHECK'):
1127 self.fail()
1128 except ldb.LdbError as err:
1129 enum = err.args[0]
1130 estr = err.args[1]
1131 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1132 self.assertIn(estr, "ldb FULL SEARCH disabled")
1133 else:
1134 self.assertEqual(len(res11), 36)
1136 def test_subtree_and(self):
1137 """Testing a search"""
1139 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1140 scope=ldb.SCOPE_SUBTREE,
1141 expression="(&(ou=ou11)(ou=ou12))")
1142 self.assertEqual(len(res11), 0)
1144 def test_subtree_and2(self):
1145 """Testing a search"""
1147 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1148 scope=ldb.SCOPE_SUBTREE,
1149 expression="(&(x=y)(|(y=b)(y=c)))")
1150 self.assertEqual(len(res11), 1)
1152 def test_subtree_and2_lower(self):
1153 """Testing a search"""
1155 res11 = self.l.search(base="DC=samba,DC=org",
1156 scope=ldb.SCOPE_SUBTREE,
1157 expression="(&(x=y)(|(y=b)(y=c)))")
1158 self.assertEqual(len(res11), 1)
1160 def test_subtree_or(self):
1161 """Testing a search"""
1163 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1164 scope=ldb.SCOPE_SUBTREE,
1165 expression="(|(ou=ou11)(ou=ou12))")
1166 self.assertEqual(len(res11), 2)
1168 def test_subtree_or2(self):
1169 """Testing a search"""
1171 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1172 scope=ldb.SCOPE_SUBTREE,
1173 expression="(|(x=y)(y=b))")
1174 self.assertEqual(len(res11), 20)
1176 def test_subtree_or3(self):
1177 """Testing a search"""
1179 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1180 scope=ldb.SCOPE_SUBTREE,
1181 expression="(|(x=y)(y=b)(y=c))")
1182 self.assertEqual(len(res11), 22)
1184 def test_one_and(self):
1185 """Testing a search"""
1187 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1188 scope=ldb.SCOPE_ONELEVEL,
1189 expression="(&(ou=ou11)(ou=ou12))")
1190 self.assertEqual(len(res11), 0)
1192 def test_one_and2(self):
1193 """Testing a search"""
1195 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1196 scope=ldb.SCOPE_ONELEVEL,
1197 expression="(&(x=y)(y=b))")
1198 self.assertEqual(len(res11), 1)
1200 def test_one_or(self):
1201 """Testing a search"""
1203 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1204 scope=ldb.SCOPE_ONELEVEL,
1205 expression="(|(ou=ou11)(ou=ou12))")
1206 self.assertEqual(len(res11), 2)
1208 def test_one_or2(self):
1209 """Testing a search"""
1211 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1212 scope=ldb.SCOPE_ONELEVEL,
1213 expression="(|(x=y)(y=b))")
1214 self.assertEqual(len(res11), 20)
1216 def test_one_or2_lower(self):
1217 """Testing a search"""
1219 res11 = self.l.search(base="DC=samba,DC=org",
1220 scope=ldb.SCOPE_ONELEVEL,
1221 expression="(|(x=y)(y=b))")
1222 self.assertEqual(len(res11), 20)
1224 def test_one_unindexable(self):
1225 """Testing a search"""
1227 try:
1228 res11 = self.l.search(base="DC=samba,DC=org",
1229 scope=ldb.SCOPE_ONELEVEL,
1230 expression="(y=b*)")
1231 if hasattr(self, 'IDX') and \
1232 not hasattr(self, 'IDXONE') and \
1233 hasattr(self, 'IDXCHECK'):
1234 self.fail("Should have failed as un-indexed search")
1236 self.assertEqual(len(res11), 9)
1238 except ldb.LdbError as err:
1239 enum = err.args[0]
1240 estr = err.args[1]
1241 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1242 self.assertIn(estr, "ldb FULL SEARCH disabled")
1244 def test_one_unindexable_presence(self):
1245 """Testing a search"""
1247 try:
1248 res11 = self.l.search(base="DC=samba,DC=org",
1249 scope=ldb.SCOPE_ONELEVEL,
1250 expression="(y=*)")
1251 if hasattr(self, 'IDX') and \
1252 not hasattr(self, 'IDXONE') and \
1253 hasattr(self, 'IDXCHECK'):
1254 self.fail("Should have failed as un-indexed search")
1256 self.assertEqual(len(res11), 24)
1258 except ldb.LdbError as err:
1259 enum = err.args[0]
1260 estr = err.args[1]
1261 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1262 self.assertIn(estr, "ldb FULL SEARCH disabled")
1264 def test_subtree_and_or(self):
1265 """Testing a search"""
1267 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1268 scope=ldb.SCOPE_SUBTREE,
1269 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1270 self.assertEqual(len(res11), 0)
1272 def test_subtree_and_or2(self):
1273 """Testing a search"""
1275 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1276 scope=ldb.SCOPE_SUBTREE,
1277 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1278 self.assertEqual(len(res11), 0)
1280 def test_subtree_and_or3(self):
1281 """Testing a search"""
1283 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1284 scope=ldb.SCOPE_SUBTREE,
1285 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1286 self.assertEqual(len(res11), 2)
1288 def test_subtree_and_or4(self):
1289 """Testing a search"""
1291 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1292 scope=ldb.SCOPE_SUBTREE,
1293 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1294 self.assertEqual(len(res11), 2)
1296 def test_subtree_and_or5(self):
1297 """Testing a search"""
1299 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1300 scope=ldb.SCOPE_SUBTREE,
1301 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1302 self.assertEqual(len(res11), 1)
1304 def test_subtree_or_and(self):
1305 """Testing a search"""
1307 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1308 scope=ldb.SCOPE_SUBTREE,
1309 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1310 self.assertEqual(len(res11), 10)
1312 def test_subtree_large_and_unique(self):
1313 """Testing a search"""
1315 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1316 scope=ldb.SCOPE_SUBTREE,
1317 expression="(&(ou=ou10)(y=a))")
1318 self.assertEqual(len(res11), 1)
1320 def test_subtree_unique(self):
1321 """Testing a search"""
1323 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1324 scope=ldb.SCOPE_SUBTREE,
1325 expression="(ou=ou10)")
1326 self.assertEqual(len(res11), 1)
1328 def test_subtree_unique_elsewhere(self):
1329 """Testing a search"""
1331 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1332 scope=ldb.SCOPE_SUBTREE,
1333 expression="(ou=ou10)")
1334 self.assertEqual(len(res11), 0)
1336 def test_subtree_unique_elsewhere2(self):
1337 """Testing a search"""
1339 res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1340 scope=ldb.SCOPE_SUBTREE,
1341 expression="(ou=unique)")
1342 self.assertEqual(len(res11), 1)
1344 def test_subtree_uni123_elsewhere(self):
1345 """Testing a search, where the search term contains a (normal ASCII)
1346 dotted-i, that will be upper-cased to 'Ä°', U+0130, LATIN
1347 CAPITAL LETTER I WITH DOT ABOVE in certain locales including
1348 tr_TR in which this test is sometimes run.
1350 The search term should fail because the ou does not exist, but
1351 we used to get it wrong in unindexed searches which stopped
1352 comparing at the i, ignoring the rest of the string, which is
1353 not the same as the existing ou ('123' != 'que').
1355 res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1356 scope=ldb.SCOPE_SUBTREE,
1357 expression="(ou=uni123)")
1358 self.assertEqual(len(res11), 0)
1360 def test_subtree_unique_elsewhere3(self):
1361 """Testing a search"""
1363 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1364 scope=ldb.SCOPE_SUBTREE,
1365 expression="(ou=unique)")
1366 self.assertEqual(len(res11), 0)
1368 def test_subtree_unique_elsewhere4(self):
1369 """Testing a search"""
1371 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1372 scope=ldb.SCOPE_SUBTREE,
1373 expression="(ou=unique)")
1374 self.assertEqual(len(res11), 0)
1376 def test_subtree_unique_elsewhere5(self):
1377 """Testing a search"""
1379 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1380 scope=ldb.SCOPE_SUBTREE,
1381 expression="(ou=unique)")
1382 self.assertEqual(len(res11), 0)
1384 def test_subtree_unique_elsewhere6(self):
1385 """Testing a search"""
1387 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1388 scope=ldb.SCOPE_SUBTREE,
1389 expression="(ou=unique)")
1390 self.assertEqual(len(res11), 0)
1392 def test_subtree_unique_elsewhere7(self):
1393 """Testing a search"""
1395 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1396 scope=ldb.SCOPE_SUBTREE,
1397 expression="(ou=ou10)")
1398 self.assertEqual(len(res11), 0)
1400 def test_subtree_unique_here(self):
1401 """Testing a search"""
1403 res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1404 scope=ldb.SCOPE_SUBTREE,
1405 expression="(ou=unique)")
1406 self.assertEqual(len(res11), 1)
1408 def test_subtree_and_none(self):
1409 """Testing a search"""
1411 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1412 scope=ldb.SCOPE_SUBTREE,
1413 expression="(&(ou=ouX)(y=a))")
1414 self.assertEqual(len(res11), 0)
1416 def test_subtree_and_idx_record(self):
1417 """Testing a search against the index record"""
1419 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1420 scope=ldb.SCOPE_SUBTREE,
1421 expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1422 self.assertEqual(len(res11), 0)
1424 def test_subtree_and_idxone_record(self):
1425 """Testing a search against the index record"""
1427 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1428 scope=ldb.SCOPE_SUBTREE,
1429 expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1430 self.assertEqual(len(res11), 0)
1432 def test_onelevel(self):
1433 """Testing a search"""
1435 try:
1436 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1437 scope=ldb.SCOPE_ONELEVEL)
1438 if hasattr(self, 'IDXCHECK') \
1439 and not hasattr(self, 'IDXONE'):
1440 self.fail()
1441 except ldb.LdbError as err:
1442 enum = err.args[0]
1443 estr = err.args[1]
1444 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1445 self.assertIn(estr, "ldb FULL SEARCH disabled")
1446 else:
1447 self.assertEqual(len(res11), 24)
1449 def test_onelevel2(self):
1450 """Testing a search"""
1452 try:
1453 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1454 scope=ldb.SCOPE_ONELEVEL)
1455 if hasattr(self, 'IDXCHECK') \
1456 and not hasattr(self, 'IDXONE'):
1457 self.fail()
1458 self.fail()
1459 except ldb.LdbError as err:
1460 enum = err.args[0]
1461 estr = err.args[1]
1462 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1463 self.assertIn(estr, "ldb FULL SEARCH disabled")
1464 else:
1465 self.assertEqual(len(res11), 9)
1467 def test_onelevel_and_or(self):
1468 """Testing a search"""
1470 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1471 scope=ldb.SCOPE_ONELEVEL,
1472 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1473 self.assertEqual(len(res11), 0)
1475 def test_onelevel_and_or2(self):
1476 """Testing a search"""
1478 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1479 scope=ldb.SCOPE_ONELEVEL,
1480 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1481 self.assertEqual(len(res11), 0)
1483 def test_onelevel_and_or3(self):
1484 """Testing a search"""
1486 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1487 scope=ldb.SCOPE_ONELEVEL,
1488 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1489 self.assertEqual(len(res11), 2)
1491 def test_onelevel_and_or4(self):
1492 """Testing a search"""
1494 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1495 scope=ldb.SCOPE_ONELEVEL,
1496 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1497 self.assertEqual(len(res11), 2)
1499 def test_onelevel_and_or5(self):
1500 """Testing a search"""
1502 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1503 scope=ldb.SCOPE_ONELEVEL,
1504 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1505 self.assertEqual(len(res11), 1)
1507 def test_onelevel_or_and(self):
1508 """Testing a search"""
1510 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1511 scope=ldb.SCOPE_ONELEVEL,
1512 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1513 self.assertEqual(len(res11), 10)
1515 def test_onelevel_large_and_unique(self):
1516 """Testing a search"""
1518 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1519 scope=ldb.SCOPE_ONELEVEL,
1520 expression="(&(ou=ou10)(y=a))")
1521 self.assertEqual(len(res11), 1)
1523 def test_onelevel_unique(self):
1524 """Testing a search"""
1526 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1527 scope=ldb.SCOPE_ONELEVEL,
1528 expression="(ou=ou10)")
1529 self.assertEqual(len(res11), 1)
1531 def test_onelevel_unique_elsewhere(self):
1532 """Testing a search"""
1534 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1535 scope=ldb.SCOPE_ONELEVEL,
1536 expression="(ou=ou10)")
1537 self.assertEqual(len(res11), 0)
1539 def test_onelevel_unique_elsewhere2(self):
1540 """Testing a search (showing that onelevel is not subtree)"""
1542 res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1543 scope=ldb.SCOPE_ONELEVEL,
1544 expression="(ou=unique)")
1545 self.assertEqual(len(res11), 1)
1547 def test_onelevel_unique_elsewhere3(self):
1548 """Testing a search (showing that onelevel is not subtree)"""
1550 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1551 scope=ldb.SCOPE_ONELEVEL,
1552 expression="(ou=unique)")
1553 self.assertEqual(len(res11), 0)
1555 def test_onelevel_unique_elsewhere4(self):
1556 """Testing a search (showing that onelevel is not subtree)"""
1558 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1559 scope=ldb.SCOPE_ONELEVEL,
1560 expression="(ou=unique)")
1561 self.assertEqual(len(res11), 0)
1563 def test_onelevel_unique_elsewhere5(self):
1564 """Testing a search (showing that onelevel is not subtree)"""
1566 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1567 scope=ldb.SCOPE_ONELEVEL,
1568 expression="(ou=unique)")
1569 self.assertEqual(len(res11), 0)
1571 def test_onelevel_unique_elsewhere6(self):
1572 """Testing a search"""
1574 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1575 scope=ldb.SCOPE_ONELEVEL,
1576 expression="(ou=ou10)")
1577 self.assertEqual(len(res11), 0)
1579 def test_onelevel_unique_here(self):
1580 """Testing a search"""
1582 res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1583 scope=ldb.SCOPE_ONELEVEL,
1584 expression="(ou=unique)")
1585 self.assertEqual(len(res11), 0)
1587 def test_onelevel_and_none(self):
1588 """Testing a search"""
1590 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1591 scope=ldb.SCOPE_ONELEVEL,
1592 expression="(&(ou=ouX)(y=a))")
1593 self.assertEqual(len(res11), 0)
1595 def test_onelevel_and_idx_record(self):
1596 """Testing a search against the index record"""
1598 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1599 scope=ldb.SCOPE_ONELEVEL,
1600 expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1601 self.assertEqual(len(res11), 0)
1603 def test_onelevel_and_idxone_record(self):
1604 """Testing a search against the index record"""
1606 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1607 scope=ldb.SCOPE_ONELEVEL,
1608 expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1609 self.assertEqual(len(res11), 0)
1611 def test_subtree_unindexable(self):
1612 """Testing a search"""
1614 try:
1615 res11 = self.l.search(base="DC=samba,DC=org",
1616 scope=ldb.SCOPE_SUBTREE,
1617 expression="(y=b*)")
1618 if hasattr(self, 'IDX') and \
1619 hasattr(self, 'IDXCHECK'):
1620 self.fail("Should have failed as un-indexed search")
1622 self.assertEqual(len(res11), 9)
1624 except ldb.LdbError as err:
1625 enum = err.args[0]
1626 estr = err.args[1]
1627 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1628 self.assertIn(estr, "ldb FULL SEARCH disabled")
1630 def test_onelevel_only_and_or(self):
1631 """Testing a search (showing that onelevel is not subtree)"""
1633 res11 = self.l.search(base="DC=ORG",
1634 scope=ldb.SCOPE_ONELEVEL,
1635 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1636 self.assertEqual(len(res11), 0)
1638 def test_onelevel_only_and_or2(self):
1639 """Testing a search (showing that onelevel is not subtree)"""
1641 res11 = self.l.search(base="DC=ORG",
1642 scope=ldb.SCOPE_ONELEVEL,
1643 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1644 self.assertEqual(len(res11), 0)
1646 def test_onelevel_only_and_or3(self):
1647 """Testing a search (showing that onelevel is not subtree)"""
1649 res11 = self.l.search(base="DC=ORG",
1650 scope=ldb.SCOPE_ONELEVEL,
1651 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1652 self.assertEqual(len(res11), 0)
1654 def test_onelevel_only_and_or4(self):
1655 """Testing a search (showing that onelevel is not subtree)"""
1657 res11 = self.l.search(base="DC=ORG",
1658 scope=ldb.SCOPE_ONELEVEL,
1659 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1660 self.assertEqual(len(res11), 0)
1662 def test_onelevel_only_and_or5(self):
1663 """Testing a search (showing that onelevel is not subtree)"""
1665 res11 = self.l.search(base="DC=ORG",
1666 scope=ldb.SCOPE_ONELEVEL,
1667 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1668 self.assertEqual(len(res11), 0)
1670 def test_onelevel_only_or_and(self):
1671 """Testing a search (showing that onelevel is not subtree)"""
1673 res11 = self.l.search(base="DC=ORG",
1674 scope=ldb.SCOPE_ONELEVEL,
1675 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1676 self.assertEqual(len(res11), 0)
1678 def test_onelevel_only_large_and_unique(self):
1679 """Testing a search (showing that onelevel is not subtree)"""
1681 res11 = self.l.search(base="DC=ORG",
1682 scope=ldb.SCOPE_ONELEVEL,
1683 expression="(&(ou=ou10)(y=a))")
1684 self.assertEqual(len(res11), 0)
1686 def test_onelevel_only_unique(self):
1687 """Testing a search (showing that onelevel is not subtree)"""
1689 res11 = self.l.search(base="DC=ORG",
1690 scope=ldb.SCOPE_ONELEVEL,
1691 expression="(ou=ou10)")
1692 self.assertEqual(len(res11), 0)
1694 def test_onelevel_only_unique2(self):
1695 """Testing a search"""
1697 res11 = self.l.search(base="DC=ORG",
1698 scope=ldb.SCOPE_ONELEVEL,
1699 expression="(ou=unique)")
1700 self.assertEqual(len(res11), 0)
1702 def test_onelevel_only_and_none(self):
1703 """Testing a search (showing that onelevel is not subtree)"""
1705 res11 = self.l.search(base="DC=ORG",
1706 scope=ldb.SCOPE_ONELEVEL,
1707 expression="(&(ou=ouX)(y=a))")
1708 self.assertEqual(len(res11), 0)
1710 def test_onelevel_small_and_or(self):
1711 """Testing a search (showing that onelevel is not subtree)"""
1713 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1714 scope=ldb.SCOPE_ONELEVEL,
1715 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1716 self.assertEqual(len(res11), 0)
1718 def test_onelevel_small_and_or2(self):
1719 """Testing a search (showing that onelevel is not subtree)"""
1721 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1722 scope=ldb.SCOPE_ONELEVEL,
1723 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1724 self.assertEqual(len(res11), 0)
1726 def test_onelevel_small_and_or3(self):
1727 """Testing a search (showing that onelevel is not subtree)"""
1729 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1730 scope=ldb.SCOPE_ONELEVEL,
1731 expression="(&(|(ou=ou1)(ou=ou2))(|(x=y)(y=b)(y=c)))")
1732 self.assertEqual(len(res11), 2)
1734 def test_onelevel_small_and_or4(self):
1735 """Testing a search (showing that onelevel is not subtree)"""
1737 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1738 scope=ldb.SCOPE_ONELEVEL,
1739 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou1)(ou=ou2)))")
1740 self.assertEqual(len(res11), 2)
1742 def test_onelevel_small_and_or5(self):
1743 """Testing a search (showing that onelevel is not subtree)"""
1745 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1746 scope=ldb.SCOPE_ONELEVEL,
1747 expression="(&(|(x=y)(y=b)(y=c))(ou=ou1))")
1748 self.assertEqual(len(res11), 1)
1750 def test_onelevel_small_or_and(self):
1751 """Testing a search (showing that onelevel is not subtree)"""
1753 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1754 scope=ldb.SCOPE_ONELEVEL,
1755 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1756 self.assertEqual(len(res11), 2)
1758 def test_onelevel_small_large_and_unique(self):
1759 """Testing a search (showing that onelevel is not subtree)"""
1761 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1762 scope=ldb.SCOPE_ONELEVEL,
1763 expression="(&(ou=ou9)(y=a))")
1764 self.assertEqual(len(res11), 1)
1766 def test_onelevel_small_unique_elsewhere(self):
1767 """Testing a search (showing that onelevel is not subtree)"""
1769 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1770 scope=ldb.SCOPE_ONELEVEL,
1771 expression="(ou=ou10)")
1772 self.assertEqual(len(res11), 0)
1774 def test_onelevel_small_and_none(self):
1775 """Testing a search (showing that onelevel is not subtree)"""
1777 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1778 scope=ldb.SCOPE_ONELEVEL,
1779 expression="(&(ou=ouX)(y=a))")
1780 self.assertEqual(len(res11), 0)
1782 def test_subtree_unindexable_presence(self):
1783 """Testing a search"""
1785 try:
1786 res11 = self.l.search(base="DC=samba,DC=org",
1787 scope=ldb.SCOPE_SUBTREE,
1788 expression="(y=*)")
1789 if hasattr(self, 'IDX') and \
1790 hasattr(self, 'IDXCHECK'):
1791 self.fail("Should have failed as un-indexed search")
1793 self.assertEqual(len(res11), 24)
1795 except ldb.LdbError as err:
1796 enum = err.args[0]
1797 estr = err.args[1]
1798 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1799 self.assertIn(estr, "ldb FULL SEARCH disabled")
1801 def test_dn_filter_one(self):
1802 """Testing that a dn= filter succeeds
1803 (or fails with disallowDNFilter
1804 set and IDXGUID or (IDX and not IDXONE) mode)
1805 when the scope is SCOPE_ONELEVEL.
1807 This should be made more consistent, but for now lock in
1808 the behaviour
1812 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1813 scope=ldb.SCOPE_ONELEVEL,
1814 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1815 if hasattr(self, 'disallowDNFilter') and \
1816 hasattr(self, 'IDX') and \
1817 (hasattr(self, 'IDXGUID') or
1818 ((not hasattr(self, 'IDXONE') and hasattr(self, 'IDX')))):
1819 self.assertEqual(len(res11), 0)
1820 else:
1821 self.assertEqual(len(res11), 1)
1823 def test_dn_filter_subtree(self):
1824 """Testing that a dn= filter succeeds
1825 (or fails with disallowDNFilter set)
1826 when the scope is SCOPE_SUBTREE"""
1828 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1829 scope=ldb.SCOPE_SUBTREE,
1830 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1831 if hasattr(self, 'disallowDNFilter') \
1832 and hasattr(self, 'IDX'):
1833 self.assertEqual(len(res11), 0)
1834 else:
1835 self.assertEqual(len(res11), 1)
1837 def test_dn_filter_base(self):
1838 """Testing that (incorrectly) a dn= filter works
1839 when the scope is SCOPE_BASE"""
1841 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1842 scope=ldb.SCOPE_BASE,
1843 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1845 # At some point we should fix this, but it isn't trivial
1846 self.assertEqual(len(res11), 1)
1848 def test_distinguishedName_filter_one(self):
1849 """Testing that a distinguishedName= filter succeeds
1850 when the scope is SCOPE_ONELEVEL.
1852 This should be made more consistent, but for now lock in
1853 the behaviour
1857 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1858 scope=ldb.SCOPE_ONELEVEL,
1859 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1860 self.assertEqual(len(res11), 1)
1862 def test_distinguishedName_filter_subtree(self):
1863 """Testing that a distinguishedName= filter succeeds
1864 when the scope is SCOPE_SUBTREE"""
1866 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1867 scope=ldb.SCOPE_SUBTREE,
1868 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1869 self.assertEqual(len(res11), 1)
1871 def test_distinguishedName_filter_base(self):
1872 """Testing that (incorrectly) a distinguishedName= filter works
1873 when the scope is SCOPE_BASE"""
1875 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1876 scope=ldb.SCOPE_BASE,
1877 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1879 # At some point we should fix this, but it isn't trivial
1880 self.assertEqual(len(res11), 1)
1882 def test_bad_dn_filter_base(self):
1883 """Testing that a dn= filter on an invalid DN works
1884 when the scope is SCOPE_BASE but
1885 returns zero results"""
1887 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1888 scope=ldb.SCOPE_BASE,
1889 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1891 # At some point we should fix this, but it isn't trivial
1892 self.assertEqual(len(res11), 0)
1895 def test_bad_dn_filter_one(self):
1896 """Testing that a dn= filter succeeds but returns zero
1897 results when the DN is not valid on a SCOPE_ONELEVEL search
1901 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1902 scope=ldb.SCOPE_ONELEVEL,
1903 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1904 self.assertEqual(len(res11), 0)
1906 def test_bad_dn_filter_subtree(self):
1907 """Testing that a dn= filter succeeds but returns zero
1908 results when the DN is not valid on a SCOPE_SUBTREE search
1912 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1913 scope=ldb.SCOPE_SUBTREE,
1914 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1915 self.assertEqual(len(res11), 0)
1917 def test_bad_distinguishedName_filter_base(self):
1918 """Testing that a distinguishedName= filter on an invalid DN works
1919 when the scope is SCOPE_BASE but
1920 returns zero results"""
1922 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1923 scope=ldb.SCOPE_BASE,
1924 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1926 # At some point we should fix this, but it isn't trivial
1927 self.assertEqual(len(res11), 0)
1930 def test_bad_distinguishedName_filter_one(self):
1931 """Testing that a distinguishedName= filter succeeds but returns zero
1932 results when the DN is not valid on a SCOPE_ONELEVEL search
1936 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1937 scope=ldb.SCOPE_ONELEVEL,
1938 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1939 self.assertEqual(len(res11), 0)
1941 def test_bad_distinguishedName_filter_subtree(self):
1942 """Testing that a distinguishedName= filter succeeds but returns zero
1943 results when the DN is not valid on a SCOPE_SUBTREE search
1947 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1948 scope=ldb.SCOPE_SUBTREE,
1949 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1950 self.assertEqual(len(res11), 0)
1952 def test_bad_dn_search_base(self):
1953 """Testing with a bad base DN (SCOPE_BASE)"""
1955 try:
1956 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1957 scope=ldb.SCOPE_BASE)
1958 self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1959 except ldb.LdbError as err:
1960 enum = err.args[0]
1961 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1964 def test_bad_dn_search_one(self):
1965 """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1967 try:
1968 res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1969 scope=ldb.SCOPE_ONELEVEL)
1970 self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1971 except ldb.LdbError as err:
1972 enum = err.args[0]
1973 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1975 def test_bad_dn_search_subtree(self):
1976 """Testing with a bad base DN (SCOPE_SUBTREE)"""
1978 try:
1979 res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1980 scope=ldb.SCOPE_SUBTREE)
1981 self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1982 except ldb.LdbError as err:
1983 enum = err.args[0]
1984 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1988 # Run the search tests against an lmdb backend
1989 class SearchTestsLmdb(SearchTests):
1991 def setUp(self):
1992 if os.environ.get('HAVE_LMDB', '1') == '0':
1993 self.skipTest("No lmdb backend")
1994 self.prefix = MDB_PREFIX
1995 self.index = MDB_INDEX_OBJ
1996 super(SearchTestsLmdb, self).setUp()
1998 def tearDown(self):
1999 super(SearchTestsLmdb, self).tearDown()
2002 class IndexedSearchTests(SearchTests):
2003 """Test searches using the index, to ensure the index doesn't
2004 break things"""
2006 def setUp(self):
2007 super(IndexedSearchTests, self).setUp()
2008 self.l.add({"dn": "@INDEXLIST",
2009 "@IDXATTR": [b"x", b"y", b"ou"]})
2010 self.IDX = True
2013 class IndexedCheckSearchTests(IndexedSearchTests):
2014 """Test searches using the index, to ensure the index doesn't
2015 break things (full scan disabled)"""
2017 def setUp(self):
2018 self.IDXCHECK = True
2019 super(IndexedCheckSearchTests, self).setUp()
2022 class IndexedSearchDnFilterTests(SearchTests):
2023 """Test searches using the index, to ensure the index doesn't
2024 break things"""
2026 def setUp(self):
2027 super(IndexedSearchDnFilterTests, self).setUp()
2028 self.l.add({"dn": "@OPTIONS",
2029 "disallowDNFilter": "TRUE"})
2030 self.disallowDNFilter = True
2032 self.l.add({"dn": "@INDEXLIST",
2033 "@IDXATTR": [b"x", b"y", b"ou"]})
2034 self.IDX = True
2037 class IndexedAndOneLevelSearchTests(SearchTests):
2038 """Test searches using the index including @IDXONE, to ensure
2039 the index doesn't break things"""
2041 def setUp(self):
2042 super(IndexedAndOneLevelSearchTests, self).setUp()
2043 self.l.add({"dn": "@INDEXLIST",
2044 "@IDXATTR": [b"x", b"y", b"ou"],
2045 "@IDXONE": [b"1"]})
2046 self.IDX = True
2047 self.IDXONE = True
2050 class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
2051 """Test searches using the index including @IDXONE, to ensure
2052 the index doesn't break things (full scan disabled)"""
2054 def setUp(self):
2055 self.IDXCHECK = True
2056 super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
2059 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
2060 """Test searches using the index including @IDXONE, to ensure
2061 the index doesn't break things"""
2063 def setUp(self):
2064 super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
2065 self.l.add({"dn": "@OPTIONS",
2066 "disallowDNFilter": "TRUE",
2067 "checkBaseOnSearch": "TRUE"})
2068 self.disallowDNFilter = True
2069 self.checkBaseOnSearch = True
2071 self.l.add({"dn": "@INDEXLIST",
2072 "@IDXATTR": [b"x", b"y", b"ou"],
2073 "@IDXONE": [b"1"]})
2074 self.IDX = True
2075 self.IDXONE = True
2078 class GUIDIndexedSearchTests(SearchTests):
2079 """Test searches using the index, to ensure the index doesn't
2080 break things"""
2082 def setUp(self):
2083 self.index = {"dn": "@INDEXLIST",
2084 "@IDXATTR": [b"x", b"y", b"ou"],
2085 "@IDXGUID": [b"objectUUID"],
2086 "@IDX_DN_GUID": [b"GUID"]}
2087 super(GUIDIndexedSearchTests, self).setUp()
2089 self.IDXGUID = True
2092 class GUIDIndexedDNFilterSearchTests(SearchTests):
2093 """Test searches using the index, to ensure the index doesn't
2094 break things"""
2096 def setUp(self):
2097 self.index = {"dn": "@INDEXLIST",
2098 "@IDXATTR": [b"x", b"y", b"ou"],
2099 "@IDXGUID": [b"objectUUID"],
2100 "@IDX_DN_GUID": [b"GUID"]}
2101 super(GUIDIndexedDNFilterSearchTests, self).setUp()
2102 self.l.add({"dn": "@OPTIONS",
2103 "disallowDNFilter": "TRUE",
2104 "checkBaseOnSearch": "TRUE"})
2105 self.disallowDNFilter = True
2106 self.checkBaseOnSearch = True
2107 self.IDX = True
2108 self.IDXGUID = True
2111 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
2112 """Test searches using the index including @IDXONE, to ensure
2113 the index doesn't break things"""
2115 def setUp(self):
2116 self.index = {"dn": "@INDEXLIST",
2117 "@IDXATTR": [b"x", b"y", b"ou"],
2118 "@IDXONE": [b"1"],
2119 "@IDXGUID": [b"objectUUID"],
2120 "@IDX_DN_GUID": [b"GUID"]}
2121 super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
2122 self.l.add({"dn": "@OPTIONS",
2123 "disallowDNFilter": "TRUE",
2124 "checkBaseOnSearch": "TRUE"})
2125 self.disallowDNFilter = True
2126 self.checkBaseOnSearch = True
2127 self.IDX = True
2128 self.IDXGUID = True
2129 self.IDXONE = True
2132 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
2134 def setUp(self):
2135 if os.environ.get('HAVE_LMDB', '1') == '0':
2136 self.skipTest("No lmdb backend")
2137 self.prefix = MDB_PREFIX
2138 super(GUIDIndexedSearchTestsLmdb, self).setUp()
2140 def tearDown(self):
2141 super(GUIDIndexedSearchTestsLmdb, self).tearDown()
2144 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
2146 def setUp(self):
2147 if os.environ.get('HAVE_LMDB', '1') == '0':
2148 self.skipTest("No lmdb backend")
2149 self.prefix = MDB_PREFIX
2150 super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
2152 def tearDown(self):
2153 super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
2156 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
2158 def setUp(self):
2159 if os.environ.get('HAVE_LMDB', '1') == '0':
2160 self.skipTest("No lmdb backend")
2161 self.prefix = MDB_PREFIX
2162 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
2164 def tearDown(self):
2165 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
2168 class AddModifyTests(LdbBaseTest):
2169 def tearDown(self):
2170 shutil.rmtree(self.testdir)
2171 super(AddModifyTests, self).tearDown()
2173 # Ensure the LDB is closed now, so we close the FD
2174 del(self.l)
2176 def setUp(self):
2177 super(AddModifyTests, self).setUp()
2178 self.testdir = tempdir()
2179 self.filename = os.path.join(self.testdir, "add_test.ldb")
2180 self.l = ldb.Ldb(self.url(),
2181 flags=self.flags(),
2182 options=["modules:rdn_name"])
2183 try:
2184 self.l.add(self.index)
2185 except AttributeError:
2186 pass
2188 self.l.add({"dn": "DC=SAMBA,DC=ORG",
2189 "name": b"samba.org",
2190 "objectUUID": b"0123456789abcdef"})
2191 self.l.add({"dn": "@ATTRIBUTES",
2192 "objectUUID": "UNIQUE_INDEX"})
2194 def test_add_dup(self):
2195 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2196 "name": b"Admins",
2197 "x": "z", "y": "a",
2198 "objectUUID": b"0123456789abcde1"})
2199 try:
2200 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2201 "name": b"Admins",
2202 "x": "z", "y": "a",
2203 "objectUUID": b"0123456789abcde2"})
2204 self.fail("Should have failed adding duplicate entry")
2205 except ldb.LdbError as err:
2206 enum = err.args[0]
2207 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2209 def test_add_bad(self):
2210 try:
2211 self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
2212 "name": b"Admins",
2213 "x": "z", "y": "a",
2214 "objectUUID": b"0123456789abcde1"})
2215 self.fail("Should have failed adding entry with invalid DN")
2216 except ldb.LdbError as err:
2217 enum = err.args[0]
2218 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2220 def test_add_del_add(self):
2221 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2222 "name": b"Admins",
2223 "x": "z", "y": "a",
2224 "objectUUID": b"0123456789abcde1"})
2225 self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
2226 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2227 "name": b"Admins",
2228 "x": "z", "y": "a",
2229 "objectUUID": b"0123456789abcde2"})
2231 def test_add_move_add(self):
2232 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2233 "name": b"Admins",
2234 "x": "z", "y": "a",
2235 "objectUUID": b"0123456789abcde1"})
2236 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2237 "OU=DUP2,DC=SAMBA,DC=ORG")
2238 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2239 "name": b"Admins",
2240 "x": "z", "y": "a",
2241 "objectUUID": b"0123456789abcde2"})
2243 def test_add_move_fail_move_move(self):
2244 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2245 "name": b"Admins",
2246 "x": "z", "y": "a",
2247 "objectUUID": b"0123456789abcde1"})
2248 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2249 "name": b"Admins",
2250 "x": "z", "y": "a",
2251 "objectUUID": b"0123456789abcde2"})
2253 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
2254 scope=ldb.SCOPE_SUBTREE,
2255 expression="(objectUUID=0123456789abcde1)")
2256 self.assertEqual(len(res2), 1)
2257 self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
2259 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
2260 scope=ldb.SCOPE_SUBTREE,
2261 expression="(objectUUID=0123456789abcde2)")
2262 self.assertEqual(len(res3), 1)
2263 self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
2265 try:
2266 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2267 "OU=DUP2,DC=SAMBA,DC=ORG")
2268 self.fail("Should have failed on duplicate DN")
2269 except ldb.LdbError as err:
2270 enum = err.args[0]
2271 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2273 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
2274 "OU=DUP3,DC=SAMBA,DC=ORG")
2276 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2277 "OU=DUP2,DC=SAMBA,DC=ORG")
2279 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
2280 scope=ldb.SCOPE_SUBTREE,
2281 expression="(objectUUID=0123456789abcde1)")
2282 self.assertEqual(len(res2), 1)
2283 self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
2285 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
2286 scope=ldb.SCOPE_SUBTREE,
2287 expression="(objectUUID=0123456789abcde2)")
2288 self.assertEqual(len(res3), 1)
2289 self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
2291 def test_move_missing(self):
2292 try:
2293 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2294 "OU=DUP2,DC=SAMBA,DC=ORG")
2295 self.fail("Should have failed on missing")
2296 except ldb.LdbError as err:
2297 enum = err.args[0]
2298 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
2300 def test_move_missing2(self):
2301 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2302 "name": b"Admins",
2303 "x": "z", "y": "a",
2304 "objectUUID": b"0123456789abcde2"})
2306 try:
2307 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2308 "OU=DUP2,DC=SAMBA,DC=ORG")
2309 self.fail("Should have failed on missing")
2310 except ldb.LdbError as err:
2311 enum = err.args[0]
2312 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
2314 def test_move_bad(self):
2315 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2316 "name": b"Admins",
2317 "x": "z", "y": "a",
2318 "objectUUID": b"0123456789abcde2"})
2320 try:
2321 self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
2322 "OU=DUP2,DC=SAMBA,DC=ORG")
2323 self.fail("Should have failed on invalid DN")
2324 except ldb.LdbError as err:
2325 enum = err.args[0]
2326 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2328 def test_move_bad2(self):
2329 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2330 "name": b"Admins",
2331 "x": "z", "y": "a",
2332 "objectUUID": b"0123456789abcde2"})
2334 try:
2335 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2336 "OUXDUP2,DC=SAMBA,DC=ORG")
2337 self.fail("Should have failed on missing")
2338 except ldb.LdbError as err:
2339 enum = err.args[0]
2340 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2342 def test_move_fail_move_add(self):
2343 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2344 "name": b"Admins",
2345 "x": "z", "y": "a",
2346 "objectUUID": b"0123456789abcde1"})
2347 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2348 "name": b"Admins",
2349 "x": "z", "y": "a",
2350 "objectUUID": b"0123456789abcde2"})
2351 try:
2352 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2353 "OU=DUP2,DC=SAMBA,DC=ORG")
2354 self.fail("Should have failed on duplicate DN")
2355 except ldb.LdbError as err:
2356 enum = err.args[0]
2357 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2359 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
2360 "OU=DUP3,DC=SAMBA,DC=ORG")
2362 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2363 "name": b"Admins",
2364 "x": "z", "y": "a",
2365 "objectUUID": b"0123456789abcde3"})
2368 class AddModifyTestsLmdb(AddModifyTests):
2370 def setUp(self):
2371 if os.environ.get('HAVE_LMDB', '1') == '0':
2372 self.skipTest("No lmdb backend")
2373 self.prefix = MDB_PREFIX
2374 self.index = MDB_INDEX_OBJ
2375 super(AddModifyTestsLmdb, self).setUp()
2377 def tearDown(self):
2378 super(AddModifyTestsLmdb, self).tearDown()
2381 class IndexedAddModifyTests(AddModifyTests):
2382 """Test searches using the index, to ensure the index doesn't
2383 break things"""
2385 def setUp(self):
2386 if not hasattr(self, 'index'):
2387 self.index = {"dn": "@INDEXLIST",
2388 "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
2389 "@IDXONE": [b"1"]}
2390 super(IndexedAddModifyTests, self).setUp()
2392 def test_duplicate_GUID(self):
2393 try:
2394 self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
2395 "name": b"Admins",
2396 "x": "z", "y": "a",
2397 "objectUUID": b"0123456789abcdef"})
2398 self.fail("Should have failed adding duplicate GUID")
2399 except ldb.LdbError as err:
2400 enum = err.args[0]
2401 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2403 def test_duplicate_name_dup_GUID(self):
2404 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2405 "name": b"Admins",
2406 "x": "z", "y": "a",
2407 "objectUUID": b"a123456789abcdef"})
2408 try:
2409 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2410 "name": b"Admins",
2411 "x": "z", "y": "a",
2412 "objectUUID": b"a123456789abcdef"})
2413 self.fail("Should have failed adding duplicate GUID")
2414 except ldb.LdbError as err:
2415 enum = err.args[0]
2416 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2418 def test_duplicate_name_dup_GUID2(self):
2419 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2420 "name": b"Admins",
2421 "x": "z", "y": "a",
2422 "objectUUID": b"abc3456789abcdef"})
2423 try:
2424 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2425 "name": b"Admins",
2426 "x": "z", "y": "a",
2427 "objectUUID": b"aaa3456789abcdef"})
2428 self.fail("Should have failed adding duplicate DN")
2429 except ldb.LdbError as err:
2430 enum = err.args[0]
2431 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2433 # Checking the GUID didn't stick in the index
2434 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2435 "name": b"Admins",
2436 "x": "z", "y": "a",
2437 "objectUUID": b"aaa3456789abcdef"})
2439 def test_add_dup_guid_add(self):
2440 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2441 "name": b"Admins",
2442 "x": "z", "y": "a",
2443 "objectUUID": b"0123456789abcde1"})
2444 try:
2445 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2446 "name": b"Admins",
2447 "x": "z", "y": "a",
2448 "objectUUID": b"0123456789abcde1"})
2449 self.fail("Should have failed on duplicate GUID")
2451 except ldb.LdbError as err:
2452 enum = err.args[0]
2453 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2455 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2456 "name": b"Admins",
2457 "x": "z", "y": "a",
2458 "objectUUID": b"0123456789abcde2"})
2460 def test_duplicate_index_values(self):
2461 self.l.add({"dn": "OU=DIV1,DC=SAMBA,DC=ORG",
2462 "name": b"Admins",
2463 "z": "1",
2464 "objectUUID": b"0123456789abcdff"})
2465 self.l.add({"dn": "OU=DIV2,DC=SAMBA,DC=ORG",
2466 "name": b"Admins",
2467 "z": "1",
2468 "objectUUID": b"0123456789abcdfd"})
2471 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
2472 """Test searches using the index, to ensure the index doesn't
2473 break things"""
2475 def setUp(self):
2476 self.index = {"dn": "@INDEXLIST",
2477 "@IDXATTR": [b"x", b"y", b"ou"],
2478 "@IDXONE": [b"1"],
2479 "@IDXGUID": [b"objectUUID"],
2480 "@IDX_DN_GUID": [b"GUID"]}
2481 super(GUIDIndexedAddModifyTests, self).setUp()
2484 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
2485 """Test GUID index behaviour insdie the transaction"""
2487 def setUp(self):
2488 super(GUIDTransIndexedAddModifyTests, self).setUp()
2489 self.l.transaction_start()
2491 def tearDown(self):
2492 self.l.transaction_commit()
2493 super(GUIDTransIndexedAddModifyTests, self).tearDown()
2496 class TransIndexedAddModifyTests(IndexedAddModifyTests):
2497 """Test index behaviour insdie the transaction"""
2499 def setUp(self):
2500 super(TransIndexedAddModifyTests, self).setUp()
2501 self.l.transaction_start()
2503 def tearDown(self):
2504 self.l.transaction_commit()
2505 super(TransIndexedAddModifyTests, self).tearDown()
2508 class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
2510 def setUp(self):
2511 if os.environ.get('HAVE_LMDB', '1') == '0':
2512 self.skipTest("No lmdb backend")
2513 self.prefix = MDB_PREFIX
2514 super(GuidIndexedAddModifyTestsLmdb, self).setUp()
2516 def tearDown(self):
2517 super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
2520 class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
2522 def setUp(self):
2523 if os.environ.get('HAVE_LMDB', '1') == '0':
2524 self.skipTest("No lmdb backend")
2525 self.prefix = MDB_PREFIX
2526 super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
2528 def tearDown(self):
2529 super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
2532 class BadIndexTests(LdbBaseTest):
2533 def setUp(self):
2534 super(BadIndexTests, self).setUp()
2535 self.testdir = tempdir()
2536 self.filename = os.path.join(self.testdir, "test.ldb")
2537 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
2538 if hasattr(self, 'IDXGUID'):
2539 self.ldb.add({"dn": "@INDEXLIST",
2540 "@IDXATTR": [b"x", b"y", b"ou"],
2541 "@IDXGUID": [b"objectUUID"],
2542 "@IDX_DN_GUID": [b"GUID"]})
2543 else:
2544 self.ldb.add({"dn": "@INDEXLIST",
2545 "@IDXATTR": [b"x", b"y", b"ou"]})
2547 super(BadIndexTests, self).setUp()
2549 def test_unique(self):
2550 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2551 "objectUUID": b"0123456789abcde1",
2552 "y": "1"})
2553 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2554 "objectUUID": b"0123456789abcde2",
2555 "y": "1"})
2556 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2557 "objectUUID": b"0123456789abcde3",
2558 "y": "1"})
2560 res = self.ldb.search(expression="(y=1)",
2561 base="dc=samba,dc=org")
2562 self.assertEqual(len(res), 3)
2564 # Now set this to unique index, but forget to check the result
2565 try:
2566 self.ldb.add({"dn": "@ATTRIBUTES",
2567 "y": "UNIQUE_INDEX"})
2568 self.fail()
2569 except ldb.LdbError:
2570 pass
2572 # We must still have a working index
2573 res = self.ldb.search(expression="(y=1)",
2574 base="dc=samba,dc=org")
2575 self.assertEqual(len(res), 3)
2577 def test_unique_transaction(self):
2578 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2579 "objectUUID": b"0123456789abcde1",
2580 "y": "1"})
2581 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2582 "objectUUID": b"0123456789abcde2",
2583 "y": "1"})
2584 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2585 "objectUUID": b"0123456789abcde3",
2586 "y": "1"})
2588 res = self.ldb.search(expression="(y=1)",
2589 base="dc=samba,dc=org")
2590 self.assertEqual(len(res), 3)
2592 self.ldb.transaction_start()
2594 # Now set this to unique index, but forget to check the result
2595 try:
2596 self.ldb.add({"dn": "@ATTRIBUTES",
2597 "y": "UNIQUE_INDEX"})
2598 except ldb.LdbError:
2599 pass
2601 try:
2602 self.ldb.transaction_commit()
2603 self.fail()
2605 except ldb.LdbError as err:
2606 enum = err.args[0]
2607 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2609 # We must still have a working index
2610 res = self.ldb.search(expression="(y=1)",
2611 base="dc=samba,dc=org")
2613 self.assertEqual(len(res), 3)
2615 def test_casefold(self):
2616 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2617 "objectUUID": b"0123456789abcde1",
2618 "y": "a"})
2619 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2620 "objectUUID": b"0123456789abcde2",
2621 "y": "A"})
2622 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2623 "objectUUID": b"0123456789abcde3",
2624 "y": ["a", "A"]})
2626 res = self.ldb.search(expression="(y=a)",
2627 base="dc=samba,dc=org")
2628 self.assertEqual(len(res), 2)
2630 self.ldb.add({"dn": "@ATTRIBUTES",
2631 "y": "CASE_INSENSITIVE"})
2633 # We must still have a working index
2634 res = self.ldb.search(expression="(y=a)",
2635 base="dc=samba,dc=org")
2637 if hasattr(self, 'IDXGUID'):
2638 self.assertEqual(len(res), 3)
2639 else:
2640 # We should not return this entry twice, but sadly
2641 # we have not yet fixed
2642 # https://bugzilla.samba.org/show_bug.cgi?id=13361
2643 self.assertEqual(len(res), 4)
2645 def test_casefold_transaction(self):
2646 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2647 "objectUUID": b"0123456789abcde1",
2648 "y": "a"})
2649 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2650 "objectUUID": b"0123456789abcde2",
2651 "y": "A"})
2652 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2653 "objectUUID": b"0123456789abcde3",
2654 "y": ["a", "A"]})
2656 res = self.ldb.search(expression="(y=a)",
2657 base="dc=samba,dc=org")
2658 self.assertEqual(len(res), 2)
2660 self.ldb.transaction_start()
2662 self.ldb.add({"dn": "@ATTRIBUTES",
2663 "y": "CASE_INSENSITIVE"})
2665 self.ldb.transaction_commit()
2667 # We must still have a working index
2668 res = self.ldb.search(expression="(y=a)",
2669 base="dc=samba,dc=org")
2671 if hasattr(self, 'IDXGUID'):
2672 self.assertEqual(len(res), 3)
2673 else:
2674 # We should not return this entry twice, but sadly
2675 # we have not yet fixed
2676 # https://bugzilla.samba.org/show_bug.cgi?id=13361
2677 self.assertEqual(len(res), 4)
2679 def test_modify_transaction(self):
2680 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2681 "objectUUID": b"0123456789abcde1",
2682 "y": "2",
2683 "z": "2"})
2685 res = self.ldb.search(expression="(y=2)",
2686 base="dc=samba,dc=org")
2687 self.assertEqual(len(res), 1)
2689 self.ldb.add({"dn": "@ATTRIBUTES",
2690 "y": "UNIQUE_INDEX"})
2692 self.ldb.transaction_start()
2694 m = ldb.Message()
2695 m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2696 m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2697 m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2699 try:
2700 self.ldb.modify(m)
2701 self.fail()
2703 except ldb.LdbError as err:
2704 enum = err.args[0]
2705 self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2707 try:
2708 self.ldb.transaction_commit()
2709 # We should fail here, but we want to be sure
2710 # we fail below
2712 except ldb.LdbError as err:
2713 enum = err.args[0]
2714 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2716 # The index should still be pointing to x=y
2717 res = self.ldb.search(expression="(y=2)",
2718 base="dc=samba,dc=org")
2719 self.assertEqual(len(res), 1)
2721 try:
2722 self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
2723 "objectUUID": b"0123456789abcde2",
2724 "y": "2"})
2725 self.fail("Added unique attribute twice")
2726 except ldb.LdbError as err:
2727 enum = err.args[0]
2728 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2730 res = self.ldb.search(expression="(y=2)",
2731 base="dc=samba,dc=org")
2732 self.assertEqual(len(res), 1)
2733 self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
2735 def tearDown(self):
2736 super(BadIndexTests, self).tearDown()
2739 class GUIDBadIndexTests(BadIndexTests):
2740 """Test Bad index things with GUID index mode"""
2742 def setUp(self):
2743 self.IDXGUID = True
2745 super(GUIDBadIndexTests, self).setUp()
2748 class GUIDBadIndexTestsLmdb(BadIndexTests):
2750 def setUp(self):
2751 if os.environ.get('HAVE_LMDB', '1') == '0':
2752 self.skipTest("No lmdb backend")
2753 self.prefix = MDB_PREFIX
2754 self.index = MDB_INDEX_OBJ
2755 self.IDXGUID = True
2756 super(GUIDBadIndexTestsLmdb, self).setUp()
2758 def tearDown(self):
2759 super(GUIDBadIndexTestsLmdb, self).tearDown()
2762 class BatchModeTests(LdbBaseTest):
2764 def setUp(self):
2765 super(BatchModeTests, self).setUp()
2766 self.testdir = tempdir()
2767 self.filename = os.path.join(self.testdir, "test.ldb")
2768 self.ldb = ldb.Ldb(self.url(),
2769 flags=self.flags(),
2770 options=["batch_mode:1"])
2771 if hasattr(self, 'IDXGUID'):
2772 self.ldb.add({"dn": "@INDEXLIST",
2773 "@IDXATTR": [b"x", b"y", b"ou"],
2774 "@IDXGUID": [b"objectUUID"],
2775 "@IDX_DN_GUID": [b"GUID"]})
2776 else:
2777 self.ldb.add({"dn": "@INDEXLIST",
2778 "@IDXATTR": [b"x", b"y", b"ou"]})
2780 def test_modify_transaction(self):
2781 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2782 "objectUUID": b"0123456789abcde1",
2783 "y": "2",
2784 "z": "2"})
2786 res = self.ldb.search(expression="(y=2)",
2787 base="dc=samba,dc=org")
2788 self.assertEqual(len(res), 1)
2790 self.ldb.add({"dn": "@ATTRIBUTES",
2791 "y": "UNIQUE_INDEX"})
2793 self.ldb.transaction_start()
2795 m = ldb.Message()
2796 m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2797 m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2798 m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2800 try:
2801 self.ldb.modify(m)
2802 self.fail()
2804 except ldb.LdbError as err:
2805 enum = err.args[0]
2806 self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2808 try:
2809 self.ldb.transaction_commit()
2810 self.fail("Commit should have failed as we were in batch mode")
2811 except ldb.LdbError as err:
2812 enum = err.args[0]
2813 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2815 def tearDown(self):
2816 super(BatchModeTests, self).tearDown()
2819 class DnTests(TestCase):
2821 def setUp(self):
2822 super(DnTests, self).setUp()
2823 self.ldb = ldb.Ldb()
2825 def tearDown(self):
2826 super(DnTests, self).tearDown()
2827 del(self.ldb)
2829 def test_set_dn_invalid(self):
2830 x = ldb.Message()
2832 def assign():
2833 x.dn = "astring"
2834 self.assertRaises(TypeError, assign)
2836 def test_eq(self):
2837 x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2838 y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2839 self.assertEqual(x, y)
2840 y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
2841 self.assertNotEqual(x, y)
2843 def test_str(self):
2844 x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
2845 self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
2847 def test_repr(self):
2848 x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
2849 self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
2851 def test_get_casefold_2(self):
2852 x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
2853 self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
2855 def test_get_casefold_dotted_i(self):
2856 x = ldb.Dn(self.ldb, "dc=foo14,bir=blie")
2857 self.assertEqual(x.get_casefold(), "DC=FOO14,BIR=blie")
2859 def test_validate(self):
2860 x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
2861 self.assertTrue(x.validate())
2863 def test_parent(self):
2864 x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
2865 self.assertEqual("bar=bloe", x.parent().__str__())
2867 def test_parent_nonexistent(self):
2868 x = ldb.Dn(self.ldb, "@BLA")
2869 self.assertEqual(None, x.parent())
2871 def test_is_valid(self):
2872 x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
2873 self.assertTrue(x.is_valid())
2874 x = ldb.Dn(self.ldb, "")
2875 self.assertTrue(x.is_valid())
2877 def test_is_special(self):
2878 x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
2879 self.assertFalse(x.is_special())
2880 x = ldb.Dn(self.ldb, "@FOOBAR")
2881 self.assertTrue(x.is_special())
2883 def test_check_special(self):
2884 x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
2885 self.assertFalse(x.check_special("FOOBAR"))
2886 x = ldb.Dn(self.ldb, "@FOOBAR")
2887 self.assertTrue(x.check_special("@FOOBAR"))
2889 def test_len(self):
2890 x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
2891 self.assertEqual(2, len(x))
2892 x = ldb.Dn(self.ldb, "dc=foo21")
2893 self.assertEqual(1, len(x))
2895 def test_add_child(self):
2896 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2897 self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
2898 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2900 def test_add_base(self):
2901 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2902 base = ldb.Dn(self.ldb, "bla=bloe")
2903 self.assertTrue(x.add_base(base))
2904 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2906 def test_add_child_str(self):
2907 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2908 self.assertTrue(x.add_child("bla=bloe"))
2909 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2911 def test_add_base_str(self):
2912 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2913 base = "bla=bloe"
2914 self.assertTrue(x.add_base(base))
2915 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2917 def test_add(self):
2918 x = ldb.Dn(self.ldb, "dc=foo24")
2919 y = ldb.Dn(self.ldb, "bar=bla")
2920 self.assertEqual("dc=foo24,bar=bla", str(x + y))
2922 def test_remove_base_components(self):
2923 x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
2924 x.remove_base_components(len(x) - 1)
2925 self.assertEqual("dc=foo24", str(x))
2927 def test_parse_ldif(self):
2928 msgs = self.ldb.parse_ldif("dn: foo=bar\n")
2929 msg = next(msgs)
2930 self.assertEqual("foo=bar", str(msg[1].dn))
2931 self.assertTrue(isinstance(msg[1], ldb.Message))
2932 ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
2933 self.assertEqual("dn: foo=bar\n\n", ldif)
2935 def test_parse_ldif_more(self):
2936 msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
2937 msg = next(msgs)
2938 self.assertEqual("foo=bar", str(msg[1].dn))
2939 msg = next(msgs)
2940 self.assertEqual("bar=bar", str(msg[1].dn))
2942 def test_print_ldif(self):
2943 ldif = '''dn: dc=foo27
2944 foo: foo
2947 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2948 self.msg["foo"] = [b"foo"]
2949 self.assertEqual(ldif,
2950 self.ldb.write_ldif(self.msg,
2951 ldb.CHANGETYPE_NONE))
2953 def test_print_ldif_binary(self):
2954 # this also confirms that ldb flags are set even without a URL)
2955 self.ldb = ldb.Ldb(flags=ldb.FLG_SHOW_BINARY)
2956 ldif = '''dn: dc=foo27
2957 foo: f
2958 öö
2961 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2962 self.msg["foo"] = ["f\nöö"]
2963 self.assertEqual(ldif,
2964 self.ldb.write_ldif(self.msg,
2965 ldb.CHANGETYPE_NONE))
2968 def test_print_ldif_no_base64_bad(self):
2969 ldif = '''dn: dc=foo27
2970 foo: f
2971 öö
2974 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2975 self.msg["foo"] = ["f\nöö"]
2976 self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2977 self.assertEqual(ldif,
2978 self.ldb.write_ldif(self.msg,
2979 ldb.CHANGETYPE_NONE))
2981 def test_print_ldif_no_base64_good(self):
2982 ldif = '''dn: dc=foo27
2983 foo: föö
2986 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2987 self.msg["foo"] = ["föö"]
2988 self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2989 self.assertEqual(ldif,
2990 self.ldb.write_ldif(self.msg,
2991 ldb.CHANGETYPE_NONE))
2993 def test_canonical_string(self):
2994 x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
2995 self.assertEqual("/bloe/foo25", x.canonical_str())
2997 def test_canonical_ex_string(self):
2998 x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
2999 self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
3001 def test_ldb_is_child_of(self):
3002 """Testing ldb_dn_compare_dn"""
3003 dn1 = ldb.Dn(self.ldb, "dc=base")
3004 dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
3005 dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
3006 dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
3008 self.assertTrue(dn1.is_child_of(dn1))
3009 self.assertTrue(dn2.is_child_of(dn1))
3010 self.assertTrue(dn4.is_child_of(dn1))
3011 self.assertTrue(dn4.is_child_of(dn3))
3012 self.assertTrue(dn4.is_child_of(dn4))
3013 self.assertFalse(dn3.is_child_of(dn2))
3014 self.assertFalse(dn1.is_child_of(dn4))
3016 def test_ldb_is_child_of_str(self):
3017 """Testing ldb_dn_compare_dn"""
3018 dn1_str = "dc=base"
3019 dn2_str = "cn=foo,dc=base"
3020 dn3_str = "cn=bar,dc=base"
3021 dn4_str = "cn=baz,cn=bar,dc=base"
3023 dn1 = ldb.Dn(self.ldb, dn1_str)
3024 dn2 = ldb.Dn(self.ldb, dn2_str)
3025 dn3 = ldb.Dn(self.ldb, dn3_str)
3026 dn4 = ldb.Dn(self.ldb, dn4_str)
3028 self.assertTrue(dn1.is_child_of(dn1_str))
3029 self.assertTrue(dn2.is_child_of(dn1_str))
3030 self.assertTrue(dn4.is_child_of(dn1_str))
3031 self.assertTrue(dn4.is_child_of(dn3_str))
3032 self.assertTrue(dn4.is_child_of(dn4_str))
3033 self.assertFalse(dn3.is_child_of(dn2_str))
3034 self.assertFalse(dn1.is_child_of(dn4_str))
3036 def test_get_component_name(self):
3037 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3038 self.assertEqual(dn.get_component_name(0), 'cn')
3039 self.assertEqual(dn.get_component_name(1), 'dc')
3040 self.assertEqual(dn.get_component_name(2), None)
3041 self.assertEqual(dn.get_component_name(-1), None)
3043 def test_get_component_value(self):
3044 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3045 self.assertEqual(dn.get_component_value(0), 'foo')
3046 self.assertEqual(dn.get_component_value(1), 'base')
3047 self.assertEqual(dn.get_component_name(2), None)
3048 self.assertEqual(dn.get_component_name(-1), None)
3050 def test_set_component(self):
3051 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3052 dn.set_component(0, 'cn', 'bar')
3053 self.assertEqual(str(dn), "cn=bar,dc=base")
3054 dn.set_component(1, 'o', 'asep')
3055 self.assertEqual(str(dn), "cn=bar,o=asep")
3056 self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
3057 self.assertEqual(str(dn), "cn=bar,o=asep")
3058 dn.set_component(1, 'o', 'a,b+c')
3059 self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
3061 def test_set_component_bytes(self):
3062 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3063 dn.set_component(0, 'cn', b'bar')
3064 self.assertEqual(str(dn), "cn=bar,dc=base")
3065 dn.set_component(1, 'o', b'asep')
3066 self.assertEqual(str(dn), "cn=bar,o=asep")
3068 def test_set_component_none(self):
3069 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
3070 self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
3072 def test_get_extended_component_null(self):
3073 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
3074 self.assertEqual(dn.get_extended_component("TEST"), None)
3076 def test_get_extended_component(self):
3077 self.ldb._register_test_extensions()
3078 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
3079 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
3081 def test_set_extended_component(self):
3082 self.ldb._register_test_extensions()
3083 dn = ldb.Dn(self.ldb, "dc=base")
3084 dn.set_extended_component("TEST", "foo")
3085 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
3086 dn.set_extended_component("TEST", b"bar")
3087 self.assertEqual(dn.get_extended_component("TEST"), b"bar")
3089 def test_extended_str(self):
3090 self.ldb._register_test_extensions()
3091 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
3092 self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
3094 def test_get_rdn_name(self):
3095 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3096 self.assertEqual(dn.get_rdn_name(), 'cn')
3098 def test_get_rdn_value(self):
3099 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3100 self.assertEqual(dn.get_rdn_value(), 'foo')
3102 def test_get_casefold(self):
3103 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3104 self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
3106 def test_get_linearized(self):
3107 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3108 self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
3110 def test_is_null(self):
3111 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3112 self.assertFalse(dn.is_null())
3114 dn = ldb.Dn(self.ldb, '')
3115 self.assertTrue(dn.is_null())
3118 class LdbMsgTests(TestCase):
3120 def setUp(self):
3121 super(LdbMsgTests, self).setUp()
3122 self.msg = ldb.Message()
3124 def test_init_dn(self):
3125 self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
3126 self.assertEqual("dc=foo27", str(self.msg.dn))
3128 def test_iter_items(self):
3129 self.assertEqual(0, len(self.msg.items()))
3130 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
3131 self.assertEqual(1, len(self.msg.items()))
3133 def test_items(self):
3134 self.msg["foo"] = ["foo"]
3135 self.msg["bar"] = ["bar"]
3136 try:
3137 items = self.msg.items()
3138 except:
3139 self.fail()
3140 self.assertEqual([("foo", ldb.MessageElement(["foo"])),
3141 ("bar", ldb.MessageElement(["bar"]))],
3142 items)
3144 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=test")
3145 try:
3146 items = self.msg.items()
3147 except:
3148 self.fail()
3149 self.assertEqual([("dn", ldb.Dn(ldb.Ldb(), "dc=test")),
3150 ("foo", ldb.MessageElement(["foo"])),
3151 ("bar", ldb.MessageElement(["bar"]))],
3152 items)
3154 def test_repr(self):
3155 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
3156 self.msg["dc"] = b"foo"
3157 self.assertIn(repr(self.msg), [
3158 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
3159 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
3161 self.assertIn(repr(self.msg.text), [
3162 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
3163 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
3166 def test_len(self):
3167 self.assertEqual(0, len(self.msg))
3169 def test_notpresent(self):
3170 self.assertRaises(KeyError, lambda: self.msg["foo"])
3172 def test_invalid(self):
3173 try:
3174 self.assertRaises(TypeError, lambda: self.msg[42])
3175 except KeyError:
3176 self.fail()
3178 def test_del(self):
3179 del self.msg["foo"]
3181 def test_add(self):
3182 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
3184 def test_add_text(self):
3185 self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
3187 def test_elements_empty(self):
3188 self.assertEqual([], self.msg.elements())
3190 def test_elements(self):
3191 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
3192 self.msg.add(el)
3193 self.assertEqual([el], self.msg.elements())
3194 self.assertEqual([el.text], self.msg.text.elements())
3196 def test_add_value(self):
3197 self.assertEqual(0, len(self.msg))
3198 self.msg["foo"] = [b"foo"]
3199 self.assertEqual(1, len(self.msg))
3201 def test_add_value_text(self):
3202 self.assertEqual(0, len(self.msg))
3203 self.msg["foo"] = ["foo"]
3204 self.assertEqual(1, len(self.msg))
3206 def test_add_value_multiple(self):
3207 self.assertEqual(0, len(self.msg))
3208 self.msg["foo"] = [b"foo", b"bla"]
3209 self.assertEqual(1, len(self.msg))
3210 self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
3212 def test_add_value_multiple_text(self):
3213 self.assertEqual(0, len(self.msg))
3214 self.msg["foo"] = ["foo", "bla"]
3215 self.assertEqual(1, len(self.msg))
3216 self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
3218 def test_set_value(self):
3219 self.msg["foo"] = [b"fool"]
3220 self.assertEqual([b"fool"], list(self.msg["foo"]))
3221 self.msg["foo"] = [b"bar"]
3222 self.assertEqual([b"bar"], list(self.msg["foo"]))
3224 def test_set_value_text(self):
3225 self.msg["foo"] = ["fool"]
3226 self.assertEqual(["fool"], list(self.msg.text["foo"]))
3227 self.msg["foo"] = ["bar"]
3228 self.assertEqual(["bar"], list(self.msg.text["foo"]))
3230 def test_keys(self):
3231 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3232 self.msg["foo"] = [b"bla"]
3233 self.msg["bar"] = [b"bla"]
3234 self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
3236 def test_keys_text(self):
3237 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3238 self.msg["foo"] = ["bla"]
3239 self.msg["bar"] = ["bla"]
3240 self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
3242 def test_dn(self):
3243 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3244 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
3246 def test_get_dn(self):
3247 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3248 self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
3250 def test_dn_text(self):
3251 self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3252 self.assertEqual("@BASEINFO", str(self.msg.dn))
3253 self.assertEqual("@BASEINFO", str(self.msg.text.dn))
3255 def test_get_dn_text(self):
3256 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3257 self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
3258 self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
3260 def test_get_invalid(self):
3261 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3262 self.assertRaises(TypeError, self.msg.get, 42)
3264 def test_get_other(self):
3265 self.msg["foo"] = [b"bar"]
3266 self.assertEqual(b"bar", self.msg.get("foo")[0])
3267 self.assertEqual(b"bar", self.msg.get("foo", idx=0))
3268 self.assertEqual(None, self.msg.get("foo", idx=1))
3269 self.assertEqual("", self.msg.get("foo", default='', idx=1))
3271 def test_get_other_text(self):
3272 self.msg["foo"] = ["bar"]
3273 self.assertEqual(["bar"], list(self.msg.text.get("foo")))
3274 self.assertEqual("bar", self.msg.text.get("foo")[0])
3275 self.assertEqual("bar", self.msg.text.get("foo", idx=0))
3276 self.assertEqual(None, self.msg.get("foo", idx=1))
3277 self.assertEqual("", self.msg.get("foo", default='', idx=1))
3279 def test_get_default(self):
3280 self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
3281 self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
3283 def test_get_default_text(self):
3284 self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
3285 self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
3287 def test_get_unknown(self):
3288 self.assertEqual(None, self.msg.get("lalalala"))
3290 def test_get_unknown_text(self):
3291 self.assertEqual(None, self.msg.text.get("lalalala"))
3293 def test_contains(self):
3294 self.msg['foo'] = ['bar']
3295 self.assertIn('foo', self.msg)
3297 self.msg['Foo'] = ['bar']
3298 self.assertIn('Foo', self.msg)
3300 def test_contains_case(self):
3301 self.msg['foo'] = ['bar']
3302 self.assertIn('Foo', self.msg)
3304 self.msg['Foo'] = ['bar']
3305 self.assertIn('foo', self.msg)
3307 def test_contains_dn(self):
3308 self.assertIn('dn', self.msg)
3310 def test_contains_dn_case(self):
3311 self.assertIn('DN', self.msg)
3313 def test_contains_invalid(self):
3314 self.assertRaises(TypeError, lambda: None in self.msg)
3316 def test_msg_diff(self):
3317 l = ldb.Ldb()
3318 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
3319 msg1 = next(msgs)[1]
3320 msg2 = next(msgs)[1]
3321 msgdiff = l.msg_diff(msg1, msg2)
3322 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
3323 self.assertRaises(KeyError, lambda: msgdiff["foo"])
3324 self.assertEqual(1, len(msgdiff))
3326 def test_equal_empty(self):
3327 msg1 = ldb.Message()
3328 msg2 = ldb.Message()
3329 self.assertEqual(msg1, msg2)
3331 def test_equal_simplel(self):
3332 db = ldb.Ldb()
3333 msg1 = ldb.Message()
3334 msg1.dn = ldb.Dn(db, "foo=bar")
3335 msg2 = ldb.Message()
3336 msg2.dn = ldb.Dn(db, "foo=bar")
3337 self.assertEqual(msg1, msg2)
3338 msg1['foo'] = b'bar'
3339 msg2['foo'] = b'bar'
3340 self.assertEqual(msg1, msg2)
3341 msg2['foo'] = b'blie'
3342 self.assertNotEqual(msg1, msg2)
3343 msg2['foo'] = b'blie'
3345 def test_from_dict(self):
3346 rec = {"dn": "dc=fromdict",
3347 "a1": [b"a1-val1", b"a1-val1"]}
3348 l = ldb.Ldb()
3349 # check different types of input Flags
3350 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
3351 m = ldb.Message.from_dict(l, rec, flags)
3352 self.assertEqual(rec["a1"], list(m["a1"]))
3353 self.assertEqual(flags, m["a1"].flags())
3354 # check input params
3355 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
3356 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
3357 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
3358 # Message.from_dict expects dictionary with 'dn'
3359 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
3360 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
3362 def test_from_dict_text(self):
3363 rec = {"dn": "dc=fromdict",
3364 "a1": ["a1-val1", "a1-val1"]}
3365 l = ldb.Ldb()
3366 # check different types of input Flags
3367 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
3368 m = ldb.Message.from_dict(l, rec, flags)
3369 self.assertEqual(rec["a1"], list(m.text["a1"]))
3370 self.assertEqual(flags, m.text["a1"].flags())
3371 # check input params
3372 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
3373 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
3374 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
3375 # Message.from_dict expects dictionary with 'dn'
3376 err_rec = {"a1": ["a1-val1", "a1-val1"]}
3377 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
3379 def test_copy_add_message_element(self):
3380 m = ldb.Message()
3381 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
3382 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
3383 mto = ldb.Message()
3384 mto["1"] = m["1"]
3385 mto["2"] = m["2"]
3386 self.assertEqual(mto["1"], m["1"])
3387 self.assertEqual(mto["2"], m["2"])
3388 mto = ldb.Message()
3389 mto.add(m["1"])
3390 mto.add(m["2"])
3391 self.assertEqual(mto["1"], m["1"])
3392 self.assertEqual(mto["2"], m["2"])
3394 def test_copy_add_message_element_text(self):
3395 m = ldb.Message()
3396 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
3397 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
3398 mto = ldb.Message()
3399 mto["1"] = m["1"]
3400 mto["2"] = m["2"]
3401 self.assertEqual(mto["1"], m.text["1"])
3402 self.assertEqual(mto["2"], m.text["2"])
3403 mto = ldb.Message()
3404 mto.add(m["1"])
3405 mto.add(m["2"])
3406 self.assertEqual(mto.text["1"], m.text["1"])
3407 self.assertEqual(mto.text["2"], m.text["2"])
3408 self.assertEqual(mto["1"], m["1"])
3409 self.assertEqual(mto["2"], m["2"])
3412 class MessageElementTests(TestCase):
3414 def test_cmp_element(self):
3415 x = ldb.MessageElement([b"foo"])
3416 y = ldb.MessageElement([b"foo"])
3417 z = ldb.MessageElement([b"bzr"])
3418 self.assertEqual(x, y)
3419 self.assertNotEqual(x, z)
3421 def test_cmp_element_text(self):
3422 x = ldb.MessageElement([b"foo"])
3423 y = ldb.MessageElement(["foo"])
3424 self.assertEqual(x, y)
3426 def test_create_iterable(self):
3427 x = ldb.MessageElement([b"foo"])
3428 self.assertEqual([b"foo"], list(x))
3429 self.assertEqual(["foo"], list(x.text))
3431 def test_repr(self):
3432 x = ldb.MessageElement([b"foo"])
3433 self.assertEqual("MessageElement([b'foo'])", repr(x))
3434 self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
3435 x = ldb.MessageElement([b"foo", b"bla"])
3436 self.assertEqual(2, len(x))
3437 self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
3438 self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
3440 def test_get_item(self):
3441 x = ldb.MessageElement([b"foo", b"bar"])
3442 self.assertEqual(b"foo", x[0])
3443 self.assertEqual(b"bar", x[1])
3444 self.assertEqual(b"bar", x[-1])
3445 self.assertRaises(IndexError, lambda: x[45])
3447 def test_get_item_text(self):
3448 x = ldb.MessageElement(["foo", "bar"])
3449 self.assertEqual("foo", x.text[0])
3450 self.assertEqual("bar", x.text[1])
3451 self.assertEqual("bar", x.text[-1])
3452 self.assertRaises(IndexError, lambda: x[45])
3454 def test_len(self):
3455 x = ldb.MessageElement([b"foo", b"bar"])
3456 self.assertEqual(2, len(x))
3458 def test_eq(self):
3459 x = ldb.MessageElement([b"foo", b"bar"])
3460 y = ldb.MessageElement([b"foo", b"bar"])
3461 self.assertEqual(y, x)
3462 x = ldb.MessageElement([b"foo"])
3463 self.assertNotEqual(y, x)
3464 y = ldb.MessageElement([b"foo"])
3465 self.assertEqual(y, x)
3467 def test_extended(self):
3468 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
3469 self.assertEqual("MessageElement([b'456'])", repr(el))
3470 self.assertEqual("MessageElement([b'456']).text", repr(el.text))
3472 def test_bad_text(self):
3473 el = ldb.MessageElement(b'\xba\xdd')
3474 self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
3477 class LdbResultTests(LdbBaseTest):
3479 def setUp(self):
3480 super(LdbResultTests, self).setUp()
3481 self.testdir = tempdir()
3482 self.filename = os.path.join(self.testdir, "test.ldb")
3483 self.l = ldb.Ldb(self.url(), flags=self.flags())
3484 try:
3485 self.l.add(self.index)
3486 except AttributeError:
3487 pass
3488 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
3489 "objectUUID": b"0123456789abcde0"})
3490 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
3491 "objectUUID": b"0123456789abcde1"})
3492 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
3493 "objectUUID": b"0123456789abcde2"})
3494 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
3495 "objectUUID": b"0123456789abcde3"})
3496 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
3497 "objectUUID": b"0123456789abcde4"})
3498 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
3499 "objectUUID": b"0123456789abcde5"})
3500 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
3501 "objectUUID": b"0123456789abcde6"})
3502 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
3503 "objectUUID": b"0123456789abcde7"})
3504 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
3505 "objectUUID": b"0123456789abcde8"})
3506 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
3507 "objectUUID": b"0123456789abcde9"})
3508 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
3509 "objectUUID": b"0123456789abcdea"})
3510 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
3511 "objectUUID": b"0123456789abcdeb"})
3512 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
3513 "objectUUID": b"0123456789abcdec"})
3515 def tearDown(self):
3516 shutil.rmtree(self.testdir)
3517 super(LdbResultTests, self).tearDown()
3518 # Ensure the LDB is closed now, so we close the FD
3519 del(self.l)
3521 def test_return_type(self):
3522 res = self.l.search()
3523 self.assertEqual(str(res), "<ldb result>")
3525 def test_get_msgs(self):
3526 res = self.l.search()
3527 list = res.msgs
3529 def test_get_controls(self):
3530 res = self.l.search()
3531 list = res.controls
3533 def test_get_referals(self):
3534 res = self.l.search()
3535 list = res.referals
3537 def test_iter_msgs(self):
3538 found = False
3539 for l in self.l.search().msgs:
3540 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3541 found = True
3542 self.assertTrue(found)
3544 def test_iter_msgs_count(self):
3545 self.assertTrue(self.l.search().count > 0)
3546 # 13 objects has been added to the DC=SAMBA, DC=ORG
3547 self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
3549 def test_iter_controls(self):
3550 res = self.l.search().controls
3551 it = iter(res)
3553 def test_create_control(self):
3554 self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
3555 c = ldb.Control(self.l, "relax:1")
3556 self.assertEqual(c.critical, True)
3557 self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
3559 def test_iter_refs(self):
3560 res = self.l.search().referals
3561 it = iter(res)
3563 def test_search_sequence_msgs(self):
3564 found = False
3565 res = self.l.search().msgs
3567 for i in range(0, len(res)):
3568 l = res[i]
3569 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3570 found = True
3571 self.assertTrue(found)
3573 def test_search_as_iter(self):
3574 found = False
3575 res = self.l.search()
3577 for l in res:
3578 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3579 found = True
3580 self.assertTrue(found)
3582 def test_search_iter(self):
3583 found = False
3584 res = self.l.search_iterator()
3586 for l in res:
3587 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3588 found = True
3589 self.assertTrue(found)
3591 # Show that search results can't see into a transaction
3593 def test_search_against_trans(self):
3594 found11 = False
3596 (r1, w1) = os.pipe()
3598 (r2, w2) = os.pipe()
3600 # For the first element, fork a child that will
3601 # write to the DB
3602 pid = os.fork()
3603 if pid == 0:
3604 # In the child, re-open
3605 del(self.l)
3606 gc.collect()
3608 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3609 # start a transaction
3610 child_ldb.transaction_start()
3612 # write to it
3613 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3614 "name": b"samba.org",
3615 "objectUUID": b"o123456789acbdef"})
3617 os.write(w1, b"added")
3619 # Now wait for the search to be done
3620 os.read(r2, 6)
3622 # and commit
3623 try:
3624 child_ldb.transaction_commit()
3625 except ldb.LdbError as err:
3626 # We print this here to see what went wrong in the child
3627 print(err)
3628 os._exit(1)
3630 os.write(w1, b"transaction")
3631 os._exit(0)
3633 self.assertEqual(os.read(r1, 5), b"added")
3635 # This should not turn up until the transaction is concluded
3636 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3637 scope=ldb.SCOPE_BASE)
3638 self.assertEqual(len(res11), 0)
3640 os.write(w2, b"search")
3642 # Now wait for the transaction to be done. This should
3643 # deadlock, but the search doesn't hold a read lock for the
3644 # iterator lifetime currently.
3645 self.assertEqual(os.read(r1, 11), b"transaction")
3647 # This should now turn up, as the transaction is over
3648 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3649 scope=ldb.SCOPE_BASE)
3650 self.assertEqual(len(res11), 1)
3652 self.assertFalse(found11)
3654 (got_pid, status) = os.waitpid(pid, 0)
3655 self.assertEqual(got_pid, pid)
3657 def test_search_iter_against_trans(self):
3658 found = False
3659 found11 = False
3661 # We need to hold this iterator open to hold the all-record
3662 # lock
3663 res = self.l.search_iterator()
3665 (r1, w1) = os.pipe()
3667 (r2, w2) = os.pipe()
3669 # For the first element, with the sequence open (which
3670 # means with ldb locks held), fork a child that will
3671 # write to the DB
3672 pid = os.fork()
3673 if pid == 0:
3674 # In the child, re-open
3675 del(res)
3676 del(self.l)
3677 gc.collect()
3679 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3680 # start a transaction
3681 child_ldb.transaction_start()
3683 # write to it
3684 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3685 "name": b"samba.org",
3686 "objectUUID": b"o123456789acbdef"})
3688 os.write(w1, b"added")
3690 # Now wait for the search to be done
3691 os.read(r2, 6)
3693 # and commit
3694 try:
3695 child_ldb.transaction_commit()
3696 except ldb.LdbError as err:
3697 # We print this here to see what went wrong in the child
3698 print(err)
3699 os._exit(1)
3701 os.write(w1, b"transaction")
3702 os._exit(0)
3704 self.assertEqual(os.read(r1, 5), b"added")
3706 # This should not turn up until the transaction is concluded
3707 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3708 scope=ldb.SCOPE_BASE)
3709 self.assertEqual(len(res11), 0)
3711 os.write(w2, b"search")
3713 # allow the transaction to start
3714 time.sleep(1)
3716 # This should not turn up until the search finishes and
3717 # removed the read lock, but for ldb_tdb that happened as soon
3718 # as we called the first res.next()
3719 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3720 scope=ldb.SCOPE_BASE)
3721 self.assertEqual(len(res11), 0)
3723 # These results are all collected at the first next(res) call
3724 for l in res:
3725 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3726 found = True
3727 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
3728 found11 = True
3730 # Now wait for the transaction to be done.
3731 self.assertEqual(os.read(r1, 11), b"transaction")
3733 # This should now turn up, as the transaction is over and all
3734 # read locks are gone
3735 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3736 scope=ldb.SCOPE_BASE)
3737 self.assertEqual(len(res11), 1)
3739 self.assertTrue(found)
3740 self.assertFalse(found11)
3742 (got_pid, status) = os.waitpid(pid, 0)
3743 self.assertEqual(got_pid, pid)
3746 class LdbResultTestsLmdb(LdbResultTests):
3748 def setUp(self):
3749 if os.environ.get('HAVE_LMDB', '1') == '0':
3750 self.skipTest("No lmdb backend")
3751 self.prefix = MDB_PREFIX
3752 self.index = MDB_INDEX_OBJ
3753 super(LdbResultTestsLmdb, self).setUp()
3755 def tearDown(self):
3756 super(LdbResultTestsLmdb, self).tearDown()
3759 class BadTypeTests(TestCase):
3760 def test_control(self):
3761 l = ldb.Ldb()
3762 self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
3763 self.assertRaises(TypeError, ldb.Control, ldb, 1234)
3765 def test_modify(self):
3766 l = ldb.Ldb()
3767 dn = ldb.Dn(l, 'a=b')
3768 m = ldb.Message(dn)
3769 self.assertRaises(TypeError, l.modify, '<bad type>')
3770 self.assertRaises(TypeError, l.modify, m, '<bad type>')
3772 def test_add(self):
3773 l = ldb.Ldb()
3774 dn = ldb.Dn(l, 'a=b')
3775 m = ldb.Message(dn)
3776 self.assertRaises(TypeError, l.add, '<bad type>')
3777 self.assertRaises(TypeError, l.add, m, '<bad type>')
3779 def test_delete(self):
3780 l = ldb.Ldb()
3781 dn = ldb.Dn(l, 'a=b')
3782 self.assertRaises(TypeError, l.add, '<bad type>')
3783 self.assertRaises(TypeError, l.add, dn, '<bad type>')
3785 def test_rename(self):
3786 l = ldb.Ldb()
3787 dn = ldb.Dn(l, 'a=b')
3788 self.assertRaises(TypeError, l.add, '<bad type>', dn)
3789 self.assertRaises(TypeError, l.add, dn, '<bad type>')
3790 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
3792 def test_search(self):
3793 l = ldb.Ldb()
3794 self.assertRaises(TypeError, l.search, base=1234)
3795 self.assertRaises(TypeError, l.search, scope='<bad type>')
3796 self.assertRaises(TypeError, l.search, expression=1234)
3797 self.assertRaises(TypeError, l.search, attrs='<bad type>')
3798 self.assertRaises(TypeError, l.search, controls='<bad type>')
3801 class VersionTests(TestCase):
3803 def test_version(self):
3804 self.assertTrue(isinstance(ldb.__version__, str))
3806 class NestedTransactionTests(LdbBaseTest):
3807 def setUp(self):
3808 super(NestedTransactionTests, self).setUp()
3809 self.testdir = tempdir()
3810 self.filename = os.path.join(self.testdir, "test.ldb")
3811 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
3812 self.ldb.add({"dn": "@INDEXLIST",
3813 "@IDXATTR": [b"x", b"y", b"ou"],
3814 "@IDXGUID": [b"objectUUID"],
3815 "@IDX_DN_GUID": [b"GUID"]})
3817 super(NestedTransactionTests, self).setUp()
3820 # This test documents that currently ldb does not support true nested
3821 # transactions.
3823 # Note: The test is written so that it treats failure as pass.
3824 # It is done this way as standalone ldb builds do not use the samba
3825 # known fail mechanism
3827 def test_nested_transactions(self):
3829 self.ldb.transaction_start()
3831 self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
3832 "objectUUID": b"0123456789abcde1"})
3833 res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3834 base="dc=samba,dc=org")
3835 self.assertEqual(len(res), 1)
3837 self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
3838 "objectUUID": b"0123456789abcde2"})
3839 res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3840 base="dc=samba,dc=org")
3841 self.assertEqual(len(res), 1)
3843 self.ldb.transaction_start()
3844 self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
3845 "objectUUID": b"0123456789abcde3"})
3846 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3847 base="dc=samba,dc=org")
3848 self.assertEqual(len(res), 1)
3849 self.ldb.transaction_cancel()
3851 # Check that we can not see the record added by the cancelled
3852 # transaction.
3853 # Currently this fails as ldb does not support true nested
3854 # transactions, and only the outer commits and cancels have an effect
3856 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3857 base="dc=samba,dc=org")
3859 # FIXME this test currently passes on a failure, i.e. if nested
3860 # transaction support worked correctly the correct test would
3861 # be.
3862 # self.assertEqual(len(res), 0)
3863 # as the add of objectUUID=0123456789abcde3 would reverted when
3864 # the sub transaction it was nested in was rolled back.
3866 # Currently this is not the case so the record is still present.
3867 self.assertEqual(len(res), 1)
3870 # Commit the outer transaction
3872 self.ldb.transaction_commit()
3874 # Now check we can still see the records added in the outer
3875 # transaction.
3877 res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3878 base="dc=samba,dc=org")
3879 self.assertEqual(len(res), 1)
3880 res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3881 base="dc=samba,dc=org")
3882 self.assertEqual(len(res), 1)
3884 # And that we can't see the records added by the nested transaction.
3886 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3887 base="dc=samba,dc=org")
3888 # FIXME again if nested transactions worked correctly we would not
3889 # see this record. The test should be.
3890 # self.assertEqual(len(res), 0)
3891 self.assertEqual(len(res), 1)
3893 def tearDown(self):
3894 super(NestedTransactionTests, self).tearDown()
3897 class LmdbNestedTransactionTests(NestedTransactionTests):
3899 def setUp(self):
3900 if os.environ.get('HAVE_LMDB', '1') == '0':
3901 self.skipTest("No lmdb backend")
3902 self.prefix = MDB_PREFIX
3903 self.index = MDB_INDEX_OBJ
3904 super(LmdbNestedTransactionTests, self).setUp()
3906 def tearDown(self):
3907 super(LmdbNestedTransactionTests, self).tearDown()
3910 if __name__ == '__main__':
3911 import unittest
3912 unittest.TestProgram()