1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba
.auth
import system_session
22 from samba
.samdb
import SamDB
23 from samba
.ndr
import ndr_unpack
, ndr_pack
24 from samba
.dcerpc
import dnsp
25 from samba
.tests
.samba_tool
.base
import SambaToolCmdTest
27 class DnsCmdTestCase(SambaToolCmdTest
):
29 super(DnsCmdTestCase
, self
).setUp()
31 self
.dburl
= "ldap://%s" % os
.environ
["SERVER"]
32 self
.creds_string
= "-U%s%%%s" % (os
.environ
["DC_USERNAME"],
33 os
.environ
["DC_PASSWORD"])
35 self
.samdb
= self
.getSamDB("-H", self
.dburl
, self
.creds_string
)
36 self
.config_dn
= str(self
.samdb
.get_config_basedn())
38 self
.testip
= "192.168.0.193"
39 self
.testip2
= "192.168.0.194"
43 # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
45 good_dns
= ["SAMDOM.EXAMPLE.COM",
47 "%sEXAMPLE.COM" % ("1."*100),
58 "SAMDOM..EXAMPLE.COM"]
60 good_mx
= ["SAMDOM.EXAMPLE.COM 65530"]
61 bad_mx
= ["SAMDOM.EXAMPLE.COM -1",
64 "SAMDOM.EXAMPLE.COM 1 1",
65 "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
67 good_srv
= ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
68 bad_srv
= ["SAMDOM.EXAMPLE.COM 0 65536 0",
69 "SAMDOM.EXAMPLE.COM 0 0 65536",
70 "SAMDOM.EXAMPLE.COM 65536 0 0" ]
72 for bad_dn
in bad_dns
:
73 bad_mx
.append("%s 1" % bad_dn
)
74 bad_srv
.append("%s 0 0 0" % bad_dn
)
75 for good_dn
in good_dns
:
76 good_mx
.append("%s 1" % good_dn
)
77 good_srv
.append("%s 0 0 0" % good_dn
)
80 "A":["192.168.0.1", "255.255.255.255"],
81 "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
82 "0000:0000:0000:0000:0000:0000:0000:0000",
83 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
85 "1234:5678:9ABC:DEF0::",
87 "1234::5678:9ABC:0000:0000:0000:0000",
96 "TXT":["text", "", "@#!", "\n"]
100 "A":["192.168.0.500",
101 "255.255.255.255/32"],
102 "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
103 "0000:0000:0000:0000:0000:0000:0000:0000/1",
104 "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
105 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
106 "1234:5678:9ABC:DEF0:1234:5678:9ABC",
117 super(DnsCmdTestCase
, self
).tearDown()
125 result
, out
, err
= self
.runsubcmd("dns",
127 os
.environ
["SERVER"],
130 self
.assertCmdSuccess(result
, out
, err
)
132 def deleteZone(self
):
133 result
, out
, err
= self
.runsubcmd("dns",
135 os
.environ
["SERVER"],
138 self
.assertCmdSuccess(result
, out
, err
)
140 def get_record_from_db(self
, zone_name
, record_name
):
141 zones
= self
.samdb
.search(base
="DC=DomainDnsZones,%s"
142 % self
.samdb
.get_default_basedn(),
143 scope
=ldb
.SCOPE_SUBTREE
,
144 expression
="(objectClass=dnsZone)",
148 if zone_name
in str(zone
.dn
):
152 records
= self
.samdb
.search(base
=zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
153 expression
="(objectClass=dnsNode)",
156 for old_packed_record
in records
:
157 if record_name
in str(old_packed_record
.dn
):
158 return (old_packed_record
.dn
,
159 ndr_unpack(dnsp
.DnssrvRpcRecord
,
160 old_packed_record
["dnsRecord"][0]))
162 def test_rank_none(self
):
163 record_str
= "192.168.50.50"
164 record_type_str
= "A"
166 result
, out
, err
= self
.runsubcmd("dns", "add", os
.environ
["SERVER"],
167 self
.zone
, "testrecord", record_type_str
,
168 record_str
, self
.creds_string
)
169 self
.assertCmdSuccess(result
, out
, err
,
170 "Failed to add record '%s' with type %s."
171 % (record_str
, record_type_str
))
173 dn
, record
= self
.get_record_from_db(self
.zone
, "testrecord")
174 record
.rank
= 0 # DNS_RANK_NONE
175 res
= self
.samdb
.dns_replace_by_dn(dn
, [record
])
177 self
.fail("Unable to update dns record to have DNS_RANK_NONE.")
181 # The record should still exist
182 result
, out
, err
= self
.runsubcmd("dns", "query", os
.environ
["SERVER"],
183 self
.zone
, "testrecord", record_type_str
,
186 self
.assertCmdSuccess(result
, out
, err
,
187 "Failed to query for a record" \
188 "which had DNS_RANK_NONE.")
189 self
.assertTrue("testrecord" in out
and record_str
in out
,
190 "Query for a record which had DNS_RANK_NONE" \
191 "succeeded but produced no resulting records.")
192 except AssertionError, e
:
193 # Windows produces no resulting records
196 # We should not be able to add a duplicate
197 result
, out
, err
= self
.runsubcmd("dns", "add", os
.environ
["SERVER"],
198 self
.zone
, "testrecord", record_type_str
,
199 record_str
, self
.creds_string
)
201 self
.assertCmdFail(result
, "Successfully added duplicate record" \
202 "of one which had DNS_RANK_NONE.")
203 except AssertionError, e
:
206 # We should be able to delete it
207 result
, out
, err
= self
.runsubcmd("dns", "delete", os
.environ
["SERVER"],
208 self
.zone
, "testrecord", record_type_str
,
209 record_str
, self
.creds_string
)
211 self
.assertCmdSuccess(result
, out
, err
, "Failed to delete record" \
212 "which had DNS_RANK_NONE.")
213 except AssertionError, e
:
216 # Now the record should not exist
217 result
, out
, err
= self
.runsubcmd("dns", "query", os
.environ
["SERVER"],
218 self
.zone
, "testrecord",
219 record_type_str
, self
.creds_string
)
221 self
.assertCmdFail(result
, "Successfully queried for deleted record" \
222 "which had DNS_RANK_NONE.")
223 except AssertionError, e
:
227 err_str
= "Failed appropriate behaviour with DNS_RANK_NONE:"
229 err_str
= err_str
+ "\n" + str(error
)
230 raise AssertionError(err_str
)
232 def test_accept_valid_commands(self
):
234 For all good records, attempt to add, query and delete them.
238 for dnstype
in self
.good_records
:
239 for record
in self
.good_records
[dnstype
]:
241 result
, out
, err
= self
.runsubcmd("dns", "add",
242 os
.environ
["SERVER"],
243 self
.zone
, "testrecord",
246 self
.assertCmdSuccess(result
, out
, err
, "Failed to add" \
247 "record %s with type %s."
250 result
, out
, err
= self
.runsubcmd("dns", "query",
251 os
.environ
["SERVER"],
252 self
.zone
, "testrecord",
255 self
.assertCmdSuccess(result
, out
, err
, "Failed to query" \
256 "record %s with qualifier %s."
259 result
, out
, err
= self
.runsubcmd("dns", "delete",
260 os
.environ
["SERVER"],
261 self
.zone
, "testrecord",
264 self
.assertCmdSuccess(result
, out
, err
, "Failed to remove" \
265 "record %s with type %s."
267 except AssertionError as e
:
268 num_failures
= num_failures
+ 1
269 failure_msgs
.append(e
)
272 for msg
in failure_msgs
:
274 self
.fail("Failed to accept valid commands. %d total failures." \
275 "Errors above." % num_failures
)
277 def test_reject_invalid_commands(self
):
279 For all bad records, attempt to add them and update to them,
280 making sure that both operations fail.
285 # Add invalid records and make sure they fail to be added
286 for dnstype
in self
.bad_records
:
287 for record
in self
.bad_records
[dnstype
]:
289 result
, out
, err
= self
.runsubcmd("dns", "add",
290 os
.environ
["SERVER"],
291 self
.zone
, "testrecord",
294 self
.assertCmdFail(result
, "Successfully added invalid" \
295 "record '%s' of type '%s'."
297 except AssertionError as e
:
298 num_failures
= num_failures
+ 1
299 failure_msgs
.append(e
)
302 result
, out
, err
= self
.runsubcmd("dns", "delete",
303 os
.environ
["SERVER"],
304 self
.zone
, "testrecord",
307 self
.assertCmdFail(result
, "Successfully deleted invalid" \
308 "record '%s' of type '%s' which" \
309 "shouldn't exist." % (record
, dnstype
))
310 except AssertionError as e
:
311 num_failures
= num_failures
+ 1
312 failure_msgs
.append(e
)
315 # Update valid records to invalid ones and make sure they
317 for dnstype
in self
.bad_records
:
318 for bad_record
in self
.bad_records
[dnstype
]:
319 good_record
= self
.good_records
[dnstype
][0]
322 result
, out
, err
= self
.runsubcmd("dns", "add",
323 os
.environ
["SERVER"],
324 self
.zone
, "testrecord",
325 dnstype
, good_record
,
327 self
.assertCmdSuccess(result
, out
, err
, "Failed to add " \
328 "record '%s' with type %s."
331 result
, out
, err
= self
.runsubcmd("dns", "update",
332 os
.environ
["SERVER"],
333 self
.zone
, "testrecord",
334 dnstype
, good_record
,
337 self
.assertCmdFail(result
, "Successfully updated valid " \
338 "record '%s' of type '%s' to invalid " \
339 "record '%s' of the same type."
340 % (good_record
, dnstype
, bad_record
))
342 result
, out
, err
= self
.runsubcmd("dns", "delete",
343 os
.environ
["SERVER"],
344 self
.zone
, "testrecord",
345 dnstype
, good_record
,
347 self
.assertCmdSuccess(result
, out
, err
, "Could not delete " \
348 "valid record '%s' of type '%s'."
349 % (good_record
, dnstype
))
350 except AssertionError as e
:
351 num_failures
= num_failures
+ 1
352 failure_msgs
.append(e
)
356 for msg
in failure_msgs
:
358 self
.fail("Failed to reject invalid commands. %d total failures. " \
359 "Errors above." % num_failures
)
361 def test_update_invalid_type(self
):
363 Make sure that a record can't be updated to one of a different type.
365 for dnstype1
in self
.good_records
:
366 record1
= self
.good_records
[dnstype1
][0]
367 result
, out
, err
= self
.runsubcmd("dns", "add",
368 os
.environ
["SERVER"],
369 self
.zone
, "testrecord",
372 self
.assertCmdSuccess(result
, out
, err
, "Failed to add " \
373 "record %s with type %s."
374 % (record1
, dnstype1
))
376 for dnstype2
in self
.good_records
:
377 record2
= self
.good_records
[dnstype2
][0]
379 # Make sure that record2 isn't a valid entry of dnstype1.
380 # For example, any A-type will also be a valid TXT-type.
381 result
, out
, err
= self
.runsubcmd("dns", "add",
382 os
.environ
["SERVER"],
383 self
.zone
, "testrecord",
387 self
.assertCmdFail(result
)
388 except AssertionError:
389 continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
391 # Check both ways: Give the current type and try to update,
392 # and give the new type and try to update.
393 result
, out
, err
= self
.runsubcmd("dns", "update",
394 os
.environ
["SERVER"],
395 self
.zone
, "testrecord",
397 record2
, self
.creds_string
)
398 self
.assertCmdFail(result
, "Successfully updated record '%s' " \
399 "to '%s', even though the latter is of " \
400 "type '%s' where '%s' was expected."
401 % (record1
, record2
, dnstype2
, dnstype1
))
403 result
, out
, err
= self
.runsubcmd("dns", "update",
404 os
.environ
["SERVER"],
405 self
.zone
, "testrecord",
406 dnstype2
, record1
, record2
,
408 self
.assertCmdFail(result
, "Successfully updated record " \
409 "'%s' to '%s', even though the former " \
410 "is of type '%s' where '%s' was expected."
411 % (record1
, record2
, dnstype1
, dnstype2
))
414 def test_update_valid_type(self
):
415 for dnstype
in self
.good_records
:
416 for record
in self
.good_records
[dnstype
]:
417 result
, out
, err
= self
.runsubcmd("dns", "add",
418 os
.environ
["SERVER"],
419 self
.zone
, "testrecord",
422 self
.assertCmdSuccess(result
, out
, err
, "Failed to add " \
423 "record %s with type %s."
426 # Update the record to be the same.
427 result
, out
, err
= self
.runsubcmd("dns", "update",
428 os
.environ
["SERVER"],
429 self
.zone
, "testrecord",
430 dnstype
, record
, record
,
432 self
.assertCmdFail(result
, "Successfully updated record " \
433 "'%s' to be exactly the same." % record
)
435 result
, out
, err
= self
.runsubcmd("dns", "delete",
436 os
.environ
["SERVER"],
437 self
.zone
, "testrecord",
440 self
.assertCmdSuccess(result
, out
, err
, "Could not delete " \
441 "valid record '%s' of type '%s'."
444 for record
in self
.good_records
["SRV"]:
445 result
, out
, err
= self
.runsubcmd("dns", "add",
446 os
.environ
["SERVER"],
447 self
.zone
, "testrecord",
450 self
.assertCmdSuccess(result
, out
, err
, "Failed to add " \
451 "record %s with type 'SRV'." % record
)
453 split
= record
.split(' ')
454 new_bit
= str(int(split
[3]) + 1)
455 new_record
= '%s %s %s %s' % (split
[0], split
[1], split
[2], new_bit
)
457 result
, out
, err
= self
.runsubcmd("dns", "update",
458 os
.environ
["SERVER"],
459 self
.zone
, "testrecord",
461 new_record
, self
.creds_string
)
462 self
.assertCmdSuccess(result
, out
, err
, "Failed to update record " \
463 "'%s' of type '%s' to '%s'."
464 % (record
, "SRV", new_record
))
466 result
, out
, err
= self
.runsubcmd("dns", "query",
467 os
.environ
["SERVER"],
468 self
.zone
, "testrecord",
469 "SRV", self
.creds_string
)
470 self
.assertCmdSuccess(result
, out
, err
, "Failed to query for " \
471 "record '%s' of type '%s'."
472 % (new_record
, "SRV"))
474 result
, out
, err
= self
.runsubcmd("dns", "delete",
475 os
.environ
["SERVER"],
476 self
.zone
, "testrecord",
479 self
.assertCmdSuccess(result
, out
, err
, "Could not delete " \
480 "valid record '%s' of type '%s'."
481 % (new_record
, "SRV"))
483 # Since 'dns update' takes the current value as a parameter, make sure
484 # we can't enter the wrong current value for a given record.
485 for dnstype
in self
.good_records
:
486 if len(self
.good_records
[dnstype
]) < 3:
487 continue # Not enough records of this type to do this test
489 used_record
= self
.good_records
[dnstype
][0]
490 unused_record
= self
.good_records
[dnstype
][1]
491 new_record
= self
.good_records
[dnstype
][2]
493 result
, out
, err
= self
.runsubcmd("dns", "add",
494 os
.environ
["SERVER"],
495 self
.zone
, "testrecord",
496 dnstype
, used_record
,
498 self
.assertCmdSuccess(result
, out
, err
, "Failed to add record %s " \
499 "with type %s." % (used_record
, dnstype
))
501 result
, out
, err
= self
.runsubcmd("dns", "update",
502 os
.environ
["SERVER"],
503 self
.zone
, "testrecord",
504 dnstype
, unused_record
,
507 self
.assertCmdFail(result
, "Successfully updated record '%s' " \
508 "from '%s' to '%s', even though the given " \
509 "source record is incorrect."
510 % (used_record
, unused_record
, new_record
))
512 def test_invalid_types(self
):
513 result
, out
, err
= self
.runsubcmd("dns", "add",
514 os
.environ
["SERVER"],
515 self
.zone
, "testrecord",
518 self
.assertCmdFail(result
, "Successfully added record of type SOA, " \
519 "when this type should not be available.")
520 self
.assertTrue("type SOA is not supported" in err
,
521 "Invalid error message '%s' when attempting to " \
522 "add record of type SOA." % err
)
524 def test_add_overlapping_different_type(self
):
526 Make sure that we can add an entry with the same name as an existing one but a different type.
530 for dnstype1
in self
.good_records
:
531 record1
= self
.good_records
[dnstype1
][0]
532 for dnstype2
in self
.good_records
:
533 # Only do some subset of dns types, otherwise it takes a long time.
538 if dnstype1
== dnstype2
:
541 record2
= self
.good_records
[dnstype2
][0]
543 result
, out
, err
= self
.runsubcmd("dns", "add",
544 os
.environ
["SERVER"],
545 self
.zone
, "testrecord",
548 self
.assertCmdSuccess(result
, out
, err
, "Failed to add record " \
549 "'%s' of type '%s'." % (record1
, dnstype1
))
551 result
, out
, err
= self
.runsubcmd("dns", "add",
552 os
.environ
["SERVER"],
553 self
.zone
, "testrecord",
556 self
.assertCmdSuccess(result
, out
, err
, "Failed to add record " \
557 "'%s' of type '%s' when a record '%s' " \
558 "of type '%s' with the same name exists."
559 % (record1
, dnstype1
, record2
, dnstype2
))
561 result
, out
, err
= self
.runsubcmd("dns", "query",
562 os
.environ
["SERVER"],
563 self
.zone
, "testrecord",
564 dnstype1
, self
.creds_string
)
565 self
.assertCmdSuccess(result
, out
, err
, "Failed to query for " \
566 "record '%s' of type '%s' when a new " \
567 "record '%s' of type '%s' with the same " \
569 % (record1
, dnstype1
, record2
, dnstype2
))
571 result
, out
, err
= self
.runsubcmd("dns", "query",
572 os
.environ
["SERVER"],
573 self
.zone
, "testrecord",
574 dnstype2
, self
.creds_string
)
575 self
.assertCmdSuccess(result
, out
, err
, "Failed to query " \
576 "record '%s' of type '%s' which should " \
577 "have been added with the same name as " \
578 "record '%s' of type '%s'."
579 % (record2
, dnstype2
, record1
, dnstype1
))
581 result
, out
, err
= self
.runsubcmd("dns", "delete",
582 os
.environ
["SERVER"],
583 self
.zone
, "testrecord",
586 self
.assertCmdSuccess(result
, out
, err
, "Failed to delete " \
587 "record '%s' of type '%s'."
588 % (record1
, dnstype1
))
590 result
, out
, err
= self
.runsubcmd("dns", "delete",
591 os
.environ
["SERVER"],
592 self
.zone
, "testrecord",
595 self
.assertCmdSuccess(result
, out
, err
, "Failed to delete " \
596 "record '%s' of type '%s'."
597 % (record2
, dnstype2
))
599 def test_query_deleted_record(self
):
600 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
601 "testrecord", "A", self
.testip
, self
.creds_string
)
602 self
.runsubcmd("dns", "delete", os
.environ
["SERVER"], self
.zone
,
603 "testrecord", "A", self
.testip
, self
.creds_string
)
605 result
, out
, err
= self
.runsubcmd("dns", "query",
606 os
.environ
["SERVER"],
607 self
.zone
, "testrecord",
608 "A", self
.creds_string
)
609 self
.assertCmdFail(result
)
611 def test_add_duplicate_record(self
):
612 for record_type
in self
.good_records
:
613 result
, out
, err
= self
.runsubcmd("dns", "add",
614 os
.environ
["SERVER"],
615 self
.zone
, "testrecord",
617 self
.good_records
[record_type
][0],
619 self
.assertCmdSuccess(result
, out
, err
)
620 result
, out
, err
= self
.runsubcmd("dns", "add",
621 os
.environ
["SERVER"],
622 self
.zone
, "testrecord",
624 self
.good_records
[record_type
][0],
626 self
.assertCmdFail(result
)
627 result
, out
, err
= self
.runsubcmd("dns", "query",
628 os
.environ
["SERVER"],
629 self
.zone
, "testrecord",
630 record_type
, self
.creds_string
)
631 self
.assertCmdSuccess(result
, out
, err
)
632 result
, out
, err
= self
.runsubcmd("dns", "delete",
633 os
.environ
["SERVER"],
634 self
.zone
, "testrecord",
636 self
.good_records
[record_type
][0],
638 self
.assertCmdSuccess(result
, out
, err
)
640 def test_remove_deleted_record(self
):
641 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
642 "testrecord", "A", self
.testip
, self
.creds_string
)
643 self
.runsubcmd("dns", "delete", os
.environ
["SERVER"], self
.zone
,
644 "testrecord", "A", self
.testip
, self
.creds_string
)
646 # Attempting to delete a record that has already been deleted or has never existed should fail
647 result
, out
, err
= self
.runsubcmd("dns", "delete",
648 os
.environ
["SERVER"],
649 self
.zone
, "testrecord",
650 "A", self
.testip
, self
.creds_string
)
651 self
.assertCmdFail(result
)
652 result
, out
, err
= self
.runsubcmd("dns", "query",
653 os
.environ
["SERVER"],
654 self
.zone
, "testrecord",
655 "A", self
.creds_string
)
656 self
.assertCmdFail(result
)
657 result
, out
, err
= self
.runsubcmd("dns", "delete",
658 os
.environ
["SERVER"],
659 self
.zone
, "testrecord2",
660 "A", self
.testip
, self
.creds_string
)
661 self
.assertCmdFail(result
)
663 def test_cleanup_record(self
):
665 Test dns cleanup command is working fine.
669 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
670 'testa', "A", self
.testip
, self
.creds_string
)
672 # the above A record points to this host
673 dnshostname
= '{}.{}'.format('testa', self
.zone
.lower())
675 # add a CNAME record points to above host
676 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
677 'testcname', "CNAME", dnshostname
, self
.creds_string
)
680 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
681 'testns', "NS", dnshostname
, self
.creds_string
)
683 # add a PTR record points to above host
684 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
685 'testptr', "PTR", dnshostname
, self
.creds_string
)
687 # add a SRV record points to above host
688 srv_record
= "{} 65530 65530 65530".format(dnshostname
)
689 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
690 'testsrv', "SRV", srv_record
, self
.creds_string
)
692 # cleanup record for this dns host
693 self
.runsubcmd("dns", "cleanup", os
.environ
["SERVER"],
694 dnshostname
, self
.creds_string
)
696 # all records should be marked as dNSTombstoned
697 for record_name
in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
699 records
= self
.samdb
.search(
700 base
="DC=DomainDnsZones,{}".format(self
.samdb
.get_default_basedn()),
701 scope
=ldb
.SCOPE_SUBTREE
,
702 expression
="(&(objectClass=dnsNode)(name={}))".format(record_name
),
703 attrs
=["dNSTombstoned"])
705 self
.assertEqual(len(records
), 1)
706 for record
in records
:
707 self
.assertEqual(str(record
['dNSTombstoned']), 'TRUE')
709 def test_cleanup_record_no_A_record(self
):
711 Test dns cleanup command works with no A record.
715 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
716 'notesta', "A", self
.testip
, self
.creds_string
)
718 # the above A record points to this host
719 dnshostname
= '{}.{}'.format('testa', self
.zone
.lower())
721 # add a CNAME record points to above host
722 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
723 'notestcname', "CNAME", dnshostname
, self
.creds_string
)
726 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
727 'notestns', "NS", dnshostname
, self
.creds_string
)
729 # add a PTR record points to above host
730 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
731 'notestptr', "PTR", dnshostname
, self
.creds_string
)
733 # add a SRV record points to above host
734 srv_record
= "{} 65530 65530 65530".format(dnshostname
)
735 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
736 'notestsrv', "SRV", srv_record
, self
.creds_string
)
738 # Remove the initial A record (leading to hanging references)
739 self
.runsubcmd("dns", "delete", os
.environ
["SERVER"], self
.zone
,
740 'notesta', "A", self
.testip
, self
.creds_string
)
742 # cleanup record for this dns host
743 self
.runsubcmd("dns", "cleanup", os
.environ
["SERVER"],
744 dnshostname
, self
.creds_string
)
746 # all records should be marked as dNSTombstoned
747 for record_name
in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:
749 records
= self
.samdb
.search(
750 base
="DC=DomainDnsZones,{}".format(self
.samdb
.get_default_basedn()),
751 scope
=ldb
.SCOPE_SUBTREE
,
752 expression
="(&(objectClass=dnsNode)(name={}))".format(record_name
),
753 attrs
=["dNSTombstoned"])
755 self
.assertEqual(len(records
), 1)
756 for record
in records
:
757 self
.assertEqual(str(record
['dNSTombstoned']), 'TRUE')
759 def test_cleanup_multi_srv_record(self
):
761 Test dns cleanup command for multi-valued SRV record.
764 - Add 2 A records host1 and host2
765 - Add a SRV record srv1 and points to both host1 and host2
766 - Run cleanup command for host1
767 - Check records for srv1, data for host1 should be gone and host2 is kept.
770 hosts
= ['host1', 'host2'] # A record names
775 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
776 host
, "A", self
.testip
, self
.creds_string
)
778 # the above A record points to this host
779 dnshostname
= '{}.{}'.format(host
, self
.zone
.lower())
781 # add a SRV record points to above host
782 srv_record
= "{} 65530 65530 65530".format(dnshostname
)
783 self
.runsubcmd("dns", "add", os
.environ
["SERVER"], self
.zone
,
784 srv_name
, "SRV", srv_record
, self
.creds_string
)
786 records
= self
.samdb
.search(
787 base
="DC=DomainDnsZones,{}".format(self
.samdb
.get_default_basedn()),
788 scope
=ldb
.SCOPE_SUBTREE
,
789 expression
="(&(objectClass=dnsNode)(name={}))".format(srv_name
),
791 # should have 2 records here
792 self
.assertEqual(len(records
[0]['dnsRecord']), 2)
794 # cleanup record for dns host1
795 dnshostname1
= 'host1.{}'.format(self
.zone
.lower())
796 self
.runsubcmd("dns", "cleanup", os
.environ
["SERVER"],
797 dnshostname1
, self
.creds_string
)
799 records
= self
.samdb
.search(
800 base
="DC=DomainDnsZones,{}".format(self
.samdb
.get_default_basedn()),
801 scope
=ldb
.SCOPE_SUBTREE
,
802 expression
="(&(objectClass=dnsNode)(name={}))".format(srv_name
),
805 # dnsRecord for host1 should be deleted
806 self
.assertEqual(len(records
[0]['dnsRecord']), 1)
809 dns_record_bin
= records
[0]['dnsRecord'][0]
810 dns_record_obj
= ndr_unpack(dnsp
.DnssrvRpcRecord
, dns_record_bin
)
812 # dnsRecord for host2 is still there and is the only one
813 dnshostname2
= 'host2.{}'.format(self
.zone
.lower())
814 self
.assertEqual(dns_record_obj
.data
.nameTarget
, dnshostname2
)
816 def test_dns_wildcards(self
):
818 Ensure that DNS wild card entries can be added deleted and queried
822 records
= [("*.", "MISS", "A", "1.1.1.1"),
823 ("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
824 for (name
, miss
, dnstype
, record
) in records
:
826 result
, out
, err
= self
.runsubcmd("dns", "add",
827 os
.environ
["SERVER"],
831 self
.assertCmdSuccess(
835 ("Failed to add record %s (%s) with type %s."
836 % (name
, record
, dnstype
)))
838 result
, out
, err
= self
.runsubcmd("dns", "query",
839 os
.environ
["SERVER"],
843 self
.assertCmdSuccess(
847 ("Failed to query record %s with qualifier %s."
848 % (record
, dnstype
)))
850 # dns tool does not perform dns wildcard search if the name
852 result
, out
, err
= self
.runsubcmd("dns", "query",
853 os
.environ
["SERVER"],
859 ("Failed to query record %s with qualifier %s."
860 % (record
, dnstype
)))
862 result
, out
, err
= self
.runsubcmd("dns", "delete",
863 os
.environ
["SERVER"],
867 self
.assertCmdSuccess(
871 ("Failed to remove record %s with type %s."
872 % (record
, dnstype
)))
873 except AssertionError as e
:
874 num_failures
= num_failures
+ 1
875 failure_msgs
.append(e
)
878 for msg
in failure_msgs
:
880 self
.fail("Failed to accept valid commands. %d total failures."
881 "Errors above." % num_failures
)