3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba
import param
25 from samba
.credentials
import Credentials
26 from samba
.auth
import system_session
27 from samba
.provision
import getpolicypath
28 from samba
.upgradehelpers
import (get_paths
, get_ldbs
,
29 find_provision_key_parameters
, identic_rename
,
30 updateOEMInfo
, getOEMInfo
, update_gpo
,
31 delta_update_basesamdb
,
32 search_constructed_attrs_stored
,
33 increment_calculated_keyversion_number
)
34 from samba
.tests
import env_loadparm
, TestCaseInTempDir
35 from samba
.tests
.provision
import create_dummy_secretsdb
39 def dummymessage(a
=None, b
=None):
# Location of the smb.conf used by the tests in this module: the file
# "dc/etc/smb.conf" below the directory named by the SELFTEST_PREFIX
# environment variable.  The lookup happens when the module is imported, so an
# unset SELFTEST_PREFIX raises KeyError at import time.
smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
44 class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir
):
45 """Some simple tests for individual functions in the provisioning code.
48 def test_get_ldbs(self
):
49 paths
= get_paths(param
, None, smb_conf_path
)
53 get_ldbs(paths
, creds
, system_session(), lp
)
55 def test_find_key_param(self
):
56 paths
= get_paths(param
, None, smb_conf_path
)
60 rootdn
= "dc=samba,dc=example,dc=com"
61 ldbs
= get_ldbs(paths
, creds
, system_session(), lp
)
62 names
= find_provision_key_parameters(ldbs
.sam
, ldbs
.secrets
, ldbs
.idmap
,
63 paths
, smb_conf_path
, lp
)
64 self
.assertEquals(names
.realm
, "SAMBA.EXAMPLE.COM")
65 self
.assertEquals(str(names
.rootdn
).lower(), rootdn
.lower())
66 self
.assertNotEquals(names
.policyid_dc
, None)
67 self
.assertNotEquals(names
.ntdsguid
, "")
70 class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir
):
72 def _getEmptyDbName(self
):
73 return os
.path
.join(self
.tempdir
, "sam.ldb")
76 super(UpgradeProvisionWithLdbTestCase
, self
).setUp()
77 paths
= get_paths(param
, None, smb_conf_path
)
78 self
.creds
= Credentials()
79 self
.lp
= env_loadparm()
80 self
.creds
.guess(self
.lp
)
82 self
.ldbs
= get_ldbs(paths
, self
.creds
, system_session(), self
.lp
)
83 self
.names
= find_provision_key_parameters(self
.ldbs
.sam
,
84 self
.ldbs
.secrets
, self
.ldbs
.idmap
, paths
, smb_conf_path
,
86 self
.referencedb
= create_dummy_secretsdb(
87 os
.path
.join(self
.tempdir
, "ref.ldb"))
89 def test_search_constructed_attrs_stored(self
):
90 hashAtt
= search_constructed_attrs_stored(self
.ldbs
.sam
,
92 ["msds-KeyVersionNumber"])
93 self
.assertFalse(hashAtt
.has_key("msds-KeyVersionNumber"))
95 def test_increment_calculated_keyversion_number(self
):
96 dn
= "CN=Administrator,CN=Users,%s" % self
.names
.rootdn
97 # We construct a simple hash for the user administrator
99 # And we want the version to be 140
100 hash[dn
.lower()] = 140
102 increment_calculated_keyversion_number(self
.ldbs
.sam
,
105 self
.assertEqual(self
.ldbs
.sam
.get_attribute_replmetadata_version(dn
,
108 # This function should not decrement the version
109 hash[dn
.lower()] = 130
111 increment_calculated_keyversion_number(self
.ldbs
.sam
,
114 self
.assertEqual(self
.ldbs
.sam
.get_attribute_replmetadata_version(dn
,
def test_identic_rename(self):
    """Run identic_rename() on the Guest user and look the entry up again.

    After the call, a subtree search below the root DN for ``(name=Guest)``
    must return exactly one entry, and that entry's DN must be
    ``CN=Guest,CN=Users,<rootdn>``.
    """
    rootdn = "DC=samba,DC=example,DC=com"
    # Built once: the same string is the rename target and the expected DN.
    guest_dn_str = "CN=Guest,CN=Users,%s" % rootdn

    guestDN = ldb.Dn(self.ldbs.sam, guest_dn_str)
    identic_rename(self.ldbs.sam, guestDN)
    res = self.ldbs.sam.search(expression="(name=Guest)", base=rootdn,
                               scope=ldb.SCOPE_SUBTREE, attrs=["dn"])
    # assertEqual instead of the deprecated assertEquals alias, which newer
    # unittest versions no longer provide.
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0]["dn"]), guest_dn_str)
128 def test_delta_update_basesamdb(self
):
129 dummysampath
= self
._getEmptyDbName
()
130 delta_update_basesamdb(self
.paths
.samdb
, dummysampath
,
131 self
.creds
, system_session(), self
.lp
,
134 def test_update_gpo_simple(self
):
135 dir = getpolicypath(self
.paths
.sysvol
, self
.names
.dnsdomain
,
138 self
.assertFalse(os
.path
.isdir(dir))
139 update_gpo(self
.paths
, self
.ldbs
.sam
, self
.names
, self
.lp
, dummymessage
)
140 self
.assertTrue(os
.path
.isdir(dir))
142 def test_update_gpo_acl(self
):
143 path
= os
.path
.join(self
.tempdir
, "testupdategpo")
144 save
= self
.paths
.sysvol
145 self
.paths
.sysvol
= path
147 os
.mkdir(os
.path
.join(path
, self
.names
.dnsdomain
))
148 os
.mkdir(os
.path
.join(os
.path
.join(path
, self
.names
.dnsdomain
),
150 update_gpo(self
.paths
, self
.ldbs
.sam
, self
.names
, self
.lp
, dummymessage
)
152 self
.paths
.sysvol
= save
def test_getOEMInfo(self):
    """Check that getOEMInfo() does not return an empty string.

    The base DN passed to getOEMInfo() is derived from the "realm" value of
    the loaded configuration by turning every dot into ", DC=".
    """
    realm = self.lp.get("realm")
    basedn = "DC=%s" % realm.replace(".", ", DC=")
    oem = getOEMInfo(self.ldbs.sam, basedn)
    # assertNotEqual instead of the deprecated assertNotEquals alias, which
    # newer unittest versions no longer provide.
    self.assertNotEqual(oem, "")
def test_updateOEMInfo(self):
    """Check that updateOEMInfo() changes what getOEMInfo() reports.

    getOEMInfo() is read before and after calling updateOEMInfo(); the two
    values must differ as strings, and the new one must match the pattern
    ``.*upgrade to.*``.
    """
    realm = self.lp.get("realm")
    # Base DN derived from the realm by turning every dot into ", DC=".
    basedn = "DC=%s" % realm.replace(".", ", DC=")
    oem = getOEMInfo(self.ldbs.sam, basedn)
    updateOEMInfo(self.ldbs.sam, basedn)
    oem2 = getOEMInfo(self.ldbs.sam, basedn)
    # assertNotEqual instead of the deprecated assertNotEquals alias, which
    # newer unittest versions no longer provide.
    self.assertNotEqual(str(oem), str(oem2))
    self.assertTrue(re.match(".*upgrade to.*", str(oem2)))
170 for name
in ["ref.ldb", "secrets.ldb", "sam.ldb"]:
171 path
= os
.path
.join(self
.tempdir
, name
)
172 if os
.path
.exists(path
):
174 super(UpgradeProvisionWithLdbTestCase
, self
).tearDown()