smbd: improve reinit_after_fork error handling
[Samba.git] / python / samba / tests / kcc / ldif_import_export.py
blob9e573bf942ddc9c621c2ddb78d9337a143d9e428
1 # Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
2 # Copyright (C) Andrew Bartlett 2015
4 # Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Tests for samba.kcc.ldif_import_export"""
22 import samba
23 import os
24 import time
25 import subprocess
26 import logging
27 import samba.tests
28 from samba.kcc import ldif_import_export, KCC
29 from samba import ldb
30 from samba.dcerpc import misc
33 from samba.param import LoadParm
34 from samba.credentials import Credentials
35 from samba.samdb import SamDB
# Wall-clock time in whole seconds, sampled once when the module is imported;
# the KCC tests below pass it as the first argument to KCC().
unix_now = int(time.time())

# Multi-site fixture, located relative to the source tree root named by the
# SRCDIR_ABS environment variable (a KeyError is raised at import time if
# that variable is not set).
MULTISITE_LDIF = os.path.join(
    os.environ['SRCDIR_ABS'],
    "testdata/ldif-utils-test-multisite.ldif",
)

# UNCONNECTED_LDIF is a single site, unconnected 5DC database that was
# created using samba-tool domain join in testenv.
UNCONNECTED_LDIF = os.path.join(
    os.environ['SRCDIR_ABS'],
    "testdata/unconnected-intrasite.ldif",
)
# (server DN, site name) pairs for the DSAs the tests expect to find in
# MULTISITE_LDIF.  Every DN has the same shape and differs only in the
# server CN and the site CN, so the DNs are generated from those two values.
# The order of the entries is preserved from the original literal table.
MULTISITE_LDIF_DSAS = tuple(
    ("CN=%s,CN=Servers,CN=%s,CN=Sites,CN=Configuration,"
     "DC=ad,DC=samba,DC=example,DC=com" % (server, site),
     site)
    for server, site in (
        ("WIN08", "Site-4"),
        ("WIN07", "Site-4"),
        ("WIN06", "Site-3"),
        ("WIN09", "Site-5"),
        ("WIN10", "Site-5"),
        ("WIN02", "Site-2"),
        ("WIN04", "Site-2"),
        ("WIN03", "Site-2"),
        ("WIN05", "Site-2"),
        ("WIN01", "Default-First-Site-Name"),
    )
)
class LdifImportExportTests(samba.tests.TestCaseInTempDir):
    """Tests for ldif_to_samdb() and samdb_to_ldif_file().

    Every test builds its databases inside self.tempdir and removes them
    again before returning, including when an assertion fails part-way.
    """

    def setUp(self):
        """Create the LoadParm and guessed Credentials used by the tests."""
        super().setUp()
        self.lp = LoadParm()
        self.creds = Credentials()
        self.creds.guess(self.lp)

    def remove_files(self, *files):
        """Unlink each path in *files*, in order.

        Each path must start with self.tempdir; otherwise the test fails
        with an AssertionError before that path is unlinked.
        """
        for f in files:
            # Use a unittest assertion rather than a bare ``assert`` so the
            # guard against deleting outside the temp dir survives ``-O``.
            self.assertTrue(f.startswith(self.tempdir),
                            "refusing to remove %r: not inside %r"
                            % (f, self.tempdir))
            os.unlink(f)

    def _assert_local_dsa(self, samdb, dsa):
        """Assert that *samdb* reports the server DN *dsa* as its own DSA.

        Two things are checked: the objectGUID of the server's
        "CN=NTDS Settings" child equals samdb.get_ntds_GUID(), and the
        rootDSE dsServiceName equals that NTDS Settings DN.
        """
        ntds_dn = ldb.Dn(samdb, "CN=NTDS Settings," + dsa)
        res = samdb.search(ntds_dn,
                           scope=ldb.SCOPE_BASE, attrs=["objectGUID"])

        ntds_guid = misc.GUID(samdb.get_ntds_GUID())
        self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)

        service_name_res = samdb.search(base="",
                                        scope=ldb.SCOPE_BASE,
                                        attrs=["dsServiceName"])
        dn = ldb.Dn(samdb,
                    service_name_res[0]["dsServiceName"][0].decode('utf8'))
        self.assertEqual(dn, ntds_dn)

    def test_write_search_url(self):
        """Placeholder; currently performs no checks."""
        # TODO: add coverage for write_search_url.
        pass

    def test_ldif_to_samdb(self):
        """Import MULTISITE_LDIF and check that WIN01 is the local DSA."""
        dburl = os.path.join(self.tempdir, "ldap")
        samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                 MULTISITE_LDIF)
        try:
            self.assertIsInstance(samdb, SamDB)

            dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
                   "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
            self._assert_local_dsa(samdb, dsa)
        finally:
            # Remove the database even if an assertion above failed.
            self.remove_files(dburl)

    def test_ldif_to_samdb_forced_local_dsa(self):
        """Import MULTISITE_LDIF once per DSA, forcing it to be local.

        For each entry of MULTISITE_LDIF_DSAS the resulting database must
        report the expected site name and the forced DSA as its own.
        """
        for dsa, site in MULTISITE_LDIF_DSAS:
            dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
                                 "-%s" % dsa)
            samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                     MULTISITE_LDIF,
                                                     forced_local_dsa=dsa)
            try:
                self.assertIsInstance(samdb, SamDB)
                self.assertEqual(samdb.server_site_name(), site)
                self._assert_local_dsa(samdb, dsa)
            finally:
                # One database per iteration; never let them accumulate.
                self.remove_files(dburl)

    def test_samdb_to_ldif_file(self):
        """Round-trip: LDIF -> SamDB -> LDIF file -> SamDB.

        The exported LDIF must be larger than 1000 bytes, and the database
        rebuilt from it must still contain WIN01's NTDS Settings object.
        """
        dburl = os.path.join(self.tempdir, "ldap")
        dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
        ldif_file = os.path.join(self.tempdir, "ldif")

        # Paths are recorded only once the call that creates them has
        # returned, so cleanup never tries to unlink a file that the test
        # did not get as far as creating.
        created = []
        try:
            samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                     MULTISITE_LDIF)
            created.append(dburl)
            self.assertIsInstance(samdb, SamDB)

            ldif_import_export.samdb_to_ldif_file(samdb, dburl,
                                                  lp=self.lp, creds=None,
                                                  ldif_file=ldif_file)
            created.append(ldif_file)
            self.assertGreater(os.path.getsize(ldif_file), 1000,
                               "LDIF should be larger than 1000 bytes")

            samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
                                                     ldif_file)
            created.append(dburl2)
            self.assertIsInstance(samdb, SamDB)

            dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
                   "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
            res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
                               scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
            # The search result used to be discarded; make sure the object
            # really came back from the round-tripped database.
            self.assertEqual(len(res), 1,
                             "NTDS Settings object missing after round trip")
        finally:
            self.remove_files(*created)
