docs-xml: "cluster addresses" dns registration
[Samba.git] / python / samba / tests / dns.py
blob275d4fcd692142ae409d2d2de65faba2adb51e94
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 from ldb import ERR_OPERATIONS_ERROR
26 import os
27 import sys
28 import struct
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
41 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts = options.SambaOptions(parser)
43 parser.add_option_group(sambaopts)
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser.add_option("--timeout", type="int", dest="timeout",
48 help="Specify timeout for DNS requests")
50 # use command line creds if available
51 credopts = options.CredentialsOptions(parser)
52 parser.add_option_group(credopts)
53 subunitopts = SubunitOptions(parser)
54 parser.add_option_group(subunitopts)
56 opts, args = parser.parse_args()
58 lp = sambaopts.get_loadparm()
59 creds = credopts.get_credentials(lp)
61 timeout = opts.timeout
63 if len(args) < 2:
64 parser.print_usage()
65 sys.exit(1)
67 server_name = args[0]
68 server_ip = args[1]
69 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
72 class TestSimpleQueries(DNSTest):
73 def setUp(self):
74 super(TestSimpleQueries, self).setUp()
75 global server, server_ip, lp, creds, timeout
76 self.server = server_name
77 self.server_ip = server_ip
78 self.lp = lp
79 self.creds = creds
80 self.timeout = timeout
82 def test_one_a_query(self):
83 "create a query packet containing one query record"
84 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
85 questions = []
87 name = "%s.%s" % (self.server, self.get_dns_domain())
88 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
89 print("asking for ", q.name)
90 questions.append(q)
92 self.finish_name_packet(p, questions)
93 (response, response_packet) =\
94 self.dns_transaction_udp(p, host=server_ip)
95 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97 self.assertEquals(response.ancount, 1)
98 self.assertEquals(response.answers[0].rdata,
99 self.server_ip)
101 def test_one_SOA_query(self):
102 "create a query packet containing one query record for the SOA"
103 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104 questions = []
106 name = "%s" % (self.get_dns_domain())
107 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
108 print("asking for ", q.name)
109 questions.append(q)
111 self.finish_name_packet(p, questions)
112 (response, response_packet) =\
113 self.dns_transaction_udp(p, host=server_ip)
114 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116 self.assertEquals(response.ancount, 1)
117 self.assertEquals(
118 response.answers[0].rdata.mname.upper(),
119 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
121 def test_one_a_query_tcp(self):
122 "create a query packet containing one query record via TCP"
123 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
124 questions = []
126 name = "%s.%s" % (self.server, self.get_dns_domain())
127 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
128 print("asking for ", q.name)
129 questions.append(q)
131 self.finish_name_packet(p, questions)
132 (response, response_packet) =\
133 self.dns_transaction_tcp(p, host=server_ip)
134 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
135 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
136 self.assertEquals(response.ancount, 1)
137 self.assertEquals(response.answers[0].rdata,
138 self.server_ip)
140 def test_one_mx_query(self):
141 "create a query packet causing an empty RCODE_OK answer"
142 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
143 questions = []
145 name = "%s.%s" % (self.server, self.get_dns_domain())
146 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
147 print("asking for ", q.name)
148 questions.append(q)
150 self.finish_name_packet(p, questions)
151 (response, response_packet) =\
152 self.dns_transaction_udp(p, host=server_ip)
153 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
154 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155 self.assertEquals(response.ancount, 0)
157 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
158 questions = []
160 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
161 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
162 print("asking for ", q.name)
163 questions.append(q)
165 self.finish_name_packet(p, questions)
166 (response, response_packet) =\
167 self.dns_transaction_udp(p, host=server_ip)
168 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
169 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
170 self.assertEquals(response.ancount, 0)
172 def test_two_queries(self):
173 "create a query packet containing two query records"
174 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
175 questions = []
177 name = "%s.%s" % (self.server, self.get_dns_domain())
178 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
179 questions.append(q)
181 name = "%s.%s" % ('bogusname', self.get_dns_domain())
182 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
183 questions.append(q)
185 self.finish_name_packet(p, questions)
186 try:
187 (response, response_packet) =\
188 self.dns_transaction_udp(p, host=server_ip)
189 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
190 except socket.timeout:
191 # Windows chooses not to respond to incorrectly formatted queries.
192 # Although this appears to be non-deterministic even for the same
193 # request twice, it also appears to be based on a how poorly the
194 # request is formatted.
195 pass
197 def test_qtype_all_query(self):
198 "create a QTYPE_ALL query"
199 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
200 questions = []
202 name = "%s.%s" % (self.server, self.get_dns_domain())
203 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
204 print("asking for ", q.name)
205 questions.append(q)
207 self.finish_name_packet(p, questions)
208 (response, response_packet) =\
209 self.dns_transaction_udp(p, host=server_ip)
211 num_answers = 1
212 dc_ipv6 = os.getenv('SERVER_IPV6')
213 if dc_ipv6 is not None:
214 num_answers += 1
216 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
217 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
218 self.assertEquals(response.ancount, num_answers)
219 self.assertEquals(response.answers[0].rdata,
220 self.server_ip)
221 if dc_ipv6 is not None:
222 self.assertEquals(response.answers[1].rdata, dc_ipv6)
224 def test_qclass_none_query(self):
225 "create a QCLASS_NONE query"
226 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
227 questions = []
229 name = "%s.%s" % (self.server, self.get_dns_domain())
230 q = self.make_name_question(
231 name,
232 dns.DNS_QTYPE_ALL,
233 dns.DNS_QCLASS_NONE)
234 questions.append(q)
236 self.finish_name_packet(p, questions)
237 try:
238 (response, response_packet) =\
239 self.dns_transaction_udp(p, host=server_ip)
240 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
241 except socket.timeout:
242 # Windows chooses not to respond to incorrectly formatted queries.
243 # Although this appears to be non-deterministic even for the same
244 # request twice, it also appears to be based on a how poorly the
245 # request is formatted.
246 pass
248 def test_soa_hostname_query(self):
249 "create a SOA query for a hostname"
250 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
251 questions = []
253 name = "%s.%s" % (self.server, self.get_dns_domain())
254 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
255 questions.append(q)
257 self.finish_name_packet(p, questions)
258 (response, response_packet) =\
259 self.dns_transaction_udp(p, host=server_ip)
260 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262 # We don't get SOA records for single hosts
263 self.assertEquals(response.ancount, 0)
264 # But we do respond with an authority section
265 self.assertEqual(response.nscount, 1)
267 def test_soa_domain_query(self):
268 "create a SOA query for a domain"
269 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
270 questions = []
272 name = self.get_dns_domain()
273 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
274 questions.append(q)
276 self.finish_name_packet(p, questions)
277 (response, response_packet) =\
278 self.dns_transaction_udp(p, host=server_ip)
279 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
280 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281 self.assertEquals(response.ancount, 1)
282 self.assertEquals(response.answers[0].rdata.minimum, 3600)
285 class TestDNSUpdates(DNSTest):
286 def setUp(self):
287 super(TestDNSUpdates, self).setUp()
288 global server, server_ip, lp, creds, timeout
289 self.server = server_name
290 self.server_ip = server_ip
291 self.lp = lp
292 self.creds = creds
293 self.timeout = timeout
295 def test_two_updates(self):
296 "create two update requests"
297 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
298 updates = []
300 name = "%s.%s" % (self.server, self.get_dns_domain())
301 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
302 updates.append(u)
304 name = self.get_dns_domain()
305 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
306 updates.append(u)
308 self.finish_name_packet(p, updates)
309 try:
310 (response, response_packet) =\
311 self.dns_transaction_udp(p, host=server_ip)
312 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
313 except socket.timeout:
314 # Windows chooses not to respond to incorrectly formatted queries.
315 # Although this appears to be non-deterministic even for the same
316 # request twice, it also appears to be based on a how poorly the
317 # request is formatted.
318 pass
320 def test_update_wrong_qclass(self):
321 "create update with DNS_QCLASS_NONE"
322 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
323 updates = []
325 name = self.get_dns_domain()
326 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
327 updates.append(u)
329 self.finish_name_packet(p, updates)
330 (response, response_packet) =\
331 self.dns_transaction_udp(p, host=server_ip)
332 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
334 def test_update_prereq_with_non_null_ttl(self):
335 "test update with a non-null TTL"
336 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
337 updates = []
339 name = self.get_dns_domain()
341 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
342 updates.append(u)
343 self.finish_name_packet(p, updates)
345 prereqs = []
346 r = dns.res_rec()
347 r.name = "%s.%s" % (self.server, self.get_dns_domain())
348 r.rr_type = dns.DNS_QTYPE_TXT
349 r.rr_class = dns.DNS_QCLASS_NONE
350 r.ttl = 1
351 r.length = 0
352 prereqs.append(r)
354 p.ancount = len(prereqs)
355 p.answers = prereqs
357 try:
358 (response, response_packet) =\
359 self.dns_transaction_udp(p, host=server_ip)
360 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
361 except socket.timeout:
362 # Windows chooses not to respond to incorrectly formatted queries.
363 # Although this appears to be non-deterministic even for the same
364 # request twice, it also appears to be based on a how poorly the
365 # request is formatted.
366 pass
368 def test_update_prereq_with_non_null_length(self):
369 "test update with a non-null length"
370 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
371 updates = []
373 name = self.get_dns_domain()
375 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
376 updates.append(u)
377 self.finish_name_packet(p, updates)
379 prereqs = []
380 r = dns.res_rec()
381 r.name = "%s.%s" % (self.server, self.get_dns_domain())
382 r.rr_type = dns.DNS_QTYPE_TXT
383 r.rr_class = dns.DNS_QCLASS_ANY
384 r.ttl = 0
385 r.length = 1
386 prereqs.append(r)
388 p.ancount = len(prereqs)
389 p.answers = prereqs
391 (response, response_packet) =\
392 self.dns_transaction_udp(p, host=server_ip)
393 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
395 def test_update_prereq_nonexisting_name(self):
396 "test update with a nonexisting name"
397 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
398 updates = []
400 name = self.get_dns_domain()
402 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
403 updates.append(u)
404 self.finish_name_packet(p, updates)
406 prereqs = []
407 r = dns.res_rec()
408 r.name = "idontexist.%s" % self.get_dns_domain()
409 r.rr_type = dns.DNS_QTYPE_TXT
410 r.rr_class = dns.DNS_QCLASS_ANY
411 r.ttl = 0
412 r.length = 0
413 prereqs.append(r)
415 p.ancount = len(prereqs)
416 p.answers = prereqs
418 (response, response_packet) =\
419 self.dns_transaction_udp(p, host=server_ip)
420 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
422 def test_update_add_txt_record(self):
423 "test adding records works"
424 prefix, txt = 'textrec', ['"This is a test"']
425 p = self.make_txt_update(prefix, txt)
426 (response, response_packet) =\
427 self.dns_transaction_udp(p, host=server_ip)
428 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
429 self.check_query_txt(prefix, txt)
431 def test_delete_record(self):
432 "Test if deleting records works"
434 NAME = "deleterec.%s" % self.get_dns_domain()
436 # First, create a record to make sure we have a record to delete.
437 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
438 updates = []
440 name = self.get_dns_domain()
442 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
443 updates.append(u)
444 self.finish_name_packet(p, updates)
446 updates = []
447 r = dns.res_rec()
448 r.name = NAME
449 r.rr_type = dns.DNS_QTYPE_TXT
450 r.rr_class = dns.DNS_QCLASS_IN
451 r.ttl = 900
452 r.length = 0xffff
453 rdata = self.make_txt_record(['"This is a test"'])
454 r.rdata = rdata
455 updates.append(r)
456 p.nscount = len(updates)
457 p.nsrecs = updates
459 (response, response_packet) =\
460 self.dns_transaction_udp(p, host=server_ip)
461 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
463 # Now check the record is around
464 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
465 questions = []
466 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
467 questions.append(q)
469 self.finish_name_packet(p, questions)
470 (response, response_packet) =\
471 self.dns_transaction_udp(p, host=server_ip)
472 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
474 # Now delete the record
475 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
476 updates = []
478 name = self.get_dns_domain()
480 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
481 updates.append(u)
482 self.finish_name_packet(p, updates)
484 updates = []
485 r = dns.res_rec()
486 r.name = NAME
487 r.rr_type = dns.DNS_QTYPE_TXT
488 r.rr_class = dns.DNS_QCLASS_NONE
489 r.ttl = 0
490 r.length = 0xffff
491 rdata = self.make_txt_record(['"This is a test"'])
492 r.rdata = rdata
493 updates.append(r)
494 p.nscount = len(updates)
495 p.nsrecs = updates
497 (response, response_packet) =\
498 self.dns_transaction_udp(p, host=server_ip)
499 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
501 # And finally check it's gone
502 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
503 questions = []
505 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
506 questions.append(q)
508 self.finish_name_packet(p, questions)
509 (response, response_packet) =\
510 self.dns_transaction_udp(p, host=server_ip)
511 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
513 def test_readd_record(self):
514 "Test if adding, deleting and then readding a records works"
516 NAME = "readdrec.%s" % self.get_dns_domain()
518 # Create the record
519 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
520 updates = []
522 name = self.get_dns_domain()
524 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
525 updates.append(u)
526 self.finish_name_packet(p, updates)
528 updates = []
529 r = dns.res_rec()
530 r.name = NAME
531 r.rr_type = dns.DNS_QTYPE_TXT
532 r.rr_class = dns.DNS_QCLASS_IN
533 r.ttl = 900
534 r.length = 0xffff
535 rdata = self.make_txt_record(['"This is a test"'])
536 r.rdata = rdata
537 updates.append(r)
538 p.nscount = len(updates)
539 p.nsrecs = updates
541 (response, response_packet) =\
542 self.dns_transaction_udp(p, host=server_ip)
543 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
545 # Now check the record is around
546 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
547 questions = []
548 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
549 questions.append(q)
551 self.finish_name_packet(p, questions)
552 (response, response_packet) =\
553 self.dns_transaction_udp(p, host=server_ip)
554 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
556 # Now delete the record
557 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
558 updates = []
560 name = self.get_dns_domain()
562 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
563 updates.append(u)
564 self.finish_name_packet(p, updates)
566 updates = []
567 r = dns.res_rec()
568 r.name = NAME
569 r.rr_type = dns.DNS_QTYPE_TXT
570 r.rr_class = dns.DNS_QCLASS_NONE
571 r.ttl = 0
572 r.length = 0xffff
573 rdata = self.make_txt_record(['"This is a test"'])
574 r.rdata = rdata
575 updates.append(r)
576 p.nscount = len(updates)
577 p.nsrecs = updates
579 (response, response_packet) =\
580 self.dns_transaction_udp(p, host=server_ip)
581 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
583 # check it's gone
584 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
585 questions = []
587 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
588 questions.append(q)
590 self.finish_name_packet(p, questions)
591 (response, response_packet) =\
592 self.dns_transaction_udp(p, host=server_ip)
593 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
595 # recreate the record
596 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
597 updates = []
599 name = self.get_dns_domain()
601 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
602 updates.append(u)
603 self.finish_name_packet(p, updates)
605 updates = []
606 r = dns.res_rec()
607 r.name = NAME
608 r.rr_type = dns.DNS_QTYPE_TXT
609 r.rr_class = dns.DNS_QCLASS_IN
610 r.ttl = 900
611 r.length = 0xffff
612 rdata = self.make_txt_record(['"This is a test"'])
613 r.rdata = rdata
614 updates.append(r)
615 p.nscount = len(updates)
616 p.nsrecs = updates
618 (response, response_packet) =\
619 self.dns_transaction_udp(p, host=server_ip)
620 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
622 # Now check the record is around
623 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
624 questions = []
625 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
626 questions.append(q)
628 self.finish_name_packet(p, questions)
629 (response, response_packet) =\
630 self.dns_transaction_udp(p, host=server_ip)
631 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
633 def test_update_add_mx_record(self):
634 "test adding MX records works"
635 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
636 updates = []
638 name = self.get_dns_domain()
640 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
641 updates.append(u)
642 self.finish_name_packet(p, updates)
644 updates = []
645 r = dns.res_rec()
646 r.name = "%s" % self.get_dns_domain()
647 r.rr_type = dns.DNS_QTYPE_MX
648 r.rr_class = dns.DNS_QCLASS_IN
649 r.ttl = 900
650 r.length = 0xffff
651 rdata = dns.mx_record()
652 rdata.preference = 10
653 rdata.exchange = 'mail.%s' % self.get_dns_domain()
654 r.rdata = rdata
655 updates.append(r)
656 p.nscount = len(updates)
657 p.nsrecs = updates
659 (response, response_packet) =\
660 self.dns_transaction_udp(p, host=server_ip)
661 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
663 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
664 questions = []
666 name = "%s" % self.get_dns_domain()
667 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
668 questions.append(q)
670 self.finish_name_packet(p, questions)
671 (response, response_packet) =\
672 self.dns_transaction_udp(p, host=server_ip)
673 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
674 self.assertEqual(response.ancount, 1)
675 ans = response.answers[0]
676 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
677 self.assertEqual(ans.rdata.preference, 10)
678 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
681 class TestComplexQueries(DNSTest):
682 def make_dns_update(self, key, value, qtype):
683 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
685 name = self.get_dns_domain()
686 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
687 self.finish_name_packet(p, [u])
689 r = dns.res_rec()
690 r.name = key
691 r.rr_type = qtype
692 r.rr_class = dns.DNS_QCLASS_IN
693 r.ttl = 900
694 r.length = 0xffff
695 r.rdata = value
696 p.nscount = 1
697 p.nsrecs = [r]
698 (response, response_packet) =\
699 self.dns_transaction_udp(p, host=server_ip)
700 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
702 def setUp(self):
703 super(TestComplexQueries, self).setUp()
705 global server, server_ip, lp, creds, timeout
706 self.server = server_name
707 self.server_ip = server_ip
708 self.lp = lp
709 self.creds = creds
710 self.timeout = timeout
712 def test_one_a_query(self):
713 "create a query packet containing one query record"
715 try:
717 # Create the record
718 name = "cname_test.%s" % self.get_dns_domain()
719 rdata = "%s.%s" % (self.server, self.get_dns_domain())
720 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
722 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
723 questions = []
725 # Check the record
726 name = "cname_test.%s" % self.get_dns_domain()
727 q = self.make_name_question(name,
728 dns.DNS_QTYPE_A,
729 dns.DNS_QCLASS_IN)
730 print("asking for ", q.name)
731 questions.append(q)
733 self.finish_name_packet(p, questions)
734 (response, response_packet) =\
735 self.dns_transaction_udp(p, host=self.server_ip)
736 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
737 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
738 self.assertEquals(response.ancount, 2)
739 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
740 self.assertEquals(response.answers[0].rdata, "%s.%s" %
741 (self.server, self.get_dns_domain()))
742 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
743 self.assertEquals(response.answers[1].rdata,
744 self.server_ip)
746 finally:
747 # Delete the record
748 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
749 updates = []
751 name = self.get_dns_domain()
753 u = self.make_name_question(name,
754 dns.DNS_QTYPE_SOA,
755 dns.DNS_QCLASS_IN)
756 updates.append(u)
757 self.finish_name_packet(p, updates)
759 updates = []
760 r = dns.res_rec()
761 r.name = "cname_test.%s" % self.get_dns_domain()
762 r.rr_type = dns.DNS_QTYPE_CNAME
763 r.rr_class = dns.DNS_QCLASS_NONE
764 r.ttl = 0
765 r.length = 0xffff
766 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
767 updates.append(r)
768 p.nscount = len(updates)
769 p.nsrecs = updates
771 (response, response_packet) =\
772 self.dns_transaction_udp(p, host=self.server_ip)
773 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
775 def test_cname_two_chain(self):
776 name0 = "cnamechain0.%s" % self.get_dns_domain()
777 name1 = "cnamechain1.%s" % self.get_dns_domain()
778 name2 = "cnamechain2.%s" % self.get_dns_domain()
779 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
783 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
784 questions = []
785 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
786 dns.DNS_QCLASS_IN)
787 questions.append(q)
789 self.finish_name_packet(p, questions)
790 (response, response_packet) =\
791 self.dns_transaction_udp(p, host=server_ip)
792 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
793 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
794 self.assertEquals(response.ancount, 3)
796 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
797 self.assertEquals(response.answers[0].name, name1)
798 self.assertEquals(response.answers[0].rdata, name2)
800 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
801 self.assertEquals(response.answers[1].name, name2)
802 self.assertEquals(response.answers[1].rdata, name0)
804 self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
805 self.assertEquals(response.answers[2].rdata,
806 self.server_ip)
808 def test_invalid_empty_cname(self):
809 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
810 try:
811 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
812 except AssertionError:
813 pass
814 else:
815 self.fail("Successfully added empty CNAME, which is invalid.")
817 def test_cname_two_chain_not_matching_qtype(self):
818 name0 = "cnamechain0.%s" % self.get_dns_domain()
819 name1 = "cnamechain1.%s" % self.get_dns_domain()
820 name2 = "cnamechain2.%s" % self.get_dns_domain()
821 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
822 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
823 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
825 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
826 questions = []
827 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
828 dns.DNS_QCLASS_IN)
829 questions.append(q)
831 self.finish_name_packet(p, questions)
832 (response, response_packet) =\
833 self.dns_transaction_udp(p, host=server_ip)
834 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
835 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
837 # CNAME should return all intermediate results!
838 # Only the A records exists, not the TXT.
839 self.assertEquals(response.ancount, 2)
841 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
842 self.assertEquals(response.answers[0].name, name1)
843 self.assertEquals(response.answers[0].rdata, name2)
845 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
846 self.assertEquals(response.answers[1].name, name2)
847 self.assertEquals(response.answers[1].rdata, name0)
849 def test_cname_loop(self):
850 cname1 = "cnamelooptestrec." + self.get_dns_domain()
851 cname2 = "cnamelooptestrec2." + self.get_dns_domain()
852 cname3 = "cnamelooptestrec3." + self.get_dns_domain()
853 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
854 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
855 self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)
857 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
858 questions = []
860 q = self.make_name_question(cname1,
861 dns.DNS_QTYPE_A,
862 dns.DNS_QCLASS_IN)
863 questions.append(q)
864 self.finish_name_packet(p, questions)
866 (response, response_packet) =\
867 self.dns_transaction_udp(p, host=self.server_ip)
869 max_recursion_depth = 20
870 self.assertEquals(len(response.answers), max_recursion_depth)
872 # Make sure cname limit doesn't count other records. This is a generic
873 # test called in tests below
874 def max_rec_test(self, rtype, rec_gen):
875 name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
876 limit = 20
877 num_recs_to_enter = limit + 5
879 for i in range(1, num_recs_to_enter+1):
880 ip = rec_gen(i)
881 self.make_dns_update(name, ip, rtype)
883 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
884 questions = []
886 q = self.make_name_question(name,
887 rtype,
888 dns.DNS_QCLASS_IN)
889 questions.append(q)
890 self.finish_name_packet(p, questions)
892 (response, response_packet) =\
893 self.dns_transaction_udp(p, host=self.server_ip)
895 self.assertEqual(len(response.answers), num_recs_to_enter)
897 def test_record_limit_A(self):
898 def ip4_gen(i):
899 return "127.0.0." + str(i)
900 self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)
902 def test_record_limit_AAAA(self):
903 def ip6_gen(i):
904 return "AAAA:0:0:0:0:0:0:" + str(i)
905 self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)
907 def test_record_limit_SRV(self):
908 def srv_gen(i):
909 rec = dns.srv_record()
910 rec.priority = 1
911 rec.weight = 1
912 rec.port = 92
913 rec.target = "srvtestrec" + str(i)
914 return rec
915 self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)
917 # Same as test_record_limit_A but with a preceding CNAME follow
918 def test_cname_limit(self):
919 cname1 = "cnamelimittestrec." + self.get_dns_domain()
920 cname2 = "cnamelimittestrec2." + self.get_dns_domain()
921 cname3 = "cnamelimittestrec3." + self.get_dns_domain()
922 ip_prefix = '127.0.0.'
923 limit = 20
924 num_recs_to_enter = limit + 5
926 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
927 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
928 num_arecs_to_enter = num_recs_to_enter - 2
929 for i in range(1, num_arecs_to_enter+1):
930 ip = ip_prefix + str(i)
931 self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A)
933 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
934 questions = []
936 q = self.make_name_question(cname1,
937 dns.DNS_QTYPE_A,
938 dns.DNS_QCLASS_IN)
939 questions.append(q)
940 self.finish_name_packet(p, questions)
942 (response, response_packet) =\
943 self.dns_transaction_udp(p, host=self.server_ip)
945 self.assertEqual(len(response.answers), num_recs_to_enter)
947 # ANY query on cname record shouldn't follow the link
948 def test_cname_any_query(self):
949 cname1 = "cnameanytestrec." + self.get_dns_domain()
950 cname2 = "cnameanytestrec2." + self.get_dns_domain()
951 cname3 = "cnameanytestrec3." + self.get_dns_domain()
953 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
954 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
956 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
957 questions = []
959 q = self.make_name_question(cname1,
960 dns.DNS_QTYPE_ALL,
961 dns.DNS_QCLASS_IN)
962 questions.append(q)
963 self.finish_name_packet(p, questions)
965 (response, response_packet) =\
966 self.dns_transaction_udp(p, host=self.server_ip)
968 self.assertEqual(len(response.answers), 1)
969 self.assertEqual(response.answers[0].name, cname1)
970 self.assertEqual(response.answers[0].rdata, cname2)
973 class TestInvalidQueries(DNSTest):
974 def setUp(self):
975 super(TestInvalidQueries, self).setUp()
976 global server, server_ip, lp, creds, timeout
977 self.server = server_name
978 self.server_ip = server_ip
979 self.lp = lp
980 self.creds = creds
981 self.timeout = timeout
983 def test_one_a_query(self):
984 """send 0 bytes follows by create a query packet
985 containing one query record"""
987 s = None
988 try:
989 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
990 s.connect((self.server_ip, 53))
991 s.send(b"", 0)
992 finally:
993 if s is not None:
994 s.close()
996 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
997 questions = []
999 name = "%s.%s" % (self.server, self.get_dns_domain())
1000 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1001 print("asking for ", q.name)
1002 questions.append(q)
1004 self.finish_name_packet(p, questions)
1005 (response, response_packet) =\
1006 self.dns_transaction_udp(p, host=self.server_ip)
1007 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1008 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1009 self.assertEquals(response.ancount, 1)
1010 self.assertEquals(response.answers[0].rdata,
1011 self.server_ip)
1013 def test_one_a_reply(self):
1014 "send a reply instead of a query"
1015 global timeout
1017 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1018 questions = []
1020 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
1021 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1022 print("asking for ", q.name)
1023 questions.append(q)
1025 self.finish_name_packet(p, questions)
1026 p.operation |= dns.DNS_FLAG_REPLY
1027 s = None
1028 try:
1029 send_packet = ndr.ndr_pack(p)
1030 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
1031 s.settimeout(timeout)
1032 host = self.server_ip
1033 s.connect((host, 53))
1034 tcp_packet = struct.pack('!H', len(send_packet))
1035 tcp_packet += send_packet
1036 s.send(tcp_packet, 0)
1037 recv_packet = s.recv(0xffff + 2, 0)
1038 self.assertEquals(0, len(recv_packet))
1039 except socket.timeout:
1040 # Windows chooses not to respond to incorrectly formatted queries.
1041 # Although this appears to be non-deterministic even for the same
1042 # request twice, it also appears to be based on a how poorly the
1043 # request is formatted.
1044 pass
1045 finally:
1046 if s is not None:
1047 s.close()
1050 class TestZones(DNSTest):
1051 def setUp(self):
1052 super(TestZones, self).setUp()
1053 global server, server_ip, lp, creds, timeout
1054 self.server = server_name
1055 self.server_ip = server_ip
1056 self.lp = lp
1057 self.creds = creds
1058 self.timeout = timeout
1060 self.zone = "test.lan"
1061 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1062 (self.server_ip),
1063 self.lp, self.creds)
1065 self.samdb = SamDB(url="ldap://" + self.server_ip,
1066 lp=self.get_loadparm(),
1067 session_info=system_session(),
1068 credentials=self.creds)
1069 self.zone_dn = "DC=" + self.zone +\
1070 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1071 str(self.samdb.get_default_basedn())
1073 def tearDown(self):
1074 super(TestZones, self).tearDown()
1076 try:
1077 self.delete_zone(self.zone)
1078 except RuntimeError as e:
1079 (num, string) = e.args
1080 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
1081 raise
1083 def make_zone_obj(self, zone, aging_enabled=False):
1084 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1085 zone_create.pszZoneName = zone
1086 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
1087 zone_create.fAging = int(aging_enabled)
1088 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
1089 zone_create.fDsIntegrated = 1
1090 zone_create.fLoadExisting = 1
1091 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
1092 return zone_create
1094 def create_zone(self, zone, aging_enabled=False):
1095 zone_create = self.make_zone_obj(zone, aging_enabled)
1096 try:
1097 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1098 self.rpc_conn.DnssrvOperation2(client_version,
1100 self.server_ip,
1101 None,
1103 'ZoneCreate',
1104 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1105 zone_create)
1106 except WERRORError as e:
1107 self.fail(e)
1109 def set_params(self, **kwargs):
1110 zone = kwargs.pop('zone', None)
1111 for key, val in kwargs.items():
1112 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1113 name_param.dwParam = val
1114 name_param.pszNodeName = key
1116 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1117 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1118 try:
1119 self.rpc_conn.DnssrvOperation2(client_version,
1121 self.server,
1122 zone,
1124 'ResetDwordProperty',
1125 nap_type,
1126 name_param)
1127 except WERRORError as e:
1128 self.fail(str(e))
1130 def ldap_modify_dnsrecs(self, name, func):
1131 dn = 'DC={0},{1}'.format(name, self.zone_dn)
1132 dns_recs = self.ldap_get_dns_records(name)
1133 for rec in dns_recs:
1134 func(rec)
1135 update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1136 self.samdb.modify(ldb.Message.from_dict(self.samdb,
1137 update_dict,
1138 ldb.FLAG_MOD_REPLACE))
1140 def dns_update_record(self, prefix, txt):
1141 p = self.make_txt_update(prefix, txt, self.zone)
1142 (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1143 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1144 recs = self.ldap_get_dns_records(prefix)
1145 recs = [r for r in recs if r.data.str == txt]
1146 self.assertEqual(len(recs), 1)
1147 return recs[0]
1149 def dns_tombstone(self, prefix, txt, zone):
1150 name = prefix + "." + zone
1152 to = dnsp.DnssrvRpcRecord()
1153 to.dwTimeStamp = 1000
1154 to.wType = dnsp.DNS_TYPE_TOMBSTONE
1156 self.samdb.dns_replace(name, [to])
1158 def ldap_get_records(self, name):
1159 # The use of SCOPE_SUBTREE here avoids raising an exception in the
1160 # 0 results case for a test below.
1162 expr = "(&(objectClass=dnsNode)(name={0}))".format(name)
1163 return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1164 expression=expr, attrs=["*"])
1166 def ldap_get_dns_records(self, name):
1167 records = self.ldap_get_records(name)
1168 return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1169 for r in records[0].get('dnsRecord')]
1171 def ldap_get_zone_settings(self):
1172 records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1173 expression="(&(objectClass=dnsZone)" +
1174 "(name={0}))".format(self.zone),
1175 attrs=["dNSProperty"])
1176 self.assertEqual(len(records), 1)
1177 props = [ndr_unpack(dnsp.DnsProperty, r)
1178 for r in records[0].get('dNSProperty')]
1180 # We have no choice but to repeat these here.
1181 zone_prop_ids = {0x00: "EMPTY",
1182 0x01: "TYPE",
1183 0x02: "ALLOW_UPDATE",
1184 0x08: "SECURE_TIME",
1185 0x10: "NOREFRESH_INTERVAL",
1186 0x11: "SCAVENGING_SERVERS",
1187 0x12: "AGING_ENABLED_TIME",
1188 0x20: "REFRESH_INTERVAL",
1189 0x40: "AGING_STATE",
1190 0x80: "DELETED_FROM_HOSTNAME",
1191 0x81: "MASTER_SERVERS",
1192 0x82: "AUTO_NS_SERVERS",
1193 0x83: "DCPROMO_CONVERT",
1194 0x90: "SCAVENGING_SERVERS_DA",
1195 0x91: "MASTER_SERVERS_DA",
1196 0x92: "NS_SERVERS_DA",
1197 0x100: "NODE_DBFLAGS"}
1198 return {zone_prop_ids[p.id].lower(): p.data for p in props}
1200 def set_aging(self, enable=False):
1201 self.create_zone(self.zone, aging_enabled=enable)
1202 self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1203 Aging=int(bool(enable)), zone=self.zone,
1204 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1206 def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1207 self.set_aging(enable=True)
1208 settings = self.ldap_get_zone_settings()
1209 self.assertTrue(settings['aging_state'] is not None)
1210 self.assertTrue(settings['aging_state'])
1212 rec = self.dns_update_record('agingtest', ['test txt'])
1213 self.assertNotEqual(rec.dwTimeStamp, 0)
1215 def test_set_aging_disabled(self):
1216 self.set_aging(enable=False)
1217 settings = self.ldap_get_zone_settings()
1218 self.assertTrue(settings['aging_state'] is not None)
1219 self.assertFalse(settings['aging_state'])
1221 rec = self.dns_update_record('agingtest', ['test txt'])
1222 self.assertNotEqual(rec.dwTimeStamp, 0)
1224 def test_aging_update(self, enable=True):
1225 name, txt = 'agingtest', ['test txt']
1226 self.set_aging(enable=True)
1227 before_mod = self.dns_update_record(name, txt)
1228 if not enable:
1229 self.set_params(zone=self.zone, Aging=0)
1230 dec = 2
1232 def mod_ts(rec):
1233 self.assertTrue(rec.dwTimeStamp > 0)
1234 rec.dwTimeStamp -= dec
1235 self.ldap_modify_dnsrecs(name, mod_ts)
1236 after_mod = self.ldap_get_dns_records(name)
1237 self.assertEqual(len(after_mod), 1)
1238 after_mod = after_mod[0]
1239 self.assertEqual(after_mod.dwTimeStamp,
1240 before_mod.dwTimeStamp - dec)
1241 after_update = self.dns_update_record(name, txt)
1242 after_should_equal = before_mod if enable else after_mod
1243 self.assertEqual(after_should_equal.dwTimeStamp,
1244 after_update.dwTimeStamp)
1246 def test_aging_update_disabled(self):
1247 self.test_aging_update(enable=False)
1249 def test_aging_refresh(self):
1250 name, txt = 'agingtest', ['test txt']
1251 self.create_zone(self.zone, aging_enabled=True)
1252 interval = 10
1253 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1254 Aging=1, zone=self.zone,
1255 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1256 before_mod = self.dns_update_record(name, txt)
1258 def mod_ts(rec):
1259 self.assertTrue(rec.dwTimeStamp > 0)
1260 rec.dwTimeStamp -= interval // 2
1261 self.ldap_modify_dnsrecs(name, mod_ts)
1262 update_during_norefresh = self.dns_update_record(name, txt)
1264 def mod_ts(rec):
1265 self.assertTrue(rec.dwTimeStamp > 0)
1266 rec.dwTimeStamp -= interval + interval // 2
1267 self.ldap_modify_dnsrecs(name, mod_ts)
1268 update_during_refresh = self.dns_update_record(name, txt)
1269 self.assertEqual(update_during_norefresh.dwTimeStamp,
1270 before_mod.dwTimeStamp - interval / 2)
1271 self.assertEqual(update_during_refresh.dwTimeStamp,
1272 before_mod.dwTimeStamp)
1274 def test_rpc_add_no_timestamp(self):
1275 name, txt = 'agingtest', ['test txt']
1276 self.set_aging(enable=True)
1277 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1278 rec_buf.rec = TXTRecord(txt)
1279 self.rpc_conn.DnssrvUpdateRecord2(
1280 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1282 self.server_ip,
1283 self.zone,
1284 name,
1285 rec_buf,
1286 None)
1287 recs = self.ldap_get_dns_records(name)
1288 self.assertEqual(len(recs), 1)
1289 self.assertEqual(recs[0].dwTimeStamp, 0)
1291 def test_static_record_dynamic_update(self):
1292 name, txt = 'agingtest', ['test txt']
1293 txt2 = ['test txt2']
1294 self.set_aging(enable=True)
1295 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1296 rec_buf.rec = TXTRecord(txt)
1297 self.rpc_conn.DnssrvUpdateRecord2(
1298 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1300 self.server_ip,
1301 self.zone,
1302 name,
1303 rec_buf,
1304 None)
1306 rec2 = self.dns_update_record(name, txt2)
1307 self.assertEqual(rec2.dwTimeStamp, 0)
1309 def test_dynamic_record_static_update(self):
1310 name, txt = 'agingtest', ['test txt']
1311 txt2 = ['test txt2']
1312 txt3 = ['test txt3']
1313 self.set_aging(enable=True)
1315 self.dns_update_record(name, txt)
1317 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1318 rec_buf.rec = TXTRecord(txt2)
1319 self.rpc_conn.DnssrvUpdateRecord2(
1320 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1322 self.server_ip,
1323 self.zone,
1324 name,
1325 rec_buf,
1326 None)
1328 self.dns_update_record(name, txt3)
1330 recs = self.ldap_get_dns_records(name)
1331 # Put in dict because ldap recs might be out of order
1332 recs = {str(r.data.str): r for r in recs}
1333 self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1334 self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1335 self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1337 def test_dns_tombstone_custom_match_rule(self):
1338 lp = self.get_loadparm()
1339 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1340 session_info=system_session(),
1341 credentials=self.creds)
1343 name, txt = 'agingtest', ['test txt']
1344 name2, txt2 = 'agingtest2', ['test txt2']
1345 name3, txt3 = 'agingtest3', ['test txt3']
1346 name4, txt4 = 'agingtest4', ['test txt4']
1347 name5, txt5 = 'agingtest5', ['test txt5']
1349 self.create_zone(self.zone, aging_enabled=True)
1350 interval = 10
1351 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1352 Aging=1, zone=self.zone,
1353 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1355 self.dns_update_record(name, txt)
1357 self.dns_update_record(name2, txt)
1358 self.dns_update_record(name2, txt2)
1360 self.dns_update_record(name3, txt)
1361 self.dns_update_record(name3, txt2)
1362 last_update = self.dns_update_record(name3, txt3)
1364 # Modify txt1 of the first 2 names
1365 def mod_ts(rec):
1366 if rec.data.str == txt:
1367 rec.dwTimeStamp -= 2
1368 self.ldap_modify_dnsrecs(name, mod_ts)
1369 self.ldap_modify_dnsrecs(name2, mod_ts)
1371 # create a static dns record.
1372 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1373 rec_buf.rec = TXTRecord(txt4)
1374 self.rpc_conn.DnssrvUpdateRecord2(
1375 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1377 self.server_ip,
1378 self.zone,
1379 name4,
1380 rec_buf,
1381 None)
1383 # Create a tomb stoned record.
1384 self.dns_update_record(name5, txt5)
1385 self.dns_tombstone(name5, txt5, self.zone)
1387 self.ldap_get_dns_records(name3)
1388 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1389 expr = expr.format(int(last_update.dwTimeStamp) - 1)
1390 try:
1391 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1392 expression=expr, attrs=["*"])
1393 except ldb.LdbError as e:
1394 self.fail(str(e))
1395 updated_names = {str(r.get('name')) for r in res}
1396 self.assertEqual(updated_names, set([name, name2]))
1398 def test_dns_tombstone_custom_match_rule_no_records(self):
1399 lp = self.get_loadparm()
1400 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1401 session_info=system_session(),
1402 credentials=self.creds)
1404 self.create_zone(self.zone, aging_enabled=True)
1405 interval = 10
1406 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1407 Aging=1, zone=self.zone,
1408 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1410 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1411 expr = expr.format(1)
1413 try:
1414 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1415 expression=expr, attrs=["*"])
1416 except ldb.LdbError as e:
1417 self.fail(str(e))
1418 self.assertEqual(0, len(res))
1420 def test_dns_tombstone_custom_match_rule_fail(self):
1421 self.create_zone(self.zone, aging_enabled=True)
1422 samdb = SamDB(url=lp.samdb_url(),
1423 lp=lp,
1424 session_info=system_session(),
1425 credentials=self.creds)
1427 # Property name in not dnsRecord
1428 expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1429 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1430 expression=expr, attrs=["*"])
1431 self.assertEquals(len(res), 0)
1433 # No value for tombstone time
1434 try:
1435 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1436 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1437 expression=expr, attrs=["*"])
1438 self.assertEquals(len(res), 0)
1439 self.fail("Exception: ldb.ldbError not generated")
1440 except ldb.LdbError as e:
1441 (num, msg) = e.args
1442 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1444 # Tombstone time = -
1445 try:
1446 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1447 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1448 expression=expr, attrs=["*"])
1449 self.assertEquals(len(res), 0)
1450 self.fail("Exception: ldb.ldbError not generated")
1451 except ldb.LdbError as e:
1452 (num, _) = e.args
1453 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1455 # Tombstone time longer than 64 characters
1456 try:
1457 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1458 expr = expr.format("1" * 65)
1459 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1460 expression=expr, attrs=["*"])
1461 self.assertEquals(len(res), 0)
1462 self.fail("Exception: ldb.ldbError not generated")
1463 except ldb.LdbError as e:
1464 (num, _) = e.args
1465 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1467 # Non numeric Tombstone time
1468 try:
1469 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1470 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1471 expression=expr, attrs=["*"])
1472 self.assertEquals(len(res), 0)
1473 self.fail("Exception: ldb.ldbError not generated")
1474 except ldb.LdbError as e:
1475 (num, _) = e.args
1476 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1478 # Non system session
1479 try:
1480 db = SamDB(url="ldap://" + self.server_ip,
1481 lp=self.get_loadparm(),
1482 credentials=self.creds)
1484 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1485 res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1486 expression=expr, attrs=["*"])
1487 self.assertEquals(len(res), 0)
1488 self.fail("Exception: ldb.ldbError not generated")
1489 except ldb.LdbError as e:
1490 (num, _) = e.args
1491 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1493 def test_basic_scavenging(self):
1494 lp = self.get_loadparm()
1495 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1496 session_info=system_session(),
1497 credentials=self.creds)
1499 self.create_zone(self.zone, aging_enabled=True)
1500 interval = 1
1501 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1502 zone=self.zone, Aging=1,
1503 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1504 name, txt = 'agingtest', ['test txt']
1505 name2, txt2 = 'agingtest2', ['test txt2']
1506 name3, txt3 = 'agingtest3', ['test txt3']
1507 self.dns_update_record(name, txt)
1508 self.dns_update_record(name2, txt)
1509 self.dns_update_record(name2, txt2)
1510 self.dns_update_record(name3, txt)
1511 self.dns_update_record(name3, txt2)
1512 last_add = self.dns_update_record(name3, txt3)
1514 def mod_ts(rec):
1515 self.assertTrue(rec.dwTimeStamp > 0)
1516 if rec.data.str == txt:
1517 rec.dwTimeStamp -= interval * 5
1518 self.ldap_modify_dnsrecs(name, mod_ts)
1519 self.ldap_modify_dnsrecs(name2, mod_ts)
1520 self.ldap_modify_dnsrecs(name3, mod_ts)
1521 self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1522 dsdb._scavenge_dns_records(self.samdb)
1524 recs = self.ldap_get_dns_records(name)
1525 self.assertEqual(len(recs), 1)
1526 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1528 recs = self.ldap_get_dns_records(name2)
1529 self.assertEqual(len(recs), 1)
1530 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1531 self.assertEqual(recs[0].data.str, txt2)
1533 recs = self.ldap_get_dns_records(name3)
1534 self.assertEqual(len(recs), 2)
1535 txts = {str(r.data.str) for r in recs}
1536 self.assertEqual(txts, {str(txt2), str(txt3)})
1537 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1538 self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1540 for make_it_work in [False, True]:
1541 inc = -1 if make_it_work else 1
1543 def mod_ts(rec):
1544 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1545 self.ldap_modify_dnsrecs(name, mod_ts)
1546 dsdb._dns_delete_tombstones(self.samdb)
1547 recs = self.ldap_get_records(name)
1548 if make_it_work:
1549 self.assertEqual(len(recs), 0)
1550 else:
1551 self.assertEqual(len(recs), 1)
1553 def test_fully_qualified_zone(self):
1555 def create_zone_expect_exists(zone):
1556 try:
1557 zone_create = self.make_zone_obj(zone)
1558 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1559 zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE
1560 self.rpc_conn.DnssrvOperation2(client_version,
1562 self.server_ip,
1563 None,
1565 'ZoneCreate',
1566 zc_type,
1567 zone_create)
1568 except WERRORError as e:
1569 enum, _ = e.args
1570 if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
1571 self.fail(e)
1572 return
1573 self.fail("Zone {} should already exist".format(zone))
1575 # Create unqualified, then check creating qualified fails.
1576 self.create_zone(self.zone)
1577 create_zone_expect_exists(self.zone + '.')
1579 # Same again, but the other way around.
1580 self.create_zone(self.zone + '2.')
1581 create_zone_expect_exists(self.zone + '2')
1583 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1584 request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
1585 tid = dnsserver.DNSSRV_TYPEID_DWORD
1586 typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version,
1588 self.server_ip,
1589 None,
1590 'EnumZones',
1591 tid,
1592 request_filter)
1594 self.delete_zone(self.zone)
1595 self.delete_zone(self.zone + '2')
1597 # Two zones should've been created, neither of them fully qualified.
1598 zones_we_just_made = []
1599 zones = [str(z.pszZoneName) for z in res.ZoneArray]
1600 for zone in zones:
1601 if zone.startswith(self.zone):
1602 zones_we_just_made.append(zone)
1603 self.assertEqual(len(zones_we_just_made), 2)
1604 self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone})
1606 def delete_zone(self, zone):
1607 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1609 self.server_ip,
1610 zone,
1612 'DeleteZoneFromDs',
1613 dnsserver.DNSSRV_TYPEID_NULL,
1614 None)
1616 def test_soa_query(self):
1617 zone = "test.lan"
1618 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1619 questions = []
1621 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1622 questions.append(q)
1623 self.finish_name_packet(p, questions)
1625 (response, response_packet) =\
1626 self.dns_transaction_udp(p, host=server_ip)
1627 # Windows returns OK while BIND logically seems to return NXDOMAIN
1628 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1629 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1630 self.assertEquals(response.ancount, 0)
1632 self.create_zone(zone)
1633 (response, response_packet) =\
1634 self.dns_transaction_udp(p, host=server_ip)
1635 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1636 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1637 self.assertEquals(response.ancount, 1)
1638 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1640 self.delete_zone(zone)
1641 (response, response_packet) =\
1642 self.dns_transaction_udp(p, host=server_ip)
1643 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1644 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1645 self.assertEquals(response.ancount, 0)
1648 class TestRPCRoundtrip(DNSTest):
1649 def setUp(self):
1650 super(TestRPCRoundtrip, self).setUp()
1651 global server, server_ip, lp, creds
1652 self.server = server_name
1653 self.server_ip = server_ip
1654 self.lp = lp
1655 self.creds = creds
1656 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1657 (self.server_ip),
1658 self.lp,
1659 self.creds)
1661 def tearDown(self):
1662 super(TestRPCRoundtrip, self).tearDown()
1664 def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
1665 fqn = fqn or ("rpctestrec." + self.get_dns_domain())
1667 rec = data_to_dns_record(wType, data)
1668 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1669 add_rec_buf.rec = rec
1671 add_arg = add_rec_buf
1672 del_arg = None
1673 if delete:
1674 add_arg = None
1675 del_arg = add_rec_buf
1677 self.rpc_conn.DnssrvUpdateRecord2(
1678 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1680 self.server_ip,
1681 self.get_dns_domain(),
1682 fqn,
1683 add_arg,
1684 del_arg)
1686 def test_rpc_self_referencing_cname(self):
1687 cname = "cnametest2_unqual_rec_loop"
1688 cname_fqn = "%s.%s" % (cname, self.get_dns_domain())
1690 try:
1691 self.rpc_update(fqn=cname, data=cname_fqn,
1692 wType=dnsp.DNS_TYPE_CNAME, delete=True)
1693 except WERRORError as e:
1694 if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
1695 self.fail("RPC DNS gaven wrong error on pre-test cleanup "
1696 "for self referencing CNAME: %s" % e.args[0])
1698 try:
1699 self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
1700 except WERRORError as e:
1701 if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
1702 self.fail("RPC DNS gaven wrong error on insertion of "
1703 "self referencing CNAME: %s" % e.args[0])
1704 return
1706 self.fail("RPC DNS allowed insertion of self referencing CNAME")
1708 def test_update_add_txt_rpc_to_dns(self):
1709 prefix, txt = 'rpctextrec', ['"This is a test"']
1711 name = "%s.%s" % (prefix, self.get_dns_domain())
1713 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1714 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1715 add_rec_buf.rec = rec
1716 try:
1717 self.rpc_conn.DnssrvUpdateRecord2(
1718 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1720 self.server_ip,
1721 self.get_dns_domain(),
1722 name,
1723 add_rec_buf,
1724 None)
1726 except WERRORError as e:
1727 self.fail(str(e))
1729 try:
1730 self.check_query_txt(prefix, txt)
1731 finally:
1732 self.rpc_conn.DnssrvUpdateRecord2(
1733 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1735 self.server_ip,
1736 self.get_dns_domain(),
1737 name,
1738 None,
1739 add_rec_buf)
1741 def test_update_add_null_padded_txt_record(self):
1742 "test adding records works"
1743 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1744 p = self.make_txt_update(prefix, txt)
1745 (response, response_packet) =\
1746 self.dns_transaction_udp(p, host=server_ip)
1747 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1748 self.check_query_txt(prefix, txt)
1749 self.assertIsNotNone(
1750 dns_record_match(self.rpc_conn,
1751 self.server_ip,
1752 self.get_dns_domain(),
1753 "%s.%s" % (prefix, self.get_dns_domain()),
1754 dnsp.DNS_TYPE_TXT,
1755 '"\\"This is a test\\"" "" ""'))
1757 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1758 p = self.make_txt_update(prefix, txt)
1759 (response, response_packet) =\
1760 self.dns_transaction_udp(p, host=server_ip)
1761 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1762 self.check_query_txt(prefix, txt)
1763 self.assertIsNotNone(
1764 dns_record_match(
1765 self.rpc_conn,
1766 self.server_ip,
1767 self.get_dns_domain(),
1768 "%s.%s" % (prefix, self.get_dns_domain()),
1769 dnsp.DNS_TYPE_TXT,
1770 '"\\"This is a test\\"" "" "" "more text"'))
1772 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1773 p = self.make_txt_update(prefix, txt)
1774 (response, response_packet) =\
1775 self.dns_transaction_udp(p, host=server_ip)
1776 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1777 self.check_query_txt(prefix, txt)
1778 self.assertIsNotNone(
1779 dns_record_match(
1780 self.rpc_conn,
1781 self.server_ip,
1782 self.get_dns_domain(),
1783 "%s.%s" % (prefix, self.get_dns_domain()),
1784 dnsp.DNS_TYPE_TXT,
1785 '"" "" "\\"This is a test\\""'))
1787 def test_update_add_padding_rpc_to_dns(self):
1788 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1789 prefix = 'rpc' + prefix
1790 name = "%s.%s" % (prefix, self.get_dns_domain())
1792 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1793 '"\\"This is a test\\"" "" ""')
1794 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1795 add_rec_buf.rec = rec
1796 try:
1797 self.rpc_conn.DnssrvUpdateRecord2(
1798 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1800 self.server_ip,
1801 self.get_dns_domain(),
1802 name,
1803 add_rec_buf,
1804 None)
1806 except WERRORError as e:
1807 self.fail(str(e))
1809 try:
1810 self.check_query_txt(prefix, txt)
1811 finally:
1812 self.rpc_conn.DnssrvUpdateRecord2(
1813 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1815 self.server_ip,
1816 self.get_dns_domain(),
1817 name,
1818 None,
1819 add_rec_buf)
1821 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1822 prefix = 'rpc' + prefix
1823 name = "%s.%s" % (prefix, self.get_dns_domain())
1825 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1826 '"\\"This is a test\\"" "" "" "more text"')
1827 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1828 add_rec_buf.rec = rec
1829 try:
1830 self.rpc_conn.DnssrvUpdateRecord2(
1831 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1833 self.server_ip,
1834 self.get_dns_domain(),
1835 name,
1836 add_rec_buf,
1837 None)
1839 except WERRORError as e:
1840 self.fail(str(e))
1842 try:
1843 self.check_query_txt(prefix, txt)
1844 finally:
1845 self.rpc_conn.DnssrvUpdateRecord2(
1846 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1848 self.server_ip,
1849 self.get_dns_domain(),
1850 name,
1851 None,
1852 add_rec_buf)
1854 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1855 prefix = 'rpc' + prefix
1856 name = "%s.%s" % (prefix, self.get_dns_domain())
1858 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1859 '"" "" "\\"This is a test\\""')
1860 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1861 add_rec_buf.rec = rec
1862 try:
1863 self.rpc_conn.DnssrvUpdateRecord2(
1864 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1866 self.server_ip,
1867 self.get_dns_domain(),
1868 name,
1869 add_rec_buf,
1870 None)
1871 except WERRORError as e:
1872 self.fail(str(e))
1874 try:
1875 self.check_query_txt(prefix, txt)
1876 finally:
1877 self.rpc_conn.DnssrvUpdateRecord2(
1878 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1880 self.server_ip,
1881 self.get_dns_domain(),
1882 name,
1883 None,
1884 add_rec_buf)
1886 # Test is incomplete due to strlen against txt records
1887 def test_update_add_null_char_txt_record(self):
1888 "test adding records works"
1889 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1890 p = self.make_txt_update(prefix, txt)
1891 (response, response_packet) =\
1892 self.dns_transaction_udp(p, host=server_ip)
1893 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1894 self.check_query_txt(prefix, ['NULL'])
1895 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1896 self.get_dns_domain(),
1897 "%s.%s" % (prefix, self.get_dns_domain()),
1898 dnsp.DNS_TYPE_TXT, '"NULL"'))
1900 prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1901 p = self.make_txt_update(prefix, txt)
1902 (response, response_packet) =\
1903 self.dns_transaction_udp(p, host=server_ip)
1904 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1905 self.check_query_txt(prefix, ['NULL', 'NULL'])
1906 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1907 self.get_dns_domain(),
1908 "%s.%s" % (prefix, self.get_dns_domain()),
1909 dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1911 def test_update_add_null_char_rpc_to_dns(self):
1912 prefix = 'rpcnulltextrec'
1913 name = "%s.%s" % (prefix, self.get_dns_domain())
1915 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1916 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1917 add_rec_buf.rec = rec
1918 try:
1919 self.rpc_conn.DnssrvUpdateRecord2(
1920 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1922 self.server_ip,
1923 self.get_dns_domain(),
1924 name,
1925 add_rec_buf,
1926 None)
1928 except WERRORError as e:
1929 self.fail(str(e))
1931 try:
1932 self.check_query_txt(prefix, ['NULL'])
1933 finally:
1934 self.rpc_conn.DnssrvUpdateRecord2(
1935 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1937 self.server_ip,
1938 self.get_dns_domain(),
1939 name,
1940 None,
1941 add_rec_buf)
1943 def test_update_add_hex_char_txt_record(self):
1944 "test adding records works"
1945 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1946 p = self.make_txt_update(prefix, txt)
1947 (response, response_packet) =\
1948 self.dns_transaction_udp(p, host=server_ip)
1949 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1950 self.check_query_txt(prefix, txt)
1951 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1952 self.get_dns_domain(),
1953 "%s.%s" % (prefix, self.get_dns_domain()),
1954 dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1956 def test_update_add_hex_rpc_to_dns(self):
1957 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1958 prefix = 'rpc' + prefix
1959 name = "%s.%s" % (prefix, self.get_dns_domain())
1961 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1962 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1963 add_rec_buf.rec = rec
1964 try:
1965 self.rpc_conn.DnssrvUpdateRecord2(
1966 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1968 self.server_ip,
1969 self.get_dns_domain(),
1970 name,
1971 add_rec_buf,
1972 None)
1974 except WERRORError as e:
1975 self.fail(str(e))
1977 try:
1978 self.check_query_txt(prefix, txt)
1979 finally:
1980 self.rpc_conn.DnssrvUpdateRecord2(
1981 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1983 self.server_ip,
1984 self.get_dns_domain(),
1985 name,
1986 None,
1987 add_rec_buf)
1989 def test_update_add_slash_txt_record(self):
1990 "test adding records works"
1991 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1992 p = self.make_txt_update(prefix, txt)
1993 (response, response_packet) =\
1994 self.dns_transaction_udp(p, host=server_ip)
1995 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1996 self.check_query_txt(prefix, txt)
1997 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1998 self.get_dns_domain(),
1999 "%s.%s" % (prefix, self.get_dns_domain()),
2000 dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
2002 # This test fails against Windows as it eliminates slashes in RPC
2003 # One typical use for a slash is in records like 'var=value' to
2004 # escape '=' characters.
2005 def test_update_add_slash_rpc_to_dns(self):
2006 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
2007 prefix = 'rpc' + prefix
2008 name = "%s.%s" % (prefix, self.get_dns_domain())
2010 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
2011 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2012 add_rec_buf.rec = rec
2013 try:
2014 self.rpc_conn.DnssrvUpdateRecord2(
2015 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2017 self.server_ip,
2018 self.get_dns_domain(),
2019 name,
2020 add_rec_buf,
2021 None)
2023 except WERRORError as e:
2024 self.fail(str(e))
2026 try:
2027 self.check_query_txt(prefix, txt)
2029 finally:
2030 self.rpc_conn.DnssrvUpdateRecord2(
2031 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2033 self.server_ip,
2034 self.get_dns_domain(),
2035 name,
2036 None,
2037 add_rec_buf)
2039 def test_update_add_two_txt_records(self):
2040 "test adding two txt records works"
2041 prefix, txt = 'textrec2', ['"This is a test"',
2042 '"and this is a test, too"']
2043 p = self.make_txt_update(prefix, txt)
2044 (response, response_packet) =\
2045 self.dns_transaction_udp(p, host=server_ip)
2046 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2047 self.check_query_txt(prefix, txt)
2048 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2049 self.get_dns_domain(),
2050 "%s.%s" % (prefix, self.get_dns_domain()),
2051 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
2052 ' "\\"and this is a test, too\\""'))
2054 def test_update_add_two_rpc_to_dns(self):
2055 prefix, txt = 'textrec2', ['"This is a test"',
2056 '"and this is a test, too"']
2057 prefix = 'rpc' + prefix
2058 name = "%s.%s" % (prefix, self.get_dns_domain())
2060 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
2061 '"\\"This is a test\\""' +
2062 ' "\\"and this is a test, too\\""')
2063 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2064 add_rec_buf.rec = rec
2065 try:
2066 self.rpc_conn.DnssrvUpdateRecord2(
2067 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2069 self.server_ip,
2070 self.get_dns_domain(),
2071 name,
2072 add_rec_buf,
2073 None)
2075 except WERRORError as e:
2076 self.fail(str(e))
2078 try:
2079 self.check_query_txt(prefix, txt)
2080 finally:
2081 self.rpc_conn.DnssrvUpdateRecord2(
2082 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2084 self.server_ip,
2085 self.get_dns_domain(),
2086 name,
2087 None,
2088 add_rec_buf)
2090 def test_update_add_empty_txt_records(self):
2091 "test adding two txt records works"
2092 prefix, txt = 'emptytextrec', []
2093 p = self.make_txt_update(prefix, txt)
2094 (response, response_packet) =\
2095 self.dns_transaction_udp(p, host=server_ip)
2096 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2097 self.check_query_txt(prefix, txt)
2098 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2099 self.get_dns_domain(),
2100 "%s.%s" % (prefix, self.get_dns_domain()),
2101 dnsp.DNS_TYPE_TXT, ''))
2103 def test_update_add_empty_rpc_to_dns(self):
2104 prefix, txt = 'rpcemptytextrec', []
2106 name = "%s.%s" % (prefix, self.get_dns_domain())
2108 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
2109 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2110 add_rec_buf.rec = rec
2111 try:
2112 self.rpc_conn.DnssrvUpdateRecord2(
2113 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2115 self.server_ip,
2116 self.get_dns_domain(),
2117 name,
2118 add_rec_buf,
2119 None)
2120 except WERRORError as e:
2121 self.fail(str(e))
2123 try:
2124 self.check_query_txt(prefix, txt)
2125 finally:
2126 self.rpc_conn.DnssrvUpdateRecord2(
2127 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2129 self.server_ip,
2130 self.get_dns_domain(),
2131 name,
2132 None,
2133 add_rec_buf)
2136 TestProgram(module=__name__, opts=subunitopts)