1 # Test joining as a DC and check the join was done right
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from samba
.tests
.dns_base
import DNSTKeyTest
24 from samba
.join
import DCJoinContext
25 from samba
.dcerpc
import drsuapi
, misc
, dns
26 from samba
.credentials
import Credentials
def get_logger(name="subunit"):
    """Get a logger object.

    Return the ``logging`` logger called *name* with a handler attached
    that writes to the current ``sys.stderr``.

    :param name: name of the logger to fetch (defaults to "subunit")
    :return: the ``logging.Logger`` instance for *name*
    """
    # Imported locally so this function does not rely on a module-level
    # ``import logging`` being present.
    import logging

    logger = logging.getLogger(name)

    # logging.getLogger() returns the same object for the same name, so an
    # unconditional addHandler() would stack one more stderr handler on
    # every call and emit each record several times.
    has_stderr_handler = any(
        isinstance(handler, logging.StreamHandler)
        and getattr(handler, "stream", None) is sys.stderr
        for handler in logger.handlers
    )
    if not has_stderr_handler:
        logger.addHandler(logging.StreamHandler(sys.stderr))

    return logger
class JoinTestCase(DNSTKeyTest):
    """Join the test domain as a DC and check the DNS records of the join.

    setUp() performs a DC join as ``jointest1`` through DCJoinContext,
    the tests query/update the DNS server at SERVER_IP, and tearDown()
    removes the files left under ``self.tempdir`` and undoes the join.
    """

    def setUp(self):
        """Join the domain as ``jointest1`` using the internal DNS backend."""
        # SERVER / SERVER_IP are read before the parent setUp() is called;
        # that ordering is kept as-is.
        self.server = samba.tests.env_get_var_value("SERVER")
        self.server_ip = samba.tests.env_get_var_value("SERVER_IP")
        super(JoinTestCase, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = self.get_credentials()
        self.netbios_name = "jointest1"

        # NOTE(review): 'logger' is passed to DCJoinContext below but was
        # not bound anywhere in view; get_logger() is the obvious source.
        logger = get_logger()

        self.join_ctx = DCJoinContext(server=self.server, creds=self.creds,
                                      lp=self.get_loadparm(),
                                      netbios_name=self.netbios_name,
                                      targetdir=self.tempdir,
                                      domain=None, logger=logger,
                                      dns_backend="SAMBA_INTERNAL")
        self.join_ctx.userAccountControl = (
            samba.dsdb.UF_SERVER_TRUST_ACCOUNT |
            samba.dsdb.UF_TRUSTED_FOR_DELEGATION)

        self.join_ctx.replica_flags |= (
            drsuapi.DRSUAPI_DRS_WRIT_REP |
            drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS)
        self.join_ctx.domain_replica_flags = self.join_ctx.replica_flags
        self.join_ctx.secure_channel_type = misc.SEC_CHAN_BDC

        # Clear out anything left behind by a previous join of this name.
        self.join_ctx.cleanup_old_join()

        self.join_ctx.force_all_ips = True

        self.join_ctx.do_join()

    def tearDown(self):
        """Delete the on-disk state created by the join, then unjoin."""
        # The original code tolerates AttributeError when reading 'paths'.
        # NOTE(review): the handler body was not visible; skipping the
        # file cleanup in that case is the assumed behaviour.
        paths = getattr(self.join_ctx, "paths", None)

        if paths is not None:
            shutil.rmtree(paths.private_dir)
            shutil.rmtree(paths.state_dir)
            shutil.rmtree(os.path.join(self.tempdir, "etc"))
            shutil.rmtree(os.path.join(self.tempdir, "msg.lock"))
            os.unlink(os.path.join(self.tempdir, "names.tdb"))
            shutil.rmtree(os.path.join(self.tempdir, "bind-dns"))

        self.join_ctx.cleanup_old_join(force=True)

        super(JoinTestCase, self).tearDown()

    def _query_a_records(self, packet, name):
        """Send an A/IN question for *name* over TCP and return the response.

        *packet* is finished with that single question and sent to
        ``self.server_ip``; the reply must have rcode OK and opcode QUERY.
        """
        question = self.make_name_question(name, dns.DNS_QTYPE_A,
                                           dns.DNS_QCLASS_IN)
        self.finish_name_packet(packet, [question])
        (response, _) = self.dns_transaction_tcp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        return response

    def test_join_makes_records(self):
        """Query, via TCP, the A and CNAME records the join should create."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        # Get expected IPs
        IPs = samba.interface_ips(self.lp)

        # The DC's own host name: one answer per expected IP.
        response = self._query_a_records(p, self.join_ctx.dnshostname)
        # assertEqual, not assertEquals: the alias is deprecated and was
        # removed from unittest in Python 3.12.
        self.assertEqual(response.ancount, len(IPs))

        # <ntds guid>._msdcs.<forest>: a CNAME to the host name followed
        # by the host's address records. The same packet object is reused
        # for this second query.
        name = "%s._msdcs.%s" % (self.join_ctx.ntds_guid,
                                 self.join_ctx.dnsforest)
        response = self._query_a_records(p, name)

        self.assertEqual(response.ancount, 1 + len(IPs))
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].rdata,
                         self.join_ctx.dnshostname)
        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)

    def test_join_records_can_update(self):
        """Use the new machine account to delete all but one address record."""
        dc_creds = Credentials()
        dc_creds.guess(self.join_ctx.lp)
        dc_creds.set_machine_account(self.join_ctx.lp)

        self.tkey_trans(creds=dc_creds)

        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        # NOTE(review): the type/class arguments of this call were not
        # visible; SOA/IN is what RFC 2136 requires for the zone section
        # of an UPDATE - confirm.
        q = self.make_name_question(self.join_ctx.dnsdomain,
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])

        # Delete the old expected IPs
        IPs = samba.interface_ips(self.lp)
        updates = []
        # NOTE(review): the first IP is presumably the one kept, since the
        # final query expects exactly one answer - confirm the slice.
        for IP in IPs[1:]:
            r = dns.res_rec()
            r.name = self.join_ctx.dnshostname
            # NOTE(review): assumes a ':' marks an IPv6 address (AAAA);
            # anything else is treated as IPv4 (A).
            r.rr_type = dns.DNS_QTYPE_AAAA if ":" in IP else dns.DNS_QTYPE_A
            # Class NONE with rdata set asks for deletion of that exact
            # record (RFC 2136), which also requires a TTL of 0.
            r.rr_class = dns.DNS_QCLASS_NONE
            # NOTE(review): ttl/length/rdata were not visible in the
            # original; these values are assumed - confirm.
            r.ttl = 0
            r.length = 0xffff
            r.rdata = IP
            updates.append(r)

        p.nscount = len(updates)
        p.nsrecs = updates

        mac = self.sign_packet(p, self.key_name)
        (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.verify_packet(response, response_p, mac)

        # After the update only a single address record should remain.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        response = self._query_a_records(p, self.join_ctx.dnshostname)
        self.assertEqual(response.ancount, 1)