smbd: improve reinit_after_fork error handling
[Samba.git] / python / samba / tests / samba_tool / user_getpassword_gmsa.py
blobe291d719900f1ce38f7c58e1b5c8fada2ff29efb
1 # Unix SMB/CIFS implementation.
3 # Blackbox tests for reading Group Managed Service Account passwords
5 # Copyright (C) Catalyst.Net Ltd. 2023
7 # Written by Rob van der Linde <rob@catalyst.net.nz>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 import os
24 import sys
26 sys.path.insert(0, "bin/python")
27 os.environ["PYTHONUNBUFFERED"] = "1"
29 import datetime
30 import shlex
32 from ldb import ERR_INVALID_CREDENTIALS, LdbError, SCOPE_BASE
34 from samba.credentials import MUST_USE_KERBEROS
35 from samba.dcerpc import samr, security
36 from samba.domain.models import GroupManagedServiceAccount, User
37 from samba.ndr import ndr_unpack
38 from samba.nt_time import nt_time_from_datetime
39 from samba.tests import BlackboxTestCase, connect_samdb
# Connection parameters are taken from the test environment.  A missing
# variable raises KeyError while the module is being imported.
DC_SERVER = SERVER = os.environ["SERVER"]
SERVER_USERNAME = os.environ["USERNAME"]
SERVER_PASSWORD = os.environ["PASSWORD"]

