libsmb: Use clistr_smb2_extract_snapshot_token() in cli_smb2_create_fnum_send()
[Samba.git] / python / samba / tests / join.py
blobda34171da280d01f13a67e715e08675be648ebe8
1 # Test joining as a DC and check the join was done right
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import samba
20 import sys
21 import shutil
22 import os
23 from samba.tests.dns_base import DNSTKeyTest
24 from samba.join import DCJoinContext
25 from samba.dcerpc import drsuapi, misc, dns
26 from samba.credentials import Credentials
27 from samba.provision import interface_ips_v4
def get_logger(name="subunit"):
    """Return the logger called *name*, wired to write to ``sys.stderr``.

    ``logging.getLogger`` returns the same logger object for a given name,
    so attaching a fresh handler on every call (for example once per test
    ``setUp``) would make each record be emitted several times.  The stderr
    handler is therefore only added when the logger does not already have
    one bound to the current ``sys.stderr``.

    :param name: name of the logger to fetch
    :return: the ``logging.Logger`` instance registered under *name*
    """
    import logging

    logger = logging.getLogger(name)
    has_stderr_handler = any(
        isinstance(handler, logging.StreamHandler)
        and getattr(handler, "stream", None) is sys.stderr
        for handler in logger.handlers
    )
    if not has_stderr_handler:
        logger.addHandler(logging.StreamHandler(sys.stderr))
    return logger
class JoinTestCase(DNSTKeyTest):
    """Join the test server as an extra DC and check the DNS records it gets.

    ``setUp`` performs the join through ``DCJoinContext``; the tests then
    query (and update) the DNS names that belong to the joined machine.
    """

    def setUp(self):
        """Prepare a ``DCJoinContext`` for "jointest1" and run the join."""
        lookup = samba.tests.env_get_var_value
        self.server = lookup("SERVER")
        self.server_ip = lookup("SERVER_IP")
        super(JoinTestCase, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = self.get_credentials()
        self.netbios_name = "jointest1"
        join_logger = get_logger()

        ctx = DCJoinContext(
            server=self.server,
            creds=self.creds,
            lp=self.get_loadparm(),
            netbios_name=self.netbios_name,
            targetdir=self.tempdir,
            domain=None,
            logger=join_logger,
            dns_backend="SAMBA_INTERNAL",
        )
        self.join_ctx = ctx

        # Account and replication settings applied before the join runs.
        account_flags = samba.dsdb.UF_SERVER_TRUST_ACCOUNT
        account_flags |= samba.dsdb.UF_TRUSTED_FOR_DELEGATION
        ctx.userAccountControl = account_flags

        extra_replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
        extra_replica_flags |= drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS
        ctx.replica_flags |= extra_replica_flags
        ctx.domain_replica_flags = ctx.replica_flags
        ctx.secure_channel_type = misc.SEC_CHAN_BDC

        # Clear out anything left by an earlier join of the same name.
        ctx.cleanup_old_join()

        ctx.force_all_ips = True

        ctx.do_join()

    def tearDown(self):
        """Remove the files the join created, then undo the join itself."""
        # Either attribute may be absent; a missing one yields None here,
        # exactly as the AttributeError fallback would.
        paths = getattr(getattr(self, "join_ctx", None), "paths", None)

        # NOTE(review): the source had lost its indentation; the rm_dirs /
        # rm_files calls are taken to belong to this conditional - confirm.
        if paths is not None:
            for directory in (paths.private_dir, paths.state_dir):
                shutil.rmtree(directory)
            self.rm_dirs("etc", "msg.lock", "bind-dns")
            self.rm_files("names.tdb")

        self.join_ctx.cleanup_old_join(force=True)

        super(JoinTestCase, self).tearDown()

    def test_join_makes_records(self):
        """Query over TCP for the joined host's A and _msdcs CNAME records."""
        packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        host_question = self.make_name_question(self.join_ctx.dnshostname,
                                                dns.DNS_QTYPE_A,
                                                dns.DNS_QCLASS_IN)

        # One A record is expected per local IPv4 interface address.
        expected_ips = interface_ips_v4(self.lp, all_interfaces=True)

        self.finish_name_packet(packet, [host_question])
        reply, _ = self.dns_transaction_tcp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(reply, dns.DNS_OPCODE_QUERY)
        self.assertEqual(reply.ancount, len(expected_ips))

        # The same packet object is reused for the GUID-based alias query.
        guid_name = "%s._msdcs.%s" % (self.join_ctx.ntds_guid,
                                      self.join_ctx.dnsforest)
        guid_question = self.make_name_question(guid_name,
                                                dns.DNS_QTYPE_A,
                                                dns.DNS_QCLASS_IN)

        self.finish_name_packet(packet, [guid_question])
        reply, _ = self.dns_transaction_tcp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(reply, dns.DNS_OPCODE_QUERY)

        # The CNAME comes first, followed by the A records it resolves to.
        self.assertEqual(reply.ancount, 1 + len(expected_ips))
        alias = reply.answers[0]
        self.assertEqual(alias.rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(alias.rdata, self.join_ctx.dnshostname)
        self.assertEqual(reply.answers[1].rr_type, dns.DNS_QTYPE_A)

    def test_join_records_can_update(self):
        """Delete all but the first address record using the machine account."""
        hostname = self.join_ctx.dnshostname

        def deletion_record(address):
            # Field order is kept as in the original: rr_type is set
            # before rdata is assigned.
            record = dns.res_rec()
            record.name = hostname
            if ":" in address:
                record.rr_type = dns.DNS_QTYPE_AAAA
            else:
                record.rr_type = dns.DNS_QTYPE_A
            record.rr_class = dns.DNS_QCLASS_NONE
            record.ttl = 0
            record.length = 0xffff
            record.rdata = address
            return record

        machine_creds = Credentials()
        machine_creds.guess(self.join_ctx.lp)
        machine_creds.set_machine_account(self.join_ctx.lp)

        self.tkey_trans(creds=machine_creds)

        update_packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone_question = self.make_name_question(self.join_ctx.dnsdomain,
                                                dns.DNS_QTYPE_SOA,
                                                dns.DNS_QCLASS_IN)
        self.finish_name_packet(update_packet, [zone_question])

        # Delete the old expected IPs (every address except the first).
        addresses = interface_ips_v4(self.lp, all_interfaces=True)
        deletions = [deletion_record(address) for address in addresses[1:]]

        update_packet.nscount = len(deletions)
        update_packet.nsrecs = deletions

        mac = self.sign_packet(update_packet, self.key_name)
        reply, reply_bytes = self.dns_transaction_udp(update_packet,
                                                      self.server_ip)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.verify_packet(reply, reply_bytes, mac)

        # After the update a single A record should be left for the host.
        query_packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        host_question = self.make_name_question(self.join_ctx.dnshostname,
                                                dns.DNS_QTYPE_A,
                                                dns.DNS_QCLASS_IN)

        self.finish_name_packet(query_packet, [host_question])
        reply, _ = self.dns_transaction_tcp(query_packet, host=self.server_ip)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(reply, dns.DNS_OPCODE_QUERY)
        self.assertEqual(reply.ancount, 1)