smbd: improve reinit_after_fork error handling
[Samba.git] / python / samba / tests / samba_tool / sites.py
blob4288f35bc94c02e968db8065061ce6a23a2e2502
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Catalyst.Net LTD 2015
3 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
5 # Catalyst.Net's contribution was written by Douglas Bagnall
6 # <douglas.bagnall@catalyst.net.nz>.
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import json
23 import os
24 import ldb
25 from samba.tests.samba_tool.base import SambaToolCmdTest
26 from samba import sites, subnets
class BaseSitesCmdTestCase(SambaToolCmdTest):
    """Shared fixture for the samba-tool "sites" command tests.

    Builds the connection arguments from the environment and opens a SamDB
    handle that the test cases use to inspect and tidy up the directory.
    """

    def setUp(self):
        """Read DC_SERVER, DC_USERNAME and DC_PASSWORD, then connect."""
        super().setUp()

        env = os.environ
        self.dburl = "ldap://%s" % env["DC_SERVER"]

        username = env["DC_USERNAME"]
        password = env["DC_PASSWORD"]
        # "%%" is a literal percent sign: the result is -U<user>%<password>.
        self.creds_string = "-U%s%%%s" % (username, password)

        self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
        config_basedn = self.samdb.get_config_basedn()
        self.config_dn = str(config_basedn)
class SitesCmdTestCase(BaseSitesCmdTestCase):
    """Tests for the samba-tool "sites" create, list and view subcommands."""

    def test_site_create(self):
        """Create a site with samba-tool and check it exists under CN=Sites."""
        sitename = 'new_site'

        result, out, err = self.runsubcmd("sites", "create", sitename,
                                          "-H", self.dburl, self.creds_string)
        self.assertCmdSuccess(result, out, err)

        dnsites = ldb.Dn(self.samdb, "CN=Sites,%s" % self.config_dn)
        dnsite = ldb.Dn(self.samdb, "CN=%s,%s" % (sitename, dnsites))

        # Register the removal as a cleanup rather than deleting at the end
        # of the test: the site must go away even if the assertion below
        # fails, otherwise it is left behind in the directory.
        self.addCleanup(self.samdb.delete, dnsite, ["tree_delete:0"])

        ret = self.samdb.search(base=dnsites, scope=ldb.SCOPE_ONELEVEL,
                                expression='(cn=%s)' % sitename)
        self.assertEqual(len(ret), 1)

    def test_site_list(self):
        """List sites as plain text and as JSON; both must name the default site."""
        result, out, err = self.runsubcmd("sites", "list",
                                          "-H", self.dburl, self.creds_string)
        self.assertCmdSuccess(result, out, err)
        self.assertIn("Default-First-Site-Name", out)

        # The same but with --json
        result, out, err = self.runsubcmd("sites", "list", "--json",
                                          "-H", self.dburl, self.creds_string)
        self.assertCmdSuccess(result, out, err)
        json_data = json.loads(out)
        self.assertIn("Default-First-Site-Name", json_data)

    def test_site_view(self):
        """View the default site as JSON, then check an unknown site fails."""
        result, out, err = self.runsubcmd("sites", "view",
                                          "Default-First-Site-Name",
                                          "-H", self.dburl, self.creds_string)
        self.assertCmdSuccess(result, out, err)
        json_data = json.loads(out)
        self.assertEqual(json_data["cn"], "Default-First-Site-Name")

        # Now try one that doesn't exist
        result, out, err = self.runsubcmd("sites", "view",
                                          "Does-Not-Exist",
                                          "-H", self.dburl, self.creds_string)
        self.assertCmdFail(result, err)
