2 # -*- coding: utf-8 -*-
# Import-path bootstrap that runs before the samba imports further down.
# NOTE(review): gutter lines 3-7 and 9 are absent from this listing; `sys` and
# `samba` are presumably imported there - confirm against the full file.
# NOTE(review): "bin/python" is a relative path, so the directory it names
# depends on where the script is started from. append() puts it last on
# sys.path, so it is only consulted for modules not found earlier.
8 sys
.path
.append("bin/python")
# NOTE(review): ensure_external_module() presumably makes the named module
# importable from the given in-tree location when it is not installed -
# confirm against its definition.
10 samba
.ensure_external_module("testtools", "testtools")
11 samba
.ensure_external_module("subunit", "subunit/python")
13 import samba
.getopt
as options
15 from samba
.auth
import system_session
16 from ldb
import (SCOPE_BASE
, LdbError
, ERR_NO_SUCH_OBJECT
, Message
,
17 MessageElement
, Dn
, FLAG_MOD_REPLACE
)
18 from samba
.samdb
import SamDB
20 import samba
.dsdb
as dsdb
22 from subunit
.run
import SubunitTestRunner
# Command-line handling: one optparse parser carries the generic Samba options,
# the version options and the credentials options.
# NOTE(review): the import of `optparse` is not visible in this listing.
25 parser
= optparse
.OptionParser("urgent_replication.py [options] <host>")
26 sambaopts
= options
.SambaOptions(parser
)
27 parser
.add_option_group(sambaopts
)
28 parser
.add_option_group(options
.VersionOptions(parser
))
29 # use command line creds if available
# NOTE(review): a password passed as a command-line argument is visible in the
# process list and shell history; how the credentials are actually supplied is
# decided by CredentialsOptions, which is not shown here.
30 credopts
= options
.CredentialsOptions(parser
)
31 parser
.add_option_group(credopts
)
32 opts
, args
= parser
.parse_args()
# NOTE(review): gutter lines 33-39 are absent. The usage string expects a
# <host> argument, so `host` is presumably taken from `args` there - confirm.
40 lp
= sambaopts
.get_loadparm()
# The credentials are resolved with the loadparm context passed in.
41 creds
= credopts
.get_credentials(lp
)
# Test case for urgent-replication bookkeeping. Each test writes to the
# directory and then reads the partition's uSNHighest and uSNUrgent values:
# the tests treat "equal" as "the last write was flagged urgent" and
# "not equal" as "it was not".
# NOTE(review): both values are partition-wide, so the comparisons presumably
# assume no other writer touches the partition between the operation and the
# read - confirm before running against a shared server.
43 class UrgentReplicationTests(samba
.tests
.TestCase
):
# Delete `dn` through the given connection, tolerating only "no such object".
45 def delete_force(self
, ldb
, dn
):
# NOTE(review): gutter line 46 is absent from this listing; the `except`
# clause below needs a `try:` there.
# The delete carries the "relax" control with criticality 0.
47 ldb
.delete(dn
, ["relax:0"])
# `except LdbError, (num, _)` is Python 2-only syntax.
48 except LdbError
, (num
, _
):
# Any error code other than ERR_NO_SUCH_OBJECT fails the test here instead of
# being silently swallowed.
49 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
# NOTE(review): gutter lines 50-51 are absent; these statements look like the
# body of setUp(), which the super() call below names.
52 super(UrgentReplicationTests
, self
).setUp()
# NOTE(review): gutter line 53 is absent. The tests read self.ldb, so it is
# presumably assigned there - confirm.
# `ldb` here appears to be the module-level SamDB connection created near the
# end of the file; only individual names are imported from the ldb module.
54 self
.base_dn
= ldb
.domain_dn()
# Python 2 print statement.
56 print "baseDN: %s\n" % self
.base_dn
# Negative control: an entry under cn=users must not be flagged urgent on
# create, modify or delete (uSNHighest and uSNUrgent stay different each time).
58 def test_nonurgent_object(self
):
59 """Test if the urgent replication is not activated
60 when handling a non urgent object"""
# NOTE(review): gutter lines 61 and 63 are absent, so the call that receives
# this dict and one of its entries are not visible in this listing.
62 "dn": "cn=nonurgenttest,cn=users," + self
.base_dn
,
64 "samaccountname":"nonurgenttest",
65 "description":"nonurgenttest description"})
67 # urgent replication should not be enabled when creating
68 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
69 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
71 # urgent replication should not be enabled when modifying
# NOTE(review): gutter line 72 is absent; `m` is presumably created there as an
# ldb Message (Message is imported) - confirm.
73 m
.dn
= Dn(ldb
, "cn=nonurgenttest,cn=users," + self
.base_dn
)
74 m
["description"] = MessageElement("new description", FLAG_MOD_REPLACE
,
# NOTE(review): gutter lines 75-76 are absent: the end of the MessageElement
# call and the call that submits `m` are not visible.
77 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
78 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
80 # urgent replication should not be enabled when deleting
81 self
.delete_force(self
.ldb
, "cn=nonurgenttest,cn=users," + self
.base_dn
)
82 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
83 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Expects an "NTDS Settings" entry under a test server object to be flagged
# urgent on create and delete, but not on modify. The counters are read from
# the Configuration partition.
# NOTE(review): an NTDS Settings object under CN=Sites is how a directory
# records a domain controller, so unexpected creation of such objects is also a
# commonly monitored signal for rogue domain controller registration.
# NOTE(review): the gutter numbers skip in several places below (89, 92-93,
# 95-96, 98, 100-101, 103, 107, 109, 112-113, 116, 121); the calls that receive
# the dict and the LDIF text, and parts of both, are not visible.
86 def test_nTDSDSA_object(self
):
87 '''Test if the urgent replication is activated
88 when handling a nTDSDSA object'''
# Parent "server" entry; the call it is passed to also carries "relax:0".
90 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
,
91 "objectclass":"server",
94 "systemFlags":"50000000"}, ["relax:0"])
# The child entry is given as LDIF text, also with "relax:0".
97 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self
.base_dn
) + """
99 cn: NTDS Settings test
102 systemFlags: 33554432""", ["relax:0"])
104 # urgent replication should be enabled when creating
105 res
= self
.ldb
.load_partition_usn("cn=Configuration," + self
.base_dn
)
106 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
108 # urgent replication should NOT be enabled when modifying
110 m
.dn
= Dn(ldb
, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
)
111 m
["options"] = MessageElement("0", FLAG_MOD_REPLACE
,
114 res
= self
.ldb
.load_partition_usn("cn=Configuration," + self
.base_dn
)
115 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
117 # urgent replication should be enabled when deleting
118 self
.delete_force(self
.ldb
, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
)
119 res
= self
.ldb
.load_partition_usn("cn=Configuration," + self
.base_dn
)
120 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Cleanup of the parent server entry; no urgency check follows it.
122 self
.delete_force(self
.ldb
, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
)
# Expects a crossRef entry under CN=Partitions to be flagged urgent on create
# and delete, but not on modify. The counters are read from the Configuration
# partition.
# NOTE(review): the gutter numbers skip in several places below (128, 133, 138,
# 142, 144, 147-148, 151-152, 157-159); the call that receives the dict, one of
# its entries and the modify plumbing are not visible.
125 def test_crossRef_object(self
):
126 '''Test if the urgent replication is activated
127 when handling a crossRef object'''
# dnsRoot comes from the configured realm (lower-cased) and nCName is the
# domain's own base DN; the call also carries "relax:0".
129 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self
.base_dn
,
130 "objectClass": "crossRef",
131 "cn": "test crossRef",
132 "dnsRoot": lp
.get("realm").lower(),
134 "nCName": self
.base_dn
,
135 "showInAdvancedViewOnly": "TRUE",
136 "name": "test crossRef",
137 "systemFlags": "1"}, ["relax:0"])
139 # urgent replication should be enabled when creating
140 res
= self
.ldb
.load_partition_usn("cn=Configuration," + self
.base_dn
)
141 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
143 # urgent replication should NOT be enabled when modifying
145 m
.dn
= Dn(ldb
, "cn=test crossRef,CN=Partitions,CN=Configuration," + self
.base_dn
)
146 m
["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE
,
149 res
= self
.ldb
.load_partition_usn("cn=Configuration," + self
.base_dn
)
150 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
153 # urgent replication should be enabled when deleting
154 self
.delete_force(self
.ldb
, "cn=test crossRef,CN=Partitions,CN=Configuration," + self
.base_dn
)
155 res
= self
.ldb
.load_partition_usn("cn=Configuration," + self
.base_dn
)
156 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Expects an attributeSchema entry to be flagged urgent on create and on
# modify. The counters are read from the Schema partition.
# NOTE(review): the gutter numbers skip in several places below (163-165, 169,
# 176-178, 181, 185-186, 188, 190, 193-194); the call that receives the LDIF
# text and the modify plumbing are not visible.
# NOTE(review): the attributeID in the LDIF text is the OID of the standard
# `uid` attribute type, so the add may collide with an existing schema entry on
# servers that already define it - confirm. The "Not testing ..." message
# further down suggests the create step sits in an error handler whose lines
# are not shown.
# NOTE(review): no delete of the schema entry is visible in this listing.
# Schema additions are generally not removable, so the target should be a
# disposable test domain.
160 def test_attributeSchema_object(self
):
161 '''Test if the urgent replication is activated
162 when handling an attributeSchema object'''
166 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self
.base_dn
+ """
167 objectClass: attributeSchema
168 cn: test attributeSchema
170 isSingleValued: FALSE
171 showInAdvancedViewOnly: FALSE
172 attributeID: 0.9.2342.19200300.100.1.1
173 attributeSyntax: 2.5.5.12
174 adminDisplayName: test attributeSchema
175 adminDescription: test attributeSchema
179 lDAPDisplayName: test attributeSchema
180 name: test attributeSchema""")
182 # urgent replication should be enabled when creating
183 res
= self
.ldb
.load_partition_usn("cn=Schema,cn=Configuration," + self
.base_dn
)
184 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Python 2 print statement; the branch that reaches it is not visible.
187 print "Not testing urgent replication when creating attributeSchema object ...\n"
189 # urgent replication should be enabled when modifying
191 m
.dn
= Dn(ldb
, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self
.base_dn
)
192 m
["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE
,
195 res
= self
.ldb
.load_partition_usn("cn=Schema,cn=Configuration," + self
.base_dn
)
196 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Expects a classSchema entry to be flagged urgent on create and on modify.
# The counters are read from the Schema partition.
# NOTE(review): the gutter numbers skip in several places below (202-203,
# 206-208, 210, 217, 222, 224, 228-229, 231, 233, 236-237); the call that
# receives the LDIF text and the modify plumbing are not visible.
# NOTE(review): the defaultSecurityDescriptor in the LDIF text is an SDDL DACL.
# It gives the long rights string to DA (Domain Admins), SY (Local System) and
# CO (Creator Owner), and RPLCLORC to AU (Authenticated Users).
# NOTE(review): no delete of the schema entry is visible in this listing.
# Schema additions are generally not removable, so the target should be a
# disposable test domain.
199 def test_classSchema_object(self
):
200 '''Test if the urgent replication is activated
201 when handling a classSchema object'''
204 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self
.base_dn
+ """
205 objectClass: classSchema
209 governsID: 1.2.840.113556.1.5.999
211 showInAdvancedViewOnly: TRUE
212 adminDisplayName: test classSchema
213 adminDescription: test classSchema
214 objectClassCategory: 1
215 lDAPDisplayName: test classSchema
216 name: test classSchema
218 systemPossSuperiors: dfsConfiguration
219 systemMustContain: msDFS-SchemaMajorVersion
220 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
221 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
223 defaultHidingValue: TRUE""")
225 # urgent replication should be enabled when creating
226 res
= self
.ldb
.load_partition_usn("cn=Schema,cn=Configuration," + self
.base_dn
)
227 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Python 2 print statement; the branch that reaches it is not visible.
230 print "Not testing urgent replication when creating classSchema object ...\n"
232 # urgent replication should be enabled when modifying
234 m
.dn
= Dn(ldb
, "CN=test classSchema,CN=Schema,CN=Configuration," + self
.base_dn
)
235 m
["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE
,
238 res
= self
.ldb
.load_partition_usn("cn=Schema,cn=Configuration," + self
.base_dn
)
239 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Expects a "secret" entry under cn=System to be flagged urgent on create and
# on modify of currentValue, but not on delete. The counters are read from the
# domain partition.
# NOTE(review): the currentValue strings are obvious placeholders, not real
# secret material.
# NOTE(review): the gutter numbers skip in several places below (245-246, 249,
# 252, 256, 258, 261-262, 265); the call that receives the dict, one of its
# entries and the modify plumbing are not visible.
242 def test_secret_object(self
):
243 '''Test if the urgent replication is activated
244 when handling a secret object'''
247 "dn": "cn=test secret,cn=System," + self
.base_dn
,
248 "objectClass":"secret",
250 "name":"test secret",
251 "currentValue":"xxxxxxx"}, ["relax:0"])
253 # urgent replication should be enabled when creating
254 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
255 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
257 # urgent replication should be enabled when modifying
259 m
.dn
= Dn(ldb
, "cn=test secret,cn=System," + self
.base_dn
)
260 m
["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE
,
263 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
264 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
266 # urgent replication should NOT be enabled when deleting
267 self
.delete_force(self
.ldb
, "cn=test secret,cn=System," + self
.base_dn
)
268 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
269 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Expects a rIDManager entry under CN=System to be flagged urgent on create and
# on modify of systemFlags, but not on delete. The counters are read from the
# domain partition.
# NOTE(review): the gutter numbers skip in several places below (275, 278-279,
# 285, 289, 291, 294-295, 298); the call that receives the LDIF text and the
# modify plumbing are not visible.
272 def test_rIDManager_object(self
):
273 '''Test if the urgent replication is activated
274 when handling a rIDManager object'''
# The entry is given as LDIF text and added with "relax:0".
276 """dn: CN=RID Manager test,CN=System,%s""" % self
.base_dn
+ """
277 objectClass: rIDManager
280 showInAdvancedViewOnly: TRUE
281 name: RID Manager test
282 systemFlags: -1946157056
283 isCriticalSystemObject: TRUE
284 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
286 # urgent replication should be enabled when creating
287 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
288 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
290 # urgent replication should be enabled when modifying
292 m
.dn
= Dn(ldb
, "CN=RID Manager test,CN=System," + self
.base_dn
)
293 m
["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE
,
296 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
297 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
299 # urgent replication should NOT be enabled when deleting
300 self
.delete_force(self
.ldb
, "CN=RID Manager test,CN=System," + self
.base_dn
)
301 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
302 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Attribute-level check on an ordinary user entry. Creating and deleting the
# user, and changing `description`, must not be flagged urgent. Replacing
# userAccountControl, lockoutTime or pwdLastSet must be.
# NOTE(review): those three attributes carry account flags, lockout state and
# password-change time, so urgent replication presumably shortens the window in
# which another replica still acts on stale account state.
# NOTE(review): the gutter numbers skip in several places below (308-309,
# 314-315, 317, 321, 323, 327, 330, 332, 335-336, 339, 341, 344-345, 348,
# 350-351, 355, 358); the call that receives the dict, two of its entries and
# the modify plumbing are not visible.
305 def test_urgent_attributes(self
):
306 '''Test if the urgent replication is activated
307 when handling urgent attributes of an object'''
310 "dn": "cn=user UrgAttr test,cn=users," + self
.base_dn
,
311 "objectclass":"user",
312 "samaccountname":"user UrgAttr test",
313 "userAccountControl":str(dsdb
.UF_NORMAL_ACCOUNT
),
316 "description":"urgent attributes test description"})
318 # urgent replication should NOT be enabled when creating
319 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
320 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
322 # urgent replication should be enabled when modifying userAccountControl
324 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
# The new value is the sum of the two flag constants, sent as a decimal string.
325 m
["userAccountControl"] = MessageElement(str(dsdb
.UF_NORMAL_ACCOUNT
+dsdb
.UF_SMARTCARD_REQUIRED
), FLAG_MOD_REPLACE
,
326 "userAccountControl")
328 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
329 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
331 # urgent replication should be enabled when modifying lockoutTime
333 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
334 m
["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE
,
337 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
338 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
340 # urgent replication should be enabled when modifying pwdLastSet
342 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
343 m
["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE
,
346 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
347 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"])
349 # urgent replication should NOT be enabled when modifying a not-urgent
352 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
353 m
["description"] = MessageElement("updated urgent attributes test description",
354 FLAG_MOD_REPLACE
, "description")
356 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
357 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
359 # urgent replication should NOT be enabled when deleting
360 self
.delete_force(self
.ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
361 res
= self
.ldb
.load_partition_usn(self
.base_dn
)
362 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"])
# Pick a URL scheme when the host argument carries none: a path that names an
# existing local file becomes a tdb:// URL, anything else an ldap:// URL.
# NOTE(review): the assignment of `host` and the import of `os` are not visible
# in this listing.
365 if not "://" in host
:
366 if os
.path
.isfile(host
):
367 host
= "tdb://%s" % host
# NOTE(review): gutter line 368 is absent; an `else:` presumably sits there.
369 host
= "ldap://%s" % host
# NOTE(review): the fallback scheme is ldap://, not ldaps://. Whether the
# session is signed or sealed then depends on settings carried by `lp` and
# `creds`, which are not visible here - confirm.
# Module-level connection shared by all tests. The name `ldb` is the same name
# as the module the imports at the top of the file draw from.
372 ldb
= SamDB(host
, credentials
=creds
, session_info
=system_session(lp
), lp
=lp
,
# NOTE(review): gutter lines 373-374 are absent, so the remaining SamDB
# arguments are not visible.
375 runner
= SubunitTestRunner()
377 if not runner
.run(unittest
.makeSuite(UrgentReplicationTests
)).wasSuccessful():