1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba
import socket
22 import samba
.ndr
as ndr
23 import samba
.dcerpc
.dns
as dns
24 from samba
.tests
import TestCase
# Translation table applied by hexdump() via str.translate(): a byte value
# whose repr() is exactly three characters long (the character between two
# quotes) maps to itself; every other value maps to '.'.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])
29 class DNSTest(TestCase
):
31 def errstr(self
, errcode
):
32 "Return a readable error code"
47 return string_codes
[errcode
]
def assert_dns_rcode_equals(self, packet, rcode):
    """Assert that a DNS packet carries the expected response code.

    The RCODE is taken from the low four bits of ``packet.operation``.
    The failure message renders both the expected and the actual code
    through ``self.errstr``.

    packet -- object exposing an integer ``operation`` attribute
    rcode -- the RCODE value the packet is expected to carry
    """
    # Keep only the 4-bit RCODE field of the operation word.
    p_errcode = packet.operation & 0x000F
    # Use assertEqual: the assertEquals spelling is a deprecated alias
    # that Python 3.12 removed from unittest.
    self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                     (self.errstr(rcode), self.errstr(p_errcode)))
56 def assert_dns_opcode_equals(self
, packet
, opcode
):
57 "Helper function to check opcode"
58 p_opcode
= packet
.operation
& 0x7800
59 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" %
62 def make_name_packet(self
, opcode
, qid
=None):
63 "Helper creating a dns.name_packet"
66 p
.id = random
.randint(0x0, 0xffff)
def finish_name_packet(self, packet, questions):
    """Store the questions on a packet together with their count.

    Sets ``packet.qdcount`` to ``len(questions)`` and then assigns the
    same ``questions`` object to ``packet.questions``. Returns nothing.
    """
    question_count = len(questions)
    packet.qdcount = question_count
    packet.questions = questions
76 def make_name_question(self
, name
, qtype
, qclass
):
77 "Helper creating a dns.name_question"
78 q
= dns
.name_question()
80 q
.question_type
= qtype
81 q
.question_class
= qclass
def get_dns_domain(self):
    """Return the lower-cased DNS domain the tests run against.

    The value is read from the REALM environment variable; when that
    variable is unset, 'example.com' is used instead.
    """
    realm = os.environ.get('REALM', 'example.com')
    return realm.lower()
