tests/samba-tool: dns cleanup should work with a missing name
[Samba.git] / python / samba / tests / samba_tool / dnscmd.py
blob780af4f8bd064798288b4e3308cfdbc8c0bbacfe
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import ldb
21 from samba.auth import system_session
22 from samba.samdb import SamDB
23 from samba.ndr import ndr_unpack, ndr_pack
24 from samba.dcerpc import dnsp
25 from samba.tests.samba_tool.base import SambaToolCmdTest
27 class DnsCmdTestCase(SambaToolCmdTest):
def setUp(self):
    """Prepare a connection, a scratch zone and the record fixtures.

    Reads SERVER, DC_USERNAME and DC_PASSWORD from the environment,
    opens a SamDB over LDAP, creates the test zone via addZone() and
    fills self.good_records / self.bad_records, each mapping a DNS
    record type to a list of data strings.
    """
    super(DnsCmdTestCase, self).setUp()

    self.dburl = "ldap://%s" % os.environ["SERVER"]
    self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
                                      os.environ["DC_PASSWORD"])

    self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
    self.config_dn = str(self.samdb.get_config_basedn())

    self.testip = "192.168.0.193"
    self.testip2 = "192.168.0.194"

    self.addZone()

    # Note: SOA types don't work (and shouldn't), as we only have one
    # zone per DNS record.

    good_dns = [
        "SAMDOM.EXAMPLE.COM",
        "1.EXAMPLE.COM",
        "%sEXAMPLE.COM" % ("1."*100),
        "EXAMPLE",
        "\n.COM",
        "!@#$%^&*()_",
        "HIGH\xFFBYTE",
        "@.EXAMPLE.COM",
        ".",
    ]
    bad_dns = [
        "...",
        ".EXAMPLE.COM",
        ".EXAMPLE.",
        "",
        "SAMDOM..EXAMPLE.COM",
    ]

    # MX data is "<name> <number>"; every DNS name above is reused
    # with the number 1 appended.
    good_mx = ["SAMDOM.EXAMPLE.COM 65530"]
    good_mx.extend(["%s 1" % name for name in good_dns])
    bad_mx = [
        "SAMDOM.EXAMPLE.COM -1",
        "SAMDOM.EXAMPLE.COM",
        " ",
        "SAMDOM.EXAMPLE.COM 1 1",
        "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM",
    ]
    bad_mx.extend(["%s 1" % name for name in bad_dns])

    # SRV data is "<name>" followed by three numbers; every DNS name
    # above is reused with "0 0 0" appended.
    good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
    good_srv.extend(["%s 0 0 0" % name for name in good_dns])
    bad_srv = [
        "SAMDOM.EXAMPLE.COM 0 65536 0",
        "SAMDOM.EXAMPLE.COM 0 0 65536",
        "SAMDOM.EXAMPLE.COM 65536 0 0",
    ]
    bad_srv.extend(["%s 0 0 0" % name for name in bad_dns])

    self.good_records = {
        "A": ["192.168.0.1", "255.255.255.255"],
        "AAAA": [
            "1234:5678:9ABC:DEF0:0000:0000:0000:0000",
            "0000:0000:0000:0000:0000:0000:0000:0000",
            "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
            "1234:1234:1234::",
            "1234:5678:9ABC:DEF0::",
            "0000:0000::0000",
            "1234::5678:9ABC:0000:0000:0000:0000",
            "::1",
            "::",
            "1:1:1:1:1:1:1:1",
        ],
        # PTR, CNAME and NS deliberately share the one good_dns list.
        "PTR": good_dns,
        "CNAME": good_dns,
        "NS": good_dns,
        "MX": good_mx,
        "SRV": good_srv,
        "TXT": ["text", "", "@#!", "\n"],
    }

    self.bad_records = {
        "A": [
            "192.168.0.500",
            "255.255.255.255/32",
        ],
        "AAAA": [
            "GGGG:1234:5678:9ABC:0000:0000:0000:0000",
            "0000:0000:0000:0000:0000:0000:0000:0000/1",
            "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
            "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
            "1234:5678:9ABC:DEF0:1234:5678:9ABC",
            "1111::1111::1111",
        ],
        "PTR": bad_dns,
        "CNAME": bad_dns,
        "NS": bad_dns,
        "MX": bad_mx,
        "SRV": bad_srv,
    }
