1 # Unix SMB/CIFS implementation.
2 # A test for the ntlm_auth tool
3 # Copyright (C) Kai Blin <kai@samba.org> 2008
4 # Copyright (C) Samuel Cabrero <scabrero@suse.de> 2018
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 This test program will start ntlm_auth with the given command line switches and
21 see if it will get the expected results.
27 from samba
.tests
import BlackboxTestCase
29 class NTLMAuthTestCase(BlackboxTestCase
):
32 super(NTLMAuthTestCase
, self
).setUp()
33 bindir
= os
.path
.normpath(os
.getenv("BINDIR", "./bin"))
34 self
.ntlm_auth_path
= os
.path
.join(bindir
, 'ntlm_auth')
35 self
.lp
= samba
.tests
.env_loadparm()
36 self
.winbind_separator
= self
.lp
.get('winbind separator')
def readLine(self, text_stream):
    """Read one newline-terminated line from *text_stream*.

    :param text_stream: a text-mode stream offering ``readline()``
    :return: the line that was read, without its trailing newline
    :raises Exception: if the data returned by ``readline()`` contains no
        newline (for example an empty string at end of stream)
    """
    buf = text_stream.readline()
    newline = buf.find('\n')
    # Only a missing terminator is an error; a complete line is handed
    # back to the caller, which inspects it with startswith().
    if newline == -1:
        raise Exception("Failed to read line")
    return buf[:newline]
def writeLine(self, text_stream, buf):
    """Send *buf* to *text_stream* as one newline-terminated line.

    The payload is written first and the line terminator second, as two
    separate ``write()`` calls on the stream.

    :param text_stream: a text-mode stream offering ``write()``
    :param buf: the line content, without a trailing newline
    """
    for piece in (buf, "\n"):
        text_stream.write(piece)
