1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
3 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
5 # This is a Python port of the original in testprogs/ejs/samba3sam.js
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
25 from ldb
import SCOPE_DEFAULT
, SCOPE_BASE
26 from samba
import Ldb
, substitute_var
27 from samba
.tests
import TestCaseInTempDir
, env_loadparm
28 import samba
.dcerpc
.security
30 from samba
.auth
import system_session
31 from operator
import attrgetter
34 def read_datafile(filename
):
35 paths
= [ "../../../../../testdata/samba3",
36 "../../../../testdata/samba3" ]
38 datadir
= os
.path
.join(os
.path
.dirname(__file__
), p
)
39 if os
.path
.exists(datadir
):
41 return open(os
.path
.join(datadir
, filename
), 'r').read()
43 def ldb_debug(l
, text
):
47 class MapBaseTestCase(TestCaseInTempDir
):
48 """Base test case for mapping tests."""
50 def setup_modules(self
, ldb
, s3
, s4
):
51 ldb
.add({"dn": "@MAP=samba3sam",
53 "@TO": "sambaDomainName=TESTS," + s3
.basedn
})
55 ldb
.add({"dn": "@MODULES",
56 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,show_deleted,partition"})
58 ldb
.add({"dn": "@PARTITION",
59 "partition": ["%s" % (s4
.basedn_casefold
),
60 "%s" % (s3
.basedn_casefold
)],
61 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
65 self
.lp
= env_loadparm()
66 self
.lp
.set("workgroup", "TESTS")
67 self
.lp
.set("netbios name", "TESTS")
68 super(MapBaseTestCase
, self
).setUp()
70 def make_dn(basedn
, rdn
):
71 return "%s,sambaDomainName=TESTS,%s" % (rdn
, basedn
)
73 def make_s4dn(basedn
, rdn
):
74 return "%s,%s" % (rdn
, basedn
)
76 self
.ldbfile
= os
.path
.join(self
.tempdir
, "test.ldb")
77 self
.ldburl
= "tdb://" + self
.ldbfile
79 tempdir
= self
.tempdir
82 """Simple helper class that contains data for a specific SAM
85 def __init__(self
, basedn
, dn
, lp
):
86 self
.db
= Ldb(lp
=lp
, session_info
=system_session())
87 self
.db
.set_opaque("skip_allocate_sids", "true");
89 self
.basedn_casefold
= ldb
.Dn(self
.db
, basedn
).get_casefold()
90 self
.substvars
= {"BASEDN": self
.basedn
}
91 self
.file = os
.path
.join(tempdir
, "%s.ldb" % self
.basedn_casefold
)
92 self
.url
= "tdb://" + self
.file
96 return self
._dn
(self
.basedn
, rdn
)
99 return self
.db
.connect(self
.url
)
101 def setup_data(self
, path
):
102 self
.add_ldif(read_datafile(path
))
104 def subst(self
, text
):
105 return substitute_var(text
, self
.substvars
)
107 def add_ldif(self
, ldif
):
108 self
.db
.add_ldif(self
.subst(ldif
))
110 def modify_ldif(self
, ldif
):
111 self
.db
.modify_ldif(self
.subst(ldif
))
113 self
.samba4
= Target("dc=vernstok,dc=nl", make_s4dn
, self
.lp
)
114 self
.samba3
= Target("cn=Samba3Sam", make_dn
, self
.lp
)
116 self
.samba3
.connect()
117 self
.samba4
.connect()
120 os
.unlink(self
.ldbfile
)
121 os
.unlink(self
.samba3
.file)
122 os
.unlink(self
.samba4
.file)
123 pdir
= "%s.d" % self
.ldbfile
124 mdata
= os
.path
.join(pdir
, "metadata.tdb")
125 if os
.path
.exists(mdata
):
128 super(MapBaseTestCase
, self
).tearDown()
130 def assertSidEquals(self
, text
, ndr_sid
):
131 sid_obj1
= samba
.ndr
.ndr_unpack(samba
.dcerpc
.security
.dom_sid
,
133 sid_obj2
= samba
.dcerpc
.security
.dom_sid(text
)
134 self
.assertEquals(sid_obj1
, sid_obj2
)
137 class Samba3SamTestCase(MapBaseTestCase
):
140 super(Samba3SamTestCase
, self
).setUp()
141 ldb
= Ldb(self
.ldburl
, lp
=self
.lp
, session_info
=system_session())
142 ldb
.set_opaque("skip_allocate_sids", "true");
143 self
.samba3
.setup_data("samba3.ldif")
144 ldif
= read_datafile("provision_samba3sam.ldif")
145 ldb
.add_ldif(self
.samba4
.subst(ldif
))
146 self
.setup_modules(ldb
, self
.samba3
, self
.samba4
)
148 self
.ldb
= Ldb(self
.ldburl
, lp
=self
.lp
, session_info
=system_session())
149 self
.ldb
.set_opaque("skip_allocate_sids", "true");
151 def test_search_non_mapped(self
):
152 """Looking up by non-mapped attribute"""
153 msg
= self
.ldb
.search(expression
="(cn=Administrator)")
154 self
.assertEquals(len(msg
), 1)
155 self
.assertEquals(msg
[0]["cn"], "Administrator")
157 def test_search_non_mapped(self
):
158 """Looking up by mapped attribute"""
159 msg
= self
.ldb
.search(expression
="(name=Backup Operators)")
160 self
.assertEquals(len(msg
), 1)
161 self
.assertEquals(str(msg
[0]["name"]), "Backup Operators")
163 def test_old_name_of_renamed(self
):
164 """Looking up by old name of renamed attribute"""
165 msg
= self
.ldb
.search(expression
="(displayName=Backup Operators)")
166 self
.assertEquals(len(msg
), 0)
168 def test_mapped_containing_sid(self
):
169 """Looking up mapped entry containing SID"""
170 msg
= self
.ldb
.search(expression
="(cn=Replicator)")
171 self
.assertEquals(len(msg
), 1)
172 self
.assertEquals(str(msg
[0].dn
),
173 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
174 self
.assertTrue("objectSid" in msg
[0])
175 self
.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
177 oc
= set(msg
[0]["objectClass"])
178 self
.assertEquals(oc
, set(["group"]))
180 def test_search_by_objclass(self
):
181 """Looking up by objectClass"""
182 msg
= self
.ldb
.search(expression
="(|(objectClass=user)(cn=Administrator))")
183 self
.assertEquals(set([str(m
.dn
) for m
in msg
]),
184 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
185 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
187 def test_s3sam_modify(self
):
188 # Adding a record that will be fallbacked
194 "showInAdvancedViewOnly": "TRUE"})
196 # Checking for existence of record (local)
197 # TODO: This record must be searched in the local database, which is
198 # currently only supported for base searches
199 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
200 # TODO: Actually, this version should work as well but doesn't...
203 msg
= self
.ldb
.search(expression
="(cn=Foo)", base
="cn=Foo",
205 attrs
=['foo','blah','cn','showInAdvancedViewOnly'])
206 self
.assertEquals(len(msg
), 1)
207 self
.assertEquals(str(msg
[0]["showInAdvancedViewOnly"]), "TRUE")
208 self
.assertEquals(str(msg
[0]["foo"]), "bar")
209 self
.assertEquals(str(msg
[0]["blah"]), "Blie")
211 # Adding record that will be mapped
212 self
.ldb
.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
213 "objectClass": "user",
215 "sambaUnicodePwd": "geheim",
218 # Checking for existence of record (remote)
219 msg
= self
.ldb
.search(expression
="(unixName=bin)",
220 attrs
=['unixName','cn','dn', 'sambaUnicodePwd'])
221 self
.assertEquals(len(msg
), 1)
222 self
.assertEquals(str(msg
[0]["cn"]), "Niemand")
223 self
.assertEquals(str(msg
[0]["sambaUnicodePwd"]), "geheim")
225 # Checking for existence of record (local && remote)
226 msg
= self
.ldb
.search(expression
="(&(unixName=bin)(sambaUnicodePwd=geheim))",
227 attrs
=['unixName','cn','dn', 'sambaUnicodePwd'])
228 self
.assertEquals(len(msg
), 1) # TODO: should check with more records
229 self
.assertEquals(str(msg
[0]["cn"]), "Niemand")
230 self
.assertEquals(str(msg
[0]["unixName"]), "bin")
231 self
.assertEquals(str(msg
[0]["sambaUnicodePwd"]), "geheim")
233 # Checking for existence of record (local || remote)
234 msg
= self
.ldb
.search(expression
="(|(unixName=bin)(sambaUnicodePwd=geheim))",
235 attrs
=['unixName','cn','dn', 'sambaUnicodePwd'])
236 #print "got %d replies" % len(msg)
237 self
.assertEquals(len(msg
), 1) # TODO: should check with more records
238 self
.assertEquals(str(msg
[0]["cn"]), "Niemand")
239 self
.assertEquals(str(msg
[0]["unixName"]), "bin")
240 self
.assertEquals(str(msg
[0]["sambaUnicodePwd"]), "geheim")
242 # Checking for data in destination database
243 msg
= self
.samba3
.db
.search(expression
="(cn=Niemand)")
244 self
.assertTrue(len(msg
) >= 1)
245 self
.assertEquals(str(msg
[0]["sambaSID"]),
246 "S-1-5-21-4231626423-2410014848-2360679739-2001")
247 self
.assertEquals(str(msg
[0]["displayName"]), "Niemand")
249 # Adding attribute...
250 self
.ldb
.modify_ldif("""
251 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
257 # Checking whether changes are still there...
258 msg
= self
.ldb
.search(expression
="(cn=Niemand)")
259 self
.assertTrue(len(msg
) >= 1)
260 self
.assertEquals(str(msg
[0]["cn"]), "Niemand")
261 self
.assertEquals(str(msg
[0]["description"]), "Blah")
263 # Modifying attribute...
264 self
.ldb
.modify_ldif("""
265 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
271 # Checking whether changes are still there...
272 msg
= self
.ldb
.search(expression
="(cn=Niemand)")
273 self
.assertTrue(len(msg
) >= 1)
274 self
.assertEquals(str(msg
[0]["description"]), "Blie")
276 # Deleting attribute...
277 self
.ldb
.modify_ldif("""
278 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
283 # Checking whether changes are no longer there...
284 msg
= self
.ldb
.search(expression
="(cn=Niemand)")
285 self
.assertTrue(len(msg
) >= 1)
286 self
.assertTrue(not "description" in msg
[0])
289 self
.ldb
.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
290 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
292 # Checking whether DN has changed...
293 msg
= self
.ldb
.search(expression
="(cn=Niemand2)")
294 self
.assertEquals(len(msg
), 1)
295 self
.assertEquals(str(msg
[0].dn
),
296 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
299 self
.ldb
.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
301 # Checking whether record is gone...
302 msg
= self
.ldb
.search(expression
="(cn=Niemand2)")
303 self
.assertEquals(len(msg
), 0)
306 class MapTestCase(MapBaseTestCase
):
309 super(MapTestCase
, self
).setUp()
310 ldb
= Ldb(self
.ldburl
, lp
=self
.lp
, session_info
=system_session())
311 ldb
.set_opaque("skip_allocate_sids", "true");
312 ldif
= read_datafile("provision_samba3sam.ldif")
313 ldb
.add_ldif(self
.samba4
.subst(ldif
))
314 self
.setup_modules(ldb
, self
.samba3
, self
.samba4
)
316 self
.ldb
= Ldb(self
.ldburl
, lp
=self
.lp
, session_info
=system_session())
317 self
.ldb
.set_opaque("skip_allocate_sids", "true");
319 def test_map_search(self
):
320 """Running search tests on mapped data."""
322 "dn": "sambaDomainName=TESTS," + self
.samba3
.basedn
,
323 "objectclass": ["sambaDomain", "top"],
324 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
325 "sambaNextRid": "2000",
326 "sambaDomainName": "TESTS"
329 # Add a set of split records
330 self
.ldb
.add_ldif("""
331 dn: """+ self
.samba4
.dn("cn=Domain Users") + """
334 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
337 # Add a set of split records
338 self
.ldb
.add_ldif("""
339 dn: """+ self
.samba4
.dn("cn=X") + """
348 objectSid: S-1-5-21-4231626423-2410014848-2360679739-1052
352 "dn": self
.samba4
.dn("cn=Y"),
353 "objectClass": "top",
363 "dn": self
.samba4
.dn("cn=Z"),
364 "objectClass": "top",
373 # Add a set of remote records
376 "dn": self
.samba3
.dn("cn=A"),
377 "objectClass": "posixAccount",
380 "sambaBadPasswordCount": "x",
381 "sambaLogonTime": "x",
383 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-1052",
384 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
387 "dn": self
.samba3
.dn("cn=B"),
388 "objectClass": "top",
391 "sambaBadPasswordCount": "x",
392 "sambaLogonTime": "y",
396 "dn": self
.samba3
.dn("cn=C"),
397 "objectClass": "top",
400 "sambaBadPasswordCount": "y",
401 "sambaLogonTime": "z",
404 # Testing search by DN
406 # Search remote record by local DN
407 dn
= self
.samba4
.dn("cn=A")
408 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
,
409 attrs
=["dnsHostName", "lastLogon"])
410 self
.assertEquals(len(res
), 1)
411 self
.assertEquals(str(res
[0].dn
), dn
)
412 self
.assertTrue(not "dnsHostName" in res
[0])
413 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
415 # Search remote record by remote DN
416 dn
= self
.samba3
.dn("cn=A")
417 res
= self
.samba3
.db
.search(dn
, scope
=SCOPE_BASE
,
418 attrs
=["dnsHostName", "lastLogon", "sambaLogonTime"])
419 self
.assertEquals(len(res
), 1)
420 self
.assertEquals(str(res
[0].dn
), dn
)
421 self
.assertTrue(not "dnsHostName" in res
[0])
422 self
.assertTrue(not "lastLogon" in res
[0])
423 self
.assertEquals(str(res
[0]["sambaLogonTime"]), "x")
425 # Search split record by local DN
426 dn
= self
.samba4
.dn("cn=X")
427 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
,
428 attrs
=["dnsHostName", "lastLogon"])
429 self
.assertEquals(len(res
), 1)
430 self
.assertEquals(str(res
[0].dn
), dn
)
431 self
.assertEquals(str(res
[0]["dnsHostName"]), "x")
432 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
434 # Search split record by remote DN
435 dn
= self
.samba3
.dn("cn=X")
436 res
= self
.samba3
.db
.search(dn
, scope
=SCOPE_BASE
,
437 attrs
=["dnsHostName", "lastLogon", "sambaLogonTime"])
438 self
.assertEquals(len(res
), 1)
439 self
.assertEquals(str(res
[0].dn
), dn
)
440 self
.assertTrue(not "dnsHostName" in res
[0])
441 self
.assertTrue(not "lastLogon" in res
[0])
442 self
.assertEquals(str(res
[0]["sambaLogonTime"]), "x")
444 # Testing search by attribute
446 # Search by ignored attribute
447 res
= self
.ldb
.search(expression
="(revision=x)", scope
=SCOPE_DEFAULT
,
448 attrs
=["dnsHostName", "lastLogon"])
449 self
.assertEquals(len(res
), 2)
450 res
= sorted(res
, key
=attrgetter('dn'))
451 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=X"))
452 self
.assertEquals(str(res
[0]["dnsHostName"]), "x")
453 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
454 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Y"))
455 self
.assertEquals(str(res
[1]["dnsHostName"]), "y")
456 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
458 # Search by kept attribute
459 res
= self
.ldb
.search(expression
="(description=y)",
460 scope
=SCOPE_DEFAULT
, attrs
=["dnsHostName", "lastLogon"])
461 self
.assertEquals(len(res
), 2)
462 res
= sorted(res
, key
=attrgetter('dn'))
463 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=C"))
464 self
.assertTrue(not "dnsHostName" in res
[0])
465 self
.assertEquals(str(res
[0]["lastLogon"]), "z")
466 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Z"))
467 self
.assertEquals(str(res
[1]["dnsHostName"]), "z")
468 self
.assertEquals(str(res
[1]["lastLogon"]), "z")
470 # Search by renamed attribute
471 res
= self
.ldb
.search(expression
="(badPwdCount=x)", scope
=SCOPE_DEFAULT
,
472 attrs
=["dnsHostName", "lastLogon"])
473 self
.assertEquals(len(res
), 2)
474 res
= sorted(res
, key
=attrgetter('dn'))
475 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
476 self
.assertTrue(not "dnsHostName" in res
[0])
477 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
478 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
479 self
.assertTrue(not "dnsHostName" in res
[1])
480 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
482 # Search by converted attribute
484 # Using the SID directly in the parse tree leads to conversion
485 # errors, letting the search fail with no results.
486 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
487 res
= self
.ldb
.search(expression
="(objectSid=*)", base
=None, scope
=SCOPE_DEFAULT
, attrs
=["dnsHostName", "lastLogon", "objectSid"])
488 self
.assertEquals(len(res
), 4)
489 res
= sorted(res
, key
=attrgetter('dn'))
490 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=X"))
491 self
.assertEquals(str(res
[1]["dnsHostName"]), "x")
492 self
.assertEquals(str(res
[1]["lastLogon"]), "x")
493 self
.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
495 self
.assertTrue("objectSid" in res
[1])
496 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
497 self
.assertTrue(not "dnsHostName" in res
[0])
498 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
499 self
.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
501 self
.assertTrue("objectSid" in res
[0])
503 # Search by generated attribute
504 # In most cases, this even works when the mapping is missing
505 # a `convert_operator' by enumerating the remote db.
506 res
= self
.ldb
.search(expression
="(primaryGroupID=512)",
507 attrs
=["dnsHostName", "lastLogon", "primaryGroupID"])
508 self
.assertEquals(len(res
), 1)
509 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
510 self
.assertTrue(not "dnsHostName" in res
[0])
511 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
512 self
.assertEquals(str(res
[0]["primaryGroupID"]), "512")
514 # Note that Xs "objectSid" seems to be fine in the previous search for
516 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
517 #print len(res) + " results found"
518 #for i in range(len(res)):
519 # for (obj in res[i]) {
520 # print obj + ": " + res[i][obj]
525 # Search by remote name of renamed attribute */
526 res
= self
.ldb
.search(expression
="(sambaBadPasswordCount=*)",
527 attrs
=["dnsHostName", "lastLogon"])
528 self
.assertEquals(len(res
), 0)
530 # Search by objectClass
531 attrs
= ["dnsHostName", "lastLogon", "objectClass"]
532 res
= self
.ldb
.search(expression
="(objectClass=user)", attrs
=attrs
)
533 self
.assertEquals(len(res
), 2)
534 res
= sorted(res
, key
=attrgetter('dn'))
535 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
536 self
.assertTrue(not "dnsHostName" in res
[0])
537 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
538 self
.assertEquals(str(res
[0]["objectClass"][0]), "user")
539 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=X"))
540 self
.assertEquals(str(res
[1]["dnsHostName"]), "x")
541 self
.assertEquals(str(res
[1]["lastLogon"]), "x")
542 self
.assertEquals(str(res
[1]["objectClass"][0]), "user")
544 # Prove that the objectClass is actually used for the search
545 res
= self
.ldb
.search(expression
="(|(objectClass=user)(badPwdCount=x))",
547 self
.assertEquals(len(res
), 3)
548 res
= sorted(res
, key
=attrgetter('dn'))
549 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
550 self
.assertTrue(not "dnsHostName" in res
[0])
551 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
552 self
.assertEquals(res
[0]["objectClass"][0], "user")
553 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
554 self
.assertTrue(not "dnsHostName" in res
[1])
555 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
556 self
.assertEquals(set(res
[1]["objectClass"]), set(["top"]))
557 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=X"))
558 self
.assertEquals(str(res
[2]["dnsHostName"]), "x")
559 self
.assertEquals(str(res
[2]["lastLogon"]), "x")
560 self
.assertEquals(str(res
[2]["objectClass"][0]), "user")
562 # Testing search by parse tree
564 # Search by conjunction of local attributes
565 res
= self
.ldb
.search(expression
="(&(codePage=x)(revision=x))",
566 attrs
=["dnsHostName", "lastLogon"])
567 self
.assertEquals(len(res
), 2)
568 res
= sorted(res
, key
=attrgetter('dn'))
569 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=X"))
570 self
.assertEquals(str(res
[0]["dnsHostName"]), "x")
571 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
572 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Y"))
573 self
.assertEquals(str(res
[1]["dnsHostName"]), "y")
574 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
576 # Search by conjunction of remote attributes
577 res
= self
.ldb
.search(expression
="(&(lastLogon=x)(description=x))",
578 attrs
=["dnsHostName", "lastLogon"])
579 self
.assertEquals(len(res
), 2)
580 res
= sorted(res
, key
=attrgetter('dn'))
581 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
582 self
.assertTrue(not "dnsHostName" in res
[0])
583 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
584 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=X"))
585 self
.assertEquals(str(res
[1]["dnsHostName"]), "x")
586 self
.assertEquals(str(res
[1]["lastLogon"]), "x")
588 # Search by conjunction of local and remote attribute
589 res
= self
.ldb
.search(expression
="(&(codePage=x)(description=x))",
590 attrs
=["dnsHostName", "lastLogon"])
591 self
.assertEquals(len(res
), 2)
592 res
= sorted(res
, key
=attrgetter('dn'))
593 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=X"))
594 self
.assertEquals(str(res
[0]["dnsHostName"]), "x")
595 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
596 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Y"))
597 self
.assertEquals(str(res
[1]["dnsHostName"]), "y")
598 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
600 # Search by conjunction of local and remote attribute w/o match
601 attrs
= ["dnsHostName", "lastLogon"]
602 res
= self
.ldb
.search(expression
="(&(codePage=x)(nextRid=x))",
604 self
.assertEquals(len(res
), 0)
605 res
= self
.ldb
.search(expression
="(&(revision=x)(lastLogon=z))",
607 self
.assertEquals(len(res
), 0)
609 # Search by disjunction of local attributes
610 res
= self
.ldb
.search(expression
="(|(revision=x)(dnsHostName=x))",
611 attrs
=["dnsHostName", "lastLogon"])
612 self
.assertEquals(len(res
), 2)
613 res
= sorted(res
, key
=attrgetter('dn'))
614 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=X"))
615 self
.assertEquals(str(res
[0]["dnsHostName"]), "x")
616 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
617 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Y"))
618 self
.assertEquals(str(res
[1]["dnsHostName"]), "y")
619 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
621 # Search by disjunction of remote attributes
622 res
= self
.ldb
.search(expression
="(|(badPwdCount=x)(lastLogon=x))",
623 attrs
=["dnsHostName", "lastLogon"])
624 self
.assertEquals(len(res
), 3)
625 res
= sorted(res
, key
=attrgetter('dn'))
626 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
627 self
.assertFalse("dnsHostName" in res
[0])
628 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
629 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
630 self
.assertFalse("dnsHostName" in res
[1])
631 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
632 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=X"))
633 self
.assertEquals(str(res
[2]["dnsHostName"]), "x")
634 self
.assertEquals(str(res
[2]["lastLogon"]), "x")
636 # Search by disjunction of local and remote attribute
637 res
= self
.ldb
.search(expression
="(|(revision=x)(lastLogon=y))",
638 attrs
=["dnsHostName", "lastLogon"])
639 self
.assertEquals(len(res
), 3)
640 res
= sorted(res
, key
=attrgetter('dn'))
641 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=B"))
642 self
.assertFalse("dnsHostName" in res
[0])
643 self
.assertEquals(str(res
[0]["lastLogon"]), "y")
644 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=X"))
645 self
.assertEquals(str(res
[1]["dnsHostName"]), "x")
646 self
.assertEquals(str(res
[1]["lastLogon"]), "x")
647 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=Y"))
648 self
.assertEquals(str(res
[2]["dnsHostName"]), "y")
649 self
.assertEquals(str(res
[2]["lastLogon"]), "y")
651 # Search by disjunction of local and remote attribute w/o match
652 res
= self
.ldb
.search(expression
="(|(codePage=y)(nextRid=z))",
653 attrs
=["dnsHostName", "lastLogon"])
654 self
.assertEquals(len(res
), 0)
656 # Search by negated local attribute
657 res
= self
.ldb
.search(expression
="(!(revision=x))",
658 attrs
=["dnsHostName", "lastLogon"])
659 self
.assertEquals(len(res
), 6)
660 res
= sorted(res
, key
=attrgetter('dn'))
661 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
662 self
.assertTrue(not "dnsHostName" in res
[0])
663 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
664 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
665 self
.assertTrue(not "dnsHostName" in res
[1])
666 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
667 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=C"))
668 self
.assertTrue(not "dnsHostName" in res
[2])
669 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
670 self
.assertEquals(str(res
[3].dn
), self
.samba4
.dn("cn=Z"))
671 self
.assertEquals(str(res
[3]["dnsHostName"]), "z")
672 self
.assertEquals(str(res
[3]["lastLogon"]), "z")
674 # Search by negated remote attribute
675 res
= self
.ldb
.search(expression
="(!(description=x))",
676 attrs
=["dnsHostName", "lastLogon"])
677 self
.assertEquals(len(res
), 4)
678 res
= sorted(res
, key
=attrgetter('dn'))
679 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=C"))
680 self
.assertTrue(not "dnsHostName" in res
[0])
681 self
.assertEquals(str(res
[0]["lastLogon"]), "z")
682 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Z"))
683 self
.assertEquals(str(res
[1]["dnsHostName"]), "z")
684 self
.assertEquals(str(res
[1]["lastLogon"]), "z")
686 # Search by negated conjunction of local attributes
687 res
= self
.ldb
.search(expression
="(!(&(codePage=x)(revision=x)))",
688 attrs
=["dnsHostName", "lastLogon"])
689 self
.assertEquals(len(res
), 6)
690 res
= sorted(res
, key
=attrgetter('dn'))
691 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
692 self
.assertTrue(not "dnsHostName" in res
[0])
693 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
694 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
695 self
.assertTrue(not "dnsHostName" in res
[1])
696 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
697 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=C"))
698 self
.assertTrue(not "dnsHostName" in res
[2])
699 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
700 self
.assertEquals(str(res
[3].dn
), self
.samba4
.dn("cn=Z"))
701 self
.assertEquals(str(res
[3]["dnsHostName"]), "z")
702 self
.assertEquals(str(res
[3]["lastLogon"]), "z")
704 # Search by negated conjunction of remote attributes
705 res
= self
.ldb
.search(expression
="(!(&(lastLogon=x)(description=x)))",
706 attrs
=["dnsHostName", "lastLogon"])
707 self
.assertEquals(len(res
), 6)
708 res
= sorted(res
, key
=attrgetter('dn'))
709 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=B"))
710 self
.assertTrue(not "dnsHostName" in res
[0])
711 self
.assertEquals(str(res
[0]["lastLogon"]), "y")
712 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=C"))
713 self
.assertTrue(not "dnsHostName" in res
[1])
714 self
.assertEquals(str(res
[1]["lastLogon"]), "z")
715 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=Y"))
716 self
.assertEquals(str(res
[2]["dnsHostName"]), "y")
717 self
.assertEquals(str(res
[2]["lastLogon"]), "y")
718 self
.assertEquals(str(res
[3].dn
), self
.samba4
.dn("cn=Z"))
719 self
.assertEquals(str(res
[3]["dnsHostName"]), "z")
720 self
.assertEquals(str(res
[3]["lastLogon"]), "z")
722 # Search by negated conjunction of local and remote attribute
723 res
= self
.ldb
.search(expression
="(!(&(codePage=x)(description=x)))",
724 attrs
=["dnsHostName", "lastLogon"])
725 self
.assertEquals(len(res
), 6)
726 res
= sorted(res
, key
=attrgetter('dn'))
727 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
728 self
.assertTrue(not "dnsHostName" in res
[0])
729 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
730 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
731 self
.assertTrue(not "dnsHostName" in res
[1])
732 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
733 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=C"))
734 self
.assertTrue(not "dnsHostName" in res
[2])
735 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
736 self
.assertEquals(str(res
[3].dn
), self
.samba4
.dn("cn=Z"))
737 self
.assertEquals(str(res
[3]["dnsHostName"]), "z")
738 self
.assertEquals(str(res
[3]["lastLogon"]), "z")
740 # Search by negated disjunction of local attributes
741 res
= self
.ldb
.search(expression
="(!(|(revision=x)(dnsHostName=x)))",
742 attrs
=["dnsHostName", "lastLogon"])
743 res
= sorted(res
, key
=attrgetter('dn'))
744 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
745 self
.assertTrue(not "dnsHostName" in res
[0])
746 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
747 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
748 self
.assertTrue(not "dnsHostName" in res
[1])
749 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
750 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=C"))
751 self
.assertTrue(not "dnsHostName" in res
[2])
752 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
753 self
.assertEquals(str(res
[3].dn
), self
.samba4
.dn("cn=Z"))
754 self
.assertEquals(str(res
[3]["dnsHostName"]), "z")
755 self
.assertEquals(str(res
[3]["lastLogon"]), "z")
757 # Search by negated disjunction of remote attributes
758 res
= self
.ldb
.search(expression
="(!(|(badPwdCount=x)(lastLogon=x)))",
759 attrs
=["dnsHostName", "lastLogon"])
760 self
.assertEquals(len(res
), 5)
761 res
= sorted(res
, key
=attrgetter('dn'))
762 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=C"))
763 self
.assertTrue(not "dnsHostName" in res
[0])
764 self
.assertEquals(str(res
[0]["lastLogon"]), "z")
765 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=Y"))
766 self
.assertEquals(str(res
[1]["dnsHostName"]), "y")
767 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
768 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=Z"))
769 self
.assertEquals(str(res
[2]["dnsHostName"]), "z")
770 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
772 # Search by negated disjunction of local and remote attribute
773 res
= self
.ldb
.search(expression
="(!(|(revision=x)(lastLogon=y)))",
774 attrs
=["dnsHostName", "lastLogon"])
775 self
.assertEquals(len(res
), 5)
776 res
= sorted(res
, key
=attrgetter('dn'))
777 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
778 self
.assertTrue(not "dnsHostName" in res
[0])
779 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
780 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=C"))
781 self
.assertTrue(not "dnsHostName" in res
[1])
782 self
.assertEquals(str(res
[1]["lastLogon"]), "z")
783 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=Z"))
784 self
.assertEquals(str(res
[2]["dnsHostName"]), "z")
785 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
787 # Search by complex parse tree
788 res
= self
.ldb
.search(expression
="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs
=["dnsHostName", "lastLogon"])
789 self
.assertEquals(len(res
), 7)
790 res
= sorted(res
, key
=attrgetter('dn'))
791 self
.assertEquals(str(res
[0].dn
), self
.samba4
.dn("cn=A"))
792 self
.assertTrue(not "dnsHostName" in res
[0])
793 self
.assertEquals(str(res
[0]["lastLogon"]), "x")
794 self
.assertEquals(str(res
[1].dn
), self
.samba4
.dn("cn=B"))
795 self
.assertTrue(not "dnsHostName" in res
[1])
796 self
.assertEquals(str(res
[1]["lastLogon"]), "y")
797 self
.assertEquals(str(res
[2].dn
), self
.samba4
.dn("cn=C"))
798 self
.assertTrue(not "dnsHostName" in res
[2])
799 self
.assertEquals(str(res
[2]["lastLogon"]), "z")
800 self
.assertEquals(str(res
[3].dn
), self
.samba4
.dn("cn=X"))
801 self
.assertEquals(str(res
[3]["dnsHostName"]), "x")
802 self
.assertEquals(str(res
[3]["lastLogon"]), "x")
803 self
.assertEquals(str(res
[4].dn
), self
.samba4
.dn("cn=Z"))
804 self
.assertEquals(str(res
[4]["dnsHostName"]), "z")
805 self
.assertEquals(str(res
[4]["lastLogon"]), "z")
808 dns
= [self
.samba4
.dn("cn=%s" % n
) for n
in ["A","B","C","X","Y","Z"]]
812 def test_map_modify_local(self
):
813 """Modification of local records."""
815 dn
= "cn=test,dc=idealx,dc=org"
816 self
.ldb
.add({"dn": dn
,
820 "description": "test"})
822 attrs
= ["foo", "revision", "description"]
823 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
824 self
.assertEquals(len(res
), 1)
825 self
.assertEquals(str(res
[0].dn
), dn
)
826 self
.assertEquals(str(res
[0]["foo"]), "bar")
827 self
.assertEquals(str(res
[0]["revision"]), "1")
828 self
.assertEquals(str(res
[0]["description"]), "test")
829 # Check it's not in the local db
830 res
= self
.samba4
.db
.search(expression
="(cn=test)",
831 scope
=SCOPE_DEFAULT
, attrs
=attrs
)
832 self
.assertEquals(len(res
), 0)
833 # Check it's not in the remote db
834 res
= self
.samba3
.db
.search(expression
="(cn=test)",
835 scope
=SCOPE_DEFAULT
, attrs
=attrs
)
836 self
.assertEquals(len(res
), 0)
838 # Modify local record
846 self
.ldb
.modify_ldif(ldif
)
848 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
849 self
.assertEquals(len(res
), 1)
850 self
.assertEquals(str(res
[0].dn
), dn
)
851 self
.assertEquals(str(res
[0]["foo"]), "baz")
852 self
.assertEquals(str(res
[0]["revision"]), "1")
853 self
.assertEquals(str(res
[0]["description"]), "foo")
855 # Rename local record
856 dn2
= "cn=toast,dc=idealx,dc=org"
857 self
.ldb
.rename(dn
, dn2
)
859 res
= self
.ldb
.search(dn2
, scope
=SCOPE_BASE
, attrs
=attrs
)
860 self
.assertEquals(len(res
), 1)
861 self
.assertEquals(str(res
[0].dn
), dn2
)
862 self
.assertEquals(str(res
[0]["foo"]), "baz")
863 self
.assertEquals(str(res
[0]["revision"]), "1")
864 self
.assertEquals(str(res
[0]["description"]), "foo")
866 # Delete local record
869 res
= self
.ldb
.search(dn2
, scope
=SCOPE_BASE
)
870 self
.assertEquals(len(res
), 0)
872 def test_map_modify_remote_remote(self
):
873 """Modification of remote data of remote records"""
875 dn
= self
.samba4
.dn("cn=test")
876 dn2
= self
.samba3
.dn("cn=test")
877 self
.samba3
.db
.add({"dn": dn2
,
879 "description": "foo",
880 "sambaBadPasswordCount": "3",
881 "sambaNextRid": "1001"})
883 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
,
884 attrs
=["description", "sambaBadPasswordCount", "sambaNextRid"])
885 self
.assertEquals(len(res
), 1)
886 self
.assertEquals(str(res
[0].dn
), dn2
)
887 self
.assertEquals(str(res
[0]["description"]), "foo")
888 self
.assertEquals(str(res
[0]["sambaBadPasswordCount"]), "3")
889 self
.assertEquals(str(res
[0]["sambaNextRid"]), "1001")
891 attrs
= ["description", "badPwdCount", "nextRid"]
892 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
, expression
="")
893 self
.assertEquals(len(res
), 1)
894 self
.assertEquals(str(res
[0].dn
), dn
)
895 self
.assertEquals(str(res
[0]["description"]), "foo")
896 self
.assertEquals(str(res
[0]["badPwdCount"]), "3")
897 self
.assertEquals(str(res
[0]["nextRid"]), "1001")
899 res
= self
.samba4
.db
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
900 self
.assertEquals(len(res
), 0)
902 # Modify remote data of remote record
910 self
.ldb
.modify_ldif(ldif
)
912 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
,
913 attrs
=["description", "badPwdCount", "nextRid"])
914 self
.assertEquals(len(res
), 1)
915 self
.assertEquals(str(res
[0].dn
), dn
)
916 self
.assertEquals(str(res
[0]["description"]), "test")
917 self
.assertEquals(str(res
[0]["badPwdCount"]), "4")
918 self
.assertEquals(str(res
[0]["nextRid"]), "1001")
920 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
,
921 attrs
=["description", "sambaBadPasswordCount", "sambaNextRid"])
922 self
.assertEquals(len(res
), 1)
923 self
.assertEquals(str(res
[0].dn
), dn2
)
924 self
.assertEquals(str(res
[0]["description"]), "test")
925 self
.assertEquals(str(res
[0]["sambaBadPasswordCount"]), "4")
926 self
.assertEquals(str(res
[0]["sambaNextRid"]), "1001")
928 # Rename remote record
929 dn2
= self
.samba4
.dn("cn=toast")
930 self
.ldb
.rename(dn
, dn2
)
933 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
,
934 attrs
=["description", "badPwdCount", "nextRid"])
935 self
.assertEquals(len(res
), 1)
936 self
.assertEquals(str(res
[0].dn
), dn
)
937 self
.assertEquals(str(res
[0]["description"]), "test")
938 self
.assertEquals(str(res
[0]["badPwdCount"]), "4")
939 self
.assertEquals(str(res
[0]["nextRid"]), "1001")
941 dn2
= self
.samba3
.dn("cn=toast")
942 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
,
943 attrs
=["description", "sambaBadPasswordCount", "sambaNextRid"])
944 self
.assertEquals(len(res
), 1)
945 self
.assertEquals(str(res
[0].dn
), dn2
)
946 self
.assertEquals(str(res
[0]["description"]), "test")
947 self
.assertEquals(str(res
[0]["sambaBadPasswordCount"]), "4")
948 self
.assertEquals(str(res
[0]["sambaNextRid"]), "1001")
950 # Delete remote record
952 # Check in mapped db that it's removed
953 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
)
954 self
.assertEquals(len(res
), 0)
956 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
)
957 self
.assertEquals(len(res
), 0)
959 def test_map_modify_remote_local(self
):
960 """Modification of local data of remote records"""
961 # Add remote record (same as before)
962 dn
= self
.samba4
.dn("cn=test")
963 dn2
= self
.samba3
.dn("cn=test")
964 self
.samba3
.db
.add({"dn": dn2
,
966 "description": "foo",
967 "sambaBadPasswordCount": "3",
968 "sambaNextRid": "1001"})
970 # Modify local data of remote record
979 self
.ldb
.modify_ldif(ldif
)
981 attrs
= ["revision", "description"]
982 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
983 self
.assertEquals(len(res
), 1)
984 self
.assertEquals(str(res
[0].dn
), dn
)
985 self
.assertEquals(str(res
[0]["description"]), "test")
986 self
.assertEquals(str(res
[0]["revision"]), "1")
988 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
, attrs
=attrs
)
989 self
.assertEquals(len(res
), 1)
990 self
.assertEquals(str(res
[0].dn
), dn2
)
991 self
.assertEquals(str(res
[0]["description"]), "test")
992 self
.assertTrue(not "revision" in res
[0])
994 res
= self
.samba4
.db
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
995 self
.assertEquals(len(res
), 1)
996 self
.assertEquals(str(res
[0].dn
), dn
)
997 self
.assertTrue(not "description" in res
[0])
998 self
.assertEquals(str(res
[0]["revision"]), "1")
1000 # Delete (newly) split record
1003 def test_map_modify_split(self
):
1004 """Testing modification of split records"""
1006 dn
= self
.samba4
.dn("cn=test")
1007 dn2
= self
.samba3
.dn("cn=test")
1011 "description": "foo",
1016 attrs
= ["description", "badPwdCount", "nextRid", "revision"]
1017 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
1018 self
.assertEquals(len(res
), 1)
1019 self
.assertEquals(str(res
[0].dn
), dn
)
1020 self
.assertEquals(str(res
[0]["description"]), "foo")
1021 self
.assertEquals(str(res
[0]["badPwdCount"]), "3")
1022 self
.assertEquals(str(res
[0]["nextRid"]), "1001")
1023 self
.assertEquals(str(res
[0]["revision"]), "1")
1025 res
= self
.samba4
.db
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
1026 self
.assertEquals(len(res
), 1)
1027 self
.assertEquals(str(res
[0].dn
), dn
)
1028 self
.assertTrue(not "description" in res
[0])
1029 self
.assertTrue(not "badPwdCount" in res
[0])
1030 self
.assertTrue(not "nextRid" in res
[0])
1031 self
.assertEquals(str(res
[0]["revision"]), "1")
1032 # Check in remote db
1033 attrs
= ["description", "sambaBadPasswordCount", "sambaNextRid",
1035 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
, attrs
=attrs
)
1036 self
.assertEquals(len(res
), 1)
1037 self
.assertEquals(str(res
[0].dn
), dn2
)
1038 self
.assertEquals(str(res
[0]["description"]), "foo")
1039 self
.assertEquals(str(res
[0]["sambaBadPasswordCount"]), "3")
1040 self
.assertEquals(str(res
[0]["sambaNextRid"]), "1001")
1041 self
.assertTrue(not "revision" in res
[0])
1043 # Modify of split record
1046 replace: description
1048 replace: badPwdCount
1053 self
.ldb
.modify_ldif(ldif
)
1054 # Check in mapped db
1055 attrs
= ["description", "badPwdCount", "nextRid", "revision"]
1056 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
1057 self
.assertEquals(len(res
), 1)
1058 self
.assertEquals(str(res
[0].dn
), dn
)
1059 self
.assertEquals(str(res
[0]["description"]), "test")
1060 self
.assertEquals(str(res
[0]["badPwdCount"]), "4")
1061 self
.assertEquals(str(res
[0]["nextRid"]), "1001")
1062 self
.assertEquals(str(res
[0]["revision"]), "2")
1064 res
= self
.samba4
.db
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
1065 self
.assertEquals(len(res
), 1)
1066 self
.assertEquals(str(res
[0].dn
), dn
)
1067 self
.assertTrue(not "description" in res
[0])
1068 self
.assertTrue(not "badPwdCount" in res
[0])
1069 self
.assertTrue(not "nextRid" in res
[0])
1070 self
.assertEquals(str(res
[0]["revision"]), "2")
1071 # Check in remote db
1072 attrs
= ["description", "sambaBadPasswordCount", "sambaNextRid",
1074 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
, attrs
=attrs
)
1075 self
.assertEquals(len(res
), 1)
1076 self
.assertEquals(str(res
[0].dn
), dn2
)
1077 self
.assertEquals(str(res
[0]["description"]), "test")
1078 self
.assertEquals(str(res
[0]["sambaBadPasswordCount"]), "4")
1079 self
.assertEquals(str(res
[0]["sambaNextRid"]), "1001")
1080 self
.assertTrue(not "revision" in res
[0])
1082 # Rename split record
1083 dn2
= self
.samba4
.dn("cn=toast")
1084 self
.ldb
.rename(dn
, dn2
)
1085 # Check in mapped db
1087 attrs
= ["description", "badPwdCount", "nextRid", "revision"]
1088 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
1089 self
.assertEquals(len(res
), 1)
1090 self
.assertEquals(str(res
[0].dn
), dn
)
1091 self
.assertEquals(str(res
[0]["description"]), "test")
1092 self
.assertEquals(str(res
[0]["badPwdCount"]), "4")
1093 self
.assertEquals(str(res
[0]["nextRid"]), "1001")
1094 self
.assertEquals(str(res
[0]["revision"]), "2")
1096 res
= self
.samba4
.db
.search(dn
, scope
=SCOPE_BASE
, attrs
=attrs
)
1097 self
.assertEquals(len(res
), 1)
1098 self
.assertEquals(str(res
[0].dn
), dn
)
1099 self
.assertTrue(not "description" in res
[0])
1100 self
.assertTrue(not "badPwdCount" in res
[0])
1101 self
.assertTrue(not "nextRid" in res
[0])
1102 self
.assertEquals(str(res
[0]["revision"]), "2")
1103 # Check in remote db
1104 dn2
= self
.samba3
.dn("cn=toast")
1105 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
,
1106 attrs
=["description", "sambaBadPasswordCount", "sambaNextRid",
1108 self
.assertEquals(len(res
), 1)
1109 self
.assertEquals(str(res
[0].dn
), dn2
)
1110 self
.assertEquals(str(res
[0]["description"]), "test")
1111 self
.assertEquals(str(res
[0]["sambaBadPasswordCount"]), "4")
1112 self
.assertEquals(str(res
[0]["sambaNextRid"]), "1001")
1113 self
.assertTrue(not "revision" in res
[0])
1115 # Delete split record
1117 # Check in mapped db
1118 res
= self
.ldb
.search(dn
, scope
=SCOPE_BASE
)
1119 self
.assertEquals(len(res
), 0)
1121 res
= self
.samba4
.db
.search(dn
, scope
=SCOPE_BASE
)
1122 self
.assertEquals(len(res
), 0)
1123 # Check in remote db
1124 res
= self
.samba3
.db
.search(dn2
, scope
=SCOPE_BASE
)
1125 self
.assertEquals(len(res
), 0)