def tearDown(self):
    """Delete the scratch zone, then run the base-class teardown."""
    # The zone goes first, before the base class tears anything down.
    self.deleteZone()
    super(DnsCmdTestCase, self).tearDown()
def resetZone(self):
    """Return the scratch zone to an empty state by deleting and
    re-creating it."""
    self.deleteZone()
    self.addZone()
def addZone(self):
    """Create the scratch zone with 'dns zonecreate'.

    Stores the zone name in self.zone and asserts that the command
    succeeded.
    """
    self.zone = "zone"
    status, stdout, stderr = self.runsubcmd(
        "dns", "zonecreate", os.environ["SERVER"],
        self.zone, self.creds_string)
    self.assertCmdSuccess(status, stdout, stderr)
def deleteZone(self):
    """Remove the zone named by self.zone with 'dns zonedelete' and
    assert that the command succeeded."""
    status, stdout, stderr = self.runsubcmd(
        "dns", "zonedelete", os.environ["SERVER"],
        self.zone, self.creds_string)
    self.assertCmdSuccess(status, stdout, stderr)
def get_record_from_db(self, zone_name, record_name):
    """Look up a DNS node directly in the directory.

    Searches the DomainDnsZones partition for the first dnsZone whose
    DN contains zone_name, then for the first dnsNode below it whose
    DN contains record_name. Both matches are substring tests.

    Returns a (dn, record) tuple where record is the first dnsRecord
    value unpacked as dnsp.DnssrvRpcRecord. Returns None implicitly if
    no node matches. If no zone matches, zone_dn is never bound and
    the second search raises.
    """
    partition_dn = "DC=DomainDnsZones,%s" % self.samdb.get_default_basedn()
    zone_entries = self.samdb.search(base=partition_dn,
                                     scope=ldb.SCOPE_SUBTREE,
                                     expression="(objectClass=dnsZone)",
                                     attrs=["cn"])

    for zone_entry in zone_entries:
        if zone_name not in str(zone_entry.dn):
            continue
        zone_dn = zone_entry.dn
        break

    node_entries = self.samdb.search(base=zone_dn,
                                     scope=ldb.SCOPE_SUBTREE,
                                     expression="(objectClass=dnsNode)",
                                     attrs=["dnsRecord"])

    for node_entry in node_entries:
        if record_name not in str(node_entry.dn):
            continue
        node_dn = node_entry.dn
        first_record = ndr_unpack(dnsp.DnssrvRpcRecord,
                                  node_entry["dnsRecord"][0])
        return (node_dn, first_record)
