1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba
.credentials
import Credentials
21 from samba
.samdb
import SamDB
22 from samba
.auth
import system_session
23 from samba
.tests
import TestCase
24 from samba
.ndr
import ndr_unpack
, ndr_pack
25 from samba
.dcerpc
import drsblobs
32 class DsdbTests(TestCase
):
35 super(DsdbTests
, self
).setUp()
36 self
.lp
= samba
.tests
.env_loadparm()
37 self
.creds
= Credentials()
38 self
.creds
.guess(self
.lp
)
39 self
.session
= system_session()
40 self
.samdb
= SamDB(session_info
=self
.session
,
41 credentials
=self
.creds
,
44 def test_get_oid_from_attrid(self
):
45 oid
= self
.samdb
.get_oid_from_attid(591614)
46 self
.assertEquals(oid
, "1.2.840.113556.1.4.1790")
48 def test_error_replpropertymetadata(self
):
49 res
= self
.samdb
.search(expression
="cn=Administrator",
50 scope
=ldb
.SCOPE_SUBTREE
,
51 attrs
=["replPropertyMetaData"])
52 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
53 str(res
[0]["replPropertyMetaData"]))
56 # Search for Description
58 old_version
= o
.version
59 o
.version
= o
.version
+ 1
60 replBlob
= ndr_pack(repl
)
63 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
64 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
66 def test_error_replpropertymetadata_nochange(self
):
67 res
= self
.samdb
.search(expression
="cn=Administrator",
68 scope
=ldb
.SCOPE_SUBTREE
,
69 attrs
=["replPropertyMetaData"])
70 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
71 str(res
[0]["replPropertyMetaData"]))
72 replBlob
= ndr_pack(repl
)
75 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
76 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
78 def test_error_replpropertymetadata_allow_sort(self
):
79 res
= self
.samdb
.search(expression
="cn=Administrator",
80 scope
=ldb
.SCOPE_SUBTREE
,
81 attrs
=["replPropertyMetaData"])
82 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
83 str(res
[0]["replPropertyMetaData"]))
84 replBlob
= ndr_pack(repl
)
87 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
88 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
90 def test_twoatt_replpropertymetadata(self
):
91 res
= self
.samdb
.search(expression
="cn=Administrator",
92 scope
=ldb
.SCOPE_SUBTREE
,
93 attrs
=["replPropertyMetaData", "uSNChanged"])
94 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
95 str(res
[0]["replPropertyMetaData"]))
98 # Search for Description
100 old_version
= o
.version
101 o
.version
= o
.version
+ 1
102 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
103 replBlob
= ndr_pack(repl
)
106 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
107 msg
["description"] = ldb
.MessageElement("new val", ldb
.FLAG_MOD_REPLACE
, "description")
108 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
110 def test_set_replpropertymetadata(self
):
111 res
= self
.samdb
.search(expression
="cn=Administrator",
112 scope
=ldb
.SCOPE_SUBTREE
,
113 attrs
=["replPropertyMetaData", "uSNChanged"])
114 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
115 str(res
[0]["replPropertyMetaData"]))
118 # Search for Description
120 old_version
= o
.version
121 o
.version
= o
.version
+ 1
122 o
.local_usn
= long(str(res
[0]["uSNChanged"])) + 1
123 o
.originating_usn
= long(str(res
[0]["uSNChanged"])) + 1
124 replBlob
= ndr_pack(repl
)
127 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
128 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
130 def test_ok_get_attribute_from_attid(self
):
131 self
.assertEquals(self
.samdb
.get_attribute_from_attid(13), "description")
133 def test_ko_get_attribute_from_attid(self
):
134 self
.assertEquals(self
.samdb
.get_attribute_from_attid(11979), None)
136 def test_get_attribute_replmetadata_version(self
):
137 res
= self
.samdb
.search(expression
="cn=Administrator",
138 scope
=ldb
.SCOPE_SUBTREE
,
140 self
.assertEquals(len(res
), 1)
142 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "unicodePwd"), 1)
144 def test_set_attribute_replmetadata_version(self
):
145 res
= self
.samdb
.search(expression
="cn=Administrator",
146 scope
=ldb
.SCOPE_SUBTREE
,
148 self
.assertEquals(len(res
), 1)
150 version
= self
.samdb
.get_attribute_replmetadata_version(dn
, "description")
151 self
.samdb
.set_attribute_replmetadata_version(dn
, "description", version
+ 2)
152 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "description"), version
+ 2)
154 def test_db_lock(self
):
155 basedn
= self
.samdb
.get_default_basedn()
160 # In the child, close the main DB, re-open just one DB
163 self
.samdb
= SamDB(session_info
=self
.session
,
164 credentials
=self
.creds
,
167 self
.samdb
.transaction_start()
170 "dn": "cn=test_db_lock_user,cn=users," + str(basedn
),
171 "objectclass": "user",
174 # Obtain a write lock
175 self
.samdb
.transaction_prepare_commit()
176 os
.write(w1
, b
"added")
179 # Drop the write lock
180 self
.samdb
.transaction_cancel()
183 self
.assertEqual(os
.read(r1
, 5), b
"added")
187 # We need to hold this iterator open to hold the all-record lock.
188 res
= self
.samdb
.search_iterator()
190 # This should take at least 2 seconds because the transaction
191 # has a write lock on one backend db open
198 self
.assertGreater(end
- start
, 1.9)
200 (got_pid
, status
) = os
.waitpid(pid
, 0)
201 self
.assertEqual(got_pid
, pid
)
202 self
.assertTrue(os
.WIFEXITED(status
))
203 self
.assertEqual(os
.WEXITSTATUS(status
), 0)
206 def test_full_db_lock(self
):
207 basedn
= self
.samdb
.get_default_basedn()
208 backend_filename
= "%s.ldb" % basedn
.get_casefold()
209 backend_subpath
= os
.path
.join("sam.ldb.d",
211 backend_path
= self
.lp
.private_path(backend_subpath
)
216 # In the child, close the main DB, re-open just one DB
220 backenddb
= ldb
.Ldb(backend_path
)
223 backenddb
.transaction_start()
225 backenddb
.add({"dn":"@DSDB_LOCK_TEST"})
226 backenddb
.delete("@DSDB_LOCK_TEST")
228 # Obtain a write lock
229 backenddb
.transaction_prepare_commit()
230 os
.write(w1
, b
"added")
233 # Drop the write lock
234 backenddb
.transaction_cancel()
237 self
.assertEqual(os
.read(r1
, 5), b
"added")
241 # We need to hold this iterator open to hold the all-record lock.
242 res
= self
.samdb
.search_iterator()
244 # This should take at least 2 seconds because the transaction
245 # has a write lock on one backend db open
248 self
.assertGreater(end
- start
, 1.9)
254 (got_pid
, status
) = os
.waitpid(pid
, 0)
255 self
.assertEqual(got_pid
, pid
)
256 self
.assertTrue(os
.WIFEXITED(status
))
257 self
.assertEqual(os
.WEXITSTATUS(status
), 0)