2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
13 sys
.path
.append("bin/python")
14 sys
.path
.append("../lib/subunit/python")
16 import samba
.getopt
as options
18 from samba
.auth
import system_session
19 from ldb
import SCOPE_BASE
, LdbError
20 from ldb
import ERR_NO_SUCH_OBJECT
21 from ldb
import Message
, MessageElement
, Dn
22 from ldb
import FLAG_MOD_REPLACE
24 from samba
import glue
26 from subunit
.run
import SubunitTestRunner
29 from samba
.ndr
import ndr_pack
, ndr_unpack
30 from samba
.dcerpc
import security
# Command-line interface: option groups supplied by samba.getopt, with a
# usage string that names one positional <host> argument.
parser = optparse.OptionParser("urgent_replication [options] <host>")

# Groups are registered in this order: samba options, version options,
# credentials options.
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

opts, args = parser.parse_args()

# Loadparm context comes from the samba option group; the credentials
# are then resolved against that loadparm context.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
50 class UrgentReplicationTests(unittest
.TestCase
):
52 def delete_force(self
, ldb
, dn
):
55 except LdbError
, (num
, _
):
56 self
.assertEquals(num
, ERR_NO_SUCH_OBJECT
)
def find_basedn(self, ldb):
    '''Return the first defaultNamingContext value found at the empty base DN.

    Runs a SCOPE_BASE search with an empty base and an empty expression,
    requesting only the defaultNamingContext attribute, asserts that
    exactly one entry was returned, and hands back the first value of
    that attribute from the entry.
    '''
    wanted = ["defaultNamingContext"]
    entries = ldb.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=wanted)
    # The rest of the lookup relies on there being exactly one entry.
    self.assertEquals(len(entries), 1)
    only_entry = entries[0]
    return only_entry["defaultNamingContext"][0]