def test_rank_none(self):
    """Check the tool's behaviour for a record with DNS_RANK_NONE.

    An A record is added, its rank is forced to 0 (DNS_RANK_NONE)
    directly in the database, and the test then checks that the record
    cannot be added a second time, can be deleted, and can no longer
    be queried afterwards. Failures of these three checks are
    collected and reported together at the end.
    """
    server = os.environ["SERVER"]
    record_str = "192.168.50.50"
    record_type_str = "A"

    result, out, err = self.runsubcmd("dns", "add", server,
                                      self.zone, "testrecord",
                                      record_type_str, record_str,
                                      self.creds_string)
    self.assertCmdSuccess(result, out, err,
                          "Failed to add record '%s' with type %s."
                          % (record_str, record_type_str))

    dn, record = self.get_record_from_db(self.zone, "testrecord")
    record.rank = 0  # DNS_RANK_NONE
    res = self.samdb.dns_replace_by_dn(dn, [record])
    if res is not None:
        self.fail("Unable to update dns record to have DNS_RANK_NONE.")

    errors = []

    # The record should still exist
    result, out, err = self.runsubcmd("dns", "query", server,
                                      self.zone, "testrecord",
                                      record_type_str, self.creds_string)
    try:
        self.assertCmdSuccess(result, out, err,
                              "Failed to query for a record "
                              "which had DNS_RANK_NONE.")
        self.assertTrue("testrecord" in out and record_str in out,
                        "Query for a record which had DNS_RANK_NONE "
                        "succeeded but produced no resulting records.")
    except AssertionError:
        # Windows produces no resulting records, so this check is
        # deliberately best-effort and its failure is not recorded.
        pass

    # We should not be able to add a duplicate
    result, out, err = self.runsubcmd("dns", "add", server,
                                      self.zone, "testrecord",
                                      record_type_str, record_str,
                                      self.creds_string)
    try:
        self.assertCmdFail(result, "Successfully added duplicate record "
                           "of one which had DNS_RANK_NONE.")
    except AssertionError as e:
        errors.append(e)

    # We should be able to delete it
    result, out, err = self.runsubcmd("dns", "delete", server,
                                      self.zone, "testrecord",
                                      record_type_str, record_str,
                                      self.creds_string)
    try:
        self.assertCmdSuccess(result, out, err, "Failed to delete record "
                              "which had DNS_RANK_NONE.")
    except AssertionError as e:
        errors.append(e)

    # Now the record should not exist
    result, out, err = self.runsubcmd("dns", "query", server,
                                      self.zone, "testrecord",
                                      record_type_str, self.creds_string)
    try:
        self.assertCmdFail(result, "Successfully queried for deleted "
                           "record which had DNS_RANK_NONE.")
    except AssertionError as e:
        errors.append(e)

    if errors:
        lines = ["Failed appropriate behaviour with DNS_RANK_NONE:"]
        lines.extend(str(error) for error in errors)
        raise AssertionError("\n".join(lines))
def test_accept_valid_commands(self):
    """
    For all good records, attempt to add, query and delete them.

    A failure for one record does not stop the run: each
    AssertionError is collected, all of them are printed at the end
    and the test then fails once with the total count.
    """
    server = os.environ["SERVER"]
    failure_msgs = []
    for dnstype in self.good_records:
        for record in self.good_records[dnstype]:
            try:
                result, out, err = self.runsubcmd("dns", "add",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err,
                                      "Failed to add "
                                      "record %s with type %s."
                                      % (record, dnstype))

                result, out, err = self.runsubcmd("dns", "query",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err,
                                      "Failed to query "
                                      "record %s with qualifier %s."
                                      % (record, dnstype))

                result, out, err = self.runsubcmd("dns", "delete",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err,
                                      "Failed to remove "
                                      "record %s with type %s."
                                      % (record, dnstype))
            except AssertionError as e:
                failure_msgs.append(e)

    if failure_msgs:
        for msg in failure_msgs:
            print(msg)
        self.fail("Failed to accept valid commands. %d total failures. "
                  "Errors above." % len(failure_msgs))
