Use loadparm since it's required by some modules.
[Samba/gebeck_regimport.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
blob882376cb0919da4bcb05bea9131b2c64be787911
#!/usr/bin/python

# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
# Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
#
# This is a Python port of the original in testprogs/ejs/samba3sam.js
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
# Directory holding the Samba3 LDIF fixtures, located relative to this file.
datadir = os.path.join(os.path.dirname(__file__),
                       "../../../../../testdata/samba3")
def read_datafile(filename):
    """Return the full text of a fixture file stored under ``datadir``.

    :param filename: Name of the file, relative to ``datadir``.
    :return: The file contents as a string.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(os.path.join(datadir, filename), 'r') as f:
        return f.read()
def ldb_debug(l, text):
    """Debug callback that writes ``text`` to standard output.

    :param l: Unused; accepted so the signature matches what callers pass.
    :param text: Message to print.
    """
    # Function-call form prints a single argument identically on
    # Python 2 and Python 3, unlike the Python 2-only print statement.
    print(text)
class MapBaseTestCase(TestCaseInTempDir):
    """Base test case for mapping tests."""

    def setup_modules(self, ldb, s3, s4):
        """Write the mapping, module and partition control records to ``ldb``.

        :param ldb: Database that receives the ``@MAP=samba3sam``,
            ``@MODULES`` and ``@PARTITION`` records.
        :param s3: Target for the Samba3 side (``basedn`` and ``url`` are read).
        :param s4: Target for the Samba4 side (``basedn`` and ``url`` are read).
        """
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        ldb.add({"dn": "@PARTITION",
                 "partition": ["%s:%s" % (s4.basedn, s4.url),
                               "%s:%s" % (s3.basedn, s3.url)],
                 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})

    def setUp(self):
        """Create the per-test database targets and connect to them."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            # Samba3 DNs live below the sambaDomainName=TESTS container.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def make_s4dn(basedn, rdn):
            # Samba4 DNs sit directly below the base DN.
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target closure below, which has no access to self.
        tempdir = self.tempdir

        class Target(object):
            """Simple helper class that contains data for a specific SAM
            connection."""

            def __init__(self, file, basedn, dn):
                """
                :param file: Database file name, created inside the temp dir.
                :param basedn: Base DN of this database.
                :param dn: Callable ``(basedn, rdn) -> dn`` used by ``dn()``;
                    may be None if ``dn()`` is never called on this target.
                """
                self.file = os.path.join(tempdir, file)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb(lp=cmdline_loadparm)
                self._dn = dn

            def dn(self, rdn):
                """Build a full DN for ``rdn`` inside this target."""
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect the underlying database to this target's URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the fixture file ``path`` into the database."""
                self.add_ldif(read_datafile(path))

            def subst(self, text):
                """Substitute this target's variables (BASEDN) in ``text``."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Add ``ldif`` after variable substitution."""
                self.db.add_ldif(self.subst(ldif))

            def modify_ldif(self, ldif):
                """Apply the modification ``ldif`` after variable substitution."""
                self.db.modify_ldif(self.subst(ldif))

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the database files created for the test."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()
class Samba3SamTestCase(MapBaseTestCase):
    """Tests against a mapped database pre-loaded with Samba3 fixture data."""

    def setUp(self):
        """Load the fixtures, write the module configuration and reopen."""
        super(Samba3SamTestCase, self).setUp()
        ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
        self.samba3.setup_data("samba3.ldif")
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        ldb.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(ldb, self.samba3, self.samba4)
        # Drop the provisioning handle and open a fresh one for the tests
        # (presumably so the records written above take effect -- TODO confirm).
        del ldb
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["cn"], "Administrator")

    # This method used to be a second ``test_search_non_mapped``, which
    # silently replaced the one above so that only one of the two ever ran.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["name"], "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEqual(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        # FIXME: NDR unpack msg[0]["objectSid"] before comparing:
        # self.assertEqual(msg[0]["objectSid"],
        #     "S-1-5-21-4231626423-2410014848-2360679739-552")
        # Check mapping of objectClass
        oc = set(msg[0]["objectClass"])
        self.assertEqual(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEqual(set([str(m.dn) for m in msg]),
                         set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                              "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records through the map."""
        # Adding a record that will be fallbacked
        self.ldb.add({"dn": "cn=Foo",
                      "foo": "bar",
                      "blah": "Blie",
                      "cn": "Foo",
                      "showInAdvancedViewOnly": "TRUE"}
                     )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
        # TODO: Actually, this version should work as well but doesn't...
        #
        #
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                              scope=SCOPE_BASE,
                              attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEqual(msg[0]["foo"], "bar")
        self.assertEqual(msg[0]["blah"], "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                      "objectClass": "user",
                      "unixName": "bin",
                      "sambaUnicodePwd": "geheim",
                      "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["unixName"], "bin")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        #print "got %d replies" % len(msg)
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["unixName"], "bin")
        self.assertEqual(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["sambaSID"],
                         "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEqual(msg[0]["displayName"], "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["cn"], "Niemand")
        self.assertEqual(msg[0]["description"], "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(msg[0]["description"], "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertFalse("description" in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 0)
286 class MapTestCase(MapBaseTestCase):
def setUp(self):
    """Provision the mapped database used by the mapping tests.

    Loads the templates and Samba4 provisioning fixtures and writes the
    module configuration; no Samba3 fixture data is loaded here.
    """
    super(MapTestCase, self).setUp()
    ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
    self.templates.setup_data("provision_samba3sam_templates.ldif")
    ldif = read_datafile("provision_samba3sam.ldif")
    ldb.add_ldif(self.samba4.subst(ldif))
    self.setup_modules(ldb, self.samba3, self.samba4)
    # Drop the provisioning handle and open a fresh one for the tests
    # (presumably so the records written above take effect -- TODO confirm).
    del ldb
    self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
def test_map_search(self):
    """Running search tests on mapped data.

    Adds three records through the mapped database (X, Y, Z) and three
    directly to the Samba3 database (A, B, C), then checks searches by DN,
    by attribute and by parse tree.

    NOTE(review): several length assertions below expect more results than
    are individually checked afterwards; the extra entries are presumably
    provisioning records -- confirm before tightening the checks.
    """
    self.samba3.db.add({
        "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
        "objectclass": ["sambaDomain", "top"],
        "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
        "sambaNextRid": "2000",
        "sambaDomainName": "TESTS"
        })

    # Add a set of split records
    self.ldb.add_ldif("""
dn: """+ self.samba4.dn("cn=X") + """
objectClass: user
cn: X
codePage: x
revision: x
dnsHostName: x
nextRid: y
lastLogon: x
description: x
objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512

""")

    self.ldb.add({
        "dn": self.samba4.dn("cn=Y"),
        "objectClass": "top",
        "cn": "Y",
        "codePage": "x",
        "revision": "x",
        "dnsHostName": "y",
        "nextRid": "y",
        "lastLogon": "y",
        "description": "x"})

    self.ldb.add({
        "dn": self.samba4.dn("cn=Z"),
        "objectClass": "top",
        "cn": "Z",
        "codePage": "x",
        "revision": "y",
        "dnsHostName": "z",
        "nextRid": "y",
        "lastLogon": "z",
        "description": "y"})

    # Add a set of remote records

    self.samba3.db.add({
        "dn": self.samba3.dn("cn=A"),
        "objectClass": "posixAccount",
        "cn": "A",
        "sambaNextRid": "x",
        "sambaBadPasswordCount": "x",
        "sambaLogonTime": "x",
        "description": "x",
        "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
        "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})

    self.samba3.db.add({
        "dn": self.samba3.dn("cn=B"),
        "objectClass": "top",
        "cn": "B",
        "sambaNextRid": "x",
        "sambaBadPasswordCount": "x",
        "sambaLogonTime": "y",
        "description": "x"})

    self.samba3.db.add({
        "dn": self.samba3.dn("cn=C"),
        "objectClass": "top",
        "cn": "C",
        "sambaNextRid": "x",
        "sambaBadPasswordCount": "y",
        "sambaLogonTime": "z",
        "description": "y"})

    # Testing search by DN

    # Search remote record by local DN
    dn = self.samba4.dn("cn=A")
    res = self.ldb.search(dn, scope=SCOPE_BASE,
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "x")

    # Search remote record by remote DN
    dn = self.samba3.dn("cn=A")
    res = self.samba3.db.search(dn, scope=SCOPE_BASE,
                                attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertFalse("dnsHostName" in res[0])
    self.assertFalse("lastLogon" in res[0])
    self.assertEqual(res[0]["sambaLogonTime"], "x")

    # Search split record by local DN
    dn = self.samba4.dn("cn=X")
    res = self.ldb.search(dn, scope=SCOPE_BASE,
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertEqual(res[0]["dnsHostName"], "x")
    self.assertEqual(res[0]["lastLogon"], "x")

    # Search split record by remote DN
    dn = self.samba3.dn("cn=X")
    res = self.samba3.db.search(dn, scope=SCOPE_BASE,
                                attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertFalse("dnsHostName" in res[0])
    self.assertFalse("lastLogon" in res[0])
    self.assertEqual(res[0]["sambaLogonTime"], "x")

    # Testing search by attribute

    # Search by ignored attribute
    res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")

    # Search by kept attribute
    res = self.ldb.search(expression="(description=y)",
                          scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[0]["dnsHostName"], "z")
    self.assertEqual(res[0]["lastLogon"], "z")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "z")

    # Search by renamed attribute
    res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")

    # Search by converted attribute
    # TODO:
    #   Using the SID directly in the parse tree leads to conversion
    #   errors, letting the search fail with no results.
    #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
    res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
    self.assertEqual(len(res), 3)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[0]["dnsHostName"], "x")
    self.assertEqual(res[0]["lastLogon"], "x")
    # FIXME: Properly compare sid, requires converting between NDR encoding
    # and string
    #self.assertEqual(res[0]["objectSid"],
    #    "S-1-5-21-4231626423-2410014848-2360679739-552")
    self.assertTrue("objectSid" in res[0])
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")
    # FIXME: Properly compare sid, see above
    #self.assertEqual(res[1]["objectSid"],
    #    "S-1-5-21-4231626423-2410014848-2360679739-552")
    self.assertTrue("objectSid" in res[1])

    # Search by generated attribute
    # In most cases, this even works when the mapping is missing
    # a `convert_operator' by enumerating the remote db.
    res = self.ldb.search(expression="(primaryGroupID=512)",
                          attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "x")
    self.assertEqual(res[0]["primaryGroupID"], "512")

    # TODO: There should actually be two results, A and X.  The
    # primaryGroupID of X seems to get corrupted somewhere, and the
    # objectSid isn't available during the generation of remote (!) data,
    # which can be observed with the following search.  Also note that Xs
    # objectSid seems to be fine in the previous search for objectSid...
    #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
    #print len(res) + " results found"
    #for i in range(len(res)):
    #    for (obj in res[i]) {
    #        print obj + ": " + res[i][obj]
    #    }
    #    print "---"
    #

    # Search by remote name of renamed attribute
    res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 0)

    # Search by objectClass
    attrs = ["dnsHostName", "lastLogon", "objectClass"]
    res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[0]["dnsHostName"], "x")
    self.assertEqual(res[0]["lastLogon"], "x")
    self.assertEqual(res[0]["objectClass"][0], "user")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(res[1]["objectClass"][0], "user")

    # Prove that the objectClass is actually used for the search
    res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
                          attrs=attrs)
    self.assertEqual(len(res), 3)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(set(res[0]["objectClass"]), set(["top"]))
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(res[1]["objectClass"][0], "user")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[2])
    self.assertEqual(res[2]["lastLogon"], "x")
    self.assertEqual(res[2]["objectClass"][0], "user")

    # Testing search by parse tree

    # Search by conjunction of local attributes
    res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")

    # Search by conjunction of remote attributes
    res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[0]["dnsHostName"], "x")
    self.assertEqual(res[0]["lastLogon"], "x")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")

    # Search by conjunction of local and remote attribute
    res = self.ldb.search(expression="(&(codePage=x)(description=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")

    # Search by conjunction of local and remote attribute w/o match
    attrs = ["dnsHostName", "lastLogon"]
    res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
                          attrs=attrs)
    self.assertEqual(len(res), 0)
    res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
                          attrs=attrs)
    self.assertEqual(len(res), 0)

    # Search by disjunction of local attributes
    res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 2)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")

    # Search by disjunction of remote attributes
    res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 3)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[2])
    self.assertEqual(res[2]["lastLogon"], "x")

    # Search by disjunction of local and remote attribute
    res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 3)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "y")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[2]["dnsHostName"], "x")
    self.assertEqual(res[2]["lastLogon"], "x")

    # Search by disjunction of local and remote attribute w/o match
    res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 0)

    # Search by negated local attribute
    res = self.ldb.search(expression="(!(revision=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 5)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[2]["dnsHostName"], "z")
    self.assertEqual(res[2]["lastLogon"], "z")
    self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[3])
    self.assertEqual(res[3]["lastLogon"], "z")

    # Search by negated remote attribute
    res = self.ldb.search(expression="(!(description=x))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 3)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[0]["dnsHostName"], "z")
    self.assertEqual(res[0]["lastLogon"], "z")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "z")

    # Search by negated conjunction of local attributes
    res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 5)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[2]["dnsHostName"], "z")
    self.assertEqual(res[2]["lastLogon"], "z")
    self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[3])
    self.assertEqual(res[3]["lastLogon"], "z")

    # Search by negated conjunction of remote attributes
    res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 5)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "y")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[2]["dnsHostName"], "z")
    self.assertEqual(res[2]["lastLogon"], "z")
    self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[3])
    self.assertEqual(res[3]["lastLogon"], "z")

    # Search by negated conjunction of local and remote attribute
    res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 5)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[2]["dnsHostName"], "z")
    self.assertEqual(res[2]["lastLogon"], "z")
    self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[3])
    self.assertEqual(res[3]["lastLogon"], "z")

    # Search by negated disjunction of local attributes
    # NOTE(review): unlike the neighbouring searches, the result count is
    # not asserted here.
    res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[1])
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[2]["dnsHostName"], "z")
    self.assertEqual(res[2]["lastLogon"], "z")
    self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[3])
    self.assertEqual(res[3]["lastLogon"], "z")

    # Search by negated disjunction of remote attributes
    res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 4)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
    self.assertEqual(res[0]["dnsHostName"], "y")
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[1]["dnsHostName"], "z")
    self.assertEqual(res[1]["lastLogon"], "z")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[2])
    self.assertEqual(res[2]["lastLogon"], "z")

    # Search by negated disjunction of local and remote attribute
    res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
                          attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 4)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "x")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[1]["dnsHostName"], "z")
    self.assertEqual(res[1]["lastLogon"], "z")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[2])
    self.assertEqual(res[2]["lastLogon"], "z")

    # Search by complex parse tree
    res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
    self.assertEqual(len(res), 6)
    self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
    self.assertFalse("dnsHostName" in res[0])
    self.assertEqual(res[0]["lastLogon"], "y")
    self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
    self.assertEqual(res[1]["dnsHostName"], "x")
    self.assertEqual(res[1]["lastLogon"], "x")
    self.assertEqual(str(res[2].dn), self.samba4.dn("cn=A"))
    self.assertFalse("dnsHostName" in res[2])
    self.assertEqual(res[2]["lastLogon"], "x")
    self.assertEqual(str(res[3].dn), self.samba4.dn("cn=Z"))
    self.assertEqual(res[3]["dnsHostName"], "z")
    self.assertEqual(res[3]["lastLogon"], "z")
    self.assertEqual(str(res[4].dn), self.samba4.dn("cn=C"))
    self.assertFalse("dnsHostName" in res[4])
    self.assertEqual(res[4]["lastLogon"], "z")

    # Clean up
    dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
    for dn in dns:
        self.ldb.delete(dn)
