1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import samba
.ndr
as ndr
23 import samba
.dcerpc
.dns
as dns
24 from samba
.tests
import TestCase
# 256-entry character table: position i holds chr(i) when repr(chr(i)) is
# exactly three characters long (the character between two quotes, meaning
# repr() needed no escape sequence) and '.' otherwise.
FILTER = ''.join(
    chr(x) if len(repr(chr(x))) == 3 else '.'
    for x in range(256)
)
29 class DNSTest(TestCase
):
31 def errstr(self
, errcode
):
32 "Return a readable error code"
47 return string_codes
[errcode
]
def assert_dns_rcode_equals(self, packet, rcode):
    """Assert that the response code carried by packet equals rcode.

    The response code is read from the low four bits (mask 0x000F) of
    packet.operation.  On mismatch the failure message names both the
    expected and the received code via self.errstr().
    """
    p_errcode = packet.operation & 0x000F
    # assertEqual is the supported spelling; assertEquals is a deprecated
    # alias of the same method.
    self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                     (self.errstr(rcode), self.errstr(p_errcode)))
56 def assert_dns_opcode_equals(self
, packet
, opcode
):
57 "Helper function to check opcode"
58 p_opcode
= packet
.operation
& 0x7800
59 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" %
62 def make_name_packet(self
, opcode
, qid
=None):
63 "Helper creating a dns.name_packet"
66 p
.id = random
.randint(0x0, 0xffff)
def finish_name_packet(self, packet, questions):
    """Attach questions to packet and record how many there are.

    Sets packet.qdcount to len(questions) and packet.questions to the
    sequence that was passed in.  Returns nothing.
    """
    question_count = len(questions)
    packet.qdcount = question_count
    packet.questions = questions
76 def make_name_question(self
, name
, qtype
, qclass
):
77 "Helper creating a dns.name_question"
78 q
= dns
.name_question()
80 q
.question_type
= qtype
81 q
.question_class
= qclass
def get_dns_domain(self):
    """Return the DNS domain under test, lower-cased.

    The value comes from the REALM environment variable; 'example.com'
    is used when REALM is not set.
    """
    realm = os.getenv('REALM', 'example.com')
    return realm.lower()
