# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""Tests for GENSEC.

Note that this only tests that the bindings work. It is not intended to test
the functionality; that is already done in other tests.
"""
from samba.credentials import Credentials
from samba import gensec, auth
import samba.tests
class GensecTests(samba.tests.TestCase):
    """Exercise the Python GENSEC bindings.

    Most tests start a GENSEC client and a GENSEC server inside this
    process, feed each side's output token to the other until both report
    completion, and then check that the two ends can exchange wrapped data
    and agree on the session key.
    """

    def setUp(self):
        """Build the settings shared by every test and a client for the API tests."""
        super().setUp()
        self.settings = {}
        self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
        self.settings["target_hostname"] = self.lp_ctx.get("netbios name")
        # Start every test from a known value; several tests below toggle it.
        self.lp_ctx.set("spnego:simulate_w2k", "no")

        # This is just for the API tests
        self.gensec = gensec.Security.start_client(self.settings)

    def test_start_mech_by_unknown_name(self):
        """Starting a mechanism with an unknown name raises RuntimeError."""
        self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo")

    def test_start_mech_by_name(self):
        """Starting the "spnego" mechanism by name does not raise."""
        self.gensec.start_mech_by_name("spnego")

    def test_info_uninitialized(self):
        """session_info() raises RuntimeError before any mechanism is started."""
        self.assertRaises(RuntimeError, self.gensec.session_info)

    def _check_established_session(self):
        """Verify the finished client/server pair in both directions.

        Expects self.gensec_client and self.gensec_server to have completed
        their exchange. Fails the test if wrapping/unwrapping raises
        samba.NTSTATUSError, if a payload does not survive the round trip,
        or if the two session keys differ.
        """
        # Only the call matters here: it must not raise once the exchange is
        # complete (compare test_info_uninitialized).
        self.gensec_server.session_info()

        directions = (
            (self.gensec_client, self.gensec_server, b"Hello Server"),
            (self.gensec_server, self.gensec_client, b"Hello Client"),
        )
        for sender, receiver, test_bytes in directions:
            try:
                test_wrapped = sender.wrap(test_bytes)
                test_unwrapped = receiver.unwrap(test_wrapped)
            except samba.NTSTATUSError as e:
                self.fail(str(e))
            self.assertEqual(test_bytes, test_unwrapped)

        client_session_key = self.gensec_client.session_key()
        server_session_key = self.gensec_server.session_key()
        self.assertEqual(client_session_key, server_session_key)

    def _test_update(self, mech, *, creds=None, client_mech=None, client_only_opt=None):
        """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC

        :param mech: SASL mechanism name the server is started with; the
            client uses it too unless client_mech is given.
        :param creds: client credentials; defaults to self.get_credentials().
        :param client_mech: if not None, the client is started with
            start_mech_by_name(client_mech) instead of the SASL name.
        :param client_only_opt: name of a loadparm option that is set to
            "yes" around client-side calls and "no" around server-side
            calls, then restored to its previous value.
        :raises AssertionError: if an update step raises samba.NTSTATUSError
            or any of the final checks fails.
        """
        # Start up a client and server GENSEC instance to test things with

        if creds is None:
            creds = self.get_credentials()

        orig_client_opt = None
        if client_only_opt:
            orig_client_opt = self.lp_ctx.get(client_only_opt)
            if not orig_client_opt:
                orig_client_opt = ''
            # Make sure the override is undone even when the test fails
            # before reaching the explicit restore further down.
            self.addCleanup(self.lp_ctx.set, client_only_opt, orig_client_opt)
            self.lp_ctx.set(client_only_opt, "yes")

        self.gensec_client = gensec.Security.start_client(self.settings)
        self.gensec_client.set_credentials(creds)
        self.gensec_client.want_feature(gensec.FEATURE_SEAL)
        if client_mech is not None:
            self.gensec_client.start_mech_by_name(client_mech)
        else:
            self.gensec_client.start_mech_by_sasl_name(mech)

        if client_only_opt:
            self.lp_ctx.set(client_only_opt, "no")

        self.gensec_server = gensec.Security.start_server(
            settings=self.settings,
            auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
        # The server gets its own machine-account credentials (same pattern
        # as test_max_update_size) so the caller's client credentials are
        # not modified.
        server_creds = Credentials()
        server_creds.guess(self.lp_ctx)
        server_creds.set_machine_account(self.lp_ctx)
        self.gensec_server.set_credentials(server_creds)

        self.gensec_server.want_feature(gensec.FEATURE_SEAL)
        self.gensec_server.start_mech_by_sasl_name(mech)

        client_finished = False
        server_finished = False
        server_to_client = b""
        client_to_server = b""

        # Run the actual call loop
        while not (client_finished and server_finished):
            if not client_finished:
                if client_only_opt:
                    self.lp_ctx.set(client_only_opt, "yes")
                print("running client gensec_update")
                try:
                    (client_finished, client_to_server) = \
                        self.gensec_client.update(server_to_client)
                except samba.NTSTATUSError as nt:
                    raise AssertionError(nt) from nt
                if client_only_opt:
                    self.lp_ctx.set(client_only_opt, "no")
            if not server_finished:
                print("running server gensec_update")
                try:
                    (server_finished, server_to_client) = \
                        self.gensec_server.update(client_to_server)
                except samba.NTSTATUSError as nt:
                    raise AssertionError(nt) from nt

        if client_only_opt:
            self.lp_ctx.set(client_only_opt, orig_client_opt)

        self.assertTrue(server_finished)
        self.assertTrue(client_finished)

        self._check_established_session()

    def test_update(self):
        """Run the exchange with both sides using the "GSSAPI" SASL mechanism."""
        self._test_update("GSSAPI")

    def test_update_spnego(self):
        """Run the exchange with both sides using the "GSS-SPNEGO" SASL mechanism."""
        self._test_update("GSS-SPNEGO")

    def test_update_spnego_downgrade(self):
        """SPNEGO exchange with "gensec:gssapi_krb5" toggled on for the client only."""
        self._test_update("GSS-SPNEGO", client_mech="spnego",
                          client_only_opt="gensec:gssapi_krb5")

    def test_update_no_optimistic_spnego(self):
        """SPNEGO exchange with "spnego:client_no_optimistic" toggled on for the client only."""
        self._test_update("GSS-SPNEGO", client_mech="spnego",
                          client_only_opt="spnego:client_no_optimistic")

    def test_update_w2k_spnego_client(self):
        """SPNEGO exchange with "spnego:simulate_w2k" enabled while a client is started."""
        self.lp_ctx.set("spnego:simulate_w2k", "yes")

        # Re-start the client with this set
        self.gensec = gensec.Security.start_client(self.settings)

        # Unset it for the server
        self.lp_ctx.set("spnego:simulate_w2k", "no")

        # NOTE(review): _test_update() starts its own gensec_client after this
        # point; whether the "yes" above still affects that client depends on
        # when GENSEC reads the option - confirm.
        self._test_update("GSS-SPNEGO")

    def test_update_w2k_spnego_server(self):
        """SPNEGO exchange with "spnego:simulate_w2k" enabled before the server is started."""
        # Re-start the client while the option still has the "no" from setUp
        self.gensec = gensec.Security.start_client(self.settings)

        # Turn it on for the server started by _test_update()
        self.lp_ctx.set("spnego:simulate_w2k", "yes")

        self._test_update("GSS-SPNEGO")

    def test_update_w2k_spnego(self):
        """SPNEGO exchange after explicitly setting "spnego:simulate_w2k"."""
        # NOTE(review): this sets the option to "no", the same value setUp
        # already applies; confirm whether "yes" was intended here.
        self.lp_ctx.set("spnego:simulate_w2k", "no")

        # Re-start the client with this set
        self.gensec = gensec.Security.start_client(self.settings)

        self._test_update("GSS-SPNEGO")

    def test_update_gss_krb5_to_spnego(self):
        """Client started as "gssapi_krb5" against a "GSS-SPNEGO" server."""
        self._test_update("GSS-SPNEGO", client_mech="gssapi_krb5")

    def test_update_ntlmssp_to_spnego(self):
        """Client started as "ntlmssp" against a "GSS-SPNEGO" server."""
        self._test_update("GSS-SPNEGO", client_mech="ntlmssp")

    def test_update_fast(self):
        """Test associating a machine account with the credentials
        to protect the password from cracking and show
        'log in from device' pattern.

        (Note we can't tell if FAST armor was actually used with this test)"""
        creds = self.insta_creds(template=self.get_credentials())
        machine_creds = Credentials()
        machine_creds.guess(self.lp_ctx)
        machine_creds.set_machine_account(self.lp_ctx)
        creds.set_krb5_fast_armor_credentials(machine_creds, True)
        self._test_update("GSSAPI", creds=creds)

    def test_update_anon_fast(self):
        """Test setting no FAST credentials, but requiring FAST.
        Against a Heimdal KDC this will trigger the anonymous
        armor path (presumably anonymous PKINIT - the rest of this sentence
        was missing from the reviewed text; confirm the wording).

        (Note we can't tell if FAST armor was actually used with this test)
        """
        creds = self.insta_creds(template=self.get_credentials())
        creds.set_krb5_fast_armor_credentials(None, True)
        self._test_update("GSSAPI", creds=creds)

    def test_max_update_size(self):
        """Test an SPNEGO exchange with the update size capped at 5 on both sides.

        With such a small cap the exchange is expected to need many more
        round trips than usual; the test asserts that it took more than 10.
        """
        # Start up a client and server GENSEC instance to test things with

        self.gensec_client = gensec.Security.start_client(self.settings)
        self.gensec_client.set_credentials(self.get_credentials())
        self.gensec_client.want_feature(gensec.FEATURE_SIGN)
        self.gensec_client.set_max_update_size(5)
        self.gensec_client.start_mech_by_name("spnego")

        self.gensec_server = gensec.Security.start_server(
            settings=self.settings,
            auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
        creds = Credentials()
        creds.guess(self.lp_ctx)
        creds.set_machine_account(self.lp_ctx)
        self.gensec_server.set_credentials(creds)
        self.gensec_server.want_feature(gensec.FEATURE_SIGN)
        self.gensec_server.set_max_update_size(5)
        self.gensec_server.start_mech_by_name("spnego")

        client_finished = False
        server_finished = False
        server_to_client = b""
        client_to_server = b""

        # Run the actual call loop
        round_trips = 0
        while not client_finished or not server_finished:
            round_trips += 1
            if not client_finished:
                print("running client gensec_update: %d: %r" % (len(server_to_client), server_to_client))
                (client_finished, client_to_server) = self.gensec_client.update(server_to_client)
            if not server_finished:
                print("running server gensec_update: %d: %r" % (len(client_to_server), client_to_server))
                (server_finished, server_to_client) = self.gensec_server.update(client_to_server)

        # Here we expect a lot more than the typical 1 or 2 roundtrips
        self.assertGreater(round_trips, 10)

        self._check_established_session()