66 self
.base_dn
= self
.find_basedn(ldb
)
68 print "baseDN: %s\n" % self
.base_dn
70 def test_nonurgent_object(self
):
71 '''Test if the urgent replication is not activated
72 when handling a non urgent object'''
74 "dn": "cn=nonurgenttest,cn=users," + self
.base_dn
,
76 "samaccountname":"nonurgenttest",
77 "description":"nonurgenttest description"});
79 ''' urgent replication should not be enabled when creating '''
80 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
81 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
83 ''' urgent replication should not be enabled when modifying '''
85 m
.dn
= Dn(ldb
, "cn=nonurgenttest,cn=users," + self
.base_dn
)
86 m
["description"] = MessageElement("new description", FLAG_MOD_REPLACE
,
89 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
90 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
92 ''' urgent replication should not be enabled when deleting '''
93 self
.delete_force(self
.ldb
, "cn=nonurgenttest,cn=users," + self
.base_dn
)
94 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
95 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
98 def test_nTDSDSA_object(self
):
99 '''Test if the urgent replication is activated
100 when handling a nTDSDSA object'''
102 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
,
103 "objectclass":"server",
105 "name":"test server",
106 "systemFlags":"50000000"});
109 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self
.base_dn
) + """
111 cn: NTDS Settings test
114 systemFlags: 33554432""", ["relax:0"]);
116 ''' urgent replication should be enabled when creation '''
117 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Configuration," + self
.base_dn
)
118 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
120 ''' urgent replication should NOT be enabled when modifying '''
122 m
.dn
= Dn(ldb
, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
)
123 m
["options"] = MessageElement("0", FLAG_MOD_REPLACE
,
126 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Configuration," + self
.base_dn
)
127 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
129 ''' urgent replication should be enabled when deleting '''
130 self
.delete_force(self
.ldb
, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
)
131 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Configuration," + self
.base_dn
)
132 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
134 self
.delete_force(self
.ldb
, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self
.base_dn
)
137 def test_crossRef_object(self
):
138 '''Test if the urgent replication is activated
139 when handling a crossRef object'''
141 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self
.base_dn
,
142 "objectClass": "crossRef",
143 "cn": "test crossRef",
145 "nCName": self
.base_dn
,
146 "showInAdvancedViewOnly": "TRUE",
147 "name": "test crossRef",
148 "systemFlags": "1"});
150 ''' urgent replication should be enabled when creating '''
151 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Configuration," + self
.base_dn
)
152 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
154 ''' urgent replication should NOT be enabled when modifying '''
156 m
.dn
= Dn(ldb
, "cn=test crossRef,CN=Partitions,CN=Configuration," + self
.base_dn
)
157 m
["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE
,
160 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Configuration," + self
.base_dn
)
161 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
164 ''' urgent replication should be enabled when deleting '''
165 self
.delete_force(self
.ldb
, "cn=test crossRef,CN=Partitions,CN=Configuration," + self
.base_dn
)
166 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Configuration," + self
.base_dn
)
167 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
171 def test_attributeSchema_object(self
):
172 '''Test if the urgent replication is activated
173 when handling an attributeSchema object'''
177 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self
.base_dn
+ """
178 objectClass: attributeSchema
179 cn: test attributeSchema
181 isSingleValued: FALSE
182 showInAdvancedViewOnly: FALSE
183 attributeID: 0.9.2342.19200300.100.1.1
184 attributeSyntax: 2.5.5.12
185 adminDisplayName: test attributeSchema
186 adminDescription: test attributeSchema
190 lDAPDisplayName: test attributeSchema
191 name: test attributeSchema
192 systemFlags: 0""", ["relax:0"]);
194 ''' urgent replication should be enabled when creating '''
195 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Schema,cn=Configuration," + self
.base_dn
)
196 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
199 print "Not testing urgent replication when creating attributeSchema object ...\n"
201 ''' urgent replication should be enabled when modifying '''
203 m
.dn
= Dn(ldb
, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self
.base_dn
)
204 m
["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE
,
207 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Schema,cn=Configuration," + self
.base_dn
)
208 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
211 def test_classSchema_object(self
):
212 '''Test if the urgent replication is activated
213 when handling a classSchema object'''
216 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self
.base_dn
+ """
217 objectClass: classSchema
221 governsID: 1.2.840.113556.1.5.999
223 showInAdvancedViewOnly: TRUE
224 adminDisplayName: test classSchema
225 adminDescription: test classSchema
226 objectClassCategory: 1
227 lDAPDisplayName: test classSchema
228 name: test classSchema
230 systemPossSuperiors: dfsConfiguration
231 systemMustContain: msDFS-SchemaMajorVersion
232 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
233 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
235 defaultHidingValue: TRUE""", ["relax:0"]);
237 ''' urgent replication should be enabled when creating '''
238 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Schema,cn=Configuration," + self
.base_dn
)
239 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
242 print "Not testing urgent replication when creating classSchema object ...\n"
244 ''' urgent replication should be enabled when modifying '''
246 m
.dn
= Dn(ldb
, "CN=test classSchema,CN=Schema,CN=Configuration," + self
.base_dn
)
247 m
["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE
,
250 res
= glue
.dsdb_load_partition_usn(self
.ldb
, "cn=Schema,cn=Configuration," + self
.base_dn
)
251 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
254 def test_secret_object(self
):
256 '''Test if the urgent replication is activated
257 when handling a secret object'''
260 "dn": "cn=test secret,cn=System," + self
.base_dn
,
261 "objectClass":"secret",
263 "name":"test secret",
264 "currentValue":"xxxxxxx"});
267 ''' urgent replication should be enabled when creationg '''
268 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
269 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
271 ''' urgent replication should be enabled when modifying '''
273 m
.dn
= Dn(ldb
, "cn=test secret,cn=System," + self
.base_dn
)
274 m
["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE
,
277 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
278 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
280 ''' urgent replication should NOT be enabled when deleting '''
281 self
.delete_force(self
.ldb
, "cn=test secret,cn=System," + self
.base_dn
)
282 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
283 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
286 def test_rIDManager_object(self
):
287 '''Test if the urgent replication is activated
288 when handling a rIDManager object'''
290 """dn: CN=RID Manager test,CN=System,%s""" % self
.base_dn
+ """
291 objectClass: rIDManager
294 showInAdvancedViewOnly: TRUE
295 name: RID Manager test
296 systemFlags: -1946157056
297 isCriticalSystemObject: TRUE
298 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
300 ''' urgent replication should be enabled when creating '''
301 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
302 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
304 ''' urgent replication should be enabled when modifying '''
306 m
.dn
= Dn(ldb
, "CN=RID Manager test,CN=System," + self
.base_dn
)
307 m
["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE
,
310 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
311 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
313 ''' urgent replication should NOT be enabled when deleting '''
314 self
.delete_force(self
.ldb
, "CN=RID Manager test,CN=System," + self
.base_dn
)
315 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
316 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
319 def test_urgent_attributes(self
):
320 '''Test if the urgent replication is activated
321 when handling urgent attributes of an object'''
324 "dn": "cn=user UrgAttr test,cn=users," + self
.base_dn
,
325 "objectclass":"user",
326 "samaccountname":"user UrgAttr test",
327 "userAccountControl":"1",
330 "description":"urgent attributes test description"});
332 ''' urgent replication should NOT be enabled when creating '''
333 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
334 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
336 ''' urgent replication should be enabled when modifying userAccountControl '''
338 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
339 m
["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE
,
340 "userAccountControl")
342 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
343 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
345 ''' urgent replication should be enabled when modifying lockoutTime '''
347 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
348 m
["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE
,
351 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
352 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
354 ''' urgent replication should be enabled when modifying pwdLastSet '''
356 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
357 m
["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE
,
360 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
361 self
.assertEquals(res
["uSNHighest"], res
["uSNUrgent"]);
363 ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
365 m
.dn
= Dn(ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
366 m
["description"] = MessageElement("updated urgent attributes test description",
367 FLAG_MOD_REPLACE
, "description")
369 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
370 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
372 ''' urgent replication should NOT be enabled when deleting '''
373 self
.delete_force(self
.ldb
, "cn=user UrgAttr test,cn=users," + self
.base_dn
)
374 res
= glue
.dsdb_load_partition_usn(self
.ldb
, self
.base_dn
)
375 self
.assertNotEquals(res
["uSNHighest"], res
["uSNUrgent"]);
378 if not "://" in host
:
379 if os
.path
.isfile(host
):
380 host
= "tdb://%s" % host
382 host
= "ldap://%s" % host
385 ldb
= Ldb(host
, credentials
=creds
, session_info
=system_session(), lp
=lp
)
387 runner
= SubunitTestRunner()
389 if not runner
.run(unittest
.makeSuite(UrgentReplicationTests
)).wasSuccessful():