88 def dns_transaction_udp(self
, packet
, host
=os
.getenv('SERVER_IP'), dump
=False):
89 "send a DNS query and read the reply"
92 send_packet
= ndr
.ndr_pack(packet
)
94 print self
.hexdump(send_packet
)
95 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
97 s
.send(send_packet
, 0)
98 recv_packet
= s
.recv(2048, 0)
100 print self
.hexdump(recv_packet
)
101 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
106 def dns_transaction_tcp(self
, packet
, host
=os
.getenv('SERVER_IP'), dump
=False):
107 "send a DNS query and read the reply"
110 send_packet
= ndr
.ndr_pack(packet
)
112 print self
.hexdump(send_packet
)
113 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
114 s
.connect((host
, 53))
115 tcp_packet
= struct
.pack('!H', len(send_packet
))
116 tcp_packet
+= send_packet
117 s
.send(tcp_packet
, 0)
118 recv_packet
= s
.recv(0xffff + 2, 0)
120 print self
.hexdump(recv_packet
)
121 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
[2:])
126 def hexdump(self
, src
, length
=8):
129 s
,src
= src
[:length
],src
[length
:]
130 hexa
= ' '.join(["%02X"%ord(x
) for x
in s
])
131 s
= s
.translate(FILTER
)
132 result
+= "%04X %-*s %s\n" % (N
, length
*3, hexa
, s
)
136 class TestSimpleQueries(DNSTest
):
138 def test_one_a_query(self
):
139 "create a query packet containing one query record"
140 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
143 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
144 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
145 print "asking for ", q
.name
148 self
.finish_name_packet(p
, questions
)
149 response
= self
.dns_transaction_udp(p
)
150 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
151 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
152 self
.assertEquals(response
.ancount
, 1)
153 self
.assertEquals(response
.answers
[0].rdata
,
154 os
.getenv('SERVER_IP'))
156 def test_one_a_query_tcp(self
):
157 "create a query packet containing one query record via TCP"
158 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
161 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
162 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
163 print "asking for ", q
.name
166 self
.finish_name_packet(p
, questions
)
167 response
= self
.dns_transaction_tcp(p
)
168 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
169 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
170 self
.assertEquals(response
.ancount
, 1)
171 self
.assertEquals(response
.answers
[0].rdata
,
172 os
.getenv('SERVER_IP'))
174 def test_one_mx_query(self
):
175 "create a query packet causing an empty RCODE_OK answer"
176 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
179 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
180 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
181 print "asking for ", q
.name
184 self
.finish_name_packet(p
, questions
)
185 response
= self
.dns_transaction_udp(p
)
186 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
187 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
188 self
.assertEquals(response
.ancount
, 0)
190 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
193 name
= "invalid-%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
194 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
195 print "asking for ", q
.name
198 self
.finish_name_packet(p
, questions
)
199 response
= self
.dns_transaction_udp(p
)
200 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
201 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
202 self
.assertEquals(response
.ancount
, 0)
204 def test_two_queries(self
):
205 "create a query packet containing two query records"
206 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
209 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
210 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
213 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
214 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
217 self
.finish_name_packet(p
, questions
)
218 response
= self
.dns_transaction_udp(p
)
219 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
221 def test_qtype_all_query(self
):
222 "create a QTYPE_ALL query"
223 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
226 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
227 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
228 print "asking for ", q
.name
231 self
.finish_name_packet(p
, questions
)
232 response
= self
.dns_transaction_udp(p
)
235 dc_ipv6
= os
.getenv('SERVER_IPV6')
236 if dc_ipv6
is not None:
239 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
240 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
241 self
.assertEquals(response
.ancount
, num_answers
)
242 self
.assertEquals(response
.answers
[0].rdata
,
243 os
.getenv('SERVER_IP'))
244 if dc_ipv6
is not None:
245 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
247 def test_qclass_none_query(self
):
248 "create a QCLASS_NONE query"
249 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
252 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
253 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_NONE
)
256 self
.finish_name_packet(p
, questions
)
257 response
= self
.dns_transaction_udp(p
)
258 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
262 # def test_soa_hostname_query(self):
263 # "create a SOA query for a hostname"
264 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
267 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
268 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
269 # questions.append(q)
271 # self.finish_name_packet(p, questions)
272 # response = self.dns_transaction_udp(p)
273 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
274 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
275 # # We don't get SOA records for single hosts
276 # self.assertEquals(response.ancount, 0)
278 def test_soa_domain_query(self
):
279 "create a SOA query for a domain"
280 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
283 name
= self
.get_dns_domain()
284 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
287 self
.finish_name_packet(p
, questions
)
288 response
= self
.dns_transaction_udp(p
)
289 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
290 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
291 self
.assertEquals(response
.ancount
, 1)
292 self
.assertEquals(response
.answers
[0].rdata
.minimum
, 3600)
295 class TestDNSUpdates(DNSTest
):
297 def test_two_updates(self
):
298 "create two update requests"
299 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
302 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
303 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
306 name
= self
.get_dns_domain()
307 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
310 self
.finish_name_packet(p
, updates
)
311 response
= self
.dns_transaction_udp(p
)
312 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
314 def test_update_wrong_qclass(self
):
315 "create update with DNS_QCLASS_NONE"
316 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
319 name
= self
.get_dns_domain()
320 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
323 self
.finish_name_packet(p
, updates
)
324 response
= self
.dns_transaction_udp(p
)
325 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
327 def test_update_prereq_with_non_null_ttl(self
):
328 "test update with a non-null TTL"
329 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
332 name
= self
.get_dns_domain()
334 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
336 self
.finish_name_packet(p
, updates
)
340 r
.name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
341 r
.rr_type
= dns
.DNS_QTYPE_TXT
342 r
.rr_class
= dns
.DNS_QCLASS_NONE
347 p
.ancount
= len(prereqs
)
350 response
= self
.dns_transaction_udp(p
)
351 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
353 # I'd love to test this one, but it segfaults. :)
354 # def test_update_prereq_with_non_null_length(self):
355 # "test update with a non-null length"
356 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
359 # name = self.get_dns_domain()
361 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
363 # self.finish_name_packet(p, updates)
367 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
368 # r.rr_type = dns.DNS_QTYPE_TXT
369 # r.rr_class = dns.DNS_QCLASS_ANY
374 # p.ancount = len(prereqs)
375 # p.answers = prereqs
377 # response = self.dns_transaction_udp(p)
378 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
380 def test_update_prereq_nonexisting_name(self
):
381 "test update with a nonexisting name"
382 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
385 name
= self
.get_dns_domain()
387 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
389 self
.finish_name_packet(p
, updates
)
393 r
.name
= "idontexist.%s" % self
.get_dns_domain()
394 r
.rr_type
= dns
.DNS_QTYPE_TXT
395 r
.rr_class
= dns
.DNS_QCLASS_ANY
400 p
.ancount
= len(prereqs
)
403 response
= self
.dns_transaction_udp(p
)
404 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
406 def test_update_add_txt_record(self
):
407 "test adding records works"
408 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
411 name
= self
.get_dns_domain()
413 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
415 self
.finish_name_packet(p
, updates
)
419 r
.name
= "textrec.%s" % self
.get_dns_domain()
420 r
.rr_type
= dns
.DNS_QTYPE_TXT
421 r
.rr_class
= dns
.DNS_QCLASS_IN
424 rdata
= dns
.txt_record()
425 rdata
.txt
= '"This is a test"'
428 p
.nscount
= len(updates
)
431 response
= self
.dns_transaction_udp(p
)
432 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
434 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
437 name
= "textrec.%s" % self
.get_dns_domain()
438 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
441 self
.finish_name_packet(p
, questions
)
442 response
= self
.dns_transaction_udp(p
)
443 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
444 self
.assertEquals(response
.ancount
, 1)
445 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test"')
447 def test_update_add_two_txt_records(self
):
448 "test adding two txt records works"
449 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
452 name
= self
.get_dns_domain()
454 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
456 self
.finish_name_packet(p
, updates
)
460 r
.name
= "textrec2.%s" % self
.get_dns_domain()
461 r
.rr_type
= dns
.DNS_QTYPE_TXT
462 r
.rr_class
= dns
.DNS_QCLASS_IN
465 rdata
= dns
.txt_record()
466 rdata
.txt
= '"This is a test" "and this is a test, too"'
469 p
.nscount
= len(updates
)
472 response
= self
.dns_transaction_udp(p
)
473 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
475 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
478 name
= "textrec2.%s" % self
.get_dns_domain()
479 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
482 self
.finish_name_packet(p
, questions
)
483 response
= self
.dns_transaction_udp(p
)
484 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
485 self
.assertEquals(response
.ancount
, 1)
486 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test" "and this is a test, too"')
488 def test_delete_record(self
):
489 "Test if deleting records works"
491 NAME
= "deleterec.%s" % self
.get_dns_domain()
493 # First, create a record to make sure we have a record to delete.
494 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
497 name
= self
.get_dns_domain()
499 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
501 self
.finish_name_packet(p
, updates
)
506 r
.rr_type
= dns
.DNS_QTYPE_TXT
507 r
.rr_class
= dns
.DNS_QCLASS_IN
510 rdata
= dns
.txt_record()
511 rdata
.txt
= '"This is a test"'
514 p
.nscount
= len(updates
)
517 response
= self
.dns_transaction_udp(p
)
518 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
520 # Now check the record is around
521 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
523 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
526 self
.finish_name_packet(p
, questions
)
527 response
= self
.dns_transaction_udp(p
)
528 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
530 # Now delete the record
531 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
534 name
= self
.get_dns_domain()
536 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
538 self
.finish_name_packet(p
, updates
)
543 r
.rr_type
= dns
.DNS_QTYPE_TXT
544 r
.rr_class
= dns
.DNS_QCLASS_NONE
547 rdata
= dns
.txt_record()
548 rdata
.txt
= '"This is a test"'
551 p
.nscount
= len(updates
)
554 response
= self
.dns_transaction_udp(p
)
555 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
557 # And finally check it's gone
558 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
561 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
564 self
.finish_name_packet(p
, questions
)
565 response
= self
.dns_transaction_udp(p
)
566 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
568 def test_readd_record(self
):
569 "Test if adding, deleting and then readding a records works"
571 NAME
= "readdrec.%s" % self
.get_dns_domain()
574 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
577 name
= self
.get_dns_domain()
579 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
581 self
.finish_name_packet(p
, updates
)
586 r
.rr_type
= dns
.DNS_QTYPE_TXT
587 r
.rr_class
= dns
.DNS_QCLASS_IN
590 rdata
= dns
.txt_record()
591 rdata
.txt
= '"This is a test"'
594 p
.nscount
= len(updates
)
597 response
= self
.dns_transaction_udp(p
)
598 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
600 # Now check the record is around
601 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
603 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
606 self
.finish_name_packet(p
, questions
)
607 response
= self
.dns_transaction_udp(p
)
608 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
610 # Now delete the record
611 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
614 name
= self
.get_dns_domain()
616 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
618 self
.finish_name_packet(p
, updates
)
623 r
.rr_type
= dns
.DNS_QTYPE_TXT
624 r
.rr_class
= dns
.DNS_QCLASS_NONE
627 rdata
= dns
.txt_record()
628 rdata
.txt
= '"This is a test"'
631 p
.nscount
= len(updates
)
634 response
= self
.dns_transaction_udp(p
)
635 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
638 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
641 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
644 self
.finish_name_packet(p
, questions
)
645 response
= self
.dns_transaction_udp(p
)
646 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
648 # recreate the record
649 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
652 name
= self
.get_dns_domain()
654 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
656 self
.finish_name_packet(p
, updates
)
661 r
.rr_type
= dns
.DNS_QTYPE_TXT
662 r
.rr_class
= dns
.DNS_QCLASS_IN
665 rdata
= dns
.txt_record()
666 rdata
.txt
= '"This is a test"'
669 p
.nscount
= len(updates
)
672 response
= self
.dns_transaction_udp(p
)
673 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
675 # Now check the record is around
676 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
678 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
681 self
.finish_name_packet(p
, questions
)
682 response
= self
.dns_transaction_udp(p
)
683 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
685 def test_update_add_mx_record(self
):
686 "test adding MX records works"
687 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
690 name
= self
.get_dns_domain()
692 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
694 self
.finish_name_packet(p
, updates
)
698 r
.name
= "%s" % self
.get_dns_domain()
699 r
.rr_type
= dns
.DNS_QTYPE_MX
700 r
.rr_class
= dns
.DNS_QCLASS_IN
703 rdata
= dns
.mx_record()
704 rdata
.preference
= 10
705 rdata
.exchange
= 'mail.%s' % self
.get_dns_domain()
708 p
.nscount
= len(updates
)
711 response
= self
.dns_transaction_udp(p
)
712 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
714 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
717 name
= "%s" % self
.get_dns_domain()
718 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
721 self
.finish_name_packet(p
, questions
)
722 response
= self
.dns_transaction_udp(p
)
723 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
724 self
.assertEqual(response
.ancount
, 1)
725 ans
= response
.answers
[0]
726 self
.assertEqual(ans
.rr_type
, dns
.DNS_QTYPE_MX
)
727 self
.assertEqual(ans
.rdata
.preference
, 10)
728 self
.assertEqual(ans
.rdata
.exchange
, 'mail.%s' % self
.get_dns_domain())
731 class TestComplexQueries(DNSTest
):
734 super(TestComplexQueries
, self
).setUp()
735 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
738 name
= self
.get_dns_domain()
740 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
742 self
.finish_name_packet(p
, updates
)
746 r
.name
= "cname_test.%s" % self
.get_dns_domain()
747 r
.rr_type
= dns
.DNS_QTYPE_CNAME
748 r
.rr_class
= dns
.DNS_QCLASS_IN
751 r
.rdata
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
753 p
.nscount
= len(updates
)
756 response
= self
.dns_transaction_udp(p
)
757 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
760 super(TestComplexQueries
, self
).tearDown()
761 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
764 name
= self
.get_dns_domain()
766 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
768 self
.finish_name_packet(p
, updates
)
772 r
.name
= "cname_test.%s" % self
.get_dns_domain()
773 r
.rr_type
= dns
.DNS_QTYPE_CNAME
774 r
.rr_class
= dns
.DNS_QCLASS_NONE
777 r
.rdata
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
779 p
.nscount
= len(updates
)
782 response
= self
.dns_transaction_udp(p
)
783 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
785 def test_one_a_query(self
):
786 "create a query packet containing one query record"
787 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
790 name
= "cname_test.%s" % self
.get_dns_domain()
791 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
792 print "asking for ", q
.name
795 self
.finish_name_packet(p
, questions
)
796 response
= self
.dns_transaction_udp(p
)
797 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
798 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
799 self
.assertEquals(response
.ancount
, 2)
800 self
.assertEquals(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
801 self
.assertEquals(response
.answers
[0].rdata
, "%s.%s" %
802 (os
.getenv('SERVER'), self
.get_dns_domain()))
803 self
.assertEquals(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
804 self
.assertEquals(response
.answers
[1].rdata
,
805 os
.getenv('SERVER_IP'))
807 class TestInvalidQueries(DNSTest
):
809 def test_one_a_query(self
):
810 "send 0 bytes follows by create a query packet containing one query record"
814 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
815 s
.connect((os
.getenv('SERVER_IP'), 53))
821 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
824 name
= "%s.%s" % (os
.getenv('SERVER'), self
.get_dns_domain())
825 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
826 print "asking for ", q
.name
829 self
.finish_name_packet(p
, questions
)
830 response
= self
.dns_transaction_udp(p
)
831 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
832 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
833 self
.assertEquals(response
.ancount
, 1)
834 self
.assertEquals(response
.answers
[0].rdata
,
835 os
.getenv('SERVER_IP'))
837 def test_one_a_reply(self
):
838 "send a reply instead of a query"
840 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
843 name
= "%s.%s" % ('fakefakefake', self
.get_dns_domain())
844 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
845 print "asking for ", q
.name
848 self
.finish_name_packet(p
, questions
)
849 p
.operation |
= dns
.DNS_FLAG_REPLY
852 send_packet
= ndr
.ndr_pack(p
)
853 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
854 host
=os
.getenv('SERVER_IP')
855 s
.connect((host
, 53))
856 tcp_packet
= struct
.pack('!H', len(send_packet
))
857 tcp_packet
+= send_packet
858 s
.send(tcp_packet
, 0)
859 recv_packet
= s
.recv(0xffff + 2, 0)
860 self
.assertEquals(0, len(recv_packet
))
866 if __name__
== "__main__":