2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
10 sys
.path
.append("bin/python")
12 samba
.ensure_external_module("testtools", "testtools")
13 samba
.ensure_external_module("subunit", "subunit/python")
15 import samba
.getopt
as options
18 SCOPE_BASE
, SCOPE_SUBTREE
, LdbError
, ERR_NO_SUCH_OBJECT
,
19 ERR_UNWILLING_TO_PERFORM
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
20 from ldb
import ERR_CONSTRAINT_VIOLATION
21 from ldb
import ERR_OPERATIONS_ERROR
22 from ldb
import Message
, MessageElement
, Dn
23 from ldb
import FLAG_MOD_REPLACE
, FLAG_MOD_DELETE
24 from samba
.ndr
import ndr_pack
, ndr_unpack
25 from samba
.dcerpc
import security
27 from samba
.auth
import system_session
28 from samba
import gensec
29 from samba
.samdb
import SamDB
30 from samba
.credentials
import Credentials
32 from subunit
.run
import SubunitTestRunner
35 parser
= optparse
.OptionParser("acl.py [options] <host>")
36 sambaopts
= options
.SambaOptions(parser
)
37 parser
.add_option_group(sambaopts
)
38 parser
.add_option_group(options
.VersionOptions(parser
))
40 # use command line creds if available
41 credopts
= options
.CredentialsOptions(parser
)
42 parser
.add_option_group(credopts
)
43 opts
, args
= parser
.parse_args()
51 lp
= sambaopts
.get_loadparm()
52 creds
= credopts
.get_credentials(lp
)
53 creds
.set_gensec_features(creds
.get_gensec_features() | gensec
.FEATURE_SEAL
)
59 class AclTests(samba
.tests
.TestCase
):
61 def delete_force(self
, ldb
, dn
):
64 except LdbError
, (num
, _
):
65 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
67 def find_domain_sid(self
, ldb
):
68 res
= ldb
.search(base
=self
.base_dn
, expression
="(objectClass=*)", scope
=SCOPE_BASE
)
69 return ndr_unpack(security
.dom_sid
,res
[0]["objectSid"][0])
72 super(AclTests
, self
).setUp()
74 self
.base_dn
= ldb
.domain_dn()
75 self
.domain_sid
= self
.find_domain_sid(self
.ldb_admin
)
76 self
.user_pass
= "samba123@"
77 self
.configuration_dn
= self
.ldb_admin
.get_config_basedn().get_linearized()
78 print "baseDN: %s" % self
.base_dn
80 def get_user_dn(self
, name
):
81 return "CN=%s,CN=Users,%s" % (name
, self
.base_dn
)
83 def modify_desc(self
, object_dn
, desc
):
84 """ Modify security descriptor using either SDDL string
85 or security.descriptor object
87 assert(isinstance(desc
, str) or isinstance(desc
, security
.descriptor
))
89 dn: """ + object_dn
+ """
91 replace: nTSecurityDescriptor
93 if isinstance(desc
, str):
94 mod
+= "nTSecurityDescriptor: %s" % desc
95 elif isinstance(desc
, security
.descriptor
):
96 mod
+= "nTSecurityDescriptor:: %s" % base64
.b64encode(ndr_pack(desc
))
97 self
.ldb_admin
.modify_ldif(mod
)
100 def create_active_user(self
, _ldb
, user_dn
):
102 dn: """ + user_dn
+ """
103 sAMAccountName: """ + user_dn
.split(",")[0][3:] + """
105 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
110 def create_test_user(self
, _ldb
, user_dn
, desc
=None):
112 dn: """ + user_dn
+ """
113 sAMAccountName: """ + user_dn
.split(",")[0][3:] + """
115 userPassword: """ + self
.user_pass
+ """
119 assert(isinstance(desc
, str) or isinstance(desc
, security
.descriptor
))
120 if isinstance(desc
, str):
121 ldif
+= "nTSecurityDescriptor: %s" % desc
122 elif isinstance(desc
, security
.descriptor
):
123 ldif
+= "nTSecurityDescriptor:: %s" % base64
.b64encode(ndr_pack(desc
))
126 def create_group(self
, _ldb
, group_dn
, desc
=None):
128 dn: """ + group_dn
+ """
130 sAMAccountName: """ + group_dn
.split(",")[0][3:] + """
135 assert(isinstance(desc
, str) or isinstance(desc
, security
.descriptor
))
136 if isinstance(desc
, str):
137 ldif
+= "nTSecurityDescriptor: %s" % desc
138 elif isinstance(desc
, security
.descriptor
):
139 ldif
+= "nTSecurityDescriptor:: %s" % base64
.b64encode(ndr_pack(desc
))
142 def create_security_group(self
, _ldb
, group_dn
, desc
=None):
144 dn: """ + group_dn
+ """
146 sAMAccountName: """ + group_dn
.split(",")[0][3:] + """
147 groupType: -2147483646
151 assert(isinstance(desc
, str) or isinstance(desc
, security
.descriptor
))
152 if isinstance(desc
, str):
153 ldif
+= "nTSecurityDescriptor: %s" % desc
154 elif isinstance(desc
, security
.descriptor
):
155 ldif
+= "nTSecurityDescriptor:: %s" % base64
.b64encode(ndr_pack(desc
))
158 def read_desc(self
, object_dn
):
159 res
= self
.ldb_admin
.search(object_dn
, SCOPE_BASE
, None, ["nTSecurityDescriptor"])
160 desc
= res
[0]["nTSecurityDescriptor"][0]
161 return ndr_unpack(security
.descriptor
, desc
)
163 def get_ldb_connection(self
, target_username
, target_password
):
164 creds_tmp
= Credentials()
165 creds_tmp
.set_username(target_username
)
166 creds_tmp
.set_password(target_password
)
167 creds_tmp
.set_domain(creds
.get_domain())
168 creds_tmp
.set_realm(creds
.get_realm())
169 creds_tmp
.set_workstation(creds
.get_workstation())
170 creds_tmp
.set_gensec_features(creds_tmp
.get_gensec_features()
171 | gensec
.FEATURE_SEAL
)
172 ldb_target
= SamDB(url
=host
, credentials
=creds_tmp
, lp
=lp
)
175 def get_object_sid(self
, object_dn
):
176 res
= self
.ldb_admin
.search(object_dn
)
177 return ndr_unpack(security
.dom_sid
, res
[0]["objectSid"][0])
179 def dacl_add_ace(self
, object_dn
, ace
):
180 desc
= self
.read_desc(object_dn
)
181 desc_sddl
= desc
.as_sddl(self
.domain_sid
)
184 if desc_sddl
.find("(") >= 0:
185 desc_sddl
= desc_sddl
[:desc_sddl
.index("(")] + ace
+ desc_sddl
[desc_sddl
.index("("):]
187 desc_sddl
= desc_sddl
+ ace
188 self
.modify_desc(object_dn
, desc_sddl
)
190 def get_desc_sddl(self
, object_dn
):
191 """ Return object nTSecutiryDescriptor in SDDL format
193 desc
= self
.read_desc(object_dn
)
194 return desc
.as_sddl(self
.domain_sid
)
196 # Test if we have any additional groups for users than default ones
197 def assert_user_no_group_member(self
, username
):
198 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % self
.get_user_dn(username
))
200 self
.assertEqual(res
[0]["memberOf"][0], "")
206 def create_enable_user(self
, username
):
207 self
.create_active_user(self
.ldb_admin
, self
.get_user_dn(username
))
208 self
.ldb_admin
.enable_account("(sAMAccountName=" + username
+ ")")
210 #tests on ldap add operations
211 class AclAddTests(AclTests
):
214 super(AclAddTests
, self
).setUp()
215 # Domain admin that will be creator of OU parent-child structure
216 self
.usr_admin_owner
= "acl_add_user1"
217 # Second domain admin that will not be creator of OU parent-child structure
218 self
.usr_admin_not_owner
= "acl_add_user2"
220 self
.regular_user
= "acl_add_user3"
221 self
.create_enable_user(self
.usr_admin_owner
)
222 self
.create_enable_user(self
.usr_admin_not_owner
)
223 self
.create_enable_user(self
.regular_user
)
225 # add admins to the Domain Admins group
226 self
.ldb_admin
.add_remove_group_members("Domain Admins", self
.usr_admin_owner
,
227 add_members_operation
=True)
228 self
.ldb_admin
.add_remove_group_members("Domain Admins", self
.usr_admin_not_owner
,
229 add_members_operation
=True)
231 self
.ldb_owner
= self
.get_ldb_connection(self
.usr_admin_owner
, self
.user_pass
)
232 self
.ldb_notowner
= self
.get_ldb_connection(self
.usr_admin_not_owner
, self
.user_pass
)
233 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
236 super(AclAddTests
, self
).tearDown()
237 self
.delete_force(self
.ldb_admin
, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
238 self
.delete_force(self
.ldb_admin
, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
239 self
.delete_force(self
.ldb_admin
, "OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
240 self
.delete_force(self
.ldb_admin
, "OU=test_add_ou1," + self
.base_dn
)
241 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.usr_admin_owner
))
242 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.usr_admin_not_owner
))
243 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
245 # Make sure top OU is deleted (and so everything under it)
246 def assert_top_ou_deleted(self
):
247 res
= self
.ldb_admin
.search(self
.base_dn
,
248 expression
="(distinguishedName=%s,%s)" % (
249 "OU=test_add_ou1", self
.base_dn
))
250 self
.assertEqual(res
, [])
252 def test_add_u1(self
):
253 """Testing OU with the rights of Doman Admin not creator of the OU """
254 self
.assert_top_ou_deleted()
255 # Change descriptor for top level OU
256 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
257 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
258 user_sid
= self
.get_object_sid(self
.get_user_dn(self
.usr_admin_not_owner
))
259 mod
= "(D;CI;WPCC;;;%s)" % str(user_sid
)
260 self
.dacl_add_ace("OU=test_add_ou1," + self
.base_dn
, mod
)
261 # Test user and group creation with another domain admin's credentials
262 self
.create_test_user(self
.ldb_notowner
, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
263 self
.create_group(self
.ldb_notowner
, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
264 # Make sure we HAVE created the two objects -- user and group
265 # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
266 # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
267 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
268 self
.assertTrue(len(res
) > 0)
269 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
270 self
.assertTrue(len(res
) > 0)
272 def test_add_u2(self
):
273 """Testing OU with the regular user that has no rights granted over the OU """
274 self
.assert_top_ou_deleted()
275 # Create a parent-child OU structure with domain admin credentials
276 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
277 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
278 # Test user and group creation with regular user credentials
280 self
.create_test_user(self
.ldb_user
, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
281 self
.create_group(self
.ldb_user
, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
282 except LdbError
, (num
, _
):
283 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
286 # Make sure we HAVEN'T created any of two objects -- user or group
287 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
288 self
.assertEqual(res
, [])
289 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
290 self
.assertEqual(res
, [])
292 def test_add_u3(self
):
293 """Testing OU with the rights of regular user granted the right 'Create User child objects' """
294 self
.assert_top_ou_deleted()
295 # Change descriptor for top level OU
296 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
297 user_sid
= self
.get_object_sid(self
.get_user_dn(self
.regular_user
))
298 mod
= "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid
)
299 self
.dacl_add_ace("OU=test_add_ou1," + self
.base_dn
, mod
)
300 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
301 # Test user and group creation with granted user only to one of the objects
302 self
.create_test_user(self
.ldb_user
, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
304 self
.create_group(self
.ldb_user
, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
305 except LdbError
, (num
, _
):
306 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
309 # Make sure we HAVE created the one of two objects -- user
310 res
= self
.ldb_admin
.search(self
.base_dn
,
311 expression
="(distinguishedName=%s,%s)" %
312 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
314 self
.assertNotEqual(len(res
), 0)
315 res
= self
.ldb_admin
.search(self
.base_dn
,
316 expression
="(distinguishedName=%s,%s)" %
317 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
319 self
.assertEqual(res
, [])
321 def test_add_u4(self
):
322 """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
323 self
.assert_top_ou_deleted()
324 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
325 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
326 self
.create_test_user(self
.ldb_owner
, "CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
327 self
.create_group(self
.ldb_owner
, "CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
328 # Make sure we have successfully created the two objects -- user and group
329 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
330 self
.assertTrue(len(res
) > 0)
331 res
= self
.ldb_admin
.search(self
.base_dn
,
332 expression
="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
333 self
.assertTrue(len(res
) > 0)
335 #tests on ldap modify operations
336 class AclModifyTests(AclTests
):
339 super(AclModifyTests
, self
).setUp()
340 self
.user_with_wp
= "acl_mod_user1"
341 self
.user_with_sm
= "acl_mod_user2"
342 self
.user_with_group_sm
= "acl_mod_user3"
343 self
.create_enable_user(self
.user_with_wp
)
344 self
.create_enable_user(self
.user_with_sm
)
345 self
.create_enable_user(self
.user_with_group_sm
)
346 self
.ldb_user
= self
.get_ldb_connection(self
.user_with_wp
, self
.user_pass
)
347 self
.ldb_user2
= self
.get_ldb_connection(self
.user_with_sm
, self
.user_pass
)
348 self
.ldb_user3
= self
.get_ldb_connection(self
.user_with_group_sm
, self
.user_pass
)
349 self
.user_sid
= self
.get_object_sid( self
.get_user_dn(self
.user_with_wp
))
350 self
.create_group(self
.ldb_admin
, "CN=test_modify_group2,CN=Users," + self
.base_dn
)
351 self
.create_group(self
.ldb_admin
, "CN=test_modify_group3,CN=Users," + self
.base_dn
)
352 self
.create_test_user(self
.ldb_admin
, self
.get_user_dn("test_modify_user2"))
355 super(AclModifyTests
, self
).tearDown()
356 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("test_modify_user1"))
357 self
.delete_force(self
.ldb_admin
, "CN=test_modify_group1,CN=Users," + self
.base_dn
)
358 self
.delete_force(self
.ldb_admin
, "CN=test_modify_group2,CN=Users," + self
.base_dn
)
359 self
.delete_force(self
.ldb_admin
, "CN=test_modify_group3,CN=Users," + self
.base_dn
)
360 self
.delete_force(self
.ldb_admin
, "OU=test_modify_ou1," + self
.base_dn
)
361 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_wp
))
362 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_sm
))
363 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_group_sm
))
364 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("test_modify_user2"))
366 def test_modify_u1(self
):
367 """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
368 mod
= "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.user_sid
)
369 # First test object -- User
370 print "Testing modify on User object"
371 self
.create_test_user(self
.ldb_admin
, self
.get_user_dn("test_modify_user1"))
372 self
.dacl_add_ace(self
.get_user_dn("test_modify_user1"), mod
)
374 dn: """ + self
.get_user_dn("test_modify_user1") + """
377 displayName: test_changed"""
378 self
.ldb_user
.modify_ldif(ldif
)
379 res
= self
.ldb_admin
.search(self
.base_dn
,
380 expression
="(distinguishedName=%s)" % self
.get_user_dn("test_modify_user1"))
381 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
382 # Second test object -- Group
383 print "Testing modify on Group object"
384 self
.create_group(self
.ldb_admin
, "CN=test_modify_group1,CN=Users," + self
.base_dn
)
385 self
.dacl_add_ace("CN=test_modify_group1,CN=Users," + self
.base_dn
, mod
)
387 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
390 displayName: test_changed"""
391 self
.ldb_user
.modify_ldif(ldif
)
392 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self
.base_dn
))
393 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
394 # Third test object -- Organizational Unit
395 print "Testing modify on OU object"
396 #self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
397 self
.ldb_admin
.create_ou("OU=test_modify_ou1," + self
.base_dn
)
398 self
.dacl_add_ace("OU=test_modify_ou1," + self
.base_dn
, mod
)
400 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
403 displayName: test_changed"""
404 self
.ldb_user
.modify_ldif(ldif
)
405 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self
.base_dn
))
406 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
408 def test_modify_u2(self
):
409 """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
410 mod
= "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.user_sid
)
411 # First test object -- User
412 print "Testing modify on User object"
413 #self.delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
414 self
.create_test_user(self
.ldb_admin
, self
.get_user_dn("test_modify_user1"))
415 self
.dacl_add_ace(self
.get_user_dn("test_modify_user1"), mod
)
416 # Modify on attribute you have rights for
418 dn: """ + self
.get_user_dn("test_modify_user1") + """
421 displayName: test_changed"""
422 self
.ldb_user
.modify_ldif(ldif
)
423 res
= self
.ldb_admin
.search(self
.base_dn
,
424 expression
="(distinguishedName=%s)" %
425 self
.get_user_dn("test_modify_user1"))
426 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
427 # Modify on attribute you do not have rights for granted
429 dn: """ + self
.get_user_dn("test_modify_user1") + """
432 url: www.samba.org"""
434 self
.ldb_user
.modify_ldif(ldif
)
435 except LdbError
, (num
, _
):
436 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
438 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
440 # Second test object -- Group
441 print "Testing modify on Group object"
442 self
.create_group(self
.ldb_admin
, "CN=test_modify_group1,CN=Users," + self
.base_dn
)
443 self
.dacl_add_ace("CN=test_modify_group1,CN=Users," + self
.base_dn
, mod
)
445 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
448 displayName: test_changed"""
449 self
.ldb_user
.modify_ldif(ldif
)
450 res
= self
.ldb_admin
.search(self
.base_dn
,
451 expression
="(distinguishedName=%s)" %
452 str("CN=test_modify_group1,CN=Users," + self
.base_dn
))
453 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
454 # Modify on attribute you do not have rights for granted
456 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
459 url: www.samba.org"""
461 self
.ldb_user
.modify_ldif(ldif
)
462 except LdbError
, (num
, _
):
463 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
465 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
467 # Second test object -- Organizational Unit
468 print "Testing modify on OU object"
469 self
.ldb_admin
.create_ou("OU=test_modify_ou1," + self
.base_dn
)
470 self
.dacl_add_ace("OU=test_modify_ou1," + self
.base_dn
, mod
)
472 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
475 displayName: test_changed"""
476 self
.ldb_user
.modify_ldif(ldif
)
477 res
= self
.ldb_admin
.search(self
.base_dn
,
478 expression
="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
480 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
481 # Modify on attribute you do not have rights for granted
483 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
486 url: www.samba.org"""
488 self
.ldb_user
.modify_ldif(ldif
)
489 except LdbError
, (num
, _
):
490 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
492 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
495 def test_modify_u3(self
):
496 """7 Modify one attribute as you have no what so ever rights granted"""
497 # First test object -- User
498 print "Testing modify on User object"
499 self
.create_test_user(self
.ldb_admin
, self
.get_user_dn("test_modify_user1"))
500 # Modify on attribute you do not have rights for granted
502 dn: """ + self
.get_user_dn("test_modify_user1") + """
505 url: www.samba.org"""
507 self
.ldb_user
.modify_ldif(ldif
)
508 except LdbError
, (num
, _
):
509 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
511 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
514 # Second test object -- Group
515 print "Testing modify on Group object"
516 self
.create_group(self
.ldb_admin
, "CN=test_modify_group1,CN=Users," + self
.base_dn
)
517 # Modify on attribute you do not have rights for granted
519 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
522 url: www.samba.org"""
524 self
.ldb_user
.modify_ldif(ldif
)
525 except LdbError
, (num
, _
):
526 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
528 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
531 # Second test object -- Organizational Unit
532 print "Testing modify on OU object"
533 #self.delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
534 self
.ldb_admin
.create_ou("OU=test_modify_ou1," + self
.base_dn
)
535 # Modify on attribute you do not have rights for granted
537 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
540 url: www.samba.org"""
542 self
.ldb_user
.modify_ldif(ldif
)
543 except LdbError
, (num
, _
):
544 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
546 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
550 def test_modify_u4(self
):
551 """11 Grant WP to PRINCIPAL_SELF and test modify"""
553 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
555 add: adminDescription
556 adminDescription: blah blah blah"""
558 self
.ldb_user
.modify_ldif(ldif
)
559 except LdbError
, (num
, _
):
560 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
562 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
565 mod
= "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
566 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
567 # Modify on attribute you have rights for
568 self
.ldb_user
.modify_ldif(ldif
)
569 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" \
570 % self
.get_user_dn(self
.user_with_wp
), attrs
=["adminDescription"] )
571 self
.assertEqual(res
[0]["adminDescription"][0], "blah blah blah")
573 def test_modify_u5(self
):
574 """12 test self membership"""
576 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
579 Member: """ + self
.get_user_dn(self
.user_with_sm
)
580 #the user has no rights granted, this should fail
582 self
.ldb_user2
.modify_ldif(ldif
)
583 except LdbError
, (num
, _
):
584 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
586 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
589 #grant self-membership, should be able to add himself
590 user_sid
= self
.get_object_sid(self
.get_user_dn(self
.user_with_sm
))
591 mod
= "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid
)
592 self
.dacl_add_ace("CN=test_modify_group2,CN=Users," + self
.base_dn
, mod
)
593 self
.ldb_user2
.modify_ldif(ldif
)
594 res
= self
.ldb_admin
.search( self
.base_dn
, expression
="(distinguishedName=%s)" \
595 % ("CN=test_modify_group2,CN=Users," + self
.base_dn
), attrs
=["Member"])
596 self
.assertEqual(res
[0]["Member"][0], self
.get_user_dn(self
.user_with_sm
))
599 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
602 Member: CN=test_modify_user2,CN=Users,""" + self
.base_dn
604 self
.ldb_user2
.modify_ldif(ldif
)
605 except LdbError
, (num
, _
):
606 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
610 def test_modify_u6(self
):
611 """13 test self membership"""
613 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
616 Member: """ + self
.get_user_dn(self
.user_with_sm
) + """
617 Member: CN=test_modify_user2,CN=Users,""" + self
.base_dn
619 #grant self-membership, should be able to add himself but not others at the same time
620 user_sid
= self
.get_object_sid(self
.get_user_dn(self
.user_with_sm
))
621 mod
= "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid
)
622 self
.dacl_add_ace("CN=test_modify_group2,CN=Users," + self
.base_dn
, mod
)
624 self
.ldb_user2
.modify_ldif(ldif
)
625 except LdbError
, (num
, _
):
626 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
630 def test_modify_u7(self
):
631 """13 User with WP modifying Member"""
632 #a second user is given write property permission
633 user_sid
= self
.get_object_sid(self
.get_user_dn(self
.user_with_wp
))
634 mod
= "(A;;WP;;;%s)" % str(user_sid
)
635 self
.dacl_add_ace("CN=test_modify_group2,CN=Users," + self
.base_dn
, mod
)
637 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
640 Member: """ + self
.get_user_dn(self
.user_with_wp
)
641 self
.ldb_user
.modify_ldif(ldif
)
642 res
= self
.ldb_admin
.search( self
.base_dn
, expression
="(distinguishedName=%s)" \
643 % ("CN=test_modify_group2,CN=Users," + self
.base_dn
), attrs
=["Member"])
644 self
.assertEqual(res
[0]["Member"][0], self
.get_user_dn(self
.user_with_wp
))
646 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
649 self
.ldb_user
.modify_ldif(ldif
)
651 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
654 Member: CN=test_modify_user2,CN=Users,""" + self
.base_dn
655 self
.ldb_user
.modify_ldif(ldif
)
656 res
= self
.ldb_admin
.search( self
.base_dn
, expression
="(distinguishedName=%s)" \
657 % ("CN=test_modify_group2,CN=Users," + self
.base_dn
), attrs
=["Member"])
658 self
.assertEqual(res
[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self
.base_dn
)
660 #enable these when we have search implemented
661 class AclSearchTests(AclTests
):
664 super(AclSearchTests
, self
).setUp()
665 self
.u1
= "search_u1"
666 self
.u2
= "search_u2"
667 self
.u3
= "search_u3"
668 self
.group1
= "group1"
669 self
.creds_tmp
= Credentials()
670 self
.creds_tmp
.set_username("")
671 self
.creds_tmp
.set_password("")
672 self
.creds_tmp
.set_domain(creds
.get_domain())
673 self
.creds_tmp
.set_realm(creds
.get_realm())
674 self
.creds_tmp
.set_workstation(creds
.get_workstation())
675 self
.anonymous
= SamDB(url
=host
, credentials
=self
.creds_tmp
, lp
=lp
)
676 self
.dsheuristics
= self
.ldb_admin
.get_dsheuristics()
677 self
.create_enable_user(self
.u1
)
678 self
.create_enable_user(self
.u2
)
679 self
.create_enable_user(self
.u3
)
680 self
.create_security_group(self
.ldb_admin
, self
.get_user_dn(self
.group1
))
681 self
.ldb_admin
.add_remove_group_members(self
.group1
, self
.u2
,
682 add_members_operation
=True)
683 self
.ldb_user
= self
.get_ldb_connection(self
.u1
, self
.user_pass
)
684 self
.ldb_user2
= self
.get_ldb_connection(self
.u2
, self
.user_pass
)
685 self
.ldb_user3
= self
.get_ldb_connection(self
.u3
, self
.user_pass
)
686 self
.full_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
687 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
),
688 Dn(self
.ldb_admin
, "OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
689 Dn(self
.ldb_admin
, "OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
),
690 Dn(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
691 Dn(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)]
692 self
.user_sid
= self
.get_object_sid(self
.get_user_dn(self
.u1
))
693 self
.group_sid
= self
.get_object_sid(self
.get_user_dn(self
.group1
))
695 def create_clean_ou(self
, object_dn
):
696 """ Base repeating setup for unittests to follow """
697 res
= self
.ldb_admin
.search(base
=self
.base_dn
, scope
=SCOPE_SUBTREE
, \
698 expression
="distinguishedName=%s" % object_dn
)
699 # Make sure top testing OU has been deleted before starting the test
700 self
.assertEqual(res
, [])
701 self
.ldb_admin
.create_ou(object_dn
)
702 desc_sddl
= self
.get_desc_sddl(object_dn
)
703 # Make sure there are inheritable ACEs initially
704 self
.assertTrue("CI" in desc_sddl
or "OI" in desc_sddl
)
705 # Find and remove all inherit ACEs
706 res
= re
.findall("\(.*?\)", desc_sddl
)
707 res
= [x
for x
in res
if ("CI" in x
) or ("OI" in x
)]
709 desc_sddl
= desc_sddl
.replace(x
, "")
710 # Add flag 'protected' in both DACL and SACL so no inherit ACEs
711 # can propagate from above
712 # remove SACL, we are not interested
713 desc_sddl
= desc_sddl
.replace(":AI", ":AIP")
714 self
.modify_desc(object_dn
, desc_sddl
)
715 # Verify all inheritable ACEs are gone
716 desc_sddl
= self
.get_desc_sddl(object_dn
)
717 self
.assertFalse("CI" in desc_sddl
)
718 self
.assertFalse("OI" in desc_sddl
)
721 super(AclSearchTests
, self
).tearDown()
722 self
.delete_force(self
.ldb_admin
, "OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
)
723 self
.delete_force(self
.ldb_admin
, "OU=test_search_ou1," + self
.base_dn
)
724 self
.delete_force(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
725 self
.delete_force(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
726 self
.delete_force(self
.ldb_admin
, "OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
727 self
.delete_force(self
.ldb_admin
, "OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
728 self
.delete_force(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
)
729 self
.delete_force(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)
730 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("search_u1"))
731 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("search_u2"))
732 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("search_u3"))
733 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("group1"))
735 def test_search_anonymous1(self
):
736 """Verify access of rootDSE with the correct request"""
737 res
= self
.anonymous
.search("", expression
="(objectClass=*)", scope
=SCOPE_BASE
)
738 self
.assertEquals(len(res
), 1)
739 #verify some of the attributes
740 #dont care about values
741 self
.assertTrue("ldapServiceName" in res
[0])
742 self
.assertTrue("namingContexts" in res
[0])
743 self
.assertTrue("isSynchronized" in res
[0])
744 self
.assertTrue("dsServiceName" in res
[0])
745 self
.assertTrue("supportedSASLMechanisms" in res
[0])
746 self
.assertTrue("isGlobalCatalogReady" in res
[0])
747 self
.assertTrue("domainControllerFunctionality" in res
[0])
748 self
.assertTrue("serverName" in res
[0])
750 def test_search_anonymous2(self
):
751 """Make sure we cannot access anything else"""
753 res
= self
.anonymous
.search("", expression
="(objectClass=*)", scope
=SCOPE_SUBTREE
)
754 except LdbError
, (num
, _
):
755 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
759 res
= self
.anonymous
.search(self
.base_dn
, expression
="(objectClass=*)", scope
=SCOPE_SUBTREE
)
760 except LdbError
, (num
, _
):
761 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
765 res
= self
.anonymous
.search("CN=Configuration," + self
.base_dn
, expression
="(objectClass=*)",
767 except LdbError
, (num
, _
):
768 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
772 def test_search_anonymous3(self
):
773 """Set dsHeuristics and repeat"""
774 self
.ldb_admin
.set_dsheuristics("0000002")
775 self
.ldb_admin
.create_ou("OU=test_search_ou1," + self
.base_dn
)
776 mod
= "(A;CI;LC;;;AN)"
777 self
.dacl_add_ace("OU=test_search_ou1," + self
.base_dn
, mod
)
778 self
.ldb_admin
.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
)
779 res
= self
.anonymous
.search("OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
,
780 expression
="(objectClass=*)", scope
=SCOPE_SUBTREE
)
781 self
.assertEquals(len(res
), 1)
782 self
.assertTrue("dn" in res
[0])
783 self
.assertTrue(res
[0]["dn"] == Dn(self
.ldb_admin
,
784 "OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
))
785 res
= self
.anonymous
.search("CN=Configuration," + self
.base_dn
, expression
="(objectClass=*)",
787 self
.assertEquals(len(res
), 1)
788 self
.assertTrue("dn" in res
[0])
789 self
.assertTrue(res
[0]["dn"] == Dn(self
.ldb_admin
, self
.configuration_dn
))
790 self
.ldb_admin
.set_dsheuristics(self
.dsheuristics
)
792 def test_search1(self
):
793 """Make sure users can see us if given LC to user and group"""
794 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
795 mod
= "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
796 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
797 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
,
798 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
799 self
.ldb_admin
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
800 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
801 self
.ldb_admin
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
,
802 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
803 self
.ldb_admin
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
804 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
805 self
.ldb_admin
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
,
806 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
808 #regular users must see only ou1 and ou2
809 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
811 self
.assertEquals(len(res
), 2)
812 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
813 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)]
815 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
816 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
818 #these users should see all ous
819 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
821 self
.assertEquals(len(res
), 6)
822 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in self
.full_list
]
823 self
.assertEquals(sorted(res_list
), sorted(self
.full_list
))
825 res
= self
.ldb_user2
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
827 self
.assertEquals(len(res
), 6)
828 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in self
.full_list
]
829 self
.assertEquals(sorted(res_list
), sorted(self
.full_list
))
831 def test_search2(self
):
832 """Make sure users can't see us if access is explicitly denied"""
833 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
834 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
)
835 self
.ldb_admin
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
836 self
.ldb_admin
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
837 self
.ldb_admin
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
838 self
.ldb_admin
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
839 mod
= "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
840 self
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
841 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
843 #this user should see all ous
844 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in self
.full_list
]
845 self
.assertEquals(sorted(res_list
), sorted(self
.full_list
))
847 #these users should see ou1, 2, 5 and 6 but not 3 and 4
848 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
850 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
851 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
),
852 Dn(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
853 Dn(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)]
854 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
855 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
857 res
= self
.ldb_user2
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
859 self
.assertEquals(len(res
), 4)
860 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
861 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
863 def test_search3(self
):
864 """Make sure users can't see ous if access is explicitly denied - 2"""
865 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
866 mod
= "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
867 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
868 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
,
869 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
870 self
.ldb_admin
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
871 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
872 self
.ldb_admin
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
,
873 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
874 self
.ldb_admin
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
875 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
876 self
.ldb_admin
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
,
877 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
879 print "Testing correct behavior on nonaccessible search base"
881 self
.ldb_user3
.search("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
883 except LdbError
, (num
, _
):
884 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
888 mod
= "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
889 self
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
891 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
892 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)]
894 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
896 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
897 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
899 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
900 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
),
901 Dn(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
902 Dn(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)]
904 #should not see ou3 and ou4, but should see ou5 and ou6
905 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
907 self
.assertEquals(len(res
), 4)
908 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
909 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
911 res
= self
.ldb_user2
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
913 self
.assertEquals(len(res
), 4)
914 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
915 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
917 def test_search4(self
):
918 """There is no difference in visibility if the user is also creator"""
919 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
920 mod
= "(A;CI;CC;;;%s)" % (str(self
.user_sid
))
921 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
922 self
.ldb_user
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
,
923 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
924 self
.ldb_user
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
925 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
926 self
.ldb_user
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
,
927 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
928 self
.ldb_user
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
929 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
930 self
.ldb_user
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
,
931 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
933 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
934 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)]
935 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
937 self
.assertEquals(len(res
), 2)
938 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
939 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
941 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
943 self
.assertEquals(len(res
), 2)
944 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
945 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
947 def test_search5(self
):
948 """Make sure users can see only attributes they are allowed to see"""
949 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
950 mod
= "(A;CI;LC;;;%s)" % (str(self
.user_sid
))
951 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
952 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
,
953 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
954 # assert user can only see dn
955 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
958 self
.assertEquals(len(res
), 1)
959 res_list
= res
[0].keys()
960 self
.assertEquals(res_list
, ok_list
)
962 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
963 scope
=SCOPE_BASE
, attrs
=["ou"])
965 self
.assertEquals(len(res
), 1)
966 res_list
= res
[0].keys()
967 self
.assertEquals(res_list
, ok_list
)
969 #give read property on ou and assert user can only see dn and ou
970 mod
= "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self
.user_sid
))
971 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
972 self
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
973 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
975 ok_list
= ['dn', 'ou']
976 self
.assertEquals(len(res
), 1)
977 res_list
= res
[0].keys()
978 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
980 #give read property on Public Information and assert user can see ou and other members
981 mod
= "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self
.user_sid
))
982 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
983 self
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
984 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
987 ok_list
= ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
988 res_list
= res
[0].keys()
989 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
991 def test_search6(self
):
992 """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
993 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
994 mod
= "(A;CI;LCCC;;;%s)" % (str(self
.user_sid
))
995 self
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
996 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
,
997 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
)
998 self
.ldb_user
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
,
999 "D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)")
1001 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(ou=ou3)",
1002 scope
=SCOPE_SUBTREE
)
1003 #nothing should be returned as ou is not accessible
1004 self
.assertEquals(res
, [])
1006 #give read property on ou and assert user can only see dn and ou
1007 mod
= "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self
.user_sid
))
1008 self
.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, mod
)
1009 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(ou=ou3)",
1010 scope
=SCOPE_SUBTREE
)
1011 self
.assertEquals(len(res
), 1)
1012 ok_list
= ['dn', 'ou']
1013 res_list
= res
[0].keys()
1014 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
1016 #give read property on Public Information and assert user can see ou and other members
1017 mod
= "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self
.user_sid
))
1018 self
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
1019 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(ou=ou2)",
1020 scope
=SCOPE_SUBTREE
)
1021 self
.assertEquals(len(res
), 1)
1022 ok_list
= ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
1023 res_list
= res
[0].keys()
1024 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
1026 #tests on ldap delete operations
1027 class AclDeleteTests(AclTests
):
1030 super(AclDeleteTests
, self
).setUp()
1031 self
.regular_user
= "acl_delete_user1"
1032 # Create regular user
1033 self
.create_enable_user(self
.regular_user
)
1034 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
1037 super(AclDeleteTests
, self
).tearDown()
1038 self
.delete_force(self
.ldb_admin
, self
.get_user_dn("test_delete_user1"))
1039 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
1041 def test_delete_u1(self
):
1042 """User is prohibited by default to delete another User object"""
1043 # Create user that we try to delete
1044 self
.create_test_user(self
.ldb_admin
, self
.get_user_dn("test_delete_user1"))
1045 # Here delete User object should ALWAYS through exception
1047 self
.ldb_user
.delete(self
.get_user_dn("test_delete_user1"))
1048 except LdbError
, (num
, _
):
1049 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1053 def test_delete_u2(self
):
1054 """User's group has RIGHT_DELETE to another User object"""
1055 user_dn
= self
.get_user_dn("test_delete_user1")
1056 # Create user that we try to delete
1057 self
.create_test_user(self
.ldb_admin
, user_dn
)
1058 mod
= "(A;;SD;;;AU)"
1059 self
.dacl_add_ace(user_dn
, mod
)
1060 # Try to delete User object
1061 self
.ldb_user
.delete(user_dn
)
1062 res
= self
.ldb_admin
.search(self
.base_dn
,
1063 expression
="(distinguishedName=%s)" % user_dn
)
1064 self
.assertEqual(res
, [])
1066 def test_delete_u3(self
):
1067 """User indentified by SID has RIGHT_DELETE to another User object"""
1068 user_dn
= self
.get_user_dn("test_delete_user1")
1069 # Create user that we try to delete
1070 self
.create_test_user(self
.ldb_admin
, user_dn
)
1071 mod
= "(A;;SD;;;%s)" % self
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1072 self
.dacl_add_ace(user_dn
, mod
)
1073 # Try to delete User object
1074 self
.ldb_user
.delete(user_dn
)
1075 res
= self
.ldb_admin
.search(self
.base_dn
,
1076 expression
="(distinguishedName=%s)" % user_dn
)
1077 self
.assertEqual(res
, [])
1079 #tests on ldap rename operations
1080 class AclRenameTests(AclTests
):
1083 super(AclRenameTests
, self
).setUp()
1084 self
.regular_user
= "acl_rename_user1"
1086 # Create regular user
1087 self
.create_enable_user(self
.regular_user
)
1088 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
1091 super(AclRenameTests
, self
).tearDown()
1093 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user1,OU=test_rename_ou3,OU=test_rename_ou2," + self
.base_dn
)
1094 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user2,OU=test_rename_ou3,OU=test_rename_ou2," + self
.base_dn
)
1095 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user5,OU=test_rename_ou3,OU=test_rename_ou2," + self
.base_dn
)
1096 self
.delete_force(self
.ldb_admin
, "OU=test_rename_ou3,OU=test_rename_ou2," + self
.base_dn
)
1098 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user1,OU=test_rename_ou2," + self
.base_dn
)
1099 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user2,OU=test_rename_ou2," + self
.base_dn
)
1100 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user5,OU=test_rename_ou2," + self
.base_dn
)
1101 self
.delete_force(self
.ldb_admin
, "OU=test_rename_ou2," + self
.base_dn
)
1103 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user1,OU=test_rename_ou1," + self
.base_dn
)
1104 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user2,OU=test_rename_ou1," + self
.base_dn
)
1105 self
.delete_force(self
.ldb_admin
, "CN=test_rename_user5,OU=test_rename_ou1," + self
.base_dn
)
1106 self
.delete_force(self
.ldb_admin
, "OU=test_rename_ou3,OU=test_rename_ou1," + self
.base_dn
)
1107 self
.delete_force(self
.ldb_admin
, "OU=test_rename_ou1," + self
.base_dn
)
1108 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
1110 def test_rename_u1(self
):
1111 """Regular user fails to rename 'User object' within single OU"""
1112 # Create OU structure
1113 self
.ldb_admin
.create_ou("OU=test_rename_ou1," + self
.base_dn
)
1114 self
.create_test_user(self
.ldb_admin
, "CN=test_rename_user1,OU=test_rename_ou1," + self
.base_dn
)
1116 self
.ldb_user
.rename("CN=test_rename_user1,OU=test_rename_ou1," + self
.base_dn
, \
1117 "CN=test_rename_user5,OU=test_rename_ou1," + self
.base_dn
)
1118 except LdbError
, (num
, _
):
1119 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1123 def test_rename_u2(self
):
1124 """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1125 ou_dn
= "OU=test_rename_ou1," + self
.base_dn
1126 user_dn
= "CN=test_rename_user1," + ou_dn
1127 rename_user_dn
= "CN=test_rename_user5," + ou_dn
1128 # Create OU structure
1129 self
.ldb_admin
.create_ou(ou_dn
)
1130 self
.create_test_user(self
.ldb_admin
, user_dn
)
1131 mod
= "(A;;WP;;;AU)"
1132 self
.dacl_add_ace(user_dn
, mod
)
1133 # Rename 'User object' having WP to AU
1134 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1135 res
= self
.ldb_admin
.search(self
.base_dn
,
1136 expression
="(distinguishedName=%s)" % user_dn
)
1137 self
.assertEqual(res
, [])
1138 res
= self
.ldb_admin
.search(self
.base_dn
,
1139 expression
="(distinguishedName=%s)" % rename_user_dn
)
1140 self
.assertNotEqual(res
, [])
1142 def test_rename_u3(self
):
1143 """Test rename with rights granted to 'User object' SID"""
1144 ou_dn
= "OU=test_rename_ou1," + self
.base_dn
1145 user_dn
= "CN=test_rename_user1," + ou_dn
1146 rename_user_dn
= "CN=test_rename_user5," + ou_dn
1147 # Create OU structure
1148 self
.ldb_admin
.create_ou(ou_dn
)
1149 self
.create_test_user(self
.ldb_admin
, user_dn
)
1150 sid
= self
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1151 mod
= "(A;;WP;;;%s)" % str(sid
)
1152 self
.dacl_add_ace(user_dn
, mod
)
1153 # Rename 'User object' having WP to AU
1154 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1155 res
= self
.ldb_admin
.search(self
.base_dn
,
1156 expression
="(distinguishedName=%s)" % user_dn
)
1157 self
.assertEqual(res
, [])
1158 res
= self
.ldb_admin
.search(self
.base_dn
,
1159 expression
="(distinguishedName=%s)" % rename_user_dn
)
1160 self
.assertNotEqual(res
, [])
1162 def test_rename_u4(self
):
1163 """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1164 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1165 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1166 user_dn
= "CN=test_rename_user2," + ou1_dn
1167 rename_user_dn
= "CN=test_rename_user5," + ou2_dn
1168 # Create OU structure
1169 self
.ldb_admin
.create_ou(ou1_dn
)
1170 self
.ldb_admin
.create_ou(ou2_dn
)
1171 self
.create_test_user(self
.ldb_admin
, user_dn
)
1172 mod
= "(A;;WPSD;;;AU)"
1173 self
.dacl_add_ace(user_dn
, mod
)
1174 mod
= "(A;;CC;;;AU)"
1175 self
.dacl_add_ace(ou2_dn
, mod
)
1176 # Rename 'User object' having SD and CC to AU
1177 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1178 res
= self
.ldb_admin
.search(self
.base_dn
,
1179 expression
="(distinguishedName=%s)" % user_dn
)
1180 self
.assertEqual(res
, [])
1181 res
= self
.ldb_admin
.search(self
.base_dn
,
1182 expression
="(distinguishedName=%s)" % rename_user_dn
)
1183 self
.assertNotEqual(res
, [])
1185 def test_rename_u5(self
):
1186 """Test rename with rights granted to 'User object' SID"""
1187 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1188 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1189 user_dn
= "CN=test_rename_user2," + ou1_dn
1190 rename_user_dn
= "CN=test_rename_user5," + ou2_dn
1191 # Create OU structure
1192 self
.ldb_admin
.create_ou(ou1_dn
)
1193 self
.ldb_admin
.create_ou(ou2_dn
)
1194 self
.create_test_user(self
.ldb_admin
, user_dn
)
1195 sid
= self
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1196 mod
= "(A;;WPSD;;;%s)" % str(sid
)
1197 self
.dacl_add_ace(user_dn
, mod
)
1198 mod
= "(A;;CC;;;%s)" % str(sid
)
1199 self
.dacl_add_ace(ou2_dn
, mod
)
1200 # Rename 'User object' having SD and CC to AU
1201 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1202 res
= self
.ldb_admin
.search(self
.base_dn
,
1203 expression
="(distinguishedName=%s)" % user_dn
)
1204 self
.assertEqual(res
, [])
1205 res
= self
.ldb_admin
.search(self
.base_dn
,
1206 expression
="(distinguishedName=%s)" % rename_user_dn
)
1207 self
.assertNotEqual(res
, [])
1209 def test_rename_u6(self
):
1210 """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1211 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1212 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1213 user_dn
= "CN=test_rename_user2," + ou1_dn
1214 rename_user_dn
= "CN=test_rename_user2," + ou2_dn
1215 # Create OU structure
1216 self
.ldb_admin
.create_ou(ou1_dn
)
1217 self
.ldb_admin
.create_ou(ou2_dn
)
1218 #mod = "(A;CI;DCWP;;;AU)"
1219 mod
= "(A;;DC;;;AU)"
1220 self
.dacl_add_ace(ou1_dn
, mod
)
1221 mod
= "(A;;CC;;;AU)"
1222 self
.dacl_add_ace(ou2_dn
, mod
)
1223 self
.create_test_user(self
.ldb_admin
, user_dn
)
1224 mod
= "(A;;WP;;;AU)"
1225 self
.dacl_add_ace(user_dn
, mod
)
1226 # Rename 'User object' having SD and CC to AU
1227 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1228 res
= self
.ldb_admin
.search(self
.base_dn
,
1229 expression
="(distinguishedName=%s)" % user_dn
)
1230 self
.assertEqual(res
, [])
1231 res
= self
.ldb_admin
.search(self
.base_dn
,
1232 expression
="(distinguishedName=%s)" % rename_user_dn
)
1233 self
.assertNotEqual(res
, [])
1235 def test_rename_u7(self
):
1236 """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1237 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1238 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1239 ou3_dn
= "OU=test_rename_ou3," + ou2_dn
1240 user_dn
= "CN=test_rename_user2," + ou1_dn
1241 rename_user_dn
= "CN=test_rename_user5," + ou3_dn
1242 # Create OU structure
1243 self
.ldb_admin
.create_ou(ou1_dn
)
1244 self
.ldb_admin
.create_ou(ou2_dn
)
1245 self
.ldb_admin
.create_ou(ou3_dn
)
1246 mod
= "(A;CI;WPDC;;;AU)"
1247 self
.dacl_add_ace(ou1_dn
, mod
)
1248 mod
= "(A;;CC;;;AU)"
1249 self
.dacl_add_ace(ou3_dn
, mod
)
1250 self
.create_test_user(self
.ldb_admin
, user_dn
)
1251 # Rename 'User object' having SD and CC to AU
1252 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1253 res
= self
.ldb_admin
.search(self
.base_dn
,
1254 expression
="(distinguishedName=%s)" % user_dn
)
1255 self
.assertEqual(res
, [])
1256 res
= self
.ldb_admin
.search(self
.base_dn
,
1257 expression
="(distinguishedName=%s)" % rename_user_dn
)
1258 self
.assertNotEqual(res
, [])
1260 def test_rename_u8(self
):
1261 """Test rename on an object with and without modify access on the RDN attribute"""
1262 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1263 ou2_dn
= "OU=test_rename_ou2," + ou1_dn
1264 ou3_dn
= "OU=test_rename_ou3," + ou1_dn
1265 # Create OU structure
1266 self
.ldb_admin
.create_ou(ou1_dn
)
1267 self
.ldb_admin
.create_ou(ou2_dn
)
1268 sid
= self
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1269 mod
= "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid
)
1270 self
.dacl_add_ace(ou2_dn
, mod
)
1271 mod
= "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid
)
1272 self
.dacl_add_ace(ou2_dn
, mod
)
1274 self
.ldb_user
.rename(ou2_dn
, ou3_dn
)
1275 except LdbError
, (num
, _
):
1276 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1278 # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1280 sid
= self
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1281 mod
= "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid
)
1282 self
.dacl_add_ace(ou2_dn
, mod
)
1283 self
.ldb_user
.rename(ou2_dn
, ou3_dn
)
1284 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % ou2_dn
)
1285 self
.assertEqual(res
, [])
1286 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % ou3_dn
)
1287 self
.assertNotEqual(res
, [])
1289 #tests on Control Access Rights
1290 class AclCARTests(AclTests
):
1293 super(AclCARTests
, self
).setUp()
1294 self
.user_with_wp
= "acl_car_user1"
1295 self
.user_with_pc
= "acl_car_user2"
1296 self
.create_enable_user(self
.user_with_wp
)
1297 self
.create_enable_user(self
.user_with_pc
)
1298 self
.ldb_user
= self
.get_ldb_connection(self
.user_with_wp
, self
.user_pass
)
1299 self
.ldb_user2
= self
.get_ldb_connection(self
.user_with_pc
, self
.user_pass
)
1301 res
= self
.ldb_admin
.search("CN=Directory Service, CN=Windows NT, CN=Services, "
1302 + self
.configuration_dn
, scope
=SCOPE_BASE
, attrs
=["dSHeuristics"])
1303 if "dSHeuristics" in res
[0]:
1304 self
.dsheuristics
= res
[0]["dSHeuristics"][0]
1306 self
.dsheuristics
= None
1308 self
.minPwdAge
= self
.ldb_admin
.get_minPwdAge()
1310 # Set the "dSHeuristics" to have the tests run against Windows Server
1311 self
.ldb_admin
.set_dsheuristics("000000001")
1312 # Set minPwdAge to 0
1313 self
.ldb_admin
.set_minPwdAge("0")
1316 super(AclCARTests
, self
).tearDown()
1317 #restore original values
1318 self
.ldb_admin
.set_dsheuristics(self
.dsheuristics
)
1319 self
.ldb_admin
.set_minPwdAge(self
.minPwdAge
)
1320 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_wp
))
1321 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_pc
))
1323 def test_change_password1(self
):
1324 """Try a password change operation without any CARs given"""
1325 #users have change password by default - remove for negative testing
1326 desc
= self
.read_desc(self
.get_user_dn(self
.user_with_wp
))
1327 sddl
= desc
.as_sddl(self
.domain_sid
)
1328 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1329 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1330 self
.modify_desc(self
.get_user_dn(self
.user_with_wp
), sddl
)
1332 self
.ldb_user
.modify_ldif("""
1333 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1336 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1338 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1340 except LdbError
, (num
, _
):
1341 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1343 # for some reason we get constraint violation instead of insufficient access error
1346 def test_change_password2(self
):
1347 """Make sure WP has no influence"""
1348 desc
= self
.read_desc(self
.get_user_dn(self
.user_with_wp
))
1349 sddl
= desc
.as_sddl(self
.domain_sid
)
1350 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1351 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1352 self
.modify_desc(self
.get_user_dn(self
.user_with_wp
), sddl
)
1353 mod
= "(A;;WP;;;PS)"
1354 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1355 desc
= self
.read_desc(self
.get_user_dn(self
.user_with_wp
))
1356 sddl
= desc
.as_sddl(self
.domain_sid
)
1358 self
.ldb_user
.modify_ldif("""
1359 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1362 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1364 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1366 except LdbError
, (num
, _
):
1367 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1369 # for some reason we get constraint violation instead of insufficient access error
1372 def test_change_password3(self
):
1373 """Make sure WP has no influence"""
1374 mod
= "(D;;WP;;;PS)"
1375 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1376 desc
= self
.read_desc(self
.get_user_dn(self
.user_with_wp
))
1377 sddl
= desc
.as_sddl(self
.domain_sid
)
1378 self
.ldb_user
.modify_ldif("""
1379 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1382 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1384 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1387 def test_change_password5(self
):
1388 """Make sure rights have no influence on dBCSPwd"""
1389 desc
= self
.read_desc(self
.get_user_dn(self
.user_with_wp
))
1390 sddl
= desc
.as_sddl(self
.domain_sid
)
1391 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1392 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1393 self
.modify_desc(self
.get_user_dn(self
.user_with_wp
), sddl
)
1394 mod
= "(D;;WP;;;PS)"
1395 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1397 self
.ldb_user
.modify_ldif("""
1398 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1401 dBCSPwd: XXXXXXXXXXXXXXXX
1403 dBCSPwd: YYYYYYYYYYYYYYYY
1405 except LdbError
, (num
, _
):
1406 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
1410 def test_change_password6(self
):
1411 """Test uneven delete/adds"""
1413 self
.ldb_user
.modify_ldif("""
1414 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1416 delete: userPassword
1417 userPassword: thatsAcomplPASS1
1418 delete: userPassword
1419 userPassword: thatsAcomplPASS1
1421 userPassword: thatsAcomplPASS2
1423 except LdbError
, (num
, _
):
1424 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1427 mod
= "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1428 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1430 self
.ldb_user
.modify_ldif("""
1431 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1433 delete: userPassword
1434 userPassword: thatsAcomplPASS1
1435 delete: userPassword
1436 userPassword: thatsAcomplPASS1
1438 userPassword: thatsAcomplPASS2
1440 # This fails on Windows 2000 domain level with constraint violation
1441 except LdbError
, (num
, _
):
1442 self
.assertTrue(num
== ERR_CONSTRAINT_VIOLATION
or
1443 num
== ERR_UNWILLING_TO_PERFORM
)
1448 def test_change_password7(self
):
1449 """Try a password change operation without any CARs given"""
1450 #users have change password by default - remove for negative testing
1451 desc
= self
.read_desc(self
.get_user_dn(self
.user_with_wp
))
1452 sddl
= desc
.as_sddl(self
.domain_sid
)
1453 self
.modify_desc(self
.get_user_dn(self
.user_with_wp
), sddl
)
1454 #first change our own password
1455 self
.ldb_user2
.modify_ldif("""
1456 dn: """ + self
.get_user_dn(self
.user_with_pc
) + """
1459 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1461 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1463 #then someone else's
1464 self
.ldb_user2
.modify_ldif("""
1465 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1468 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1470 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1473 def test_reset_password1(self
):
1474 """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1476 self
.ldb_user
.modify_ldif("""
1477 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1480 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1482 except LdbError
, (num
, _
):
1483 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1486 mod
= "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1487 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1488 self
.ldb_user
.modify_ldif("""
1489 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1492 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1495 def test_reset_password2(self
):
1496 """Try a user password reset operation (userPassword) before and after granting CAR"""
1498 self
.ldb_user
.modify_ldif("""
1499 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1501 replace: userPassword
1502 userPassword: thatsAcomplPASS1
1504 except LdbError
, (num
, _
):
1505 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1508 mod
= "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1509 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1511 self
.ldb_user
.modify_ldif("""
1512 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1514 replace: userPassword
1515 userPassword: thatsAcomplPASS1
1517 # This fails on Windows 2000 domain level with constraint violation
1518 except LdbError
, (num
, _
):
1519 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1521 def test_reset_password3(self
):
1522 """Grant WP and see what happens (unicodePwd)"""
1523 mod
= "(A;;WP;;;PS)"
1524 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1526 self
.ldb_user
.modify_ldif("""
1527 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1530 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1532 except LdbError
, (num
, _
):
1533 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1537 def test_reset_password4(self
):
1538 """Grant WP and see what happens (userPassword)"""
1539 mod
= "(A;;WP;;;PS)"
1540 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1542 self
.ldb_user
.modify_ldif("""
1543 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1545 replace: userPassword
1546 userPassword: thatsAcomplPASS1
1548 except LdbError
, (num
, _
):
1549 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1553 def test_reset_password5(self
):
1554 """Explicitly deny WP but grant CAR (unicodePwd)"""
1555 mod
= "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1556 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1557 self
.ldb_user
.modify_ldif("""
1558 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1561 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1564 def test_reset_password6(self
):
1565 """Explicitly deny WP but grant CAR (userPassword)"""
1566 mod
= "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1567 self
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1569 self
.ldb_user
.modify_ldif("""
1570 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1572 replace: userPassword
1573 userPassword: thatsAcomplPASS1
1575 # This fails on Windows 2000 domain level with constraint violation
1576 except LdbError
, (num
, _
):
1577 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1579 class AclExtendedTests(AclTests
):
1582 super(AclExtendedTests
, self
).setUp()
1583 #regular user, will be the creator
1589 self
.create_enable_user(self
.u1
)
1590 self
.create_enable_user(self
.u2
)
1591 self
.create_enable_user(self
.u3
)
1592 self
.ldb_admin
.add_remove_group_members("Domain Admins", self
.u3
,
1593 add_members_operation
=True)
1594 self
.ldb_user1
= self
.get_ldb_connection(self
.u1
, self
.user_pass
)
1595 self
.ldb_user2
= self
.get_ldb_connection(self
.u2
, self
.user_pass
)
1596 self
.ldb_user3
= self
.get_ldb_connection(self
.u3
, self
.user_pass
)
1597 self
.user_sid1
= self
.get_object_sid(self
.get_user_dn(self
.u1
))
1598 self
.user_sid2
= self
.get_object_sid(self
.get_user_dn(self
.u2
))
1601 super(AclExtendedTests
, self
).tearDown()
1602 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.u1
))
1603 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.u2
))
1604 self
.delete_force(self
.ldb_admin
, self
.get_user_dn(self
.u3
))
1605 self
.delete_force(self
.ldb_admin
, "CN=ext_group1,OU=ext_ou1," + self
.base_dn
)
1606 self
.delete_force(self
.ldb_admin
, "ou=ext_ou1," + self
.base_dn
)
1608 def test_ntSecurityDescriptor(self
):
1610 self
.ldb_admin
.create_ou("ou=ext_ou1," + self
.base_dn
)
1611 #give u1 Create children access
1612 mod
= "(A;;CC;;;%s)" % str(self
.user_sid1
)
1613 self
.dacl_add_ace("OU=ext_ou1," + self
.base_dn
, mod
)
1614 mod
= "(A;;LC;;;%s)" % str(self
.user_sid2
)
1615 self
.dacl_add_ace("OU=ext_ou1," + self
.base_dn
, mod
)
1616 #create a group under that, grant RP to u2
1617 self
.create_group(self
.ldb_user1
, "CN=ext_group1,OU=ext_ou1," + self
.base_dn
)
1618 mod
= "(A;;RP;;;%s)" % str(self
.user_sid2
)
1619 self
.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self
.base_dn
, mod
)
1620 #u2 must not read the descriptor
1621 res
= self
.ldb_user2
.search("CN=ext_group1,OU=ext_ou1," + self
.base_dn
,
1622 SCOPE_BASE
, None, ["nTSecurityDescriptor"])
1623 self
.assertNotEqual(res
,[])
1624 self
.assertFalse("nTSecurityDescriptor" in res
[0].keys())
1625 #grant RC to u2 - still no access
1626 mod
= "(A;;RC;;;%s)" % str(self
.user_sid2
)
1627 self
.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self
.base_dn
, mod
)
1628 res
= self
.ldb_user2
.search("CN=ext_group1,OU=ext_ou1," + self
.base_dn
,
1629 SCOPE_BASE
, None, ["nTSecurityDescriptor"])
1630 self
.assertNotEqual(res
,[])
1631 self
.assertFalse("nTSecurityDescriptor" in res
[0].keys())
1632 #u3 is member of administrators group, should be able to read sd
1633 res
= self
.ldb_user3
.search("CN=ext_group1,OU=ext_ou1," + self
.base_dn
,
1634 SCOPE_BASE
, None, ["nTSecurityDescriptor"])
1635 self
.assertEqual(len(res
),1)
1636 self
.assertTrue("nTSecurityDescriptor" in res
[0].keys())
1638 # Important unit running information
1640 if not "://" in host
:
1641 host
= "ldap://%s" % host
1642 ldb
= SamDB(host
, credentials
=creds
, session_info
=system_session(), lp
=lp
)
1644 runner
= SubunitTestRunner()
1646 if not runner
.run(unittest
.makeSuite(AclAddTests
)).wasSuccessful():
1648 if not runner
.run(unittest
.makeSuite(AclModifyTests
)).wasSuccessful():
1650 if not runner
.run(unittest
.makeSuite(AclDeleteTests
)).wasSuccessful():
1652 if not runner
.run(unittest
.makeSuite(AclRenameTests
)).wasSuccessful():
1654 if not runner
.run(unittest
.makeSuite(AclCARTests
)).wasSuccessful():
1656 if not runner
.run(unittest
.makeSuite(AclSearchTests
)).wasSuccessful():
1658 if not runner
.run(unittest
.makeSuite(AclExtendedTests
)).wasSuccessful():