1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__
import print_function
20 from samba
import dsdb
21 from samba
.ndr
import ndr_unpack
, ndr_pack
22 from samba
.samdb
import SamDB
23 from samba
.auth
import system_session
25 from ldb
import ERR_OPERATIONS_ERROR
30 import samba
.ndr
as ndr
31 from samba
import credentials
32 from samba
.dcerpc
import dns
, dnsp
, dnsserver
33 from samba
.netcmd
.dns
import TXTRecord
, dns_record_match
, data_to_dns_record
34 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
35 from samba
import werror
, WERRORError
36 from samba
.tests
.dns_base
import DNSTest
37 import samba
.getopt
as options
41 parser
= optparse
.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts
= options
.SambaOptions(parser
)
43 parser
.add_option_group(sambaopts
)
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser
.add_option("--timeout", type="int", dest
="timeout",
48 help="Specify timeout for DNS requests")
50 # use command line creds if available
51 credopts
= options
.CredentialsOptions(parser
)
52 parser
.add_option_group(credopts
)
53 subunitopts
= SubunitOptions(parser
)
54 parser
.add_option_group(subunitopts
)
56 opts
, args
= parser
.parse_args()
58 lp
= sambaopts
.get_loadparm()
59 creds
= credopts
.get_credentials(lp
)
61 timeout
= opts
.timeout
69 creds
.set_krb_forwardable(credentials
.NO_KRB_FORWARDABLE
)
72 class TestSimpleQueries(DNSTest
):
74 super(TestSimpleQueries
, self
).setUp()
75 global server
, server_ip
, lp
, creds
, timeout
76 self
.server
= server_name
77 self
.server_ip
= server_ip
80 self
.timeout
= timeout
82 def test_one_a_query(self
):
83 "create a query packet containing one query record"
84 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
87 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
88 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
89 print("asking for ", q
.name
)
92 self
.finish_name_packet(p
, questions
)
93 (response
, response_packet
) =\
94 self
.dns_transaction_udp(p
, host
=server_ip
)
95 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
96 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
97 self
.assertEquals(response
.ancount
, 1)
98 self
.assertEquals(response
.answers
[0].rdata
,
101 def test_one_SOA_query(self
):
102 "create a query packet containing one query record for the SOA"
103 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
106 name
= "%s" % (self
.get_dns_domain())
107 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
108 print("asking for ", q
.name
)
111 self
.finish_name_packet(p
, questions
)
112 (response
, response_packet
) =\
113 self
.dns_transaction_udp(p
, host
=server_ip
)
114 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
115 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
116 self
.assertEquals(response
.ancount
, 1)
118 response
.answers
[0].rdata
.mname
.upper(),
119 ("%s.%s" % (self
.server
, self
.get_dns_domain())).upper())
121 def test_one_a_query_tcp(self
):
122 "create a query packet containing one query record via TCP"
123 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
126 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
127 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
128 print("asking for ", q
.name
)
131 self
.finish_name_packet(p
, questions
)
132 (response
, response_packet
) =\
133 self
.dns_transaction_tcp(p
, host
=server_ip
)
134 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
135 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
136 self
.assertEquals(response
.ancount
, 1)
137 self
.assertEquals(response
.answers
[0].rdata
,
def test_one_mx_query(self):
    """create a query packet causing an empty RCODE_OK answer

    Two MX queries are sent with dns_transaction_udp and each reply is
    checked:

    * ``<server>.<domain>`` must yield DNS_RCODE_OK and no answers.
    * ``invalid-<server>.<domain>`` must yield DNS_RCODE_NXDOMAIN and
      no answers.
    """
    # Both queries follow the same steps; only the name prefix and the
    # expected return code differ.
    for prefix, expected_rcode in (("", dns.DNS_RCODE_OK),
                                   ("invalid-", dns.DNS_RCODE_NXDOMAIN)):
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s%s.%s" % (prefix, self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_MX,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        # The question built above is the packet's only question.
        self.finish_name_packet(p, [q])
        (response, response_packet) = \
            self.dns_transaction_udp(p, host=server_ip)
        self.assert_dns_rcode_equals(response, expected_rcode)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)
172 def test_two_queries(self
):
173 "create a query packet containing two query records"
174 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
177 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
178 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
181 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
182 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
185 self
.finish_name_packet(p
, questions
)
187 (response
, response_packet
) =\
188 self
.dns_transaction_udp(p
, host
=server_ip
)
189 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
190 except socket
.timeout
:
        # Windows chooses not to respond to incorrectly formatted queries.
        # Although this appears to be non-deterministic even for the same
        # request twice, it also appears to be based on how poorly the
        # request is formatted.
