# -*- coding: utf-8 -*-
#
# Unix SMB/CIFS implementation.
# Copyright (C) Anatoliy Atanasov <anatoliy.atanasov@postpath.com> 2010
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN fsmo -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import os
import subprocess
import sys
import time

# The Samba python modules live in the build tree; make them importable
# before importing anything from samba or ldb.
sys.path.append("bin/python")

from ldb import SCOPE_BASE

import samba.tests


class DrsFsmoTestCase(samba.tests.TestCase):
    """Transfer each FSMO role from DC1 to DC2 and back again.

    The two DCs are named by the ``DC1`` and ``DC2`` environment variables.
    A transfer is triggered by running ``bin/samba-tool fsmo transfer`` and
    verified by polling the ``fSMORoleOwner`` attribute of the role object
    on the DC that should have received the role.
    """

    # LDAP connections and RootDSE messages for DC1 and DC2.  They are kept
    # on the class (not the instance) so that they are created by the first
    # test that runs and reused by all the following ones.
    info_dc1 = None
    ldb_dc1 = None
    info_dc2 = None
    ldb_dc2 = None

    def setUp(self):
        """Connect to both DCs (once) and cache the RootDSE values we need."""
        super(DrsFsmoTestCase, self).setUp()

        # we have to wait for the replication before we make the check;
        # both values are in seconds (the sleep time is fed to time.sleep)
        self.fsmo_wait_max_time = 20
        self.fsmo_wait_sleep_time = 0.2

        # connect to DCs singleton
        if self.ldb_dc1 is None:
            DrsFsmoTestCase.dc1 = samba.tests.env_get_var_value("DC1")
            DrsFsmoTestCase.ldb_dc1 = samba.tests.connect_samdb(self.dc1,
                                                                ldap_only=True)
        if self.ldb_dc2 is None:
            DrsFsmoTestCase.dc2 = samba.tests.env_get_var_value("DC2")
            DrsFsmoTestCase.ldb_dc2 = samba.tests.connect_samdb(self.dc2,
                                                                ldap_only=True)

        # fetch rootDSEs
        if self.info_dc1 is None:
            DrsFsmoTestCase.info_dc1 = self._fetch_rootdse(self.ldb_dc1)
        if self.info_dc2 is None:
            DrsFsmoTestCase.info_dc2 = self._fetch_rootdse(self.ldb_dc2)

        # cache some of RootDSE props
        self.schema_dn = self.info_dc1["schemaNamingContext"][0]
        self.domain_dn = self.info_dc1["defaultNamingContext"][0]
        self.config_dn = self.info_dc1["configurationNamingContext"][0]
        self.dsServiceName_dc1 = self.info_dc1["dsServiceName"][0]
        self.dsServiceName_dc2 = self.info_dc2["dsServiceName"][0]
        self.infrastructure_dn = "CN=Infrastructure," + self.domain_dn
        self.naming_dn = "CN=Partitions," + self.config_dn
        self.rid_dn = "CN=RID Manager$,CN=System," + self.domain_dn

        # we will need DCs DNS names for 'net fsmo' command
        self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
        self.dnsname_dc2 = self.info_dc2["dnsHostName"][0]

    def tearDown(self):
        super(DrsFsmoTestCase, self).tearDown()

    def _fetch_rootdse(self, ldb_dc):
        """Return the single RootDSE message read over ``ldb_dc``."""
        res = ldb_dc.search(base="", expression="", scope=SCOPE_BASE,
                            attrs=["*"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _net_fsmo_role_transfer(self, DC, role):
        """Run ``samba-tool fsmo transfer`` so that ``DC`` takes over ``role``.

        :param DC: DNS name of the DC that should become the role owner
        :param role: role name as understood by ``samba-tool fsmo transfer``
        Fails the test if the command exits with a non-zero status.
        """
        # find out where is samba-tool command
        net_cmd = os.path.abspath("./bin/samba-tool")
        # make command line credentials string
        creds = self.get_credentials()
        cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
                                         creds.get_username(),
                                         creds.get_password())
        # bin/samba-tool fsmo transfer --role=role --host=ldap://DC:389
        # The command is passed as an argument list rather than one shell
        # string, so that special characters in the credentials reach
        # samba-tool unchanged instead of being interpreted by a shell.
        cmd = [net_cmd, "fsmo", "transfer",
               "--role=%s" % role,
               "--host=ldap://%s:389" % DC,
               cmd_line_auth]
        ret = subprocess.call(cmd)
        self.assertEqual(ret, 0,
                         "Transferring role %s to %s has failed!" % (role, DC))

    def _wait_for_role_transfer(self, ldb_dc, role_dn, master):
        """Wait for role transfer for certain amount of time

        Polls ``fSMORoleOwner`` of ``role_dn`` over ``ldb_dc`` until it
        equals ``master`` or ``fsmo_wait_max_time`` has been used up.

        :param ldb_dc: connection to the DC to query
        :param role_dn: DN of the object holding the ``fSMORoleOwner`` value
        :param master: expected ``fSMORoleOwner`` value
        :return: (Result=True|False, current fSMORoleOwner value) tuple
        """
        # the "+ 1" guarantees that the role owner is read at least once
        retries = int(self.fsmo_wait_max_time / self.fsmo_wait_sleep_time) + 1
        cur_master = None
        for i in range(retries):
            # check if master has been transferred
            res = ldb_dc.search(role_dn,
                                scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
            # a test assertion (not a bare assert) so the check is not
            # dropped when python runs with -O
            self.assertEqual(len(res), 1,
                             "Only one fSMORoleOwner value expected!")
            cur_master = res[0]["fSMORoleOwner"][0]
            if master == cur_master:
                return (True, cur_master)
            # skip last sleep, if no need to wait anymore
            if i != (retries - 1):
                # wait a little bit before next retry
                time.sleep(self.fsmo_wait_sleep_time)
        return (False, cur_master)

    def _transfer_role_and_check(self, role, role_dn, from_dnsname,
                                 to_dnsname, to_ldb, to_ds_service_name):
        """Move ``role`` to one DC and verify that this DC now owns it."""
        print("Testing for %s role transfer from %s to %s" %
              (role, from_dnsname, to_dnsname))
        self._net_fsmo_role_transfer(DC=to_dnsname, role=role)
        # check if the role is transferred
        (res, master) = self._wait_for_role_transfer(ldb_dc=to_ldb,
                                                     role_dn=role_dn,
                                                     master=to_ds_service_name)
        self.assertTrue(res,
                        "Transferring %s role to %s has failed, master is: %s!" %
                        (role, to_ds_service_name, master))

    def _role_transfer(self, role, role_dn):
        """Triggers transfer of role from DC1 to DC2
        and vice versa so the role goes back to the original dc"""
        # dc2 gets the role from dc1
        self._transfer_role_and_check(role, role_dn,
                                      from_dnsname=self.dnsname_dc1,
                                      to_dnsname=self.dnsname_dc2,
                                      to_ldb=self.ldb_dc2,
                                      to_ds_service_name=self.dsServiceName_dc2)
        # dc1 gets back the role from dc2
        self._transfer_role_and_check(role, role_dn,
                                      from_dnsname=self.dnsname_dc2,
                                      to_dnsname=self.dnsname_dc1,
                                      to_ldb=self.ldb_dc1,
                                      to_ds_service_name=self.dsServiceName_dc1)

    def test_SchemaMasterTransfer(self):
        self._role_transfer(role="schema", role_dn=self.schema_dn)

    def test_InfrastructureMasterTransfer(self):
        self._role_transfer(role="infrastructure",
                            role_dn=self.infrastructure_dn)

    def test_PDCMasterTransfer(self):
        self._role_transfer(role="pdc", role_dn=self.domain_dn)

    def test_RIDMasterTransfer(self):
        self._role_transfer(role="rid", role_dn=self.rid_dn)

    def test_NamingMasterTransfer(self):
        self._role_transfer(role="naming", role_dn=self.naming_dn)