vfs_ceph_new: common prefix to debug-log messages
[Samba.git] / python / samba / tests / gensec.py
blob8f9c88d170c818c1b2c8ac7e450cdfa0b552d2eb
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for GENSEC.
20 Note that this only tests that the bindings work. It is not intended to
21 test the functionality itself; that is already covered by other tests.
22 """
24 from samba.credentials import Credentials
25 from samba import gensec, auth
26 import samba.tests
class GensecTests(samba.tests.TestCase):
    """Tests for the Python GENSEC bindings.

    Most tests run a complete client/server GENSEC exchange inside this
    process and then check that wrapped messages and session keys agree
    on both sides.
    """

    def setUp(self):
        """Create the shared GENSEC settings and a client for the API tests."""
        super().setUp()
        self.lp_ctx = samba.tests.env_loadparm()
        self.settings = {
            "lp_ctx": self.lp_ctx,
            "target_hostname": self.lp_ctx.get("netbios name"),
        }
        # Every test starts with Windows 2000 SPNEGO simulation disabled.
        self.lp_ctx.set("spnego:simulate_w2k", "no")

        # This is just for the API tests
        self.gensec = gensec.Security.start_client(self.settings)

    def _gensec_machine_creds(self):
        """Return new Credentials guessed from lp_ctx and set to the machine account."""
        machine_creds = Credentials()
        machine_creds.guess(self.lp_ctx)
        machine_creds.set_machine_account(self.lp_ctx)
        return machine_creds

    def _assert_gensec_channel(self):
        """Check wrap/unwrap in both directions and that the session keys match.

        Uses self.gensec_client and self.gensec_server, which must already
        have completed their exchange.  An NTSTATUSError raised while
        wrapping or unwrapping is reported as a test failure.
        """
        directions = (
            (self.gensec_client, self.gensec_server, b"Hello Server"),
            (self.gensec_server, self.gensec_client, b"Hello Client"),
        )
        for sender, receiver, test_bytes in directions:
            try:
                test_wrapped = sender.wrap(test_bytes)
                test_unwrapped = receiver.unwrap(test_wrapped)
            except samba.NTSTATUSError as e:
                self.fail(str(e))
            self.assertEqual(test_bytes, test_unwrapped)

        client_session_key = self.gensec_client.session_key()
        server_session_key = self.gensec_server.session_key()
        self.assertEqual(client_session_key, server_session_key)

    def test_start_mech_by_unknown_name(self):
        """Starting an unknown mechanism must raise RuntimeError."""
        self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo")

    def test_start_mech_by_name(self):
        """Starting the "spnego" mechanism by name must not raise."""
        self.gensec.start_mech_by_name("spnego")

    def test_info_uninitialized(self):
        """session_info() on a client with no mechanism started raises RuntimeError."""
        self.assertRaises(RuntimeError, self.gensec.session_info)

    def _test_update(self, mech, *, creds=None, client_mech=None, client_only_opt=None):
        """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC

        mech -- SASL mechanism name the server is started with; the client
            uses it too unless client_mech is given.
        creds -- client credentials; defaults to self.get_credentials().
        client_mech -- if not None, the client is started with
            start_mech_by_name(client_mech) instead of the SASL name.
        client_only_opt -- name of a loadparm option that is set to "yes"
            while client-side calls run and to "no" while server-side calls
            run.  Its previous value is restored afterwards, even when the
            exchange fails.
        """
        if creds is None:
            creds = self.get_credentials()

        orig_client_opt = None
        if client_only_opt:
            # Remember the current value (an unset option is restored as '').
            orig_client_opt = self.lp_ctx.get(client_only_opt) or ''

        def set_client_opt(value):
            # No-op unless the caller asked for a client-only option.
            if client_only_opt:
                self.lp_ctx.set(client_only_opt, value)

        try:
            # Start up a client and server GENSEC instance to test things with
            set_client_opt("yes")
            self.gensec_client = gensec.Security.start_client(self.settings)
            self.gensec_client.set_credentials(creds)
            self.gensec_client.want_feature(gensec.FEATURE_SEAL)
            if client_mech is not None:
                self.gensec_client.start_mech_by_name(client_mech)
            else:
                self.gensec_client.start_mech_by_sasl_name(mech)
            set_client_opt("no")

            self.gensec_server = gensec.Security.start_server(
                settings=self.settings,
                auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
            self.gensec_server.set_credentials(self._gensec_machine_creds())
            self.gensec_server.want_feature(gensec.FEATURE_SEAL)
            self.gensec_server.start_mech_by_sasl_name(mech)

            client_finished = False
            server_finished = False
            server_to_client = b""
            client_to_server = b""

            # Run the actual call loop; both flags start False, so the body
            # always runs at least once.
            while not (client_finished and server_finished):
                if not client_finished:
                    set_client_opt("yes")
                    print("running client gensec_update")
                    try:
                        (client_finished, client_to_server) = \
                            self.gensec_client.update(server_to_client)
                    except samba.NTSTATUSError as nt:
                        self.fail(str(nt))
                    set_client_opt("no")
                if not server_finished:
                    print("running server gensec_update")
                    try:
                        (server_finished, server_to_client) = \
                            self.gensec_server.update(client_to_server)
                    except samba.NTSTATUSError as nt:
                        self.fail(str(nt))
        finally:
            # Put the option back even if the exchange failed part-way, so a
            # failure here cannot leave the loadparm context modified.
            if client_only_opt:
                self.lp_ctx.set(client_only_opt, orig_client_opt)

        self.assertTrue(server_finished)
        self.assertTrue(client_finished)

        # Only checks that the call succeeds once the exchange is complete.
        self.gensec_server.session_info()

        self._assert_gensec_channel()

    def test_update(self):
        """Full exchange with client and server both using "GSSAPI"."""
        self._test_update("GSSAPI")

    def test_update_spnego(self):
        """Full exchange with client and server both using "GSS-SPNEGO"."""
        self._test_update("GSS-SPNEGO")

    def test_update_spnego_downgrade(self):
        """SPNEGO exchange with "gensec:gssapi_krb5" toggled for the client only."""
        self._test_update("GSS-SPNEGO", client_mech="spnego", client_only_opt="gensec:gssapi_krb5")

    def test_update_no_optimistic_spnego(self):
        """SPNEGO exchange with "spnego:client_no_optimistic" toggled for the client only."""
        self._test_update("GSS-SPNEGO", client_mech="spnego", client_only_opt="spnego:client_no_optimistic")

    def test_update_w2k_spnego_client(self):
        """SPNEGO exchange after starting self.gensec with simulate_w2k enabled."""
        self.lp_ctx.set("spnego:simulate_w2k", "yes")

        # Re-start the client with this set
        # NOTE(review): _test_update() builds its own client in
        # self.gensec_client after the option is reset below -- confirm this
        # restart of self.gensec has the intended effect.
        self.gensec = gensec.Security.start_client(self.settings)

        # Unset it for the server
        self.lp_ctx.set("spnego:simulate_w2k", "no")

        self._test_update("GSS-SPNEGO")

    def test_update_w2k_spnego_server(self):
        """SPNEGO exchange with simulate_w2k enabled after self.gensec is started."""
        # Re-start the client while simulate_w2k is still "no" (from setUp)
        self.gensec = gensec.Security.start_client(self.settings)

        # Set it for the server
        self.lp_ctx.set("spnego:simulate_w2k", "yes")

        self._test_update("GSS-SPNEGO")

    def test_update_w2k_spnego(self):
        """SPNEGO exchange with simulate_w2k explicitly set before restarting the client."""
        # NOTE(review): the test name suggests simulate_w2k should be "yes"
        # here, but the value set is "no" (the same as setUp) -- confirm.
        self.lp_ctx.set("spnego:simulate_w2k", "no")

        # Re-start the client with this set
        self.gensec = gensec.Security.start_client(self.settings)

        self._test_update("GSS-SPNEGO")

    def test_update_gss_krb5_to_spnego(self):
        """Client started as "gssapi_krb5" against a "GSS-SPNEGO" server."""
        self._test_update("GSS-SPNEGO", client_mech="gssapi_krb5")

    def test_update_ntlmssp_to_spnego(self):
        """Client started as "ntlmssp" against a "GSS-SPNEGO" server."""
        self._test_update("GSS-SPNEGO", client_mech="ntlmssp")

    def test_update_fast(self):
        """Test associating a machine account with the credentials
        to protect the password from cracking and show
        'log in from device' pattern.

        (Note we can't tell if FAST armor was actually used with this test)"""
        creds = self.insta_creds(template=self.get_credentials())
        creds.set_krb5_fast_armor_credentials(self._gensec_machine_creds(), True)
        self._test_update("GSSAPI", creds=creds)

    def test_update_anon_fast(self):
        """Test setting no FAST credentials, but requiring FAST.
        Against a Heimdal KDC this will trigger the anonymous
        PKINIT protection.

        (Note we can't tell if FAST armor was actually used with this test)
        """
        creds = self.insta_creds(template=self.get_credentials())
        creds.set_krb5_fast_armor_credentials(None, True)
        self._test_update("GSSAPI", creds=creds)

    def test_max_update_size(self):
        """Test a SPNEGO exchange with the max update size limited to 5 on both sides.

        With such a small limit the exchange is expected to need many more
        client/server round trips than usual; the result is then checked
        with wrap/unwrap and a session key comparison.
        """

        # Start up a client and server GENSEC instance to test things with
        self.gensec_client = gensec.Security.start_client(self.settings)
        self.gensec_client.set_credentials(self.get_credentials())
        self.gensec_client.want_feature(gensec.FEATURE_SIGN)
        self.gensec_client.set_max_update_size(5)
        self.gensec_client.start_mech_by_name("spnego")

        self.gensec_server = gensec.Security.start_server(
            settings=self.settings,
            auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
        self.gensec_server.set_credentials(self._gensec_machine_creds())
        self.gensec_server.want_feature(gensec.FEATURE_SIGN)
        self.gensec_server.set_max_update_size(5)
        self.gensec_server.start_mech_by_name("spnego")

        client_finished = False
        server_finished = False
        server_to_client = b""
        client_to_server = b""

        # Run the actual call loop, counting the round trips
        i = 0
        while not client_finished or not server_finished:
            i += 1
            if not client_finished:
                print("running client gensec_update: %d: %r" % (len(server_to_client), server_to_client))
                (client_finished, client_to_server) = self.gensec_client.update(server_to_client)
            if not server_finished:
                print("running server gensec_update: %d: %r" % (len(client_to_server), client_to_server))
                (server_finished, server_to_client) = self.gensec_server.update(client_to_server)

        # Here we expect a lot more than the typical 1 or 2 roundtrips
        self.assertGreater(i, 10)

        # Only checks that the call succeeds once the exchange is complete.
        self.gensec_server.session_info()

        self._assert_gensec_channel()