class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
    """Run the KCC against databases imported from the LDIF fixtures."""

    def setUp(self):
        """Create the LoadParm and guessed Credentials used by the tests."""
        super().setUp()
        self.lp = LoadParm()
        self.creds = Credentials()
        self.creds.guess(self.lp)

    def remove_files(self, *files):
        """Unlink each path in *files*, in order.

        Each path must start with self.tempdir; otherwise the test fails
        with an AssertionError before that path is unlinked.
        """
        for f in files:
            # Use a unittest assertion rather than a bare ``assert`` so the
            # guard against deleting outside the temp dir survives ``-O``.
            self.assertTrue(f.startswith(self.tempdir),
                            "refusing to remove %r: not inside %r"
                            % (f, self.tempdir))
            os.unlink(f)

    def _dot_files(self):
        """Return the full paths of all ``*.dot`` entries in self.tempdir."""
        return [os.path.join(self.tempdir, fn)
                for fn in os.listdir(self.tempdir)
                if fn.endswith('.dot')]

    def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
        """Return a KCC that has imported MULTISITE_LDIF.

        The import goes through a temporary database file in self.tempdir
        which is removed again before returning.  *name* is accepted for
        the callers' convenience but is not used by this helper.
        """
        # Note that setting read-only to False won't affect the ldif,
        # only the temporary database that is created from it.
        my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
                     dot_file_dir=dot_file_dir)
        tmpdb = os.path.join(self.tempdir, 'tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
        self.remove_files(tmpdb)
        return my_kcc

    def test_list_dsas(self):
        """list_dsas() must return exactly the DNs in MULTISITE_LDIF_DSAS."""
        my_kcc = self._get_kcc('test-list')
        dsas = set(my_kcc.list_dsas())
        expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS)
        self.assertEqual(dsas, expected_dsas)

    def test_verify(self):
        """Check that the KCC generates graphs that pass its own verify
        option.
        """
        my_kcc = self._get_kcc('test-verify', verify=True)
        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)
        finally:
            # Remove the database even when run() raises.
            self.remove_files(tmpdb)

    def test_unconnected_db(self):
        """Check that the KCC generates errors on an unconnected db.

        A GraphError from run() is tolerated (as is a clean return); any
        other exception fails the test.
        """
        my_kcc = self._get_kcc('test-verify', verify=True)
        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)
        except samba.kcc.graph_utils.GraphError:
            pass
        except Exception as e:
            # Report what was actually raised so the failure is diagnosable.
            self.fail("Did not expect this error: %s: %s"
                      % (type(e).__name__, e))
        finally:
            self.remove_files(tmpdb)

    def test_dotfiles(self):
        """Check that KCC writes dot_files when asked.

        At least one ``.dot`` file must appear in self.tempdir, each must be
        non-empty, and, when /usr/bin/dot is usable, each must be accepted
        by ``dot -Tcanon``.
        """
        my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
        tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)

            dot_files = self._dot_files()
            # Without this the test would pass vacuously if nothing was
            # written at all.
            self.assertTrue(dot_files,
                            "KCC wrote no .dot files to %s" % self.tempdir)

            dot = '/usr/bin/dot'
            # Probe for a working dot binary once, not once per file.
            have_dot = (os.path.exists(dot) and
                        subprocess.call([dot, '-?']) == 0)

            for ffn in dot_files:
                if have_dot:
                    r = subprocess.call([dot, '-Tcanon', ffn])
                    self.assertEqual(r, 0)

                # even if dot is not there, at least check the file is
                # non-empty
                size = os.stat(ffn).st_size
                self.assertNotEqual(size, 0)
        finally:
            # Re-list the directory so that files not yet examined when a
            # check failed are removed as well.
            self.remove_files(tmpdb, *self._dot_files())