53 client_use_cached_creds
=False,
57 client_helper
="ntlmssp-client-1",
58 server_helper
="squid-2.5-ntlmssp",
59 server_use_winbind
=False,
60 require_membership
=None,
63 self
.assertTrue(os
.access(self
.ntlm_auth_path
, os
.X_OK
))
65 if client_username
is None:
66 raise Exception("client_username required")
70 client_args
.append(self
.ntlm_auth_path
)
71 client_args
.append("--helper-protocol=%s" % client_helper
)
72 client_args
.append("--username=%s" % client_username
)
74 client_args
.append("--domain=%s" % client_domain
)
75 if client_use_cached_creds
:
76 client_args
.append("--use-cached-creds")
78 if client_password
is None:
79 raise Exception("client_password required")
80 client_args
.append("--password=%s" % client_password
)
82 client_args
.append("--target-service=%s" % target_service
)
84 client_args
.append("--target-hostname=%s" % target_hostname
)
85 client_args
.append("--configfile=%s" % self
.lp
.configfile
)
89 server_args
.append(self
.ntlm_auth_path
)
90 server_args
.append("--helper-protocol=%s" % server_helper
)
91 server_args
.append("--configfile=%s" % self
.lp
.configfile
)
92 if not server_use_winbind
:
93 if server_username
is None or server_password
is None or server_domain
is None:
94 raise Exception("Server credentials required if not using winbind")
95 server_args
.append("--username=%s" % server_username
)
96 server_args
.append("--password=%s" % server_password
)
97 server_args
.append("--domain=%s" % server_domain
)
98 if require_membership
is not None:
99 raise Exception("Server must be using winbind for require-membership-of")
101 if require_membership
is not None:
102 server_args
.append("--require-membership-of=%s" % require_membership
)
106 server_proc
= subprocess
.Popen(server_args
, stdout
=subprocess
.PIPE
, stdin
=subprocess
.PIPE
, bufsize
=0, universal_newlines
=True)
107 client_proc
= subprocess
.Popen(client_args
, stdout
=subprocess
.PIPE
, stdin
=subprocess
.PIPE
, bufsize
=0, universal_newlines
=True)
110 if client_helper
== "ntlmssp-client-1" and server_helper
== "squid-2.5-ntlmssp":
111 self
.writeLine(client_proc
.stdin
, "YR")
112 buf
= self
.readLine(client_proc
.stdout
)
113 self
.assertTrue(buf
.startswith("YR "))
115 self
.writeLine(server_proc
.stdin
, buf
)
116 buf
= self
.readLine(server_proc
.stdout
)
117 self
.assertTrue(buf
.startswith("TT "))
119 self
.writeLine(client_proc
.stdin
, buf
)
120 buf
= self
.readLine(client_proc
.stdout
)
121 self
.assertTrue(buf
.startswith("AF "))
123 # Client sends 'AF <base64 blob>' but server
124 # expects 'KK <base64 blob>'
125 buf
= buf
.replace("AF", "KK", 1)
127 self
.writeLine(server_proc
.stdin
, buf
)
128 buf
= self
.readLine(server_proc
.stdout
)
129 result
= buf
.startswith("AF ")
130 elif client_helper
== "ntlmssp-client-1" and server_helper
== "gss-spnego":
131 self
.writeLine(client_proc
.stdin
, "YR")
132 buf
= self
.readLine(client_proc
.stdout
)
133 self
.assertTrue(buf
.startswith("YR "))
135 self
.writeLine(server_proc
.stdin
, buf
)
136 buf
= self
.readLine(server_proc
.stdout
)
137 self
.assertTrue(buf
.startswith("TT "))
139 self
.writeLine(client_proc
.stdin
, buf
)
140 buf
= self
.readLine(client_proc
.stdout
)
141 self
.assertTrue(buf
.startswith("AF "))
143 # Client sends 'AF <base64 blob>' but server expects 'KK <base64 blob>'
144 buf
= buf
.replace("AF", "KK", 1)
146 self
.writeLine(server_proc
.stdin
, buf
)
147 buf
= self
.readLine(server_proc
.stdout
)
148 result
= buf
.startswith("AF * ")
149 elif client_helper
== "gss-spnego-client" and server_helper
== "gss-spnego":
150 self
.writeLine(server_proc
.stdin
, "YR")
151 buf
= self
.readLine(server_proc
.stdout
)
154 if (buf
.startswith("NA * ")):
158 self
.assertTrue(buf
.startswith("AF ") or buf
.startswith("TT "))
160 self
.writeLine(client_proc
.stdin
, buf
)
161 buf
= self
.readLine(client_proc
.stdout
)
163 if buf
.startswith("AF"):
167 self
.assertTrue(buf
.startswith("AF ") or buf
.startswith("KK ") or buf
.startswith("TT "))
169 self
.writeLine(server_proc
.stdin
, buf
)
170 buf
= self
.readLine(server_proc
.stdout
)
172 if buf
.startswith("AF * "):
176 self
.fail("Helper protocols not handled")
178 if result
is True and client_helper
== "ntlmssp-client-1":
179 self
.writeLine(client_proc
.stdin
, "GK")
180 buf
= self
.readLine(client_proc
.stdout
)
181 self
.assertTrue(buf
.startswith("GK "))
183 self
.writeLine(client_proc
.stdin
, "GF")
184 buf
= self
.readLine(client_proc
.stdout
)
185 self
.assertTrue(buf
.startswith("GF "))
187 if result
is True and server_helper
== "squid-2.5-ntlmssp":
188 self
.writeLine(server_proc
.stdin
, "GK")
189 buf
= self
.readLine(server_proc
.stdout
)
190 self
.assertTrue(buf
.startswith("GK "))
192 self
.writeLine(server_proc
.stdin
, "GF")
193 buf
= self
.readLine(server_proc
.stdout
)
194 self
.assertTrue(buf
.startswith("GF "))
196 client_proc
.stdin
.close()
198 self
.assertEqual(client_proc
.returncode
, 0)
200 server_proc
.stdin
.close()
202 self
.assertEqual(server_proc
.returncode
, 0)