2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 sys
.path
.insert(0, "bin/python")
28 from samba
import dsdb
29 samba
.ensure_external_module("testtools", "testtools")
30 samba
.ensure_external_module("subunit", "subunit/python")
41 class DrsBaseTestCase(samba
.tests
.BlackboxTestCase
):
42 """Base class implementation for all DRS python tests.
43 It is intended to provide common initialization and
44 and functionality used by all DRS tests in drs/python
45 test package. For instance, DC1 and DC2 are always used
46 to pass URLs for DCs to test against"""
49 super(DrsBaseTestCase
, self
).setUp()
52 url_dc
= samba
.tests
.env_get_var_value("DC1")
53 (self
.ldb_dc1
, self
.info_dc1
) = samba
.tests
.connect_samdb_ex(url_dc
,
55 url_dc
= samba
.tests
.env_get_var_value("DC2")
56 (self
.ldb_dc2
, self
.info_dc2
) = samba
.tests
.connect_samdb_ex(url_dc
,
59 # cache some of RootDSE props
60 self
.schema_dn
= self
.info_dc1
["schemaNamingContext"][0]
61 self
.domain_dn
= self
.info_dc1
["defaultNamingContext"][0]
62 self
.config_dn
= self
.info_dc1
["configurationNamingContext"][0]
63 self
.forest_level
= int(self
.info_dc1
["forestFunctionality"][0])
65 # we will need DCs DNS names for 'samba-tool drs' command
66 self
.dnsname_dc1
= self
.info_dc1
["dnsHostName"][0]
67 self
.dnsname_dc2
= self
.info_dc2
["dnsHostName"][0]
70 super(DrsBaseTestCase
, self
).tearDown()
72 def _GUID_string(self
, guid
):
73 return self
.ldb_dc1
.schema_format_value("objectGUID", guid
)
75 def _ldap_schemaUpdateNow(self
, sam_db
):
77 "schemaUpdateNow": "1"}
78 m
= Message
.from_dict(sam_db
, rec
, FLAG_MOD_REPLACE
)
81 def _deleted_objects_dn(self
, sam_ldb
):
82 wkdn
= "<WKGUID=18E2EA80684F11D2B9AA00C04F79F805,%s>" % self
.domain_dn
83 res
= sam_ldb
.search(base
=wkdn
,
85 controls
=["show_deleted:1"])
86 self
.assertEquals(len(res
), 1)
87 return str(res
[0]["dn"])
89 def _lost_and_found_dn(self
, sam_ldb
, nc
):
90 wkdn
= "<WKGUID=%s,%s>" % (dsdb
.DS_GUID_LOSTANDFOUND_CONTAINER
, nc
)
91 res
= sam_ldb
.search(base
=wkdn
,
93 self
.assertEquals(len(res
), 1)
94 return str(res
[0]["dn"])
96 def _make_obj_name(self
, prefix
):
97 return prefix
+ time
.strftime("%s", time
.gmtime())
99 def _samba_tool_cmdline(self
, drs_command
):
100 # find out where is net command
101 samba_tool_cmd
= os
.path
.abspath("./bin/samba-tool")
102 # make command line credentials string
103 creds
= self
.get_credentials()
104 cmdline_auth
= "-U%s/%s%%%s" % (creds
.get_domain(),
105 creds
.get_username(), creds
.get_password())
106 # bin/samba-tool drs <drs_command> <cmdline_auth>
107 return "%s drs %s %s" % (samba_tool_cmd
, drs_command
, cmdline_auth
)
109 def _net_drs_replicate(self
, DC
, fromDC
, nc_dn
=None, forced
=True, local
=False, full_sync
=False):
111 nc_dn
= self
.domain_dn
112 # make base command line
113 samba_tool_cmdline
= self
._samba
_tool
_cmdline
("replicate")
115 samba_tool_cmdline
+= " --sync-forced"
117 samba_tool_cmdline
+= " --local"
119 samba_tool_cmdline
+= " --full-sync"
120 # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
121 cmd_line
= "%s %s %s %s" % (samba_tool_cmdline
, DC
, fromDC
, nc_dn
)
122 return self
.check_output(cmd_line
)
124 def _enable_inbound_repl(self
, DC
):
125 # make base command line
126 samba_tool_cmd
= self
._samba
_tool
_cmdline
("options")
127 # disable replication
128 self
.check_run("%s %s --dsa-option=-DISABLE_INBOUND_REPL" %(samba_tool_cmd
, DC
))
130 def _disable_inbound_repl(self
, DC
):
131 # make base command line
132 samba_tool_cmd
= self
._samba
_tool
_cmdline
("options")
133 # disable replication
134 self
.check_run("%s %s --dsa-option=+DISABLE_INBOUND_REPL" %(samba_tool_cmd
, DC
))