auth/ntlmssp: don't require NTLMSSP_SIGN for smb connections
[Samba.git] / python / samba / tests / samba3sam.py
blobd4347cd52c8c83f1c45fa069370a253587edf93a
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
3 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
5 # This is a Python port of the original in testprogs/ejs/samba3sam.js
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
23 import os
24 import ldb
25 from ldb import SCOPE_DEFAULT, SCOPE_BASE
26 from samba import Ldb, substitute_var
27 from samba.tests import TestCaseInTempDir, env_loadparm
28 import samba.dcerpc.security
29 import samba.ndr
30 from samba.auth import system_session
31 from operator import attrgetter
def read_datafile(filename):
    """Return the contents of a file from the samba3 test-data directory.

    The data directory is located relative to this module by trying each
    candidate path in turn and keeping the first one that exists.  If none
    of them exists the last candidate is still used, so the open() call
    below raises the usual IOError/OSError for the missing file.

    :param filename: Name of the file inside the testdata/samba3 directory.
    :return: The complete text of the file.
    :raises IOError: If the file cannot be opened.
    """
    candidates = ["../../../../../testdata/samba3",
                  "../../../../testdata/samba3"]
    here = os.path.dirname(__file__)
    for candidate in candidates:
        datadir = os.path.join(here, candidate)
        if os.path.exists(datadir):
            break
    # Read through a context manager so the handle is closed promptly
    # instead of being left for the garbage collector.
    with open(os.path.join(datadir, filename), 'r') as datafile:
        return datafile.read()
def ldb_debug(l, text):
    """Print a debug message to standard output.

    :param l: Unused by this function (presumably the ldb handle handed to
        a debug callback -- confirm against the caller).
    :param text: The message to print.
    """
    # Parenthesised single-argument form: prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print(text)
class MapBaseTestCase(TestCaseInTempDir):
    """Shared fixture for the samba3sam mapping tests.

    setUp() prepares a loadparm context and two Target helpers
    (self.samba3 and self.samba4), each wrapping its own tdb file inside
    the test's temporary directory; tearDown() removes those files again.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION control records to ldb.

        :param ldb: Database handle the three records are added to.
        :param s3: Target whose base DN forms the @TO side of the mapping.
        :param s4: Target whose base DN forms the @FROM side of the mapping.
        """
        mapping = {
            "dn": "@MAP=samba3sam",
            "@FROM": s4.basedn,
            "@TO": "sambaDomainName=TESTS," + s3.basedn,
        }
        ldb.add(mapping)

        module_stack = {
            "dn": "@MODULES",
            "@LIST": ("rootdse,paged_results,server_sort,asq,samldb,"
                      "password_hash,operational,objectguid,rdn_name,"
                      "samba3sam,samba3sid,show_deleted,partition"),
        }
        ldb.add(module_stack)

        # One partition per backend, Samba4 first, keyed by casefolded DN.
        partition_dns = ["%s" % (target.basedn_casefold)
                         for target in (s4, s3)]
        ldb.add({
            "dn": "@PARTITION",
            "partition": partition_dns,
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
            "modules": "*:",
        })

    def setUp(self):
        """Create the loadparm context, file names and both Target helpers."""
        # The loadparm context is configured before the parent setUp() runs.
        self.lp = env_loadparm()
        for option, value in (("workgroup", "TESTS"),
                              ("netbios name", "TESTS")):
            self.lp.set(option, value)
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            # Samba3 entries live below the sambaDomainName=TESTS container.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def make_s4dn(basedn, rdn):
            # Samba4 entries sit directly below the base DN.
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target class below, which cannot reach `self`.
        tempdir = self.tempdir

        class Target:
            """Bundle an Ldb handle with the base DN, file and URL of one
            backend database."""

            def __init__(self, basedn, dn, lp):
                self.db = Ldb(lp=lp, session_info=system_session())
                self.db.set_opaque("skip_allocate_sids", "true")
                self.basedn = basedn
                self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
                self.substvars = {"BASEDN": self.basedn}
                # The backing file is named after the casefolded base DN.
                self.file = os.path.join(tempdir,
                                         "%s.ldb" % self.basedn_casefold)
                self.url = "tdb://" + self.file
                self._dn = dn

            def dn(self, rdn):
                """Build a full DN for rdn using this target's DN builder."""
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect the wrapped database to this target's URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the named test-data file into the database."""
                self.add_ldif(read_datafile(path))

            def subst(self, text):
                """Expand this target's substitution variables in text."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Add LDIF records after variable substitution."""
                expanded = self.subst(ldif)
                self.db.add_ldif(expanded)

            def modify_ldif(self, ldif):
                """Apply LDIF modifications after variable substitution."""
                expanded = self.subst(ldif)
                self.db.modify_ldif(expanded)

        self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
        self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)

        self.samba3.connect()
        self.samba4.connect()

    def tearDown(self):
        """Delete the database files created for the test."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.samba4.file)
        partition_dir = "%s.d" % self.ldbfile
        metadata = os.path.join(partition_dir, "metadata.tdb")
        if os.path.exists(metadata):
            os.unlink(metadata)
        # NOTE(review): the rmdir is assumed to be unconditional; confirm
        # whether it belongs inside the `if` above.
        os.rmdir(partition_dir)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that an NDR-packed SID matches its textual form.

        :param text: SID in string form.
        :param ndr_sid: Sequence whose first item is the NDR-packed SID.
        """
        unpacked = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
                                        str(ndr_sid[0]))
        parsed = samba.dcerpc.security.dom_sid(text)
        self.assertEquals(unpacked, parsed)
