2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
9 sys
.path
.insert(0, "bin/python")
12 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
14 import samba
.getopt
as options
15 from samba
.join
import dc_join
18 SCOPE_BASE
, SCOPE_SUBTREE
, LdbError
, ERR_NO_SUCH_OBJECT
,
19 ERR_UNWILLING_TO_PERFORM
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
20 from ldb
import ERR_CONSTRAINT_VIOLATION
21 from ldb
import ERR_OPERATIONS_ERROR
22 from ldb
import Message
, MessageElement
, Dn
23 from ldb
import FLAG_MOD_REPLACE
, FLAG_MOD_ADD
, FLAG_MOD_DELETE
24 from samba
.dcerpc
import security
, drsuapi
, misc
26 from samba
.auth
import system_session
27 from samba
import gensec
, sd_utils
28 from samba
.samdb
import SamDB
29 from samba
.credentials
import Credentials
, DONT_USE_KERBEROS
31 from samba
.tests
import delete_force
34 parser
= optparse
.OptionParser("acl.py [options] <host>")
35 sambaopts
= options
.SambaOptions(parser
)
36 parser
.add_option_group(sambaopts
)
37 parser
.add_option_group(options
.VersionOptions(parser
))
39 # use command line creds if available
40 credopts
= options
.CredentialsOptions(parser
)
41 parser
.add_option_group(credopts
)
42 subunitopts
= SubunitOptions(parser
)
43 parser
.add_option_group(subunitopts
)
45 opts
, args
= parser
.parse_args()
53 ldaphost
= "ldap://%s" % host
56 start
= host
.rindex("://")
57 host
= host
.lstrip(start
+3)
59 lp
= sambaopts
.get_loadparm()
60 creds
= credopts
.get_credentials(lp
)
61 creds
.set_gensec_features(creds
.get_gensec_features() | gensec
.FEATURE_SEAL
)
67 class AclTests(samba
.tests
.TestCase
):
70 super(AclTests
, self
).setUp()
71 self
.ldb_admin
= SamDB(ldaphost
, credentials
=creds
, session_info
=system_session(lp
), lp
=lp
)
72 self
.base_dn
= self
.ldb_admin
.domain_dn()
73 self
.domain_sid
= security
.dom_sid(self
.ldb_admin
.get_domain_sid())
74 self
.user_pass
= "samba123@"
75 self
.configuration_dn
= self
.ldb_admin
.get_config_basedn().get_linearized()
76 self
.sd_utils
= sd_utils
.SDUtils(self
.ldb_admin
)
77 #used for anonymous login
78 self
.creds_tmp
= Credentials()
79 self
.creds_tmp
.set_username("")
80 self
.creds_tmp
.set_password("")
81 self
.creds_tmp
.set_domain(creds
.get_domain())
82 self
.creds_tmp
.set_realm(creds
.get_realm())
83 self
.creds_tmp
.set_workstation(creds
.get_workstation())
84 print "baseDN: %s" % self
.base_dn
86 def get_user_dn(self
, name
):
87 return "CN=%s,CN=Users,%s" % (name
, self
.base_dn
)
89 def get_ldb_connection(self
, target_username
, target_password
):
90 creds_tmp
= Credentials()
91 creds_tmp
.set_username(target_username
)
92 creds_tmp
.set_password(target_password
)
93 creds_tmp
.set_domain(creds
.get_domain())
94 creds_tmp
.set_realm(creds
.get_realm())
95 creds_tmp
.set_workstation(creds
.get_workstation())
96 creds_tmp
.set_gensec_features(creds_tmp
.get_gensec_features()
97 | gensec
.FEATURE_SEAL
)
98 creds_tmp
.set_kerberos_state(DONT_USE_KERBEROS
) # kinit is too expensive to use in a tight loop
99 ldb_target
= SamDB(url
=ldaphost
, credentials
=creds_tmp
, lp
=lp
)
102 # Test if we have any additional groups for users than default ones
103 def assert_user_no_group_member(self
, username
):
104 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % self
.get_user_dn(username
))
106 self
.assertEqual(res
[0]["memberOf"][0], "")
112 #tests on ldap add operations
113 class AclAddTests(AclTests
):
116 super(AclAddTests
, self
).setUp()
117 # Domain admin that will be creator of OU parent-child structure
118 self
.usr_admin_owner
= "acl_add_user1"
119 # Second domain admin that will not be creator of OU parent-child structure
120 self
.usr_admin_not_owner
= "acl_add_user2"
122 self
.regular_user
= "acl_add_user3"
123 self
.test_user1
= "test_add_user1"
124 self
.test_group1
= "test_add_group1"
125 self
.ou1
= "OU=test_add_ou1"
126 self
.ou2
= "OU=test_add_ou2,%s" % self
.ou1
127 self
.ldb_admin
.newuser(self
.usr_admin_owner
, self
.user_pass
)
128 self
.ldb_admin
.newuser(self
.usr_admin_not_owner
, self
.user_pass
)
129 self
.ldb_admin
.newuser(self
.regular_user
, self
.user_pass
)
131 # add admins to the Domain Admins group
132 self
.ldb_admin
.add_remove_group_members("Domain Admins", [self
.usr_admin_owner
],
133 add_members_operation
=True)
134 self
.ldb_admin
.add_remove_group_members("Domain Admins", [self
.usr_admin_not_owner
],
135 add_members_operation
=True)
137 self
.ldb_owner
= self
.get_ldb_connection(self
.usr_admin_owner
, self
.user_pass
)
138 self
.ldb_notowner
= self
.get_ldb_connection(self
.usr_admin_not_owner
, self
.user_pass
)
139 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
142 super(AclAddTests
, self
).tearDown()
143 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" %
144 (self
.test_user1
, self
.ou2
, self
.base_dn
))
145 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" %
146 (self
.test_group1
, self
.ou2
, self
.base_dn
))
147 delete_force(self
.ldb_admin
, "%s,%s" % (self
.ou2
, self
.base_dn
))
148 delete_force(self
.ldb_admin
, "%s,%s" % (self
.ou1
, self
.base_dn
))
149 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.usr_admin_owner
))
150 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.usr_admin_not_owner
))
151 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
152 delete_force(self
.ldb_admin
, self
.get_user_dn("test_add_anonymous"))
154 # Make sure top OU is deleted (and so everything under it)
155 def assert_top_ou_deleted(self
):
156 res
= self
.ldb_admin
.search(self
.base_dn
,
157 expression
="(distinguishedName=%s,%s)" % (
158 "OU=test_add_ou1", self
.base_dn
))
159 self
.assertEqual(len(res
), 0)
161 def test_add_u1(self
):
162 """Testing OU with the rights of Doman Admin not creator of the OU """
163 self
.assert_top_ou_deleted()
164 # Change descriptor for top level OU
165 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
166 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
167 user_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.usr_admin_not_owner
))
168 mod
= "(D;CI;WPCC;;;%s)" % str(user_sid
)
169 self
.sd_utils
.dacl_add_ace("OU=test_add_ou1," + self
.base_dn
, mod
)
170 # Test user and group creation with another domain admin's credentials
171 self
.ldb_notowner
.newuser(self
.test_user1
, self
.user_pass
, userou
=self
.ou2
)
172 self
.ldb_notowner
.newgroup("test_add_group1", groupou
="OU=test_add_ou2,OU=test_add_ou1",
173 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
174 # Make sure we HAVE created the two objects -- user and group
175 # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
176 # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
177 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
178 self
.assertTrue(len(res
) > 0)
179 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
180 self
.assertTrue(len(res
) > 0)
182 def test_add_u2(self
):
183 """Testing OU with the regular user that has no rights granted over the OU """
184 self
.assert_top_ou_deleted()
185 # Create a parent-child OU structure with domain admin credentials
186 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
187 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
188 # Test user and group creation with regular user credentials
190 self
.ldb_user
.newuser(self
.test_user1
, self
.user_pass
, userou
=self
.ou2
)
191 self
.ldb_user
.newgroup("test_add_group1", groupou
="OU=test_add_ou2,OU=test_add_ou1",
192 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
193 except LdbError
, (num
, _
):
194 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
197 # Make sure we HAVEN'T created any of two objects -- user or group
198 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
199 self
.assertEqual(len(res
), 0)
200 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
201 self
.assertEqual(len(res
), 0)
203 def test_add_u3(self
):
204 """Testing OU with the rights of regular user granted the right 'Create User child objects' """
205 self
.assert_top_ou_deleted()
206 # Change descriptor for top level OU
207 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
208 user_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
209 mod
= "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid
)
210 self
.sd_utils
.dacl_add_ace("OU=test_add_ou1," + self
.base_dn
, mod
)
211 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
212 # Test user and group creation with granted user only to one of the objects
213 self
.ldb_user
.newuser(self
.test_user1
, self
.user_pass
, userou
=self
.ou2
, setpassword
=False)
215 self
.ldb_user
.newgroup("test_add_group1", groupou
="OU=test_add_ou2,OU=test_add_ou1",
216 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
217 except LdbError
, (num
, _
):
218 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
221 # Make sure we HAVE created the one of two objects -- user
222 res
= self
.ldb_admin
.search(self
.base_dn
,
223 expression
="(distinguishedName=%s,%s)" %
224 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
226 self
.assertNotEqual(len(res
), 0)
227 res
= self
.ldb_admin
.search(self
.base_dn
,
228 expression
="(distinguishedName=%s,%s)" %
229 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
231 self
.assertEqual(len(res
), 0)
233 def test_add_u4(self
):
234 """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
235 self
.assert_top_ou_deleted()
236 self
.ldb_owner
.create_ou("OU=test_add_ou1," + self
.base_dn
)
237 self
.ldb_owner
.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self
.base_dn
)
238 self
.ldb_owner
.newuser(self
.test_user1
, self
.user_pass
, userou
=self
.ou2
)
239 self
.ldb_owner
.newgroup("test_add_group1", groupou
="OU=test_add_ou2,OU=test_add_ou1",
240 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
241 # Make sure we have successfully created the two objects -- user and group
242 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
243 self
.assertTrue(len(res
) > 0)
244 res
= self
.ldb_admin
.search(self
.base_dn
,
245 expression
="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self
.base_dn
))
246 self
.assertTrue(len(res
) > 0)
248 def test_add_anonymous(self
):
249 """Test add operation with anonymous user"""
250 anonymous
= SamDB(url
=ldaphost
, credentials
=self
.creds_tmp
, lp
=lp
)
252 anonymous
.newuser("test_add_anonymous", self
.user_pass
)
253 except LdbError
, (num
, _
):
254 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
258 #tests on ldap modify operations
259 class AclModifyTests(AclTests
):
262 super(AclModifyTests
, self
).setUp()
263 self
.user_with_wp
= "acl_mod_user1"
264 self
.user_with_sm
= "acl_mod_user2"
265 self
.user_with_group_sm
= "acl_mod_user3"
266 self
.ldb_admin
.newuser(self
.user_with_wp
, self
.user_pass
)
267 self
.ldb_admin
.newuser(self
.user_with_sm
, self
.user_pass
)
268 self
.ldb_admin
.newuser(self
.user_with_group_sm
, self
.user_pass
)
269 self
.ldb_user
= self
.get_ldb_connection(self
.user_with_wp
, self
.user_pass
)
270 self
.ldb_user2
= self
.get_ldb_connection(self
.user_with_sm
, self
.user_pass
)
271 self
.ldb_user3
= self
.get_ldb_connection(self
.user_with_group_sm
, self
.user_pass
)
272 self
.user_sid
= self
.sd_utils
.get_object_sid( self
.get_user_dn(self
.user_with_wp
))
273 self
.ldb_admin
.newgroup("test_modify_group2", grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
274 self
.ldb_admin
.newgroup("test_modify_group3", grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
275 self
.ldb_admin
.newuser("test_modify_user2", self
.user_pass
)
278 super(AclModifyTests
, self
).tearDown()
279 delete_force(self
.ldb_admin
, self
.get_user_dn("test_modify_user1"))
280 delete_force(self
.ldb_admin
, "CN=test_modify_group1,CN=Users," + self
.base_dn
)
281 delete_force(self
.ldb_admin
, "CN=test_modify_group2,CN=Users," + self
.base_dn
)
282 delete_force(self
.ldb_admin
, "CN=test_modify_group3,CN=Users," + self
.base_dn
)
283 delete_force(self
.ldb_admin
, "OU=test_modify_ou1," + self
.base_dn
)
284 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_wp
))
285 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_sm
))
286 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_group_sm
))
287 delete_force(self
.ldb_admin
, self
.get_user_dn("test_modify_user2"))
288 delete_force(self
.ldb_admin
, self
.get_user_dn("test_anonymous"))
290 def test_modify_u1(self
):
291 """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
292 mod
= "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.user_sid
)
293 # First test object -- User
294 print "Testing modify on User object"
295 self
.ldb_admin
.newuser("test_modify_user1", self
.user_pass
)
296 self
.sd_utils
.dacl_add_ace(self
.get_user_dn("test_modify_user1"), mod
)
298 dn: """ + self
.get_user_dn("test_modify_user1") + """
301 displayName: test_changed"""
302 self
.ldb_user
.modify_ldif(ldif
)
303 res
= self
.ldb_admin
.search(self
.base_dn
,
304 expression
="(distinguishedName=%s)" % self
.get_user_dn("test_modify_user1"))
305 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
306 # Second test object -- Group
307 print "Testing modify on Group object"
308 self
.ldb_admin
.newgroup("test_modify_group1",
309 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
310 self
.sd_utils
.dacl_add_ace("CN=test_modify_group1,CN=Users," + self
.base_dn
, mod
)
312 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
315 displayName: test_changed"""
316 self
.ldb_user
.modify_ldif(ldif
)
317 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self
.base_dn
))
318 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
319 # Third test object -- Organizational Unit
320 print "Testing modify on OU object"
321 #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
322 self
.ldb_admin
.create_ou("OU=test_modify_ou1," + self
.base_dn
)
323 self
.sd_utils
.dacl_add_ace("OU=test_modify_ou1," + self
.base_dn
, mod
)
325 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
328 displayName: test_changed"""
329 self
.ldb_user
.modify_ldif(ldif
)
330 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self
.base_dn
))
331 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
333 def test_modify_u2(self
):
334 """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
335 mod
= "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.user_sid
)
336 # First test object -- User
337 print "Testing modify on User object"
338 #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
339 self
.ldb_admin
.newuser("test_modify_user1", self
.user_pass
)
340 self
.sd_utils
.dacl_add_ace(self
.get_user_dn("test_modify_user1"), mod
)
341 # Modify on attribute you have rights for
343 dn: """ + self
.get_user_dn("test_modify_user1") + """
346 displayName: test_changed"""
347 self
.ldb_user
.modify_ldif(ldif
)
348 res
= self
.ldb_admin
.search(self
.base_dn
,
349 expression
="(distinguishedName=%s)" %
350 self
.get_user_dn("test_modify_user1"))
351 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
352 # Modify on attribute you do not have rights for granted
354 dn: """ + self
.get_user_dn("test_modify_user1") + """
357 url: www.samba.org"""
359 self
.ldb_user
.modify_ldif(ldif
)
360 except LdbError
, (num
, _
):
361 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
363 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
365 # Second test object -- Group
366 print "Testing modify on Group object"
367 self
.ldb_admin
.newgroup("test_modify_group1",
368 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
369 self
.sd_utils
.dacl_add_ace("CN=test_modify_group1,CN=Users," + self
.base_dn
, mod
)
371 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
374 displayName: test_changed"""
375 self
.ldb_user
.modify_ldif(ldif
)
376 res
= self
.ldb_admin
.search(self
.base_dn
,
377 expression
="(distinguishedName=%s)" %
378 str("CN=test_modify_group1,CN=Users," + self
.base_dn
))
379 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
380 # Modify on attribute you do not have rights for granted
382 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
385 url: www.samba.org"""
387 self
.ldb_user
.modify_ldif(ldif
)
388 except LdbError
, (num
, _
):
389 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
391 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
393 # Modify on attribute you do not have rights for granted while also modifying something you do have rights for
395 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
400 displayName: test_changed"""
402 self
.ldb_user
.modify_ldif(ldif
)
403 except LdbError
, (num
, _
):
404 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
406 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
408 # Second test object -- Organizational Unit
409 print "Testing modify on OU object"
410 self
.ldb_admin
.create_ou("OU=test_modify_ou1," + self
.base_dn
)
411 self
.sd_utils
.dacl_add_ace("OU=test_modify_ou1," + self
.base_dn
, mod
)
413 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
416 displayName: test_changed"""
417 self
.ldb_user
.modify_ldif(ldif
)
418 res
= self
.ldb_admin
.search(self
.base_dn
,
419 expression
="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
421 self
.assertEqual(res
[0]["displayName"][0], "test_changed")
422 # Modify on attribute you do not have rights for granted
424 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
427 url: www.samba.org"""
429 self
.ldb_user
.modify_ldif(ldif
)
430 except LdbError
, (num
, _
):
431 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
433 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
436 def test_modify_u3(self
):
437 """7 Modify one attribute as you have no what so ever rights granted"""
438 # First test object -- User
439 print "Testing modify on User object"
440 self
.ldb_admin
.newuser("test_modify_user1", self
.user_pass
)
441 # Modify on attribute you do not have rights for granted
443 dn: """ + self
.get_user_dn("test_modify_user1") + """
446 url: www.samba.org"""
448 self
.ldb_user
.modify_ldif(ldif
)
449 except LdbError
, (num
, _
):
450 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
452 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
455 # Second test object -- Group
456 print "Testing modify on Group object"
457 self
.ldb_admin
.newgroup("test_modify_group1",
458 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
459 # Modify on attribute you do not have rights for granted
461 dn: CN=test_modify_group1,CN=Users,""" + self
.base_dn
+ """
464 url: www.samba.org"""
466 self
.ldb_user
.modify_ldif(ldif
)
467 except LdbError
, (num
, _
):
468 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
470 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
473 # Second test object -- Organizational Unit
474 print "Testing modify on OU object"
475 #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
476 self
.ldb_admin
.create_ou("OU=test_modify_ou1," + self
.base_dn
)
477 # Modify on attribute you do not have rights for granted
479 dn: OU=test_modify_ou1,""" + self
.base_dn
+ """
482 url: www.samba.org"""
484 self
.ldb_user
.modify_ldif(ldif
)
485 except LdbError
, (num
, _
):
486 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
488 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
492 def test_modify_u4(self
):
493 """11 Grant WP to PRINCIPAL_SELF and test modify"""
495 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
497 add: adminDescription
498 adminDescription: blah blah blah"""
500 self
.ldb_user
.modify_ldif(ldif
)
501 except LdbError
, (num
, _
):
502 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
504 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
507 mod
= "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
508 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
509 # Modify on attribute you have rights for
510 self
.ldb_user
.modify_ldif(ldif
)
511 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" \
512 % self
.get_user_dn(self
.user_with_wp
), attrs
=["adminDescription"] )
513 self
.assertEqual(res
[0]["adminDescription"][0], "blah blah blah")
515 def test_modify_u5(self
):
516 """12 test self membership"""
518 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
521 Member: """ + self
.get_user_dn(self
.user_with_sm
)
522 #the user has no rights granted, this should fail
524 self
.ldb_user2
.modify_ldif(ldif
)
525 except LdbError
, (num
, _
):
526 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
528 # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
531 #grant self-membership, should be able to add himself
532 user_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.user_with_sm
))
533 mod
= "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid
)
534 self
.sd_utils
.dacl_add_ace("CN=test_modify_group2,CN=Users," + self
.base_dn
, mod
)
535 self
.ldb_user2
.modify_ldif(ldif
)
536 res
= self
.ldb_admin
.search( self
.base_dn
, expression
="(distinguishedName=%s)" \
537 % ("CN=test_modify_group2,CN=Users," + self
.base_dn
), attrs
=["Member"])
538 self
.assertEqual(res
[0]["Member"][0], self
.get_user_dn(self
.user_with_sm
))
541 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
544 Member: CN=test_modify_user2,CN=Users,""" + self
.base_dn
546 self
.ldb_user2
.modify_ldif(ldif
)
547 except LdbError
, (num
, _
):
548 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
552 def test_modify_u6(self
):
553 """13 test self membership"""
555 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
558 Member: """ + self
.get_user_dn(self
.user_with_sm
) + """
559 Member: CN=test_modify_user2,CN=Users,""" + self
.base_dn
561 #grant self-membership, should be able to add himself but not others at the same time
562 user_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.user_with_sm
))
563 mod
= "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid
)
564 self
.sd_utils
.dacl_add_ace("CN=test_modify_group2,CN=Users," + self
.base_dn
, mod
)
566 self
.ldb_user2
.modify_ldif(ldif
)
567 except LdbError
, (num
, _
):
568 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
572 def test_modify_u7(self
):
573 """13 User with WP modifying Member"""
574 #a second user is given write property permission
575 user_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.user_with_wp
))
576 mod
= "(A;;WP;;;%s)" % str(user_sid
)
577 self
.sd_utils
.dacl_add_ace("CN=test_modify_group2,CN=Users," + self
.base_dn
, mod
)
579 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
582 Member: """ + self
.get_user_dn(self
.user_with_wp
)
583 self
.ldb_user
.modify_ldif(ldif
)
584 res
= self
.ldb_admin
.search( self
.base_dn
, expression
="(distinguishedName=%s)" \
585 % ("CN=test_modify_group2,CN=Users," + self
.base_dn
), attrs
=["Member"])
586 self
.assertEqual(res
[0]["Member"][0], self
.get_user_dn(self
.user_with_wp
))
588 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
591 self
.ldb_user
.modify_ldif(ldif
)
593 dn: CN=test_modify_group2,CN=Users,""" + self
.base_dn
+ """
596 Member: CN=test_modify_user2,CN=Users,""" + self
.base_dn
597 self
.ldb_user
.modify_ldif(ldif
)
598 res
= self
.ldb_admin
.search( self
.base_dn
, expression
="(distinguishedName=%s)" \
599 % ("CN=test_modify_group2,CN=Users," + self
.base_dn
), attrs
=["Member"])
600 self
.assertEqual(res
[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self
.base_dn
)
602 def test_modify_anonymous(self
):
603 """Test add operation with anonymous user"""
604 anonymous
= SamDB(url
=ldaphost
, credentials
=self
.creds_tmp
, lp
=lp
)
605 self
.ldb_admin
.newuser("test_anonymous", "samba123@")
607 m
.dn
= Dn(anonymous
, self
.get_user_dn("test_anonymous"))
609 m
["description"] = MessageElement("sambauser2",
614 except LdbError
, (num
, _
):
615 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
619 #enable these when we have search implemented
620 class AclSearchTests(AclTests
):
623 super(AclSearchTests
, self
).setUp()
624 # Get the old "dSHeuristics" if it was set
625 dsheuristics
= self
.ldb_admin
.get_dsheuristics()
626 # Reset the "dSHeuristics" as they were before
627 self
.addCleanup(self
.ldb_admin
.set_dsheuristics
, dsheuristics
)
628 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
629 self
.ldb_admin
.set_dsheuristics("000000001")
630 # Get the old "minPwdAge"
631 minPwdAge
= self
.ldb_admin
.get_minPwdAge()
632 # Reset the "minPwdAge" as it was before
633 self
.addCleanup(self
.ldb_admin
.set_minPwdAge
, minPwdAge
)
634 # Set it temporarely to "0"
635 self
.ldb_admin
.set_minPwdAge("0")
637 self
.u1
= "search_u1"
638 self
.u2
= "search_u2"
639 self
.u3
= "search_u3"
640 self
.group1
= "group1"
641 self
.ldb_admin
.newuser(self
.u1
, self
.user_pass
)
642 self
.ldb_admin
.newuser(self
.u2
, self
.user_pass
)
643 self
.ldb_admin
.newuser(self
.u3
, self
.user_pass
)
644 self
.ldb_admin
.newgroup(self
.group1
, grouptype
=samba
.dsdb
.GTYPE_SECURITY_GLOBAL_GROUP
)
645 self
.ldb_admin
.add_remove_group_members(self
.group1
, [self
.u2
],
646 add_members_operation
=True)
647 self
.ldb_user
= self
.get_ldb_connection(self
.u1
, self
.user_pass
)
648 self
.ldb_user2
= self
.get_ldb_connection(self
.u2
, self
.user_pass
)
649 self
.ldb_user3
= self
.get_ldb_connection(self
.u3
, self
.user_pass
)
650 self
.full_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
651 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
),
652 Dn(self
.ldb_admin
, "OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
653 Dn(self
.ldb_admin
, "OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
),
654 Dn(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
655 Dn(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)]
656 self
.user_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.u1
))
657 self
.group_sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.group1
))
659 def create_clean_ou(self
, object_dn
):
660 """ Base repeating setup for unittests to follow """
661 res
= self
.ldb_admin
.search(base
=self
.base_dn
, scope
=SCOPE_SUBTREE
, \
662 expression
="distinguishedName=%s" % object_dn
)
663 # Make sure top testing OU has been deleted before starting the test
664 self
.assertEqual(len(res
), 0)
665 self
.ldb_admin
.create_ou(object_dn
)
666 desc_sddl
= self
.sd_utils
.get_sd_as_sddl(object_dn
)
667 # Make sure there are inheritable ACEs initially
668 self
.assertTrue("CI" in desc_sddl
or "OI" in desc_sddl
)
669 # Find and remove all inherit ACEs
670 res
= re
.findall("\(.*?\)", desc_sddl
)
671 res
= [x
for x
in res
if ("CI" in x
) or ("OI" in x
)]
673 desc_sddl
= desc_sddl
.replace(x
, "")
674 # Add flag 'protected' in both DACL and SACL so no inherit ACEs
675 # can propagate from above
676 # remove SACL, we are not interested
677 desc_sddl
= desc_sddl
.replace(":AI", ":AIP")
678 self
.sd_utils
.modify_sd_on_dn(object_dn
, desc_sddl
)
679 # Verify all inheritable ACEs are gone
680 desc_sddl
= self
.sd_utils
.get_sd_as_sddl(object_dn
)
681 self
.assertFalse("CI" in desc_sddl
)
682 self
.assertFalse("OI" in desc_sddl
)
685 super(AclSearchTests
, self
).tearDown()
686 delete_force(self
.ldb_admin
, "OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
)
687 delete_force(self
.ldb_admin
, "OU=test_search_ou1," + self
.base_dn
)
688 delete_force(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
689 delete_force(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
690 delete_force(self
.ldb_admin
, "OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
691 delete_force(self
.ldb_admin
, "OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
692 delete_force(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
)
693 delete_force(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)
694 delete_force(self
.ldb_admin
, self
.get_user_dn("search_u1"))
695 delete_force(self
.ldb_admin
, self
.get_user_dn("search_u2"))
696 delete_force(self
.ldb_admin
, self
.get_user_dn("search_u3"))
697 delete_force(self
.ldb_admin
, self
.get_user_dn("group1"))
699 def test_search_anonymous1(self
):
700 """Verify access of rootDSE with the correct request"""
701 anonymous
= SamDB(url
=ldaphost
, credentials
=self
.creds_tmp
, lp
=lp
)
702 res
= anonymous
.search("", expression
="(objectClass=*)", scope
=SCOPE_BASE
)
703 self
.assertEquals(len(res
), 1)
704 #verify some of the attributes
705 #dont care about values
706 self
.assertTrue("ldapServiceName" in res
[0])
707 self
.assertTrue("namingContexts" in res
[0])
708 self
.assertTrue("isSynchronized" in res
[0])
709 self
.assertTrue("dsServiceName" in res
[0])
710 self
.assertTrue("supportedSASLMechanisms" in res
[0])
711 self
.assertTrue("isGlobalCatalogReady" in res
[0])
712 self
.assertTrue("domainControllerFunctionality" in res
[0])
713 self
.assertTrue("serverName" in res
[0])
715 def test_search_anonymous2(self
):
716 """Make sure we cannot access anything else"""
717 anonymous
= SamDB(url
=ldaphost
, credentials
=self
.creds_tmp
, lp
=lp
)
719 res
= anonymous
.search("", expression
="(objectClass=*)", scope
=SCOPE_SUBTREE
)
720 except LdbError
, (num
, _
):
721 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
725 res
= anonymous
.search(self
.base_dn
, expression
="(objectClass=*)", scope
=SCOPE_SUBTREE
)
726 except LdbError
, (num
, _
):
727 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
731 res
= anonymous
.search(anonymous
.get_config_basedn(), expression
="(objectClass=*)",
733 except LdbError
, (num
, _
):
734 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
738 def test_search_anonymous3(self
):
739 """Set dsHeuristics and repeat"""
740 self
.ldb_admin
.set_dsheuristics("0000002")
741 self
.ldb_admin
.create_ou("OU=test_search_ou1," + self
.base_dn
)
742 mod
= "(A;CI;LC;;;AN)"
743 self
.sd_utils
.dacl_add_ace("OU=test_search_ou1," + self
.base_dn
, mod
)
744 self
.ldb_admin
.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
)
745 anonymous
= SamDB(url
=ldaphost
, credentials
=self
.creds_tmp
, lp
=lp
)
746 res
= anonymous
.search("OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
,
747 expression
="(objectClass=*)", scope
=SCOPE_SUBTREE
)
748 self
.assertEquals(len(res
), 1)
749 self
.assertTrue("dn" in res
[0])
750 self
.assertTrue(res
[0]["dn"] == Dn(self
.ldb_admin
,
751 "OU=test_search_ou2,OU=test_search_ou1," + self
.base_dn
))
752 res
= anonymous
.search(anonymous
.get_config_basedn(), expression
="(objectClass=*)",
754 self
.assertEquals(len(res
), 1)
755 self
.assertTrue("dn" in res
[0])
756 self
.assertTrue(res
[0]["dn"] == Dn(self
.ldb_admin
, self
.configuration_dn
))
758 def test_search1(self
):
759 """Make sure users can see us if given LC to user and group"""
760 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
761 mod
= "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
762 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
763 tmp_desc
= security
.descriptor
.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
,
765 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
766 self
.ldb_admin
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
767 self
.ldb_admin
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
768 self
.ldb_admin
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
769 self
.ldb_admin
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
771 #regular users must see only ou1 and ou2
772 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
774 self
.assertEquals(len(res
), 2)
775 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
776 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)]
778 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
779 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
781 #these users should see all ous
782 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
784 self
.assertEquals(len(res
), 6)
785 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in self
.full_list
]
786 self
.assertEquals(sorted(res_list
), sorted(self
.full_list
))
788 res
= self
.ldb_user2
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
790 self
.assertEquals(len(res
), 6)
791 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in self
.full_list
]
792 self
.assertEquals(sorted(res_list
), sorted(self
.full_list
))
794 def test_search2(self
):
795 """Make sure users can't see us if access is explicitly denied"""
796 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
797 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
)
798 self
.ldb_admin
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
799 self
.ldb_admin
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
800 self
.ldb_admin
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
)
801 self
.ldb_admin
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)
802 mod
= "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
803 self
.sd_utils
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
804 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
806 #this user should see all ous
807 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in self
.full_list
]
808 self
.assertEquals(sorted(res_list
), sorted(self
.full_list
))
810 #these users should see ou1, 2, 5 and 6 but not 3 and 4
811 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
813 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
814 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
),
815 Dn(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
816 Dn(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)]
817 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
818 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
820 res
= self
.ldb_user2
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
822 self
.assertEquals(len(res
), 4)
823 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
824 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
826 def test_search3(self
):
827 """Make sure users can't see ous if access is explicitly denied - 2"""
828 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
829 mod
= "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
830 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
831 tmp_desc
= security
.descriptor
.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
,
833 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
834 self
.ldb_admin
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
835 self
.ldb_admin
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
836 self
.ldb_admin
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
837 self
.ldb_admin
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
839 print "Testing correct behavior on nonaccessible search base"
841 self
.ldb_user3
.search("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
843 except LdbError
, (num
, _
):
844 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
848 mod
= "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self
.user_sid
), str(self
.group_sid
))
849 self
.sd_utils
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
851 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
852 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)]
854 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
856 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
857 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
859 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
860 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
),
861 Dn(self
.ldb_admin
, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
),
862 Dn(self
.ldb_admin
, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
)]
864 #should not see ou3 and ou4, but should see ou5 and ou6
865 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
867 self
.assertEquals(len(res
), 4)
868 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
869 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
871 res
= self
.ldb_user2
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
873 self
.assertEquals(len(res
), 4)
874 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
875 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
877 def test_search4(self
):
878 """There is no difference in visibility if the user is also creator"""
879 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
880 mod
= "(A;CI;CC;;;%s)" % (str(self
.user_sid
))
881 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
882 tmp_desc
= security
.descriptor
.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
,
884 self
.ldb_user
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
885 self
.ldb_user
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
886 self
.ldb_user
.create_ou("OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
887 self
.ldb_user
.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
888 self
.ldb_user
.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
890 ok_list
= [Dn(self
.ldb_admin
, "OU=ou2,OU=ou1," + self
.base_dn
),
891 Dn(self
.ldb_admin
, "OU=ou1," + self
.base_dn
)]
892 res
= self
.ldb_user3
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
894 self
.assertEquals(len(res
), 2)
895 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
896 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
898 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
900 self
.assertEquals(len(res
), 2)
901 res_list
= [ x
["dn"] for x
in res
if x
["dn"] in ok_list
]
902 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
904 def test_search5(self
):
905 """Make sure users can see only attributes they are allowed to see"""
906 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
907 mod
= "(A;CI;LC;;;%s)" % (str(self
.user_sid
))
908 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
909 tmp_desc
= security
.descriptor
.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
,
911 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
912 # assert user can only see dn
913 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
916 self
.assertEquals(len(res
), 1)
917 res_list
= res
[0].keys()
918 self
.assertEquals(res_list
, ok_list
)
920 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
921 scope
=SCOPE_BASE
, attrs
=["ou"])
923 self
.assertEquals(len(res
), 1)
924 res_list
= res
[0].keys()
925 self
.assertEquals(res_list
, ok_list
)
927 #give read property on ou and assert user can only see dn and ou
928 mod
= "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self
.user_sid
))
929 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
930 self
.sd_utils
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
931 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
933 ok_list
= ['dn', 'ou']
934 self
.assertEquals(len(res
), 1)
935 res_list
= res
[0].keys()
936 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
938 #give read property on Public Information and assert user can see ou and other members
939 mod
= "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self
.user_sid
))
940 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
941 self
.sd_utils
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
942 res
= self
.ldb_user
.search("OU=ou2,OU=ou1," + self
.base_dn
, expression
="(objectClass=*)",
945 ok_list
= ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
946 res_list
= res
[0].keys()
947 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
949 def test_search6(self
):
950 """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
951 self
.create_clean_ou("OU=ou1," + self
.base_dn
)
952 mod
= "(A;CI;LCCC;;;%s)" % (str(self
.user_sid
))
953 self
.sd_utils
.dacl_add_ace("OU=ou1," + self
.base_dn
, mod
)
954 tmp_desc
= security
.descriptor
.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod
,
956 self
.ldb_admin
.create_ou("OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
957 self
.ldb_user
.create_ou("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, sd
=tmp_desc
)
959 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(ou=ou3)",
961 #nothing should be returned as ou is not accessible
962 self
.assertEquals(len(res
), 0)
964 #give read property on ou and assert user can only see dn and ou
965 mod
= "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self
.user_sid
))
966 self
.sd_utils
.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self
.base_dn
, mod
)
967 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(ou=ou3)",
969 self
.assertEquals(len(res
), 1)
970 ok_list
= ['dn', 'ou']
971 res_list
= res
[0].keys()
972 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
974 #give read property on Public Information and assert user can see ou and other members
975 mod
= "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self
.user_sid
))
976 self
.sd_utils
.dacl_add_ace("OU=ou2,OU=ou1," + self
.base_dn
, mod
)
977 res
= self
.ldb_user
.search("OU=ou1," + self
.base_dn
, expression
="(ou=ou2)",
979 self
.assertEquals(len(res
), 1)
980 ok_list
= ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
981 res_list
= res
[0].keys()
982 self
.assertEquals(sorted(res_list
), sorted(ok_list
))
984 #tests on ldap delete operations
985 class AclDeleteTests(AclTests
):
988 super(AclDeleteTests
, self
).setUp()
989 self
.regular_user
= "acl_delete_user1"
990 # Create regular user
991 self
.ldb_admin
.newuser(self
.regular_user
, self
.user_pass
)
992 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
995 super(AclDeleteTests
, self
).tearDown()
996 delete_force(self
.ldb_admin
, self
.get_user_dn("test_delete_user1"))
997 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
998 delete_force(self
.ldb_admin
, self
.get_user_dn("test_anonymous"))
1000 def test_delete_u1(self
):
1001 """User is prohibited by default to delete another User object"""
1002 # Create user that we try to delete
1003 self
.ldb_admin
.newuser("test_delete_user1", self
.user_pass
)
1004 # Here delete User object should ALWAYS through exception
1006 self
.ldb_user
.delete(self
.get_user_dn("test_delete_user1"))
1007 except LdbError
, (num
, _
):
1008 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1012 def test_delete_u2(self
):
1013 """User's group has RIGHT_DELETE to another User object"""
1014 user_dn
= self
.get_user_dn("test_delete_user1")
1015 # Create user that we try to delete
1016 self
.ldb_admin
.newuser("test_delete_user1", self
.user_pass
)
1017 mod
= "(A;;SD;;;AU)"
1018 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1019 # Try to delete User object
1020 self
.ldb_user
.delete(user_dn
)
1021 res
= self
.ldb_admin
.search(self
.base_dn
,
1022 expression
="(distinguishedName=%s)" % user_dn
)
1023 self
.assertEqual(len(res
), 0)
1025 def test_delete_u3(self
):
1026 """User indentified by SID has RIGHT_DELETE to another User object"""
1027 user_dn
= self
.get_user_dn("test_delete_user1")
1028 # Create user that we try to delete
1029 self
.ldb_admin
.newuser("test_delete_user1", self
.user_pass
)
1030 mod
= "(A;;SD;;;%s)" % self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1031 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1032 # Try to delete User object
1033 self
.ldb_user
.delete(user_dn
)
1034 res
= self
.ldb_admin
.search(self
.base_dn
,
1035 expression
="(distinguishedName=%s)" % user_dn
)
1036 self
.assertEqual(len(res
), 0)
1038 def test_delete_anonymous(self
):
1039 """Test add operation with anonymous user"""
1040 anonymous
= SamDB(url
=ldaphost
, credentials
=self
.creds_tmp
, lp
=lp
)
1041 self
.ldb_admin
.newuser("test_anonymous", "samba123@")
1044 anonymous
.delete(self
.get_user_dn("test_anonymous"))
1045 except LdbError
, (num
, _
):
1046 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1050 #tests on ldap rename operations
1051 class AclRenameTests(AclTests
):
1054 super(AclRenameTests
, self
).setUp()
1055 self
.regular_user
= "acl_rename_user1"
1056 self
.ou1
= "OU=test_rename_ou1"
1057 self
.ou2
= "OU=test_rename_ou2"
1058 self
.ou3
= "OU=test_rename_ou3,%s" % self
.ou2
1059 self
.testuser1
= "test_rename_user1"
1060 self
.testuser2
= "test_rename_user2"
1061 self
.testuser3
= "test_rename_user3"
1062 self
.testuser4
= "test_rename_user4"
1063 self
.testuser5
= "test_rename_user5"
1064 # Create regular user
1065 self
.ldb_admin
.newuser(self
.regular_user
, self
.user_pass
)
1066 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
1069 super(AclRenameTests
, self
).tearDown()
1071 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser1
, self
.ou3
, self
.base_dn
))
1072 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser2
, self
.ou3
, self
.base_dn
))
1073 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser5
, self
.ou3
, self
.base_dn
))
1074 delete_force(self
.ldb_admin
, "%s,%s" % (self
.ou3
, self
.base_dn
))
1076 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser1
, self
.ou2
, self
.base_dn
))
1077 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser2
, self
.ou2
, self
.base_dn
))
1078 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser5
, self
.ou2
, self
.base_dn
))
1079 delete_force(self
.ldb_admin
, "%s,%s" % (self
.ou2
, self
.base_dn
))
1081 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser1
, self
.ou1
, self
.base_dn
))
1082 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser2
, self
.ou1
, self
.base_dn
))
1083 delete_force(self
.ldb_admin
, "CN=%s,%s,%s" % (self
.testuser5
, self
.ou1
, self
.base_dn
))
1084 delete_force(self
.ldb_admin
, "OU=test_rename_ou3,%s,%s" % (self
.ou1
, self
.base_dn
))
1085 delete_force(self
.ldb_admin
, "%s,%s" % (self
.ou1
, self
.base_dn
))
1086 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
1088 def test_rename_u1(self
):
1089 """Regular user fails to rename 'User object' within single OU"""
1090 # Create OU structure
1091 self
.ldb_admin
.create_ou("OU=test_rename_ou1," + self
.base_dn
)
1092 self
.ldb_admin
.newuser(self
.testuser1
, self
.user_pass
, userou
=self
.ou1
)
1094 self
.ldb_user
.rename("CN=%s,%s,%s" % (self
.testuser1
, self
.ou1
, self
.base_dn
), \
1095 "CN=%s,%s,%s" % (self
.testuser5
, self
.ou1
, self
.base_dn
))
1096 except LdbError
, (num
, _
):
1097 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1101 def test_rename_u2(self
):
1102 """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1103 ou_dn
= "OU=test_rename_ou1," + self
.base_dn
1104 user_dn
= "CN=test_rename_user1," + ou_dn
1105 rename_user_dn
= "CN=test_rename_user5," + ou_dn
1106 # Create OU structure
1107 self
.ldb_admin
.create_ou(ou_dn
)
1108 self
.ldb_admin
.newuser(self
.testuser1
, self
.user_pass
, userou
=self
.ou1
)
1109 mod
= "(A;;WP;;;AU)"
1110 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1111 # Rename 'User object' having WP to AU
1112 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1113 res
= self
.ldb_admin
.search(self
.base_dn
,
1114 expression
="(distinguishedName=%s)" % user_dn
)
1115 self
.assertEqual(len(res
), 0)
1116 res
= self
.ldb_admin
.search(self
.base_dn
,
1117 expression
="(distinguishedName=%s)" % rename_user_dn
)
1118 self
.assertNotEqual(len(res
), 0)
1120 def test_rename_u3(self
):
1121 """Test rename with rights granted to 'User object' SID"""
1122 ou_dn
= "OU=test_rename_ou1," + self
.base_dn
1123 user_dn
= "CN=test_rename_user1," + ou_dn
1124 rename_user_dn
= "CN=test_rename_user5," + ou_dn
1125 # Create OU structure
1126 self
.ldb_admin
.create_ou(ou_dn
)
1127 self
.ldb_admin
.newuser(self
.testuser1
, self
.user_pass
, userou
=self
.ou1
)
1128 sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1129 mod
= "(A;;WP;;;%s)" % str(sid
)
1130 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1131 # Rename 'User object' having WP to AU
1132 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1133 res
= self
.ldb_admin
.search(self
.base_dn
,
1134 expression
="(distinguishedName=%s)" % user_dn
)
1135 self
.assertEqual(len(res
), 0)
1136 res
= self
.ldb_admin
.search(self
.base_dn
,
1137 expression
="(distinguishedName=%s)" % rename_user_dn
)
1138 self
.assertNotEqual(len(res
), 0)
1140 def test_rename_u4(self
):
1141 """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1142 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1143 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1144 user_dn
= "CN=test_rename_user2," + ou1_dn
1145 rename_user_dn
= "CN=test_rename_user5," + ou2_dn
1146 # Create OU structure
1147 self
.ldb_admin
.create_ou(ou1_dn
)
1148 self
.ldb_admin
.create_ou(ou2_dn
)
1149 self
.ldb_admin
.newuser(self
.testuser2
, self
.user_pass
, userou
=self
.ou1
)
1150 mod
= "(A;;WPSD;;;AU)"
1151 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1152 mod
= "(A;;CC;;;AU)"
1153 self
.sd_utils
.dacl_add_ace(ou2_dn
, mod
)
1154 # Rename 'User object' having SD and CC to AU
1155 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1156 res
= self
.ldb_admin
.search(self
.base_dn
,
1157 expression
="(distinguishedName=%s)" % user_dn
)
1158 self
.assertEqual(len(res
), 0)
1159 res
= self
.ldb_admin
.search(self
.base_dn
,
1160 expression
="(distinguishedName=%s)" % rename_user_dn
)
1161 self
.assertNotEqual(len(res
), 0)
1163 def test_rename_u5(self
):
1164 """Test rename with rights granted to 'User object' SID"""
1165 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1166 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1167 user_dn
= "CN=test_rename_user2," + ou1_dn
1168 rename_user_dn
= "CN=test_rename_user5," + ou2_dn
1169 # Create OU structure
1170 self
.ldb_admin
.create_ou(ou1_dn
)
1171 self
.ldb_admin
.create_ou(ou2_dn
)
1172 self
.ldb_admin
.newuser(self
.testuser2
, self
.user_pass
, userou
=self
.ou1
)
1173 sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1174 mod
= "(A;;WPSD;;;%s)" % str(sid
)
1175 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1176 mod
= "(A;;CC;;;%s)" % str(sid
)
1177 self
.sd_utils
.dacl_add_ace(ou2_dn
, mod
)
1178 # Rename 'User object' having SD and CC to AU
1179 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1180 res
= self
.ldb_admin
.search(self
.base_dn
,
1181 expression
="(distinguishedName=%s)" % user_dn
)
1182 self
.assertEqual(len(res
), 0)
1183 res
= self
.ldb_admin
.search(self
.base_dn
,
1184 expression
="(distinguishedName=%s)" % rename_user_dn
)
1185 self
.assertNotEqual(len(res
), 0)
1187 def test_rename_u6(self
):
1188 """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1189 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1190 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1191 user_dn
= "CN=test_rename_user2," + ou1_dn
1192 rename_user_dn
= "CN=test_rename_user2," + ou2_dn
1193 # Create OU structure
1194 self
.ldb_admin
.create_ou(ou1_dn
)
1195 self
.ldb_admin
.create_ou(ou2_dn
)
1196 #mod = "(A;CI;DCWP;;;AU)"
1197 mod
= "(A;;DC;;;AU)"
1198 self
.sd_utils
.dacl_add_ace(ou1_dn
, mod
)
1199 mod
= "(A;;CC;;;AU)"
1200 self
.sd_utils
.dacl_add_ace(ou2_dn
, mod
)
1201 self
.ldb_admin
.newuser(self
.testuser2
, self
.user_pass
, userou
=self
.ou1
)
1202 mod
= "(A;;WP;;;AU)"
1203 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1204 # Rename 'User object' having SD and CC to AU
1205 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1206 res
= self
.ldb_admin
.search(self
.base_dn
,
1207 expression
="(distinguishedName=%s)" % user_dn
)
1208 self
.assertEqual(len(res
), 0)
1209 res
= self
.ldb_admin
.search(self
.base_dn
,
1210 expression
="(distinguishedName=%s)" % rename_user_dn
)
1211 self
.assertNotEqual(len(res
), 0)
1213 def test_rename_u7(self
):
1214 """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1215 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1216 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1217 ou3_dn
= "OU=test_rename_ou3," + ou2_dn
1218 user_dn
= "CN=test_rename_user2," + ou1_dn
1219 rename_user_dn
= "CN=test_rename_user5," + ou3_dn
1220 # Create OU structure
1221 self
.ldb_admin
.create_ou(ou1_dn
)
1222 self
.ldb_admin
.create_ou(ou2_dn
)
1223 self
.ldb_admin
.create_ou(ou3_dn
)
1224 mod
= "(A;CI;WPDC;;;AU)"
1225 self
.sd_utils
.dacl_add_ace(ou1_dn
, mod
)
1226 mod
= "(A;;CC;;;AU)"
1227 self
.sd_utils
.dacl_add_ace(ou3_dn
, mod
)
1228 self
.ldb_admin
.newuser(self
.testuser2
, self
.user_pass
, userou
=self
.ou1
)
1229 # Rename 'User object' having SD and CC to AU
1230 self
.ldb_user
.rename(user_dn
, rename_user_dn
)
1231 res
= self
.ldb_admin
.search(self
.base_dn
,
1232 expression
="(distinguishedName=%s)" % user_dn
)
1233 self
.assertEqual(len(res
), 0)
1234 res
= self
.ldb_admin
.search(self
.base_dn
,
1235 expression
="(distinguishedName=%s)" % rename_user_dn
)
1236 self
.assertNotEqual(len(res
), 0)
1238 def test_rename_u8(self
):
1239 """Test rename on an object with and without modify access on the RDN attribute"""
1240 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1241 ou2_dn
= "OU=test_rename_ou2," + ou1_dn
1242 ou3_dn
= "OU=test_rename_ou3," + ou1_dn
1243 # Create OU structure
1244 self
.ldb_admin
.create_ou(ou1_dn
)
1245 self
.ldb_admin
.create_ou(ou2_dn
)
1246 sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1247 mod
= "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid
)
1248 self
.sd_utils
.dacl_add_ace(ou2_dn
, mod
)
1249 mod
= "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid
)
1250 self
.sd_utils
.dacl_add_ace(ou2_dn
, mod
)
1252 self
.ldb_user
.rename(ou2_dn
, ou3_dn
)
1253 except LdbError
, (num
, _
):
1254 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1256 # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1258 sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1259 mod
= "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid
)
1260 self
.sd_utils
.dacl_add_ace(ou2_dn
, mod
)
1261 self
.ldb_user
.rename(ou2_dn
, ou3_dn
)
1262 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % ou2_dn
)
1263 self
.assertEqual(len(res
), 0)
1264 res
= self
.ldb_admin
.search(self
.base_dn
, expression
="(distinguishedName=%s)" % ou3_dn
)
1265 self
.assertNotEqual(len(res
), 0)
1267 def test_rename_u9(self
):
1268 """Rename 'User object' cross OU, with explicit deny on sd and dc"""
1269 ou1_dn
= "OU=test_rename_ou1," + self
.base_dn
1270 ou2_dn
= "OU=test_rename_ou2," + self
.base_dn
1271 user_dn
= "CN=test_rename_user2," + ou1_dn
1272 rename_user_dn
= "CN=test_rename_user5," + ou2_dn
1273 # Create OU structure
1274 self
.ldb_admin
.create_ou(ou1_dn
)
1275 self
.ldb_admin
.create_ou(ou2_dn
)
1276 self
.ldb_admin
.newuser(self
.testuser2
, self
.user_pass
, userou
=self
.ou1
)
1277 mod
= "(D;;SD;;;DA)"
1278 self
.sd_utils
.dacl_add_ace(user_dn
, mod
)
1279 mod
= "(D;;DC;;;DA)"
1280 self
.sd_utils
.dacl_add_ace(ou1_dn
, mod
)
1281 # Rename 'User object' having SD and CC to AU
1283 self
.ldb_admin
.rename(user_dn
, rename_user_dn
)
1284 except LdbError
, (num
, _
):
1285 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1288 #add an allow ace so we can delete this ou
1289 mod
= "(A;;DC;;;DA)"
1290 self
.sd_utils
.dacl_add_ace(ou1_dn
, mod
)
1293 #tests on Control Access Rights
1294 class AclCARTests(AclTests
):
1297 super(AclCARTests
, self
).setUp()
1299 # Get the old "dSHeuristics" if it was set
1300 dsheuristics
= self
.ldb_admin
.get_dsheuristics()
1301 # Reset the "dSHeuristics" as they were before
1302 self
.addCleanup(self
.ldb_admin
.set_dsheuristics
, dsheuristics
)
1303 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
1304 self
.ldb_admin
.set_dsheuristics("000000001")
1305 # Get the old "minPwdAge"
1306 minPwdAge
= self
.ldb_admin
.get_minPwdAge()
1307 # Reset the "minPwdAge" as it was before
1308 self
.addCleanup(self
.ldb_admin
.set_minPwdAge
, minPwdAge
)
1309 # Set it temporarely to "0"
1310 self
.ldb_admin
.set_minPwdAge("0")
1312 self
.user_with_wp
= "acl_car_user1"
1313 self
.user_with_pc
= "acl_car_user2"
1314 self
.ldb_admin
.newuser(self
.user_with_wp
, self
.user_pass
)
1315 self
.ldb_admin
.newuser(self
.user_with_pc
, self
.user_pass
)
1316 self
.ldb_user
= self
.get_ldb_connection(self
.user_with_wp
, self
.user_pass
)
1317 self
.ldb_user2
= self
.get_ldb_connection(self
.user_with_pc
, self
.user_pass
)
1320 super(AclCARTests
, self
).tearDown()
1321 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_wp
))
1322 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.user_with_pc
))
1324 def test_change_password1(self
):
1325 """Try a password change operation without any CARs given"""
1326 #users have change password by default - remove for negative testing
1327 desc
= self
.sd_utils
.read_sd_on_dn(self
.get_user_dn(self
.user_with_wp
))
1328 sddl
= desc
.as_sddl(self
.domain_sid
)
1329 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1330 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1331 self
.sd_utils
.modify_sd_on_dn(self
.get_user_dn(self
.user_with_wp
), sddl
)
1333 self
.ldb_user
.modify_ldif("""
1334 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1337 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1339 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1341 except LdbError
, (num
, _
):
1342 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1344 # for some reason we get constraint violation instead of insufficient access error
1347 def test_change_password2(self
):
1348 """Make sure WP has no influence"""
1349 desc
= self
.sd_utils
.read_sd_on_dn(self
.get_user_dn(self
.user_with_wp
))
1350 sddl
= desc
.as_sddl(self
.domain_sid
)
1351 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1352 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1353 self
.sd_utils
.modify_sd_on_dn(self
.get_user_dn(self
.user_with_wp
), sddl
)
1354 mod
= "(A;;WP;;;PS)"
1355 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1356 desc
= self
.sd_utils
.read_sd_on_dn(self
.get_user_dn(self
.user_with_wp
))
1357 sddl
= desc
.as_sddl(self
.domain_sid
)
1359 self
.ldb_user
.modify_ldif("""
1360 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1363 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1365 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1367 except LdbError
, (num
, _
):
1368 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1370 # for some reason we get constraint violation instead of insufficient access error
1373 def test_change_password3(self
):
1374 """Make sure WP has no influence"""
1375 mod
= "(D;;WP;;;PS)"
1376 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1377 desc
= self
.sd_utils
.read_sd_on_dn(self
.get_user_dn(self
.user_with_wp
))
1378 sddl
= desc
.as_sddl(self
.domain_sid
)
1379 self
.ldb_user
.modify_ldif("""
1380 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1383 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1385 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1388 def test_change_password5(self
):
1389 """Make sure rights have no influence on dBCSPwd"""
1390 desc
= self
.sd_utils
.read_sd_on_dn(self
.get_user_dn(self
.user_with_wp
))
1391 sddl
= desc
.as_sddl(self
.domain_sid
)
1392 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1393 sddl
= sddl
.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1394 self
.sd_utils
.modify_sd_on_dn(self
.get_user_dn(self
.user_with_wp
), sddl
)
1395 mod
= "(D;;WP;;;PS)"
1396 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1398 self
.ldb_user
.modify_ldif("""
1399 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1402 dBCSPwd: XXXXXXXXXXXXXXXX
1404 dBCSPwd: YYYYYYYYYYYYYYYY
1406 except LdbError
, (num
, _
):
1407 self
.assertEquals(num
, ERR_UNWILLING_TO_PERFORM
)
1411 def test_change_password6(self
):
1412 """Test uneven delete/adds"""
1414 self
.ldb_user
.modify_ldif("""
1415 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1417 delete: userPassword
1418 userPassword: thatsAcomplPASS1
1419 delete: userPassword
1420 userPassword: thatsAcomplPASS1
1422 userPassword: thatsAcomplPASS2
1424 except LdbError
, (num
, _
):
1425 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1428 mod
= "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1429 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1431 self
.ldb_user
.modify_ldif("""
1432 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1434 delete: userPassword
1435 userPassword: thatsAcomplPASS1
1436 delete: userPassword
1437 userPassword: thatsAcomplPASS1
1439 userPassword: thatsAcomplPASS2
1441 # This fails on Windows 2000 domain level with constraint violation
1442 except LdbError
, (num
, _
):
1443 self
.assertTrue(num
== ERR_CONSTRAINT_VIOLATION
or
1444 num
== ERR_UNWILLING_TO_PERFORM
)
1449 def test_change_password7(self
):
1450 """Try a password change operation without any CARs given"""
1451 #users have change password by default - remove for negative testing
1452 desc
= self
.sd_utils
.read_sd_on_dn(self
.get_user_dn(self
.user_with_wp
))
1453 sddl
= desc
.as_sddl(self
.domain_sid
)
1454 self
.sd_utils
.modify_sd_on_dn(self
.get_user_dn(self
.user_with_wp
), sddl
)
1455 #first change our own password
1456 self
.ldb_user2
.modify_ldif("""
1457 dn: """ + self
.get_user_dn(self
.user_with_pc
) + """
1460 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1462 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1464 #then someone else's
1465 self
.ldb_user2
.modify_ldif("""
1466 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1469 unicodePwd:: """ + base64
.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1471 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1474 def test_reset_password1(self
):
1475 """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1477 self
.ldb_user
.modify_ldif("""
1478 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1481 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1483 except LdbError
, (num
, _
):
1484 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1487 mod
= "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1488 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1489 self
.ldb_user
.modify_ldif("""
1490 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1493 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1496 def test_reset_password2(self
):
1497 """Try a user password reset operation (userPassword) before and after granting CAR"""
1499 self
.ldb_user
.modify_ldif("""
1500 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1502 replace: userPassword
1503 userPassword: thatsAcomplPASS1
1505 except LdbError
, (num
, _
):
1506 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1509 mod
= "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1510 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1512 self
.ldb_user
.modify_ldif("""
1513 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1515 replace: userPassword
1516 userPassword: thatsAcomplPASS1
1518 # This fails on Windows 2000 domain level with constraint violation
1519 except LdbError
, (num
, _
):
1520 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1522 def test_reset_password3(self
):
1523 """Grant WP and see what happens (unicodePwd)"""
1524 mod
= "(A;;WP;;;PS)"
1525 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1527 self
.ldb_user
.modify_ldif("""
1528 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1531 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1533 except LdbError
, (num
, _
):
1534 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1538 def test_reset_password4(self
):
1539 """Grant WP and see what happens (userPassword)"""
1540 mod
= "(A;;WP;;;PS)"
1541 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1543 self
.ldb_user
.modify_ldif("""
1544 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1546 replace: userPassword
1547 userPassword: thatsAcomplPASS1
1549 except LdbError
, (num
, _
):
1550 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1554 def test_reset_password5(self
):
1555 """Explicitly deny WP but grant CAR (unicodePwd)"""
1556 mod
= "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1557 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1558 self
.ldb_user
.modify_ldif("""
1559 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1562 unicodePwd:: """ + base64
.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1565 def test_reset_password6(self
):
1566 """Explicitly deny WP but grant CAR (userPassword)"""
1567 mod
= "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1568 self
.sd_utils
.dacl_add_ace(self
.get_user_dn(self
.user_with_wp
), mod
)
1570 self
.ldb_user
.modify_ldif("""
1571 dn: """ + self
.get_user_dn(self
.user_with_wp
) + """
1573 replace: userPassword
1574 userPassword: thatsAcomplPASS1
1576 # This fails on Windows 2000 domain level with constraint violation
1577 except LdbError
, (num
, _
):
1578 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1580 class AclExtendedTests(AclTests
):
1583 super(AclExtendedTests
, self
).setUp()
1584 #regular user, will be the creator
1590 self
.ldb_admin
.newuser(self
.u1
, self
.user_pass
)
1591 self
.ldb_admin
.newuser(self
.u2
, self
.user_pass
)
1592 self
.ldb_admin
.newuser(self
.u3
, self
.user_pass
)
1593 self
.ldb_admin
.add_remove_group_members("Domain Admins", [self
.u3
],
1594 add_members_operation
=True)
1595 self
.ldb_user1
= self
.get_ldb_connection(self
.u1
, self
.user_pass
)
1596 self
.ldb_user2
= self
.get_ldb_connection(self
.u2
, self
.user_pass
)
1597 self
.ldb_user3
= self
.get_ldb_connection(self
.u3
, self
.user_pass
)
1598 self
.user_sid1
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.u1
))
1599 self
.user_sid2
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.u2
))
1602 super(AclExtendedTests
, self
).tearDown()
1603 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.u1
))
1604 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.u2
))
1605 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.u3
))
1606 delete_force(self
.ldb_admin
, "CN=ext_group1,OU=ext_ou1," + self
.base_dn
)
1607 delete_force(self
.ldb_admin
, "ou=ext_ou1," + self
.base_dn
)
1609 def test_ntSecurityDescriptor(self
):
1611 self
.ldb_admin
.create_ou("ou=ext_ou1," + self
.base_dn
)
1612 #give u1 Create children access
1613 mod
= "(A;;CC;;;%s)" % str(self
.user_sid1
)
1614 self
.sd_utils
.dacl_add_ace("OU=ext_ou1," + self
.base_dn
, mod
)
1615 mod
= "(A;;LC;;;%s)" % str(self
.user_sid2
)
1616 self
.sd_utils
.dacl_add_ace("OU=ext_ou1," + self
.base_dn
, mod
)
1617 #create a group under that, grant RP to u2
1618 self
.ldb_user1
.newgroup("ext_group1", groupou
="OU=ext_ou1",
1619 grouptype
=samba
.dsdb
.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
)
1620 mod
= "(A;;RP;;;%s)" % str(self
.user_sid2
)
1621 self
.sd_utils
.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self
.base_dn
, mod
)
1622 #u2 must not read the descriptor
1623 res
= self
.ldb_user2
.search("CN=ext_group1,OU=ext_ou1," + self
.base_dn
,
1624 SCOPE_BASE
, None, ["nTSecurityDescriptor"])
1625 self
.assertNotEqual(len(res
), 0)
1626 self
.assertFalse("nTSecurityDescriptor" in res
[0].keys())
1627 #grant RC to u2 - still no access
1628 mod
= "(A;;RC;;;%s)" % str(self
.user_sid2
)
1629 self
.sd_utils
.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self
.base_dn
, mod
)
1630 res
= self
.ldb_user2
.search("CN=ext_group1,OU=ext_ou1," + self
.base_dn
,
1631 SCOPE_BASE
, None, ["nTSecurityDescriptor"])
1632 self
.assertNotEqual(len(res
), 0)
1633 self
.assertFalse("nTSecurityDescriptor" in res
[0].keys())
1634 #u3 is member of administrators group, should be able to read sd
1635 res
= self
.ldb_user3
.search("CN=ext_group1,OU=ext_ou1," + self
.base_dn
,
1636 SCOPE_BASE
, None, ["nTSecurityDescriptor"])
1637 self
.assertEqual(len(res
),1)
1638 self
.assertTrue("nTSecurityDescriptor" in res
[0].keys())
1640 class AclUndeleteTests(AclTests
):
1643 super(AclUndeleteTests
, self
).setUp()
1644 self
.regular_user
= "undeleter1"
1645 self
.ou1
= "OU=undeleted_ou,"
1646 self
.testuser1
= "to_be_undeleted1"
1647 self
.testuser2
= "to_be_undeleted2"
1648 self
.testuser3
= "to_be_undeleted3"
1649 self
.testuser4
= "to_be_undeleted4"
1650 self
.testuser5
= "to_be_undeleted5"
1651 self
.testuser6
= "to_be_undeleted6"
1653 self
.new_dn_ou
= "CN="+ self
.testuser4
+ "," + self
.ou1
+ self
.base_dn
1655 # Create regular user
1656 self
.testuser1_dn
= self
.get_user_dn(self
.testuser1
)
1657 self
.testuser2_dn
= self
.get_user_dn(self
.testuser2
)
1658 self
.testuser3_dn
= self
.get_user_dn(self
.testuser3
)
1659 self
.testuser4_dn
= self
.get_user_dn(self
.testuser4
)
1660 self
.testuser5_dn
= self
.get_user_dn(self
.testuser5
)
1661 self
.deleted_dn1
= self
.create_delete_user(self
.testuser1
)
1662 self
.deleted_dn2
= self
.create_delete_user(self
.testuser2
)
1663 self
.deleted_dn3
= self
.create_delete_user(self
.testuser3
)
1664 self
.deleted_dn4
= self
.create_delete_user(self
.testuser4
)
1665 self
.deleted_dn5
= self
.create_delete_user(self
.testuser5
)
1667 self
.ldb_admin
.create_ou(self
.ou1
+ self
.base_dn
)
1669 self
.ldb_admin
.newuser(self
.regular_user
, self
.user_pass
)
1670 self
.ldb_admin
.add_remove_group_members("Domain Admins", [self
.regular_user
],
1671 add_members_operation
=True)
1672 self
.ldb_user
= self
.get_ldb_connection(self
.regular_user
, self
.user_pass
)
1673 self
.sid
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.regular_user
))
1676 super(AclUndeleteTests
, self
).tearDown()
1677 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.regular_user
))
1678 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.testuser1
))
1679 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.testuser2
))
1680 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.testuser3
))
1681 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.testuser4
))
1682 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.testuser5
))
1683 delete_force(self
.ldb_admin
, self
.new_dn_ou
)
1684 delete_force(self
.ldb_admin
, self
.ou1
+ self
.base_dn
)
1686 def GUID_string(self
, guid
):
1687 return ldb
.schema_format_value("objectGUID", guid
)
1689 def create_delete_user(self
, new_user
):
1690 self
.ldb_admin
.newuser(new_user
, self
.user_pass
)
1692 res
= self
.ldb_admin
.search(expression
="(objectClass=*)",
1693 base
=self
.get_user_dn(new_user
),
1695 controls
=["show_deleted:1"])
1696 guid
= res
[0]["objectGUID"][0]
1697 self
.ldb_admin
.delete(self
.get_user_dn(new_user
))
1698 res
= self
.ldb_admin
.search(base
="<GUID=%s>" % self
.GUID_string(guid
),
1699 scope
=SCOPE_BASE
, controls
=["show_deleted:1"])
1700 self
.assertEquals(len(res
), 1)
1701 return str(res
[0].dn
)
1703 def undelete_deleted(self
, olddn
, newdn
):
1705 msg
.dn
= Dn(self
.ldb_user
, olddn
)
1706 msg
["isDeleted"] = MessageElement([], FLAG_MOD_DELETE
, "isDeleted")
1707 msg
["distinguishedName"] = MessageElement([newdn
], FLAG_MOD_REPLACE
, "distinguishedName")
1708 res
= self
.ldb_user
.modify(msg
, ["show_recycled:1"])
1710 def undelete_deleted_with_mod(self
, olddn
, newdn
):
1712 msg
.dn
= Dn(ldb
, olddn
)
1713 msg
["isDeleted"] = MessageElement([], FLAG_MOD_DELETE
, "isDeleted")
1714 msg
["distinguishedName"] = MessageElement([newdn
], FLAG_MOD_REPLACE
, "distinguishedName")
1715 msg
["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE
, "url")
1716 res
= self
.ldb_user
.modify(msg
, ["show_deleted:1"])
1718 def test_undelete(self
):
1719 # it appears the user has to have LC on the old parent to be able to move the object
1720 # otherwise we get no such object. Since only System can modify the SD on deleted object
1721 # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment
1723 # deny write property on rdn, should fail
1724 mod
= "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.sid
)
1725 self
.sd_utils
.dacl_add_ace(self
.deleted_dn1
, mod
)
1727 self
.undelete_deleted(self
.deleted_dn1
, self
.testuser1_dn
)
1729 except LdbError
, (num
, _
):
1730 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1732 # seems that permissions on isDeleted and distinguishedName are irrelevant
1733 mod
= "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.sid
)
1734 self
.sd_utils
.dacl_add_ace(self
.deleted_dn2
, mod
)
1735 mod
= "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % str(self
.sid
)
1736 self
.sd_utils
.dacl_add_ace(self
.deleted_dn2
, mod
)
1737 self
.undelete_deleted(self
.deleted_dn2
, self
.testuser2_dn
)
1739 # attempt undelete with simultanious addition of url, WP to which is denied
1740 mod
= "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % str(self
.sid
)
1741 self
.sd_utils
.dacl_add_ace(self
.deleted_dn3
, mod
)
1743 self
.undelete_deleted_with_mod(self
.deleted_dn3
, self
.testuser3_dn
)
1745 except LdbError
, (num
, _
):
1746 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1748 # undelete in an ou, in which we have no right to create children
1749 mod
= "(D;;CC;;;%s)" % str(self
.sid
)
1750 self
.sd_utils
.dacl_add_ace(self
.ou1
+ self
.base_dn
, mod
)
1752 self
.undelete_deleted(self
.deleted_dn4
, self
.new_dn_ou
)
1754 except LdbError
, (num
, _
):
1755 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1757 # delete is not required
1758 mod
= "(D;;SD;;;%s)" % str(self
.sid
)
1759 self
.sd_utils
.dacl_add_ace(self
.deleted_dn5
, mod
)
1760 self
.undelete_deleted(self
.deleted_dn5
, self
.testuser5_dn
)
1762 # deny Reanimate-Tombstone, should fail
1763 mod
= "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % str(self
.sid
)
1764 self
.sd_utils
.dacl_add_ace(self
.base_dn
, mod
)
1766 self
.undelete_deleted(self
.deleted_dn4
, self
.testuser4_dn
)
1768 except LdbError
, (num
, _
):
1769 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1771 class AclSPNTests(AclTests
):
1774 super(AclSPNTests
, self
).setUp()
1775 self
.dcname
= "TESTSRV8"
1776 self
.rodcname
= "TESTRODC8"
1777 self
.computername
= "testcomp8"
1778 self
.test_user
= "spn_test_user8"
1779 self
.computerdn
= "CN=%s,CN=computers,%s" % (self
.computername
, self
.base_dn
)
1780 self
.dc_dn
= "CN=%s,OU=Domain Controllers,%s" % (self
.dcname
, self
.base_dn
)
1781 self
.site
= "Default-First-Site-Name"
1782 self
.rodcctx
= dc_join(server
=host
, creds
=creds
, lp
=lp
,
1783 site
=self
.site
, netbios_name
=self
.rodcname
, targetdir
=None,
1785 self
.dcctx
= dc_join(server
=host
, creds
=creds
, lp
=lp
, site
=self
.site
,
1786 netbios_name
=self
.dcname
, targetdir
=None, domain
=None)
1787 self
.ldb_admin
.newuser(self
.test_user
, self
.user_pass
)
1788 self
.ldb_user1
= self
.get_ldb_connection(self
.test_user
, self
.user_pass
)
1789 self
.user_sid1
= self
.sd_utils
.get_object_sid(self
.get_user_dn(self
.test_user
))
1790 self
.create_computer(self
.computername
, self
.dcctx
.dnsdomain
)
1791 self
.create_rodc(self
.rodcctx
)
1792 self
.create_dc(self
.dcctx
)
1795 super(AclSPNTests
, self
).tearDown()
1796 self
.rodcctx
.cleanup_old_join()
1797 self
.dcctx
.cleanup_old_join()
1798 delete_force(self
.ldb_admin
, "cn=%s,cn=computers,%s" % (self
.computername
, self
.base_dn
))
1799 delete_force(self
.ldb_admin
, self
.get_user_dn(self
.test_user
))
1801 def replace_spn(self
, _ldb
, dn
, spn
):
1802 print "Setting spn %s on %s" % (spn
, dn
)
1803 res
= self
.ldb_admin
.search(dn
, expression
="(objectClass=*)",
1804 scope
=SCOPE_BASE
, attrs
=["servicePrincipalName"])
1805 if "servicePrincipalName" in res
[0].keys():
1806 flag
= FLAG_MOD_REPLACE
1811 msg
.dn
= Dn(self
.ldb_admin
, dn
)
1812 msg
["servicePrincipalName"] = MessageElement(spn
, flag
,
1813 "servicePrincipalName")
1816 def create_computer(self
, computername
, domainname
):
1817 dn
= "CN=%s,CN=computers,%s" % (computername
, self
.base_dn
)
1818 samaccountname
= computername
+ "$"
1819 dnshostname
= "%s.%s" % (computername
, domainname
)
1820 self
.ldb_admin
.add({
1822 "objectclass": "computer",
1823 "sAMAccountName": samaccountname
,
1824 "userAccountControl": str(samba
.dsdb
.UF_WORKSTATION_TRUST_ACCOUNT
),
1825 "dNSHostName": dnshostname
})
1827 # same as for join_RODC, but do not set any SPNs
1828 def create_rodc(self
, ctx
):
1829 ctx
.nc_list
= [ ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
1830 ctx
.full_nc_list
= [ ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
1831 ctx
.krbtgt_dn
= "CN=krbtgt_%s,CN=Users,%s" % (ctx
.myname
, ctx
.base_dn
)
1833 ctx
.never_reveal_sid
= [ "<SID=%s-%s>" % (ctx
.domsid
, security
.DOMAIN_RID_RODC_DENY
),
1834 "<SID=%s>" % security
.SID_BUILTIN_ADMINISTRATORS
,
1835 "<SID=%s>" % security
.SID_BUILTIN_SERVER_OPERATORS
,
1836 "<SID=%s>" % security
.SID_BUILTIN_BACKUP_OPERATORS
,
1837 "<SID=%s>" % security
.SID_BUILTIN_ACCOUNT_OPERATORS
]
1838 ctx
.reveal_sid
= "<SID=%s-%s>" % (ctx
.domsid
, security
.DOMAIN_RID_RODC_ALLOW
)
1840 mysid
= ctx
.get_mysid()
1841 admin_dn
= "<SID=%s>" % mysid
1842 ctx
.managedby
= admin_dn
1844 ctx
.userAccountControl
= (samba
.dsdb
.UF_WORKSTATION_TRUST_ACCOUNT |
1845 samba
.dsdb
.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1846 samba
.dsdb
.UF_PARTIAL_SECRETS_ACCOUNT
)
1848 ctx
.connection_dn
= "CN=RODC Connection (FRS),%s" % ctx
.ntds_dn
1849 ctx
.secure_channel_type
= misc
.SEC_CHAN_RODC
1851 ctx
.replica_flags
= (drsuapi
.DRSUAPI_DRS_INIT_SYNC |
1852 drsuapi
.DRSUAPI_DRS_PER_SYNC |
1853 drsuapi
.DRSUAPI_DRS_GET_ANC |
1854 drsuapi
.DRSUAPI_DRS_NEVER_SYNCED |
1855 drsuapi
.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
)
1857 ctx
.join_add_objects()
1859 def create_dc(self
, ctx
):
1860 ctx
.nc_list
= [ ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
1861 ctx
.full_nc_list
= [ ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
1862 ctx
.userAccountControl
= samba
.dsdb
.UF_SERVER_TRUST_ACCOUNT | samba
.dsdb
.UF_TRUSTED_FOR_DELEGATION
1863 ctx
.secure_channel_type
= misc
.SEC_CHAN_BDC
1864 ctx
.replica_flags
= (drsuapi
.DRSUAPI_DRS_WRIT_REP |
1865 drsuapi
.DRSUAPI_DRS_INIT_SYNC |
1866 drsuapi
.DRSUAPI_DRS_PER_SYNC |
1867 drsuapi
.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
1868 drsuapi
.DRSUAPI_DRS_NEVER_SYNCED
)
1870 ctx
.join_add_objects()
1872 def dc_spn_test(self
, ctx
):
1873 netbiosdomain
= self
.dcctx
.get_domain_name()
1875 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "HOST/%s/%s" % (ctx
.myname
, netbiosdomain
))
1876 except LdbError
, (num
, _
):
1877 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1879 mod
= "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self
.user_sid1
)
1880 self
.sd_utils
.dacl_add_ace(ctx
.acct_dn
, mod
)
1881 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "HOST/%s/%s" % (ctx
.myname
, netbiosdomain
))
1882 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "HOST/%s" % (ctx
.myname
))
1883 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "HOST/%s.%s/%s" %
1884 (ctx
.myname
, ctx
.dnsdomain
, netbiosdomain
))
1885 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "HOST/%s/%s" % (ctx
.myname
, ctx
.dnsdomain
))
1886 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "HOST/%s.%s/%s" %
1887 (ctx
.myname
, ctx
.dnsdomain
, ctx
.dnsdomain
))
1888 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "GC/%s.%s/%s" %
1889 (ctx
.myname
, ctx
.dnsdomain
, ctx
.dnsforest
))
1890 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s/%s" % (ctx
.myname
, netbiosdomain
))
1891 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s.%s/%s" %
1892 (ctx
.myname
, ctx
.dnsdomain
, netbiosdomain
))
1893 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s" % (ctx
.myname
))
1894 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s/%s" % (ctx
.myname
, ctx
.dnsdomain
))
1895 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s.%s/%s" %
1896 (ctx
.myname
, ctx
.dnsdomain
, ctx
.dnsdomain
))
1897 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "DNS/%s/%s" % (ctx
.myname
, ctx
.dnsdomain
))
1898 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "RestrictedKrbHost/%s/%s" %
1899 (ctx
.myname
, ctx
.dnsdomain
))
1900 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "RestrictedKrbHost/%s" %
1902 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1903 (ctx
.myname
, ctx
.dnsdomain
))
1904 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1905 (ctx
.myname
, ctx
.dnsdomain
))
1906 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s._msdcs.%s" %
1907 (ctx
.ntds_guid
, ctx
.dnsdomain
))
1909 #the following spns do not match the restrictions and should fail
1911 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s.%s/ForestDnsZones.%s" %
1912 (ctx
.myname
, ctx
.dnsdomain
, ctx
.dnsdomain
))
1913 except LdbError
, (num
, _
):
1914 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1916 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "ldap/%s.%s/DomainDnsZones.%s" %
1917 (ctx
.myname
, ctx
.dnsdomain
, ctx
.dnsdomain
))
1918 except LdbError
, (num
, _
):
1919 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1921 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1922 except LdbError
, (num
, _
):
1923 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1925 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "GC/%s.%s/%s" %
1926 (ctx
.myname
, ctx
.dnsdomain
, netbiosdomain
))
1927 except LdbError
, (num
, _
):
1928 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1930 self
.replace_spn(self
.ldb_user1
, ctx
.acct_dn
, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
1931 (ctx
.ntds_guid
, ctx
.dnsdomain
))
1932 except LdbError
, (num
, _
):
1933 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1935 def test_computer_spn(self
):
1936 # with WP, any value can be set
1937 netbiosdomain
= self
.dcctx
.get_domain_name()
1938 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "HOST/%s/%s" %
1939 (self
.computername
, netbiosdomain
))
1940 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "HOST/%s" % (self
.computername
))
1941 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "HOST/%s.%s/%s" %
1942 (self
.computername
, self
.dcctx
.dnsdomain
, netbiosdomain
))
1943 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "HOST/%s/%s" %
1944 (self
.computername
, self
.dcctx
.dnsdomain
))
1945 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "HOST/%s.%s/%s" %
1946 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsdomain
))
1947 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "GC/%s.%s/%s" %
1948 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsforest
))
1949 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s/%s" % (self
.computername
, netbiosdomain
))
1950 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s.%s/ForestDnsZones.%s" %
1951 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsdomain
))
1952 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s.%s/DomainDnsZones.%s" %
1953 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsdomain
))
1954 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s.%s/%s" %
1955 (self
.computername
, self
.dcctx
.dnsdomain
, netbiosdomain
))
1956 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s" % (self
.computername
))
1957 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s/%s" %
1958 (self
.computername
, self
.dcctx
.dnsdomain
))
1959 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "ldap/%s.%s/%s" %
1960 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsdomain
))
1961 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "DNS/%s/%s" %
1962 (self
.computername
, self
.dcctx
.dnsdomain
))
1963 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "RestrictedKrbHost/%s/%s" %
1964 (self
.computername
, self
.dcctx
.dnsdomain
))
1965 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "RestrictedKrbHost/%s" %
1966 (self
.computername
))
1967 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1968 (self
.computername
, self
.dcctx
.dnsdomain
))
1969 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1970 (self
.computername
, self
.dcctx
.dnsdomain
))
1971 self
.replace_spn(self
.ldb_admin
, self
.computerdn
, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1973 #user has neither WP nor Validated-SPN, access denied expected
1975 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s/%s" % (self
.computername
, netbiosdomain
))
1976 except LdbError
, (num
, _
):
1977 self
.assertEquals(num
, ERR_INSUFFICIENT_ACCESS_RIGHTS
)
1979 mod
= "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self
.user_sid1
)
1980 self
.sd_utils
.dacl_add_ace(self
.computerdn
, mod
)
1981 #grant Validated-SPN and check which values are accepted
1982 #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
1984 # for regular computer objects we shouldalways get constraint violation
1986 # This does not pass against Windows, although it should according to docs
1987 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s" % (self
.computername
))
1988 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s.%s" %
1989 (self
.computername
, self
.dcctx
.dnsdomain
))
1992 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s/%s" % (self
.computername
, netbiosdomain
))
1993 except LdbError
, (num
, _
):
1994 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
1996 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s.%s/%s" %
1997 (self
.computername
, self
.dcctx
.dnsdomain
, netbiosdomain
))
1998 except LdbError
, (num
, _
):
1999 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
2001 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s/%s" %
2002 (self
.computername
, self
.dcctx
.dnsdomain
))
2003 except LdbError
, (num
, _
):
2004 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
2006 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "HOST/%s.%s/%s" %
2007 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsdomain
))
2008 except LdbError
, (num
, _
):
2009 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
2011 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "GC/%s.%s/%s" %
2012 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsforest
))
2013 except LdbError
, (num
, _
):
2014 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
2016 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "ldap/%s/%s" % (self
.computername
, netbiosdomain
))
2017 except LdbError
, (num
, _
):
2018 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
2020 self
.replace_spn(self
.ldb_user1
, self
.computerdn
, "ldap/%s.%s/ForestDnsZones.%s" %
2021 (self
.computername
, self
.dcctx
.dnsdomain
, self
.dcctx
.dnsdomain
))
2022 except LdbError
, (num
, _
):
2023 self
.assertEquals(num
, ERR_CONSTRAINT_VIOLATION
)
2025 def test_spn_rwdc(self
):
2026 self
.dc_spn_test(self
.dcctx
)
2028 def test_spn_rodc(self
):
2029 self
.dc_spn_test(self
.rodcctx
)
2032 # Important unit running information
2034 ldb
= SamDB(ldaphost
, credentials
=creds
, session_info
=system_session(lp
), lp
=lp
)
2036 TestProgram(module
=__name__
, opts
=subunitopts
)