2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
9 sys
.path
.insert(0, "bin/python")
11 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
13 import samba
.getopt
as options
15 from samba
.auth
import system_session
17 from samba
.samdb
import SamDB
18 from samba
.dcerpc
import misc
20 parser
= optparse
.OptionParser("linked_attributes.py [options] <host>")
21 sambaopts
= options
.SambaOptions(parser
)
22 parser
.add_option_group(sambaopts
)
23 parser
.add_option_group(options
.VersionOptions(parser
))
24 # use command line creds if available
25 credopts
= options
.CredentialsOptions(parser
)
26 parser
.add_option_group(credopts
)
27 subunitopts
= SubunitOptions(parser
)
28 parser
.add_option_group(subunitopts
)
30 parser
.add_option('--delete-in-setup', action
='store_true',
31 help="cleanup in setup")
33 parser
.add_option('--no-cleanup', action
='store_true',
34 help="don't cleanup in teardown")
36 parser
.add_option('--no-reveal-internals', action
='store_true',
37 help="Only use windows compatible ldap controls")
39 opts
, args
= parser
.parse_args()
47 lp
= sambaopts
.get_loadparm()
48 creds
= credopts
.get_credentials(lp
)
51 class LATestException(Exception):
55 class LATests(samba
.tests
.TestCase
):
58 super(LATests
, self
).setUp()
59 self
.samdb
= SamDB(host
, credentials
=creds
,
60 session_info
=system_session(lp
), lp
=lp
)
62 self
.base_dn
= self
.samdb
.domain_dn()
63 self
.ou
= "OU=la,%s" % self
.base_dn
64 if opts
.delete_in_setup
:
66 self
.samdb
.delete(self
.ou
, ['tree_delete:1'])
67 except ldb
.LdbError
, e
:
68 print "tried deleting %s, got error %s" % (self
.ou
, e
)
69 self
.samdb
.add({'objectclass': 'organizationalUnit',
73 super(LATests
, self
).tearDown()
74 if not opts
.no_cleanup
:
75 self
.samdb
.delete(self
.ou
, ['tree_delete:1'])
77 def add_object(self
, cn
, objectclass
, more_attrs
={}):
78 dn
= "CN=%s,%s" % (cn
, self
.ou
)
80 'objectclass': objectclass
,
82 attrs
.update(more_attrs
)
87 def add_objects(self
, n
, objectclass
, prefix
=None, more_attrs
={}):
92 dns
.append(self
.add_object("%s%d" % (prefix
, i
+ 1),
94 more_attrs
=more_attrs
))
97 def add_linked_attribute(self
, src
, dest
, attr
='member',
100 m
.dn
= ldb
.Dn(self
.samdb
, src
)
101 m
[attr
] = ldb
.MessageElement(dest
, ldb
.FLAG_MOD_ADD
, attr
)
102 self
.samdb
.modify(m
, controls
=controls
)
104 def remove_linked_attribute(self
, src
, dest
, attr
='member',
107 m
.dn
= ldb
.Dn(self
.samdb
, src
)
108 m
[attr
] = ldb
.MessageElement(dest
, ldb
.FLAG_MOD_DELETE
, attr
)
109 self
.samdb
.modify(m
, controls
=controls
)
111 def replace_linked_attribute(self
, src
, dest
, attr
='member',
114 m
.dn
= ldb
.Dn(self
.samdb
, src
)
115 m
[attr
] = ldb
.MessageElement(dest
, ldb
.FLAG_MOD_REPLACE
, attr
)
116 self
.samdb
.modify(m
, controls
=controls
)
118 def attr_search(self
, obj
, attr
, scope
=ldb
.SCOPE_BASE
, **controls
):
119 if opts
.no_reveal_internals
:
120 if 'reveal_internals' in controls
:
121 del controls
['reveal_internals']
123 controls
= ['%s:%d' % (k
, int(v
)) for k
, v
in controls
.items()]
125 res
= self
.samdb
.search(obj
,
131 def assert_links(self
, obj
, expected
, attr
, msg
='', **kwargs
):
132 res
= self
.attr_search(obj
, attr
, **kwargs
)
134 if len(expected
) == 0:
136 self
.fail("found attr '%s' in %s" % (attr
, res
[0]))
140 results
= list([x
[attr
] for x
in res
][0])
142 self
.fail("missing attr '%s' on %s" % (attr
, obj
))
144 expected
= sorted(expected
)
145 results
= sorted(results
)
147 if expected
!= results
:
149 print "expected %s" % expected
150 print "received %s" % results
152 self
.assertEqual(results
, expected
)
def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
    """Assert that the back-link attribute of ``obj`` holds ``expected``.

    Thin wrapper around ``assert_links`` that defaults ``attr`` to
    'memberOf' and supplies a fixed failure message.  Any extra keyword
    arguments are forwarded to ``assert_links`` untouched.
    """
    failure_note = 'back links do not match'
    self.assert_links(obj, expected, attr=attr, msg=failure_note, **kwargs)
def assert_forward_links(self, obj, expected, attr='member', **kwargs):
    """Assert that the forward-link attribute of ``obj`` holds ``expected``.

    Thin wrapper around ``assert_links`` that defaults ``attr`` to
    'member' and supplies a fixed failure message.  Any extra keyword
    arguments are forwarded to ``assert_links`` untouched.
    """
    failure_note = 'forward links do not match'
    self.assert_links(obj, expected, attr=attr, msg=failure_note, **kwargs)
def get_object_guid(self, dn):
    """Return the objectGUID of ``dn`` rendered as a string.

    Runs a base-scope search on ``dn`` asking only for 'objectGUID',
    takes the first value of the first result, wraps it in ``misc.GUID``
    and converts that to ``str``.
    """
    matches = self.samdb.search(dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['objectGUID'])
    raw_guid = matches[0]['objectGUID'][0]
    return str(misc.GUID(raw_guid))
168 def _test_la_backlinks(self
, reveal
=False):
173 kwargs
= {'reveal_internals': 0}
175 u1
, u2
= self
.add_objects(2, 'user', 'u_%s' % tag
)
176 g1
, g2
= self
.add_objects(2, 'group', 'g_%s' % tag
)
178 self
.add_linked_attribute(g1
, u1
)
179 self
.add_linked_attribute(g2
, u1
)
180 self
.add_linked_attribute(g2
, u2
)
182 self
.assert_back_links(u1
, [g1
, g2
], **kwargs
)
183 self
.assert_back_links(u2
, [g2
], **kwargs
)
def test_la_backlinks(self):
    """Run the shared back-link scenario with its default arguments."""
    self._test_la_backlinks()
188 def test_la_backlinks_reveal(self
):
189 if opts
.no_reveal_internals
:
190 print 'skipping because --no-reveal-internals'
192 self
._test
_la
_backlinks
(True)
194 def _test_la_backlinks_delete_group(self
, reveal
=False):
199 kwargs
= {'reveal_internals': 0}
201 u1
, u2
= self
.add_objects(2, 'user', 'u_' + tag
)
202 g1
, g2
= self
.add_objects(2, 'group', 'g_' + tag
)
204 self
.add_linked_attribute(g1
, u1
)
205 self
.add_linked_attribute(g2
, u1
)
206 self
.add_linked_attribute(g2
, u2
)
208 self
.samdb
.delete(g2
, ['tree_delete:1'])
210 self
.assert_back_links(u1
, [g1
], **kwargs
)
211 self
.assert_back_links(u2
, set(), **kwargs
)
def test_la_backlinks_delete_group(self):
    """Run the shared delete-group back-link scenario with default arguments."""
    self._test_la_backlinks_delete_group()
216 def test_la_backlinks_delete_group_reveal(self
):
217 if opts
.no_reveal_internals
:
218 print 'skipping because --no-reveal-internals'
220 self
._test
_la
_backlinks
_delete
_group
(True)
def test_links_all_delete_group(self):
    """Delete a group and check links on every side with deleted objects shown.

    Two users and two groups are created; the first user joins both
    groups and the second user joins only the second group.  The second
    group is then deleted.  Searching with show_deleted and
    show_recycled on and show_deactivated_link off, the first user must
    be left with a back link to the first group only, the second user
    with none, the first group must still list the first user, and the
    deleted group (looked up by GUID) must have no forward links.
    """
    user_a, user_b = self.add_objects(2, 'user', 'u_all_del_group')
    group_a, group_b = self.add_objects(2, 'group', 'g_all_del_group')
    # Remember the GUID now: it is used to address the group after deletion.
    deleted_guid = self.get_object_guid(group_b)

    memberships = (
        (group_a, user_a),
        (group_b, user_a),
        (group_b, user_b),
    )
    for group, member in memberships:
        self.add_linked_attribute(group, member)

    self.samdb.delete(group_b)

    search_controls = {
        'show_deleted': 1,
        'show_recycled': 1,
        'show_deactivated_link': 0,
    }
    self.assert_back_links(user_a, [group_a], **search_controls)
    self.assert_back_links(user_b, set(), **search_controls)
    self.assert_forward_links(group_a, [user_a], **search_controls)
    self.assert_forward_links('<GUID=%s>' % deleted_guid,
                              [], **search_controls)
242 def test_links_all_delete_group_reveal(self
):
243 u1
, u2
= self
.add_objects(2, 'user', 'u_all_del_group_reveal')
244 g1
, g2
= self
.add_objects(2, 'group', 'g_all_del_group_reveal')
245 g2guid
= self
.get_object_guid(g2
)
247 self
.add_linked_attribute(g1
, u1
)
248 self
.add_linked_attribute(g2
, u1
)
249 self
.add_linked_attribute(g2
, u2
)
251 self
.samdb
.delete(g2
)
252 self
.assert_back_links(u1
, [g1
], show_deleted
=1, show_recycled
=1,
253 show_deactivated_link
=0,
255 self
.assert_back_links(u2
, set(), show_deleted
=1, show_recycled
=1,
256 show_deactivated_link
=0,
258 self
.assert_forward_links(g1
, [u1
], show_deleted
=1, show_recycled
=1,
259 show_deactivated_link
=0,
261 self
.assert_forward_links('<GUID=%s>' % g2guid
,
262 [], show_deleted
=1, show_recycled
=1,
263 show_deactivated_link
=0,
def test_la_links_delete_link(self):
    """Exercise adding and removing individual member links on groups.

    Checks that the group's uSNChanged differs after adding a link and
    after removing one, that forward links reflect each add/remove
    step, and that removing the same value twice in a single request
    raises ``ldb.LdbError``.
    """
    def current_usn(dn):
        # uSNChanged of ``dn`` as an int, read with a base-scope search.
        found = self.samdb.search(dn, scope=ldb.SCOPE_BASE,
                                  attrs=['uSNChanged'])
        return int(found[0]['uSNChanged'][0])

    user_a, user_b = self.add_objects(2, 'user', 'u_del_link')
    group_a, group_b = self.add_objects(2, 'group', 'g_del_link')

    usn_before = current_usn(group_a)
    self.add_linked_attribute(group_a, user_a)
    usn_after = current_usn(group_a)
    self.assertNotEqual(usn_before, usn_after,
                        "USN should have incremented")

    self.add_linked_attribute(group_b, user_a)
    self.add_linked_attribute(group_b, user_b)

    usn_before = current_usn(group_b)
    self.remove_linked_attribute(group_b, user_a)
    usn_after = current_usn(group_b)
    self.assertNotEqual(usn_before, usn_after,
                        "USN should have incremented")

    self.assert_forward_links(group_a, [user_a])
    self.assert_forward_links(group_b, [user_b])

    # Re-add the removed link, then strip the second group one member
    # at a time, checking the remaining links after every step.
    self.add_linked_attribute(group_b, user_a)
    self.assert_forward_links(group_b, [user_a, user_b])
    self.remove_linked_attribute(group_b, user_b)
    self.assert_forward_links(group_b, [user_a])
    self.remove_linked_attribute(group_b, user_a)
    self.assert_forward_links(group_b, [])
    # A delete carrying an empty value list is expected to leave the
    # first group without any members.
    self.remove_linked_attribute(group_a, [])
    self.assert_forward_links(group_a, [])

    # removing a duplicate link in the same message should fail
    self.add_linked_attribute(group_b, [user_a, user_b])
    self.assertRaises(ldb.LdbError,
                      self.remove_linked_attribute,
                      group_b, [user_a, user_a])
314 def _test_la_links_delete_link_reveal(self
):
315 u1
, u2
= self
.add_objects(2, 'user', 'u_del_link_reveal')
316 g1
, g2
= self
.add_objects(2, 'group', 'g_del_link_reveal')
318 self
.add_linked_attribute(g1
, u1
)
319 self
.add_linked_attribute(g2
, u1
)
320 self
.add_linked_attribute(g2
, u2
)
322 self
.remove_linked_attribute(g2
, u1
)
324 self
.assert_forward_links(g2
, [u1
, u2
], show_deleted
=1,
326 show_deactivated_link
=0,
330 def test_la_links_delete_link_reveal(self
):
331 if opts
.no_reveal_internals
:
332 print 'skipping because --no-reveal-internals'
334 self
._test
_la
_links
_delete
_link
_reveal
()
def test_la_links_delete_user(self):
    """Delete a linked user and check the groups that pointed at it.

    After the first user is deleted, the first group must have no
    forward links and the second group must keep only the second user.
    The uSNChanged value of both groups is compared before and after
    the deletion and must be equal.
    """
    def current_usn(dn):
        # uSNChanged of ``dn`` as an int, read with a base-scope search.
        found = self.samdb.search(dn, scope=ldb.SCOPE_BASE,
                                  attrs=['uSNChanged'])
        return int(found[0]['uSNChanged'][0])

    user_a, user_b = self.add_objects(2, 'user', 'u_del_user')
    group_a, group_b = self.add_objects(2, 'group', 'g_del_user')

    self.add_linked_attribute(group_a, user_a)
    self.add_linked_attribute(group_b, user_a)
    self.add_linked_attribute(group_b, user_b)

    usn_a_before = current_usn(group_a)
    usn_b_before = current_usn(group_b)

    self.samdb.delete(user_a)

    self.assert_forward_links(group_a, [])
    self.assert_forward_links(group_b, [user_b])

    usn_a_after = current_usn(group_a)
    usn_b_after = current_usn(group_b)

    # Assert the USN on the alternate object is unchanged
    self.assertEqual(usn_a_before, usn_a_after)
    self.assertEqual(usn_b_before, usn_b_after)
369 def test_la_links_delete_user_reveal(self
):
370 u1
, u2
= self
.add_objects(2, 'user', 'u_del_user_reveal')
371 g1
, g2
= self
.add_objects(2, 'group', 'g_del_user_reveal')
373 self
.add_linked_attribute(g1
, u1
)
374 self
.add_linked_attribute(g2
, u1
)
375 self
.add_linked_attribute(g2
, u2
)
377 self
.samdb
.delete(u1
)
379 self
.assert_forward_links(g2
, [u2
],
380 show_deleted
=1, show_recycled
=1,
381 show_deactivated_link
=0,
383 self
.assert_forward_links(g1
, [],
384 show_deleted
=1, show_recycled
=1,
385 show_deactivated_link
=0,
388 def test_multiple_links(self
):
389 u1
, u2
, u3
, u4
= self
.add_objects(4, 'user', 'u_multiple_links')
390 g1
, g2
, g3
, g4
= self
.add_objects(4, 'group', 'g_multiple_links')
392 self
.add_linked_attribute(g1
, [u1
, u2
, u3
, u4
])
393 self
.add_linked_attribute(g2
, [u3
, u1
])
394 self
.add_linked_attribute(g3
, u2
)
397 # adding u2 twice should be an error
398 self
.add_linked_attribute(g2
, [u1
, u2
, u3
, u2
])
399 except ldb
.LdbError
as (num
, msg
):
400 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
401 self
.fail("adding duplicate values, expected "
402 "ERR_ENTRY_ALREADY_EXISTS, (%d) "
403 "got %d" % (ldb
.ERR_ENTRY_ALREADY_EXISTS
, num
))
405 self
.fail("adding duplicate values succeed when it shouldn't")
407 self
.assert_forward_links(g1
, [u1
, u2
, u3
, u4
])
408 self
.assert_forward_links(g2
, [u3
, u1
])
409 self
.assert_forward_links(g3
, [u2
])
410 self
.assert_back_links(u1
, [g2
, g1
])
411 self
.assert_back_links(u2
, [g3
, g1
])
412 self
.assert_back_links(u3
, [g2
, g1
])
413 self
.assert_back_links(u4
, [g1
])
415 self
.remove_linked_attribute(g2
, [u1
, u3
])
416 self
.remove_linked_attribute(g1
, [u1
, u3
])
418 self
.assert_forward_links(g1
, [u2
, u4
])
419 self
.assert_forward_links(g2
, [])
420 self
.assert_forward_links(g3
, [u2
])
421 self
.assert_back_links(u1
, [])
422 self
.assert_back_links(u2
, [g3
, g1
])
423 self
.assert_back_links(u3
, [])
424 self
.assert_back_links(u4
, [g1
])
426 self
.add_linked_attribute(g1
, [u1
, u3
])
427 self
.add_linked_attribute(g2
, [u3
, u1
])
428 self
.add_linked_attribute(g3
, [u1
, u3
])
430 self
.assert_forward_links(g1
, [u1
, u2
, u3
, u4
])
431 self
.assert_forward_links(g2
, [u1
, u3
])
432 self
.assert_forward_links(g3
, [u1
, u2
, u3
])
433 self
.assert_back_links(u1
, [g1
, g2
, g3
])
434 self
.assert_back_links(u2
, [g3
, g1
])
435 self
.assert_back_links(u3
, [g3
, g2
, g1
])
436 self
.assert_back_links(u4
, [g1
])
438 def test_la_links_replace(self
):
439 u1
, u2
, u3
, u4
= self
.add_objects(4, 'user', 'u_replace')
440 g1
, g2
, g3
, g4
= self
.add_objects(4, 'group', 'g_replace')
442 self
.add_linked_attribute(g1
, [u1
, u2
])
443 self
.add_linked_attribute(g2
, [u1
, u3
])
444 self
.add_linked_attribute(g3
, u1
)
446 self
.replace_linked_attribute(g1
, [u2
])
447 self
.replace_linked_attribute(g2
, [u2
, u3
])
448 self
.replace_linked_attribute(g3
, [u1
, u3
])
449 self
.replace_linked_attribute(g4
, [u4
])
451 self
.assert_forward_links(g1
, [u2
])
452 self
.assert_forward_links(g2
, [u3
, u2
])
453 self
.assert_forward_links(g3
, [u3
, u1
])
454 self
.assert_forward_links(g4
, [u4
])
455 self
.assert_back_links(u1
, [g3
])
456 self
.assert_back_links(u2
, [g1
, g2
])
457 self
.assert_back_links(u3
, [g2
, g3
])
458 self
.assert_back_links(u4
, [g4
])
460 self
.replace_linked_attribute(g1
, [u1
, u2
, u3
])
461 self
.replace_linked_attribute(g2
, [u1
])
462 self
.replace_linked_attribute(g3
, [u2
])
463 self
.replace_linked_attribute(g4
, [])
465 self
.assert_forward_links(g1
, [u1
, u2
, u3
])
466 self
.assert_forward_links(g2
, [u1
])
467 self
.assert_forward_links(g3
, [u2
])
468 self
.assert_forward_links(g4
, [])
469 self
.assert_back_links(u1
, [g1
, g2
])
470 self
.assert_back_links(u2
, [g1
, g3
])
471 self
.assert_back_links(u3
, [g1
])
472 self
.assert_back_links(u4
, [])
475 # adding u2 twice should be an error
476 self
.replace_linked_attribute(g2
, [u1
, u2
, u3
, u2
])
477 except ldb
.LdbError
as (num
, msg
):
478 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
479 self
.fail("adding duplicate values, expected "
480 "ERR_ENTRY_ALREADY_EXISTS, (%d) "
481 "got %d" % (ldb
.ERR_ENTRY_ALREADY_EXISTS
, num
))
483 self
.fail("replacing duplicate values succeeded when it shouldn't")
def test_la_links_replace2(self):
    """Replace a group's member list with overlapping and disjoint sets.

    Starts with the first six of twelve users as members, replaces the
    list with all twelve, then with only the last six, and finally
    removes the remainder in two batches, checking the forward links
    after every change.
    """
    users = self.add_objects(12, 'user', 'u_replace2')
    (group,) = self.add_objects(1, 'group', 'g_replace2')

    first_half = users[:6]
    second_half = users[6:]
    tail = users[9:]

    self.add_linked_attribute(group, first_half)
    self.assert_forward_links(group, first_half)

    self.replace_linked_attribute(group, users)
    self.assert_forward_links(group, users)

    self.replace_linked_attribute(group, second_half)
    self.assert_forward_links(group, second_half)

    self.remove_linked_attribute(group, users[6:9])
    self.assert_forward_links(group, tail)

    self.remove_linked_attribute(group, tail)
    self.assert_forward_links(group, [])
500 def test_la_links_permutations(self
):
501 """Make sure the order in which we add links doesn't matter."""
502 users
= self
.add_objects(3, 'user', 'u_permutations')
503 groups
= self
.add_objects(6, 'group', 'g_permutations')
505 for g
, p
in zip(groups
, itertools
.permutations(users
)):
506 self
.add_linked_attribute(g
, p
)
508 # everyone should be in every group
510 self
.assert_forward_links(g
, users
)
513 self
.assert_back_links(u
, groups
)
515 for g
, p
in zip(groups
[::-1], itertools
.permutations(users
)):
516 self
.replace_linked_attribute(g
, p
)
519 self
.assert_forward_links(g
, users
)
522 self
.assert_back_links(u
, groups
)
524 for g
, p
in zip(groups
, itertools
.permutations(users
)):
525 self
.remove_linked_attribute(g
, p
)
528 self
.assert_forward_links(g
, [])
531 self
.assert_back_links(u
, [])
533 def test_la_links_relaxed(self
):
534 """Check that the relax control doesn't mess with linked attributes."""
535 relax_control
= ['relax:0']
537 users
= self
.add_objects(10, 'user', 'u_relax')
538 groups
= self
.add_objects(3, 'group', 'g_relax',
539 more_attrs
={'member': users
[:2]})
540 g_relax1
, g_relax2
, g_uptight
= groups
542 # g_relax1 has all users added at once
543 # g_relax2 gets them one at a time in reverse order
544 # g_uptight never relaxes
546 self
.add_linked_attribute(g_relax1
, users
[2:5], controls
=relax_control
)
548 for u
in reversed(users
[2:5]):
549 self
.add_linked_attribute(g_relax2
, u
, controls
=relax_control
)
550 self
.add_linked_attribute(g_uptight
, u
)
553 self
.assert_forward_links(g
, users
[:5])
555 self
.add_linked_attribute(g
, users
[5:7])
556 self
.assert_forward_links(g
, users
[:7])
559 self
.add_linked_attribute(g
, u
)
561 self
.assert_forward_links(g
, users
)
564 self
.assert_back_links(u
, groups
)
566 # try some replacement permutations
571 random
.shuffle(users2
)
572 self
.replace_linked_attribute(g_relax1
, users2
,
573 controls
=relax_control
)
575 self
.assert_forward_links(g_relax1
, users
)
578 random
.shuffle(users2
)
579 self
.remove_linked_attribute(g_relax2
, users2
,
580 controls
=relax_control
)
581 self
.remove_linked_attribute(g_uptight
, users2
)
583 self
.replace_linked_attribute(g_relax1
, [], controls
=relax_control
)
585 random
.shuffle(users2
)
586 self
.add_linked_attribute(g_relax2
, users2
,
587 controls
=relax_control
)
588 self
.add_linked_attribute(g_uptight
, users2
)
589 self
.replace_linked_attribute(g_relax1
, users2
,
590 controls
=relax_control
)
592 self
.assert_forward_links(g_relax1
, users
)
593 self
.assert_forward_links(g_relax2
, users
)
594 self
.assert_forward_links(g_uptight
, users
)
597 self
.assert_back_links(u
, groups
)
599 def test_add_all_at_once(self
):
600 """All these other tests are creating linked attributes after the
601 objects are there. We want to test creating them all at once
604 users
= self
.add_objects(7, 'user', 'u_all_at_once')
605 g1
, g3
= self
.add_objects(2, 'group', 'g_all_at_once',
606 more_attrs
={'member': users
})
607 (g2
,) = self
.add_objects(1, 'group', 'g_all_at_once2',
608 more_attrs
={'member': users
[:5]})
611 self
.add_objects(1, 'group', 'g_with_duplicate_links',
612 more_attrs
={'member': users
[:5] + users
[1:2]})
613 except ldb
.LdbError
as (num
, msg
):
614 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
615 self
.fail("adding duplicate values, expected "
616 "ERR_ENTRY_ALREADY_EXISTS, (%d) "
617 "got %d" % (ldb
.ERR_ENTRY_ALREADY_EXISTS
, num
))
619 self
.assert_forward_links(g1
, users
)
620 self
.assert_forward_links(g2
, users
[:5])
621 self
.assert_forward_links(g3
, users
)
623 self
.assert_back_links(u
, [g1
, g2
, g3
])
625 self
.assert_back_links(u
, [g1
, g3
])
627 self
.remove_linked_attribute(g2
, users
[0])
628 self
.remove_linked_attribute(g2
, users
[1])
629 self
.add_linked_attribute(g2
, users
[1])
630 self
.add_linked_attribute(g2
, users
[5])
631 self
.add_linked_attribute(g2
, users
[6])
633 self
.assert_forward_links(g1
, users
)
634 self
.assert_forward_links(g2
, users
[1:])
637 self
.remove_linked_attribute(g2
, u
)
638 self
.remove_linked_attribute(g1
, users
)
643 self
.assert_forward_links(g1
, [])
644 self
.assert_forward_links(g2
, [])
645 self
.assert_forward_links(g3
, [])
647 def test_one_way_attributes(self
):
648 e1
, e2
= self
.add_objects(2, 'msExchConfigurationContainer',
650 guid
= self
.get_object_guid(e2
)
652 self
.add_linked_attribute(e1
, e2
, attr
="addressBookRoots")
653 self
.assert_forward_links(e1
, [e2
], attr
='addressBookRoots')
655 self
.samdb
.delete(e2
)
657 res
= self
.samdb
.search("<GUID=%s>" % guid
,
658 scope
=ldb
.SCOPE_BASE
,
659 controls
=['show_deleted:1',
662 new_dn
= str(res
[0].dn
)
663 self
.assert_forward_links(e1
, [new_dn
], attr
='addressBookRoots')
664 self
.assert_forward_links(e1
, [new_dn
],
665 attr
='addressBookRoots',
666 show_deactivated_link
=0)
668 def test_one_way_attributes_delete_link(self
):
669 e1
, e2
= self
.add_objects(2, 'msExchConfigurationContainer',
671 guid
= self
.get_object_guid(e2
)
673 self
.add_linked_attribute(e1
, e2
, attr
="addressBookRoots")
674 self
.assert_forward_links(e1
, [e2
], attr
='addressBookRoots')
676 self
.remove_linked_attribute(e1
, e2
, attr
="addressBookRoots")
678 self
.assert_forward_links(e1
, [], attr
='addressBookRoots')
679 self
.assert_forward_links(e1
, [], attr
='addressBookRoots',
680 show_deactivated_link
=0)
682 def test_pretend_one_way_attributes(self
):
683 e1
, e2
= self
.add_objects(2, 'msExchConfigurationContainer',
685 guid
= self
.get_object_guid(e2
)
687 self
.add_linked_attribute(e1
, e2
, attr
="addressBookRoots2")
688 self
.assert_forward_links(e1
, [e2
], attr
='addressBookRoots2')
690 self
.samdb
.delete(e2
)
691 res
= self
.samdb
.search("<GUID=%s>" % guid
,
692 scope
=ldb
.SCOPE_BASE
,
693 controls
=['show_deleted:1',
696 new_dn
= str(res
[0].dn
)
698 self
.assert_forward_links(e1
, [], attr
='addressBookRoots2')
699 self
.assert_forward_links(e1
, [], attr
='addressBookRoots2',
700 show_deactivated_link
=0)
702 def test_pretend_one_way_attributes_delete_link(self
):
703 e1
, e2
= self
.add_objects(2, 'msExchConfigurationContainer',
705 guid
= self
.get_object_guid(e2
)
707 self
.add_linked_attribute(e1
, e2
, attr
="addressBookRoots2")
708 self
.assert_forward_links(e1
, [e2
], attr
='addressBookRoots2')
710 self
.remove_linked_attribute(e1
, e2
, attr
="addressBookRoots2")
712 self
.assert_forward_links(e1
, [], attr
='addressBookRoots2')
713 self
.assert_forward_links(e1
, [], attr
='addressBookRoots2',
714 show_deactivated_link
=0)
716 if "://" not in host
:
717 if os
.path
.isfile(host
):
718 host
= "tdb://%s" % host
720 host
= "ldap://%s" % host
723 TestProgram(module
=__name__
, opts
=subunitopts
)