2 # -*- coding: utf-8 -*-
3 # test tokengroups attribute against internal token calculation
9 sys
.path
.insert(0, "bin/python")
12 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
14 import samba
.getopt
as options
16 from samba
.auth
import system_session
17 from samba
import ldb
, dsdb
18 from samba
.samdb
import SamDB
19 from samba
.auth
import AuthContext
20 from samba
.ndr
import ndr_unpack
21 from samba
import gensec
22 from samba
.credentials
import Credentials
, DONT_USE_KERBEROS
, MUST_USE_KERBEROS
, AUTO_USE_KERBEROS
23 from samba
.dsdb
import GTYPE_SECURITY_GLOBAL_GROUP
, GTYPE_SECURITY_UNIVERSAL_GROUP
25 from samba
.tests
import delete_force
26 from samba
.dcerpc
import security
27 from samba
.auth
import AUTH_SESSION_INFO_DEFAULT_GROUPS
, AUTH_SESSION_INFO_AUTHENTICATED
, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
, AUTH_SESSION_INFO_NTLM
# Command line: the generic Samba, version, credentials and subunit option
# groups, followed by one positional <host> argument.
parser = optparse.OptionParser("token_group.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

# The usage string promises a <host>; without it there is nothing to test
# against, so print the usage and stop instead of failing later on `url`.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# Target of every connection opened by the test classes in this file.
url = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Request sealing in addition to whatever features the credentials already ask for.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
def closure(vSet, wSet, aSet):
    """Grow *wSet* in place to everything reachable from it along *aSet*.

    vSet -- set of vertices that are allowed to enter the result
    wSet -- set of start vertices; extended in place with every reachable
            vertex that is also in vSet
    aSet -- iterable of (start, end) edges

    Returns None; the result is the mutated wSet.
    """
    # Iterate to a fixed point: keep sweeping the edge list until a full
    # pass adds nothing new.  This avoids deep recursion on long chains.
    grew = True
    while grew:
        grew = False
        for start, end in aSet:
            if start in wSet and end not in wSet and end in vSet:
                wSet.add(end)
                grew = True
class StaticTokenTest(samba.tests.TestCase):
    """Token checks for the account given on the command line.

    setUp() computes the SID list of a locally built session for that
    account; each test compares it with what the server reports.
    """

    def setUp(self):
        """Open the LDAP connection and compute the expected SID list."""
        super(StaticTokenTest, self).setUp()

        # The expected SIDs differ between NTLM and Kerberos, so the
        # Kerberos state must have been forced one way or the other.
        self.assertNotEqual(creds.get_kerberos_state(), AUTO_USE_KERBEROS)

        self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb.domain_dn()

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        # The first rootDSE tokenGroups value is used as the user's own SID.
        self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(security.dom_sid, res[0]["tokenGroups"][0]))

        session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
                              AUTH_SESSION_INFO_AUTHENTICATED |
                              AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
        if creds.get_kerberos_state() == DONT_USE_KERBEROS:
            session_info_flags |= AUTH_SESSION_INFO_NTLM

        session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
                                          session_info_flags=session_info_flags)

        token = session.security_token
        self.user_sids = [str(s) for s in token.sids]

        # Add asserted identity and Claims Valid for Kerberos
        if creds.get_kerberos_state() == MUST_USE_KERBEROS:
            self.user_sids.append(str(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY))
            self.user_sids.append(str(security.SID_CLAIMS_VALID))

    def test_rootDSE_tokenGroups(self):
        """Testing rootDSE tokengroups against internal calculation"""
        if not url.startswith("ldap"):
            self.fail(msg="This test is only valid on ldap")

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        print("Getting tokenGroups from rootDSE")
        tokengroups = [str(ndr_unpack(security.dom_sid, sid))
                       for sid in res[0]['tokenGroups']]

        sidset1 = set(tokengroups)
        sidset2 = set(self.user_sids)
        if len(sidset1.symmetric_difference(sidset2)):
            print("token sids don't match")
            print("tokengroups: %s" % tokengroups)
            print("calculated : %s" % self.user_sids)
            print("difference : %s" % sidset1.symmetric_difference(sidset2))
            self.fail(msg="calculated groups don't match against rootDSE tokenGroups")

    def test_dn_tokenGroups(self):
        """Check tokenGroups read from the user object against the full token."""
        print("Getting tokenGroups from user DN")
        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        dn_tokengroups = [str(ndr_unpack(security.dom_sid, sid))
                          for sid in res[0]['tokenGroups']]

        sidset1 = set(dn_tokengroups)
        sidset2 = set(self.user_sids)

        # The tokenGroups is just a subset of the user_sids
        # so we don't check symmetric_difference() here.
        if len(sidset1.difference(sidset2)):
            print("dn token sids no subset of user token")
            print("tokengroups: %s" % dn_tokengroups)
            print("user sids : %s" % self.user_sids)
            print("difference : %s" % sidset1.difference(sidset2))
            self.fail(msg="DN tokenGroups no subset of full user token")

        missing_sidset = sidset2.difference(sidset1)

        # SIDs expected in the session token but not in the per-DN attribute:
        # the user's own SID plus the well-known SIDs listed here.
        extra_sids = [
            self.user_sids[0],
            security.SID_WORLD,
            security.SID_NT_NETWORK,
            security.SID_NT_AUTHENTICATED_USERS,
            security.SID_BUILTIN_PREW2K,
        ]
        if creds.get_kerberos_state() == MUST_USE_KERBEROS:
            extra_sids.append(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
            extra_sids.append(security.SID_CLAIMS_VALID)
        if creds.get_kerberos_state() == DONT_USE_KERBEROS:
            extra_sids.append(security.SID_NT_NTLM_AUTHENTICATION)

        extra_sidset = set(extra_sids)

        if len(missing_sidset.symmetric_difference(extra_sidset)):
            print("dn token sids unexpected")
            print("tokengroups: %s" % dn_tokengroups)
            print("user sids: %s" % self.user_sids)
            print("actual difference: %s" % missing_sidset)
            print("expected difference: %s" % extra_sidset)
            print("unexpected difference : %s" %
                  missing_sidset.symmetric_difference(extra_sidset))
            self.fail(msg="DN tokenGroups unexpected difference to full user token")

    def test_pac_groups(self):
        """Run an in-process GSSAPI exchange and compare the server-side token."""
        if creds.get_kerberos_state() != MUST_USE_KERBEROS:
            self.skipTest("Kerberos disabled, skipping PAC test")

        settings = {}
        settings["lp_ctx"] = lp
        settings["target_hostname"] = lp.get("netbios name")

        gensec_client = gensec.Security.start_client(settings)
        gensec_client.set_credentials(creds)
        gensec_client.want_feature(gensec.FEATURE_SEAL)
        gensec_client.start_mech_by_sasl_name("GSSAPI")

        auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])

        gensec_server = gensec.Security.start_server(settings, auth_context)
        machine_creds = Credentials()
        machine_creds.guess(lp)
        machine_creds.set_machine_account(lp)
        gensec_server.set_credentials(machine_creds)

        gensec_server.want_feature(gensec.FEATURE_SEAL)
        gensec_server.start_mech_by_sasl_name("GSSAPI")

        client_finished = False
        server_finished = False
        server_to_client = b""

        # Run the actual call loop.
        # NOTE(review): the loop ends as soon as either side reports
        # finished; only the server's session is read afterwards.
        while not client_finished and not server_finished:
            if not client_finished:
                print("running client gensec_update")
                (client_finished, client_to_server) = gensec_client.update(server_to_client)
            if not server_finished:
                print("running server gensec_update")
                (server_finished, server_to_client) = gensec_server.update(client_to_server)

        session = gensec_server.session_info()

        token = session.security_token
        pac_sids = [str(s) for s in token.sids]

        sidset1 = set(pac_sids)
        sidset2 = set(self.user_sids)
        if len(sidset1.symmetric_difference(sidset2)):
            print("token sids don't match")
            print("pac sids: %s" % pac_sids)
            print("user sids : %s" % self.user_sids)
            print("difference : %s" % sidset1.symmetric_difference(sidset2))
            self.fail(msg="calculated groups don't match against user PAC tokenGroups")
