s4-ldb: fixed permissions on urgent_replication.py
[Samba/gebeck_regimport.git] / source4 / lib / ldb / tests / python / urgent_replication.py
blob752a95e1b42805eaff85133aa6c654320068ce81
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
5 import getopt
6 import optparse
7 import sys
8 import time
9 import random
10 import base64
11 import os
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
16 import samba.getopt as options
18 from samba.auth import system_session
19 from ldb import SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE
23 from samba import Ldb
24 from samba import glue
26 from subunit.run import SubunitTestRunner
27 import unittest
29 from samba.ndr import ndr_pack, ndr_unpack
30 from samba.dcerpc import security
32 parser = optparse.OptionParser("urgent_replication [options] <host>")
33 sambaopts = options.SambaOptions(parser)
34 parser.add_option_group(sambaopts)
35 parser.add_option_group(options.VersionOptions(parser))
36 # use command line creds if available
37 credopts = options.CredentialsOptions(parser)
38 parser.add_option_group(credopts)
39 opts, args = parser.parse_args()
41 if len(args) < 1:
42 parser.print_usage()
43 sys.exit(1)
45 host = args[0]
47 lp = sambaopts.get_loadparm()
48 creds = credopts.get_credentials(lp)
50 class UrgentReplicationTests(unittest.TestCase):
52 def delete_force(self, ldb, dn):
53 try:
54 ldb.delete(dn)
55 except LdbError, (num, _):
56 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
58 def find_basedn(self, ldb):
59 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
60 attrs=["defaultNamingContext"])
61 self.assertEquals(len(res), 1)
62 return res[0]["defaultNamingContext"][0]
64 def setUp(self):
65 self.ldb = ldb
66 self.base_dn = self.find_basedn(ldb)
68 print "baseDN: %s\n" % self.base_dn
70 def test_nonurgent_object(self):
71 '''Test if the urgent replication is not activated
72 when handling a non urgent object'''
73 self.ldb.add({
74 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
75 "objectclass":"user",
76 "samaccountname":"nonurgenttest",
77 "description":"nonurgenttest description"});
79 ''' urgent replication should not be enabled when creating '''
80 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
81 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
83 ''' urgent replication should not be enabled when modifying '''
84 m = Message()
85 m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
86 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
87 "description")
88 ldb.modify(m)
89 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
90 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
92 ''' urgent replication should not be enabled when deleting '''
93 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
94 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
95 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
98 def test_nTDSDSA_object(self):
99 '''Test if the urgent replication is activated
100 when handling a nTDSDSA object'''
101 self.ldb.add({
102 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
103 "objectclass":"server",
104 "cn":"test server",
105 "name":"test server",
106 "systemFlags":"50000000"});
108 self.ldb.add_ldif(
109 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
110 objectclass: nTDSDSA
111 cn: NTDS Settings test
112 options: 1
113 instanceType: 4
114 systemFlags: 33554432""", ["relax:0"]);
116 ''' urgent replication should be enabled when creation '''
117 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
118 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
120 ''' urgent replication should NOT be enabled when modifying '''
121 m = Message()
122 m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
123 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
124 "options")
125 ldb.modify(m)
126 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
127 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
129 ''' urgent replication should be enabled when deleting '''
130 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
131 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
132 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
134 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
137 def test_crossRef_object(self):
138 '''Test if the urgent replication is activated
139 when handling a crossRef object'''
140 self.ldb.add({
141 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
142 "objectClass": "crossRef",
143 "cn": "test crossRef",
144 "instanceType": "4",
145 "nCName": self.base_dn,
146 "showInAdvancedViewOnly": "TRUE",
147 "name": "test crossRef",
148 "systemFlags": "1"});
150 ''' urgent replication should be enabled when creating '''
151 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
152 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
154 ''' urgent replication should NOT be enabled when modifying '''
155 m = Message()
156 m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
157 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
158 "systemFlags")
159 ldb.modify(m)
160 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
161 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
164 ''' urgent replication should be enabled when deleting '''
165 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
166 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
167 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
171 def test_attributeSchema_object(self):
172 '''Test if the urgent replication is activated
173 when handling an attributeSchema object'''
175 try:
176 self.ldb.add_ldif(
177 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
178 objectClass: attributeSchema
179 cn: test attributeSchema
180 instanceType: 4
181 isSingleValued: FALSE
182 showInAdvancedViewOnly: FALSE
183 attributeID: 0.9.2342.19200300.100.1.1
184 attributeSyntax: 2.5.5.12
185 adminDisplayName: test attributeSchema
186 adminDescription: test attributeSchema
187 oMSyntax: 64
188 systemOnly: FALSE
189 searchFlags: 8
190 lDAPDisplayName: test attributeSchema
191 name: test attributeSchema
192 systemFlags: 0""", ["relax:0"]);
194 ''' urgent replication should be enabled when creating '''
195 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
196 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
198 except LdbError:
199 print "Not testing urgent replication when creating attributeSchema object ...\n"
201 ''' urgent replication should be enabled when modifying '''
202 m = Message()
203 m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
204 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
205 "lDAPDisplayName")
206 ldb.modify(m)
207 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
208 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
211 def test_classSchema_object(self):
212 '''Test if the urgent replication is activated
213 when handling a classSchema object'''
214 try:
215 self.ldb.add_ldif(
216 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
217 objectClass: classSchema
218 cn: test classSchema
219 instanceType: 4
220 subClassOf: top
221 governsID: 1.2.840.113556.1.5.999
222 rDNAttID: cn
223 showInAdvancedViewOnly: TRUE
224 adminDisplayName: test classSchema
225 adminDescription: test classSchema
226 objectClassCategory: 1
227 lDAPDisplayName: test classSchema
228 name: test classSchema
229 systemOnly: FALSE
230 systemPossSuperiors: dfsConfiguration
231 systemMustContain: msDFS-SchemaMajorVersion
232 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
233 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
234 systemFlags: 16
235 defaultHidingValue: TRUE""", ["relax:0"]);
237 ''' urgent replication should be enabled when creating '''
238 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
239 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
241 except LdbError:
242 print "Not testing urgent replication when creating classSchema object ...\n"
244 ''' urgent replication should be enabled when modifying '''
245 m = Message()
246 m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
247 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
248 "lDAPDisplayName")
249 ldb.modify(m)
250 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
251 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
254 def test_secret_object(self):
256 '''Test if the urgent replication is activated
257 when handling a secret object'''
259 self.ldb.add({
260 "dn": "cn=test secret,cn=System," + self.base_dn,
261 "objectClass":"secret",
262 "cn":"test secret",
263 "name":"test secret",
264 "currentValue":"xxxxxxx"});
267 ''' urgent replication should be enabled when creationg '''
268 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
269 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
271 ''' urgent replication should be enabled when modifying '''
272 m = Message()
273 m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
274 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
275 "currentValue")
276 ldb.modify(m)
277 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
278 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
280 ''' urgent replication should NOT be enabled when deleting '''
281 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
282 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
283 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
286 def test_rIDManager_object(self):
287 '''Test if the urgent replication is activated
288 when handling a rIDManager object'''
289 self.ldb.add_ldif(
290 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
291 objectClass: rIDManager
292 cn: RID Manager test
293 instanceType: 4
294 showInAdvancedViewOnly: TRUE
295 name: RID Manager test
296 systemFlags: -1946157056
297 isCriticalSystemObject: TRUE
298 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
300 ''' urgent replication should be enabled when creating '''
301 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
302 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
304 ''' urgent replication should be enabled when modifying '''
305 m = Message()
306 m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
307 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
308 "systemFlags")
309 ldb.modify(m)
310 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
311 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
313 ''' urgent replication should NOT be enabled when deleting '''
314 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
315 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
316 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
319 def test_urgent_attributes(self):
320 '''Test if the urgent replication is activated
321 when handling urgent attributes of an object'''
323 self.ldb.add({
324 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
325 "objectclass":"user",
326 "samaccountname":"user UrgAttr test",
327 "userAccountControl":"1",
328 "lockoutTime":"0",
329 "pwdLastSet":"0",
330 "description":"urgent attributes test description"});
332 ''' urgent replication should NOT be enabled when creating '''
333 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
334 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
336 ''' urgent replication should be enabled when modifying userAccountControl '''
337 m = Message()
338 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
339 m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
340 "userAccountControl")
341 ldb.modify(m)
342 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
343 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
345 ''' urgent replication should be enabled when modifying lockoutTime '''
346 m = Message()
347 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
348 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
349 "lockoutTime")
350 ldb.modify(m)
351 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
352 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
354 ''' urgent replication should be enabled when modifying pwdLastSet '''
355 m = Message()
356 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
357 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
358 "pwdLastSet")
359 ldb.modify(m)
360 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
361 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
363 ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
364 m = Message()
365 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366 m["description"] = MessageElement("updated urgent attributes test description",
367 FLAG_MOD_REPLACE, "description")
368 ldb.modify(m)
369 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
370 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
372 ''' urgent replication should NOT be enabled when deleting '''
373 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
374 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
375 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
378 if not "://" in host:
379 if os.path.isfile(host):
380 host = "tdb://%s" % host
381 else:
382 host = "ldap://%s" % host
385 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
387 runner = SubunitTestRunner()
388 rc = 0
389 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
390 rc = 1
391 sys.exit(rc)