def test_map_modify_local(self):
    """Modification of local records.

    Adds a record outside both mapped base DNs through the mapped database,
    then modifies, renames and deletes it, checking the result each time.
    """
    # Add local record
    dn = "cn=test,dc=idealx,dc=org"
    self.ldb.add({"dn": dn,
                  "cn": "test",
                  "foo": "bar",
                  "revision": "1",
                  "description": "test"})
    # Check it's there
    attrs = ["foo", "revision", "description"]
    res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertEqual(res[0]["foo"], "bar")
    self.assertEqual(res[0]["revision"], "1")
    self.assertEqual(res[0]["description"], "test")
    # Check it's not in the local db
    res = self.samba4.db.search(expression="(cn=test)",
                                scope=SCOPE_DEFAULT, attrs=attrs)
    self.assertEqual(len(res), 0)
    # Check it's not in the remote db
    res = self.samba3.db.search(expression="(cn=test)",
                                scope=SCOPE_DEFAULT, attrs=attrs)
    self.assertEqual(len(res), 0)

    # Modify local record
    ldif = """
dn: """ + dn + """
replace: foo
foo: baz
replace: description
description: foo
"""
    self.ldb.modify_ldif(ldif)
    # Check in local db
    res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertEqual(res[0]["foo"], "baz")
    self.assertEqual(res[0]["revision"], "1")
    self.assertEqual(res[0]["description"], "foo")

    # Rename local record
    dn2 = "cn=toast,dc=idealx,dc=org"
    self.ldb.rename(dn, dn2)
    # Check in local db
    res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn2)
    self.assertEqual(res[0]["foo"], "baz")
    self.assertEqual(res[0]["revision"], "1")
    self.assertEqual(res[0]["description"], "foo")

    # Delete local record
    self.ldb.delete(dn2)
    # Check it's gone
    res = self.ldb.search(dn2, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
def test_map_modify_remote_remote(self):
    """Modification of remote data of remote records

    The record is created directly in the Samba3 database and is then
    modified, renamed and deleted through the mapped database
    (``self.ldb``).  After each step the test checks the record both
    through the mapped view (``badPwdCount``/``nextRid``) and in the
    Samba3 database (``sambaBadPasswordCount``/``sambaNextRid``).
    """
    # Attribute names as seen through the mapped db and in the remote db.
    mapped_attrs = ["description", "badPwdCount", "nextRid"]
    remote_attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]

    # Add remote record
    s4_dn = self.samba4.dn("cn=test")
    s3_dn = self.samba3.dn("cn=test")
    self.samba3.db.add({"dn": s3_dn,
                        "cn": "test",
                        "description": "foo",
                        "sambaBadPasswordCount": "3",
                        "sambaNextRid": "1001"})
    # Check it's there
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE,
                                attrs=remote_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "foo")
    self.assertEqual(res[0]["sambaBadPasswordCount"], "3")
    self.assertEqual(res[0]["sambaNextRid"], "1001")
    # Check in mapped db
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=mapped_attrs,
                          expression="")
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "foo")
    self.assertEqual(res[0]["badPwdCount"], "3")
    self.assertEqual(res[0]["nextRid"], "1001")
    # Check in local db: nothing was written there for this record
    res = self.samba4.db.search(s4_dn, scope=SCOPE_BASE,
                                attrs=mapped_attrs)
    self.assertEqual(len(res), 0)

    # Modify remote data of remote record.
    # The LDIF is assembled from explicit "\n"-terminated pieces so that
    # its content does not depend on how this method is indented.
    ldif = ("\n"
            "dn: " + s4_dn + "\n"
            "replace: description\n"
            "description: test\n"
            "replace: badPwdCount\n"
            "badPwdCount: 4\n")
    self.ldb.modify_ldif(ldif)
    # Check in mapped db
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["badPwdCount"], "4")
    self.assertEqual(res[0]["nextRid"], "1001")
    # Check in remote db
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE,
                                attrs=remote_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["sambaBadPasswordCount"], "4")
    self.assertEqual(res[0]["sambaNextRid"], "1001")

    # Rename remote record
    renamed_s4_dn = self.samba4.dn("cn=toast")
    self.ldb.rename(s4_dn, renamed_s4_dn)
    # Check in mapped db
    s4_dn = renamed_s4_dn
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["badPwdCount"], "4")
    self.assertEqual(res[0]["nextRid"], "1001")
    # Check in remote db: the record must now live under the renamed DN
    s3_dn = self.samba3.dn("cn=toast")
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE,
                                attrs=remote_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["sambaBadPasswordCount"], "4")
    self.assertEqual(res[0]["sambaNextRid"], "1001")

    # Delete remote record
    self.ldb.delete(s4_dn)
    # Check in mapped db that it's removed
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
    # Check in remote db
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
def test_map_modify_remote_local(self):
    """Modification of local data of remote records

    A record that initially exists only in the Samba3 database is
    modified through the mapped database (``self.ldb``) with an LDIF that
    adds ``revision`` and replaces ``description``.  The test then expects
    ``description`` to be found only in the Samba3 database and
    ``revision`` only in the Samba4 database, i.e. the record has become
    split across both.
    """
    # Add remote record (same as before)
    s4_dn = self.samba4.dn("cn=test")
    s3_dn = self.samba3.dn("cn=test")
    self.samba3.db.add({"dn": s3_dn,
                        "cn": "test",
                        "description": "foo",
                        "sambaBadPasswordCount": "3",
                        "sambaNextRid": "1001"})

    # Modify local data of remote record.
    # The LDIF is assembled from explicit "\n"-terminated pieces so that
    # its content does not depend on how this method is indented; it ends
    # with an empty line.
    ldif = ("\n"
            "dn: " + s4_dn + "\n"
            "add: revision\n"
            "revision: 1\n"
            "replace: description\n"
            "description: test\n"
            "\n")
    self.ldb.modify_ldif(ldif)
    # Check in mapped db: both attributes are visible in the merged view
    attrs = ["revision", "description"]
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["revision"], "1")
    # Check in remote db: description is there, revision is not
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertFalse("revision" in res[0])
    # Check in local db: revision is there, description is not
    res = self.samba4.db.search(s4_dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertFalse("description" in res[0])
    self.assertEqual(res[0]["revision"], "1")

    # Delete (newly) split record
    self.ldb.delete(s4_dn)
def test_map_modify_split(self):
    """Testing modification of split records

    The record is added through the mapped database (``self.ldb``) with
    attributes that the test expects to end up in different databases:
    ``description``, ``badPwdCount`` and ``nextRid`` in the Samba3
    database (the latter two as ``sambaBadPasswordCount`` and
    ``sambaNextRid``) and ``revision`` in the Samba4 database.  The record
    is then modified, renamed and deleted through the mapped database,
    and all three views are checked after each step.
    """
    # Attribute names as seen through the mapped/local dbs and in the
    # remote db.  "revision" is requested from the remote db too, so the
    # test can assert that it is absent there.
    mapped_attrs = ["description", "badPwdCount", "nextRid", "revision"]
    remote_attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
                    "revision"]

    # Add split record
    s4_dn = self.samba4.dn("cn=test")
    s3_dn = self.samba3.dn("cn=test")
    self.ldb.add({
        "dn": s4_dn,
        "cn": "test",
        "description": "foo",
        "badPwdCount": "3",
        "nextRid": "1001",
        "revision": "1"})
    # Check it's there
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "foo")
    self.assertEqual(res[0]["badPwdCount"], "3")
    self.assertEqual(res[0]["nextRid"], "1001")
    self.assertEqual(res[0]["revision"], "1")
    # Check in local db
    res = self.samba4.db.search(s4_dn, scope=SCOPE_BASE,
                                attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertFalse("description" in res[0])
    self.assertFalse("badPwdCount" in res[0])
    self.assertFalse("nextRid" in res[0])
    self.assertEqual(res[0]["revision"], "1")
    # Check in remote db
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE,
                                attrs=remote_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "foo")
    self.assertEqual(res[0]["sambaBadPasswordCount"], "3")
    self.assertEqual(res[0]["sambaNextRid"], "1001")
    self.assertFalse("revision" in res[0])

    # Modify of split record.
    # The LDIF is assembled from explicit "\n"-terminated pieces so that
    # its content does not depend on how this method is indented.
    ldif = ("\n"
            "dn: " + s4_dn + "\n"
            "replace: description\n"
            "description: test\n"
            "replace: badPwdCount\n"
            "badPwdCount: 4\n"
            "replace: revision\n"
            "revision: 2\n")
    self.ldb.modify_ldif(ldif)
    # Check in mapped db
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["badPwdCount"], "4")
    self.assertEqual(res[0]["nextRid"], "1001")
    self.assertEqual(res[0]["revision"], "2")
    # Check in local db
    res = self.samba4.db.search(s4_dn, scope=SCOPE_BASE,
                                attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertFalse("description" in res[0])
    self.assertFalse("badPwdCount" in res[0])
    self.assertFalse("nextRid" in res[0])
    self.assertEqual(res[0]["revision"], "2")
    # Check in remote db
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE,
                                attrs=remote_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["sambaBadPasswordCount"], "4")
    self.assertEqual(res[0]["sambaNextRid"], "1001")
    self.assertFalse("revision" in res[0])

    # Rename split record
    renamed_s4_dn = self.samba4.dn("cn=toast")
    self.ldb.rename(s4_dn, renamed_s4_dn)
    # Check in mapped db
    s4_dn = renamed_s4_dn
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE, attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["badPwdCount"], "4")
    self.assertEqual(res[0]["nextRid"], "1001")
    self.assertEqual(res[0]["revision"], "2")
    # Check in local db
    res = self.samba4.db.search(s4_dn, scope=SCOPE_BASE,
                                attrs=mapped_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s4_dn)
    self.assertFalse("description" in res[0])
    self.assertFalse("badPwdCount" in res[0])
    self.assertFalse("nextRid" in res[0])
    self.assertEqual(res[0]["revision"], "2")
    # Check in remote db: the remote half must have been renamed as well
    s3_dn = self.samba3.dn("cn=toast")
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE,
                                attrs=remote_attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), s3_dn)
    self.assertEqual(res[0]["description"], "test")
    self.assertEqual(res[0]["sambaBadPasswordCount"], "4")
    self.assertEqual(res[0]["sambaNextRid"], "1001")
    self.assertFalse("revision" in res[0])

    # Delete split record
    self.ldb.delete(s4_dn)
    # Check in mapped db
    res = self.ldb.search(s4_dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
    # Check in local db
    res = self.samba4.db.search(s4_dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
    # Check in remote db
    res = self.samba3.db.search(s3_dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)