docs-xml: "cluster addresses" dns registration
[Samba.git] / python / samba / tests / join.py
blob09a102e26af9963fe557bdf57c1e0614f830b87d
1 # Test joining as a DC and check the join was done right
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import samba
20 import sys
21 import shutil
22 import os
23 from samba.tests.dns_base import DNSTKeyTest
24 from samba.join import DCJoinContext
25 from samba.dcerpc import drsuapi, misc, dns
26 from samba.credentials import Credentials
def get_logger(name="subunit"):
    """Get a logger object that writes to stderr.

    The stderr handler is attached at most once per logger and stream.
    Loggers returned by ``logging.getLogger`` are process-wide singletons,
    so adding a handler unconditionally on every call (this function is
    called from each test's ``setUp``) would make every message be emitted
    once per previous call.

    :param name: name of the logger to fetch (defaults to "subunit")
    :return: the ``logging.Logger`` registered under ``name``
    """
    import logging
    logger = logging.getLogger(name)

    already_attached = any(
        isinstance(handler, logging.StreamHandler) and
        getattr(handler, "stream", None) is sys.stderr
        for handler in logger.handlers)
    if not already_attached:
        logger.addHandler(logging.StreamHandler(sys.stderr))

    return logger
class JoinTestCase(DNSTKeyTest):
    """Join the domain as an extra DC and check the DNS records it creates.

    ``setUp`` performs a full DC join under the NetBIOS name "jointest1"
    against the server named by the SERVER environment variable, using the
    internal DNS backend.  The tests then query and update the DNS records
    of the joined host on SERVER_IP.  ``tearDown`` removes the files the
    join left in the test's temporary directory and cleans the join up
    again.
    """

    def setUp(self):
        """Configure a DCJoinContext and join the domain as a DC."""
        # These are set before the base class setUp() runs; presumably the
        # base class reads them -- confirm before reordering.
        self.server = samba.tests.env_get_var_value("SERVER")
        self.server_ip = samba.tests.env_get_var_value("SERVER_IP")
        super(JoinTestCase, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = self.get_credentials()
        self.netbios_name = "jointest1"
        logger = get_logger()

        self.join_ctx = DCJoinContext(server=self.server, creds=self.creds,
                                      lp=self.get_loadparm(),
                                      netbios_name=self.netbios_name,
                                      targetdir=self.tempdir,
                                      domain=None, logger=logger,
                                      dns_backend="SAMBA_INTERNAL")
        self.join_ctx.userAccountControl = (
            samba.dsdb.UF_SERVER_TRUST_ACCOUNT |
            samba.dsdb.UF_TRUSTED_FOR_DELEGATION)

        self.join_ctx.replica_flags |= (
            drsuapi.DRSUAPI_DRS_WRIT_REP |
            drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS)
        self.join_ctx.domain_replica_flags = self.join_ctx.replica_flags
        self.join_ctx.secure_channel_type = misc.SEC_CHAN_BDC

        # Remove anything left behind by an earlier join under this name.
        self.join_ctx.cleanup_old_join()

        # The tests below expect one address record per local interface
        # IP, so ask the join to use all of them.
        self.join_ctx.force_all_ips = True

        self.join_ctx.do_join()

    def tearDown(self):
        """Delete the files created by the join and clean the join up."""
        # join_ctx may not have a 'paths' attribute; in that case there is
        # nothing on disk to remove.
        paths = getattr(self.join_ctx, "paths", None)

        if paths is not None:
            shutil.rmtree(paths.private_dir)
            shutil.rmtree(paths.state_dir)
            shutil.rmtree(os.path.join(self.tempdir, "etc"))
            shutil.rmtree(os.path.join(self.tempdir, "msg.lock"))
            os.unlink(os.path.join(self.tempdir, "names.tdb"))
            shutil.rmtree(os.path.join(self.tempdir, "bind-dns"))

        self.join_ctx.cleanup_old_join(force=True)

        super(JoinTestCase, self).tearDown()

    def test_join_makes_records(self):
        """Check the join registered the host and NTDS GUID DNS records.

        Queries the A records of the joined host name and expects one
        answer per local interface IP, then queries
        <ntds_guid>._msdcs.<forest> and expects a CNAME to the host name
        followed by the host's address records.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self.join_ctx.dnshostname
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        # Get expected IPs
        IPs = samba.interface_ips(self.lp)

        self.finish_name_packet(p, questions)
        (response, response_packet) = self.dns_transaction_tcp(
            p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, len(IPs))

        # Second query, reusing the same packet: the per-DC GUID alias.
        questions = []
        name = "%s._msdcs.%s" % (self.join_ctx.ntds_guid,
                                 self.join_ctx.dnsforest)
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        (response, response_packet) = self.dns_transaction_tcp(
            p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)

        # One CNAME plus one record per interface IP.
        self.assertEqual(response.ancount, 1 + len(IPs))
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].rdata,
                         self.join_ctx.dnshostname)
        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)

    def test_join_records_can_update(self):
        """Check the joined machine account can update its own DNS records.

        Sends a signed dynamic update, authenticated with the machine
        account credentials of the join, that deletes the host's address
        records for every interface IP except the first, then verifies
        that a single A record is left.
        """
        dc_creds = Credentials()
        dc_creds.guess(self.join_ctx.lp)
        dc_creds.set_machine_account(self.join_ctx.lp)

        self.tkey_trans(creds=dc_creds)

        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        q = self.make_name_question(self.join_ctx.dnsdomain,
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        questions = []
        questions.append(q)
        self.finish_name_packet(p, questions)

        updates = []
        # Delete the old expected IPs
        IPs = samba.interface_ips(self.lp)
        for IP in IPs[1:]:
            r = dns.res_rec()
            r.name = self.join_ctx.dnshostname
            # A colon marks an IPv6 address, which lives in an AAAA record.
            if ":" in IP:
                r.rr_type = dns.DNS_QTYPE_AAAA
            else:
                r.rr_type = dns.DNS_QTYPE_A
            r.rr_class = dns.DNS_QCLASS_NONE
            r.ttl = 0
            r.length = 0xffff
            r.rdata = IP
            updates.append(r)

        p.nscount = len(updates)
        p.nsrecs = updates

        mac = self.sign_packet(p, self.key_name)
        (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.verify_packet(response, response_p, mac)

        # Query the host name again to confirm the deletions took effect.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self.join_ctx.dnshostname
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        (response, response_packet) = self.dns_transaction_tcp(
            p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)