class SitesSubnetCmdTestCase(BaseSitesCmdTestCase):
    """Tests for the samba-tool "sites subnet" create, list and view subcommands.

    Every test runs with two freshly created sites, "testsite" and
    "testsite2", which are removed again in tearDown().
    """

    def setUp(self):
        """Create the two sites the subnet tests attach their subnets to."""
        super().setUp()
        self.sitename = "testsite"
        self.sitename2 = "testsite2"
        self.samdb.transaction_start()
        try:
            sites.create_site(self.samdb, self.config_dn, self.sitename)
            sites.create_site(self.samdb, self.config_dn, self.sitename2)
        except Exception:
            # Do not leave the transaction open on the shared handle.
            self.samdb.transaction_cancel()
            raise
        self.samdb.transaction_commit()

    def tearDown(self):
        """Delete the two sites created by setUp()."""
        self.samdb.transaction_start()
        try:
            sites.delete_site(self.samdb, self.config_dn, self.sitename)
            sites.delete_site(self.samdb, self.config_dn, self.sitename2)
        except Exception:
            # Do not leave the transaction open on the shared handle.
            self.samdb.transaction_cancel()
            raise
        self.samdb.transaction_commit()
        super().tearDown()

    def test_site_subnet_create(self):
        """Create several valid IPv4/IPv6 subnets and check each is stored."""
        cidrs = (("10.9.8.0/24", self.sitename),
                 ("50.60.0.0/16", self.sitename2),
                 ("50.61.0.0/16", self.sitename2),  # second subnet on the site
                 ("50.0.0.0/8", self.sitename),  # overlapping subnet, other site
                 ("50.62.1.2/32", self.sitename),  # single IP
                 ("aaaa:bbbb:cccc:dddd:eeee:ffff:2222:1100/120",
                  self.sitename2),
                 )

        for cidr, sitename in cidrs:
            result, out, err = self.runsubcmd("sites", "subnet", "create",
                                              cidr, sitename,
                                              "-H", self.dburl,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err)

            # Register the removal straight away so that a failing assertion
            # further down cannot leave the subnet behind in the directory.
            dnsubnet = ldb.Dn(self.samdb, ("Cn=%s,CN=Subnets,CN=Sites,%s" %
                                           (cidr, self.config_dn)))
            self.addCleanup(self.samdb.delete, dnsubnet, ["tree_delete:0"])

            ret = self.samdb.search(base=self.config_dn,
                                    scope=ldb.SCOPE_SUBTREE,
                                    expression=('(&(objectclass=subnet)(cn=%s))'
                                                % cidr))
            self.assertIsNotNone(ret)
            self.assertEqual(len(ret), 1)

        dnsubnets = ldb.Dn(self.samdb,
                           "CN=Subnets,CN=Sites,%s" % self.config_dn)

        # Second pass: each subnet must sit directly under the Subnets
        # container.
        for cidr, _sitename in cidrs:
            ret = self.samdb.search(base=dnsubnets, scope=ldb.SCOPE_ONELEVEL,
                                    expression='(CN=%s)' % cidr)
            self.assertIsNotNone(ret)
            self.assertEqual(len(ret), 1)

    def test_site_subnet_create_should_fail(self):
        """Malformed subnets must be rejected and must not be stored."""
        cidrs = (("10.9.8.0/33", self.sitename),    # mask too big
                 ("50.60.0.0/8", self.sitename2),   # insufficient zeros
                 ("50.261.0.0/16", self.sitename2),  # bad octet
                 ("7.0.0.0.0/0", self.sitename),    # too many octets
                 ("aaaa:bbbb:cccc:dddd:eeee:ffff:2222:1100/119",
                  self.sitename),                   # insufficient zeros
                 )

        for cidr, sitename in cidrs:
            result, out, err = self.runsubcmd("sites", "subnet", "create",
                                              cidr, sitename,
                                              "-H", self.dburl,
                                              self.creds_string)
            # Pass err as the message, like the other assertCmdFail calls
            # in this file do.
            self.assertCmdFail(result, err)

            ret = self.samdb.search(base=self.config_dn,
                                    scope=ldb.SCOPE_SUBTREE,
                                    expression=('(&(objectclass=subnet)(cn=%s))'
                                                % cidr))

            self.assertIsNotNone(ret)
            self.assertEqual(len(ret), 0)

    def test_site_subnet_list(self):
        """A subnet created on a site shows up in "sites subnet list"."""
        subnet = "10.9.8.0/24"
        subnets.create_subnet(self.samdb, self.samdb.get_config_basedn(),
                              subnet, self.sitename)

        # cleanup after test
        dnsubnet = ldb.Dn(self.samdb, ("CN=%s,CN=Subnets,CN=Sites,%s" %
                                       (subnet, self.config_dn)))
        self.addCleanup(self.samdb.delete, dnsubnet, ["tree_delete:1"])

        result, out, err = self.runsubcmd("sites", "subnet", "list",
                                          self.sitename,
                                          "-H", self.dburl, self.creds_string)

        self.assertCmdSuccess(result, out, err)
        self.assertIn(subnet, out)

    def test_site_subnet_view(self):
        """View an existing subnet as JSON, then check an unknown one fails."""
        subnet = "50.60.0.0/16"
        subnets.create_subnet(self.samdb, self.samdb.get_config_basedn(),
                              subnet, self.sitename2)

        # cleanup after test
        dnsubnet = ldb.Dn(self.samdb, ("CN=%s,CN=Subnets,CN=Sites,%s" %
                                       (subnet, self.config_dn)))
        self.addCleanup(self.samdb.delete, dnsubnet, ["tree_delete:1"])

        result, out, err = self.runsubcmd("sites", "subnet",
                                          "view", subnet,
                                          "-H", self.dburl, self.creds_string)

        self.assertCmdSuccess(result, out, err)
        json_data = json.loads(out)
        self.assertEqual(json_data["cn"], subnet)

        # Now try one that doesn't exist
        result, out, err = self.runsubcmd("sites", "subnet",
                                          "view", "50.0.0.0/8",
                                          "-H", self.dburl, self.creds_string)
        self.assertCmdFail(result, err)