1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import samba
.ndr
as ndr
23 import samba
.dcerpc
.dns
as dns
24 from samba
import credentials
, param
25 from samba
.tests
import TestCase
26 from samba
.dcerpc
import dnsp
, dnsserver
# Translation table used by hexdump() (via str.translate): the entry at index N
# is chr(N) itself when repr(chr(N)) is exactly three characters long (quote,
# character, quote -- i.e. the character needs no escaping), and '.' otherwise.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])
31 class DNSTest(TestCase
):
33 def errstr(self
, errcode
):
34 "Return a readable error code"
49 return string_codes
[errcode
]
def assert_dns_rcode_equals(self, packet, rcode):
    """Assert that the return code carried by *packet* equals *rcode*.

    The return code is read from the low four bits of ``packet.operation``.
    The failure message names both the expected and the received code using
    ``self.errstr()``.
    """
    p_errcode = packet.operation & 0x000F
    # Use assertEqual rather than the deprecated assertEquals alias; other
    # tests in this file already call assertEqual.
    self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                     (self.errstr(rcode), self.errstr(p_errcode)))
58 def assert_dns_opcode_equals(self
, packet
, opcode
):
59 "Helper function to check opcode"
60 p_opcode
= packet
.operation
& 0x7800
61 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" %
64 def make_name_packet(self
, opcode
, qid
=None):
65 "Helper creating a dns.name_packet"
68 p
.id = random
.randint(0x0, 0xffff)
def finish_name_packet(self, packet, questions):
    """Attach *questions* to *packet* and record how many there are.

    Sets ``packet.qdcount`` to the number of questions and
    ``packet.questions`` to the given sequence itself (no copy is made).
    """
    question_count = len(questions)
    packet.qdcount = question_count
    packet.questions = questions
78 def make_name_question(self
, name
, qtype
, qclass
):
79 "Helper creating a dns.name_question"
80 q
= dns
.name_question()
82 q
.question_type
= qtype
83 q
.question_class
= qclass
def get_dns_domain(self):
    """Return the DNS domain under test, lower-cased.

    The value comes from the REALM environment variable; when that is not
    set, 'example.com' is used instead.
    """
    realm = os.environ.get('REALM', 'example.com')
    return realm.lower()
