2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Anatoliy Atanasov <anatoliy.atanasov@postpath.com> 2010
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 # export DC1=dc1_dns_name
23 # export DC2=dc2_dns_name
24 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
25 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN fsmo -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
32 sys
.path
.insert(0, "bin/python")
34 from ldb
import SCOPE_BASE
38 class DrsFsmoTestCase(drs_base
.DrsBaseTestCase
):
41 super(DrsFsmoTestCase
, self
).setUp()
43 # we have to wait for the replication before we make the check
44 self
.fsmo_wait_max_time
= 20
45 self
.fsmo_wait_sleep_time
= 0.2
47 # cache some of RootDSE props
48 self
.dsServiceName_dc1
= self
.info_dc1
["dsServiceName"][0]
49 self
.dsServiceName_dc2
= self
.info_dc2
["dsServiceName"][0]
50 self
.infrastructure_dn
= "CN=Infrastructure," + self
.domain_dn
51 self
.naming_dn
= "CN=Partitions," + self
.config_dn
52 self
.rid_dn
= "CN=RID Manager$,CN=System," + self
.domain_dn
55 super(DrsFsmoTestCase
, self
).tearDown()
57 def _net_fsmo_role_transfer(self
, DC
, role
):
58 # find out where is samba-tool command
59 net_cmd
= os
.path
.abspath("./bin/samba-tool")
60 # make command line credentials string
61 creds
= self
.get_credentials()
62 cmd_line_auth
= "-U%s/%s%%%s" % (creds
.get_domain(),
63 creds
.get_username(), creds
.get_password())
64 # bin/samba-tool fsmo transfer --role=role -H ldap://DC:389
65 cmd_line
= "%s fsmo transfer --role=%s -H ldap://%s:389 %s" % (net_cmd
, role
, DC
,
67 ret
= os
.system(cmd_line
)
68 self
.assertEquals(ret
, 0, "Transferring role %s to %s has failed!" % (role
, DC
))
70 def _wait_for_role_transfer(self
, ldb_dc
, role_dn
, master
):
71 """Wait for role transfer for certain amount of time
73 :return: (Result=True|False, CurrentMasterDnsName) tuple
76 retries
= int(self
.fsmo_wait_max_time
/ self
.fsmo_wait_sleep_time
) + 1
77 for i
in range(0, retries
):
78 # check if master has been transfered
79 res
= ldb_dc
.search(role_dn
,
80 scope
=SCOPE_BASE
, attrs
=["fSMORoleOwner"])
81 assert len(res
) == 1, "Only one fSMORoleOwner value expected!"
82 cur_master
= res
[0]["fSMORoleOwner"][0]
83 if master
== cur_master
:
84 return (True, cur_master
)
85 # skip last sleep, if no need to wait anymore
86 if i
!= (retries
- 1):
87 # wait a little bit before next retry
88 time
.sleep(self
.fsmo_wait_sleep_time
)
89 return (False, cur_master
)
91 def _role_transfer(self
, role
, role_dn
):
92 """Triggers transfer of role from DC1 to DC2
93 and vice versa so the role goes back to the original dc"""
94 # dc2 gets the role from dc1
95 print "Testing for %s role transfer from %s to %s" % (role
, self
.dnsname_dc1
, self
.dnsname_dc2
)
97 self
._net
_fsmo
_role
_transfer
(DC
=self
.dnsname_dc2
, role
=role
)
98 # check if the role is transfered
99 (res
, master
) = self
._wait
_for
_role
_transfer
(ldb_dc
=self
.ldb_dc2
,
101 master
=self
.dsServiceName_dc2
)
103 "Transferring %s role to %s has failed, master is: %s!"%(role
, self
.dsServiceName_dc2
, master
))
105 # dc1 gets back the role from dc2
106 print "Testing for %s role transfer from %s to %s" % (role
, self
.dnsname_dc2
, self
.dnsname_dc1
)
107 self
._net
_fsmo
_role
_transfer
(DC
=self
.dnsname_dc1
, role
=role
)
108 # check if the role is transfered
109 (res
, master
) = self
._wait
_for
_role
_transfer
(ldb_dc
=self
.ldb_dc1
,
111 master
=self
.dsServiceName_dc1
)
113 "Transferring %s role to %s has failed, master is: %s!"%(role
, self
.dsServiceName_dc1
, master
))
115 def test_SchemaMasterTransfer(self
):
116 self
._role
_transfer
(role
="schema", role_dn
=self
.schema_dn
)
118 def test_InfrastructureMasterTransfer(self
):
119 self
._role
_transfer
(role
="infrastructure", role_dn
=self
.infrastructure_dn
)
121 def test_PDCMasterTransfer(self
):
122 self
._role
_transfer
(role
="pdc", role_dn
=self
.domain_dn
)
124 def test_RIDMasterTransfer(self
):
125 self
._role
_transfer
(role
="rid", role_dn
=self
.rid_dn
)
127 def test_NamingMasterTransfer(self
):
128 self
._role
_transfer
(role
="naming", role_dn
=self
.naming_dn
)