class Samba3SamTestCase(MapBaseTestCase):
    """Search and modify tests against the bundled samba3 test data.

    setUp() loads samba3.ldif into the Samba3 backend and
    provision_samba3sam.ldif into the main database, then writes the
    mapping control records via setup_modules().
    """

    def setUp(self):
        """Provision both databases and open self.ldb for the tests."""
        super(Samba3SamTestCase, self).setUp()
        # Named setup_db rather than "ldb" so the imported ldb module is not
        # shadowed inside this method.
        setup_db = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
        setup_db.set_opaque("skip_allocate_sids", "true")
        self.samba3.setup_data("samba3.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        setup_db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(setup_db, self.samba3, self.samba4)
        # Drop the provisioning handle and reopen the database (presumably
        # so the control records written above take effect -- confirm).
        del setup_db
        self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
        self.ldb.set_opaque("skip_allocate_sids", "true")

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEqual(len(msg), 1)
        # str() keeps this comparison in line with the other attribute
        # assertions in this class.
        self.assertEqual(str(msg[0]["cn"]), "Administrator")

    # This test must not share a name with test_search_non_mapped: a second
    # method of the same name would replace the first in the class body and
    # the non-mapped search would silently never run.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["name"]), "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEqual(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEqual(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(
            expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEqual(
            set([str(m.dn) for m in msg]),
            set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records through the map."""
        # Adding a record that will be fallbacked
        self.ldb.add({
            "dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"})

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
        # TODO: Actually, this version should work as well but doesn't...
        msg = self.ldb.search(
            expression="(cn=Foo)", base="cn=Foo",
            scope=SCOPE_BASE,
            attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
        self.assertEqual(str(msg[0]["foo"]), "bar")
        self.assertEqual(str(msg[0]["blah"]), "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                      "objectClass": "user",
                      "unixName": "bin",
                      "sambaUnicodePwd": "geheim",
                      "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(
            expression="(unixName=bin)",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(
            expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["unixName"]), "bin")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(
            expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["unixName"]), "bin")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["sambaSID"]),
                         "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEqual(str(msg[0]["displayName"]), "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["description"]), "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["description"]), "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertFalse("description" in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 0)
306 class MapTestCase(MapBaseTestCase):
def setUp(self):
    """Provision the main database and open self.ldb for the tests.

    No Samba3 data is loaded here; the provisioning LDIF and the mapping
    control records are the only things written before the database is
    reopened.
    """
    super(MapTestCase, self).setUp()

    # Throw-away handle used only for provisioning.
    bootstrap = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
    bootstrap.set_opaque("skip_allocate_sids", "true")
    provision_ldif = read_datafile("provision_samba3sam.ldif")
    bootstrap.add_ldif(self.samba4.subst(provision_ldif))
    self.setup_modules(bootstrap, self.samba3, self.samba4)
    del bootstrap

    # Fresh handle on the same URL, used by the test methods.
    self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
    self.ldb.set_opaque("skip_allocate_sids", "true")
def test_map_search(self):
    """Running search tests on mapped data.

    Three kinds of records are created: "split" records X, Y and Z added
    through the mapped database, and "remote" records A, B and C added
    directly to the Samba3 backend.  The searches below then check which
    records come back, and with which dnsHostName / lastLogon values, for
    a range of filter shapes.
    """
    # Attribute list requested by most of the searches below.
    host_logon = ["dnsHostName", "lastLogon"]

    def check(entry, rdn, logon, dns=None):
        """Assert one result entry's DN, dnsHostName and lastLogon.

        :param entry: A single search result.
        :param rdn: RDN the entry is expected to have below the Samba4 base.
        :param logon: Expected lastLogon value.
        :param dns: Expected dnsHostName value, or None if the attribute
            must be absent from the entry.
        """
        self.assertEqual(str(entry.dn), self.samba4.dn(rdn))
        if dns is None:
            self.assertFalse("dnsHostName" in entry)
        else:
            self.assertEqual(str(entry["dnsHostName"]), dns)
        self.assertEqual(str(entry["lastLogon"]), logon)

    def ldif_record(*lines):
        """Join lines into LDIF text with a leading and trailing newline.

        Building the text this way keeps every LDIF line at column 0
        regardless of how this method is indented in the source file.
        """
        return "\n".join([""] + list(lines) + [""])

    self.samba3.db.add({
        "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
        "objectclass": ["sambaDomain", "top"],
        "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
        "sambaNextRid": "2000",
        "sambaDomainName": "TESTS"})

    # Add a set of split records
    self.ldb.add_ldif(ldif_record(
        "dn: " + self.samba4.dn("cn=Domain Users"),
        "objectClass: group",
        "cn: Domain Users",
        "objectSid: S-1-5-21-4231626423-2410014848-2360679739-513"))

    # Add a set of split records
    self.ldb.add_ldif(ldif_record(
        "dn: " + self.samba4.dn("cn=X"),
        "objectClass: user",
        "cn: X",
        "codePage: x",
        "revision: x",
        "dnsHostName: x",
        "nextRid: y",
        "lastLogon: x",
        "description: x",
        "objectSid: S-1-5-21-4231626423-2410014848-2360679739-1052"))

    self.ldb.add({
        "dn": self.samba4.dn("cn=Y"),
        "objectClass": "top",
        "cn": "Y",
        "codePage": "x",
        "revision": "x",
        "dnsHostName": "y",
        "nextRid": "y",
        "lastLogon": "y",
        "description": "x"})

    self.ldb.add({
        "dn": self.samba4.dn("cn=Z"),
        "objectClass": "top",
        "cn": "Z",
        "codePage": "x",
        "revision": "y",
        "dnsHostName": "z",
        "nextRid": "y",
        "lastLogon": "z",
        "description": "y"})

    # Add a set of remote records
    self.samba3.db.add({
        "dn": self.samba3.dn("cn=A"),
        "objectClass": "posixAccount",
        "cn": "A",
        "sambaNextRid": "x",
        "sambaBadPasswordCount": "x",
        "sambaLogonTime": "x",
        "description": "x",
        "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-1052",
        "sambaPrimaryGroupSID":
            "S-1-5-21-4231626423-2410014848-2360679739-512"})

    self.samba3.db.add({
        "dn": self.samba3.dn("cn=B"),
        "objectClass": "top",
        "cn": "B",
        "sambaNextRid": "x",
        "sambaBadPasswordCount": "x",
        "sambaLogonTime": "y",
        "description": "x"})

    self.samba3.db.add({
        "dn": self.samba3.dn("cn=C"),
        "objectClass": "top",
        "cn": "C",
        "sambaNextRid": "x",
        "sambaBadPasswordCount": "y",
        "sambaLogonTime": "z",
        "description": "y"})

    # ---- Testing search by DN ----

    # Search remote record by local DN
    res = self.ldb.search(self.samba4.dn("cn=A"), scope=SCOPE_BASE,
                          attrs=host_logon)
    self.assertEqual(len(res), 1)
    check(res[0], "cn=A", logon="x")

    # Search remote record by remote DN
    dn = self.samba3.dn("cn=A")
    res = self.samba3.db.search(
        dn, scope=SCOPE_BASE,
        attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertFalse("dnsHostName" in res[0])
    self.assertFalse("lastLogon" in res[0])
    self.assertEqual(str(res[0]["sambaLogonTime"]), "x")

    # Search split record by local DN
    res = self.ldb.search(self.samba4.dn("cn=X"), scope=SCOPE_BASE,
                          attrs=host_logon)
    self.assertEqual(len(res), 1)
    check(res[0], "cn=X", logon="x", dns="x")

    # Search split record by remote DN
    dn = self.samba3.dn("cn=X")
    res = self.samba3.db.search(
        dn, scope=SCOPE_BASE,
        attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertFalse("dnsHostName" in res[0])
    self.assertFalse("lastLogon" in res[0])
    self.assertEqual(str(res[0]["sambaLogonTime"]), "x")

    # ---- Testing search by attribute ----

    # Search by ignored attribute
    res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
                          attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=X", logon="x", dns="x")
    check(res[1], "cn=Y", logon="y", dns="y")

    # Search by kept attribute
    res = self.ldb.search(expression="(description=y)",
                          scope=SCOPE_DEFAULT, attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=C", logon="z")
    check(res[1], "cn=Z", logon="z", dns="z")

    # Search by renamed attribute
    res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
                          attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")

    # Search by converted attribute
    # TODO:
    #   Using the SID directly in the parse tree leads to conversion
    #   errors, letting the search fail with no results.
    #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
    res = self.ldb.search(expression="(objectSid=*)", base=None,
                          scope=SCOPE_DEFAULT,
                          attrs=["dnsHostName", "lastLogon", "objectSid"])
    self.assertEqual(len(res), 4)
    res = sorted(res, key=attrgetter('dn'))
    check(res[1], "cn=X", logon="x", dns="x")
    self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
                         res[1]["objectSid"])
    self.assertTrue("objectSid" in res[1])
    check(res[0], "cn=A", logon="x")
    self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
                         res[0]["objectSid"])
    self.assertTrue("objectSid" in res[0])

    # Search by generated attribute
    # In most cases, this even works when the mapping is missing
    # a `convert_operator' by enumerating the remote db.
    res = self.ldb.search(
        expression="(primaryGroupID=512)",
        attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
    self.assertEqual(len(res), 1)
    check(res[0], "cn=A", logon="x")
    self.assertEqual(str(res[0]["primaryGroupID"]), "512")

    # Note that Xs "objectSid" seems to be fine in the previous search for
    # "objectSid"...
    # TODO: a search for "(primaryGroupID=*)" that dumped every returned
    # attribute used to be sketched here; it is still disabled.

    # Search by remote name of renamed attribute
    res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
                          attrs=host_logon)
    self.assertEqual(len(res), 0)

    # Search by objectClass
    attrs = ["dnsHostName", "lastLogon", "objectClass"]
    res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    self.assertEqual(str(res[0]["objectClass"][0]), "user")
    check(res[1], "cn=X", logon="x", dns="x")
    self.assertEqual(str(res[1]["objectClass"][0]), "user")

    # Prove that the objectClass is actually used for the search
    res = self.ldb.search(
        expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
    self.assertEqual(len(res), 3)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    self.assertEqual(res[0]["objectClass"][0], "user")
    check(res[1], "cn=B", logon="y")
    self.assertEqual(set(res[1]["objectClass"]), set(["top"]))
    check(res[2], "cn=X", logon="x", dns="x")
    self.assertEqual(str(res[2]["objectClass"][0]), "user")

    # ---- Testing search by parse tree ----

    # Search by conjunction of local attributes
    res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=X", logon="x", dns="x")
    check(res[1], "cn=Y", logon="y", dns="y")

    # Search by conjunction of remote attributes
    res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=X", logon="x", dns="x")

    # Search by conjunction of local and remote attribute
    res = self.ldb.search(expression="(&(codePage=x)(description=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=X", logon="x", dns="x")
    check(res[1], "cn=Y", logon="y", dns="y")

    # Search by conjunction of local and remote attribute w/o match
    attrs = ["dnsHostName", "lastLogon"]
    res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
                          attrs=attrs)
    self.assertEqual(len(res), 0)
    res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
                          attrs=attrs)
    self.assertEqual(len(res), 0)

    # Search by disjunction of local attributes
    res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 2)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=X", logon="x", dns="x")
    check(res[1], "cn=Y", logon="y", dns="y")

    # Search by disjunction of remote attributes
    res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 3)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")
    check(res[2], "cn=X", logon="x", dns="x")

    # Search by disjunction of local and remote attribute
    res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
                          attrs=host_logon)
    self.assertEqual(len(res), 3)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=B", logon="y")
    check(res[1], "cn=X", logon="x", dns="x")
    check(res[2], "cn=Y", logon="y", dns="y")

    # Search by disjunction of local and remote attribute w/o match
    res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
                          attrs=host_logon)
    self.assertEqual(len(res), 0)

    # Search by negated local attribute
    res = self.ldb.search(expression="(!(revision=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 6)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")
    check(res[2], "cn=C", logon="z")
    check(res[3], "cn=Z", logon="z", dns="z")

    # Search by negated remote attribute
    res = self.ldb.search(expression="(!(description=x))",
                          attrs=host_logon)
    self.assertEqual(len(res), 4)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=C", logon="z")
    check(res[1], "cn=Z", logon="z", dns="z")

    # Search by negated conjunction of local attributes
    res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
                          attrs=host_logon)
    self.assertEqual(len(res), 6)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")
    check(res[2], "cn=C", logon="z")
    check(res[3], "cn=Z", logon="z", dns="z")

    # Search by negated conjunction of remote attributes
    res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
                          attrs=host_logon)
    self.assertEqual(len(res), 6)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=B", logon="y")
    check(res[1], "cn=C", logon="z")
    check(res[2], "cn=Y", logon="y", dns="y")
    check(res[3], "cn=Z", logon="z", dns="z")

    # Search by negated conjunction of local and remote attribute
    res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
                          attrs=host_logon)
    self.assertEqual(len(res), 6)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")
    check(res[2], "cn=C", logon="z")
    check(res[3], "cn=Z", logon="z", dns="z")

    # Search by negated disjunction of local attributes
    # NOTE(review): unlike its neighbours this search has no assertion on
    # the number of results -- confirm whether that is intentional.
    res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
                          attrs=host_logon)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")
    check(res[2], "cn=C", logon="z")
    check(res[3], "cn=Z", logon="z", dns="z")

    # Search by negated disjunction of remote attributes
    res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
                          attrs=host_logon)
    self.assertEqual(len(res), 5)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=C", logon="z")
    check(res[1], "cn=Y", logon="y", dns="y")
    check(res[2], "cn=Z", logon="z", dns="z")

    # Search by negated disjunction of local and remote attribute
    res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
                          attrs=host_logon)
    self.assertEqual(len(res), 5)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=C", logon="z")
    check(res[2], "cn=Z", logon="z", dns="z")

    # Search by complex parse tree
    res = self.ldb.search(
        expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))",
        attrs=host_logon)
    self.assertEqual(len(res), 7)
    res = sorted(res, key=attrgetter('dn'))
    check(res[0], "cn=A", logon="x")
    check(res[1], "cn=B", logon="y")
    check(res[2], "cn=C", logon="z")
    check(res[3], "cn=X", logon="x", dns="x")
    check(res[4], "cn=Z", logon="z", dns="z")

    # Clean up
    for name in ["A", "B", "C", "X", "Y", "Z"]:
        self.ldb.delete(self.samba4.dn("cn=%s" % name))
def test_map_modify_local(self):
    """Run a local record through add, modify, rename and delete."""
    wanted = ["foo", "revision", "description"]

    def verify(target_dn, foo, description):
        # A base-scope search through the mapped ldb must return exactly
        # one entry with the given values; "revision" is "1" at every step.
        found = self.ldb.search(target_dn, scope=SCOPE_BASE, attrs=wanted)
        self.assertEquals(len(found), 1)
        entry = found[0]
        self.assertEquals(str(entry.dn), target_dn)
        expected = (("foo", foo),
                    ("revision", "1"),
                    ("description", description))
        for attr, value in expected:
            self.assertEquals(str(entry[attr]), value)

    # Create the record through the mapped ldb and read it back
    first_dn = "cn=test,dc=idealx,dc=org"
    record = {
        "dn": first_dn,
        "cn": "test",
        "foo": "bar",
        "revision": "1",
        "description": "test",
    }
    self.ldb.add(record)
    verify(first_dn, "bar", "test")

    # Searching the samba4 db and then the samba3 db directly for
    # (cn=test) must come back empty in both cases
    for target in (self.samba4, self.samba3):
        hits = target.db.search(expression="(cn=test)",
                                scope=SCOPE_DEFAULT, attrs=wanted)
        self.assertEquals(len(hits), 0)

    # Replace two attributes via LDIF
    changes = "\n".join([
        "",
        "dn: " + first_dn,
        "replace: foo",
        "foo: baz",
        "replace: description",
        "description: foo",
        "",
    ])
    self.ldb.modify_ldif(changes)
    verify(first_dn, "baz", "foo")

    # Move the record to a new DN; the values must travel with it
    second_dn = "cn=toast,dc=idealx,dc=org"
    self.ldb.rename(first_dn, second_dn)
    verify(second_dn, "baz", "foo")

    # Remove the record and confirm it can no longer be found
    self.ldb.delete(second_dn)
    leftovers = self.ldb.search(second_dn, scope=SCOPE_BASE)
    self.assertEquals(len(leftovers), 0)
def test_map_modify_remote_remote(self):
    """Change, rename and delete remote attributes of a remote record."""
    mapped_attrs = ["description", "badPwdCount", "nextRid"]
    remote_attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]

    def verify(found, expected_dn, attr_names, values):
        # Exactly one entry, at the expected DN, whose attributes (taken
        # in the order given) stringify to the expected values.
        self.assertEquals(len(found), 1)
        entry = found[0]
        self.assertEquals(str(entry.dn), expected_dn)
        for attr_name, value in zip(attr_names, values):
            self.assertEquals(str(entry[attr_name]), value)

    def verify_remote(remote_dn, values):
        found = self.samba3.db.search(remote_dn, scope=SCOPE_BASE,
                                      attrs=remote_attrs)
        verify(found, remote_dn, remote_attrs, values)

    def verify_mapped(mapped_dn, values):
        found = self.ldb.search(mapped_dn, scope=SCOPE_BASE,
                                attrs=mapped_attrs)
        verify(found, mapped_dn, mapped_attrs, values)

    # Put the record straight into the samba3 db
    mapped_dn = self.samba4.dn("cn=test")
    remote_dn = self.samba3.dn("cn=test")
    self.samba3.db.add({
        "dn": remote_dn,
        "cn": "test",
        "description": "foo",
        "sambaBadPasswordCount": "3",
        "sambaNextRid": "1001",
    })
    verify_remote(remote_dn, ("foo", "3", "1001"))

    # The mapped ldb must show it under the samba4 attribute names
    # (this first lookup passes an empty search expression)
    found = self.ldb.search(mapped_dn, scope=SCOPE_BASE,
                            attrs=mapped_attrs, expression="")
    verify(found, mapped_dn, mapped_attrs, ("foo", "3", "1001"))

    # The samba4 db itself must hold nothing for this DN
    found = self.samba4.db.search(mapped_dn, scope=SCOPE_BASE,
                                  attrs=mapped_attrs)
    self.assertEquals(len(found), 0)

    # Replace two attributes through the mapped ldb
    changes = "\n".join([
        "",
        "dn: " + mapped_dn,
        "replace: description",
        "description: test",
        "replace: badPwdCount",
        "badPwdCount: 4",
        "",
    ])
    self.ldb.modify_ldif(changes)
    verify_mapped(mapped_dn, ("test", "4", "1001"))
    verify_remote(remote_dn, ("test", "4", "1001"))

    # Rename through the mapped ldb; both views must follow
    renamed_dn = self.samba4.dn("cn=toast")
    self.ldb.rename(mapped_dn, renamed_dn)
    mapped_dn = renamed_dn
    verify_mapped(mapped_dn, ("test", "4", "1001"))
    remote_dn = self.samba3.dn("cn=toast")
    verify_remote(remote_dn, ("test", "4", "1001"))

    # Delete through the mapped ldb; nothing may remain in either view
    self.ldb.delete(mapped_dn)
    found = self.ldb.search(mapped_dn, scope=SCOPE_BASE)
    self.assertEquals(len(found), 0)
    found = self.samba3.db.search(remote_dn, scope=SCOPE_BASE)
    self.assertEquals(len(found), 0)
def test_map_modify_remote_local(self):
    """Add a local attribute to a remote record, turning it into a split one."""
    wanted = ["revision", "description"]

    def verify(found, expected_dn, expectations):
        # Exactly one entry at the expected DN.  Each expectation is an
        # (attribute, value) pair checked in order; a value of None means
        # the attribute must be absent from the entry.
        self.assertEquals(len(found), 1)
        entry = found[0]
        self.assertEquals(str(entry.dn), expected_dn)
        for attr_name, value in expectations:
            if value is None:
                self.assertTrue(not attr_name in entry)
            else:
                self.assertEquals(str(entry[attr_name]), value)

    # Put the record straight into the samba3 db
    mapped_dn = self.samba4.dn("cn=test")
    remote_dn = self.samba3.dn("cn=test")
    self.samba3.db.add({
        "dn": remote_dn,
        "cn": "test",
        "description": "foo",
        "sambaBadPasswordCount": "3",
        "sambaNextRid": "1001",
    })

    # Add "revision" and replace "description" through the mapped ldb.
    # The LDIF text ends with an empty line before the final newline.
    changes = "\n".join([
        "",
        "dn: " + mapped_dn,
        "add: revision",
        "revision: 1",
        "replace: description",
        "description: test",
        "",
        "",
    ])
    self.ldb.modify_ldif(changes)

    # Mapped view: both attributes visible
    found = self.ldb.search(mapped_dn, scope=SCOPE_BASE, attrs=wanted)
    verify(found, mapped_dn,
           [("description", "test"), ("revision", "1")])

    # samba3 db: description only
    found = self.samba3.db.search(remote_dn, scope=SCOPE_BASE, attrs=wanted)
    verify(found, remote_dn,
           [("description", "test"), ("revision", None)])

    # samba4 db: revision only
    found = self.samba4.db.search(mapped_dn, scope=SCOPE_BASE, attrs=wanted)
    verify(found, mapped_dn,
           [("description", None), ("revision", "1")])

    # Remove the record, which is now split across both databases
    self.ldb.delete(mapped_dn)
def test_map_modify_split(self):
    """Add, modify, rename and delete a record split across both databases."""
    mapped_attrs = ["description", "badPwdCount", "nextRid", "revision"]
    remote_attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
                    "revision"]

    def verify(found, expected_dn, expectations):
        # Exactly one entry at the expected DN.  Each expectation is an
        # (attribute, value) pair checked in order; a value of None means
        # the attribute must be absent from the entry.
        self.assertEquals(len(found), 1)
        entry = found[0]
        self.assertEquals(str(entry.dn), expected_dn)
        for attr_name, value in expectations:
            if value is None:
                self.assertTrue(not attr_name in entry)
            else:
                self.assertEquals(str(entry[attr_name]), value)

    def verify_all_views(mapped_dn, remote_dn, description, bad_pwd,
                         revision):
        # Mapped view: every attribute is visible
        found = self.ldb.search(mapped_dn, scope=SCOPE_BASE,
                                attrs=mapped_attrs)
        verify(found, mapped_dn, [
            ("description", description),
            ("badPwdCount", bad_pwd),
            ("nextRid", "1001"),
            ("revision", revision),
        ])
        # samba4 db: only "revision" is present
        found = self.samba4.db.search(mapped_dn, scope=SCOPE_BASE,
                                      attrs=mapped_attrs)
        verify(found, mapped_dn, [
            ("description", None),
            ("badPwdCount", None),
            ("nextRid", None),
            ("revision", revision),
        ])
        # samba3 db: everything except "revision", under samba3 names
        found = self.samba3.db.search(remote_dn, scope=SCOPE_BASE,
                                      attrs=remote_attrs)
        verify(found, remote_dn, [
            ("description", description),
            ("sambaBadPasswordCount", bad_pwd),
            ("sambaNextRid", "1001"),
            ("revision", None),
        ])

    # Create the record through the mapped ldb
    mapped_dn = self.samba4.dn("cn=test")
    remote_dn = self.samba3.dn("cn=test")
    self.ldb.add({
        "dn": mapped_dn,
        "cn": "test",
        "description": "foo",
        "badPwdCount": "3",
        "nextRid": "1001",
        "revision": "1",
    })
    verify_all_views(mapped_dn, remote_dn, "foo", "3", "1")

    # Replace three attributes through the mapped ldb
    changes = "\n".join([
        "",
        "dn: " + mapped_dn,
        "replace: description",
        "description: test",
        "replace: badPwdCount",
        "badPwdCount: 4",
        "replace: revision",
        "revision: 2",
        "",
    ])
    self.ldb.modify_ldif(changes)
    verify_all_views(mapped_dn, remote_dn, "test", "4", "2")

    # Rename through the mapped ldb; all three views must follow
    renamed_dn = self.samba4.dn("cn=toast")
    self.ldb.rename(mapped_dn, renamed_dn)
    mapped_dn = renamed_dn
    remote_dn = self.samba3.dn("cn=toast")
    verify_all_views(mapped_dn, remote_dn, "test", "4", "2")

    # Delete through the mapped ldb; no view may still return the record
    self.ldb.delete(mapped_dn)
    lookups = (
        (self.ldb, mapped_dn),
        (self.samba4.db, mapped_dn),
        (self.samba3.db, remote_dn),
    )
    for database, gone_dn in lookups:
        found = database.search(gone_dn, scope=SCOPE_BASE)
        self.assertEquals(len(found), 0)