90 def dns_transaction_udp(self
, packet
, host
=os
.getenv('SERVER_IP'), dump
=False):
91 "send a DNS query and read the reply"
94 send_packet
= ndr
.ndr_pack(packet
)
96 print self
.hexdump(send_packet
)
97 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
99 s
.send(send_packet
, 0)
100 recv_packet
= s
.recv(2048, 0)
102 print self
.hexdump(recv_packet
)
103 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
108 def dns_transaction_tcp(self
, packet
, host
=os
.getenv('SERVER_IP'), dump
=False):
109 "send a DNS query and read the reply"
112 send_packet
= ndr
.ndr_pack(packet
)
114 print self
.hexdump(send_packet
)
115 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
116 s
.connect((host
, 53))
117 tcp_packet
= struct
.pack('!H', len(send_packet
))
118 tcp_packet
+= send_packet
119 s
.send(tcp_packet
, 0)
120 recv_packet
= s
.recv(0xffff + 2, 0)
122 print self
.hexdump(recv_packet
)
123 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
[2:])
128 def hexdump(self
, src
, length
=8):
132 s
, src
= src
[:length
], src
[length
:]
133 hexa
= ' '.join(["%02X" % ord(x
) for x
in s
])
134 s
= s
.translate(FILTER
)
135 result
+= "%04X %-*s %s\n" % (N
, length
*3, hexa
, s
)
140 class TestSimpleQueries(DNSTest
):
142 def test_one_a_query(self
):
143 "create a query packet containing one query record"
144 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
147 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
148 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
149 print "asking for ", q
.name
152 self
.finish_name_packet(p
, questions
)
153 response
= self
.dns_transaction_udp(p
)
154 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
155 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
156 self
.assertEquals(response
.ancount
, 1)
157 self
.assertEquals(response
.answers
[0].rdata
,
158 os
.getenv('SERVER_IP'))
160 def test_one_a_query_tcp(self
):
161 "create a query packet containing one query record via TCP"
162 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
165 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
166 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
167 print "asking for ", q
.name
170 self
.finish_name_packet(p
, questions
)
171 response
= self
.dns_transaction_tcp(p
)
172 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
173 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
174 self
.assertEquals(response
.ancount
, 1)
175 self
.assertEquals(response
.answers
[0].rdata
,
176 os
.getenv('SERVER_IP'))
178 def test_one_mx_query(self
):
179 "create a query packet causing an empty RCODE_OK answer"
180 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
183 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
184 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
185 print "asking for ", q
.name
188 self
.finish_name_packet(p
, questions
)
189 response
= self
.dns_transaction_udp(p
)
190 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
191 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
192 self
.assertEquals(response
.ancount
, 0)
194 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
197 name
= "invalid-%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
198 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
199 print "asking for ", q
.name
202 self
.finish_name_packet(p
, questions
)
203 response
= self
.dns_transaction_udp(p
)
204 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
205 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
206 self
.assertEquals(response
.ancount
, 0)
208 def test_two_queries(self
):
209 "create a query packet containing two query records"
210 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
213 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
214 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
217 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
218 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
221 self
.finish_name_packet(p
, questions
)
222 response
= self
.dns_transaction_udp(p
)
223 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
225 def test_qtype_all_query(self
):
226 "create a QTYPE_ALL query"
227 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
230 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
231 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
232 print "asking for ", q
.name
235 self
.finish_name_packet(p
, questions
)
236 response
= self
.dns_transaction_udp(p
)
239 dc_ipv6
= os
.getenv('SERVER_IPV6')
240 if dc_ipv6
is not None:
243 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
244 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
245 self
.assertEquals(response
.ancount
, num_answers
)
246 self
.assertEquals(response
.answers
[0].rdata
,
247 os
.getenv('SERVER_IP'))
248 if dc_ipv6
is not None:
249 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
251 def test_qclass_none_query(self
):
252 "create a QCLASS_NONE query"
253 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
256 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
257 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_NONE
)
260 self
.finish_name_packet(p
, questions
)
261 response
= self
.dns_transaction_udp(p
)
262 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
264 # Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
266 # def test_soa_hostname_query(self):
267 # "create a SOA query for a hostname"
268 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
271 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
272 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
273 # questions.append(q)
275 # self.finish_name_packet(p, questions)
276 # response = self.dns_transaction_udp(p)
277 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
278 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
279 # # We don't get SOA records for single hosts
280 # self.assertEquals(response.ancount, 0)
282 def test_soa_domain_query(self
):
283 "create a SOA query for a domain"
284 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
287 name
= self
.get_dns_domain()
288 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
291 self
.finish_name_packet(p
, questions
)
292 response
= self
.dns_transaction_udp(p
)
293 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
294 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
295 self
.assertEquals(response
.ancount
, 1)
296 self
.assertEquals(response
.answers
[0].rdata
.minimum
, 3600)
299 class TestDNSUpdates(DNSTest
):
301 def test_two_updates(self
):
302 "create two update requests"
303 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
306 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
307 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
310 name
= self
.get_dns_domain()
311 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
314 self
.finish_name_packet(p
, updates
)
315 response
= self
.dns_transaction_udp(p
)
316 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
318 def test_update_wrong_qclass(self
):
319 "create update with DNS_QCLASS_NONE"
320 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
323 name
= self
.get_dns_domain()
324 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
327 self
.finish_name_packet(p
, updates
)
328 response
= self
.dns_transaction_udp(p
)
329 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
331 def test_update_prereq_with_non_null_ttl(self
):
332 "test update with a non-null TTL"
333 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
336 name
= self
.get_dns_domain()
338 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
340 self
.finish_name_packet(p
, updates
)
344 r
.name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
345 r
.rr_type
= dns
.DNS_QTYPE_TXT
346 r
.rr_class
= dns
.DNS_QCLASS_NONE
351 p
.ancount
= len(prereqs
)
354 response
= self
.dns_transaction_udp(p
)
355 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
357 # I'd love to test this one, but it segfaults. :)
358 # def test_update_prereq_with_non_null_length(self):
359 # "test update with a non-null length"
360 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
363 # name = self.get_dns_domain()
365 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
367 # self.finish_name_packet(p, updates)
371 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
372 # r.rr_type = dns.DNS_QTYPE_TXT
373 # r.rr_class = dns.DNS_QCLASS_ANY
378 # p.ancount = len(prereqs)
379 # p.answers = prereqs
381 # response = self.dns_transaction_udp(p)
382 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
384 def test_update_prereq_nonexisting_name(self
):
385 "test update with a nonexisting name"
386 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
389 name
= self
.get_dns_domain()
391 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
393 self
.finish_name_packet(p
, updates
)
397 r
.name
= "idontexist.%s" % self
.get_dns_domain()
398 r
.rr_type
= dns
.DNS_QTYPE_TXT
399 r
.rr_class
= dns
.DNS_QCLASS_ANY
404 p
.ancount
= len(prereqs
)
407 response
= self
.dns_transaction_udp(p
)
408 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
410 def test_update_add_txt_record(self
):
411 "test adding records works"
412 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
415 name
= self
.get_dns_domain()
417 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
419 self
.finish_name_packet(p
, updates
)
423 r
.name
= "textrec.%s" % self
.get_dns_domain()
424 r
.rr_type
= dns
.DNS_QTYPE_TXT
425 r
.rr_class
= dns
.DNS_QCLASS_IN
428 rdata
= dns
.txt_record()
429 rdata
.txt
= '"This is a test"'
432 p
.nscount
= len(updates
)
435 response
= self
.dns_transaction_udp(p
)
436 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
438 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
441 name
= "textrec.%s" % self
.get_dns_domain()
442 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
445 self
.finish_name_packet(p
, questions
)
446 response
= self
.dns_transaction_udp(p
)
447 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
448 self
.assertEquals(response
.ancount
, 1)
449 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test"')
451 def test_update_add_two_txt_records(self
):
452 "test adding two txt records works"
453 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
456 name
= self
.get_dns_domain()
458 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
460 self
.finish_name_packet(p
, updates
)
464 r
.name
= "textrec2.%s" % self
.get_dns_domain()
465 r
.rr_type
= dns
.DNS_QTYPE_TXT
466 r
.rr_class
= dns
.DNS_QCLASS_IN
469 rdata
= dns
.txt_record()
470 rdata
.txt
= '"This is a test" "and this is a test, too"'
473 p
.nscount
= len(updates
)
476 response
= self
.dns_transaction_udp(p
)
477 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
479 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
482 name
= "textrec2.%s" % self
.get_dns_domain()
483 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
486 self
.finish_name_packet(p
, questions
)
487 response
= self
.dns_transaction_udp(p
)
488 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
489 self
.assertEquals(response
.ancount
, 1)
490 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test" "and this is a test, too"')
492 def test_delete_record(self
):
493 "Test if deleting records works"
495 NAME
= "deleterec.%s" % self
.get_dns_domain()
497 # First, create a record to make sure we have a record to delete.
498 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
501 name
= self
.get_dns_domain()
503 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
505 self
.finish_name_packet(p
, updates
)
510 r
.rr_type
= dns
.DNS_QTYPE_TXT
511 r
.rr_class
= dns
.DNS_QCLASS_IN
514 rdata
= dns
.txt_record()
515 rdata
.txt
= '"This is a test"'
518 p
.nscount
= len(updates
)
521 response
= self
.dns_transaction_udp(p
)
522 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
524 # Now check the record is around
525 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
527 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
530 self
.finish_name_packet(p
, questions
)
531 response
= self
.dns_transaction_udp(p
)
532 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
534 # Now delete the record
535 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
538 name
= self
.get_dns_domain()
540 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
542 self
.finish_name_packet(p
, updates
)
547 r
.rr_type
= dns
.DNS_QTYPE_TXT
548 r
.rr_class
= dns
.DNS_QCLASS_NONE
551 rdata
= dns
.txt_record()
552 rdata
.txt
= '"This is a test"'
555 p
.nscount
= len(updates
)
558 response
= self
.dns_transaction_udp(p
)
559 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
561 # And finally check it's gone
562 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
565 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
568 self
.finish_name_packet(p
, questions
)
569 response
= self
.dns_transaction_udp(p
)
570 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
572 def test_readd_record(self
):
573 "Test if adding, deleting and then readding a records works"
575 NAME
= "readdrec.%s" % self
.get_dns_domain()
578 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
581 name
= self
.get_dns_domain()
583 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
585 self
.finish_name_packet(p
, updates
)
590 r
.rr_type
= dns
.DNS_QTYPE_TXT
591 r
.rr_class
= dns
.DNS_QCLASS_IN
594 rdata
= dns
.txt_record()
595 rdata
.txt
= '"This is a test"'
598 p
.nscount
= len(updates
)
601 response
= self
.dns_transaction_udp(p
)
602 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
604 # Now check the record is around
605 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
607 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
610 self
.finish_name_packet(p
, questions
)
611 response
= self
.dns_transaction_udp(p
)
612 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
614 # Now delete the record
615 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
618 name
= self
.get_dns_domain()
620 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
622 self
.finish_name_packet(p
, updates
)
627 r
.rr_type
= dns
.DNS_QTYPE_TXT
628 r
.rr_class
= dns
.DNS_QCLASS_NONE
631 rdata
= dns
.txt_record()
632 rdata
.txt
= '"This is a test"'
635 p
.nscount
= len(updates
)
638 response
= self
.dns_transaction_udp(p
)
639 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
642 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
645 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
648 self
.finish_name_packet(p
, questions
)
649 response
= self
.dns_transaction_udp(p
)
650 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
652 # recreate the record
653 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
656 name
= self
.get_dns_domain()
658 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
660 self
.finish_name_packet(p
, updates
)
665 r
.rr_type
= dns
.DNS_QTYPE_TXT
666 r
.rr_class
= dns
.DNS_QCLASS_IN
669 rdata
= dns
.txt_record()
670 rdata
.txt
= '"This is a test"'
673 p
.nscount
= len(updates
)
676 response
= self
.dns_transaction_udp(p
)
677 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
679 # Now check the record is around
680 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
682 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
685 self
.finish_name_packet(p
, questions
)
686 response
= self
.dns_transaction_udp(p
)
687 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
689 def test_update_add_mx_record(self
):
690 "test adding MX records works"
691 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
694 name
= self
.get_dns_domain()
696 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
698 self
.finish_name_packet(p
, updates
)
702 r
.name
= "%s" % self
.get_dns_domain()
703 r
.rr_type
= dns
.DNS_QTYPE_MX
704 r
.rr_class
= dns
.DNS_QCLASS_IN
707 rdata
= dns
.mx_record()
708 rdata
.preference
= 10
709 rdata
.exchange
= 'mail.%s' % self
.get_dns_domain()
712 p
.nscount
= len(updates
)
715 response
= self
.dns_transaction_udp(p
)
716 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
718 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
721 name
= "%s" % self
.get_dns_domain()
722 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
725 self
.finish_name_packet(p
, questions
)
726 response
= self
.dns_transaction_udp(p
)
727 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
728 self
.assertEqual(response
.ancount
, 1)
729 ans
= response
.answers
[0]
730 self
.assertEqual(ans
.rr_type
, dns
.DNS_QTYPE_MX
)
731 self
.assertEqual(ans
.rdata
.preference
, 10)
732 self
.assertEqual(ans
.rdata
.exchange
, 'mail.%s' % self
.get_dns_domain())
735 class TestComplexQueries(DNSTest
):
738 super(TestComplexQueries
, self
).setUp()
739 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
742 name
= self
.get_dns_domain()
744 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
746 self
.finish_name_packet(p
, updates
)
750 r
.name
= "cname_test.%s" % self
.get_dns_domain()
751 r
.rr_type
= dns
.DNS_QTYPE_CNAME
752 r
.rr_class
= dns
.DNS_QCLASS_IN
755 r
.rdata
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
757 p
.nscount
= len(updates
)
760 response
= self
.dns_transaction_udp(p
)
761 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
764 super(TestComplexQueries
, self
).tearDown()
765 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
768 name
= self
.get_dns_domain()
770 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
772 self
.finish_name_packet(p
, updates
)
776 r
.name
= "cname_test.%s" % self
.get_dns_domain()
777 r
.rr_type
= dns
.DNS_QTYPE_CNAME
778 r
.rr_class
= dns
.DNS_QCLASS_NONE
781 r
.rdata
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
783 p
.nscount
= len(updates
)
786 response
= self
.dns_transaction_udp(p
)
787 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
789 def test_one_a_query(self
):
790 "create a query packet containing one query record"
791 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
794 name
= "cname_test.%s" % self
.get_dns_domain()
795 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
796 print "asking for ", q
.name
799 self
.finish_name_packet(p
, questions
)
800 response
= self
.dns_transaction_udp(p
)
801 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
802 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
803 self
.assertEquals(response
.ancount
, 2)
804 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
805 self
.assertEquals(response
.answers
[0].rdata
, "%s.%s" %
806 (os
.getenv('SERVER'), self
.get_dns_domain()))
807 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
808 self
.assertEquals(response
.answers
[1].rdata
,
809 os
.getenv('SERVER_IP'))
811 class TestInvalidQueries(DNSTest
):
813 def test_one_a_query(self
):
814 "send 0 bytes follows by create a query packet containing one query record"
818 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
819 s
.connect((os
.getenv('SERVER_IP'), 53))
825 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
828 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
829 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
830 print "asking for ", q
.name
833 self
.finish_name_packet(p
, questions
)
834 response
= self
.dns_transaction_udp(p
)
835 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
836 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
837 self
.assertEquals(response
.ancount
, 1)
838 self
.assertEquals(response
.answers
[0].rdata
,
839 os
.getenv('SERVER_IP'))
841 def test_one_a_reply(self
):
842 "send a reply instead of a query"
844 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
847 name
= "%s.%s" % ('fakefakefake', self
.get_dns_domain())
848 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
849 print "asking for ", q
.name
852 self
.finish_name_packet(p
, questions
)
853 p
.operation |
= dns
.DNS_FLAG_REPLY
856 send_packet
= ndr
.ndr_pack(p
)
857 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
858 host
=os
.getenv('SERVER_IP')
859 s
.connect((host
, 53))
860 tcp_packet
= struct
.pack('!H', len(send_packet
))
861 tcp_packet
+= send_packet
862 s
.send(tcp_packet
, 0)
863 recv_packet
= s
.recv(0xffff + 2, 0)
864 self
.assertEquals(0, len(recv_packet
))
869 class TestZones(DNSTest
):
870 def get_loadparm(self
):
871 lp
= param
.LoadParm()
872 lp
.load(os
.getenv("SMB_CONF_PATH"))
875 def get_credentials(self
, lp
):
876 creds
= credentials
.Credentials()
878 creds
.set_machine_account(lp
)
879 creds
.set_krb_forwardable(credentials
.NO_KRB_FORWARDABLE
)
883 super(TestZones
, self
).setUp()
884 self
.lp
= self
.get_loadparm()
885 self
.creds
= self
.get_credentials(self
.lp
)
886 self
.server
= os
.getenv("SERVER_IP")
887 self
.zone
= "test.lan"
888 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s" % (self
.server
),
892 super(TestZones
, self
).tearDown()
894 self
.delete_zone(self
.zone
)
895 except RuntimeError, (num
, string
):
896 if num
!= 9601: #WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
899 def create_zone(self
, zone
):
900 zone_create
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
901 zone_create
.pszZoneName
= zone
902 zone_create
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
903 zone_create
.fAllowUpdate
= dnsp
.DNS_ZONE_UPDATE_SECURE
904 zone_create
.fAging
= 0
905 zone_create
.dwDpFlags
= dnsserver
.DNS_DP_DOMAIN_DEFAULT
906 self
.rpc_conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
912 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
915 def delete_zone(self
, zone
):
916 self
.rpc_conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
922 dnsserver
.DNSSRV_TYPEID_NULL
,
925 def test_soa_query(self
):
927 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
930 q
= self
.make_name_question(zone
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
932 self
.finish_name_packet(p
, questions
)
934 response
= self
.dns_transaction_udp(p
)
935 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
936 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
937 self
.assertEquals(response
.ancount
, 0)
939 self
.create_zone(zone
)
940 response
= self
.dns_transaction_udp(p
)
941 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
942 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
943 self
.assertEquals(response
.ancount
, 1)
944 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_SOA
)
946 self
.delete_zone(zone
)
947 response
= self
.dns_transaction_udp(p
)
948 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
949 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
950 self
.assertEquals(response
.ancount
, 0)
954 if __name__
== "__main__":