s4-dsdb/tests/python: Explicitly pass command line LoadParm() instance to system_sess...
[Samba.git] / source4 / dsdb / tests / python / urgent_replication.py
blob946d7d7f34ea2986642c45ce1282ec4479b060c8
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 import optparse
5 import sys
6 import os
8 sys.path.append("bin/python")
9 import samba
10 samba.ensure_external_module("testtools", "testtools")
11 samba.ensure_external_module("subunit", "subunit/python")
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17 MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
19 import samba.tests
20 import samba.dsdb as dsdb
22 from subunit.run import SubunitTestRunner
23 import unittest
# Command-line handling: the Samba, version and credentials option groups
# plus one required positional <host> argument.
parser = optparse.OptionParser("urgent_replication.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# Credentials given on the command line are used when available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

opts, args = parser.parse_args()

# Without the <host> argument there is nothing to test against.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

# Credentials are resolved against the LoadParm instance built from the
# command-line options.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
class UrgentReplicationTests(samba.tests.TestCase):
    """Check which directory changes are flagged for urgent replication.

    Each test adds, modifies and/or deletes an object and, after every
    step, compares the ``uSNHighest`` and ``uSNUrgent`` values returned by
    ``load_partition_usn()`` for the affected partition.  A step is
    expected to be urgent when the two values are equal and non-urgent
    when they differ.

    The tests operate on the module-level ``ldb`` connection.
    """

    def delete_force(self, ldb, dn):
        """Delete ``dn`` through ``ldb``, tolerating an already-missing entry.

        An LdbError with any code other than ERR_NO_SUCH_OBJECT fails the
        test.
        """
        try:
            ldb.delete(dn, ["relax:0"])
        except LdbError as e:
            # The exception carries (error number, message).
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def setUp(self):
        """Bind the shared connection and look up the domain DN."""
        super(UrgentReplicationTests, self).setUp()
        self.ldb = ldb
        self.base_dn = ldb.domain_dn()

        print("baseDN: %s\n" % self.base_dn)

    def _assert_urgent(self, partition_dn):
        """Assert that uSNHighest equals uSNUrgent for ``partition_dn``."""
        res = self.ldb.load_partition_usn(partition_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

    def _assert_not_urgent(self, partition_dn):
        """Assert that uSNHighest differs from uSNUrgent for ``partition_dn``."""
        res = self.ldb.load_partition_usn(partition_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

    def _replace_attribute(self, dn, attr, value):
        """Replace attribute ``attr`` of the entry ``dn`` with ``value``."""
        m = Message()
        m.dn = Dn(self.ldb, dn)
        m[attr] = MessageElement(value, FLAG_MOD_REPLACE, attr)
        self.ldb.modify(m)

    def test_nonurgent_object(self):
        """Test if the urgent replication is not activated
           when handling a non urgent object"""
        user_dn = "cn=nonurgenttest,cn=users," + self.base_dn

        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "samaccountname": "nonurgenttest",
            "description": "nonurgenttest description"})

        # urgent replication should not be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when modifying
        self._replace_attribute(user_dn, "description", "new description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when deleting
        self.delete_force(self.ldb, user_dn)
        self._assert_not_urgent(self.base_dn)

    def test_nTDSDSA_object(self):
        '''Test if the urgent replication is activated
           when handling a nTDSDSA object'''
        config_dn = "cn=Configuration," + self.base_dn
        server_dn = ("cn=test server,cn=Servers,cn=Default-First-Site-Name,"
                     "cn=Sites,cn=Configuration," + self.base_dn)
        ntds_dn = ("cn=NTDS Settings test,cn=test server,cn=Servers,"
                   "cn=Default-First-Site-Name,cn=Sites,cn=Configuration," +
                   self.base_dn)

        self.ldb.add({
            "dn": server_dn,
            "objectclass": "server",
            "cn": "test server",
            "name": "test server",
            "systemFlags": "50000000"}, ["relax:0"])

        self.ldb.add_ldif(
            "dn: " + ntds_dn + """
objectclass: nTDSDSA
cn: NTDS Settings test
options: 1
instanceType: 4
systemFlags: 33554432""", ["relax:0"])

        # urgent replication should be enabled when creation
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(ntds_dn, "options", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, ntds_dn)
        self._assert_urgent(config_dn)

        # Remove the helper server object that hosted the nTDSDSA entry.
        self.delete_force(self.ldb, server_dn)

    def test_crossRef_object(self):
        '''Test if the urgent replication is activated
           when handling a crossRef object'''
        config_dn = "cn=Configuration," + self.base_dn
        crossref_dn = ("cn=test crossRef,CN=Partitions,CN=Configuration," +
                       self.base_dn)

        self.ldb.add({
            "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
            "objectClass": "crossRef",
            "cn": "test crossRef",
            "dnsRoot": lp.get("realm").lower(),
            "instanceType": "4",
            "nCName": self.base_dn,
            "showInAdvancedViewOnly": "TRUE",
            "name": "test crossRef",
            "systemFlags": "1"}, ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(crossref_dn, "systemFlags", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, crossref_dn)
        self._assert_urgent(config_dn)

    def test_attributeSchema_object(self):
        '''Test if the urgent replication is activated
           when handling an attributeSchema object'''
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn

        try:
            self.ldb.add_ldif(
                "dn: CN=test attributeSchema,cn=Schema,CN=Configuration," +
                self.base_dn + """
objectClass: attributeSchema
cn: test attributeSchema
instanceType: 4
isSingleValued: FALSE
showInAdvancedViewOnly: FALSE
attributeID: 0.9.2342.19200300.100.1.1
attributeSyntax: 2.5.5.12
adminDisplayName: test attributeSchema
adminDescription: test attributeSchema
oMSyntax: 64
systemOnly: FALSE
searchFlags: 8
lDAPDisplayName: test attributeSchema
name: test attributeSchema""")

            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        except LdbError:
            # The creation check is skipped when an LdbError is raised;
            # the modification below is still exercised.
            print("Not testing urgent replication when creating attributeSchema object ...\n")

        # urgent replication should be enabled when modifying
        self._replace_attribute(
            "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn,
            "lDAPDisplayName", "updated test attributeSchema")
        self._assert_urgent(schema_dn)

    def test_classSchema_object(self):
        '''Test if the urgent replication is activated
           when handling a classSchema object'''
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn
        class_dn = "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn

        try:
            # The second defaultSecurityDescriptor line starts with a single
            # space: LDIF line folding, so both lines form one value.
            self.ldb.add_ldif(
                "dn: " + class_dn + """
objectClass: classSchema
cn: test classSchema
instanceType: 4
subClassOf: top
governsID: 1.2.840.113556.1.5.999
rDNAttID: cn
showInAdvancedViewOnly: TRUE
adminDisplayName: test classSchema
adminDescription: test classSchema
objectClassCategory: 1
lDAPDisplayName: test classSchema
name: test classSchema
systemOnly: FALSE
systemPossSuperiors: dfsConfiguration
systemMustContain: msDFS-SchemaMajorVersion
defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
systemFlags: 16
defaultHidingValue: TRUE""")

            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        except LdbError:
            # The creation check is skipped when an LdbError is raised;
            # the modification below is still exercised.
            print("Not testing urgent replication when creating classSchema object ...\n")

        # urgent replication should be enabled when modifying
        self._replace_attribute(class_dn, "lDAPDisplayName",
                                "updated test classSchema")
        self._assert_urgent(schema_dn)

    def test_secret_object(self):
        '''Test if the urgent replication is activated
           when handling a secret object'''
        secret_dn = "cn=test secret,cn=System," + self.base_dn

        self.ldb.add({
            "dn": secret_dn,
            "objectClass": "secret",
            "cn": "test secret",
            "name": "test secret",
            "currentValue": "xxxxxxx"}, ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(secret_dn, "currentValue", "yyyyyyyy")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, secret_dn)
        self._assert_not_urgent(self.base_dn)

    def test_rIDManager_object(self):
        '''Test if the urgent replication is activated
           when handling a rIDManager object'''
        rid_manager_dn = "CN=RID Manager test,CN=System," + self.base_dn

        self.ldb.add_ldif(
            "dn: " + rid_manager_dn + """
objectClass: rIDManager
cn: RID Manager test
instanceType: 4
showInAdvancedViewOnly: TRUE
name: RID Manager test
systemFlags: -1946157056
isCriticalSystemObject: TRUE
rIDAvailablePool: 133001-1073741823""", ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(rid_manager_dn, "systemFlags", "0")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, rid_manager_dn)
        self._assert_not_urgent(self.base_dn)

    def test_urgent_attributes(self):
        '''Test if the urgent replication is activated
           when handling urgent attributes of an object'''
        user_dn = "cn=user UrgAttr test,cn=users," + self.base_dn

        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "samaccountname": "user UrgAttr test",
            "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
            "lockoutTime": "0",
            "pwdLastSet": "0",
            "description": "urgent attributes test description"})

        # urgent replication should NOT be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should be enabled when modifying userAccountControl
        self._replace_attribute(
            user_dn, "userAccountControl",
            str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_SMARTCARD_REQUIRED))
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying lockoutTime
        self._replace_attribute(user_dn, "lockoutTime", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying pwdLastSet
        self._replace_attribute(user_dn, "pwdLastSet", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when modifying a not-urgent
        # attribute
        self._replace_attribute(user_dn, "description",
                                "updated urgent attributes test description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, user_dn)
        self._assert_not_urgent(self.base_dn)
# A host given without a URL scheme gets one: an existing local file is
# opened via tdb://, anything else is addressed via ldap://.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host

# Connection shared by every test in UrgentReplicationTests.
ldb = SamDB(host,
            credentials=creds,
            session_info=system_session(lp),
            lp=lp,
            global_schema=False)

# Run the suite and turn its outcome into the process exit status
# (0 on success, 1 otherwise).
runner = SubunitTestRunner()
rc = 0 if runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful() else 1
sys.exit(rc)