selftest: Test behaviour of DNS scavenge with an existing dNSTombstoned value
[Samba.git] / python / samba / tests / dns.py
blobbc05076c6151b4bc6d4edcea55d4d8c7863f5f22
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 from ldb import ERR_OPERATIONS_ERROR
26 import os
27 import sys
28 import struct
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
# Command line: two positional arguments (server name, server IP) followed by
# the standard Samba, credentials and subunit option groups.
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# The timeout is only relevant when testing against Windows: format errors
# tend to produce patchy responses there, so requests need a timeout.
parser.add_option("--timeout",
                  dest="timeout",
                  type="int",
                  help="Specify timeout for DNS requests")

# Use credentials from the command line when they are available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

(opts, args) = parser.parse_args()

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

timeout = opts.timeout

# Both positional arguments are mandatory.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name, server_ip = args[0], args[1]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
class TestSimpleQueries(DNSTest):
    """Plain DNS queries (A, SOA, MX, ALL) sent to the server under test."""

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super(TestSimpleQueries, self).setUp()
        # The module-level settings are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _make_query(self, name, qtype, qclass=dns.DNS_QCLASS_IN,
                    announce=False):
        """Return a QUERY packet containing a single question.

        name, qtype and qclass are handed to make_name_question().  When
        announce is true the name of the question is printed before the
        packet is finished.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, qclass)
        if announce:
            print("asking for ", q.name)
        self.finish_name_packet(p, [q])
        return p

    def test_one_a_query(self):
        """create a query packet containing one query record"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_A, announce=True)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_SOA_query(self):
        """create a query packet containing one query record for the SOA"""
        name = "%s" % (self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_SOA, announce=True)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        # The primary name server is compared case-insensitively.
        self.assertEqual(
            response.answers[0].rdata.mname.upper(),
            ("%s.%s" % (self.server, self.get_dns_domain())).upper())

    def test_one_a_query_tcp(self):
        """create a query packet containing one query record via TCP"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_A, announce=True)
        (response, _) = self.dns_transaction_tcp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_mx_query(self):
        """create a query packet causing an empty RCODE_OK answer"""
        # An existing name without MX records: OK with no answers.
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_MX, announce=True)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # A name that does not exist: NXDOMAIN with no answers.
        name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_MX, announce=True)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        """create a query packet containing two query records"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (self.server, self.get_dns_domain())
        questions.append(
            self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN))

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        questions.append(
            self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN))

        self.finish_name_packet(p, questions)
        try:
            (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass
        else:
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        """create a QTYPE_ALL query"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_ALL, announce=True)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)

        # One more answer is expected when SERVER_IPV6 is set in the
        # environment.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata, self.server_ip)
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        """create a QCLASS_NONE query"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
        try:
            (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass
        else:
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_soa_hostname_query(self):
        """create a SOA query for a hostname"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_SOA)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # We don't get SOA records for single hosts
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_unknown_hostname_query(self):
        """create a SOA query for an unknown hostname"""
        name = "foobar.%s" % (self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_SOA)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # We don't get SOA records for single hosts
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_domain_query(self):
        """create a SOA query for a domain"""
        p = self._make_query(self.get_dns_domain(), dns.DNS_QTYPE_SOA)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
class TestDNSUpdates(DNSTest):
    """Dynamic DNS UPDATE requests sent to the server under test."""

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super(TestDNSUpdates, self).setUp()
        # The module-level settings are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _make_zone_update(self):
        """Return an UPDATE packet whose only question is the domain SOA."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA,
                                       dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])
        return p

    def _make_prereq_update(self, name, rr_class, ttl, length):
        """Return a zone update carrying one TXT prerequisite record.

        The record gets the given name, class, TTL and length and is stored
        as the single entry of the packet's answers section.
        """
        p = self._make_zone_update()
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        p.ancount = 1
        p.answers = [r]
        return p

    def _make_txt_change(self, name, rr_class, ttl):
        """Return a zone update with one TXT record in the nsrecs section.

        The record always carries the text '"This is a test"'; the tests
        below use class IN with a TTL of 900 to add the record and class
        NONE with a TTL of 0 to delete it again.
        """
        p = self._make_zone_update()
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = self.make_txt_record(['"This is a test"'])
        p.nscount = 1
        p.nsrecs = [r]
        return p

    def _assert_udp_rcode(self, p, rcode):
        """Send p over UDP and require the response code rcode."""
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, rcode)

    def _assert_udp_rcode_unless_timeout(self, p, rcode):
        """Like _assert_udp_rcode(), but accept a missing response."""
        try:
            (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            return
        self.assert_dns_rcode_equals(response, rcode)

    def _assert_txt_query_rcode(self, name, rcode):
        """Query the TXT record of name over UDP and require rcode."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        self._assert_udp_rcode(p, rcode)

    def test_two_updates(self):
        """create two update requests"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (self.server, self.get_dns_domain())
        updates.append(
            self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN))

        name = self.get_dns_domain()
        updates.append(
            self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN))

        self.finish_name_packet(p, updates)
        self._assert_udp_rcode_unless_timeout(p, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        """create update with DNS_QCLASS_NONE"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_NONE)
        self.finish_name_packet(p, [u])
        self._assert_udp_rcode(p, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        """test update with a non-null TTL"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_prereq_update(name, dns.DNS_QCLASS_NONE,
                                     ttl=1, length=0)
        self._assert_udp_rcode_unless_timeout(p, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_with_non_null_length(self):
        """test update with a non-null length"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_prereq_update(name, dns.DNS_QCLASS_ANY,
                                     ttl=0, length=1)
        self._assert_udp_rcode(p, dns.DNS_RCODE_NXRRSET)

    def test_update_prereq_nonexisting_name(self):
        """test update with a nonexisting name"""
        name = "idontexist.%s" % self.get_dns_domain()
        p = self._make_prereq_update(name, dns.DNS_QCLASS_ANY,
                                     ttl=0, length=0)
        self._assert_udp_rcode(p, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        """test adding records works"""
        prefix, txt = 'textrec', ['"This is a test"']
        p = self.make_txt_update(prefix, txt)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

    def test_delete_record(self):
        """Test if deleting records works"""
        name = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        p = self._make_txt_change(name, dns.DNS_QCLASS_IN, 900)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

        # Now delete the record
        p = self._make_txt_change(name, dns.DNS_QCLASS_NONE, 0)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # And finally check it's gone
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        """Test if adding, deleting and then readding a records works"""
        name = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        p = self._make_txt_change(name, dns.DNS_QCLASS_IN, 900)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

        # Now delete the record
        p = self._make_txt_change(name, dns.DNS_QCLASS_NONE, 0)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # check it's gone
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        p = self._make_txt_change(name, dns.DNS_QCLASS_IN, 900)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        """test adding MX records works"""
        p = self._make_zone_update()

        # Add an MX record for the domain itself.
        r = dns.res_rec()
        r.name = "%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_MX
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = 'mail.%s' % self.get_dns_domain()
        r.rdata = rdata
        p.nscount = 1
        p.nsrecs = [r]
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Read the record back and compare its fields.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = "%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
class TestComplexQueries(DNSTest):
    """Queries against records (mostly CNAME chains) the tests add first."""

    def make_dns_update(self, key, value, qtype):
        """Add one record through a DNS UPDATE and require RCODE_OK.

        key is the record name, value its rdata and qtype its record type.
        The record is sent with class IN and a TTL of 900.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])

        r = dns.res_rec()
        r.name = key
        r.rr_type = qtype
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        r.rdata = value
        p.nscount = 1
        p.nsrecs = [r]
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super(TestComplexQueries, self).setUp()
        # The module-level settings are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _query_udp(self, name, qtype, announce=False):
        """Send a one-question class IN query over UDP; return the response.

        When announce is true the name of the question is printed before
        the packet is sent.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, dns.DNS_QCLASS_IN)
        if announce:
            print("asking for ", q.name)
        self.finish_name_packet(p, [q])
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        return response

    def test_one_a_query(self):
        """create a query packet containing one query record"""
        try:
            # Create the record
            name = "cname_test.%s" % self.get_dns_domain()
            rdata = "%s.%s" % (self.server, self.get_dns_domain())
            self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)

            # Check the record
            name = "cname_test.%s" % self.get_dns_domain()
            response = self._query_udp(name, dns.DNS_QTYPE_A, announce=True)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
            self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
            self.assertEqual(response.ancount, 2)
            self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
            self.assertEqual(response.answers[0].rdata, "%s.%s" %
                             (self.server, self.get_dns_domain()))
            self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
            self.assertEqual(response.answers[1].rdata, self.server_ip)

        finally:
            # Delete the record
            p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
            u = self.make_name_question(self.get_dns_domain(),
                                        dns.DNS_QTYPE_SOA,
                                        dns.DNS_QCLASS_IN)
            self.finish_name_packet(p, [u])

            r = dns.res_rec()
            r.name = "cname_test.%s" % self.get_dns_domain()
            r.rr_type = dns.DNS_QTYPE_CNAME
            r.rr_class = dns.DNS_QCLASS_NONE
            r.ttl = 0
            r.length = 0xffff
            r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
            p.nscount = 1
            p.nsrecs = [r]

            (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    # An A query on a CNAME that points at a second CNAME, which points at
    # an A record, must return all three records in chain order.
    def test_cname_two_chain(self):
        name0 = "cnamechain0.%s" % self.get_dns_domain()
        name1 = "cnamechain1.%s" % self.get_dns_domain()
        name2 = "cnamechain2.%s" % self.get_dns_domain()
        self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name0, self.server_ip, dns.DNS_QTYPE_A)

        response = self._query_udp(name1, dns.DNS_QTYPE_A)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 3)

        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].name, name1)
        self.assertEqual(response.answers[0].rdata, name2)

        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[1].name, name2)
        self.assertEqual(response.answers[1].rdata, name0)

        self.assertEqual(response.answers[2].rr_type, dns.DNS_QTYPE_A)
        self.assertEqual(response.answers[2].rdata, self.server_ip)

    # Adding a CNAME with an empty target must be refused; the refusal shows
    # up as the AssertionError raised by make_dns_update()'s RCODE_OK check.
    def test_invalid_empty_cname(self):
        name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
        try:
            self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
        except AssertionError:
            pass
        else:
            self.fail("Successfully added empty CNAME, which is invalid.")

    # Same chain as test_cname_two_chain, but queried for a record type
    # (TXT) that does not exist at the end of the chain.
    def test_cname_two_chain_not_matching_qtype(self):
        name0 = "cnamechain0.%s" % self.get_dns_domain()
        name1 = "cnamechain1.%s" % self.get_dns_domain()
        name2 = "cnamechain2.%s" % self.get_dns_domain()
        self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name0, self.server_ip, dns.DNS_QTYPE_A)

        response = self._query_udp(name1, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)

        # CNAME should return all intermediate results!
        # Only the A record exists, not the TXT.
        self.assertEqual(response.ancount, 2)

        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].name, name1)
        self.assertEqual(response.answers[0].rdata, name2)

        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[1].name, name2)
        self.assertEqual(response.answers[1].rdata, name0)

    # Three CNAMEs that point at each other in a circle: the response is
    # expected to contain exactly max_recursion_depth answers.
    def test_cname_loop(self):
        cname1 = "cnamelooptestrec." + self.get_dns_domain()
        cname2 = "cnamelooptestrec2." + self.get_dns_domain()
        cname3 = "cnamelooptestrec3." + self.get_dns_domain()
        self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)

        response = self._query_udp(cname1, dns.DNS_QTYPE_A)

        max_recursion_depth = 20
        self.assertEqual(len(response.answers), max_recursion_depth)

    def max_rec_test(self, rtype, rec_gen):
        """Make sure the cname limit doesn't count other records.

        This is a generic check called by the test_record_limit_* tests
        below.  It adds limit + 5 records of type rtype under one name,
        taking the rdata of record i from rec_gen(i), and expects a query
        for that name and type to return every one of them.
        """
        name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
        limit = 20
        num_recs_to_enter = limit + 5

        for i in range(1, num_recs_to_enter + 1):
            self.make_dns_update(name, rec_gen(i), rtype)

        response = self._query_udp(name, rtype)

        self.assertEqual(len(response.answers), num_recs_to_enter)

    def test_record_limit_A(self):
        def ip4_gen(i):
            return "127.0.0." + str(i)
        self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)

    def test_record_limit_AAAA(self):
        def ip6_gen(i):
            return "AAAA:0:0:0:0:0:0:" + str(i)
        self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)

    def test_record_limit_SRV(self):
        def srv_gen(i):
            rec = dns.srv_record()
            rec.priority = 1
            rec.weight = 1
            rec.port = 92
            rec.target = "srvtestrec" + str(i)
            return rec
        self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)

    # Same as test_record_limit_A but with a preceding CNAME follow
    def test_cname_limit(self):
        cname1 = "cnamelimittestrec." + self.get_dns_domain()
        cname2 = "cnamelimittestrec2." + self.get_dns_domain()
        cname3 = "cnamelimittestrec3." + self.get_dns_domain()
        ip_prefix = '127.0.0.'
        limit = 20
        num_recs_to_enter = limit + 5

        self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
        # The two CNAME records count towards the expected answer total.
        num_arecs_to_enter = num_recs_to_enter - 2
        for i in range(1, num_arecs_to_enter + 1):
            self.make_dns_update(cname3, ip_prefix + str(i), dns.DNS_QTYPE_A)

        response = self._query_udp(cname1, dns.DNS_QTYPE_A)

        self.assertEqual(len(response.answers), num_recs_to_enter)

    # ANY query on cname record shouldn't follow the link
    def test_cname_any_query(self):
        cname1 = "cnameanytestrec." + self.get_dns_domain()
        cname2 = "cnameanytestrec2." + self.get_dns_domain()
        cname3 = "cnameanytestrec3." + self.get_dns_domain()

        self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)

        response = self._query_udp(cname1, dns.DNS_QTYPE_ALL)

        self.assertEqual(len(response.answers), 1)
        self.assertEqual(response.answers[0].name, cname1)
        self.assertEqual(response.answers[0].rdata, cname2)
class TestInvalidQueries(DNSTest):
    """Tests that send the server unusual or malformed traffic."""

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super(TestInvalidQueries, self).setUp()
        # These module globals are only read, so no `global` statement is
        # needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def test_one_a_query(self):
        """send 0 bytes followed by a query packet
        containing one query record"""

        # First an empty UDP datagram ...
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((self.server_ip, 53))
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        # ... then an ordinary A query for the server's own name, which
        # must still be answered.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         self.server_ip)

    def test_one_a_reply(self):
        "send a reply instead of a query"

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        # Mark the packet as a reply.
        p.operation |= dns.DNS_FLAG_REPLY
        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            # self.timeout holds the same value setUp() copied from the
            # module-level `timeout`.
            s.settimeout(self.timeout)
            host = self.server_ip
            s.connect((host, 53))
            # Over TCP the message is preceded by its length as a 16-bit
            # big-endian integer.
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            recv_packet = s.recv(0xffff + 2, 0)
            # Nothing may come back.
            self.assertEqual(0, len(recv_packet))
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on a how poorly the
            # request is formatted.
            pass
        finally:
            if s is not None:
                s.close()
1069 class TestZones(DNSTest):
def setUp(self):
    """Prepare connection settings, an RPC connection and an LDAP handle.

    Also fixes the zone used by the tests ("test.lan") and derives the DN
    of that zone under DC=DomainDNSZones.
    """
    super(TestZones, self).setUp()
    self.server = server_name
    self.server_ip = server_ip
    self.lp = lp
    self.creds = creds
    self.timeout = timeout

    self.zone = "test.lan"
    binding = "ncacn_ip_tcp:%s[sign]" % (self.server_ip)
    self.rpc_conn = dnsserver.dnsserver(binding, self.lp, self.creds)

    self.samdb = SamDB(url="ldap://" + self.server_ip,
                       lp=self.get_loadparm(),
                       session_info=system_session(),
                       credentials=self.creds)
    base_dn = str(self.samdb.get_default_basedn())
    self.zone_dn = ("DC=" + self.zone +
                    ",CN=MicrosoftDNS,DC=DomainDNSZones," +
                    base_dn)
def tearDown(self):
    """Run the base tearDown, then delete the test zone.

    A "zone does not exist" error from the delete is ignored; any other
    RuntimeError is re-raised.
    """
    super(TestZones, self).tearDown()

    try:
        self.delete_zone(self.zone)
    except RuntimeError as err:
        code, _ = err.args
        if code == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
            return
        raise
def make_zone_obj(self, zone, aging_enabled=False):
    """Build the zone-creation request structure for `zone`.

    Returns a DNS_RPC_ZONE_CREATE_INFO_LONGHORN describing a primary,
    DS-integrated zone that allows unsecure updates; fAging is set from
    `aging_enabled`.
    """
    request = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    fields = (
        ("pszZoneName", zone),
        ("dwZoneType", dnsp.DNS_ZONE_TYPE_PRIMARY),
        ("fAging", int(aging_enabled)),
        ("dwDpFlags", dnsserver.DNS_DP_DOMAIN_DEFAULT),
        ("fDsIntegrated", 1),
        ("fLoadExisting", 1),
        ("fAllowUpdate", dnsp.DNS_ZONE_UPDATE_UNSECURE),
    )
    for attr, value in fields:
        setattr(request, attr, value)
    return request
# Create `zone` on the server via the 'ZoneCreate' RPC operation.
#
# The request structure comes from make_zone_obj(zone, aging_enabled).
# A WERRORError from the RPC call is turned into a test failure.
#
# NOTE(review): the embedded line numbers jump 1117 -> 1119 and
# 1120 -> 1122 inside the DnssrvOperation2 argument list, so argument
# lines appear to have been lost when this listing was extracted.
# Confirm the full argument list against the upstream file before
# touching this call.
1113 def create_zone(self, zone, aging_enabled=False):
1114 zone_create = self.make_zone_obj(zone, aging_enabled)
1115 try:
1116 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1117 self.rpc_conn.DnssrvOperation2(client_version,
1119 self.server_ip,
1120 None,
1122 'ZoneCreate',
1123 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1124 zone_create)
1125 except WERRORError as e:
1126 self.fail(e)
# Set DWORD properties over RPC, one 'ResetDwordProperty' call per keyword.
#
# The optional `zone` keyword is removed from kwargs and passed as the zone
# argument of every call (None when absent).  Each remaining keyword becomes
# a DNS_RPC_NAME_AND_PARAM with the keyword as pszNodeName and its value as
# dwParam.  A WERRORError is turned into a test failure.
#
# Unlike the other RPC calls in this class, this one passes self.server
# rather than self.server_ip.
#
# NOTE(review): the embedded line numbers jump 1138 -> 1140 and
# 1141 -> 1143 inside the DnssrvOperation2 argument list, so argument
# lines appear to have been lost in extraction -- confirm against the
# upstream file.
1128 def set_params(self, **kwargs):
1129 zone = kwargs.pop('zone', None)
1130 for key, val in kwargs.items():
1131 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1132 name_param.dwParam = val
1133 name_param.pszNodeName = key
1135 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1136 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1137 try:
1138 self.rpc_conn.DnssrvOperation2(client_version,
1140 self.server,
1141 zone,
1143 'ResetDwordProperty',
1144 nap_type,
1145 name_param)
1146 except WERRORError as e:
1147 self.fail(str(e))
def ldap_modify_dnsrecs(self, name, func):
    """Apply `func` to every DNS record of node `name` and write them back.

    The records are read over LDAP, passed one by one to `func` (which is
    expected to change them in place), re-packed and stored with a
    replace modification of the node's dnsRecord attribute.
    """
    node_dn = 'DC={0},{1}'.format(name, self.zone_dn)
    records = self.ldap_get_dns_records(name)
    for record in records:
        func(record)
    packed = [ndr_pack(record) for record in records]
    message = ldb.Message.from_dict(self.samdb,
                                    {'dn': node_dn, 'dnsRecord': packed},
                                    ldb.FLAG_MOD_REPLACE)
    self.samdb.modify(message)
def dns_update_record(self, prefix, txt):
    """Send a TXT dynamic update for `prefix` and return the stored record.

    The update must be answered with DNS_RCODE_OK, and afterwards exactly
    one record of the node must carry `txt`; that record is returned.
    """
    packet = self.make_txt_update(prefix, txt, self.zone)
    response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    matching = []
    for record in self.ldap_get_dns_records(prefix):
        if record.data.str == txt:
            matching.append(record)
    self.assertEqual(len(matching), 1)
    return matching[0]
def dns_tombstone(self, prefix, txt, zone):
    """Replace the records of `prefix`.`zone` with a single tombstone.

    The tombstone record has dwTimeStamp 1000.  `txt` is accepted but not
    used.
    """
    fqdn = "{0}.{1}".format(prefix, zone)

    tombstone = dnsp.DnssrvRpcRecord()
    tombstone.dwTimeStamp = 1000
    tombstone.wType = dnsp.DNS_TYPE_TOMBSTONE

    self.samdb.dns_replace(fqdn, [tombstone])
def ldap_get_records(self, name):
    """Search the test zone for dnsNode objects called `name`.

    Returns the search result with all attributes requested.
    """
    # The use of SCOPE_SUBTREE here avoids raising an exception in the
    # 0 results case for a test below.
    search_args = {
        "base": self.zone_dn,
        "scope": ldb.SCOPE_SUBTREE,
        "expression": "(&(objectClass=dnsNode)(name={0}))".format(name),
        "attrs": ["*"],
    }
    return self.samdb.search(**search_args)
def ldap_get_dns_records(self, name):
    """Return the unpacked dnsRecord values of the first node named `name`.

    Each value is decoded as a dnsp.DnssrvRpcRecord.
    """
    node = self.ldap_get_records(name)[0]
    unpacked = []
    for blob in node.get('dnsRecord'):
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, blob))
    return unpacked
def ldap_get_zone_settings(self):
    """Read the zone's dNSProperty values into a dict.

    Keys are the lower-cased property names from the table below, values
    are the `data` of the corresponding unpacked dnsp.DnsProperty.
    Exactly one dnsZone object must match.
    """
    # We have no choice but to repeat these here.
    zone_prop_ids = {0x00: "EMPTY",
                     0x01: "TYPE",
                     0x02: "ALLOW_UPDATE",
                     0x08: "SECURE_TIME",
                     0x10: "NOREFRESH_INTERVAL",
                     0x11: "SCAVENGING_SERVERS",
                     0x12: "AGING_ENABLED_TIME",
                     0x20: "REFRESH_INTERVAL",
                     0x40: "AGING_STATE",
                     0x80: "DELETED_FROM_HOSTNAME",
                     0x81: "MASTER_SERVERS",
                     0x82: "AUTO_NS_SERVERS",
                     0x83: "DCPROMO_CONVERT",
                     0x90: "SCAVENGING_SERVERS_DA",
                     0x91: "MASTER_SERVERS_DA",
                     0x92: "NS_SERVERS_DA",
                     0x100: "NODE_DBFLAGS"}

    zone_filter = "(&(objectClass=dnsZone)(name={0}))".format(self.zone)
    found = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
                              expression=zone_filter,
                              attrs=["dNSProperty"])
    self.assertEqual(len(found), 1)

    settings = {}
    for blob in found[0].get('dNSProperty'):
        prop = ndr_unpack(dnsp.DnsProperty, blob)
        settings[zone_prop_ids[prop.id].lower()] = prop.data
    return settings
def set_aging(self, enable=False):
    """Create the test zone and configure its aging parameters.

    Both intervals are set to 1, Aging follows `enable`, and unsecure
    updates are allowed.
    """
    self.create_zone(self.zone, aging_enabled=enable)
    aging_flag = int(bool(enable))
    self.set_params(NoRefreshInterval=1,
                    RefreshInterval=1,
                    Aging=aging_flag,
                    zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
    """With aging enabled, a dynamic update gets a non-zero timestamp.

    The parameters are not read by the body; fixed values are used.
    """
    self.set_aging(enable=True)
    aging_state = self.ldap_get_zone_settings()['aging_state']
    self.assertTrue(aging_state is not None)
    self.assertTrue(aging_state)

    record = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(record.dwTimeStamp, 0)
def test_set_aging_disabled(self):
    """With aging disabled the state is stored as false.

    A dynamic update is still expected to receive a non-zero timestamp.
    """
    self.set_aging(enable=False)
    aging_state = self.ldap_get_zone_settings()['aging_state']
    self.assertTrue(aging_state is not None)
    self.assertFalse(aging_state)

    record = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(record.dwTimeStamp, 0)
def test_aging_update(self, enable=True):
    """Check how re-sending an update affects an aged record's timestamp.

    The record's timestamp is moved back by 2 directly in LDAP and the
    same update is sent again.  With `enable` true the timestamp must
    return to its original value; with `enable` false (Aging reset to 0
    after the first update) it must stay at the moved-back value.
    """
    name = 'agingtest'
    txt = ['test txt']
    self.set_aging(enable=True)
    before_mod = self.dns_update_record(name, txt)
    if not enable:
        self.set_params(zone=self.zone, Aging=0)
    dec = 2

    def move_back(record):
        self.assertTrue(record.dwTimeStamp > 0)
        record.dwTimeStamp -= dec

    self.ldap_modify_dnsrecs(name, move_back)
    stored = self.ldap_get_dns_records(name)
    self.assertEqual(len(stored), 1)
    after_mod = stored[0]
    self.assertEqual(after_mod.dwTimeStamp,
                     before_mod.dwTimeStamp - dec)

    after_update = self.dns_update_record(name, txt)
    if enable:
        expected = before_mod
    else:
        expected = after_mod
    self.assertEqual(expected.dwTimeStamp,
                     after_update.dwTimeStamp)
def test_aging_update_disabled(self):
    """Run test_aging_update with aging switched off after the first update."""
    self.test_aging_update(False)
def test_aging_refresh(self):
    """Check timestamp handling for updates at two different record ages.

    With both intervals set to 10, the record is first moved back by half
    an interval and updated: the timestamp must stay at the moved-back
    value.  It is then moved back by a further one and a half intervals
    and updated again: the timestamp must return to its original value.
    """
    name, txt = 'agingtest', ['test txt']
    self.create_zone(self.zone, aging_enabled=True)
    interval = 10
    self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                    Aging=1, zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
    before_mod = self.dns_update_record(name, txt)

    def age_records(amount):
        # Move every record of the node back by `amount`.
        def mod_ts(rec):
            self.assertTrue(rec.dwTimeStamp > 0)
            rec.dwTimeStamp -= amount
        self.ldap_modify_dnsrecs(name, mod_ts)

    # Floor division keeps the value an integer on Python 3 as well, so
    # the expected timestamp below has the same type as the stored one.
    half = interval // 2

    age_records(half)
    update_during_norefresh = self.dns_update_record(name, txt)

    age_records(interval + half)
    update_during_refresh = self.dns_update_record(name, txt)

    self.assertEqual(update_during_norefresh.dwTimeStamp,
                     before_mod.dwTimeStamp - half)
    self.assertEqual(update_during_refresh.dwTimeStamp,
                     before_mod.dwTimeStamp)
# A TXT record added over RPC (DnssrvUpdateRecord2) with aging enabled must
# be stored as exactly one record whose dwTimeStamp is 0.
#
# NOTE(review): the embedded line numbers jump 1299 -> 1301 inside the
# DnssrvUpdateRecord2 argument list, so an argument line appears to have
# been lost in extraction -- confirm against the upstream file.
1293 def test_rpc_add_no_timestamp(self):
1294 name, txt = 'agingtest', ['test txt']
1295 self.set_aging(enable=True)
1296 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1297 rec_buf.rec = TXTRecord(txt)
1298 self.rpc_conn.DnssrvUpdateRecord2(
1299 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1301 self.server_ip,
1302 self.zone,
1303 name,
1304 rec_buf,
1305 None)
1306 recs = self.ldap_get_dns_records(name)
1307 self.assertEqual(len(recs), 1)
1308 self.assertEqual(recs[0].dwTimeStamp, 0)
# After a TXT record has been added to a node over RPC, a dynamic update
# adding a second TXT value to the same node must be stored with
# dwTimeStamp 0.
#
# NOTE(review): the embedded line numbers jump 1317 -> 1319 inside the
# DnssrvUpdateRecord2 argument list, so an argument line appears to have
# been lost in extraction -- confirm against the upstream file.
1310 def test_static_record_dynamic_update(self):
1311 name, txt = 'agingtest', ['test txt']
1312 txt2 = ['test txt2']
1313 self.set_aging(enable=True)
1314 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1315 rec_buf.rec = TXTRecord(txt)
1316 self.rpc_conn.DnssrvUpdateRecord2(
1317 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1319 self.server_ip,
1320 self.zone,
1321 name,
1322 rec_buf,
1323 None)
1325 rec2 = self.dns_update_record(name, txt2)
1326 self.assertEqual(rec2.dwTimeStamp, 0)
# Sequence on one node: dynamic update (txt), RPC add (txt2), dynamic
# update (txt3).  Expected timestamps afterwards: txt non-zero, txt2 and
# txt3 both 0.
#
# NOTE(review): the embedded line numbers jump 1339 -> 1341 inside the
# DnssrvUpdateRecord2 argument list, so an argument line appears to have
# been lost in extraction -- confirm against the upstream file.
1328 def test_dynamic_record_static_update(self):
1329 name, txt = 'agingtest', ['test txt']
1330 txt2 = ['test txt2']
1331 txt3 = ['test txt3']
1332 self.set_aging(enable=True)
1334 self.dns_update_record(name, txt)
1336 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1337 rec_buf.rec = TXTRecord(txt2)
1338 self.rpc_conn.DnssrvUpdateRecord2(
1339 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1341 self.server_ip,
1342 self.zone,
1343 name,
1344 rec_buf,
1345 None)
1347 self.dns_update_record(name, txt3)
1349 recs = self.ldap_get_dns_records(name)
1350 # Put in dict because ldap recs might be out of order
1351 recs = {str(r.data.str): r for r in recs}
1352 self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1353 self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1354 self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
# Search with the extended match rule 1.3.6.1.4.1.7165.4.5.3 on dnsRecord
# and check which nodes it returns.
#
# Set-up: three dynamically updated names (one, two and three TXT values),
# one name added over RPC, and one name that is updated and then
# tombstoned.  The `txt` record of the first two names is moved back by 2.
# The search value is (timestamp of the last update - 1); only the first
# two names are expected in the result.
#
# self.samdb is re-opened here on lp.samdb_url() instead of the ldap://
# URL used in setUp.
#
# NOTE(review): the embedded line numbers jump 1394 -> 1396 inside the
# DnssrvUpdateRecord2 argument list, so an argument line appears to have
# been lost in extraction -- confirm against the upstream file.
1356 def test_dns_tombstone_custom_match_rule(self):
1357 lp = self.get_loadparm()
1358 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1359 session_info=system_session(),
1360 credentials=self.creds)
1362 name, txt = 'agingtest', ['test txt']
1363 name2, txt2 = 'agingtest2', ['test txt2']
1364 name3, txt3 = 'agingtest3', ['test txt3']
1365 name4, txt4 = 'agingtest4', ['test txt4']
1366 name5, txt5 = 'agingtest5', ['test txt5']
1368 self.create_zone(self.zone, aging_enabled=True)
1369 interval = 10
1370 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1371 Aging=1, zone=self.zone,
1372 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1374 self.dns_update_record(name, txt)
1376 self.dns_update_record(name2, txt)
1377 self.dns_update_record(name2, txt2)
1379 self.dns_update_record(name3, txt)
1380 self.dns_update_record(name3, txt2)
1381 last_update = self.dns_update_record(name3, txt3)
1383 # Modify txt1 of the first 2 names
1384 def mod_ts(rec):
1385 if rec.data.str == txt:
1386 rec.dwTimeStamp -= 2
1387 self.ldap_modify_dnsrecs(name, mod_ts)
1388 self.ldap_modify_dnsrecs(name2, mod_ts)
1390 # create a static dns record.
1391 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1392 rec_buf.rec = TXTRecord(txt4)
1393 self.rpc_conn.DnssrvUpdateRecord2(
1394 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1396 self.server_ip,
1397 self.zone,
1398 name4,
1399 rec_buf,
1400 None)
1402 # Create a tomb stoned record.
1403 self.dns_update_record(name5, txt5)
1404 self.dns_tombstone(name5, txt5, self.zone)
1406 self.ldap_get_dns_records(name3)
1407 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1408 expr = expr.format(int(last_update.dwTimeStamp) - 1)
1409 try:
1410 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1411 expression=expr, attrs=["*"])
1412 except ldb.LdbError as e:
1413 self.fail(str(e))
1414 updated_names = {str(r.get('name')) for r in res}
1415 self.assertEqual(updated_names, set([name, name2]))
def test_dns_tombstone_custom_match_rule_no_records(self):
    """The custom dnsRecord match rule on a fresh zone returns nothing.

    The zone is created with aging enabled and no records are added; a
    search with match value 1 must succeed and yield zero entries.
    """
    loadparm = self.get_loadparm()
    self.samdb = SamDB(url=loadparm.samdb_url(), lp=loadparm,
                       session_info=system_session(),
                       credentials=self.creds)

    self.create_zone(self.zone, aging_enabled=True)
    interval = 10
    self.set_params(NoRefreshInterval=interval,
                    RefreshInterval=interval,
                    Aging=1,
                    zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    match_filter = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})".format(1)

    try:
        found = self.samdb.search(base=self.zone_dn,
                                  scope=ldb.SCOPE_SUBTREE,
                                  expression=match_filter,
                                  attrs=["*"])
    except ldb.LdbError as err:
        self.fail(str(err))
    self.assertEqual(0, len(found))
def test_dns_tombstone_custom_match_rule_fail(self):
    """Check the custom dnsRecord match rule's handling of bad input.

    A search on an attribute other than dnsRecord must return nothing.
    Searches with an empty, "-", over-long (65 characters) or non-numeric
    match value, and a search over a connection opened without a system
    session, must each raise ldb.LdbError with ERR_OPERATIONS_ERROR.
    """
    self.create_zone(self.zone, aging_enabled=True)
    samdb = SamDB(url=lp.samdb_url(),
                  lp=lp,
                  session_info=system_session(),
                  credentials=self.creds)

    def expect_operations_error(connect, expr):
        # connect() is called inside the try so that an LdbError raised
        # while opening the connection is judged like one raised by the
        # search itself.
        try:
            res = connect().search(base=self.zone_dn,
                                   scope=ldb.SCOPE_SUBTREE,
                                   expression=expr, attrs=["*"])
        except ldb.LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)
        else:
            self.assertEqual(len(res), 0)
            self.fail("Exception: ldb.ldbError not generated")

    # Property name in not dnsRecord
    expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
    res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
                       expression=expr, attrs=["*"])
    self.assertEqual(len(res), 0)

    rule = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
    bad_values = [
        "",          # No value for tombstone time
        "-",         # Tombstone time = -
        "1" * 65,    # Tombstone time longer than 64 characters
        "expired",   # Non numeric Tombstone time
    ]
    for value in bad_values:
        expect_operations_error(lambda: samdb, rule.format(value))

    # Non system session
    def connect_without_system_session():
        return SamDB(url="ldap://" + self.server_ip,
                     lp=self.get_loadparm(),
                     credentials=self.creds)

    expect_operations_error(connect_without_system_session,
                            rule.format(2))
# End-to-end check of dsdb._scavenge_dns_records() followed by
# dsdb._dns_delete_tombstones() on a zone with aging enabled and both
# intervals set to 1.
#
# Nodes used:
#   name   - one TXT value (txt)
#   name2  - txt and txt2
#   name3  - txt, txt2 and (added last) txt3
#   name4  - updated, then tombstoned
#   name5  - updated, tombstoned, then updated again (dNSTombstoned FALSE)
#
# self.samdb is re-opened here on lp.samdb_url() instead of the ldap://
# URL used in setUp.
1512 def test_basic_scavenging(self):
1513 lp = self.get_loadparm()
1514 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1515 session_info=system_session(),
1516 credentials=self.creds)
1518 self.create_zone(self.zone, aging_enabled=True)
1519 interval = 1
1520 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1521 zone=self.zone, Aging=1,
1522 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1523 name, txt = 'agingtest', ['test txt']
1524 name2, txt2 = 'agingtest2', ['test txt2']
1525 name3, txt3 = 'agingtest3', ['test txt3']
1526 name4, txt4 = 'agingtest4', ['test txt4']
1527 name5, txt5 = 'agingtest5', ['test txt5']
1528 self.dns_update_record(name, txt)
1529 self.dns_update_record(name2, txt)
1530 self.dns_update_record(name2, txt2)
1531 self.dns_update_record(name3, txt)
1532 self.dns_update_record(name3, txt2)
1534 # Create a tomb stoned record.
1535 self.dns_update_record(name4, txt4)
1536 self.dns_tombstone(name4, txt4, self.zone)
1537 records = self.ldap_get_records(name4)
1538 self.assertTrue("dNSTombstoned" in records[0])
1539 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1541 # Create an un-tombstoned record, with dnsTombstoned: FALSE
1542 self.dns_update_record(name5, txt5)
1543 self.dns_tombstone(name5, txt5, self.zone)
1544 self.dns_update_record(name5, txt5)
1545 records = self.ldap_get_records(name5)
1546 self.assertTrue("dNSTombstoned" in records[0])
1547 self.assertEqual(records[0]["dNSTombstoned"][0], b"FALSE")
1549 last_add = self.dns_update_record(name3, txt3)
# mod_ts moves back only the record whose text equals txt; mod_ts_all moves
# back every record of the node.  Both subtract five intervals.
1551 def mod_ts(rec):
1552 self.assertTrue(rec.dwTimeStamp > 0)
1553 if rec.data.str == txt:
1554 rec.dwTimeStamp -= interval * 5
1556 def mod_ts_all(rec):
1557 rec.dwTimeStamp -= interval * 5
1558 self.ldap_modify_dnsrecs(name, mod_ts)
1559 self.ldap_modify_dnsrecs(name2, mod_ts)
1560 self.ldap_modify_dnsrecs(name3, mod_ts)
1561 self.ldap_modify_dnsrecs(name5, mod_ts_all)
1562 self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1563 dsdb._scavenge_dns_records(self.samdb)
# name: its only record was aged, so the node must now hold a single
# tombstone record and be flagged dNSTombstoned TRUE.
1565 recs = self.ldap_get_dns_records(name)
1566 self.assertEqual(len(recs), 1)
1567 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1568 records = self.ldap_get_records(name)
1569 self.assertTrue("dNSTombstoned" in records[0])
1570 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
# name2: only the un-aged txt2 record must remain.
1572 recs = self.ldap_get_dns_records(name2)
1573 self.assertEqual(len(recs), 1)
1574 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1575 self.assertEqual(recs[0].data.str, txt2)
# name3: the un-aged txt2 and txt3 records must remain.
1577 recs = self.ldap_get_dns_records(name3)
1578 self.assertEqual(len(recs), 2)
1579 txts = {str(r.data.str) for r in recs}
1580 self.assertEqual(txts, {str(txt2), str(txt3)})
1581 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1582 self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
# name4: was already tombstoned and must still be.
1584 recs = self.ldap_get_dns_records(name4)
1585 self.assertEqual(len(recs), 1)
1586 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1587 records = self.ldap_get_records(name4)
1588 self.assertTrue("dNSTombstoned" in records[0])
1589 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
# name5: had dNSTombstoned FALSE before; all its records were aged, so it
# must now be tombstoned with the flag set to TRUE.
1591 recs = self.ldap_get_dns_records(name5)
1592 self.assertEqual(len(recs), 1)
1593 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1594 records = self.ldap_get_records(name5)
1595 self.assertTrue("dNSTombstoned" in records[0])
1596 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
# Tombstone deletion: the data of name's tombstone is set to
# (last_add.dwTimeStamp - 24 * 14) plus or minus one.  With +1 the node
# must survive _dns_delete_tombstones(); with -1 it must be gone.
# NOTE(review): 24 * 14 presumably expresses 14 days in hours -- confirm
# against the tombstone-deletion implementation.
1598 for make_it_work in [False, True]:
1599 inc = -1 if make_it_work else 1
1601 def mod_ts(rec):
1602 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1603 self.ldap_modify_dnsrecs(name, mod_ts)
1604 dsdb._dns_delete_tombstones(self.samdb)
1605 recs = self.ldap_get_records(name)
1606 if make_it_work:
1607 self.assertEqual(len(recs), 0)
1608 else:
1609 self.assertEqual(len(recs), 1)
# A zone name with and without a trailing dot must refer to the same zone.
#
# Creating "<zone>" and then "<zone>." (and "<zone>2." then "<zone>2")
# must fail the second time with WERR_DNS_ERROR_ZONE_ALREADY_EXISTS.  The
# zone enumeration must then list exactly "<zone>" and "<zone>2" among the
# names starting with self.zone, i.e. neither with a trailing dot.
#
# NOTE(review): tearDown only deletes self.zone, so "<zone>2" is left
# behind if anything fails between its creation and the delete_zone calls
# below.
#
# NOTE(review): the embedded line numbers jump inside the RPC argument
# lists (1618 -> 1620, 1621 -> 1623, 1644 -> 1646), so argument lines
# appear to have been lost in extraction -- confirm against the upstream
# file.
1611 def test_fully_qualified_zone(self):
# Helper: try to create `zone` and require the "already exists" error.
1613 def create_zone_expect_exists(zone):
1614 try:
1615 zone_create = self.make_zone_obj(zone)
1616 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1617 zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE
1618 self.rpc_conn.DnssrvOperation2(client_version,
1620 self.server_ip,
1621 None,
1623 'ZoneCreate',
1624 zc_type,
1625 zone_create)
1626 except WERRORError as e:
1627 enum, _ = e.args
1628 if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
1629 self.fail(e)
1630 return
1631 self.fail("Zone {} should already exist".format(zone))
1633 # Create unqualified, then check creating qualified fails.
1634 self.create_zone(self.zone)
1635 create_zone_expect_exists(self.zone + '.')
1637 # Same again, but the other way around.
1638 self.create_zone(self.zone + '2.')
1639 create_zone_expect_exists(self.zone + '2')
1641 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1642 request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
1643 tid = dnsserver.DNSSRV_TYPEID_DWORD
1644 typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version,
1646 self.server_ip,
1647 None,
1648 'EnumZones',
1649 tid,
1650 request_filter)
# The zones are removed before the enumeration result is checked.
1652 self.delete_zone(self.zone)
1653 self.delete_zone(self.zone + '2')
1655 # Two zones should've been created, neither of them fully qualified.
1656 zones_we_just_made = []
1657 zones = [str(z.pszZoneName) for z in res.ZoneArray]
1658 for zone in zones:
1659 if zone.startswith(self.zone):
1660 zones_we_just_made.append(zone)
1661 self.assertEqual(len(zones_we_just_made), 2)
1662 self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone})
# Delete `zone` via the 'DeleteZoneFromDs' RPC operation.
#
# Errors are not caught here; callers such as tearDown handle them.
#
# NOTE(review): the embedded line numbers jump 1665 -> 1667 and
# 1668 -> 1670 inside the DnssrvOperation2 argument list, so argument
# lines appear to have been lost in extraction -- confirm against the
# upstream file.
1664 def delete_zone(self, zone):
1665 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1667 self.server_ip,
1668 zone,
1670 'DeleteZoneFromDs',
1671 dnsserver.DNSSRV_TYPEID_NULL,
1672 None)
def test_soa_query(self):
    """Check SOA answers before the zone exists, while it does, and after.

    The same SOA query is sent three times: NXDOMAIN with no answers is
    expected before creation and after deletion, and OK with a single SOA
    answer while the zone exists.
    """
    zone = self.zone
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    self.finish_name_packet(p, [q])

    def ask(expected_rcode, expected_answers):
        # Send the prepared query and check the parts of the response
        # common to all three stages.
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, expected_rcode)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, expected_answers)
        return response

    # Windows returns OK while BIND logically seems to return NXDOMAIN
    ask(dns.DNS_RCODE_NXDOMAIN, 0)

    self.create_zone(zone)
    response = ask(dns.DNS_RCODE_OK, 1)
    self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

    self.delete_zone(zone)
    ask(dns.DNS_RCODE_NXDOMAIN, 0)
1706 class TestRPCRoundtrip(DNSTest):
def setUp(self):
    """Copy the module-level settings and open the DNS server RPC pipe."""
    super(TestRPCRoundtrip, self).setUp()
    self.server = server_name
    self.server_ip = server_ip
    self.lp = lp
    self.creds = creds
    binding = "ncacn_ip_tcp:%s[sign]" % (self.server_ip)
    self.rpc_conn = dnsserver.dnsserver(binding, self.lp, self.creds)
def tearDown(self):
    """Defer entirely to the base class tearDown."""
    super(TestRPCRoundtrip, self).tearDown()
# Add or delete one record in the DNS domain zone via DnssrvUpdateRecord2.
#
#   fqn    - node name; defaults to "rpctestrec.<dns domain>"
#   data   - record data, converted with data_to_dns_record(wType, data)
#   wType  - record type passed to data_to_dns_record
#   delete - when true the record buffer is passed as the record to delete
#            (last argument) instead of the record to add
#
# Errors from the RPC call are not caught here.
#
# NOTE(review): the embedded line numbers jump 1736 -> 1738 inside the
# DnssrvUpdateRecord2 argument list, so an argument line appears to have
# been lost in extraction -- confirm against the upstream file.
1722 def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
1723 fqn = fqn or ("rpctestrec." + self.get_dns_domain())
1725 rec = data_to_dns_record(wType, data)
1726 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1727 add_rec_buf.rec = rec
1729 add_arg = add_rec_buf
1730 del_arg = None
1731 if delete:
1732 add_arg = None
1733 del_arg = add_rec_buf
1735 self.rpc_conn.DnssrvUpdateRecord2(
1736 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1738 self.server_ip,
1739 self.get_dns_domain(),
1740 fqn,
1741 add_arg,
1742 del_arg)
def test_rpc_self_referencing_cname(self):
    """Adding a CNAME that points at itself over RPC must be refused.

    Any leftover record is deleted first (a "record does not exist" error
    is fine).  The insertion must then fail with
    WERR_DNS_ERROR_CNAME_LOOP.
    """
    cname = "cnametest2_unqual_rec_loop"
    cname_fqn = "%s.%s" % (cname, self.get_dns_domain())

    # Pre-test cleanup.
    try:
        self.rpc_update(fqn=cname, data=cname_fqn,
                        wType=dnsp.DNS_TYPE_CNAME, delete=True)
    except WERRORError as err:
        cleanup_code = err.args[0]
        if cleanup_code != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
            self.fail("RPC DNS gaven wrong error on pre-test cleanup "
                      "for self referencing CNAME: %s" % err.args[0])

    try:
        self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME,
                        data=cname_fqn)
    except WERRORError as err:
        insert_code = err.args[0]
        if insert_code != werror.WERR_DNS_ERROR_CNAME_LOOP:
            self.fail("RPC DNS gaven wrong error on insertion of "
                      "self referencing CNAME: %s" % err.args[0])
    else:
        self.fail("RPC DNS allowed insertion of self referencing CNAME")
# Add a TXT record over RPC and check that a DNS query returns it.
#
# The RPC form '"\"This is a test\""' must be seen by check_query_txt as
# the single string '"This is a test"'.  The record is deleted again in
# the finally clause, whether or not the check passed.
#
# NOTE(review): the embedded line numbers jump 1776 -> 1778 and
# 1791 -> 1793 inside the DnssrvUpdateRecord2 argument lists, so argument
# lines appear to have been lost in extraction -- confirm against the
# upstream file.
1766 def test_update_add_txt_rpc_to_dns(self):
1767 prefix, txt = 'rpctextrec', ['"This is a test"']
1769 name = "%s.%s" % (prefix, self.get_dns_domain())
1771 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1772 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1773 add_rec_buf.rec = rec
1774 try:
1775 self.rpc_conn.DnssrvUpdateRecord2(
1776 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1778 self.server_ip,
1779 self.get_dns_domain(),
1780 name,
1781 add_rec_buf,
1782 None)
1784 except WERRORError as e:
1785 self.fail(str(e))
1787 try:
1788 self.check_query_txt(prefix, txt)
1789 finally:
1790 self.rpc_conn.DnssrvUpdateRecord2(
1791 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1793 self.server_ip,
1794 self.get_dns_domain(),
1795 name,
1796 None,
1797 add_rec_buf)
def test_update_add_null_padded_txt_record(self):
    """TXT records containing empty strings survive a DNS update.

    For each case the record is added with a dynamic update, read back
    with a DNS query, and then looked up over RPC in its quoted form.
    """
    # (node prefix, strings sent in the update, quoted form matched over RPC)
    cases = [
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    ]
    for prefix, txt, rpc_form in cases:
        p = self.make_txt_update(prefix, txt)
        # self.server_ip is the value setUp() copied from the module-level
        # server_ip; using it keeps this call in line with the RPC lookup
        # below.
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)
        self.assertIsNotNone(
            dns_record_match(
                self.rpc_conn,
                self.server_ip,
                self.get_dns_domain(),
                "%s.%s" % (prefix, self.get_dns_domain()),
                dnsp.DNS_TYPE_TXT,
                rpc_form))
# Add TXT records containing empty strings over RPC and check that a DNS
# query returns the same list of strings.
#
# Three cases are run one after another (empty strings at the end, in the
# middle, and at the start).  Each record is added over RPC, checked with
# check_query_txt and then deleted again in a finally clause.
#
# NOTE(review): in every DnssrvUpdateRecord2 call below the embedded line
# numbers skip one line right after the client version argument (e.g.
# 1856 -> 1858), so an argument line appears to have been lost in
# extraction -- confirm against the upstream file.
1845 def test_update_add_padding_rpc_to_dns(self):
# Case 1: trailing empty strings.
1846 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1847 prefix = 'rpc' + prefix
1848 name = "%s.%s" % (prefix, self.get_dns_domain())
1850 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1851 '"\\"This is a test\\"" "" ""')
1852 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1853 add_rec_buf.rec = rec
1854 try:
1855 self.rpc_conn.DnssrvUpdateRecord2(
1856 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1858 self.server_ip,
1859 self.get_dns_domain(),
1860 name,
1861 add_rec_buf,
1862 None)
1864 except WERRORError as e:
1865 self.fail(str(e))
1867 try:
1868 self.check_query_txt(prefix, txt)
1869 finally:
1870 self.rpc_conn.DnssrvUpdateRecord2(
1871 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1873 self.server_ip,
1874 self.get_dns_domain(),
1875 name,
1876 None,
1877 add_rec_buf)
# Case 2: empty strings in the middle.
1879 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1880 prefix = 'rpc' + prefix
1881 name = "%s.%s" % (prefix, self.get_dns_domain())
1883 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1884 '"\\"This is a test\\"" "" "" "more text"')
1885 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1886 add_rec_buf.rec = rec
1887 try:
1888 self.rpc_conn.DnssrvUpdateRecord2(
1889 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1891 self.server_ip,
1892 self.get_dns_domain(),
1893 name,
1894 add_rec_buf,
1895 None)
1897 except WERRORError as e:
1898 self.fail(str(e))
1900 try:
1901 self.check_query_txt(prefix, txt)
1902 finally:
1903 self.rpc_conn.DnssrvUpdateRecord2(
1904 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1906 self.server_ip,
1907 self.get_dns_domain(),
1908 name,
1909 None,
1910 add_rec_buf)
# Case 3: leading empty strings.
1912 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1913 prefix = 'rpc' + prefix
1914 name = "%s.%s" % (prefix, self.get_dns_domain())
1916 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1917 '"" "" "\\"This is a test\\""')
1918 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1919 add_rec_buf.rec = rec
1920 try:
1921 self.rpc_conn.DnssrvUpdateRecord2(
1922 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1924 self.server_ip,
1925 self.get_dns_domain(),
1926 name,
1927 add_rec_buf,
1928 None)
1929 except WERRORError as e:
1930 self.fail(str(e))
1932 try:
1933 self.check_query_txt(prefix, txt)
1934 finally:
1935 self.rpc_conn.DnssrvUpdateRecord2(
1936 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1938 self.server_ip,
1939 self.get_dns_domain(),
1940 name,
1941 None,
1942 add_rec_buf)
1944 # Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
    """TXT strings containing a NUL byte come back cut off at the NUL.

    Each record is added with a dynamic update; both the DNS query and
    the RPC lookup are expected to show only the text before the NUL.
    """
    # (node prefix, strings sent, strings expected from DNS, RPC quoted form)
    cases = [
        ('nulltextrec',
         ['NULL\x00BYTE'],
         ['NULL'],
         '"NULL"'),
        ('nulltextrec2',
         ['NULL\x00BYTE', 'NULL\x00BYTE'],
         ['NULL', 'NULL'],
         '"NULL" "NULL"'),
    ]
    for prefix, txt, expected_txt, rpc_form in cases:
        p = self.make_txt_update(prefix, txt)
        # self.server_ip is the value setUp() copied from the module-level
        # server_ip; using it keeps this call in line with the RPC lookup
        # below.
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)
        self.assertIsNotNone(
            dns_record_match(self.rpc_conn, self.server_ip,
                             self.get_dns_domain(),
                             "%s.%s" % (prefix, self.get_dns_domain()),
                             dnsp.DNS_TYPE_TXT, rpc_form))
def test_update_add_null_char_rpc_to_dns(self):
    """Add a TXT record containing a NUL byte over RPC, query it via DNS.

    The record is created with DnssrvUpdateRecord2, the DNS answer is
    expected to hold only the text before the NUL byte, and the record
    is removed again whether or not the query check passes.
    """
    prefix = 'rpcnulltextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,  # record to add
            None)         # nothing to delete
    except WERRORError as e:
        # Report an RPC failure as a test failure rather than an error.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, ['NULL'])
    finally:
        # Clean up: the same buffer goes in the delete position.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            add_rec_buf)
def test_update_add_hex_char_txt_record(self):
    """Check a TXT string holding a \\xFF character added by DNS update.

    The string is expected back unchanged from the DNS query, and the
    record must be found through the RPC interface as well.
    """
    label = 'hextextrec'
    strings = ['HIGH\xFFBYTE']

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    # Cross-check the stored record over RPC.
    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (label, self.get_dns_domain())
    found = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    self.assertIsNotNone(found)
def test_update_add_hex_rpc_to_dns(self):
    """Add a TXT record holding a \\xFF character over RPC, query via DNS.

    The record is created with DnssrvUpdateRecord2, the DNS query is
    expected to return the string unchanged, and the record is removed
    again whether or not the query check passes.
    """
    prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
    prefix = 'rpc' + prefix
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,  # record to add
            None)         # nothing to delete
    except WERRORError as e:
        # Report an RPC failure as a test failure rather than an error.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: the same buffer goes in the delete position.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            add_rec_buf)
def test_update_add_slash_txt_record(self):
    """Check a TXT string containing a backslash added by DNS update.

    The DNS query is expected to return the string unchanged; over RPC
    the record is matched with the backslash written doubled.
    """
    label = 'slashtextrec'
    strings = ['Th\\=is=is a test']

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    # Cross-check the stored record over RPC.
    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (label, self.get_dns_domain())
    found = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT,
                             '"Th\\\\=is=is a test"')
    self.assertIsNotNone(found)
2060 # This test fails against Windows as it eliminates slashes in RPC
2061 # One typical use for a slash is in records like 'var=value' to
2062 # escape '=' characters.
def test_update_add_slash_rpc_to_dns(self):
    """Add a TXT record containing a backslash over RPC, query via DNS.

    The record data handed to RPC has the backslash written doubled;
    the DNS query is expected to return the string with one backslash.
    The record is removed again whether or not the query check passes.
    """
    prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
    prefix = 'rpc' + prefix
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,  # record to add
            None)         # nothing to delete
    except WERRORError as e:
        # Report an RPC failure as a test failure rather than an error.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: the same buffer goes in the delete position.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            add_rec_buf)
def test_update_add_two_txt_records(self):
    """Check a TXT record made of two quoted strings added by DNS update.

    Both strings are expected back unchanged from the DNS query, and
    the record must be found through the RPC interface as well.
    """
    label = 'textrec2'
    strings = ['"This is a test"',
               '"and this is a test, too"']

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    # Over RPC the inner double quotes are written backslash-escaped.
    rpc_data = ('"\\"This is a test\\""'
                ' "\\"and this is a test, too\\""')
    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (label, self.get_dns_domain())
    found = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT, rpc_data)
    self.assertIsNotNone(found)
def test_update_add_two_rpc_to_dns(self):
    """Add a TXT record with two quoted strings over RPC, query via DNS.

    The record is created with DnssrvUpdateRecord2, the DNS query is
    expected to return both strings, and the record is removed again
    whether or not the query check passes.
    """
    prefix, txt = 'textrec2', ['"This is a test"',
                               '"and this is a test, too"']
    prefix = 'rpc' + prefix
    name = "%s.%s" % (prefix, self.get_dns_domain())

    # In the RPC record data the inner double quotes are escaped.
    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
                             '"\\"This is a test\\""' +
                             ' "\\"and this is a test, too\\""')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,  # record to add
            None)         # nothing to delete
    except WERRORError as e:
        # Report an RPC failure as a test failure rather than an error.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: the same buffer goes in the delete position.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            add_rec_buf)
def test_update_add_empty_txt_records(self):
    """Check a TXT record with no strings at all added by DNS update.

    The DNS query is expected to return an empty string list, and the
    record is matched over RPC with empty record data.
    """
    label = 'emptytextrec'
    strings = []

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    # Cross-check the stored record over RPC.
    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (label, self.get_dns_domain())
    found = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT, '')
    self.assertIsNotNone(found)
def test_update_add_empty_rpc_to_dns(self):
    """Add a TXT record with empty record data over RPC, query via DNS.

    The record is created with DnssrvUpdateRecord2, the DNS query is
    expected to return an empty string list, and the record is removed
    again whether or not the query check passes.
    """
    prefix, txt = 'rpcemptytextrec', []

    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,  # record to add
            None)         # nothing to delete
    except WERRORError as e:
        # Report an RPC failure as a test failure rather than an error.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: the same buffer goes in the delete position.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags: reserved, must be zero
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            add_rec_buf)
# Script entry point: hand this module and the subunit options to
# TestProgram (imported from samba.tests.subunitrun).
TestProgram(module=__name__, opts=subunitopts)