s3: smbd: Move check_fsp_open() and check_fsp() to smb1_reply.c
[Samba.git] / python / samba / tests / ntlm_auth_base.py
blob546c89762cc61a33b6d07badf5f047b77b7d08a7
1 # Unix SMB/CIFS implementation.
2 # A test for the ntlm_auth tool
3 # Copyright (C) Kai Blin <kai@samba.org> 2008
4 # Copyright (C) Samuel Cabrero <scabrero@suse.de> 2018
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Test ntlm_auth
20 This test program will start ntlm_auth with the given command line switches and
21 see if it will get the expected results.
22 """
24 import os
25 import samba
26 import subprocess
27 from samba.tests import BlackboxTestCase
29 class NTLMAuthTestCase(BlackboxTestCase):
31 def setUp(self):
32 super(NTLMAuthTestCase, self).setUp()
33 bindir = os.path.normpath(os.getenv("BINDIR", "./bin"))
34 self.ntlm_auth_path = os.path.join(bindir, 'ntlm_auth')
35 self.lp = samba.tests.env_loadparm()
36 self.winbind_separator = self.lp.get('winbind separator')
38 def readLine(self, text_stream):
39 buf = text_stream.readline()
40 newline = buf.find('\n')
41 if newline == -1:
42 raise Exception("Failed to read line")
43 return buf[:newline]
45 def writeLine(self, text_stream, buf):
46 text_stream.write(buf)
47 text_stream.write("\n")
49 def run_helper(self,
50 client_username=None,
51 client_password=None,
52 client_domain=None,
53 client_use_cached_creds=False,
54 server_username=None,
55 server_password=None,
56 server_domain=None,
57 client_helper="ntlmssp-client-1",
58 server_helper="squid-2.5-ntlmssp",
59 server_use_winbind=False,
60 require_membership=None,
61 target_hostname=None,
62 target_service=None):
63 self.assertTrue(os.access(self.ntlm_auth_path, os.X_OK))
65 if client_username is None:
66 raise Exception("client_username required")
68 # Client helper args
69 client_args = []
70 client_args.append(self.ntlm_auth_path)
71 client_args.append("--helper-protocol=%s" % client_helper)
72 client_args.append("--username=%s" % client_username)
73 if client_domain:
74 client_args.append("--domain=%s" % client_domain)
75 if client_use_cached_creds:
76 client_args.append("--use-cached-creds")
77 else:
78 if client_password is None:
79 raise Exception("client_password required")
80 client_args.append("--password=%s" % client_password)
81 if target_service:
82 client_args.append("--target-service=%s" % target_service)
83 if target_hostname:
84 client_args.append("--target-hostname=%s" % target_hostname)
85 client_args.append("--configfile=%s" % self.lp.configfile)
87 # Server helper args
88 server_args = []
89 server_args.append(self.ntlm_auth_path)
90 server_args.append("--helper-protocol=%s" % server_helper)
91 server_args.append("--configfile=%s" % self.lp.configfile)
92 if not server_use_winbind:
93 if server_username is None or server_password is None or server_domain is None:
94 raise Exception("Server credentials required if not using winbind")
95 server_args.append("--username=%s" % server_username)
96 server_args.append("--password=%s" % server_password)
97 server_args.append("--domain=%s" % server_domain)
98 if require_membership is not None:
99 raise Exception("Server must be using winbind for require-membership-of")
100 else:
101 if require_membership is not None:
102 server_args.append("--require-membership-of=%s" % require_membership)
104 # Run helpers
105 result = False
106 server_proc = subprocess.Popen(server_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0, universal_newlines=True)
107 client_proc = subprocess.Popen(client_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0, universal_newlines=True)
109 try:
110 if client_helper == "ntlmssp-client-1" and server_helper == "squid-2.5-ntlmssp":
111 self.writeLine(client_proc.stdin, "YR")
112 buf = self.readLine(client_proc.stdout)
113 self.assertTrue(buf.startswith("YR "))
115 self.writeLine(server_proc.stdin, buf)
116 buf = self.readLine(server_proc.stdout)
117 self.assertTrue(buf.startswith("TT "))
119 self.writeLine(client_proc.stdin, buf)
120 buf = self.readLine(client_proc.stdout)
121 self.assertTrue(buf.startswith("AF "))
123 # Client sends 'AF <base64 blob>' but server
124 # expects 'KK <base64 blob>'
125 buf = buf.replace("AF", "KK", 1)
127 self.writeLine(server_proc.stdin, buf)
128 buf = self.readLine(server_proc.stdout)
129 result = buf.startswith("AF ")
130 elif client_helper == "ntlmssp-client-1" and server_helper == "gss-spnego":
131 self.writeLine(client_proc.stdin, "YR")
132 buf = self.readLine(client_proc.stdout)
133 self.assertTrue(buf.startswith("YR "))
135 self.writeLine(server_proc.stdin, buf)
136 buf = self.readLine(server_proc.stdout)
137 self.assertTrue(buf.startswith("TT "))
139 self.writeLine(client_proc.stdin, buf)
140 buf = self.readLine(client_proc.stdout)
141 self.assertTrue(buf.startswith("AF "))
143 # Client sends 'AF <base64 blob>' but server expects 'KK <abse64 blob>'
144 buf = buf.replace("AF", "KK", 1)
146 self.writeLine(server_proc.stdin, buf)
147 buf = self.readLine(server_proc.stdout)
148 result = buf.startswith("AF * ")
149 elif client_helper == "gss-spnego-client" and server_helper == "gss-spnego":
150 self.writeLine(server_proc.stdin, "YR")
151 buf = self.readLine(server_proc.stdout)
153 while True:
154 if (buf.startswith("NA * ")):
155 result = False
156 break
158 self.assertTrue(buf.startswith("AF ") or buf.startswith("TT "))
160 self.writeLine(client_proc.stdin, buf)
161 buf = self.readLine(client_proc.stdout)
163 if buf.startswith("AF"):
164 result = True
165 break
167 self.assertTrue(buf.startswith("AF ") or buf.startswith("KK ") or buf.startswith("TT "))
169 self.writeLine(server_proc.stdin, buf)
170 buf = self.readLine(server_proc.stdout)
172 if buf.startswith("AF * "):
173 result = True
174 break
175 else:
176 self.fail("Helper protocols not handled")
178 if result is True and client_helper == "ntlmssp-client-1":
179 self.writeLine(client_proc.stdin, "GK")
180 buf = self.readLine(client_proc.stdout)
181 self.assertTrue(buf.startswith("GK "))
183 self.writeLine(client_proc.stdin, "GF")
184 buf = self.readLine(client_proc.stdout)
185 self.assertTrue(buf.startswith("GF "))
187 if result is True and server_helper == "squid-2.5-ntlmssp":
188 self.writeLine(server_proc.stdin, "GK")
189 buf = self.readLine(server_proc.stdout)
190 self.assertTrue(buf.startswith("GK "))
192 self.writeLine(server_proc.stdin, "GF")
193 buf = self.readLine(server_proc.stdout)
194 self.assertTrue(buf.startswith("GF "))
196 client_proc.stdin.close()
197 client_proc.wait()
198 self.assertEqual(client_proc.returncode, 0)
200 server_proc.stdin.close()
201 server_proc.wait()
202 self.assertEqual(server_proc.returncode, 0)
204 return result
205 except:
206 client_proc.kill()
207 client_proc.wait()
208 server_proc.kill()
209 server_proc.wait()
210 raise