s3: smbd: Move check_fsp_open() and check_fsp() to smb1_reply.c
[Samba.git] / python / samba / tests / dns_tkey.py
blob42f73ba29214962dcaef7761da61eb60d25adf8b
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
3 # Copyright (C) Ralph Boehme <slow@samba.org> 2016
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import optparse
21 import samba.getopt as options
22 from samba.dcerpc import dns
23 from samba.tests.subunitrun import SubunitOptions, TestProgram
24 from samba.tests.dns_base import DNSTKeyTest
26 parser = optparse.OptionParser("dns_tkey.py <server name> <server ip> [options]")
27 sambaopts = options.SambaOptions(parser)
28 parser.add_option_group(sambaopts)
30 # This timeout only has relevance when testing against Windows
31 # Format errors tend to return patchy responses, so a timeout is needed.
32 parser.add_option("--timeout", type="int", dest="timeout",
33 help="Specify timeout for DNS requests")
35 # use command line creds if available
36 credopts = options.CredentialsOptions(parser)
37 parser.add_option_group(credopts)
38 subunitopts = SubunitOptions(parser)
39 parser.add_option_group(subunitopts)
41 opts, args = parser.parse_args()
42 timeout = opts.timeout
44 if len(args) < 2:
45 parser.print_usage()
46 sys.exit(1)
48 server_name = args[0]
49 server_ip = args[1]
52 class TestDNSUpdates(DNSTKeyTest):
53 def setUp(self):
54 self.server = server_name
55 self.server_ip = server_ip
56 super(TestDNSUpdates, self).setUp()
58 def test_tkey(self):
59 "test DNS TKEY handshake"
61 self.tkey_trans()
63 def test_update_wo_tsig(self):
64 "test DNS update without TSIG record"
66 p = self.make_update_request()
67 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
68 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED)
70 rcode = self.search_record(self.newrecname)
71 self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
73 def test_update_tsig_bad_keyname(self):
74 "test DNS update with a TSIG record with a bad keyname"
76 self.tkey_trans()
78 p = self.make_update_request()
79 self.sign_packet(p, "badkey")
80 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
81 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH)
82 tsig_record = response.additional[0].rdata
83 self.assertEqual(tsig_record.error, dns.DNS_RCODE_BADKEY)
84 self.assertEqual(tsig_record.mac_size, 0)
86 rcode = self.search_record(self.newrecname)
87 self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
89 def test_update_tsig_bad_mac(self):
90 "test DNS update with a TSIG record with a bad MAC"
92 self.tkey_trans()
94 p = self.make_update_request()
95 self.bad_sign_packet(p, self.key_name)
96 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
97 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH)
98 tsig_record = response.additional[0].rdata
99 self.assertEqual(tsig_record.error, dns.DNS_RCODE_BADSIG)
100 self.assertEqual(tsig_record.mac_size, 0)
102 rcode = self.search_record(self.newrecname)
103 self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
105 def test_update_tsig(self):
106 "test DNS update with correct TSIG record"
108 self.tkey_trans()
110 p = self.make_update_request()
111 mac = self.sign_packet(p, self.key_name)
112 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
113 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
114 self.verify_packet(response, response_p, mac)
116 # Check the record is around
117 rcode = self.search_record(self.newrecname)
118 self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
120 # Now delete the record
121 p = self.make_update_request(delete=True)
122 mac = self.sign_packet(p, self.key_name)
123 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
124 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
125 self.verify_packet(response, response_p, mac)
127 # check it's gone
128 rcode = self.search_record(self.newrecname)
129 self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
131 def test_update_tsig_windows(self):
132 "test DNS update with correct TSIG record (follow Windows pattern)"
134 newrecname = "win" + self.newrecname
135 rr_class = dns.DNS_QCLASS_IN
136 ttl = 1200
138 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
139 q = self.make_name_question(self.get_dns_domain(),
140 dns.DNS_QTYPE_SOA,
141 dns.DNS_QCLASS_IN)
142 questions = []
143 questions.append(q)
144 self.finish_name_packet(p, questions)
146 updates = []
147 r = dns.res_rec()
148 r.name = newrecname
149 r.rr_type = dns.DNS_QTYPE_A
150 r.rr_class = dns.DNS_QCLASS_ANY
151 r.ttl = 0
152 r.length = 0
153 updates.append(r)
154 r = dns.res_rec()
155 r.name = newrecname
156 r.rr_type = dns.DNS_QTYPE_AAAA
157 r.rr_class = dns.DNS_QCLASS_ANY
158 r.ttl = 0
159 r.length = 0
160 updates.append(r)
161 r = dns.res_rec()
162 r.name = newrecname
163 r.rr_type = dns.DNS_QTYPE_A
164 r.rr_class = rr_class
165 r.ttl = ttl
166 r.length = 0xffff
167 r.rdata = "10.1.45.64"
168 updates.append(r)
169 p.nscount = len(updates)
170 p.nsrecs = updates
172 prereqs = []
173 r = dns.res_rec()
174 r.name = newrecname
175 r.rr_type = dns.DNS_QTYPE_CNAME
176 r.rr_class = dns.DNS_QCLASS_NONE
177 r.ttl = 0
178 r.length = 0
179 prereqs.append(r)
180 p.ancount = len(prereqs)
181 p.answers = prereqs
183 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
184 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED)
186 self.tkey_trans()
187 mac = self.sign_packet(p, self.key_name)
188 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
189 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190 self.verify_packet(response, response_p, mac)
192 # Check the record is around
193 rcode = self.search_record(newrecname)
194 self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
196 # Now delete the record
197 p = self.make_update_request(delete=True)
198 mac = self.sign_packet(p, self.key_name)
199 (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
200 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
201 self.verify_packet(response, response_p, mac)
203 # check it's gone
204 rcode = self.search_record(self.newrecname)
205 self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
208 TestProgram(module=__name__, opts=subunitopts)