88 def dns_transaction_udp(self
, packet
, host
=os
.getenv('SERVER_IP'), dump
=False):
89 "send a DNS query and read the reply"
92 send_packet
= ndr
.ndr_pack(packet
)
94 print self
.hexdump(send_packet
)
95 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
97 s
.send(send_packet
, 0)
98 recv_packet
= s
.recv(2048, 0)
100 print self
.hexdump(recv_packet
)
101 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
106 def dns_transaction_tcp(self
, packet
, host
=os
.getenv('SERVER_IP'), dump
=False):
107 "send a DNS query and read the reply"
110 send_packet
= ndr
.ndr_pack(packet
)
112 print self
.hexdump(send_packet
)
113 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
114 s
.connect((host
, 53))
115 tcp_packet
= struct
.pack('!H', len(send_packet
))
116 tcp_packet
+= send_packet
117 s
.send(tcp_packet
, 0)
118 recv_packet
= s
.recv(0xffff + 2, 0)
120 print self
.hexdump(recv_packet
)
121 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
[2:])
126 def hexdump(self
, src
, length
=8):
129 s
,src
= src
[:length
],src
[length
:]
130 hexa
= ' '.join(["%02X"%ord(x
) for x
in s
])
131 s
= s
.translate(FILTER
)
132 result
+= "%04X %-*s %s\n" % (N
, length
*3, hexa
, s
)
136 class TestSimpleQueries(DNSTest
):
138 def test_one_a_query(self
):
139 "create a query packet containing one query record"
140 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
143 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
144 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
145 print "asking for ", q
.name
148 self
.finish_name_packet(p
, questions
)
149 response
= self
.dns_transaction_udp(p
)
150 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
151 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
152 self
.assertEquals(response
.ancount
, 1)
153 self
.assertEquals(response
.answers
[0].rdata
,
154 os
.getenv('SERVER_IP'))
156 def test_one_a_query_tcp(self
):
157 "create a query packet containing one query record via TCP"
158 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
161 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
162 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
163 print "asking for ", q
.name
166 self
.finish_name_packet(p
, questions
)
167 response
= self
.dns_transaction_tcp(p
)
168 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
169 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
170 self
.assertEquals(response
.ancount
, 1)
171 self
.assertEquals(response
.answers
[0].rdata
,
172 os
.getenv('SERVER_IP'))
174 def test_two_queries(self
):
175 "create a query packet containing two query records"
176 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
179 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
180 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
183 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
184 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
187 self
.finish_name_packet(p
, questions
)
188 response
= self
.dns_transaction_udp(p
)
189 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
191 def test_qtype_all_query(self
):
192 "create a QTYPE_ALL query"
193 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
196 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
197 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
198 print "asking for ", q
.name
201 self
.finish_name_packet(p
, questions
)
202 response
= self
.dns_transaction_udp(p
)
205 dc_ipv6
= os
.getenv('SERVER_IPV6')
206 if dc_ipv6
is not None:
209 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
210 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
211 self
.assertEquals(response
.ancount
, num_answers
)
212 self
.assertEquals(response
.answers
[0].rdata
,
213 os
.getenv('SERVER_IP'))
214 if dc_ipv6
is not None:
215 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
217 def test_qclass_none_query(self
):
218 "create a QCLASS_NONE query"
219 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
222 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
223 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_NONE
)
226 self
.finish_name_packet(p
, questions
)
227 response
= self
.dns_transaction_udp(p
)
228 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
230 # Only returns an authority section entry in BIND and Win DNS
231 # FIXME: Enable once Samba implements this feature
232 # def test_soa_hostname_query(self):
233 # "create a SOA query for a hostname"
234 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
237 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
238 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
239 # questions.append(q)
241 # self.finish_name_packet(p, questions)
242 # response = self.dns_transaction_udp(p)
243 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
244 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
245 # # We don't get SOA records for single hosts
246 # self.assertEquals(response.ancount, 0)
248 def test_soa_domain_query(self
):
249 "create a SOA query for a domain"
250 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
253 name
= self
.get_dns_domain()
254 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
257 self
.finish_name_packet(p
, questions
)
258 response
= self
.dns_transaction_udp(p
)
259 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
260 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
261 self
.assertEquals(response
.ancount
, 1)
264 class TestDNSUpdates(DNSTest
):
266 def test_two_updates(self
):
267 "create two update requests"
268 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
271 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
272 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
275 name
= self
.get_dns_domain()
276 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
279 self
.finish_name_packet(p
, updates
)
280 response
= self
.dns_transaction_udp(p
)
281 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
283 def test_update_wrong_qclass(self
):
284 "create update with DNS_QCLASS_NONE"
285 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
288 name
= self
.get_dns_domain()
289 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
292 self
.finish_name_packet(p
, updates
)
293 response
= self
.dns_transaction_udp(p
)
294 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
296 def test_update_prereq_with_non_null_ttl(self
):
297 "test update with a non-null TTL"
298 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
301 name
= self
.get_dns_domain()
303 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
305 self
.finish_name_packet(p
, updates
)
309 r
.name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
310 r
.rr_type
= dns
.DNS_QTYPE_TXT
311 r
.rr_class
= dns
.DNS_QCLASS_NONE
316 p
.ancount
= len(prereqs
)
319 response
= self
.dns_transaction_udp(p
)
320 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
322 # I'd love to test this one, but it segfaults. :)
323 # def test_update_prereq_with_non_null_length(self):
324 # "test update with a non-null length"
325 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
328 # name = self.get_dns_domain()
330 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
332 # self.finish_name_packet(p, updates)
336 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
337 # r.rr_type = dns.DNS_QTYPE_TXT
338 # r.rr_class = dns.DNS_QCLASS_ANY
343 # p.ancount = len(prereqs)
344 # p.answers = prereqs
346 # response = self.dns_transaction_udp(p)
347 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
349 def test_update_prereq_nonexisting_name(self
):
350 "test update with a nonexisting name"
351 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
354 name
= self
.get_dns_domain()
356 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
358 self
.finish_name_packet(p
, updates
)
362 r
.name
= "idontexist.%s" % self
.get_dns_domain()
363 r
.rr_type
= dns
.DNS_QTYPE_TXT
364 r
.rr_class
= dns
.DNS_QCLASS_ANY
369 p
.ancount
= len(prereqs
)
372 response
= self
.dns_transaction_udp(p
)
373 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
375 def test_update_add_txt_record(self
):
376 "test adding records works"
377 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
380 name
= self
.get_dns_domain()
382 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
384 self
.finish_name_packet(p
, updates
)
388 r
.name
= "textrec.%s" % self
.get_dns_domain()
389 r
.rr_type
= dns
.DNS_QTYPE_TXT
390 r
.rr_class
= dns
.DNS_QCLASS_IN
393 rdata
= dns
.txt_record()
394 rdata
.txt
= '"This is a test"'
397 p
.nscount
= len(updates
)
400 response
= self
.dns_transaction_udp(p
)
401 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
403 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
406 name
= "textrec.%s" % self
.get_dns_domain()
407 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
410 self
.finish_name_packet(p
, questions
)
411 response
= self
.dns_transaction_udp(p
)
412 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
413 self
.assertEquals(response
.ancount
, 1)
414 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test"')
416 def test_update_add_two_txt_records(self
):
417 "test adding two txt records works"
418 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
421 name
= self
.get_dns_domain()
423 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
425 self
.finish_name_packet(p
, updates
)
429 r
.name
= "textrec2.%s" % self
.get_dns_domain()
430 r
.rr_type
= dns
.DNS_QTYPE_TXT
431 r
.rr_class
= dns
.DNS_QCLASS_IN
434 rdata
= dns
.txt_record()
435 rdata
.txt
= '"This is a test" "and this is a test, too"'
438 p
.nscount
= len(updates
)
441 response
= self
.dns_transaction_udp(p
)
442 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
444 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
447 name
= "textrec2.%s" % self
.get_dns_domain()
448 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
451 self
.finish_name_packet(p
, questions
)
452 response
= self
.dns_transaction_udp(p
)
453 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
454 self
.assertEquals(response
.ancount
, 1)
455 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test" "and this is a test, too"')
457 def test_delete_record(self
):
458 "Test if deleting records works"
460 NAME
= "deleterec.%s" % self
.get_dns_domain()
462 # First, create a record to make sure we have a record to delete.
463 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
466 name
= self
.get_dns_domain()
468 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
470 self
.finish_name_packet(p
, updates
)
475 r
.rr_type
= dns
.DNS_QTYPE_TXT
476 r
.rr_class
= dns
.DNS_QCLASS_IN
479 rdata
= dns
.txt_record()
480 rdata
.txt
= '"This is a test"'
483 p
.nscount
= len(updates
)
486 response
= self
.dns_transaction_udp(p
)
487 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
489 # Now check the record is around
490 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
492 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
495 self
.finish_name_packet(p
, questions
)
496 response
= self
.dns_transaction_udp(p
)
497 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
499 # Now delete the record
500 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
503 name
= self
.get_dns_domain()
505 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
507 self
.finish_name_packet(p
, updates
)
512 r
.rr_type
= dns
.DNS_QTYPE_TXT
513 r
.rr_class
= dns
.DNS_QCLASS_NONE
516 rdata
= dns
.txt_record()
517 rdata
.txt
= '"This is a test"'
520 p
.nscount
= len(updates
)
523 response
= self
.dns_transaction_udp(p
)
524 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
526 # And finally check it's gone
527 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
530 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
533 self
.finish_name_packet(p
, questions
)
534 response
= self
.dns_transaction_udp(p
)
535 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
537 def test_readd_record(self
):
538 "Test if adding, deleting and then readding a records works"
540 NAME
= "readdrec.%s" % self
.get_dns_domain()
543 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
546 name
= self
.get_dns_domain()
548 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
550 self
.finish_name_packet(p
, updates
)
555 r
.rr_type
= dns
.DNS_QTYPE_TXT
556 r
.rr_class
= dns
.DNS_QCLASS_IN
559 rdata
= dns
.txt_record()
560 rdata
.txt
= '"This is a test"'
563 p
.nscount
= len(updates
)
566 response
= self
.dns_transaction_udp(p
)
567 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
569 # Now check the record is around
570 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
572 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
575 self
.finish_name_packet(p
, questions
)
576 response
= self
.dns_transaction_udp(p
)
577 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
579 # Now delete the record
580 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
583 name
= self
.get_dns_domain()
585 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
587 self
.finish_name_packet(p
, updates
)
592 r
.rr_type
= dns
.DNS_QTYPE_TXT
593 r
.rr_class
= dns
.DNS_QCLASS_NONE
596 rdata
= dns
.txt_record()
597 rdata
.txt
= '"This is a test"'
600 p
.nscount
= len(updates
)
603 response
= self
.dns_transaction_udp(p
)
604 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
607 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
610 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
613 self
.finish_name_packet(p
, questions
)
614 response
= self
.dns_transaction_udp(p
)
615 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
617 # recreate the record
618 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
621 name
= self
.get_dns_domain()
623 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
625 self
.finish_name_packet(p
, updates
)
630 r
.rr_type
= dns
.DNS_QTYPE_TXT
631 r
.rr_class
= dns
.DNS_QCLASS_IN
634 rdata
= dns
.txt_record()
635 rdata
.txt
= '"This is a test"'
638 p
.nscount
= len(updates
)
641 response
= self
.dns_transaction_udp(p
)
642 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
644 # Now check the record is around
645 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
647 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
650 self
.finish_name_packet(p
, questions
)
651 response
= self
.dns_transaction_udp(p
)
652 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
654 def test_update_add_mx_record(self
):
655 "test adding MX records works"
656 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
659 name
= self
.get_dns_domain()
661 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
663 self
.finish_name_packet(p
, updates
)
667 r
.name
= "%s" % self
.get_dns_domain()
668 r
.rr_type
= dns
.DNS_QTYPE_MX
669 r
.rr_class
= dns
.DNS_QCLASS_IN
672 rdata
= dns
.mx_record()
673 rdata
.preference
= 10
674 rdata
.exchange
= 'mail.%s' % self
.get_dns_domain()
677 p
.nscount
= len(updates
)
680 response
= self
.dns_transaction_udp(p
)
681 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
683 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
686 name
= "%s" % self
.get_dns_domain()
687 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
690 self
.finish_name_packet(p
, questions
)
691 response
= self
.dns_transaction_udp(p
)
692 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
693 self
.assertEqual(response
.ancount
, 1)
694 ans
= response
.answers
[0]
695 self
.assertEqual(ans
.rr_type
, dns
.DNS_QTYPE_MX
)
696 self
.assertEqual(ans
.rdata
.preference
, 10)
697 self
.assertEqual(ans
.rdata
.exchange
, 'mail.%s' % self
.get_dns_domain())
700 class TestComplexQueries(DNSTest
):
703 super(TestComplexQueries
, self
).setUp()
704 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
707 name
= self
.get_dns_domain()
709 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
711 self
.finish_name_packet(p
, updates
)
715 r
.name
= "cname_test.%s" % self
.get_dns_domain()
716 r
.rr_type
= dns
.DNS_QTYPE_CNAME
717 r
.rr_class
= dns
.DNS_QCLASS_IN
720 r
.rdata
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
722 p
.nscount
= len(updates
)
725 response
= self
.dns_transaction_udp(p
)
726 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
729 super(TestComplexQueries
, self
).tearDown()
730 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
733 name
= self
.get_dns_domain()
735 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
737 self
.finish_name_packet(p
, updates
)
741 r
.name
= "cname_test.%s" % self
.get_dns_domain()
742 r
.rr_type
= dns
.DNS_QTYPE_CNAME
743 r
.rr_class
= dns
.DNS_QCLASS_NONE
746 r
.rdata
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
748 p
.nscount
= len(updates
)
751 response
= self
.dns_transaction_udp(p
)
752 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
754 def test_one_a_query(self
):
755 "create a query packet containing one query record"
756 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
759 name
= "cname_test.%s" % self
.get_dns_domain()
760 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
761 print "asking for ", q
.name
764 self
.finish_name_packet(p
, questions
)
765 response
= self
.dns_transaction_udp(p
)
766 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
767 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
768 self
.assertEquals(response
.ancount
, 2)
769 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
770 self
.assertEquals(response
.answers
[0].rdata
, "%s.%s" %
771 (os
.getenv('SERVER'), self
.get_dns_domain()))
772 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
773 self
.assertEquals(response
.answers
[1].rdata
,
774 os
.getenv('SERVER_IP'))
776 class TestInvalidQueries(DNSTest
):
778 def test_one_a_query(self
):
779 "send 0 bytes follows by create a query packet containing one query record"
783 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
784 s
.connect((os
.getenv('SERVER_IP'), 53))
790 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
793 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
794 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
795 print "asking for ", q
.name
798 self
.finish_name_packet(p
, questions
)
799 response
= self
.dns_transaction_udp(p
)
800 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
801 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
802 self
.assertEquals(response
.ancount
, 1)
803 self
.assertEquals(response
.answers
[0].rdata
,
804 os
.getenv('SERVER_IP'))
806 if __name__
== "__main__":