197 def test_qtype_all_query(self
):
198 "create a QTYPE_ALL query"
199 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
202 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
203 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
204 print("asking for ", q
.name
)
207 self
.finish_name_packet(p
, questions
)
208 (response
, response_packet
) =\
209 self
.dns_transaction_udp(p
, host
=server_ip
)
212 dc_ipv6
= os
.getenv('SERVER_IPV6')
213 if dc_ipv6
is not None:
216 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
217 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
218 self
.assertEquals(response
.ancount
, num_answers
)
219 self
.assertEquals(response
.answers
[0].rdata
,
221 if dc_ipv6
is not None:
222 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
224 def test_qclass_none_query(self
):
225 "create a QCLASS_NONE query"
226 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
229 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
230 q
= self
.make_name_question(
236 self
.finish_name_packet(p
, questions
)
238 (response
, response_packet
) =\
239 self
.dns_transaction_udp(p
, host
=server_ip
)
240 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
241 except socket
.timeout
:
        # Windows chooses not to respond to incorrectly formatted queries.
        # Although this appears to be non-deterministic even for the same
        # request twice, it also appears to be based on how poorly the
        # request is formatted.
def test_soa_hostname_query(self):
    """create a SOA query for a hostname

    Asks for the SOA record of ``<server>.<domain>`` via
    dns_transaction_udp and expects an OK reply carrying no answer
    records and exactly one authority record.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

    name = "%s.%s" % (self.server, self.get_dns_domain())
    q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)

    # The question built above is the packet's only question.
    self.finish_name_packet(p, [q])
    (response, response_packet) = \
        self.dns_transaction_udp(p, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    # We don't get SOA records for single hosts
    self.assertEqual(response.ancount, 0)
    # But we do respond with an authority section
    self.assertEqual(response.nscount, 1)
def test_soa_domain_query(self):
    """create a SOA query for a domain

    Asks for the SOA record of the DNS domain itself via
    dns_transaction_udp and expects an OK reply with exactly one
    answer whose ``minimum`` field is 3600.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

    name = self.get_dns_domain()
    q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)

    # The question built above is the packet's only question.
    self.finish_name_packet(p, [q])
    (response, response_packet) = \
        self.dns_transaction_udp(p, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    self.assertEqual(response.ancount, 1)
    self.assertEqual(response.answers[0].rdata.minimum, 3600)
285 class TestDNSUpdates(DNSTest
):
287 super(TestDNSUpdates
, self
).setUp()
288 global server
, server_ip
, lp
, creds
, timeout
289 self
.server
= server_name
290 self
.server_ip
= server_ip
293 self
.timeout
= timeout
295 def test_two_updates(self
):
296 "create two update requests"
297 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
300 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
301 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
304 name
= self
.get_dns_domain()
305 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
308 self
.finish_name_packet(p
, updates
)
310 (response
, response_packet
) =\
311 self
.dns_transaction_udp(p
, host
=server_ip
)
312 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
313 except socket
.timeout
:
        # Windows chooses not to respond to incorrectly formatted queries.
        # Although this appears to be non-deterministic even for the same
        # request twice, it also appears to be based on how poorly the
        # request is formatted.
def test_update_wrong_qclass(self):
    """create update with DNS_QCLASS_NONE

    Builds an UPDATE packet whose single entry names the DNS domain
    with class DNS_QCLASS_NONE, sends it via dns_transaction_udp and
    expects DNS_RCODE_NOTIMP in the reply.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

    name = self.get_dns_domain()
    u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)

    # The entry built above is the only one placed in the packet.
    self.finish_name_packet(p, [u])
    (response, response_packet) = \
        self.dns_transaction_udp(p, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
334 def test_update_prereq_with_non_null_ttl(self
):
335 "test update with a non-null TTL"
336 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
339 name
= self
.get_dns_domain()
341 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
343 self
.finish_name_packet(p
, updates
)
347 r
.name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
348 r
.rr_type
= dns
.DNS_QTYPE_TXT
349 r
.rr_class
= dns
.DNS_QCLASS_NONE
354 p
.ancount
= len(prereqs
)
358 (response
, response_packet
) =\
359 self
.dns_transaction_udp(p
, host
=server_ip
)
360 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
361 except socket
.timeout
:
        # Windows chooses not to respond to incorrectly formatted queries.
        # Although this appears to be non-deterministic even for the same
        # request twice, it also appears to be based on how poorly the
        # request is formatted.
368 def test_update_prereq_with_non_null_length(self
):
369 "test update with a non-null length"
370 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
373 name
= self
.get_dns_domain()
375 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
377 self
.finish_name_packet(p
, updates
)
381 r
.name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
382 r
.rr_type
= dns
.DNS_QTYPE_TXT
383 r
.rr_class
= dns
.DNS_QCLASS_ANY
388 p
.ancount
= len(prereqs
)
391 (response
, response_packet
) =\
392 self
.dns_transaction_udp(p
, host
=server_ip
)
393 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
395 def test_update_prereq_nonexisting_name(self
):
396 "test update with a nonexisting name"
397 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
400 name
= self
.get_dns_domain()
402 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
404 self
.finish_name_packet(p
, updates
)
408 r
.name
= "idontexist.%s" % self
.get_dns_domain()
409 r
.rr_type
= dns
.DNS_QTYPE_TXT
410 r
.rr_class
= dns
.DNS_QCLASS_ANY
415 p
.ancount
= len(prereqs
)
418 (response
, response_packet
) =\
419 self
.dns_transaction_udp(p
, host
=server_ip
)
420 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
def test_update_add_txt_record(self):
    """test adding records works

    Builds a TXT update for the 'textrec' prefix with make_txt_update,
    sends it via dns_transaction_udp, expects DNS_RCODE_OK and then
    hands the same prefix and text to check_query_txt.
    """
    record_prefix = 'textrec'
    record_txt = ['"This is a test"']

    update_packet = self.make_txt_update(record_prefix, record_txt)
    reply, _reply_packet = self.dns_transaction_udp(update_packet,
                                                    host=server_ip)

    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(record_prefix, record_txt)
431 def test_delete_record(self
):
432 "Test if deleting records works"
434 NAME
= "deleterec.%s" % self
.get_dns_domain()
436 # First, create a record to make sure we have a record to delete.
437 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
440 name
= self
.get_dns_domain()
442 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
444 self
.finish_name_packet(p
, updates
)
449 r
.rr_type
= dns
.DNS_QTYPE_TXT
450 r
.rr_class
= dns
.DNS_QCLASS_IN
453 rdata
= self
.make_txt_record(['"This is a test"'])
456 p
.nscount
= len(updates
)
459 (response
, response_packet
) =\
460 self
.dns_transaction_udp(p
, host
=server_ip
)
461 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
463 # Now check the record is around
464 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
466 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
469 self
.finish_name_packet(p
, questions
)
470 (response
, response_packet
) =\
471 self
.dns_transaction_udp(p
, host
=server_ip
)
472 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
474 # Now delete the record
475 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
478 name
= self
.get_dns_domain()
480 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
482 self
.finish_name_packet(p
, updates
)
487 r
.rr_type
= dns
.DNS_QTYPE_TXT
488 r
.rr_class
= dns
.DNS_QCLASS_NONE
491 rdata
= self
.make_txt_record(['"This is a test"'])
494 p
.nscount
= len(updates
)
497 (response
, response_packet
) =\
498 self
.dns_transaction_udp(p
, host
=server_ip
)
499 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
501 # And finally check it's gone
502 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
505 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
508 self
.finish_name_packet(p
, questions
)
509 (response
, response_packet
) =\
510 self
.dns_transaction_udp(p
, host
=server_ip
)
511 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
513 def test_readd_record(self
):
514 "Test if adding, deleting and then readding a records works"
516 NAME
= "readdrec.%s" % self
.get_dns_domain()
519 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
522 name
= self
.get_dns_domain()
524 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
526 self
.finish_name_packet(p
, updates
)
531 r
.rr_type
= dns
.DNS_QTYPE_TXT
532 r
.rr_class
= dns
.DNS_QCLASS_IN
535 rdata
= self
.make_txt_record(['"This is a test"'])
538 p
.nscount
= len(updates
)
541 (response
, response_packet
) =\
542 self
.dns_transaction_udp(p
, host
=server_ip
)
543 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
545 # Now check the record is around
546 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
548 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
551 self
.finish_name_packet(p
, questions
)
552 (response
, response_packet
) =\
553 self
.dns_transaction_udp(p
, host
=server_ip
)
554 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
556 # Now delete the record
557 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
560 name
= self
.get_dns_domain()
562 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
564 self
.finish_name_packet(p
, updates
)
569 r
.rr_type
= dns
.DNS_QTYPE_TXT
570 r
.rr_class
= dns
.DNS_QCLASS_NONE
573 rdata
= self
.make_txt_record(['"This is a test"'])
576 p
.nscount
= len(updates
)
579 (response
, response_packet
) =\
580 self
.dns_transaction_udp(p
, host
=server_ip
)
581 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
584 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
587 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
590 self
.finish_name_packet(p
, questions
)
591 (response
, response_packet
) =\
592 self
.dns_transaction_udp(p
, host
=server_ip
)
593 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
595 # recreate the record
596 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
599 name
= self
.get_dns_domain()
601 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
603 self
.finish_name_packet(p
, updates
)
608 r
.rr_type
= dns
.DNS_QTYPE_TXT
609 r
.rr_class
= dns
.DNS_QCLASS_IN
612 rdata
= self
.make_txt_record(['"This is a test"'])
615 p
.nscount
= len(updates
)
618 (response
, response_packet
) =\
619 self
.dns_transaction_udp(p
, host
=server_ip
)
620 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
622 # Now check the record is around
623 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
625 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
628 self
.finish_name_packet(p
, questions
)
629 (response
, response_packet
) =\
630 self
.dns_transaction_udp(p
, host
=server_ip
)
631 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
633 def test_update_add_mx_record(self
):
634 "test adding MX records works"
635 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
638 name
= self
.get_dns_domain()
640 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
642 self
.finish_name_packet(p
, updates
)
646 r
.name
= "%s" % self
.get_dns_domain()
647 r
.rr_type
= dns
.DNS_QTYPE_MX
648 r
.rr_class
= dns
.DNS_QCLASS_IN
651 rdata
= dns
.mx_record()
652 rdata
.preference
= 10
653 rdata
.exchange
= 'mail.%s' % self
.get_dns_domain()
656 p
.nscount
= len(updates
)
659 (response
, response_packet
) =\
660 self
.dns_transaction_udp(p
, host
=server_ip
)
661 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
663 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
666 name
= "%s" % self
.get_dns_domain()
667 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
670 self
.finish_name_packet(p
, questions
)
671 (response
, response_packet
) =\
672 self
.dns_transaction_udp(p
, host
=server_ip
)
673 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
674 self
.assertEqual(response
.ancount
, 1)
675 ans
= response
.answers
[0]
676 self
.assertEqual(ans
.rr_type
, dns
.DNS_QTYPE_MX
)
677 self
.assertEqual(ans
.rdata
.preference
, 10)
678 self
.assertEqual(ans
.rdata
.exchange
, 'mail.%s' % self
.get_dns_domain())
681 class TestComplexQueries(DNSTest
):
682 def make_dns_update(self
, key
, value
, qtype
):
683 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
685 name
= self
.get_dns_domain()
686 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
687 self
.finish_name_packet(p
, [u
])
692 r
.rr_class
= dns
.DNS_QCLASS_IN
698 (response
, response_packet
) =\
699 self
.dns_transaction_udp(p
, host
=server_ip
)
700 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
703 super(TestComplexQueries
, self
).setUp()
705 global server
, server_ip
, lp
, creds
, timeout
706 self
.server
= server_name
707 self
.server_ip
= server_ip
710 self
.timeout
= timeout
712 def test_one_a_query(self
):
713 "create a query packet containing one query record"
718 name
= "cname_test.%s" % self
.get_dns_domain()
719 rdata
= "%s.%s" % (self
.server
, self
.get_dns_domain())
720 self
.make_dns_update(name
, rdata
, dns
.DNS_QTYPE_CNAME
)
722 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
726 name
= "cname_test.%s" % self
.get_dns_domain()
727 q
= self
.make_name_question(name
,
730 print("asking for ", q
.name
)
733 self
.finish_name_packet(p
, questions
)
734 (response
, response_packet
) =\
735 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
736 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
737 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
738 self
.assertEquals(response
.ancount
, 2)
739 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
740 self
.assertEquals(response
.answers
[0].rdata
, "%s.%s" %
741 (self
.server
, self
.get_dns_domain()))
742 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
743 self
.assertEquals(response
.answers
[1].rdata
,
748 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
751 name
= self
.get_dns_domain()
753 u
= self
.make_name_question(name
,
757 self
.finish_name_packet(p
, updates
)
761 r
.name
= "cname_test.%s" % self
.get_dns_domain()
762 r
.rr_type
= dns
.DNS_QTYPE_CNAME
763 r
.rr_class
= dns
.DNS_QCLASS_NONE
766 r
.rdata
= "%s.%s" % (self
.server
, self
.get_dns_domain())
768 p
.nscount
= len(updates
)
771 (response
, response_packet
) =\
772 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
773 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
775 def test_cname_two_chain(self
):
776 name0
= "cnamechain0.%s" % self
.get_dns_domain()
777 name1
= "cnamechain1.%s" % self
.get_dns_domain()
778 name2
= "cnamechain2.%s" % self
.get_dns_domain()
779 self
.make_dns_update(name1
, name2
, dns
.DNS_QTYPE_CNAME
)
780 self
.make_dns_update(name2
, name0
, dns
.DNS_QTYPE_CNAME
)
781 self
.make_dns_update(name0
, server_ip
, dns
.DNS_QTYPE_A
)
783 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
785 q
= self
.make_name_question(name1
, dns
.DNS_QTYPE_A
,
789 self
.finish_name_packet(p
, questions
)
790 (response
, response_packet
) =\
791 self
.dns_transaction_udp(p
, host
=server_ip
)
792 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
793 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
794 self
.assertEquals(response
.ancount
, 3)
796 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
797 self
.assertEquals(response
.answers
[0].name
, name1
)
798 self
.assertEquals(response
.answers
[0].rdata
, name2
)
800 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_CNAME
)
801 self
.assertEquals(response
.answers
[1].name
, name2
)
802 self
.assertEquals(response
.answers
[1].rdata
, name0
)
804 self
.assertEquals(response
.answers
[2].rr_type
, dns
.DNS_QTYPE_A
)
805 self
.assertEquals(response
.answers
[2].rdata
,
808 def test_invalid_empty_cname(self
):
809 name0
= "cnamedotprefix0.%s" % self
.get_dns_domain()
811 self
.make_dns_update(name0
, "", dns
.DNS_QTYPE_CNAME
)
812 except AssertionError:
815 self
.fail("Successfully added empty CNAME, which is invalid.")
817 def test_cname_two_chain_not_matching_qtype(self
):
818 name0
= "cnamechain0.%s" % self
.get_dns_domain()
819 name1
= "cnamechain1.%s" % self
.get_dns_domain()
820 name2
= "cnamechain2.%s" % self
.get_dns_domain()
821 self
.make_dns_update(name1
, name2
, dns
.DNS_QTYPE_CNAME
)
822 self
.make_dns_update(name2
, name0
, dns
.DNS_QTYPE_CNAME
)
823 self
.make_dns_update(name0
, server_ip
, dns
.DNS_QTYPE_A
)
825 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
827 q
= self
.make_name_question(name1
, dns
.DNS_QTYPE_TXT
,
831 self
.finish_name_packet(p
, questions
)
832 (response
, response_packet
) =\
833 self
.dns_transaction_udp(p
, host
=server_ip
)
834 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
835 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
    # CNAME should return all intermediate results!
    # Only the A record exists, not the TXT.
839 self
.assertEquals(response
.ancount
, 2)
841 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
842 self
.assertEquals(response
.answers
[0].name
, name1
)
843 self
.assertEquals(response
.answers
[0].rdata
, name2
)
845 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_CNAME
)
846 self
.assertEquals(response
.answers
[1].name
, name2
)
847 self
.assertEquals(response
.answers
[1].rdata
, name0
)
849 def test_cname_loop(self
):
850 cname1
= "cnamelooptestrec." + self
.get_dns_domain()
851 cname2
= "cnamelooptestrec2." + self
.get_dns_domain()
852 cname3
= "cnamelooptestrec3." + self
.get_dns_domain()
853 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
854 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
855 self
.make_dns_update(cname3
, cname1
, dnsp
.DNS_TYPE_CNAME
)
857 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
860 q
= self
.make_name_question(cname1
,
864 self
.finish_name_packet(p
, questions
)
866 (response
, response_packet
) =\
867 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
869 max_recursion_depth
= 20
870 self
.assertEquals(len(response
.answers
), max_recursion_depth
)
872 # Make sure cname limit doesn't count other records. This is a generic
873 # test called in tests below
874 def max_rec_test(self
, rtype
, rec_gen
):
875 name
= "limittestrec{0}.{1}".format(rtype
, self
.get_dns_domain())
877 num_recs_to_enter
= limit
+ 5
879 for i
in range(1, num_recs_to_enter
+1):
881 self
.make_dns_update(name
, ip
, rtype
)
883 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
886 q
= self
.make_name_question(name
,
890 self
.finish_name_packet(p
, questions
)
892 (response
, response_packet
) =\
893 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
895 self
.assertEqual(len(response
.answers
), num_recs_to_enter
)
def test_record_limit_A(self):
    """Run max_rec_test for A records with generated 127.0.0.x data."""
    def ip4_gen(i):
        # Append str(i) as the last octet; presumably max_rec_test calls
        # this with its per-record loop index (verify against it).
        return "127.0.0." + str(i)
    self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)
def test_record_limit_AAAA(self):
    """Run max_rec_test for AAAA records with generated IPv6 data."""
    def ip6_gen(i):
        # Append str(i) as the last group; presumably max_rec_test calls
        # this with its per-record loop index (verify against it).
        return "AAAA:0:0:0:0:0:0:" + str(i)
    self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)
907 def test_record_limit_SRV(self
):
909 rec
= dns
.srv_record()
913 rec
.target
= "srvtestrec" + str(i
)
915 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_SRV
, rec_gen
=srv_gen
)
917 # Same as test_record_limit_A but with a preceding CNAME follow
918 def test_cname_limit(self
):
919 cname1
= "cnamelimittestrec." + self
.get_dns_domain()
920 cname2
= "cnamelimittestrec2." + self
.get_dns_domain()
921 cname3
= "cnamelimittestrec3." + self
.get_dns_domain()
922 ip_prefix
= '127.0.0.'
924 num_recs_to_enter
= limit
+ 5
926 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
927 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
928 num_arecs_to_enter
= num_recs_to_enter
- 2
929 for i
in range(1, num_arecs_to_enter
+1):
930 ip
= ip_prefix
+ str(i
)
931 self
.make_dns_update(cname3
, ip
, dns
.DNS_QTYPE_A
)
933 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
936 q
= self
.make_name_question(cname1
,
940 self
.finish_name_packet(p
, questions
)
942 (response
, response_packet
) =\
943 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
945 self
.assertEqual(len(response
.answers
), num_recs_to_enter
)
947 # ANY query on cname record shouldn't follow the link
948 def test_cname_any_query(self
):
949 cname1
= "cnameanytestrec." + self
.get_dns_domain()
950 cname2
= "cnameanytestrec2." + self
.get_dns_domain()
951 cname3
= "cnameanytestrec3." + self
.get_dns_domain()
953 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
954 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
956 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
959 q
= self
.make_name_question(cname1
,
963 self
.finish_name_packet(p
, questions
)
965 (response
, response_packet
) =\
966 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
968 self
.assertEqual(len(response
.answers
), 1)
969 self
.assertEqual(response
.answers
[0].name
, cname1
)
970 self
.assertEqual(response
.answers
[0].rdata
, cname2
)
973 class TestInvalidQueries(DNSTest
):
975 super(TestInvalidQueries
, self
).setUp()
976 global server
, server_ip
, lp
, creds
, timeout
977 self
.server
= server_name
978 self
.server_ip
= server_ip
981 self
.timeout
= timeout
983 def test_one_a_query(self
):
984 """send 0 bytes follows by create a query packet
985 containing one query record"""
989 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
990 s
.connect((self
.server_ip
, 53))
996 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
999 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
1000 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
1001 print("asking for ", q
.name
)
1004 self
.finish_name_packet(p
, questions
)
1005 (response
, response_packet
) =\
1006 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
1007 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1008 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1009 self
.assertEquals(response
.ancount
, 1)
1010 self
.assertEquals(response
.answers
[0].rdata
,
1013 def test_one_a_reply(self
):
1014 "send a reply instead of a query"
1017 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1020 name
= "%s.%s" % ('fakefakefake', self
.get_dns_domain())
1021 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
1022 print("asking for ", q
.name
)
1025 self
.finish_name_packet(p
, questions
)
1026 p
.operation |
= dns
.DNS_FLAG_REPLY
1029 send_packet
= ndr
.ndr_pack(p
)
1030 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
1031 s
.settimeout(timeout
)
1032 host
= self
.server_ip
1033 s
.connect((host
, 53))
1034 tcp_packet
= struct
.pack('!H', len(send_packet
))
1035 tcp_packet
+= send_packet
1036 s
.send(tcp_packet
, 0)
1037 recv_packet
= s
.recv(0xffff + 2, 0)
1038 self
.assertEquals(0, len(recv_packet
))
1039 except socket
.timeout
:
        # Windows chooses not to respond to incorrectly formatted queries.
        # Although this appears to be non-deterministic even for the same
        # request twice, it also appears to be based on how poorly the
        # request is formatted.
1050 class TestZones(DNSTest
):
1052 super(TestZones
, self
).setUp()
1053 global server
, server_ip
, lp
, creds
, timeout
1054 self
.server
= server_name
1055 self
.server_ip
= server_ip
1058 self
.timeout
= timeout
1060 self
.zone
= "test.lan"
1061 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" %
1063 self
.lp
, self
.creds
)
1065 self
.samdb
= SamDB(url
="ldap://" + self
.server_ip
,
1066 lp
=self
.get_loadparm(),
1067 session_info
=system_session(),
1068 credentials
=self
.creds
)
1069 self
.zone_dn
= "DC=" + self
.zone
+\
1070 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1071 str(self
.samdb
.get_default_basedn())
1074 super(TestZones
, self
).tearDown()
1077 self
.delete_zone(self
.zone
)
1078 except RuntimeError as e
:
1079 (num
, string
) = e
.args
1080 if num
!= werror
.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
:
def make_zone_obj(self, zone, aging_enabled=False):
    """Build the RPC structure describing a new primary zone.

    :param zone: name stored in pszZoneName.
    :param aging_enabled: truthiness is stored in fAging as 0 or 1.
    :return: the populated DNS_RPC_ZONE_CREATE_INFO_LONGHORN object.
    """
    zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_create.pszZoneName = zone
    zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_create.fAging = int(aging_enabled)
    zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
    zone_create.fDsIntegrated = 1
    zone_create.fLoadExisting = 1
    zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
    # create_zone() and test_fully_qualified_zone() both consume the
    # result, so the object has to be handed back to the caller.
    return zone_create
1094 def create_zone(self
, zone
, aging_enabled
=False):
1095 zone_create
= self
.make_zone_obj(zone
, aging_enabled
)
1097 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1098 self
.rpc_conn
.DnssrvOperation2(client_version
,
1104 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
1106 except WERRORError
as e
:
1109 def set_params(self
, **kwargs
):
1110 zone
= kwargs
.pop('zone', None)
1111 for key
, val
in kwargs
.items():
1112 name_param
= dnsserver
.DNS_RPC_NAME_AND_PARAM()
1113 name_param
.dwParam
= val
1114 name_param
.pszNodeName
= key
1116 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1117 nap_type
= dnsserver
.DNSSRV_TYPEID_NAME_AND_PARAM
1119 self
.rpc_conn
.DnssrvOperation2(client_version
,
1124 'ResetDwordProperty',
1127 except WERRORError
as e
:
def ldap_modify_dnsrecs(self, name, func):
    """Edit a node's DNS records in place and write them back over LDAP.

    :param name: node name relative to the test zone; used to build the
        DN 'DC=<name>,<zone_dn>'.
    :param func: callable invoked once with each unpacked record; it is
        expected to mutate the record it is given.
    """
    dn = 'DC={0},{1}'.format(name, self.zone_dn)
    dns_recs = self.ldap_get_dns_records(name)
    for rec in dns_recs:
        func(rec)
    # Re-pack every record (modified or not) and replace the whole
    # dnsRecord attribute in one go.
    update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
    self.samdb.modify(ldb.Message.from_dict(self.samdb,
                                            update_dict,
                                            ldb.FLAG_MOD_REPLACE))
def dns_update_record(self, prefix, txt):
    """Add a TXT record by DNS update and return it as stored in LDAP.

    :param prefix: record name relative to the test zone.
    :param txt: list of strings making up the TXT record.
    :return: the single unpacked record under *prefix* whose data.str
        equals *txt*; the test fails if there is not exactly one.
    """
    p = self.make_txt_update(prefix, txt, self.zone)
    (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
    self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
    recs = self.ldap_get_dns_records(prefix)
    recs = [r for r in recs if r.data.str == txt]
    self.assertEqual(len(recs), 1)
    # Callers inspect the stored record (e.g. its dwTimeStamp).
    return recs[0]
def dns_tombstone(self, prefix, txt, zone):
    """Hand samdb.dns_replace() a lone tombstone record for prefix.zone.

    The tombstone carries dwTimeStamp 1000.  *txt* is accepted but not
    used by this helper.
    """
    tombstone = dnsp.DnssrvRpcRecord()
    tombstone.wType = dnsp.DNS_TYPE_TOMBSTONE
    tombstone.dwTimeStamp = 1000

    fqdn = ".".join((prefix, zone))
    self.samdb.dns_replace(fqdn, [tombstone])
def ldap_get_records(self, name):
    """Search the test zone for dnsNode objects whose name is *name*.

    A subtree search is used on purpose: when nothing matches it gives
    an empty result rather than raising, which a test below relies on.
    All attributes are requested.
    """
    node_filter = "(&(objectClass=dnsNode)(name={0}))".format(name)
    search_kwargs = dict(base=self.zone_dn,
                         scope=ldb.SCOPE_SUBTREE,
                         expression=node_filter,
                         attrs=["*"])
    return self.samdb.search(**search_kwargs)
def ldap_get_dns_records(self, name):
    """Return the unpacked dnsRecord values of the first node named *name*.

    Only the first search result is examined; each of its dnsRecord
    values is unpacked as a dnsp.DnssrvRpcRecord.
    """
    first_node = self.ldap_get_records(name)[0]
    unpacked = []
    for blob in first_node.get('dnsRecord'):
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, blob))
    return unpacked
def ldap_get_zone_settings(self):
    """Read the test zone's dNSProperty values into a dict.

    A base-scope search on the zone DN must return exactly one entry.
    Each dNSProperty value is unpacked as a dnsp.DnsProperty.

    :return: dict mapping the lower-cased property name (for example
        'aging_state') to the property's data.
    """
    records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
                                expression="(&(objectClass=dnsZone)" +
                                "(name={0}))".format(self.zone),
                                attrs=["dNSProperty"])
    self.assertEqual(len(records), 1)
    props = [ndr_unpack(dnsp.DnsProperty, r)
             for r in records[0].get('dNSProperty')]

    # We have no choice but to repeat these here.
    # 0x01 is the zone-type property id; without it the lookup below
    # raises KeyError for any zone that carries that property.
    zone_prop_ids = {0x00: "EMPTY",
                     0x01: "TYPE",
                     0x02: "ALLOW_UPDATE",
                     0x08: "SECURE_TIME",
                     0x10: "NOREFRESH_INTERVAL",
                     0x11: "SCAVENGING_SERVERS",
                     0x12: "AGING_ENABLED_TIME",
                     0x20: "REFRESH_INTERVAL",
                     0x40: "AGING_STATE",
                     0x80: "DELETED_FROM_HOSTNAME",
                     0x81: "MASTER_SERVERS",
                     0x82: "AUTO_NS_SERVERS",
                     0x83: "DCPROMO_CONVERT",
                     0x90: "SCAVENGING_SERVERS_DA",
                     0x91: "MASTER_SERVERS_DA",
                     0x92: "NS_SERVERS_DA",
                     0x100: "NODE_DBFLAGS"}
    return {zone_prop_ids[p.id].lower(): p.data for p in props}
def set_aging(self, enable=False):
    """Create the test zone and configure its aging parameters.

    The zone is created with aging_enabled=*enable*; afterwards both
    refresh intervals are set to 1, Aging to 0 or 1 according to
    *enable*, and unsecure updates are allowed.
    """
    self.create_zone(self.zone, aging_enabled=enable)
    aging_flag = 1 if enable else 0
    self.set_params(NoRefreshInterval=1,
                    RefreshInterval=1,
                    Aging=aging_flag,
                    zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
    """Enabling aging sets aging_state and timestamps new records.

    NOTE: the parameters are not consulted; the scenario always runs
    with aging enabled against 'agingtest' / ['test txt'].
    """
    self.set_aging(enable=True)

    aging_state = self.ldap_get_zone_settings()['aging_state']
    self.assertTrue(aging_state is not None)
    self.assertTrue(aging_state)

    # A dynamically added record must carry a non-zero timestamp.
    added = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(added.dwTimeStamp, 0)
def test_set_aging_disabled(self):
    """With aging off, aging_state is present but false.

    A dynamically added record is still expected to carry a non-zero
    timestamp.
    """
    self.set_aging(enable=False)

    aging_state = self.ldap_get_zone_settings()['aging_state']
    self.assertTrue(aging_state is not None)
    self.assertFalse(aging_state)

    added = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(added.dwTimeStamp, 0)
1224 def test_aging_update(self
, enable
=True):
1225 name
, txt
= 'agingtest', ['test txt']
1226 self
.set_aging(enable
=True)
1227 before_mod
= self
.dns_update_record(name
, txt
)
1229 self
.set_params(zone
=self
.zone
, Aging
=0)
1233 self
.assertTrue(rec
.dwTimeStamp
> 0)
1234 rec
.dwTimeStamp
-= dec
1235 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1236 after_mod
= self
.ldap_get_dns_records(name
)
1237 self
.assertEqual(len(after_mod
), 1)
1238 after_mod
= after_mod
[0]
1239 self
.assertEqual(after_mod
.dwTimeStamp
,
1240 before_mod
.dwTimeStamp
- dec
)
1241 after_update
= self
.dns_update_record(name
, txt
)
1242 after_should_equal
= before_mod
if enable
else after_mod
1243 self
.assertEqual(after_should_equal
.dwTimeStamp
,
1244 after_update
.dwTimeStamp
)
def test_aging_update_disabled(self):
    """Run the test_aging_update scenario with enable=False."""
    aging_on = False
    self.test_aging_update(enable=aging_on)
1249 def test_aging_refresh(self
):
1250 name
, txt
= 'agingtest', ['test txt']
1251 self
.create_zone(self
.zone
, aging_enabled
=True)
1253 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1254 Aging
=1, zone
=self
.zone
,
1255 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1256 before_mod
= self
.dns_update_record(name
, txt
)
1259 self
.assertTrue(rec
.dwTimeStamp
> 0)
1260 rec
.dwTimeStamp
-= interval
// 2
1261 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1262 update_during_norefresh
= self
.dns_update_record(name
, txt
)
1265 self
.assertTrue(rec
.dwTimeStamp
> 0)
1266 rec
.dwTimeStamp
-= interval
+ interval
// 2
1267 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1268 update_during_refresh
= self
.dns_update_record(name
, txt
)
1269 self
.assertEqual(update_during_norefresh
.dwTimeStamp
,
1270 before_mod
.dwTimeStamp
- interval
/ 2)
1271 self
.assertEqual(update_during_refresh
.dwTimeStamp
,
1272 before_mod
.dwTimeStamp
)
1274 def test_rpc_add_no_timestamp(self
):
1275 name
, txt
= 'agingtest', ['test txt']
1276 self
.set_aging(enable
=True)
1277 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1278 rec_buf
.rec
= TXTRecord(txt
)
1279 self
.rpc_conn
.DnssrvUpdateRecord2(
1280 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1287 recs
= self
.ldap_get_dns_records(name
)
1288 self
.assertEqual(len(recs
), 1)
1289 self
.assertEqual(recs
[0].dwTimeStamp
, 0)
1291 def test_static_record_dynamic_update(self
):
1292 name
, txt
= 'agingtest', ['test txt']
1293 txt2
= ['test txt2']
1294 self
.set_aging(enable
=True)
1295 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1296 rec_buf
.rec
= TXTRecord(txt
)
1297 self
.rpc_conn
.DnssrvUpdateRecord2(
1298 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1306 rec2
= self
.dns_update_record(name
, txt2
)
1307 self
.assertEqual(rec2
.dwTimeStamp
, 0)
1309 def test_dynamic_record_static_update(self
):
1310 name
, txt
= 'agingtest', ['test txt']
1311 txt2
= ['test txt2']
1312 txt3
= ['test txt3']
1313 self
.set_aging(enable
=True)
1315 self
.dns_update_record(name
, txt
)
1317 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1318 rec_buf
.rec
= TXTRecord(txt2
)
1319 self
.rpc_conn
.DnssrvUpdateRecord2(
1320 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1328 self
.dns_update_record(name
, txt3
)
1330 recs
= self
.ldap_get_dns_records(name
)
1331 # Put in dict because ldap recs might be out of order
1332 recs
= {str(r
.data
.str): r
for r
in recs
}
1333 self
.assertNotEqual(recs
[str(txt
)].dwTimeStamp
, 0)
1334 self
.assertEqual(recs
[str(txt2
)].dwTimeStamp
, 0)
1335 self
.assertEqual(recs
[str(txt3
)].dwTimeStamp
, 0)
1337 def test_dns_tombstone_custom_match_rule(self
):
1338 lp
= self
.get_loadparm()
1339 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1340 session_info
=system_session(),
1341 credentials
=self
.creds
)
1343 name
, txt
= 'agingtest', ['test txt']
1344 name2
, txt2
= 'agingtest2', ['test txt2']
1345 name3
, txt3
= 'agingtest3', ['test txt3']
1346 name4
, txt4
= 'agingtest4', ['test txt4']
1347 name5
, txt5
= 'agingtest5', ['test txt5']
1349 self
.create_zone(self
.zone
, aging_enabled
=True)
1351 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1352 Aging
=1, zone
=self
.zone
,
1353 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1355 self
.dns_update_record(name
, txt
)
1357 self
.dns_update_record(name2
, txt
)
1358 self
.dns_update_record(name2
, txt2
)
1360 self
.dns_update_record(name3
, txt
)
1361 self
.dns_update_record(name3
, txt2
)
1362 last_update
= self
.dns_update_record(name3
, txt3
)
1364 # Modify txt1 of the first 2 names
1366 if rec
.data
.str == txt
:
1367 rec
.dwTimeStamp
-= 2
1368 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1369 self
.ldap_modify_dnsrecs(name2
, mod_ts
)
1371 # create a static dns record.
1372 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1373 rec_buf
.rec
= TXTRecord(txt4
)
1374 self
.rpc_conn
.DnssrvUpdateRecord2(
1375 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1383 # Create a tombstoned record.
1384 self
.dns_update_record(name5
, txt5
)
1385 self
.dns_tombstone(name5
, txt5
, self
.zone
)
1387 self
.ldap_get_dns_records(name3
)
1388 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1389 expr
= expr
.format(int(last_update
.dwTimeStamp
) - 1)
1391 res
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1392 expression
=expr
, attrs
=["*"])
1393 except ldb
.LdbError
as e
:
1395 updated_names
= {str(r
.get('name')) for r
in res
}
1396 self
.assertEqual(updated_names
, set([name
, name2
]))
1398 def test_dns_tombstone_custom_match_rule_no_records(self
):
1399 lp
= self
.get_loadparm()
1400 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1401 session_info
=system_session(),
1402 credentials
=self
.creds
)
1404 self
.create_zone(self
.zone
, aging_enabled
=True)
1406 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1407 Aging
=1, zone
=self
.zone
,
1408 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1410 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1411 expr
= expr
.format(1)
1414 res
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1415 expression
=expr
, attrs
=["*"])
1416 except ldb
.LdbError
as e
:
1418 self
.assertEqual(0, len(res
))
1420 def test_dns_tombstone_custom_match_rule_fail(self
):
1421 self
.create_zone(self
.zone
, aging_enabled
=True)
1422 samdb
= SamDB(url
=lp
.samdb_url(),
1424 session_info
=system_session(),
1425 credentials
=self
.creds
)
1427 # Property name is not dnsRecord
1428 expr
= "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1429 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1430 expression
=expr
, attrs
=["*"])
1431 self
.assertEquals(len(res
), 0)
1433 # No value for tombstone time
1435 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1436 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1437 expression
=expr
, attrs
=["*"])
1438 self
.assertEquals(len(res
), 0)
1439 self
.fail("Exception: ldb.ldbError not generated")
1440 except ldb
.LdbError
as e
:
1442 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1444 # Tombstone time = -
1446 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1447 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1448 expression
=expr
, attrs
=["*"])
1449 self
.assertEquals(len(res
), 0)
1450 self
.fail("Exception: ldb.ldbError not generated")
1451 except ldb
.LdbError
as e
:
1453 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1455 # Tombstone time longer than 64 characters
1457 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1458 expr
= expr
.format("1" * 65)
1459 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1460 expression
=expr
, attrs
=["*"])
1461 self
.assertEquals(len(res
), 0)
1462 self
.fail("Exception: ldb.ldbError not generated")
1463 except ldb
.LdbError
as e
:
1465 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1467 # Non numeric Tombstone time
1469 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1470 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1471 expression
=expr
, attrs
=["*"])
1472 self
.assertEquals(len(res
), 0)
1473 self
.fail("Exception: ldb.ldbError not generated")
1474 except ldb
.LdbError
as e
:
1476 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1478 # Non system session
1480 db
= SamDB(url
="ldap://" + self
.server_ip
,
1481 lp
=self
.get_loadparm(),
1482 credentials
=self
.creds
)
1484 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1485 res
= db
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1486 expression
=expr
, attrs
=["*"])
1487 self
.assertEquals(len(res
), 0)
1488 self
.fail("Exception: ldb.ldbError not generated")
1489 except ldb
.LdbError
as e
:
1491 self
.assertEquals(num
, ERR_OPERATIONS_ERROR
)
1493 def test_basic_scavenging(self
):
1494 lp
= self
.get_loadparm()
1495 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1496 session_info
=system_session(),
1497 credentials
=self
.creds
)
1499 self
.create_zone(self
.zone
, aging_enabled
=True)
1501 self
.set_params(NoRefreshInterval
=interval
, RefreshInterval
=interval
,
1502 zone
=self
.zone
, Aging
=1,
1503 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1504 name
, txt
= 'agingtest', ['test txt']
1505 name2
, txt2
= 'agingtest2', ['test txt2']
1506 name3
, txt3
= 'agingtest3', ['test txt3']
1507 self
.dns_update_record(name
, txt
)
1508 self
.dns_update_record(name2
, txt
)
1509 self
.dns_update_record(name2
, txt2
)
1510 self
.dns_update_record(name3
, txt
)
1511 self
.dns_update_record(name3
, txt2
)
1512 last_add
= self
.dns_update_record(name3
, txt3
)
1515 self
.assertTrue(rec
.dwTimeStamp
> 0)
1516 if rec
.data
.str == txt
:
1517 rec
.dwTimeStamp
-= interval
* 5
1518 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1519 self
.ldap_modify_dnsrecs(name2
, mod_ts
)
1520 self
.ldap_modify_dnsrecs(name3
, mod_ts
)
1521 self
.assertTrue(callable(getattr(dsdb
, '_scavenge_dns_records', None)))
1522 dsdb
._scavenge
_dns
_records
(self
.samdb
)
1524 recs
= self
.ldap_get_dns_records(name
)
1525 self
.assertEqual(len(recs
), 1)
1526 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1528 recs
= self
.ldap_get_dns_records(name2
)
1529 self
.assertEqual(len(recs
), 1)
1530 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TXT
)
1531 self
.assertEqual(recs
[0].data
.str, txt2
)
1533 recs
= self
.ldap_get_dns_records(name3
)
1534 self
.assertEqual(len(recs
), 2)
1535 txts
= {str(r
.data
.str) for r
in recs
}
1536 self
.assertEqual(txts
, {str(txt2
), str(txt3
)})
1537 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TXT
)
1538 self
.assertEqual(recs
[1].wType
, dnsp
.DNS_TYPE_TXT
)
1540 for make_it_work
in [False, True]:
1541 inc
= -1 if make_it_work
else 1
1544 rec
.data
= (last_add
.dwTimeStamp
- 24 * 14) + inc
1545 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1546 dsdb
._dns
_delete
_tombstones
(self
.samdb
)
1547 recs
= self
.ldap_get_records(name
)
1549 self
.assertEqual(len(recs
), 0)
1551 self
.assertEqual(len(recs
), 1)
1553 def test_fully_qualified_zone(self
):
1555 def create_zone_expect_exists(zone
):
1557 zone_create
= self
.make_zone_obj(zone
)
1558 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1559 zc_type
= dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
1560 self
.rpc_conn
.DnssrvOperation2(client_version
,
1568 except WERRORError
as e
:
1570 if enum
!= werror
.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS
:
1573 self
.fail("Zone {} should already exist".format(zone
))
1575 # Create unqualified, then check creating qualified fails.
1576 self
.create_zone(self
.zone
)
1577 create_zone_expect_exists(self
.zone
+ '.')
1579 # Same again, but the other way around.
1580 self
.create_zone(self
.zone
+ '2.')
1581 create_zone_expect_exists(self
.zone
+ '2')
1583 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1584 request_filter
= dnsserver
.DNS_ZONE_REQUEST_PRIMARY
1585 tid
= dnsserver
.DNSSRV_TYPEID_DWORD
1586 typeid
, res
= self
.rpc_conn
.DnssrvComplexOperation2(client_version
,
1594 self
.delete_zone(self
.zone
)
1595 self
.delete_zone(self
.zone
+ '2')
1597 # Two zones should've been created, neither of them fully qualified.
1598 zones_we_just_made
= []
1599 zones
= [str(z
.pszZoneName
) for z
in res
.ZoneArray
]
1601 if zone
.startswith(self
.zone
):
1602 zones_we_just_made
.append(zone
)
1603 self
.assertEqual(len(zones_we_just_made
), 2)
1604 self
.assertEqual(set(zones_we_just_made
), {self
.zone
+ '2', self
.zone
})
1606 def delete_zone(self
, zone
):
1607 self
.rpc_conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1613 dnsserver
.DNSSRV_TYPEID_NULL
,
def test_soa_query(self):
    """An SOA query for the test zone succeeds only while the zone exists.

    The same query packet is sent three times: before the zone is
    created, after it is created, and after it is deleted again.
    """
    zone = self.zone
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    questions = [q]
    self.finish_name_packet(p, questions)

    def query_and_check(expected_rcode, expected_ancount):
        # Send the SOA query and verify rcode, opcode and answer count.
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=server_ip)
        self.assert_dns_rcode_equals(response, expected_rcode)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, expected_ancount)
        return response

    # Windows returns OK while BIND logically seems to return NXDOMAIN
    query_and_check(dns.DNS_RCODE_NXDOMAIN, 0)

    self.create_zone(zone)
    response = query_and_check(dns.DNS_RCODE_OK, 1)
    self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

    self.delete_zone(zone)
    query_and_check(dns.DNS_RCODE_NXDOMAIN, 0)
1648 class TestRPCRoundtrip(DNSTest
):
1650 super(TestRPCRoundtrip
, self
).setUp()
1651 global server
, server_ip
, lp
, creds
1652 self
.server
= server_name
1653 self
.server_ip
= server_ip
1656 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" %
1662 super(TestRPCRoundtrip
, self
).tearDown()
1664 def rpc_update(self
, fqn
=None, data
=None, wType
=None, delete
=False):
1665 fqn
= fqn
or ("rpctestrec." + self
.get_dns_domain())
1667 rec
= data_to_dns_record(wType
, data
)
1668 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1669 add_rec_buf
.rec
= rec
1671 add_arg
= add_rec_buf
1675 del_arg
= add_rec_buf
1677 self
.rpc_conn
.DnssrvUpdateRecord2(
1678 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1681 self
.get_dns_domain(),
def test_rpc_self_referencing_cname(self):
    """Adding a CNAME that points at itself must fail with CNAME_LOOP."""
    cname = "cnametest2_unqual_rec_loop"
    cname_fqn = "%s.%s" % (cname, self.get_dns_domain())

    # Pre-test cleanup: delete any leftover record.  "Record does not
    # exist" is the only error tolerated here.
    try:
        self.rpc_update(fqn=cname, data=cname_fqn,
                        wType=dnsp.DNS_TYPE_CNAME, delete=True)
    except WERRORError as e:
        if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
            self.fail("RPC DNS gaven wrong error on pre-test cleanup "
                      "for self referencing CNAME: %s" % e.args[0])

    try:
        self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
    except WERRORError as e:
        if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
            self.fail("RPC DNS gaven wrong error on insertion of "
                      "self referencing CNAME: %s" % e.args[0])
    else:
        # Only reached when the insertion raised nothing at all; the
        # expected CNAME_LOOP error must not fall through to this.
        self.fail("RPC DNS allowed insertion of self referencing CNAME")
1708 def test_update_add_txt_rpc_to_dns(self
):
1709 prefix
, txt
= 'rpctextrec', ['"This is a test"']
1711 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1713 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"\\"This is a test\\""')
1714 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1715 add_rec_buf
.rec
= rec
1717 self
.rpc_conn
.DnssrvUpdateRecord2(
1718 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1721 self
.get_dns_domain(),
1726 except WERRORError
as e
:
1730 self
.check_query_txt(prefix
, txt
)
1732 self
.rpc_conn
.DnssrvUpdateRecord2(
1733 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1736 self
.get_dns_domain(),
def test_update_add_null_padded_txt_record(self):
    """TXT records containing empty strings survive a DNS update.

    Each case adds the record by DNS update, checks it by DNS query and
    then looks it up over RPC using the quoted presentation form.
    """
    # (record prefix, TXT strings sent by DNS update, RPC lookup string)
    cases = [
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    ]
    for prefix, txt, rpc_data in cases:
        p = self.make_txt_update(prefix, txt)
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)
        # Same six-argument call the sibling TXT tests make: connection,
        # server, zone, fully qualified name, record type, data.
        self.assertIsNotNone(
            dns_record_match(self.rpc_conn,
                             self.server_ip,
                             self.get_dns_domain(),
                             "%s.%s" % (prefix, self.get_dns_domain()),
                             dnsp.DNS_TYPE_TXT,
                             rpc_data))
1787 def test_update_add_padding_rpc_to_dns(self
):
1788 prefix
, txt
= 'pad1textrec', ['"This is a test"', '', '']
1789 prefix
= 'rpc' + prefix
1790 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1792 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
1793 '"\\"This is a test\\"" "" ""')
1794 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1795 add_rec_buf
.rec
= rec
1797 self
.rpc_conn
.DnssrvUpdateRecord2(
1798 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1801 self
.get_dns_domain(),
1806 except WERRORError
as e
:
1810 self
.check_query_txt(prefix
, txt
)
1812 self
.rpc_conn
.DnssrvUpdateRecord2(
1813 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1816 self
.get_dns_domain(),
1821 prefix
, txt
= 'pad2textrec', ['"This is a test"', '', '', 'more text']
1822 prefix
= 'rpc' + prefix
1823 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1825 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
1826 '"\\"This is a test\\"" "" "" "more text"')
1827 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1828 add_rec_buf
.rec
= rec
1830 self
.rpc_conn
.DnssrvUpdateRecord2(
1831 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1834 self
.get_dns_domain(),
1839 except WERRORError
as e
:
1843 self
.check_query_txt(prefix
, txt
)
1845 self
.rpc_conn
.DnssrvUpdateRecord2(
1846 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1849 self
.get_dns_domain(),
1854 prefix
, txt
= 'pad3textrec', ['', '', '"This is a test"']
1855 prefix
= 'rpc' + prefix
1856 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1858 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
1859 '"" "" "\\"This is a test\\""')
1860 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1861 add_rec_buf
.rec
= rec
1863 self
.rpc_conn
.DnssrvUpdateRecord2(
1864 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1867 self
.get_dns_domain(),
1871 except WERRORError
as e
:
1875 self
.check_query_txt(prefix
, txt
)
1877 self
.rpc_conn
.DnssrvUpdateRecord2(
1878 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1881 self
.get_dns_domain(),
1886 # Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
    """TXT strings with an embedded NUL are accepted by DNS update.

    Both the DNS query check and the RPC lookup expect only the part of
    each string before the NUL byte.
    """
    # (record prefix, strings sent, strings expected back, RPC form)
    scenarios = (
        ('nulltextrec', ['NULL\x00BYTE'], ['NULL'], '"NULL"'),
        ('nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE'],
         ['NULL', 'NULL'], '"NULL" "NULL"'),
    )
    for prefix, txt, expected_txt, rpc_form in scenarios:
        update = self.make_txt_update(prefix, txt)
        response, response_packet = \
            self.dns_transaction_udp(update, host=server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)
        found = dns_record_match(
            self.rpc_conn, self.server_ip,
            self.get_dns_domain(),
            "%s.%s" % (prefix, self.get_dns_domain()),
            dnsp.DNS_TYPE_TXT, rpc_form)
        self.assertIsNotNone(found)
1911 def test_update_add_null_char_rpc_to_dns(self
):
1912 prefix
= 'rpcnulltextrec'
1913 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1915 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"NULL\x00BYTE"')
1916 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1917 add_rec_buf
.rec
= rec
1919 self
.rpc_conn
.DnssrvUpdateRecord2(
1920 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1923 self
.get_dns_domain(),
1928 except WERRORError
as e
:
1932 self
.check_query_txt(prefix
, ['NULL'])
1934 self
.rpc_conn
.DnssrvUpdateRecord2(
1935 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1938 self
.get_dns_domain(),
def test_update_add_hex_char_txt_record(self):
    """A TXT string containing the character \\xFF round-trips unchanged.

    The record is added by DNS update, checked by DNS query and then
    matched over RPC.
    """
    prefix = 'hextextrec'
    txt = ['HIGH\xFFBYTE']

    update = self.make_txt_update(prefix, txt)
    response, response_packet = \
        self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn, self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    self.assertIsNotNone(found)
1956 def test_update_add_hex_rpc_to_dns(self
):
1957 prefix
, txt
= 'hextextrec', ['HIGH\xFFBYTE']
1958 prefix
= 'rpc' + prefix
1959 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1961 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"HIGH\xFFBYTE"')
1962 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1963 add_rec_buf
.rec
= rec
1965 self
.rpc_conn
.DnssrvUpdateRecord2(
1966 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1969 self
.get_dns_domain(),
1974 except WERRORError
as e
:
1978 self
.check_query_txt(prefix
, txt
)
1980 self
.rpc_conn
.DnssrvUpdateRecord2(
1981 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1984 self
.get_dns_domain(),
def test_update_add_slash_txt_record(self):
    """A TXT string containing a backslash survives a DNS update.

    The RPC lookup uses the quoted form, in which the backslash is
    itself escaped.
    """
    prefix = 'slashtextrec'
    txt = ['Th\\=is=is a test']

    update = self.make_txt_update(prefix, txt)
    response, response_packet = \
        self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn, self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
    self.assertIsNotNone(found)
2002 # This test fails against Windows as it eliminates slashes in RPC
2003 # One typical use for a slash is in records like 'var=value' to
2004 # escape '=' characters.
2005 def test_update_add_slash_rpc_to_dns(self
):
2006 prefix
, txt
= 'slashtextrec', ['Th\\=is=is a test']
2007 prefix
= 'rpc' + prefix
2008 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2010 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '"Th\\\\=is=is a test"')
2011 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2012 add_rec_buf
.rec
= rec
2014 self
.rpc_conn
.DnssrvUpdateRecord2(
2015 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2018 self
.get_dns_domain(),
2023 except WERRORError
as e
:
2027 self
.check_query_txt(prefix
, txt
)
2030 self
.rpc_conn
.DnssrvUpdateRecord2(
2031 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2034 self
.get_dns_domain(),
def test_update_add_two_txt_records(self):
    """A TXT record made of two quoted strings can be added and found.

    The record is added by DNS update, checked by DNS query and then
    matched over RPC using the escaped, space-separated form.
    """
    prefix = 'textrec2'
    txt = ['"This is a test"',
           '"and this is a test, too"']

    update = self.make_txt_update(prefix, txt)
    response, response_packet = \
        self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn, self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT,
        '"\\"This is a test\\""'
        ' "\\"and this is a test, too\\""')
    self.assertIsNotNone(found)
2054 def test_update_add_two_rpc_to_dns(self
):
2055 prefix
, txt
= 'textrec2', ['"This is a test"',
2056 '"and this is a test, too"']
2057 prefix
= 'rpc' + prefix
2058 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2060 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
,
2061 '"\\"This is a test\\""' +
2062 ' "\\"and this is a test, too\\""')
2063 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2064 add_rec_buf
.rec
= rec
2066 self
.rpc_conn
.DnssrvUpdateRecord2(
2067 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2070 self
.get_dns_domain(),
2075 except WERRORError
as e
:
2079 self
.check_query_txt(prefix
, txt
)
2081 self
.rpc_conn
.DnssrvUpdateRecord2(
2082 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2085 self
.get_dns_domain(),
def test_update_add_empty_txt_records(self):
    """A TXT record with no strings at all can be added and found.

    The RPC lookup uses the empty string as the record data.
    """
    prefix = 'emptytextrec'
    txt = []

    update = self.make_txt_update(prefix, txt)
    response, response_packet = \
        self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn, self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, '')
    self.assertIsNotNone(found)
2103 def test_update_add_empty_rpc_to_dns(self
):
2104 prefix
, txt
= 'rpcemptytextrec', []
2106 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2108 rec
= data_to_dns_record(dnsp
.DNS_TYPE_TXT
, '')
2109 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2110 add_rec_buf
.rec
= rec
2112 self
.rpc_conn
.DnssrvUpdateRecord2(
2113 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2116 self
.get_dns_domain(),
2120 except WERRORError
as e
:
2124 self
.check_query_txt(prefix
, txt
)
2126 self
.rpc_conn
.DnssrvUpdateRecord2(
2127 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2130 self
.get_dns_domain(),
# Script entry point: hand this module to the subunit TestProgram runner,
# together with the SubunitOptions group registered on the option parser
# near the top of the file.
TestProgram(module=__name__, opts=subunitopts)