# -*- coding: utf-8 -*-
#
# Tests various schema replication scenarios
#
# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_schema -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import os
import random
import subprocess
import sys
import time

# Samba's in-tree Python modules live under bin/python; this must run
# before any of the samba/ldb imports below.
sys.path.append("bin/python")

import samba.tests
from samba.auth import system_session
from ldb import LdbError, ERR_NO_SUCH_OBJECT
from ldb import SCOPE_BASE, SCOPE_SUBTREE
from samba.samdb import SamDB

class DrsReplSchemaTestCase(samba.tests.TestCase):
    """Schema replication test between two domain controllers.

    The LDB connection, URL and RootDSE record of each DC are cached as
    class attributes by ``_dc_connect`` so they are set up only once and
    shared by all test instances.

    NOTE(review): the copy of this file that was reviewed had lost several
    lines (class attributes, some ``def``/``try`` headers and parts of the
    LDIF templates).  Everything commented as "reconstructed" below was
    rebuilt from the surrounding code and must be confirmed against the
    upstream file.
    """

    # prefix for all objects created
    # (reconstructed default: setUp() compares it against None before use)
    obj_prefix = None

    def setUp(self):
        """Connect to both DCs and cache the RootDSE values the tests use."""
        super(DrsReplSchemaTestCase, self).setUp()

        # connect to DCs singleton
        self._dc_connect("dc1", "DC1", ldap_only=True)
        self._dc_connect("dc2", "DC2", ldap_only=True)

        # initialize objects prefix if not done yet
        if self.obj_prefix is None:
            # "%s" is not a portable strftime() directive, and feeding it a
            # gmtime() struct yields a skewed value where it is supported;
            # take the epoch seconds directly instead.
            stamp = int(time.time())
            DrsReplSchemaTestCase.obj_prefix = "DrsReplSchema-%s-" % stamp

        # cache some of RootDSE props
        self.schema_dn = self.info_dc1["schemaNamingContext"][0]
        self.domain_dn = self.info_dc1["defaultNamingContext"][0]
        self.config_dn = self.info_dc1["configurationNamingContext"][0]
        self.forest_level = int(self.info_dc1["forestFunctionality"][0])

        # we will need DCs DNS names for 'samba-tool drs' command
        self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
        self.dnsname_dc2 = self.info_dc2["dnsHostName"][0]

    def tearDown(self):
        """Nothing to clean up locally; defer to the base class."""
        super(DrsReplSchemaTestCase, self).tearDown()

    # reconstructed decorator: the method takes `cls`, is invoked through
    # `self`, and stores its results with setattr(cls, ...)
    @classmethod
    def _dc_connect(cls, attr_name, env_var, ldap_only=True):
        """Return the LDB connection for a DC, connecting on first use.

        :param attr_name: suffix of the class attributes used as cache:
            "ldb_<attr_name>", "url_<attr_name>" and "info_<attr_name>"
        :param env_var: key handed to samba.tests.env_get_var_value() to
            obtain the URL of the DC
        :param ldap_only: passed through to samba.tests.connect_samdb()
        :return: the LDB connection (reconstructed; the original return
            statement was not visible -- confirm)
        """
        attr_name_ldb = "ldb_" + attr_name
        # A missing attribute and an attribute preset to None both mean
        # "not connected yet".
        ldb_dc = getattr(cls, attr_name_ldb, None)
        if ldb_dc is None:
            url_dc = samba.tests.env_get_var_value(env_var)
            ldb_dc = samba.tests.connect_samdb(url_dc, ldap_only=ldap_only)
            # empty base DN + base scope: read the RootDSE
            res = ldb_dc.search(base="", expression="",
                                scope=SCOPE_BASE, attrs=["*"])
            # reconstructed: setUp() indexes info_dcN by attribute name, so
            # the first (only) search result is assumed to be what is cached
            info_dc = res[0]
            setattr(cls, attr_name_ldb, ldb_dc)
            setattr(cls, "url_" + attr_name, url_dc)
            setattr(cls, "info_" + attr_name, info_dc)
        return ldb_dc

    def _net_drs_replicate(self, DC, fromDC, nc_dn):
        """Triggers replication cycle on 'DC' to
           replicate from 'fromDC'. Naming context to
           be replicated is 'nc_dn' dn.

        Fails the test if the 'samba-tool drs replicate' command does not
        exit with status 0.
        """
        # find out where is net command
        samba_tool_cmd = os.path.abspath("./bin/samba-tool")
        # make command line credentials string
        creds = samba.tests.cmdline_credentials
        cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
                                         creds.get_username(),
                                         creds.get_password())
        # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
        # Pass an argument vector rather than a shell string so that a
        # password (or name) containing shell metacharacters cannot break or
        # alter the command.  The values come from RootDSE attributes and
        # may not be plain strings, so stringify them explicitly.
        cmd_line = [samba_tool_cmd, "drs", "replicate",
                    str(DC), str(fromDC), str(nc_dn), cmd_line_auth]
        ret = subprocess.call(cmd_line)
        self.assertEqual(ret, 0,
                         "Replicating %s from %s has failed!" % (DC, fromDC))

    def _GUID_string(self, guid):
        """Format 'guid' as an objectGUID value using DC1's schema."""
        return self.ldb_dc1.schema_format_value("objectGUID", guid)

    def _ldap_schemaUpdateNow(self, sam_db):
        """Apply a schemaUpdateNow modification through 'sam_db'.

        NOTE(review): the LDIF text was missing from the reviewed copy; the
        template below is the conventional RootDSE schemaUpdateNow request
        and is a reconstruction -- confirm against upstream.
        """
        ldif = """
dn:
changetype: modify
add: schemaUpdateNow
schemaUpdateNow: 1
"""
        sam_db.modify_ldif(ldif)

    def _make_obj_names(self, base_name):
        """Try to create a unique name for an object
           that is to be added to schema.

        :return: (object name, object DN under the schema NC)
        """
        obj_name = self.obj_prefix + base_name
        obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn)
        return (obj_name, obj_dn)

    def _make_class_ldif(self, class_name, class_dn, attrs=None):
        """Build the LDIF text that adds a classSchema object.

        governsId embeds a random component so repeated runs are unlikely
        to collide.  'attrs' is accepted but not used here.

        NOTE(review): the 'objectClass: top', 'instanceType: 4' and
        'systemOnly: FALSE' lines, the assignment and the return statement
        were missing from the reviewed copy and are reconstructed --
        confirm against upstream.
        """
        ldif = """
dn: """ + class_dn + """
objectClass: top
objectClass: classSchema
cn: """ + class_name + """
governsId: 1.2.840.""" + str(random.randint(1, 100000)) + """.1.5.13
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalPerson
systemOnly: FALSE
"""
        return ldif

    def _check_object(self, obj_dn):
        """Check if object obj_dn exists on both DCs.

        NOTE(review): the scope/attrs arguments of both searches and the
        'try:' line were missing from the reviewed copy and are
        reconstructed -- confirm against upstream.
        """
        res_dc1 = self.ldb_dc1.search(base=obj_dn,
                                      scope=SCOPE_BASE,
                                      attrs=["*"])
        self.assertEqual(len(res_dc1), 1,
                         "%s doesn't exists on %s" % (obj_dn, self.dnsname_dc1))
        try:
            res_dc2 = self.ldb_dc2.search(base=obj_dn,
                                          scope=SCOPE_BASE,
                                          attrs=["*"])
        except LdbError as err:
            # Only "no such object" means the entry was not replicated.
            # Any other LDB error is a real problem and must not be
            # reported as a missing object.
            if err.args[0] != ERR_NO_SUCH_OBJECT:
                raise
            self.fail("%s doesn't exists on %s" % (obj_dn, self.dnsname_dc2))
        self.assertEqual(len(res_dc2), 1,
                         "%s doesn't exists on %s" % (obj_dn, self.dnsname_dc2))

    # reconstructed method name: the 'def' line was missing -- confirm
    def test_all(self):
        """Basic plan is to create bunch of classSchema
           and attributeSchema objects, replicate Schema NC
           and then check all objects are replicated correctly"""

        # add new classSchema object
        (class_name, class_dn) = self._make_obj_names("cls-A")
        ldif = self._make_class_ldif(class_name, class_dn)
        self.ldb_dc1.add_ldif(ldif)
        self._ldap_schemaUpdateNow(self.ldb_dc1)
        # force replication from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2,
                                fromDC=self.dnsname_dc1,
                                nc_dn=self.schema_dn)
        # check object is replicated
        self._check_object(class_dn)