class DynamicTokenTest(samba.tests.TestCase):
    """Token checks for a freshly created user in a known group layout.

    setUp() creates user "tokengroups_user1" and groups
    tokengroups_group0..6 (direct and nested memberships of several group
    types); tearDown() removes them again.
    """

    def get_creds(self, target_username, target_password):
        """Return new Credentials for the given user.

        Domain, realm, Kerberos state and workstation are copied from the
        command-line credentials; sealing is requested as well.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_kerberos_state(creds.get_kerberos_state())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        return creds_tmp

    def get_ldb_connection(self, target_username, target_password):
        """Return a SamDB connection to `url` authenticated as the given user."""
        creds_tmp = self.get_creds(target_username, target_password)
        ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp)
        return ldb_target

    def _create_group(self, name, grouptype, members):
        """Create group *name* under cn=users, add *members*, return its SID."""
        self.admin_ldb.newgroup(name, grouptype=grouptype)
        res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (name, self.base_dn),
                                    attrs=["objectSid"], scope=ldb.SCOPE_BASE)
        sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
        self.admin_ldb.add_remove_group_members(name, members,
                                                add_members_operation=True)
        return sid

    def _membership_graph(self, include_all_objects=False):
        """Return (vertices, edges) of the member -> group graph.

        Edges come from memberOf on users and groups and from each user's
        primaryGroupID.  All DNs are casefolded.  With include_all_objects
        every user/group DN is a vertex even when it has no edge.
        """
        vertices = set()
        edges = set()

        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
                                    expression="(|(objectclass=user)(objectclass=group))",
                                    attrs=["memberOf"])
        for obj in res:
            first = obj.dn.get_casefold()
            if include_all_objects:
                vertices.add(first)
            if "memberOf" in obj:
                for dn in obj["memberOf"]:
                    second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold()
                    edges.add((first, second))
                    vertices.add(first)
                    vertices.add(second)

        # The primary group is not listed in memberOf, so add it explicitly.
        res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
                                    expression="(objectclass=user)",
                                    attrs=["primaryGroupID"])
        domain_sid = self.admin_ldb.get_domain_sid()
        for obj in res:
            if "primaryGroupID" in obj:
                sid = "%s-%d" % (domain_sid, int(obj["primaryGroupID"][0]))
                res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
                                             attrs=[])
                first = obj.dn.get_casefold()
                second = res2[0].dn.get_casefold()

                edges.add((first, second))
                vertices.add(first)
                vertices.add(second)

        return vertices, edges

    def _dns_for_sids(self, values):
        """Resolve NDR-packed SIDs to the set of casefolded DNs they name."""
        dns = set()
        for value in values:
            sid = ndr_unpack(security.dom_sid, value)
            res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
                                         attrs=[])
            dns.add(res3[0].dn.get_casefold())
        return dns

    def setUp(self):
        """Create the test user and groups, then compute the expected SIDs."""
        super(DynamicTokenTest, self).setUp()

        # The expected SIDs differ between NTLM and Kerberos, so the
        # Kerberos state must have been forced one way or the other.
        self.assertNotEqual(creds.get_kerberos_state(), AUTO_USE_KERBEROS)

        self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)

        self.base_dn = self.admin_ldb.domain_dn()

        self.test_user = "tokengroups_user1"
        self.test_user_pass = "samba123@"
        self.admin_ldb.newuser(self.test_user, self.test_user_pass)

        # Direct memberships of the user: group0, group1, group2, group6.
        # Nested chain: group1 -> group3 -> group4 -> group5.
        self.test_group0 = "tokengroups_group0"
        self.test_group0_sid = self._create_group(
            self.test_group0, dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP, [self.test_user])

        self.test_group1 = "tokengroups_group1"
        self.test_group1_sid = self._create_group(
            self.test_group1, dsdb.GTYPE_SECURITY_GLOBAL_GROUP, [self.test_user])

        self.test_group2 = "tokengroups_group2"
        self.test_group2_sid = self._create_group(
            self.test_group2, dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP, [self.test_user])

        self.test_group3 = "tokengroups_group3"
        self.test_group3_sid = self._create_group(
            self.test_group3, dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP, [self.test_group1])

        self.test_group4 = "tokengroups_group4"
        self.test_group4_sid = self._create_group(
            self.test_group4, dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP, [self.test_group3])

        self.test_group5 = "tokengroups_group5"
        self.test_group5_sid = self._create_group(
            self.test_group5, dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP, [self.test_group4])

        self.test_group6 = "tokengroups_group6"
        self.test_group6_sid = self._create_group(
            self.test_group6, dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP, [self.test_user])

        self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        # The first rootDSE tokenGroups value is used as the user's own SID.
        self.user_sid = ndr_unpack(security.dom_sid, res[0]["tokenGroups"][0])
        self.user_sid_dn = "<SID=%s>" % str(self.user_sid)

        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
        self.assertEqual(len(res), 1)

        self.test_user_dn = res[0].dn

        session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
                              AUTH_SESSION_INFO_AUTHENTICATED |
                              AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)

        if creds.get_kerberos_state() == DONT_USE_KERBEROS:
            session_info_flags |= AUTH_SESSION_INFO_NTLM

        session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
                                          session_info_flags=session_info_flags)

        token = session.security_token
        self.user_sids = [str(s) for s in token.sids]

        # Add asserted identity and Claims Valid for Kerberos
        if creds.get_kerberos_state() == MUST_USE_KERBEROS:
            self.user_sids.append(str(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY))
            self.user_sids.append(str(security.SID_CLAIMS_VALID))

    def tearDown(self):
        """Delete the user and all groups created by setUp()."""
        super(DynamicTokenTest, self).tearDown()
        for name in (self.test_user,
                     self.test_group0, self.test_group1, self.test_group2,
                     self.test_group3, self.test_group4, self.test_group5,
                     self.test_group6):
            delete_force(self.admin_ldb, "CN=%s,%s,%s" %
                         (name, "cn=users", self.base_dn))

    def test_rootDSE_tokenGroups(self):
        """Testing rootDSE tokengroups against internal calculation"""
        if not url.startswith("ldap"):
            self.fail(msg="This test is only valid on ldap")

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        print("Getting tokenGroups from rootDSE")
        tokengroups = [str(ndr_unpack(security.dom_sid, sid))
                       for sid in res[0]['tokenGroups']]

        sidset1 = set(tokengroups)
        sidset2 = set(self.user_sids)
        if len(sidset1.symmetric_difference(sidset2)):
            print("token sids don't match")
            print("tokengroups: %s" % tokengroups)
            print("calculated : %s" % self.user_sids)
            print("difference : %s" % sidset1.symmetric_difference(sidset2))
            self.fail(msg="calculated groups don't match against rootDSE tokenGroups")

    def test_dn_tokenGroups(self):
        """Check tokenGroups read from the user object against the full token."""
        print("Getting tokenGroups from user DN")
        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        dn_tokengroups = [str(ndr_unpack(security.dom_sid, sid))
                          for sid in res[0]['tokenGroups']]

        sidset1 = set(dn_tokengroups)
        sidset2 = set(self.user_sids)

        # The tokenGroups is just a subset of the user_sids
        # so we don't check symmetric_difference() here.
        if len(sidset1.difference(sidset2)):
            print("dn token sids no subset of user token")
            print("tokengroups: %s" % dn_tokengroups)
            print("user sids : %s" % self.user_sids)
            print("difference : %s" % sidset1.difference(sidset2))
            self.fail(msg="DN tokenGroups no subset of full user token")

        missing_sidset = sidset2.difference(sidset1)

        # SIDs expected in the session token but not in the per-DN attribute:
        # the user's own SID plus the well-known SIDs listed here.
        extra_sids = [
            self.user_sids[0],
            security.SID_WORLD,
            security.SID_NT_NETWORK,
            security.SID_NT_AUTHENTICATED_USERS,
            security.SID_BUILTIN_PREW2K,
        ]
        if creds.get_kerberos_state() == MUST_USE_KERBEROS:
            extra_sids.append(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
            extra_sids.append(security.SID_CLAIMS_VALID)
        if creds.get_kerberos_state() == DONT_USE_KERBEROS:
            extra_sids.append(security.SID_NT_NTLM_AUTHENTICATION)

        extra_sidset = set(extra_sids)

        if len(missing_sidset.symmetric_difference(extra_sidset)):
            print("dn token sids unexpected")
            print("tokengroups: %s" % dn_tokengroups)
            print("user sids: %s" % self.user_sids)
            print("actual difference: %s" % missing_sidset)
            print("expected difference: %s" % extra_sidset)
            print("unexpected difference : %s" %
                  missing_sidset.symmetric_difference(extra_sidset))
            self.fail(msg="DN tokenGroups unexpected difference to full user token")

    def test_pac_groups(self):
        """Run an in-process GSSAPI exchange as the test user and compare tokens."""
        if creds.get_kerberos_state() != MUST_USE_KERBEROS:
            self.skipTest("Kerberos disabled, skipping PAC test")

        settings = {}
        settings["lp_ctx"] = lp
        settings["target_hostname"] = lp.get("netbios name")

        gensec_client = gensec.Security.start_client(settings)
        gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
        gensec_client.want_feature(gensec.FEATURE_SEAL)
        gensec_client.start_mech_by_sasl_name("GSSAPI")

        auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])

        gensec_server = gensec.Security.start_server(settings, auth_context)
        machine_creds = Credentials()
        machine_creds.guess(lp)
        machine_creds.set_machine_account(lp)
        gensec_server.set_credentials(machine_creds)

        gensec_server.want_feature(gensec.FEATURE_SEAL)
        gensec_server.start_mech_by_sasl_name("GSSAPI")

        client_finished = False
        server_finished = False
        server_to_client = b""

        # Run the actual call loop.
        # NOTE(review): the loop ends as soon as either side reports
        # finished; only the server's session is read afterwards.
        while not client_finished and not server_finished:
            if not client_finished:
                print("running client gensec_update")
                (client_finished, client_to_server) = gensec_client.update(server_to_client)
            if not server_finished:
                print("running server gensec_update")
                (server_finished, server_to_client) = gensec_server.update(client_to_server)

        session = gensec_server.session_info()

        token = session.security_token
        pac_sids = [str(s) for s in token.sids]

        sidset1 = set(pac_sids)
        sidset2 = set(self.user_sids)
        if len(sidset1.symmetric_difference(sidset2)):
            print("token sids don't match")
            print("pac sids: %s" % pac_sids)
            print("user sids : %s" % self.user_sids)
            print("difference : %s" % sidset1.symmetric_difference(sidset2))
            self.fail(msg="calculated groups don't match against user PAC tokenGroups")

    def test_tokenGroups_manual(self):
        # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
        # and compare the result
        vSet, aSet = self._membership_graph()

        # Everything reachable from the user, minus the user itself.
        wSet = set()
        wSet.add(self.test_user_dn.get_casefold())
        closure(vSet, wSet, aSet)
        wSet.remove(self.test_user_dn.get_casefold())

        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        tokenGroupsSet = self._dns_for_sids(res[0]['tokenGroups'])

        if len(wSet.difference(tokenGroupsSet)):
            self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))

        if len(tokenGroupsSet.difference(wSet)):
            self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))

    def filtered_closure(self, wSet, filter_grouptype):
        """Extend *wSet* in place through groups of type *filter_grouptype* only.

        Non-group objects always stay eligible; groups are eligible only
        when their groupType equals filter_grouptype.
        """
        vSet, aSet = self._membership_graph(include_all_objects=True)

        # Compare as unsigned 32-bit values on both sides so a signed
        # representation of the same flags still matches.
        wanted = filter_grouptype & 0x00000000FFFFFFFF

        uSet = set()
        for v in vSet:
            res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
                                              attrs=["groupType"],
                                              expression="objectClass=group")
            if len(res_group) == 1:
                if int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF == wanted:
                    uSet.add(v)
            else:
                # Not a group (the filter did not match): keep it.
                uSet.add(v)

        closure(uSet, wSet, aSet)

    def test_tokenGroupsGlobalAndUniversal_manual(self):
        # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
        # and compare the result

        # The variable names come from MS-ADTS May 15, 2014

        S = set()
        S.add(self.test_user_dn.get_casefold())

        self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)

        T = set()
        # Not really a SID, we do this on DNs...
        for sid in S:
            X = set()
            X.add(sid)
            self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)

            T = T.union(X)

        T.remove(self.test_user_dn.get_casefold())

        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
        self.assertEqual(len(res), 1)

        tokenGroupsSet = self._dns_for_sids(res[0]['tokenGroupsGlobalAndUniversal'])

        if len(T.difference(tokenGroupsSet)):
            self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))

        if len(tokenGroupsSet.difference(T)):
            self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))

    def test_samr_GetGroupsForUser(self):
        # Confirm that we get the correct results against SAMR also
        if not url.startswith("ldap://"):
            self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
        host = url.split("://")[1]
        (domain_sid, user_rid) = self.user_sid.split()
        samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
        samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
        samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
                                           domain_sid)
        user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
        rids = samr_conn.GetGroupsForUser(user_handle)
        samr_dns = set()
        for rid in rids.rids:
            self.assertEqual(rid.attributes, security.SE_GROUP_DEFAULT_FLAGS)
            sid = "%s-%d" % (domain_sid, rid.rid)
            res = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
                                        attrs=[])
            samr_dns.add(res[0].dn.get_casefold())

        user_info = samr_conn.QueryUserInfo(user_handle, 1)
        self.assertEqual(rids.rids[0].rid, user_info.primary_gid)

        group_filter = ("(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
                        % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))

        tokenGroupsSet = set()
        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
        for sid in res[0]['tokenGroupsGlobalAndUniversal']:
            sid = ndr_unpack(security.dom_sid, sid)
            res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
                                         attrs=[],
                                         expression=group_filter)
            # The filter may exclude the object; only index a real hit.
            if len(res3) == 1:
                tokenGroupsSet.add(res3[0].dn.get_casefold())

        if len(samr_dns.difference(tokenGroupsSet)):
            self.fail(msg="additional samr_GetUserGroups over tokenGroups: %s" % samr_dns.difference(tokenGroupsSet))

        memberOf = set()
        # Add the primary group
        primary_group_sid = "%s-%d" % (domain_sid, user_info.primary_gid)
        res2 = self.admin_ldb.search(base="<SID=%s>" % primary_group_sid, scope=ldb.SCOPE_BASE,
                                     attrs=[])

        memberOf.add(res2[0].dn.get_casefold())
        res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["memberOf"])
        for dn in res[0]['memberOf']:
            res3 = self.admin_ldb.search(base=dn, scope=ldb.SCOPE_BASE,
                                         attrs=[],
                                         expression=group_filter)
            # Groups of other types do not match the filter; skip them.
            if len(res3) == 1:
                memberOf.add(res3[0].dn.get_casefold())

        if len(memberOf.difference(samr_dns)):
            self.fail(msg="additional memberOf over samr_GetUserGroups: %s" % memberOf.difference(samr_dns))

        if len(samr_dns.difference(memberOf)):
            self.fail(msg="additional samr_GetUserGroups over memberOf: %s" % samr_dns.difference(memberOf))

        S = set()
        S.add(self.test_user_dn.get_casefold())

        self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
        self.filtered_closure(S, GTYPE_SECURITY_UNIVERSAL_GROUP)

        # Now remove the user DN and primary group
        S.remove(self.test_user_dn.get_casefold())

        if len(samr_dns.difference(S)):
            self.fail(msg="additional samr_GetUserGroups over filtered_closure: %s" % samr_dns.difference(S))

    def test_samr_GetGroupsForUser_nomember(self):
        # Confirm that we get the correct results against SAMR also
        if not url.startswith("ldap://"):
            self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
        host = url.split("://")[1]

        test_user = "tokengroups_user2"
        self.admin_ldb.newuser(test_user, self.test_user_pass)
        try:
            res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (test_user, self.base_dn),
                                        attrs=["objectSid"], scope=ldb.SCOPE_BASE)
            user_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])

            (domain_sid, user_rid) = user_sid.split()
            samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
            samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
            samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
                                               domain_sid)
            user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
            rids = samr_conn.GetGroupsForUser(user_handle)
            user_info = samr_conn.QueryUserInfo(user_handle, 1)
        finally:
            # Remove the temporary user even when a SAMR call fails.
            delete_force(self.admin_ldb, "CN=%s,%s,%s" %
                         (test_user, "cn=users", self.base_dn))
        # A user in no extra group reports exactly its primary group.
        self.assertEqual(len(rids.rids), 1)
        self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
# Accept a bare argument as well as a full URL: an existing file becomes a
# tdb:// URL, anything else is treated as an LDAP host.  An argument that
# already carries a scheme is left alone so it is not prefixed twice.
if "://" not in url:
    if os.path.isfile(url):
        url = "tdb://%s" % url
    else:
        url = "ldap://%s" % url

TestProgram(module=__name__, opts=subunitopts)