# Unix SMB/CIFS implementation.
# Copyright (C) Stefan Metzmacher 2020
# Copyright (C) 2021 Catalyst.Net Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys

# The search path must be extended before any samba/ldb import below.
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"

import ldb

from ldb import SCOPE_SUBTREE
from samba import NTSTATUSError, gensec
from samba.auth import AuthContext
from samba.dcerpc import security
from samba.ndr import ndr_unpack
from samba.ntstatus import NT_STATUS_NO_IMPERSONATION_TOKEN

from samba.tests.krb5.kdc_base_test import KDCBaseTest
# Module-level flags; both start out disabled and are reset again in the
# script entry point at the bottom of the file.
global_asn1_print = False
global_hexdump = False
class CcacheTests(KDCBaseTest):
    """Test for authentication using Kerberos credentials stored in a
       credentials cache file.
    """

    def test_ccache(self):
        """Run the ccache authentication check with default options."""
        self._run_ccache_test()

    def test_ccache_rename(self):
        """Run the check after the user account has been renamed."""
        self._run_ccache_test(rename=True)

    def test_ccache_no_pac(self):
        """Run the check with include_pac disabled, tolerating an error."""
        self._run_ccache_test(include_pac=False,
                              expect_anon=True, allow_error=True)

    def _run_ccache_test(self, rename=False, include_pac=True,
                         expect_anon=False, allow_error=False):
        """Authenticate in-process with credentials held in a ccache file.

        Creates a user and a machine account, obtains a service ticket into
        a credentials cache, runs a GSSAPI exchange acting as both client
        and server, and checks that the first SID of the resulting security
        token is the SID of the created user.

        :param rename: if true, replace the user's sAMAccountName after the
            ticket has been obtained and before authenticating.
        :param include_pac: forwarded to the ccache helper.
        :param expect_anon: accepted for the callers' benefit.
            NOTE(review): no use of this flag is visible in this method --
            confirm whether a check on it is missing.
        :param allow_error: if true, a failure to obtain the session info is
            accepted provided the status is
            NT_STATUS_NO_IMPERSONATION_TOKEN; otherwise it fails the test.
        """
        # Create a user account and a machine account, along with a Kerberos
        # credentials cache file where the service ticket authenticating the
        # user is stored.

        mach_name = "ccachemac"
        # NOTE(review): service class for the machine SPN -- "host" is
        # assumed here; confirm against the intended SPN.
        service = "host"

        samdb = self.get_samdb()

        # Create the user account.
        # NOTE(review): use_cache=False assumed so that the rename variant
        # does not modify an account shared with other tests -- confirm.
        user_credentials = self.get_cached_creds(
            account_type=self.AccountType.USER,
            use_cache=False)
        user_name = user_credentials.get_username()

        # Create the machine account.
        (mach_credentials, _) = self.create_account(
            samdb,
            mach_name,
            account_type=self.AccountType.COMPUTER,
            spn="%s/%s" % (service,
                           mach_name))

        # Talk to the KDC to obtain the service ticket, which gets placed into
        # the cache. The machine account name has to match the name in the
        # ticket, to ensure that the krbtgt ticket doesn't also need to be
        # stored.
        # NOTE(review): the 'pac' keyword name is assumed from the helper's
        # signature in the base class -- confirm.
        (creds, cachefile) = self.create_ccache_with_user(user_credentials,
                                                          mach_credentials,
                                                          pac=include_pac)
        # Remove the cached credentials file.
        self.addCleanup(os.remove, cachefile.name)

        # Retrieve the user account's SID.
        ldb_res = samdb.search(scope=SCOPE_SUBTREE,
                               expression="(sAMAccountName=%s)" % user_name,
                               attrs=["objectSid"])
        self.assertEqual(1, len(ldb_res))
        sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])

        if rename:
            # Rename the account; the cached ticket still carries the old
            # name, while the SID looked up above is kept for comparison.
            new_name = self.get_new_username()

            msg = ldb.Message(user_credentials.get_dn())
            msg['sAMAccountName'] = ldb.MessageElement(new_name,
                                                       ldb.FLAG_MOD_REPLACE,
                                                       'sAMAccountName')
            samdb.modify(msg)

        # Authenticate in-process to the machine account using the user's
        # cached credentials.

        # NOTE(review): get_lp() is assumed to be the base-class accessor
        # for the loadparm context, mirroring get_samdb() -- confirm.
        lp = self.get_lp()
        lp.set('server role', 'active directory domain controller')

        settings = {}
        settings["lp_ctx"] = lp
        settings["target_hostname"] = mach_name

        gensec_client = gensec.Security.start_client(settings)
        gensec_client.set_credentials(creds)
        gensec_client.want_feature(gensec.FEATURE_SEAL)
        gensec_client.start_mech_by_sasl_name("GSSAPI")

        auth_context = AuthContext(lp_ctx=lp, ldb=samdb, methods=[])

        gensec_server = gensec.Security.start_server(settings, auth_context)
        gensec_server.set_credentials(mach_credentials)

        gensec_server.start_mech_by_sasl_name("GSSAPI")

        client_finished = False
        server_finished = False
        server_to_client = b''

        # Operate as both the client and the server to verify the user's
        # credentials: each side consumes the other's most recent output
        # until both report completion.
        while not client_finished or not server_finished:
            if not client_finished:
                print("running client gensec_update")
                (client_finished, client_to_server) = gensec_client.update(
                    server_to_client)
            if not server_finished:
                print("running server gensec_update")
                (server_finished, server_to_client) = gensec_server.update(
                    client_to_server)

        # Ensure that the first SID contained within the obtained security
        # token is the SID of the user we created.

        # Retrieve the SIDs from the security token.
        try:
            session = gensec_server.session_info()
        except NTSTATUSError as e:
            if not allow_error:
                self.fail()

            # The first exception argument is taken as the status code.
            enum, _ = e.args
            self.assertEqual(NT_STATUS_NO_IMPERSONATION_TOKEN, enum)
            # No session was produced, so there is no token to inspect.
            return

        token = session.security_token
        token_sids = token.sids
        self.assertGreater(len(token_sids), 0)

        # Ensure that they match.
        self.assertEqual(sid, token_sids[0])
if __name__ == "__main__":
    # Reset the module-level flags, then hand control to the unittest
    # runner so that executing this file as a script actually runs the
    # tests.
    global_asn1_print = False
    global_hexdump = False
    # Imported here because it is only needed when run as a script.
    import unittest
    unittest.main()