1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
20 from samba
import dsdb
21 from samba
.ndr
import ndr_unpack
, ndr_pack
22 from samba
.samdb
import SamDB
23 from samba
.auth
import system_session
25 from ldb
import ERR_OPERATIONS_ERROR
30 import samba
.ndr
as ndr
31 from samba
import credentials
32 from samba
.dcerpc
import dns
, dnsp
, dnsserver
33 from samba
.netcmd
.dns
import TXTRecord
, dns_record_match
, data_to_dns_record
34 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
35 from samba
import werror
, WERRORError
36 from samba
.tests
.dns_base
import DNSTest
37 import samba
.getopt
as options
41 parser
= optparse
.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts
= options
.SambaOptions(parser
)
43 parser
.add_option_group(sambaopts
)
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser
.add_option("--timeout", type="int", dest
="timeout",
48 help="Specify timeout for DNS requests")
50 # use command line creds if available
51 credopts
= options
.CredentialsOptions(parser
)
52 parser
.add_option_group(credopts
)
53 subunitopts
= SubunitOptions(parser
)
54 parser
.add_option_group(subunitopts
)
56 opts
, args
= parser
.parse_args()
58 lp
= sambaopts
.get_loadparm()
59 creds
= credopts
.get_credentials(lp
)
61 timeout
= opts
.timeout
69 creds
.set_krb_forwardable(credentials
.NO_KRB_FORWARDABLE
)
72 class TestSimpleQueries(DNSTest
):
74 super(TestSimpleQueries
, self
).setUp()
75 global server
, server_ip
, lp
, creds
, timeout
76 self
.server
= server_name
77 self
.server_ip
= server_ip
80 self
.timeout
= timeout
82 def test_one_a_query(self
):
83 "create a query packet containing one query record"
84 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
87 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
88 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
89 print("asking for ", q
.name
)
92 self
.finish_name_packet(p
, questions
)
93 (response
, response_packet
) =\
94 self
.dns_transaction_udp(p
, host
=server_ip
)
95 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
96 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
97 self
.assertEquals(response
.ancount
, 1)
98 self
.assertEquals(response
.answers
[0].rdata
,
101 def test_one_SOA_query(self
):
102 "create a query packet containing one query record for the SOA"
103 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
106 name
= "%s" % (self
.get_dns_domain())
107 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
108 print("asking for ", q
.name
)
111 self
.finish_name_packet(p
, questions
)
112 (response
, response_packet
) =\
113 self
.dns_transaction_udp(p
, host
=server_ip
)
114 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
115 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
116 self
.assertEquals(response
.ancount
, 1)
118 response
.answers
[0].rdata
.mname
.upper(),
119 ("%s.%s" % (self
.server
, self
.get_dns_domain())).upper())
121 def test_one_a_query_tcp(self
):
122 "create a query packet containing one query record via TCP"
123 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
126 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
127 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
128 print("asking for ", q
.name
)
131 self
.finish_name_packet(p
, questions
)
132 (response
, response_packet
) =\
133 self
.dns_transaction_tcp(p
, host
=server_ip
)
134 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
135 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
136 self
.assertEquals(response
.ancount
, 1)
137 self
.assertEquals(response
.answers
[0].rdata
,
140 def test_one_mx_query(self
):
141 "create a query packet causing an empty RCODE_OK answer"
142 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
145 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
146 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
147 print("asking for ", q
.name
)
150 self
.finish_name_packet(p
, questions
)
151 (response
, response_packet
) =\
152 self
.dns_transaction_udp(p
, host
=server_ip
)
153 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
154 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
155 self
.assertEquals(response
.ancount
, 0)
157 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
160 name
= "invalid-%s.%s" % (self
.server
, self
.get_dns_domain())
161 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
162 print("asking for ", q
.name
)
165 self
.finish_name_packet(p
, questions
)
166 (response
, response_packet
) =\
167 self
.dns_transaction_udp(p
, host
=server_ip
)
168 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
169 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
170 self
.assertEquals(response
.ancount
, 0)
172 def test_two_queries(self
):
173 "create a query packet containing two query records"
174 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
177 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
178 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
181 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
182 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
185 self
.finish_name_packet(p
, questions
)
187 (response
, response_packet
) =\
188 self
.dns_transaction_udp(p
, host
=server_ip
)
189 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
190 except socket
.timeout
:
191 # Windows chooses not to respond to incorrectly formatted queries.
192 # Although this appears to be non-deterministic even for the same
193 # request twice, it also appears to be based on a how poorly the
194 # request is formatted.
197 def test_qtype_all_query(self
):
198 "create a QTYPE_ALL query"
199 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
202 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
203 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
204 print("asking for ", q
.name
)
207 self
.finish_name_packet(p
, questions
)
208 (response
, response_packet
) =\
209 self
.dns_transaction_udp(p
, host
=server_ip
)
212 dc_ipv6
= os
.getenv('SERVER_IPV6')
213 if dc_ipv6
is not None:
216 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
217 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
218 self
.assertEquals(response
.ancount
, num_answers
)
219 self
.assertEquals(response
.answers
[0].rdata
,
221 if dc_ipv6
is not None:
222 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
224 def test_qclass_none_query(self
):
225 "create a QCLASS_NONE query"
226 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
229 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
230 q
= self
.make_name_question(
236 self
.finish_name_packet(p
, questions
)
238 (response
, response_packet
) =\
239 self
.dns_transaction_udp(p
, host
=server_ip
)
240 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
241 except socket
.timeout
:
242 # Windows chooses not to respond to incorrectly formatted queries.
243 # Although this appears to be non-deterministic even for the same
244 # request twice, it also appears to be based on a how poorly the
245 # request is formatted.
248 def test_soa_hostname_query(self
):
249 "create a SOA query for a hostname"
250 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
253 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
254 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
257 self
.finish_name_packet(p
, questions
)
258 (response
, response_packet
) =\
259 self
.dns_transaction_udp(p
, host
=server_ip
)
260 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
261 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
262 # We don't get SOA records for single hosts
263 self
.assertEquals(response
.ancount
, 0)
264 # But we do respond with an authority section
265 self
.assertEqual(response
.nscount
, 1)
267 def test_soa_unknown_hostname_query(self
):
268 "create a SOA query for an unknown hostname"
269 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
272 name
= "foobar.%s" % (self
.get_dns_domain())
273 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
276 self
.finish_name_packet(p
, questions
)
277 (response
, response_packet
) =\
278 self
.dns_transaction_udp(p
, host
=server_ip
)
279 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
280 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
281 # We don't get SOA records for single hosts
282 self
.assertEquals(response
.ancount
, 0)
283 # But we do respond with an authority section
284 self
.assertEqual(response
.nscount
, 1)
286 def test_soa_domain_query(self
):
287 "create a SOA query for a domain"
288 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
291 name
= self
.get_dns_domain()
292 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
295 self
.finish_name_packet(p
, questions
)
296 (response
, response_packet
) =\
297 self
.dns_transaction_udp(p
, host
=server_ip
)
298 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
299 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
300 self
.assertEquals(response
.ancount
, 1)
301 self
.assertEquals(response
.answers
[0].rdata
.minimum
, 3600)
304 class TestDNSUpdates(DNSTest
):
306 super(TestDNSUpdates
, self
).setUp()
307 global server
, server_ip
, lp
, creds
, timeout
308 self
.server
= server_name
309 self
.server_ip
= server_ip
312 self
.timeout
= timeout
314 def test_two_updates(self
):
315 "create two update requests"
316 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
319 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
320 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
323 name
= self
.get_dns_domain()
324 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
327 self
.finish_name_packet(p
, updates
)
329 (response
, response_packet
) =\
330 self
.dns_transaction_udp(p
, host
=server_ip
)
331 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
332 except socket
.timeout
:
333 # Windows chooses not to respond to incorrectly formatted queries.
334 # Although this appears to be non-deterministic even for the same
335 # request twice, it also appears to be based on a how poorly the
336 # request is formatted.
339 def test_update_wrong_qclass(self
):
340 "create update with DNS_QCLASS_NONE"
341 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
344 name
= self
.get_dns_domain()
345 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
348 self
.finish_name_packet(p
, updates
)
349 (response
, response_packet
) =\
350 self
.dns_transaction_udp(p
, host
=server_ip
)
351 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
353 def test_update_prereq_with_non_null_ttl(self
):
354 "test update with a non-null TTL"
355 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
358 name
= self
.get_dns_domain()
360 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
362 self
.finish_name_packet(p
, updates
)
366 r
.name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
367 r
.rr_type
= dns
.DNS_QTYPE_TXT
368 r
.rr_class
= dns
.DNS_QCLASS_NONE
373 p
.ancount
= len(prereqs
)
377 (response
, response_packet
) =\
378 self
.dns_transaction_udp(p
, host
=server_ip
)
379 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
380 except socket
.timeout
:
381 # Windows chooses not to respond to incorrectly formatted queries.
382 # Although this appears to be non-deterministic even for the same
383 # request twice, it also appears to be based on a how poorly the
384 # request is formatted.
387 def test_update_prereq_with_non_null_length(self
):
388 "test update with a non-null length"
389 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
392 name
= self
.get_dns_domain()
394 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
396 self
.finish_name_packet(p
, updates
)
400 r
.name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
401 r
.rr_type
= dns
.DNS_QTYPE_TXT
402 r
.rr_class
= dns
.DNS_QCLASS_ANY
407 p
.ancount
= len(prereqs
)
410 (response
, response_packet
) =\
411 self
.dns_transaction_udp(p
, host
=server_ip
)
412 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
414 def test_update_prereq_nonexisting_name(self
):
415 "test update with a nonexisting name"
416 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
419 name
= self
.get_dns_domain()
421 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
423 self
.finish_name_packet(p
, updates
)
427 r
.name
= "idontexist.%s" % self
.get_dns_domain()
428 r
.rr_type
= dns
.DNS_QTYPE_TXT
429 r
.rr_class
= dns
.DNS_QCLASS_ANY
434 p
.ancount
= len(prereqs
)
437 (response
, response_packet
) =\
438 self
.dns_transaction_udp(p
, host
=server_ip
)
439 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
441 def test_update_add_txt_record(self
):
442 "test adding records works"
443 prefix
, txt
= 'textrec', ['"This is a test"']
444 p
= self
.make_txt_update(prefix
, txt
)
445 (response
, response_packet
) =\
446 self
.dns_transaction_udp(p
, host
=server_ip
)
447 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
448 self
.check_query_txt(prefix
, txt
)
450 def test_delete_record(self
):
451 "Test if deleting records works"
453 NAME
= "deleterec.%s" % self
.get_dns_domain()
455 # First, create a record to make sure we have a record to delete.
456 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
459 name
= self
.get_dns_domain()
461 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
463 self
.finish_name_packet(p
, updates
)
468 r
.rr_type
= dns
.DNS_QTYPE_TXT
469 r
.rr_class
= dns
.DNS_QCLASS_IN
472 rdata
= self
.make_txt_record(['"This is a test"'])
475 p
.nscount
= len(updates
)
478 (response
, response_packet
) =\
479 self
.dns_transaction_udp(p
, host
=server_ip
)
480 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
482 # Now check the record is around
483 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
485 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
488 self
.finish_name_packet(p
, questions
)
489 (response
, response_packet
) =\
490 self
.dns_transaction_udp(p
, host
=server_ip
)
491 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
493 # Now delete the record
494 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
497 name
= self
.get_dns_domain()
499 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
501 self
.finish_name_packet(p
, updates
)
506 r
.rr_type
= dns
.DNS_QTYPE_TXT
507 r
.rr_class
= dns
.DNS_QCLASS_NONE
510 rdata
= self
.make_txt_record(['"This is a test"'])
513 p
.nscount
= len(updates
)
516 (response
, response_packet
) =\
517 self
.dns_transaction_udp(p
, host
=server_ip
)
518 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
520 # And finally check it's gone
521 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
524 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
527 self
.finish_name_packet(p
, questions
)
528 (response
, response_packet
) =\
529 self
.dns_transaction_udp(p
, host
=server_ip
)
530 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
532 def test_readd_record(self
):
533 "Test if adding, deleting and then readding a records works"
535 NAME
= "readdrec.%s" % self
.get_dns_domain()
538 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
541 name
= self
.get_dns_domain()
543 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
545 self
.finish_name_packet(p
, updates
)
550 r
.rr_type
= dns
.DNS_QTYPE_TXT
551 r
.rr_class
= dns
.DNS_QCLASS_IN
554 rdata
= self
.make_txt_record(['"This is a test"'])
557 p
.nscount
= len(updates
)
560 (response
, response_packet
) =\
561 self
.dns_transaction_udp(p
, host
=server_ip
)
562 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
564 # Now check the record is around
565 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
567 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
570 self
.finish_name_packet(p
, questions
)
571 (response
, response_packet
) =\
572 self
.dns_transaction_udp(p
, host
=server_ip
)
573 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
575 # Now delete the record
576 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
579 name
= self
.get_dns_domain()
581 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
583 self
.finish_name_packet(p
, updates
)
588 r
.rr_type
= dns
.DNS_QTYPE_TXT
589 r
.rr_class
= dns
.DNS_QCLASS_NONE
592 rdata
= self
.make_txt_record(['"This is a test"'])
595 p
.nscount
= len(updates
)
598 (response
, response_packet
) =\
599 self
.dns_transaction_udp(p
, host
=server_ip
)
600 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
603 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
606 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
609 self
.finish_name_packet(p
, questions
)
610 (response
, response_packet
) =\
611 self
.dns_transaction_udp(p
, host
=server_ip
)
612 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
614 # recreate the record
615 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
618 name
= self
.get_dns_domain()
620 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
622 self
.finish_name_packet(p
, updates
)
627 r
.rr_type
= dns
.DNS_QTYPE_TXT
628 r
.rr_class
= dns
.DNS_QCLASS_IN
631 rdata
= self
.make_txt_record(['"This is a test"'])
634 p
.nscount
= len(updates
)
637 (response
, response_packet
) =\
638 self
.dns_transaction_udp(p
, host
=server_ip
)
639 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
641 # Now check the record is around
642 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
644 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
647 self
.finish_name_packet(p
, questions
)
648 (response
, response_packet
) =\
649 self
.dns_transaction_udp(p
, host
=server_ip
)
650 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
652 def test_update_add_mx_record(self
):
653 "test adding MX records works"
654 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
657 name
= self
.get_dns_domain()
659 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
661 self
.finish_name_packet(p
, updates
)
665 r
.name
= "%s" % self
.get_dns_domain()
666 r
.rr_type
= dns
.DNS_QTYPE_MX
667 r
.rr_class
= dns
.DNS_QCLASS_IN
670 rdata
= dns
.mx_record()
671 rdata
.preference
= 10
672 rdata
.exchange
= 'mail.%s' % self
.get_dns_domain()
675 p
.nscount
= len(updates
)
678 (response
, response_packet
) =\
679 self
.dns_transaction_udp(p
, host
=server_ip
)
680 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
682 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
685 name
= "%s" % self
.get_dns_domain()
686 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
689 self
.finish_name_packet(p
, questions
)
690 (response
, response_packet
) =\
691 self
.dns_transaction_udp(p
, host
=server_ip
)
692 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
693 self
.assertEqual(response
.ancount
, 1)
694 ans
= response
.answers
[0]
695 self
.assertEqual(ans
.rr_type
, dns
.DNS_QTYPE_MX
)
696 self
.assertEqual(ans
.rdata
.preference
, 10)
697 self
.assertEqual(ans
.rdata
.exchange
, 'mail.%s' % self
.get_dns_domain())
700 class TestComplexQueries(DNSTest
):
701 def make_dns_update(self
, key
, value
, qtype
):
702 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
704 name
= self
.get_dns_domain()
705 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
706 self
.finish_name_packet(p
, [u
])
711 r
.rr_class
= dns
.DNS_QCLASS_IN
717 (response
, response_packet
) =\
718 self
.dns_transaction_udp(p
, host
=server_ip
)
719 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
722 super(TestComplexQueries
, self
).setUp()
724 global server
, server_ip
, lp
, creds
, timeout
725 self
.server
= server_name
726 self
.server_ip
= server_ip
729 self
.timeout
= timeout
731 def test_one_a_query(self
):
732 "create a query packet containing one query record"
737 name
= "cname_test.%s" % self
.get_dns_domain()
738 rdata
= "%s.%s" % (self
.server
, self
.get_dns_domain())
739 self
.make_dns_update(name
, rdata
, dns
.DNS_QTYPE_CNAME
)
741 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
745 name
= "cname_test.%s" % self
.get_dns_domain()
746 q
= self
.make_name_question(name
,
749 print("asking for ", q
.name
)
752 self
.finish_name_packet(p
, questions
)
753 (response
, response_packet
) =\
754 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
755 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
756 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
757 self
.assertEquals(response
.ancount
, 2)
758 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
759 self
.assertEquals(response
.answers
[0].rdata
, "%s.%s" %
760 (self
.server
, self
.get_dns_domain()))
761 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
762 self
.assertEquals(response
.answers
[1].rdata
,
767 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
770 name
= self
.get_dns_domain()
772 u
= self
.make_name_question(name
,
776 self
.finish_name_packet(p
, updates
)
780 r
.name
= "cname_test.%s" % self
.get_dns_domain()
781 r
.rr_type
= dns
.DNS_QTYPE_CNAME
782 r
.rr_class
= dns
.DNS_QCLASS_NONE
785 r
.rdata
= "%s.%s" % (self
.server
, self
.get_dns_domain())
787 p
.nscount
= len(updates
)
790 (response
, response_packet
) =\
791 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
792 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
794 def test_cname_two_chain(self
):
795 name0
= "cnamechain0.%s" % self
.get_dns_domain()
796 name1
= "cnamechain1.%s" % self
.get_dns_domain()
797 name2
= "cnamechain2.%s" % self
.get_dns_domain()
798 self
.make_dns_update(name1
, name2
, dns
.DNS_QTYPE_CNAME
)
799 self
.make_dns_update(name2
, name0
, dns
.DNS_QTYPE_CNAME
)
800 self
.make_dns_update(name0
, server_ip
, dns
.DNS_QTYPE_A
)
802 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
804 q
= self
.make_name_question(name1
, dns
.DNS_QTYPE_A
,
808 self
.finish_name_packet(p
, questions
)
809 (response
, response_packet
) =\
810 self
.dns_transaction_udp(p
, host
=server_ip
)
811 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
812 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
813 self
.assertEquals(response
.ancount
, 3)
815 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
816 self
.assertEquals(response
.answers
[0].name
, name1
)
817 self
.assertEquals(response
.answers
[0].rdata
, name2
)
819 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_CNAME
)
820 self
.assertEquals(response
.answers
[1].name
, name2
)
821 self
.assertEquals(response
.answers
[1].rdata
, name0
)
823 self
.assertEquals(response
.answers
[2].rr_type
, dns
.DNS_QTYPE_A
)
824 self
.assertEquals(response
.answers
[2].rdata
,
827 def test_invalid_empty_cname(self
):
828 name0
= "cnamedotprefix0.%s" % self
.get_dns_domain()
830 self
.make_dns_update(name0
, "", dns
.DNS_QTYPE_CNAME
)
831 except AssertionError:
834 self
.fail("Successfully added empty CNAME, which is invalid.")
836 def test_cname_two_chain_not_matching_qtype(self
):
837 name0
= "cnamechain0.%s" % self
.get_dns_domain()
838 name1
= "cnamechain1.%s" % self
.get_dns_domain()
839 name2
= "cnamechain2.%s" % self
.get_dns_domain()
840 self
.make_dns_update(name1
, name2
, dns
.DNS_QTYPE_CNAME
)
841 self
.make_dns_update(name2
, name0
, dns
.DNS_QTYPE_CNAME
)
842 self
.make_dns_update(name0
, server_ip
, dns
.DNS_QTYPE_A
)
844 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
846 q
= self
.make_name_question(name1
, dns
.DNS_QTYPE_TXT
,
850 self
.finish_name_packet(p
, questions
)
851 (response
, response_packet
) =\
852 self
.dns_transaction_udp(p
, host
=server_ip
)
853 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
854 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
856 # CNAME should return all intermediate results!
857 # Only the A records exists, not the TXT.
858 self
.assertEquals(response
.ancount
, 2)
860 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
861 self
.assertEquals(response
.answers
[0].name
, name1
)
862 self
.assertEquals(response
.answers
[0].rdata
, name2
)
864 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_CNAME
)
865 self
.assertEquals(response
.answers
[1].name
, name2
)
866 self
.assertEquals(response
.answers
[1].rdata
, name0
)
868 def test_cname_loop(self
):
869 cname1
= "cnamelooptestrec." + self
.get_dns_domain()
870 cname2
= "cnamelooptestrec2." + self
.get_dns_domain()
871 cname3
= "cnamelooptestrec3." + self
.get_dns_domain()
872 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
873 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
874 self
.make_dns_update(cname3
, cname1
, dnsp
.DNS_TYPE_CNAME
)
876 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
879 q
= self
.make_name_question(cname1
,
883 self
.finish_name_packet(p
, questions
)
885 (response
, response_packet
) =\
886 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
888 max_recursion_depth
= 20
889 self
.assertEquals(len(response
.answers
), max_recursion_depth
)
891 # Make sure cname limit doesn't count other records. This is a generic
892 # test called in tests below
893 def max_rec_test(self
, rtype
, rec_gen
):
894 name
= "limittestrec{0}.{1}".format(rtype
, self
.get_dns_domain())
896 num_recs_to_enter
= limit
+ 5
898 for i
in range(1, num_recs_to_enter
+1):
900 self
.make_dns_update(name
, ip
, rtype
)
902 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
905 q
= self
.make_name_question(name
,
909 self
.finish_name_packet(p
, questions
)
911 (response
, response_packet
) =\
912 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
914 self
.assertEqual(len(response
.answers
), num_recs_to_enter
)
916 def test_record_limit_A(self
):
918 return "127.0.0." + str(i
)
919 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_A
, rec_gen
=ip4_gen
)
921 def test_record_limit_AAAA(self
):
923 return "AAAA:0:0:0:0:0:0:" + str(i
)
924 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_AAAA
, rec_gen
=ip6_gen
)
926 def test_record_limit_SRV(self
):
928 rec
= dns
.srv_record()
932 rec
.target
= "srvtestrec" + str(i
)
934 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_SRV
, rec_gen
=srv_gen
)
936 # Same as test_record_limit_A but with a preceding CNAME follow
937 def test_cname_limit(self
):
938 cname1
= "cnamelimittestrec." + self
.get_dns_domain()
939 cname2
= "cnamelimittestrec2." + self
.get_dns_domain()
940 cname3
= "cnamelimittestrec3." + self
.get_dns_domain()
941 ip_prefix
= '127.0.0.'
943 num_recs_to_enter
= limit
+ 5
945 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
946 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
947 num_arecs_to_enter
= num_recs_to_enter
- 2
948 for i
in range(1, num_arecs_to_enter
+1):
949 ip
= ip_prefix
+ str(i
)
950 self
.make_dns_update(cname3
, ip
, dns
.DNS_QTYPE_A
)
952 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
955 q
= self
.make_name_question(cname1
,
959 self
.finish_name_packet(p
, questions
)
961 (response
, response_packet
) =\
962 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
964 self
.assertEqual(len(response
.answers
), num_recs_to_enter
)
966 # ANY query on cname record shouldn't follow the link
967 def test_cname_any_query(self
):
968 cname1
= "cnameanytestrec." + self
.get_dns_domain()
969 cname2
= "cnameanytestrec2." + self
.get_dns_domain()
970 cname3
= "cnameanytestrec3." + self
.get_dns_domain()
972 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
973 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
975 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
978 q
= self
.make_name_question(cname1
,
982 self
.finish_name_packet(p
, questions
)
984 (response
, response_packet
) =\
985 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
987 self
.assertEqual(len(response
.answers
), 1)
988 self
.assertEqual(response
.answers
[0].name
, cname1
)
989 self
.assertEqual(response
.answers
[0].rdata
, cname2
)
992 class TestInvalidQueries(DNSTest
):
994 super(TestInvalidQueries
, self
).setUp()
995 global server
, server_ip
, lp
, creds
, timeout
996 self
.server
= server_name
997 self
.server_ip
= server_ip
1000 self
.timeout
= timeout
1002 def test_one_a_query(self
):
1003 """send 0 bytes follows by create a query packet
1004 containing one query record"""
1008 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
1009 s
.connect((self
.server_ip
, 53))
1015 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1018 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
1019 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
1020 print("asking for ", q
.name
)
1023 self
.finish_name_packet(p
, questions
)
1024 (response
, response_packet
) =\
1025 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
1026 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1027 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1028 self
.assertEquals(response
.ancount
, 1)
1029 self
.assertEquals(response
.answers
[0].rdata
,
1032 def test_one_a_reply(self
):
1033 "send a reply instead of a query"
1036 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1039 name
= "%s.%s" % ('fakefakefake', self
.get_dns_domain())
1040 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
1041 print("asking for ", q
.name
)
1044 self
.finish_name_packet(p
, questions
)
1045 p
.operation |
= dns
.DNS_FLAG_REPLY
1048 send_packet
= ndr
.ndr_pack(p
)
1049 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
1050 s
.settimeout(timeout
)
1051 host
= self
.server_ip
1052 s
.connect((host
, 53))
1053 tcp_packet
= struct
.pack('!H', len(send_packet
))
1054 tcp_packet
+= send_packet
1055 s
.send(tcp_packet
, 0)
1056 recv_packet
= s
.recv(0xffff + 2, 0)
1057 self
.assertEquals(0, len(recv_packet
))
1058 except socket
.timeout
:
1059 # Windows chooses not to respond to incorrectly formatted queries.
1060 # Although this appears to be non-deterministic even for the same
1061 # request twice, it also appears to be based on a how poorly the
1062 # request is formatted.
1069 class TestZones(DNSTest
):
1071 super(TestZones
, self
).setUp()
1072 global server
, server_ip
, lp
, creds
, timeout
1073 self
.server
= server_name
1074 self
.server_ip
= server_ip
1077 self
.timeout
= timeout
1079 self
.zone
= "test.lan"
1080 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" %
1082 self
.lp
, self
.creds
)
1084 self
.samdb
= SamDB(url
="ldap://" + self
.server_ip
,
1085 lp
=self
.get_loadparm(),
1086 session_info
=system_session(),
1087 credentials
=self
.creds
)
1088 self
.zone_dn
= "DC=" + self
.zone
+\
1089 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1090 str(self
.samdb
.get_default_basedn())
1093 super(TestZones
, self
).tearDown()
1096 self
.delete_zone(self
.zone
)
1097 except RuntimeError as e
:
1098 (num
, string
) = e
.args
1099 if num
!= werror
.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
:
1102 def make_zone_obj(self
, zone
, aging_enabled
=False):
1103 zone_create
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1104 zone_create
.pszZoneName
= zone
1105 zone_create
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
1106 zone_create
.fAging
= int(aging_enabled
)
1107 zone_create
.dwDpFlags
= dnsserver
.DNS_DP_DOMAIN_DEFAULT
1108 zone_create
.fDsIntegrated
= 1
1109 zone_create
.fLoadExisting
= 1
1110 zone_create
.fAllowUpdate
= dnsp
.DNS_ZONE_UPDATE_UNSECURE
1113 def create_zone(self
, zone
, aging_enabled
=False):
1114 zone_create
= self
.make_zone_obj(zone
, aging_enabled
)
1116 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1117 self
.rpc_conn
.DnssrvOperation2(client_version
,
1123 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
1125 except WERRORError
as e
:
1128 def set_params(self
, **kwargs
):
1129 zone
= kwargs
.pop('zone', None)
1130 for key
, val
in kwargs
.items():
1131 name_param
= dnsserver
.DNS_RPC_NAME_AND_PARAM()
1132 name_param
.dwParam
= val
1133 name_param
.pszNodeName
= key
1135 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1136 nap_type
= dnsserver
.DNSSRV_TYPEID_NAME_AND_PARAM
1138 self
.rpc_conn
.DnssrvOperation2(client_version
,
1143 'ResetDwordProperty',
1146 except WERRORError
as e
:
1149 def ldap_modify_dnsrecs(self
, name
, func
):
1150 dn
= 'DC={0},{1}'.format(name
, self
.zone_dn
)
1151 dns_recs
= self
.ldap_get_dns_records(name
)
1152 for rec
in dns_recs
:
1154 update_dict
= {'dn': dn
, 'dnsRecord': [ndr_pack(r
) for r
in dns_recs
]}
1155 self
.samdb
.modify(ldb
.Message
.from_dict(self
.samdb
,
1157 ldb
.FLAG_MOD_REPLACE
))
1159 def dns_update_record(self
, prefix
, txt
):
1160 p
= self
.make_txt_update(prefix
, txt
, self
.zone
)
1161 (code
, response
) = self
.dns_transaction_udp(p
, host
=self
.server_ip
)
1162 self
.assert_dns_rcode_equals(code
, dns
.DNS_RCODE_OK
)
1163 recs
= self
.ldap_get_dns_records(prefix
)
1164 recs
= [r
for r
in recs
if r
.data
.str == txt
]
1165 self
.assertEqual(len(recs
), 1)
1168 def dns_tombstone(self
, prefix
, txt
, zone
):
1169 name
= prefix
+ "." + zone
1171 to
= dnsp
.DnssrvRpcRecord()
1172 to
.dwTimeStamp
= 1000
1173 to
.wType
= dnsp
.DNS_TYPE_TOMBSTONE
1175 self
.samdb
.dns_replace(name
, [to
])
1177 def ldap_get_records(self
, name
):
1178 # The use of SCOPE_SUBTREE here avoids raising an exception in the
1179 # 0 results case for a test below.
1181 expr
= "(&(objectClass=dnsNode)(name={0}))".format(name
)
1182 return self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1183 expression
=expr
, attrs
=["*"])
1185 def ldap_get_dns_records(self
, name
):
1186 records
= self
.ldap_get_records(name
)
1187 return [ndr_unpack(dnsp
.DnssrvRpcRecord
, r
)
1188 for r
in records
[0].get('dnsRecord')]
1190 def ldap_get_zone_settings(self
):
1191 records
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_BASE
,
1192 expression
="(&(objectClass=dnsZone)" +
1193 "(name={0}))".format(self
.zone
),
1194 attrs
=["dNSProperty"])
1195 self
.assertEqual(len(records
), 1)
1196 props
= [ndr_unpack(dnsp
.DnsProperty
, r
)
1197 for r
in records
[0].get('dNSProperty')]
1199 # We have no choice but to repeat these here.
1200 zone_prop_ids
= {0x00: "EMPTY",
1202 0x02: "ALLOW_UPDATE",
1203 0x08: "SECURE_TIME",
1204 0x10: "NOREFRESH_INTERVAL",
1205 0x11: "SCAVENGING_SERVERS",
1206 0x12: "AGING_ENABLED_TIME",
1207 0x20: "REFRESH_INTERVAL",
1208 0x40: "AGING_STATE",
1209 0x80: "DELETED_FROM_HOSTNAME",
1210 0x81: "MASTER_SERVERS",
1211 0x82: "AUTO_NS_SERVERS",
1212 0x83: "DCPROMO_CONVERT",
1213 0x90: "SCAVENGING_SERVERS_DA",
1214 0x91: "MASTER_SERVERS_DA",
1215 0x92: "NS_SERVERS_DA",
1216 0x100: "NODE_DBFLAGS"}
1217 return {zone_prop_ids
[p
.id].lower(): p
.data
for p
in props
}
1219 def set_aging(self
, enable
=False):
1220 self
.create_zone(self
.zone
, aging_enabled
=enable
)
1221 self
.set_params(NoRefreshInterval
=1, RefreshInterval
=1,
1222 Aging
=int(bool(enable
)), zone
=self
.zone
,
1223 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1225 def test_set_aging(self
, enable
=True, name
='agingtest', txt
=['test txt']):
1226 self
.set_aging(enable
=True)
1227 settings
= self
.ldap_get_zone_settings()
1228 self
.assertTrue(settings
['aging_state'] is not None)
1229 self
.assertTrue(settings
['aging_state'])
1231 rec
= self
.dns_update_record('agingtest', ['test txt'])
1232 self
.assertNotEqual(rec
.dwTimeStamp
, 0)
1234 def test_set_aging_disabled(self
):
1235 self
.set_aging(enable
=False)
1236 settings
= self
.ldap_get_zone_settings()
1237 self
.assertTrue(settings
['aging_state'] is not None)
1238 self
.assertFalse(settings
['aging_state'])
1240 rec
= self
.dns_update_record('agingtest', ['test txt'])
1241 self
.assertNotEqual(rec
.dwTimeStamp
, 0)
1243 def test_aging_update(self
, enable
=True):
1244 name
, txt
= 'agingtest', ['test txt']
1245 self
.set_aging(enable
=True)
1246 before_mod
= self
.dns_update_record(name
, txt
)
1248 self
.set_params(zone
=self
.zone
, Aging
=0)
1252 self
.assertTrue(rec
.dwTimeStamp
> 0)
1253 rec
.dwTimeStamp
-= dec
1254 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1255 after_mod
= self
.ldap_get_dns_records(name
)
1256 self
.assertEqual(len(after_mod
), 1)
1257 after_mod
= after_mod
[0]
1258 self
.assertEqual(after_mod
.dwTimeStamp
,
1259 before_mod
.dwTimeStamp
- dec
)
1260 after_update
= self
.dns_update_record(name
, txt
)
1261 after_should_equal
= before_mod
if enable
else after_mod
1262 self
.assertEqual(after_should_equal
.dwTimeStamp
,
1263 after_update
.dwTimeStamp
)
1265 def test_aging_update_disabled(self
):
1266 self
.test_aging_update(enable
=False)
1268 def test_aging_refresh(self
):
1269 name
, txt
= 'agingtest', ['test txt']
1270 self
.create_zone(self
.zone
, aging_enabled
=True)
1272 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1273 Aging
=1, zone
=self
.zone
,
1274 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1275 before_mod
= self
.dns_update_record(name
, txt
)
1278 self
.assertTrue(rec
.dwTimeStamp
> 0)
1279 rec
.dwTimeStamp
-= interval
// 2
1280 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1281 update_during_norefresh
= self
.dns_update_record(name
, txt
)
1284 self
.assertTrue(rec
.dwTimeStamp
> 0)
1285 rec
.dwTimeStamp
-= interval
+ interval
// 2
1286 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1287 update_during_refresh
= self
.dns_update_record(name
, txt
)
1288 self
.assertEqual(update_during_norefresh
.dwTimeStamp
,
1289 before_mod
.dwTimeStamp
- interval
/ 2)
1290 self
.assertEqual(update_during_refresh
.dwTimeStamp
,
1291 before_mod
.dwTimeStamp
)
1293 def test_rpc_add_no_timestamp(self
):
1294 name
, txt
= 'agingtest', ['test txt']
1295 self
.set_aging(enable
=True)
1296 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1297 rec_buf
.rec
= TXTRecord(txt
)
1298 self
.rpc_conn
.DnssrvUpdateRecord2(
1299 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1306 recs
= self
.ldap_get_dns_records(name
)
1307 self
.assertEqual(len(recs
), 1)
1308 self
.assertEqual(recs
[0].dwTimeStamp
, 0)
1310 def test_static_record_dynamic_update(self
):
1311 name
, txt
= 'agingtest', ['test txt']
1312 txt2
= ['test txt2']
1313 self
.set_aging(enable
=True)
1314 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1315 rec_buf
.rec
= TXTRecord(txt
)
1316 self
.rpc_conn
.DnssrvUpdateRecord2(
1317 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1325 rec2
= self
.dns_update_record(name
, txt2
)
1326 self
.assertEqual(rec2
.dwTimeStamp
, 0)
1328 def test_dynamic_record_static_update(self
):
1329 name
, txt
= 'agingtest', ['test txt']
1330 txt2
= ['test txt2']
1331 txt3
= ['test txt3']
1332 self
.set_aging(enable
=True)
1334 self
.dns_update_record(name
, txt
)
1336 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1337 rec_buf
.rec
= TXTRecord(txt2
)
1338 self
.rpc_conn
.DnssrvUpdateRecord2(
1339 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1347 self
.dns_update_record(name
, txt3
)
1349 recs
= self
.ldap_get_dns_records(name
)
1350 # Put in dict because ldap recs might be out of order
1351 recs
= {str(r
.data
.str): r
for r
in recs
}
1352 self
.assertNotEqual(recs
[str(txt
)].dwTimeStamp
, 0)
1353 self
.assertEqual(recs
[str(txt2
)].dwTimeStamp
, 0)
1354 self
.assertEqual(recs
[str(txt3
)].dwTimeStamp
, 0)
1356 def test_dns_tombstone_custom_match_rule(self
):
1357 lp
= self
.get_loadparm()
1358 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1359 session_info
=system_session(),
1360 credentials
=self
.creds
)
1362 name
, txt
= 'agingtest', ['test txt']
1363 name2
, txt2
= 'agingtest2', ['test txt2']
1364 name3
, txt3
= 'agingtest3', ['test txt3']
1365 name4
, txt4
= 'agingtest4', ['test txt4']
1366 name5
, txt5
= 'agingtest5', ['test txt5']
1368 self
.create_zone(self
.zone
, aging_enabled
=True)
1370 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1371 Aging
=1, zone
=self
.zone
,
1372 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1374 self
.dns_update_record(name
, txt
)
1376 self
.dns_update_record(name2
, txt
)
1377 self
.dns_update_record(name2
, txt2
)
1379 self
.dns_update_record(name3
, txt
)
1380 self
.dns_update_record(name3
, txt2
)
1381 last_update
= self
.dns_update_record(name3
, txt3
)
1383 # Modify txt1 of the first 2 names
1385 if rec
.data
.str == txt
:
1386 rec
.dwTimeStamp
-= 2
1387 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1388 self
.ldap_modify_dnsrecs(name2
, mod_ts
)
1390 # create a static dns record.
1391 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1392 rec_buf
.rec
= TXTRecord(txt4
)
1393 self
.rpc_conn
.DnssrvUpdateRecord2(
1394 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1402 # Create a tomb stoned record.
1403 self
.dns_update_record(name5
, txt5
)
1404 self
.dns_tombstone(name5
, txt5
, self
.zone
)
1406 self
.ldap_get_dns_records(name3
)
1407 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1408 expr
= expr
.format(int(last_update
.dwTimeStamp
) - 1)
1410 res
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1411 expression
=expr
, attrs
=["*"])
1412 except ldb
.LdbError
as e
:
1414 updated_names
= {str(r
.get('name')) for r
in res
}
1415 self
.assertEqual(updated_names
, set([name
, name2
]))
1417 def test_dns_tombstone_custom_match_rule_no_records(self
):
1418 lp
= self
.get_loadparm()
1419 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1420 session_info
=system_session(),
1421 credentials
=self
.creds
)
1423 self
.create_zone(self
.zone
, aging_enabled
=True)
1425 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1426 Aging
=1, zone
=self
.zone
,
1427 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1429 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1430 expr
= expr
.format(1)
1433 res
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1434 expression
=expr
, attrs
=["*"])
1435 except ldb
.LdbError
as e
:
1437 self
.assertEqual(0, len(res
))
1439 def test_dns_tombstone_custom_match_rule_fail(self
):
1440 self
.create_zone(self
.zone
, aging_enabled
=True)
1441 samdb
= SamDB(url
=lp
.samdb_url(),
1443 session_info
=system_session(),
1444 credentials
=self
.creds
)
1446 # Property name in not dnsRecord
1447 expr
= "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1448 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1449 expression
=expr
, attrs
=["*"])
1450 self
.assertEquals(len(res
), 0)
1452 # No value for tombstone time
1454 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1455 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1456 expression
=expr
, attrs
=["*"])
1457 self
.assertEquals(len(res
), 0)
1458 self
.fail("Exception: ldb.ldbError not generated")
1459 except ldb
.LdbError
as e
:
1461 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1463 # Tombstone time = -
1465 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1466 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1467 expression
=expr
, attrs
=["*"])
1468 self
.assertEquals(len(res
), 0)
1469 self
.fail("Exception: ldb.ldbError not generated")
1470 except ldb
.LdbError
as e
:
1472 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1474 # Tombstone time longer than 64 characters
1476 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1477 expr
= expr
.format("1" * 65)
1478 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1479 expression
=expr
, attrs
=["*"])
1480 self
.assertEquals(len(res
), 0)
1481 self
.fail("Exception: ldb.ldbError not generated")
1482 except ldb
.LdbError
as e
:
1484 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1486 # Non numeric Tombstone time
1488 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1489 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1490 expression
=expr
, attrs
=["*"])
1491 self
.assertEquals(len(res
), 0)
1492 self
.fail("Exception: ldb.ldbError not generated")
1493 except ldb
.LdbError
as e
:
1495 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1497 # Non system session
1499 db
= SamDB(url
="ldap://" + self
.server_ip
,
1500 lp
=self
.get_loadparm(),
1501 credentials
=self
.creds
)
1503 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1504 res
= db
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1505 expression
=expr
, attrs
=["*"])
1506 self
.assertEquals(len(res
), 0)
1507 self
.fail("Exception: ldb.ldbError not generated")
1508 except ldb
.LdbError
as e
:
1510 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1512 def test_basic_scavenging(self
):
1513 lp
= self
.get_loadparm()
1514 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1515 session_info
=system_session(),
1516 credentials
=self
.creds
)
1518 self
.create_zone(self
.zone
, aging_enabled
=True)
1520 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1521 zone
=self
.zone
, Aging
=1,
1522 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1523 name
, txt
= 'agingtest', ['test txt']
1524 name2
, txt2
= 'agingtest2', ['test txt2']
1525 name3
, txt3
= 'agingtest3', ['test txt3']
1526 name4
, txt4
= 'agingtest4', ['test txt4']
1527 name5
, txt5
= 'agingtest5', ['test txt5']
1528 self
.dns_update_record(name
, txt
)
1529 self
.dns_update_record(name2
, txt
)
1530 self
.dns_update_record(name2
, txt2
)
1531 self
.dns_update_record(name3
, txt
)
1532 self
.dns_update_record(name3
, txt2
)
1534 # Create a tomb stoned record.
1535 self
.dns_update_record(name4
, txt4
)
1536 self
.dns_tombstone(name4
, txt4
, self
.zone
)
1537 records
= self
.ldap_get_records(name4
)
1538 self
.assertTrue("dNSTombstoned" in records
[0])
1539 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1541 # Create an un-tombstoned record, with dnsTombstoned: FALSE
1542 self
.dns_update_record(name5
, txt5
)
1543 self
.dns_tombstone(name5
, txt5
, self
.zone
)
1544 self
.dns_update_record(name5
, txt5
)
1545 records
= self
.ldap_get_records(name5
)
1546 self
.assertTrue("dNSTombstoned" in records
[0])
1547 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"FALSE")
1549 last_add
= self
.dns_update_record(name3
, txt3
)
1552 self
.assertTrue(rec
.dwTimeStamp
> 0)
1553 if rec
.data
.str == txt
:
1554 rec
.dwTimeStamp
-= interval
* 5
1556 def mod_ts_all(rec
):
1557 rec
.dwTimeStamp
-= interval
* 5
1558 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1559 self
.ldap_modify_dnsrecs(name2
, mod_ts
)
1560 self
.ldap_modify_dnsrecs(name3
, mod_ts
)
1561 self
.ldap_modify_dnsrecs(name5
, mod_ts_all
)
1562 self
.assertTrue(callable(getattr(dsdb
, '_scavenge_dns_records', None)))
1563 dsdb
._scavenge
_dns
_records
(self
.samdb
)
1565 recs
= self
.ldap_get_dns_records(name
)
1566 self
.assertEqual(len(recs
), 1)
1567 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1568 records
= self
.ldap_get_records(name
)
1569 self
.assertTrue("dNSTombstoned" in records
[0])
1570 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1572 recs
= self
.ldap_get_dns_records(name2
)
1573 self
.assertEqual(len(recs
), 1)
1574 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TXT
)
1575 self
.assertEqual(recs
[0].data
.str, txt2
)
1577 recs
= self
.ldap_get_dns_records(name3
)
1578 self
.assertEqual(len(recs
), 2)
1579 txts
= {str(r
.data
.str) for r
in recs
}
1580 self
.assertEqual(txts
, {str(txt2
), str(txt3
)})
1581 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TXT
)
1582 self
.assertEqual(recs
[1].wType
, dnsp
.DNS_TYPE_TXT
)
1584 recs
= self
.ldap_get_dns_records(name4
)
1585 self
.assertEqual(len(recs
), 1)
1586 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1587 records
= self
.ldap_get_records(name4
)
1588 self
.assertTrue("dNSTombstoned" in records
[0])
1589 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1591 recs
= self
.ldap_get_dns_records(name5
)
1592 self
.assertEqual(len(recs
), 1)
1593 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1594 records
= self
.ldap_get_records(name5
)
1595 self
.assertTrue("dNSTombstoned" in records
[0])
1596 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1598 for make_it_work
in [False, True]:
1599 inc
= -1 if make_it_work
else 1
1602 rec
.data
= (last_add
.dwTimeStamp
- 24 * 14) + inc
1603 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1604 dsdb
._dns
_delete
_tombstones
(self
.samdb
)
1605 recs
= self
.ldap_get_records(name
)
1607 self
.assertEqual(len(recs
), 0)
1609 self
.assertEqual(len(recs
), 1)
1611 def test_fully_qualified_zone(self
):
1613 def create_zone_expect_exists(zone
):
1615 zone_create
= self
.make_zone_obj(zone
)
1616 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1617 zc_type
= dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
1618 self
.rpc_conn
.DnssrvOperation2(client_version
,
1626 except WERRORError
as e
:
1628 if enum
!= werror
.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS
:
1631 self
.fail("Zone {} should already exist".format(zone
))
1633 # Create unqualified, then check creating qualified fails.
1634 self
.create_zone(self
.zone
)
1635 create_zone_expect_exists(self
.zone
+ '.')
1637 # Same again, but the other way around.
1638 self
.create_zone(self
.zone
+ '2.')
1639 create_zone_expect_exists(self
.zone
+ '2')
1641 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1642 request_filter
= dnsserver
.DNS_ZONE_REQUEST_PRIMARY
1643 tid
= dnsserver
.DNSSRV_TYPEID_DWORD
1644 typeid
, res
= self
.rpc_conn
.DnssrvComplexOperation2(client_version
,
1652 self
.delete_zone(self
.zone
)
1653 self
.delete_zone(self
.zone
+ '2')
1655 # Two zones should've been created, neither of them fully qualified.
1656 zones_we_just_made
= []
1657 zones
= [str(z
.pszZoneName
) for z
in res
.ZoneArray
]
1659 if zone
.startswith(self
.zone
):
1660 zones_we_just_made
.append(zone
)
1661 self
.assertEqual(len(zones_we_just_made
), 2)
1662 self
.assertEqual(set(zones_we_just_made
), {self
.zone
+ '2', self
.zone
})
1664 def delete_zone(self
, zone
):
1665 self
.rpc_conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1671 dnsserver
.DNSSRV_TYPEID_NULL
,
1674 def test_soa_query(self
):
1676 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1679 q
= self
.make_name_question(zone
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
1681 self
.finish_name_packet(p
, questions
)
1683 (response
, response_packet
) =\
1684 self
.dns_transaction_udp(p
, host
=server_ip
)
1685 # Windows returns OK while BIND logically seems to return NXDOMAIN
1686 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
1687 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1688 self
.assertEquals(response
.ancount
, 0)
1690 self
.create_zone(zone
)
1691 (response
, response_packet
) =\
1692 self
.dns_transaction_udp(p
, host
=server_ip
)
1693 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1694 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1695 self
.assertEquals(response
.ancount
, 1)
1696 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_SOA
)
1698 self
.delete_zone(zone
)
1699 (response
, response_packet
) =\
1700 self
.dns_transaction_udp(p
, host
=server_ip
)
1701 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
1702 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1703 self
.assertEquals(response
.ancount
, 0)
1706 class TestRPCRoundtrip(DNSTest
):
1708 super(TestRPCRoundtrip
, self
).setUp()
1709 global server
, server_ip
, lp
, creds
1710 self
.server
= server_name
1711 self
.server_ip
= server_ip
1714 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" %
1720 super(TestRPCRoundtrip
, self
).tearDown()
1722 def rpc_update(self
, fqn
=None, data
=None, wType
=None, delete
=False):
1723 fqn
= fqn
or ("rpctestrec." + self
.get_dns_domain())
1725 rec
= data_to_dns_record(wType
, data
)
1726 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1727 add_rec_buf
.rec
= rec
1729 add_arg
= add_rec_buf
1733 del_arg
= add_rec_buf
1735 self
.rpc_conn
.DnssrvUpdateRecord2(
1736 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1739 self
.get_dns_domain(),
1744 def test_rpc_self_referencing_cname(self
):
1745 cname
= "cnametest2_unqual_rec_loop"
1746 cname_fqn
= "%s.%s" % (cname
, self
.get_dns_domain())
1749 self
.rpc_update(fqn
=cname
, data
=cname_fqn
,
1750 wType
=dnsp
.DNS_TYPE_CNAME
, delete
=True)
1751 except WERRORError
as e
:
1752 if e
.args
[0] != werror
.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST
:
1753 self
.fail("RPC DNS gaven wrong error on pre-test cleanup "
1754 "for self referencing CNAME: %s" % e
.args
[0])
1757 self
.rpc_update(fqn
=cname
, wType
=dnsp
.DNS_TYPE_CNAME
, data
=cname_fqn
)
1758 except WERRORError
as e
:
1759 if e
.args
[0] != werror
.WERR_DNS_ERROR_CNAME_LOOP
:
1760 self
.fail("RPC DNS gaven wrong error on insertion of "
1761 "self referencing CNAME: %s" % e
.args
[0])
1764 self
.fail("RPC DNS allowed insertion of self referencing CNAME")
1766 def test_update_add_txt_rpc_to_dns(self
):
1767 prefix
, txt
= 'rpctextrec', ['"This is a test"']
1769 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1771 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"\\"This is a test\\""')
1772 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1773 add_rec_buf
.rec
= rec
1775 self
.rpc_conn
.DnssrvUpdateRecord2(
1776 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1779 self
.get_dns_domain(),
1784 except WERRORError
as e
:
1788 self
.check_query_txt(prefix
, txt
)
1790 self
.rpc_conn
.DnssrvUpdateRecord2(
1791 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1794 self
.get_dns_domain(),
1799 def test_update_add_null_padded_txt_record(self
):
1800 "test adding records works"
1801 prefix
, txt
= 'pad1textrec', ['"This is a test"', '', '']
1802 p
= self
.make_txt_update(prefix
, txt
)
1803 (response
, response_packet
) =\
1804 self
.dns_transaction_udp(p
, host
=server_ip
)
1805 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1806 self
.check_query_txt(prefix
, txt
)
1807 self
.assertIsNotNone(
1808 dns_record_match(self
.rpc_conn
,
1810 self
.get_dns_domain(),
1811 "%s.%s" % (prefix
, self
.get_dns_domain()),
1813 '"\\"This is a test\\"" "" ""'))
1815 prefix
, txt
= 'pad2textrec', ['"This is a test"', '', '', 'more text']
1816 p
= self
.make_txt_update(prefix
, txt
)
1817 (response
, response_packet
) =\
1818 self
.dns_transaction_udp(p
, host
=server_ip
)
1819 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1820 self
.check_query_txt(prefix
, txt
)
1821 self
.assertIsNotNone(
1825 self
.get_dns_domain(),
1826 "%s.%s" % (prefix
, self
.get_dns_domain()),
1828 '"\\"This is a test\\"" "" "" "more text"'))
1830 prefix
, txt
= 'pad3textrec', ['', '', '"This is a test"']
1831 p
= self
.make_txt_update(prefix
, txt
)
1832 (response
, response_packet
) =\
1833 self
.dns_transaction_udp(p
, host
=server_ip
)
1834 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1835 self
.check_query_txt(prefix
, txt
)
1836 self
.assertIsNotNone(
1840 self
.get_dns_domain(),
1841 "%s.%s" % (prefix
, self
.get_dns_domain()),
1843 '"" "" "\\"This is a test\\""'))
1845 def test_update_add_padding_rpc_to_dns(self
):
1846 prefix
, txt
= 'pad1textrec', ['"This is a test"', '', '']
1847 prefix
= 'rpc' + prefix
1848 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1850 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
1851 '"\\"This is a test\\"" "" ""')
1852 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1853 add_rec_buf
.rec
= rec
1855 self
.rpc_conn
.DnssrvUpdateRecord2(
1856 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1859 self
.get_dns_domain(),
1864 except WERRORError
as e
:
1868 self
.check_query_txt(prefix
, txt
)
1870 self
.rpc_conn
.DnssrvUpdateRecord2(
1871 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1874 self
.get_dns_domain(),
1879 prefix
, txt
= 'pad2textrec', ['"This is a test"', '', '', 'more text']
1880 prefix
= 'rpc' + prefix
1881 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1883 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
1884 '"\\"This is a test\\"" "" "" "more text"')
1885 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1886 add_rec_buf
.rec
= rec
1888 self
.rpc_conn
.DnssrvUpdateRecord2(
1889 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1892 self
.get_dns_domain(),
1897 except WERRORError
as e
:
1901 self
.check_query_txt(prefix
, txt
)
1903 self
.rpc_conn
.DnssrvUpdateRecord2(
1904 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1907 self
.get_dns_domain(),
1912 prefix
, txt
= 'pad3textrec', ['', '', '"This is a test"']
1913 prefix
= 'rpc' + prefix
1914 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1916 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
1917 '"" "" "\\"This is a test\\""')
1918 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1919 add_rec_buf
.rec
= rec
1921 self
.rpc_conn
.DnssrvUpdateRecord2(
1922 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1925 self
.get_dns_domain(),
1929 except WERRORError
as e
:
1933 self
.check_query_txt(prefix
, txt
)
1935 self
.rpc_conn
.DnssrvUpdateRecord2(
1936 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1939 self
.get_dns_domain(),
1944 # Test is incomplete due to strlen against txt records
1945 def test_update_add_null_char_txt_record(self
):
1946 "test adding records works"
1947 prefix
, txt
= 'nulltextrec', ['NULL\x00BYTE']
1948 p
= self
.make_txt_update(prefix
, txt
)
1949 (response
, response_packet
) =\
1950 self
.dns_transaction_udp(p
, host
=server_ip
)
1951 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1952 self
.check_query_txt(prefix
, ['NULL'])
1953 self
.assertIsNotNone(dns_record_match(self
.rpc_conn
, self
.server_ip
,
1954 self
.get_dns_domain(),
1955 "%s.%s" % (prefix
, self
.get_dns_domain()),
1956 dnsp
.DNS_TYPE_TXT
, '"NULL"'))
1958 prefix
, txt
= 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1959 p
= self
.make_txt_update(prefix
, txt
)
1960 (response
, response_packet
) =\
1961 self
.dns_transaction_udp(p
, host
=server_ip
)
1962 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1963 self
.check_query_txt(prefix
, ['NULL', 'NULL'])
1964 self
.assertIsNotNone(dns_record_match(self
.rpc_conn
, self
.server_ip
,
1965 self
.get_dns_domain(),
1966 "%s.%s" % (prefix
, self
.get_dns_domain()),
1967 dnsp
.DNS_TYPE_TXT
, '"NULL" "NULL"'))
1969 def test_update_add_null_char_rpc_to_dns(self
):
1970 prefix
= 'rpcnulltextrec'
1971 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1973 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"NULL\x00BYTE"')
1974 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1975 add_rec_buf
.rec
= rec
1977 self
.rpc_conn
.DnssrvUpdateRecord2(
1978 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1981 self
.get_dns_domain(),
1986 except WERRORError
as e
:
1990 self
.check_query_txt(prefix
, ['NULL'])
1992 self
.rpc_conn
.DnssrvUpdateRecord2(
1993 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1996 self
.get_dns_domain(),
2001 def test_update_add_hex_char_txt_record(self
):
2002 "test adding records works"
2003 prefix
, txt
= 'hextextrec', ['HIGH\xFFBYTE']
2004 p
= self
.make_txt_update(prefix
, txt
)
2005 (response
, response_packet
) =\
2006 self
.dns_transaction_udp(p
, host
=server_ip
)
2007 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
2008 self
.check_query_txt(prefix
, txt
)
2009 self
.assertIsNotNone(dns_record_match(self
.rpc_conn
, self
.server_ip
,
2010 self
.get_dns_domain(),
2011 "%s.%s" % (prefix
, self
.get_dns_domain()),
2012 dnsp
.DNS_TYPE_TXT
, '"HIGH\xFFBYTE"'))
2014 def test_update_add_hex_rpc_to_dns(self
):
2015 prefix
, txt
= 'hextextrec', ['HIGH\xFFBYTE']
2016 prefix
= 'rpc' + prefix
2017 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2019 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"HIGH\xFFBYTE"')
2020 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2021 add_rec_buf
.rec
= rec
2023 self
.rpc_conn
.DnssrvUpdateRecord2(
2024 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2027 self
.get_dns_domain(),
2032 except WERRORError
as e
:
2036 self
.check_query_txt(prefix
, txt
)
2038 self
.rpc_conn
.DnssrvUpdateRecord2(
2039 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2042 self
.get_dns_domain(),
2047 def test_update_add_slash_txt_record(self
):
2048 "test adding records works"
2049 prefix
, txt
= 'slashtextrec', ['Th\\=is=is a test']
2050 p
= self
.make_txt_update(prefix
, txt
)
2051 (response
, response_packet
) =\
2052 self
.dns_transaction_udp(p
, host
=server_ip
)
2053 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
2054 self
.check_query_txt(prefix
, txt
)
2055 self
.assertIsNotNone(dns_record_match(self
.rpc_conn
, self
.server_ip
,
2056 self
.get_dns_domain(),
2057 "%s.%s" % (prefix
, self
.get_dns_domain()),
2058 dnsp
.DNS_TYPE_TXT
, '"Th\\\\=is=is a test"'))
2060 # This test fails against Windows as it eliminates slashes in RPC
2061 # One typical use for a slash is in records like 'var=value' to
2062 # escape '=' characters.
2063 def test_update_add_slash_rpc_to_dns(self
):
2064 prefix
, txt
= 'slashtextrec', ['Th\\=is=is a test']
2065 prefix
= 'rpc' + prefix
2066 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2068 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"Th\\\\=is=is a test"')
2069 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2070 add_rec_buf
.rec
= rec
2072 self
.rpc_conn
.DnssrvUpdateRecord2(
2073 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2076 self
.get_dns_domain(),
2081 except WERRORError
as e
:
2085 self
.check_query_txt(prefix
, txt
)
2088 self
.rpc_conn
.DnssrvUpdateRecord2(
2089 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2092 self
.get_dns_domain(),
2097 def test_update_add_two_txt_records(self
):
2098 "test adding two txt records works"
2099 prefix
, txt
= 'textrec2', ['"This is a test"',
2100 '"and this is a test, too"']
2101 p
= self
.make_txt_update(prefix
, txt
)
2102 (response
, response_packet
) =\
2103 self
.dns_transaction_udp(p
, host
=server_ip
)
2104 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
2105 self
.check_query_txt(prefix
, txt
)
2106 self
.assertIsNotNone(dns_record_match(self
.rpc_conn
, self
.server_ip
,
2107 self
.get_dns_domain(),
2108 "%s.%s" % (prefix
, self
.get_dns_domain()),
2109 dnsp
.DNS_TYPE_TXT
, '"\\"This is a test\\""' +
2110 ' "\\"and this is a test, too\\""'))
2112 def test_update_add_two_rpc_to_dns(self
):
2113 prefix
, txt
= 'textrec2', ['"This is a test"',
2114 '"and this is a test, too"']
2115 prefix
= 'rpc' + prefix
2116 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2118 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
2119 '"\\"This is a test\\""' +
2120 ' "\\"and this is a test, too\\""')
2121 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2122 add_rec_buf
.rec
= rec
2124 self
.rpc_conn
.DnssrvUpdateRecord2(
2125 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2128 self
.get_dns_domain(),
2133 except WERRORError
as e
:
2137 self
.check_query_txt(prefix
, txt
)
2139 self
.rpc_conn
.DnssrvUpdateRecord2(
2140 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2143 self
.get_dns_domain(),
2148 def test_update_add_empty_txt_records(self
):
2149 "test adding two txt records works"
2150 prefix
, txt
= 'emptytextrec', []
2151 p
= self
.make_txt_update(prefix
, txt
)
2152 (response
, response_packet
) =\
2153 self
.dns_transaction_udp(p
, host
=server_ip
)
2154 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
2155 self
.check_query_txt(prefix
, txt
)
2156 self
.assertIsNotNone(dns_record_match(self
.rpc_conn
, self
.server_ip
,
2157 self
.get_dns_domain(),
2158 "%s.%s" % (prefix
, self
.get_dns_domain()),
2159 dnsp
.DNS_TYPE_TXT
, ''))
2161 def test_update_add_empty_rpc_to_dns(self
):
2162 prefix
, txt
= 'rpcemptytextrec', []
2164 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2166 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '')
2167 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2168 add_rec_buf
.rec
= rec
2170 self
.rpc_conn
.DnssrvUpdateRecord2(
2171 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2174 self
.get_dns_domain(),
2178 except WERRORError
as e
:
2182 self
.check_query_txt(prefix
, txt
)
2184 self
.rpc_conn
.DnssrvUpdateRecord2(
2185 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2188 self
.get_dns_domain(),
2194 TestProgram(module
=__name__
, opts
=subunitopts
)