s4:dsdb/subtree_delete: do an early return and avoid some nesting
[Samba/gebeck_regimport.git] / source4 / torture / drs / python / drs_base.py
blobf692edc4f571d7b7179f1cde46b363edf06e23bd
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import sys
23 import time
24 import os
26 sys.path.insert(0, "bin/python")
27 import samba
28 from samba import dsdb
29 samba.ensure_external_module("testtools", "testtools")
30 samba.ensure_external_module("subunit", "subunit/python")
32 from ldb import (
33 SCOPE_BASE,
34 Message,
35 FLAG_MOD_REPLACE,
38 import samba.tests
41 class DrsBaseTestCase(samba.tests.BlackboxTestCase):
42 """Base class implementation for all DRS python tests.
43 It is intended to provide common initialization and
44 and functionality used by all DRS tests in drs/python
45 test package. For instance, DC1 and DC2 are always used
46 to pass URLs for DCs to test against"""
48 def setUp(self):
49 super(DrsBaseTestCase, self).setUp()
51 # connect to DCs
52 url_dc = samba.tests.env_get_var_value("DC1")
53 (self.ldb_dc1, self.info_dc1) = samba.tests.connect_samdb_ex(url_dc,
54 ldap_only=True)
55 url_dc = samba.tests.env_get_var_value("DC2")
56 (self.ldb_dc2, self.info_dc2) = samba.tests.connect_samdb_ex(url_dc,
57 ldap_only=True)
59 # cache some of RootDSE props
60 self.schema_dn = self.info_dc1["schemaNamingContext"][0]
61 self.domain_dn = self.info_dc1["defaultNamingContext"][0]
62 self.config_dn = self.info_dc1["configurationNamingContext"][0]
63 self.forest_level = int(self.info_dc1["forestFunctionality"][0])
65 # we will need DCs DNS names for 'samba-tool drs' command
66 self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
67 self.dnsname_dc2 = self.info_dc2["dnsHostName"][0]
69 def tearDown(self):
70 super(DrsBaseTestCase, self).tearDown()
72 def _GUID_string(self, guid):
73 return self.ldb_dc1.schema_format_value("objectGUID", guid)
75 def _ldap_schemaUpdateNow(self, sam_db):
76 rec = {"dn": "",
77 "schemaUpdateNow": "1"}
78 m = Message.from_dict(sam_db, rec, FLAG_MOD_REPLACE)
79 sam_db.modify(m)
81 def _deleted_objects_dn(self, sam_ldb):
82 wkdn = "<WKGUID=18E2EA80684F11D2B9AA00C04F79F805,%s>" % self.domain_dn
83 res = sam_ldb.search(base=wkdn,
84 scope=SCOPE_BASE,
85 controls=["show_deleted:1"])
86 self.assertEquals(len(res), 1)
87 return str(res[0]["dn"])
89 def _lost_and_found_dn(self, sam_ldb, nc):
90 wkdn = "<WKGUID=%s,%s>" % (dsdb.DS_GUID_LOSTANDFOUND_CONTAINER, nc)
91 res = sam_ldb.search(base=wkdn,
92 scope=SCOPE_BASE)
93 self.assertEquals(len(res), 1)
94 return str(res[0]["dn"])
96 def _make_obj_name(self, prefix):
97 return prefix + time.strftime("%s", time.gmtime())
99 def _samba_tool_cmdline(self, drs_command):
100 # find out where is net command
101 samba_tool_cmd = os.path.abspath("./bin/samba-tool")
102 # make command line credentials string
103 creds = self.get_credentials()
104 cmdline_auth = "-U%s/%s%%%s" % (creds.get_domain(),
105 creds.get_username(), creds.get_password())
106 # bin/samba-tool drs <drs_command> <cmdline_auth>
107 return "%s drs %s %s" % (samba_tool_cmd, drs_command, cmdline_auth)
109 def _net_drs_replicate(self, DC, fromDC, nc_dn=None, forced=True, local=False, full_sync=False):
110 if nc_dn is None:
111 nc_dn = self.domain_dn
112 # make base command line
113 samba_tool_cmdline = self._samba_tool_cmdline("replicate")
114 if forced:
115 samba_tool_cmdline += " --sync-forced"
116 if local:
117 samba_tool_cmdline += " --local"
118 if full_sync:
119 samba_tool_cmdline += " --full-sync"
120 # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
121 cmd_line = "%s %s %s %s" % (samba_tool_cmdline, DC, fromDC, nc_dn)
122 return self.check_output(cmd_line)
124 def _enable_inbound_repl(self, DC):
125 # make base command line
126 samba_tool_cmd = self._samba_tool_cmdline("options")
127 # disable replication
128 self.check_run("%s %s --dsa-option=-DISABLE_INBOUND_REPL" %(samba_tool_cmd, DC))
130 def _disable_inbound_repl(self, DC):
131 # make base command line
132 samba_tool_cmd = self._samba_tool_cmdline("options")
133 # disable replication
134 self.check_run("%s %s --dsa-option=+DISABLE_INBOUND_REPL" %(samba_tool_cmd, DC))