# LDAP URL and -U credentials argument handed to samba-tool and connect_samdb.
HOST = f"ldap://{SERVER}"
CREDS = f"-U{SERVER_USERNAME}%{SERVER_PASSWORD}"
class GMSAPasswordTest(BlackboxTestCase):
    """Blackbox tests for GMSA getpassword and connecting as that user."""

    @classmethod
    def setUpClass(cls):
        """Open the shared SamDB connection, then run the parent class setup.

        ``cls.samdb`` is created before ``super().setUpClass()`` is called.
        Presumably the base class invokes ``setUpTestData`` (which needs
        ``cls.samdb``) from there -- confirm against the base class before
        reordering these statements.
        """
        cls.lp = cls.get_loadparm()
        cls.env_creds = cls.get_env_credentials(lp=cls.lp,
                                                env_username="USERNAME",
                                                env_password="PASSWORD",
                                                env_domain="DOMAIN",
                                                env_realm="REALM")
        cls.samdb = connect_samdb(HOST, lp=cls.lp, credentials=cls.env_creds)
        super().setUpClass()

    @classmethod
    def setUpTestData(cls):
        """Create the GMSA shared by every test and schedule its deletion.

        The ``group_msa_membership`` SDDL string carries a single allow ACE
        (``RP``) for the SID of the user that ``cls.samdb`` connected as.
        """
        cls.gmsa = GroupManagedServiceAccount.create(
            cls.samdb,
            name="GMSA_Test_User",
            dns_host_name="samba.example.com",
            managed_password_interval=1,
            group_msa_membership=f"O:SYD:(A;;RP;;;{cls.samdb.connecting_user_sid})")

        cls.addClassCleanup(cls.gmsa.delete, cls.samdb)

    def getpassword(self, attrs):
        """Run ``samba-tool user getpassword`` for the test GMSA.

        :param attrs: comma-separated attribute names passed to
            ``--attributes``.
        :return: the message from the first record of the parsed LDIF output.

        Fails the test if any requested attribute is missing from the
        returned message.
        """
        # attrs may contain ';' (e.g. ";format=UnixTime"), so quote it before
        # it is embedded in the command line string.
        shattrs = shlex.quote(attrs)
        cmd = f"user getpassword --attributes={shattrs} {self.gmsa.account_name}"

        ldif = self.check_output(cmd).decode()
        res = self.samdb.parse_ldif(ldif)
        _, user_message = next(res)

        # check each attr is returned
        for attr in attrs.split(","):
            self.assertIn(attr, user_message)

        return user_message

    def _assert_connects_as_gmsa(self, creds):
        """Connect to HOST with *creds* and check the session is the GMSA.

        An ``ERR_INVALID_CREDENTIALS`` LdbError is reported as a test
        failure; any other LdbError is re-raised unchanged.  After a
        successful connect, the first ``tokenGroups`` value of the rootDSE is
        unpacked as a SID and compared with the GMSA's ``object_sid``.
        """
        try:
            db = connect_samdb(HOST, credentials=creds, lp=self.lp)
        except LdbError as err:
            num, _ = err.args
            if num == ERR_INVALID_CREDENTIALS:
                self.fail('failed to authenticate using credentials')

            raise

        msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
        connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))

        self.assertEqual(self.gmsa.object_sid, connecting_user_sid)

    def test_getpassword(self):
        # Each call asserts that every requested attribute is returned.
        self.getpassword("virtualClearTextUTF16,unicodePwd")
        self.getpassword("virtualClearTextUTF16")
        self.getpassword("unicodePwd")

    def test_utf16_password(self):
        # Authenticate with the UTF-16 cleartext password.
        user_msg = self.getpassword("virtualClearTextUTF16")
        password = user_msg["virtualClearTextUTF16"][0]

        creds = self.insta_creds(template=self.env_creds)
        creds.set_username(self.gmsa.account_name)
        creds.set_utf16_password(password)

        self._assert_connects_as_gmsa(creds)

    def test_utf8_password(self):
        # Authenticate with the UTF-8 rendering of the password.
        user_msg = self.getpassword("virtualClearTextUTF8")
        password = str(user_msg["virtualClearTextUTF8"][0])

        creds = self.insta_creds(template=self.env_creds)
        # Because the password has been converted to utf-8 via UTF16_MUNGED
        # the nthash is no longer valid. We need to use AES kerberos ciphers
        # for this to work.
        creds.set_kerberos_state(MUST_USE_KERBEROS)
        creds.set_username(self.gmsa.account_name)
        creds.set_password(password)

        self._assert_connects_as_gmsa(creds)

    def test_unicode_pwd(self):
        # Authenticate with the NT hash returned as unicodePwd.
        user_msg = self.getpassword("unicodePwd")

        creds = self.insta_creds(template=self.env_creds)
        creds.set_username(self.gmsa.account_name)
        nt_pass = samr.Password()
        nt_pass.hash = list(user_msg["unicodePwd"][0])
        creds.set_nt_hash(nt_pass)

        self._assert_connects_as_gmsa(creds)

    def test_querytime(self):
        user_msg = self.getpassword("virtualManagedPasswordQueryTime")
        querytime = int(user_msg["virtualManagedPasswordQueryTime"][0])

        # Just assert the number makes sense: strictly in the future, but
        # less than 21 hours away.  Both bounds use the same "now".
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        self.assertGreater(querytime, nt_time_from_datetime(now))
        self.assertLess(
            querytime,
            nt_time_from_datetime(now + datetime.timedelta(hours=21)))

    def test_querytime_unixtime(self):
        user_msg = self.getpassword("virtualManagedPasswordQueryTime;format=UnixTime")
        querytime = int(user_msg["virtualManagedPasswordQueryTime;format=UnixTime"][0])

        # Just assert the number makes sense: strictly in the future, but
        # less than 21 hours away.  Both bounds use the same "now".
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        self.assertGreater(querytime, now.timestamp())
        self.assertLess(
            querytime,
            (now + datetime.timedelta(hours=21)).timestamp())

    @classmethod
    def _make_cmdline(cls, line):
        """Override to pass line as samba-tool subcommand instead.

        Automatically fills in HOST and CREDS as well.

        :param line: the samba-tool subcommand, either as a list of
            arguments or as a single string.
        :return: whatever the parent ``_make_cmdline`` returns for the
            completed command.
        """
        if isinstance(line, list):
            # Use the ldap:// URL here as well: the string form below passes
            # HOST, and a bare server name is not an LDAP URL for -H.
            cmd = ["samba-tool"] + line + ["-H", HOST, CREDS]
        else:
            cmd = f"samba-tool {line} -H {HOST} {CREDS}"

        return super()._make_cmdline(cmd)
# Allow the module to be run directly as a script; unittest is imported only
# in that case.
if __name__ == "__main__":
    import unittest
    unittest.main()