def test_reject_invalid_commands(self):
    """
    For all bad records, attempt to add them and update to them,
    making sure that both operations fail.

    Each AssertionError is collected instead of aborting the run, and
    the zone is reset after it so later iterations start clean. All
    collected failures are printed at the end and the test fails once
    with the total count.
    """
    server = os.environ["SERVER"]
    failure_msgs = []

    # Add invalid records and make sure they fail to be added
    for dnstype in self.bad_records:
        for record in self.bad_records[dnstype]:
            try:
                result, out, err = self.runsubcmd("dns", "add",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdFail(result, "Successfully added invalid "
                                   "record '%s' of type '%s'."
                                   % (record, dnstype))
            except AssertionError as e:
                failure_msgs.append(e)
                self.resetZone()
            try:
                result, out, err = self.runsubcmd("dns", "delete",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdFail(result, "Successfully deleted invalid "
                                   "record '%s' of type '%s' which "
                                   "shouldn't exist." % (record, dnstype))
            except AssertionError as e:
                failure_msgs.append(e)
                self.resetZone()

    # Update valid records to invalid ones and make sure they
    # fail to be updated
    for dnstype in self.bad_records:
        for bad_record in self.bad_records[dnstype]:
            good_record = self.good_records[dnstype][0]

            try:
                result, out, err = self.runsubcmd("dns", "add",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, good_record,
                                                  self.creds_string)
                # Report the record that was actually added, not the
                # leftover loop variable from the loop above.
                self.assertCmdSuccess(result, out, err, "Failed to add "
                                      "record '%s' with type %s."
                                      % (good_record, dnstype))

                result, out, err = self.runsubcmd("dns", "update",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, good_record,
                                                  bad_record,
                                                  self.creds_string)
                self.assertCmdFail(result, "Successfully updated valid "
                                   "record '%s' of type '%s' to invalid "
                                   "record '%s' of the same type."
                                   % (good_record, dnstype, bad_record))

                result, out, err = self.runsubcmd("dns", "delete",
                                                  server,
                                                  self.zone, "testrecord",
                                                  dnstype, good_record,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err,
                                      "Could not delete "
                                      "valid record '%s' of type '%s'."
                                      % (good_record, dnstype))
            except AssertionError as e:
                failure_msgs.append(e)
                self.resetZone()

    if failure_msgs:
        for msg in failure_msgs:
            print(msg)
        self.fail("Failed to reject invalid commands. %d total failures. "
                  "Errors above." % len(failure_msgs))
def test_update_invalid_type(self):
    """
    Make sure that a record can't be updated to one of a different type.
    """
    server = os.environ["SERVER"]
    for dnstype1 in self.good_records:
        record1 = self.good_records[dnstype1][0]
        status, stdout, stderr = self.runsubcmd(
            "dns", "add", server, self.zone, "testrecord",
            dnstype1, record1, self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr,
                              "Failed to add record %s with type %s."
                              % (record1, dnstype1))

        for dnstype2 in self.good_records:
            record2 = self.good_records[dnstype2][0]

            # Make sure that record2 isn't a valid entry of dnstype1.
            # For example, any A-type will also be a valid TXT-type.
            status, stdout, stderr = self.runsubcmd(
                "dns", "add", server, self.zone, "testrecord",
                dnstype1, record2, self.creds_string)
            try:
                self.assertCmdFail(status)
            except AssertionError:
                # Don't check this one, because record2 _is_ a valid
                # entry of dnstype1.
                continue

            # Check both ways: Give the current type and try to update,
            # and give the new type and try to update.
            status, stdout, stderr = self.runsubcmd(
                "dns", "update", server, self.zone, "testrecord",
                dnstype1, record1, record2, self.creds_string)
            self.assertCmdFail(
                status,
                "Successfully updated record '%s' to '%s', even though "
                "the latter is of type '%s' where '%s' was expected."
                % (record1, record2, dnstype2, dnstype1))

            status, stdout, stderr = self.runsubcmd(
                "dns", "update", server, self.zone, "testrecord",
                dnstype2, record1, record2, self.creds_string)
            self.assertCmdFail(
                status,
                "Successfully updated record '%s' to '%s', even though "
                "the former is of type '%s' where '%s' was expected."
                % (record1, record2, dnstype1, dnstype2))
def test_update_valid_type(self):
    """Exercise 'dns update' with well-formed data.

    Three phases:
    - updating any good record to itself must fail;
    - updating an SRV record to one whose last field is one higher
      must succeed, and the result must be queryable and deletable;
    - updating while naming the wrong current value must fail.
    """
    server = os.environ["SERVER"]

    for dnstype in self.good_records:
        for record in self.good_records[dnstype]:
            status, stdout, stderr = self.runsubcmd(
                "dns", "add", server, self.zone, "testrecord",
                dnstype, record, self.creds_string)
            self.assertCmdSuccess(status, stdout, stderr,
                                  "Failed to add record %s with type %s."
                                  % (record, dnstype))

            # Update the record to be the same.
            status, stdout, stderr = self.runsubcmd(
                "dns", "update", server, self.zone, "testrecord",
                dnstype, record, record, self.creds_string)
            self.assertCmdFail(status,
                               "Successfully updated record '%s' to be "
                               "exactly the same." % record)

            status, stdout, stderr = self.runsubcmd(
                "dns", "delete", server, self.zone, "testrecord",
                dnstype, record, self.creds_string)
            self.assertCmdSuccess(status, stdout, stderr,
                                  "Could not delete valid record '%s' "
                                  "of type '%s'." % (record, dnstype))

    for record in self.good_records["SRV"]:
        status, stdout, stderr = self.runsubcmd(
            "dns", "add", server, self.zone, "testrecord",
            "SRV", record, self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr,
                              "Failed to add record %s with type 'SRV'."
                              % record)

        # Same record with its fourth space-separated field bumped by one.
        fields = record.split(' ')
        bumped = str(int(fields[3]) + 1)
        new_record = ' '.join([fields[0], fields[1], fields[2], bumped])

        status, stdout, stderr = self.runsubcmd(
            "dns", "update", server, self.zone, "testrecord",
            "SRV", record, new_record, self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr,
                              "Failed to update record '%s' of type '%s' "
                              "to '%s'." % (record, "SRV", new_record))

        status, stdout, stderr = self.runsubcmd(
            "dns", "query", server, self.zone, "testrecord",
            "SRV", self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr,
                              "Failed to query for record '%s' of type "
                              "'%s'." % (new_record, "SRV"))

        status, stdout, stderr = self.runsubcmd(
            "dns", "delete", server, self.zone, "testrecord",
            "SRV", new_record, self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr,
                              "Could not delete valid record '%s' "
                              "of type '%s'." % (new_record, "SRV"))

    # Since 'dns update' takes the current value as a parameter, make sure
    # we can't enter the wrong current value for a given record.
    for dnstype in self.good_records:
        candidates = self.good_records[dnstype]
        if len(candidates) < 3:
            continue  # Not enough records of this type to do this test

        used_record = candidates[0]
        unused_record = candidates[1]
        new_record = candidates[2]

        status, stdout, stderr = self.runsubcmd(
            "dns", "add", server, self.zone, "testrecord",
            dnstype, used_record, self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr,
                              "Failed to add record %s with type %s."
                              % (used_record, dnstype))

        status, stdout, stderr = self.runsubcmd(
            "dns", "update", server, self.zone, "testrecord",
            dnstype, unused_record, new_record, self.creds_string)
        self.assertCmdFail(status,
                           "Successfully updated record '%s' from '%s' "
                           "to '%s', even though the given source record "
                           "is incorrect."
                           % (used_record, unused_record, new_record))
def test_invalid_types(self):
    """Adding an SOA record must fail, and the error output must say
    that the type is not supported."""
    status, stdout, stderr = self.runsubcmd(
        "dns", "add", os.environ["SERVER"], self.zone, "testrecord",
        "SOA", "test", self.creds_string)
    self.assertCmdFail(status,
                       "Successfully added record of type SOA, when this "
                       "type should not be available.")
    mentions_unsupported = "type SOA is not supported" in stderr
    self.assertTrue(mentions_unsupported,
                    "Invalid error message '%s' when attempting to add "
                    "record of type SOA." % stderr)
def test_add_overlapping_different_type(self):
    """
    Make sure that we can add an entry with the same name as an existing
    one but a different type.

    For a sampled pair of distinct types, both records are added under
    the same name, each is queried by its own type, and both are then
    deleted.
    """
    server = os.environ["SERVER"]
    i = 0
    for dnstype1 in self.good_records:
        record1 = self.good_records[dnstype1][0]
        for dnstype2 in self.good_records:
            # Only do some subset of dns types, otherwise it takes a
            # long time.
            i += 1
            if i % 4 != 0:
                continue

            if dnstype1 == dnstype2:
                continue

            record2 = self.good_records[dnstype2][0]

            result, out, err = self.runsubcmd("dns", "add",
                                              server,
                                              self.zone, "testrecord",
                                              dnstype1, record1,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err,
                                  "Failed to add record "
                                  "'%s' of type '%s'."
                                  % (record1, dnstype1))

            result, out, err = self.runsubcmd("dns", "add",
                                              server,
                                              self.zone, "testrecord",
                                              dnstype2, record2,
                                              self.creds_string)
            # record2 is the one being added here; record1 is the one
            # that already exists under the same name.
            self.assertCmdSuccess(result, out, err,
                                  "Failed to add record "
                                  "'%s' of type '%s' when a record '%s' "
                                  "of type '%s' with the same name exists."
                                  % (record2, dnstype2, record1, dnstype1))

            result, out, err = self.runsubcmd("dns", "query",
                                              server,
                                              self.zone, "testrecord",
                                              dnstype1, self.creds_string)
            self.assertCmdSuccess(result, out, err,
                                  "Failed to query for "
                                  "record '%s' of type '%s' when a new "
                                  "record '%s' of type '%s' with the same "
                                  "name was added."
                                  % (record1, dnstype1, record2, dnstype2))

            result, out, err = self.runsubcmd("dns", "query",
                                              server,
                                              self.zone, "testrecord",
                                              dnstype2, self.creds_string)
            self.assertCmdSuccess(result, out, err,
                                  "Failed to query "
                                  "record '%s' of type '%s' which should "
                                  "have been added with the same name as "
                                  "record '%s' of type '%s'."
                                  % (record2, dnstype2, record1, dnstype1))

            result, out, err = self.runsubcmd("dns", "delete",
                                              server,
                                              self.zone, "testrecord",
                                              dnstype1, record1,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err,
                                  "Failed to delete "
                                  "record '%s' of type '%s'."
                                  % (record1, dnstype1))

            result, out, err = self.runsubcmd("dns", "delete",
                                              server,
                                              self.zone, "testrecord",
                                              dnstype2, record2,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err,
                                  "Failed to delete "
                                  "record '%s' of type '%s'."
                                  % (record2, dnstype2))
def test_query_deleted_record(self):
    """Querying a record that was added and then deleted must fail."""
    server = os.environ["SERVER"]

    # The outcome of the add and the delete is not checked here.
    for action in ("add", "delete"):
        self.runsubcmd("dns", action, server, self.zone,
                       "testrecord", "A", self.testip, self.creds_string)

    status, stdout, stderr = self.runsubcmd(
        "dns", "query", server, self.zone, "testrecord",
        "A", self.creds_string)
    self.assertCmdFail(status)
def test_add_duplicate_record(self):
    """For each record type, adding the same record twice must fail the
    second time; the record must stay queryable and be deletable."""
    server = os.environ["SERVER"]
    for record_type in self.good_records:
        # first add: expected to succeed
        status, stdout, stderr = self.runsubcmd(
            "dns", "add", server, self.zone, "testrecord",
            record_type, self.good_records[record_type][0],
            self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr)

        # identical second add: expected to be rejected
        status, stdout, stderr = self.runsubcmd(
            "dns", "add", server, self.zone, "testrecord",
            record_type, self.good_records[record_type][0],
            self.creds_string)
        self.assertCmdFail(status)

        status, stdout, stderr = self.runsubcmd(
            "dns", "query", server, self.zone, "testrecord",
            record_type, self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr)

        status, stdout, stderr = self.runsubcmd(
            "dns", "delete", server, self.zone, "testrecord",
            record_type, self.good_records[record_type][0],
            self.creds_string)
        self.assertCmdSuccess(status, stdout, stderr)
def test_remove_deleted_record(self):
    """Deleting a record that is already gone, or that never existed,
    must fail."""
    server = os.environ["SERVER"]

    # Create and remove the record; these two outcomes are not checked.
    for action in ("add", "delete"):
        self.runsubcmd("dns", action, server, self.zone,
                       "testrecord", "A", self.testip, self.creds_string)

    # Attempting to delete a record that has already been deleted or has
    # never existed should fail
    status, stdout, stderr = self.runsubcmd(
        "dns", "delete", server, self.zone, "testrecord",
        "A", self.testip, self.creds_string)
    self.assertCmdFail(status)

    status, stdout, stderr = self.runsubcmd(
        "dns", "query", server, self.zone, "testrecord",
        "A", self.creds_string)
    self.assertCmdFail(status)

    status, stdout, stderr = self.runsubcmd(
        "dns", "delete", server, self.zone, "testrecord2",
        "A", self.testip, self.creds_string)
    self.assertCmdFail(status)
def test_cleanup_record(self):
    """
    Test dns cleanup command is working fine.

    Creates an A record plus CNAME, NS, PTR and SRV records that refer
    to it, runs 'dns cleanup' for that host, and checks that each of
    the five nodes has dNSTombstoned set to TRUE.
    """
    server = os.environ["SERVER"]

    # name the A record resolves as; the other records refer to it
    dnshostname = '{}.{}'.format('testa', self.zone.lower())
    srv_record = "{} 65530 65530 65530".format(dnshostname)

    # (node name, record type, record data), in creation order
    additions = [
        ('testa', "A", self.testip),
        ('testcname', "CNAME", dnshostname),
        ('testns', "NS", dnshostname),
        ('testptr', "PTR", dnshostname),
        ('testsrv', "SRV", srv_record),
    ]
    for node_name, rtype, rdata in additions:
        self.runsubcmd("dns", "add", server, self.zone,
                       node_name, rtype, rdata, self.creds_string)

    # cleanup record for this dns host
    self.runsubcmd("dns", "cleanup", server,
                   dnshostname, self.creds_string)

    # all records should be marked as dNSTombstoned
    for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
        search_base = "DC=DomainDnsZones,{}".format(
            self.samdb.get_default_basedn())
        node_filter = "(&(objectClass=dnsNode)(name={}))".format(record_name)
        records = self.samdb.search(base=search_base,
                                    scope=ldb.SCOPE_SUBTREE,
                                    expression=node_filter,
                                    attrs=["dNSTombstoned"])

        self.assertEqual(len(records), 1)
        for record in records:
            self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
def test_cleanup_record_no_A_record(self):
    """
    Test dns cleanup command works with no A record.

    CNAME, NS, PTR and SRV records are created that refer to
    'testa.<zone>'. The only A record created is named 'notesta', and
    it is deleted again before 'dns cleanup' runs. The four referring
    nodes must end up with dNSTombstoned set to TRUE.
    """
    server = os.environ["SERVER"]

    # add a A record
    self.runsubcmd("dns", "add", server, self.zone,
                   'notesta', "A", self.testip, self.creds_string)

    # host name that the records below refer to
    dnshostname = '{}.{}'.format('testa', self.zone.lower())
    srv_record = "{} 65530 65530 65530".format(dnshostname)

    # (node name, record type, record data), in creation order
    references = [
        ('notestcname', "CNAME", dnshostname),
        ('notestns', "NS", dnshostname),
        ('notestptr', "PTR", dnshostname),
        ('notestsrv', "SRV", srv_record),
    ]
    for node_name, rtype, rdata in references:
        self.runsubcmd("dns", "add", server, self.zone,
                       node_name, rtype, rdata, self.creds_string)

    # Remove the initial A record (leading to hanging references)
    self.runsubcmd("dns", "delete", server, self.zone,
                   'notesta', "A", self.testip, self.creds_string)

    # cleanup record for this dns host
    self.runsubcmd("dns", "cleanup", server,
                   dnshostname, self.creds_string)

    # all records should be marked as dNSTombstoned
    for record_name in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:
        search_base = "DC=DomainDnsZones,{}".format(
            self.samdb.get_default_basedn())
        node_filter = "(&(objectClass=dnsNode)(name={}))".format(record_name)
        records = self.samdb.search(base=search_base,
                                    scope=ldb.SCOPE_SUBTREE,
                                    expression=node_filter,
                                    attrs=["dNSTombstoned"])

        self.assertEqual(len(records), 1)
        for record in records:
            self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
def test_cleanup_multi_srv_record(self):
    """
    Test dns cleanup command for multi-valued SRV record.

    Steps:
    - Add 2 A records host1 and host2
    - Add a SRV record srv1 and points to both host1 and host2
    - Run cleanup command for host1
    - Check records for srv1, data for host1 should be gone and host2 is kept.
    """
    server = os.environ["SERVER"]
    hosts = ['host1', 'host2']  # A record names
    srv_name = 'srv1'
    zone_suffix = self.zone.lower()

    for host in hosts:
        # A record for this host
        self.runsubcmd("dns", "add", server, self.zone,
                       host, "A", self.testip, self.creds_string)

        # SRV value under the shared name, pointing at this host
        dnshostname = '{}.{}'.format(host, zone_suffix)
        srv_record = "{} 65530 65530 65530".format(dnshostname)
        self.runsubcmd("dns", "add", server, self.zone,
                       srv_name, "SRV", srv_record, self.creds_string)

    srv_filter = "(&(objectClass=dnsNode)(name={}))".format(srv_name)

    before = self.samdb.search(
        base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
        scope=ldb.SCOPE_SUBTREE,
        expression=srv_filter,
        attrs=['dnsRecord'])
    # should have 2 records here
    self.assertEqual(len(before[0]['dnsRecord']), 2)

    # cleanup record for dns host1
    dnshostname1 = 'host1.{}'.format(self.zone.lower())
    self.runsubcmd("dns", "cleanup", server,
                   dnshostname1, self.creds_string)

    after = self.samdb.search(
        base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
        scope=ldb.SCOPE_SUBTREE,
        expression=srv_filter,
        attrs=['dnsRecord'])

    # dnsRecord for host1 should be deleted
    self.assertEqual(len(after[0]['dnsRecord']), 1)

    # unpack data
    remaining_bin = after[0]['dnsRecord'][0]
    remaining = ndr_unpack(dnsp.DnssrvRpcRecord, remaining_bin)

    # dnsRecord for host2 is still there and is the only one
    dnshostname2 = 'host2.{}'.format(self.zone.lower())
    self.assertEqual(remaining.data.nameTarget, dnshostname2)
def test_dns_wildcards(self):
    """
    Ensure that DNS wild card entries can be added deleted and queried

    For each wildcard name the record is added and queried by its
    literal name. A query for a non-wildcard name must then fail,
    after which the wildcard record is deleted. AssertionErrors are
    collected per entry, printed at the end, and the test fails once
    with the total count.
    """
    server = os.environ["SERVER"]
    failure_msgs = []
    # (wildcard name, name that must not resolve, type, data)
    records = [("*.", "MISS", "A", "1.1.1.1"),
               ("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
    for (name, miss, dnstype, record) in records:
        try:
            result, out, err = self.runsubcmd("dns", "add",
                                              server,
                                              self.zone, name,
                                              dnstype, record,
                                              self.creds_string)
            self.assertCmdSuccess(
                result,
                out,
                err,
                ("Failed to add record %s (%s) with type %s."
                 % (name, record, dnstype)))

            result, out, err = self.runsubcmd("dns", "query",
                                              server,
                                              self.zone, name,
                                              dnstype,
                                              self.creds_string)
            self.assertCmdSuccess(
                result,
                out,
                err,
                ("Failed to query record %s with qualifier %s."
                 % (record, dnstype)))

            # dns tool does not perform dns wildcard search if the name
            # does not match
            result, out, err = self.runsubcmd("dns", "query",
                                              server,
                                              self.zone, miss,
                                              dnstype,
                                              self.creds_string)
            # This assertion fires when the query unexpectedly
            # succeeds, so the message has to say so.
            self.assertCmdFail(
                result,
                ("Successfully queried name %s with qualifier %s, which "
                 "should not match wildcard record %s."
                 % (miss, dnstype, name)))

            result, out, err = self.runsubcmd("dns", "delete",
                                              server,
                                              self.zone, name,
                                              dnstype, record,
                                              self.creds_string)
            self.assertCmdSuccess(
                result,
                out,
                err,
                ("Failed to remove record %s with type %s."
                 % (record, dnstype)))
        except AssertionError as e:
            failure_msgs.append(e)

    if failure_msgs:
        for msg in failure_msgs:
            print(msg)
        self.fail("Failed to accept valid commands. %d total failures. "
                  "Errors above." % len(failure_msgs))