provision: Correctly provision the SOA record minimum TTL
[Samba.git] / python / samba / tests / dns.py
blobf93e13f36df41477ce6b38ccac2293a40cfc31a9
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import struct
20 import random
21 import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba.tests import TestCase
26 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
29 class DNSTest(TestCase):
31 def errstr(self, errcode):
32 "Return a readable error code"
33 string_codes = [
34 "OK",
35 "FORMERR",
36 "SERVFAIL",
37 "NXDOMAIN",
38 "NOTIMP",
39 "REFUSED",
40 "YXDOMAIN",
41 "YXRRSET",
42 "NXRRSET",
43 "NOTAUTH",
44 "NOTZONE",
47 return string_codes[errcode]
50 def assert_dns_rcode_equals(self, packet, rcode):
51 "Helper function to check return code"
52 p_errcode = packet.operation & 0x000F
53 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
54 (self.errstr(rcode), self.errstr(p_errcode)))
56 def assert_dns_opcode_equals(self, packet, opcode):
57 "Helper function to check opcode"
58 p_opcode = packet.operation & 0x7800
59 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
60 (opcode, p_opcode))
62 def make_name_packet(self, opcode, qid=None):
63 "Helper creating a dns.name_packet"
64 p = dns.name_packet()
65 if qid is None:
66 p.id = random.randint(0x0, 0xffff)
67 p.operation = opcode
68 p.questions = []
69 return p
71 def finish_name_packet(self, packet, questions):
72 "Helper to finalize a dns.name_packet"
73 packet.qdcount = len(questions)
74 packet.questions = questions
76 def make_name_question(self, name, qtype, qclass):
77 "Helper creating a dns.name_question"
78 q = dns.name_question()
79 q.name = name
80 q.question_type = qtype
81 q.question_class = qclass
82 return q
84 def get_dns_domain(self):
85 "Helper to get dns domain"
86 return os.getenv('REALM', 'example.com').lower()
88 def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
89 "send a DNS query and read the reply"
90 s = None
91 try:
92 send_packet = ndr.ndr_pack(packet)
93 if dump:
94 print self.hexdump(send_packet)
95 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
96 s.connect((host, 53))
97 s.send(send_packet, 0)
98 recv_packet = s.recv(2048, 0)
99 if dump:
100 print self.hexdump(recv_packet)
101 return ndr.ndr_unpack(dns.name_packet, recv_packet)
102 finally:
103 if s is not None:
104 s.close()
106 def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
107 "send a DNS query and read the reply"
108 s = None
109 try:
110 send_packet = ndr.ndr_pack(packet)
111 if dump:
112 print self.hexdump(send_packet)
113 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
114 s.connect((host, 53))
115 tcp_packet = struct.pack('!H', len(send_packet))
116 tcp_packet += send_packet
117 s.send(tcp_packet, 0)
118 recv_packet = s.recv(0xffff + 2, 0)
119 if dump:
120 print self.hexdump(recv_packet)
121 return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
122 finally:
123 if s is not None:
124 s.close()
126 def hexdump(self, src, length=8):
127 N=0; result=''
128 while src:
129 s,src = src[:length],src[length:]
130 hexa = ' '.join(["%02X"%ord(x) for x in s])
131 s = s.translate(FILTER)
132 result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
133 N+=length
134 return result
136 class TestSimpleQueries(DNSTest):
138 def test_one_a_query(self):
139 "create a query packet containing one query record"
140 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
141 questions = []
143 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
144 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
145 print "asking for ", q.name
146 questions.append(q)
148 self.finish_name_packet(p, questions)
149 response = self.dns_transaction_udp(p)
150 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
151 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
152 self.assertEquals(response.ancount, 1)
153 self.assertEquals(response.answers[0].rdata,
154 os.getenv('SERVER_IP'))
156 def test_one_a_query_tcp(self):
157 "create a query packet containing one query record via TCP"
158 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
159 questions = []
161 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
162 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
163 print "asking for ", q.name
164 questions.append(q)
166 self.finish_name_packet(p, questions)
167 response = self.dns_transaction_tcp(p)
168 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
169 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
170 self.assertEquals(response.ancount, 1)
171 self.assertEquals(response.answers[0].rdata,
172 os.getenv('SERVER_IP'))
174 def test_one_mx_query(self):
175 "create a query packet causing an empty RCODE_OK answer"
176 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
177 questions = []
179 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
180 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
181 print "asking for ", q.name
182 questions.append(q)
184 self.finish_name_packet(p, questions)
185 response = self.dns_transaction_udp(p)
186 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
187 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
188 self.assertEquals(response.ancount, 0)
190 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
191 questions = []
193 name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
194 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
195 print "asking for ", q.name
196 questions.append(q)
198 self.finish_name_packet(p, questions)
199 response = self.dns_transaction_udp(p)
200 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
201 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
202 self.assertEquals(response.ancount, 0)
204 def test_two_queries(self):
205 "create a query packet containing two query records"
206 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
207 questions = []
209 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
210 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
211 questions.append(q)
213 name = "%s.%s" % ('bogusname', self.get_dns_domain())
214 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
215 questions.append(q)
217 self.finish_name_packet(p, questions)
218 response = self.dns_transaction_udp(p)
219 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
221 def test_qtype_all_query(self):
222 "create a QTYPE_ALL query"
223 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
224 questions = []
226 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
227 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
228 print "asking for ", q.name
229 questions.append(q)
231 self.finish_name_packet(p, questions)
232 response = self.dns_transaction_udp(p)
234 num_answers = 1
235 dc_ipv6 = os.getenv('SERVER_IPV6')
236 if dc_ipv6 is not None:
237 num_answers += 1
239 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
240 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
241 self.assertEquals(response.ancount, num_answers)
242 self.assertEquals(response.answers[0].rdata,
243 os.getenv('SERVER_IP'))
244 if dc_ipv6 is not None:
245 self.assertEquals(response.answers[1].rdata, dc_ipv6)
247 def test_qclass_none_query(self):
248 "create a QCLASS_NONE query"
249 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
250 questions = []
252 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
253 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
254 questions.append(q)
256 self.finish_name_packet(p, questions)
257 response = self.dns_transaction_udp(p)
258 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
260 # Only returns an authority section entry in BIND and Win DNS
261 # FIXME: Enable one Samba implements this feature
262 # def test_soa_hostname_query(self):
263 # "create a SOA query for a hostname"
264 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
265 # questions = []
267 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
268 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
269 # questions.append(q)
271 # self.finish_name_packet(p, questions)
272 # response = self.dns_transaction_udp(p)
273 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
274 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
275 # # We don't get SOA records for single hosts
276 # self.assertEquals(response.ancount, 0)
278 def test_soa_domain_query(self):
279 "create a SOA query for a domain"
280 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
281 questions = []
283 name = self.get_dns_domain()
284 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
285 questions.append(q)
287 self.finish_name_packet(p, questions)
288 response = self.dns_transaction_udp(p)
289 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
290 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
291 self.assertEquals(response.ancount, 1)
292 self.assertEquals(response.answers[0].rdata.minimum, 3600)
295 class TestDNSUpdates(DNSTest):
297 def test_two_updates(self):
298 "create two update requests"
299 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
300 updates = []
302 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
303 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
304 updates.append(u)
306 name = self.get_dns_domain()
307 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
308 updates.append(u)
310 self.finish_name_packet(p, updates)
311 response = self.dns_transaction_udp(p)
312 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
314 def test_update_wrong_qclass(self):
315 "create update with DNS_QCLASS_NONE"
316 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
317 updates = []
319 name = self.get_dns_domain()
320 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
321 updates.append(u)
323 self.finish_name_packet(p, updates)
324 response = self.dns_transaction_udp(p)
325 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
327 def test_update_prereq_with_non_null_ttl(self):
328 "test update with a non-null TTL"
329 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
330 updates = []
332 name = self.get_dns_domain()
334 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
335 updates.append(u)
336 self.finish_name_packet(p, updates)
338 prereqs = []
339 r = dns.res_rec()
340 r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
341 r.rr_type = dns.DNS_QTYPE_TXT
342 r.rr_class = dns.DNS_QCLASS_NONE
343 r.ttl = 1
344 r.length = 0
345 prereqs.append(r)
347 p.ancount = len(prereqs)
348 p.answers = prereqs
350 response = self.dns_transaction_udp(p)
351 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
353 # I'd love to test this one, but it segfaults. :)
354 # def test_update_prereq_with_non_null_length(self):
355 # "test update with a non-null length"
356 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
357 # updates = []
359 # name = self.get_dns_domain()
361 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
362 # updates.append(u)
363 # self.finish_name_packet(p, updates)
365 # prereqs = []
366 # r = dns.res_rec()
367 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
368 # r.rr_type = dns.DNS_QTYPE_TXT
369 # r.rr_class = dns.DNS_QCLASS_ANY
370 # r.ttl = 0
371 # r.length = 1
372 # prereqs.append(r)
374 # p.ancount = len(prereqs)
375 # p.answers = prereqs
377 # response = self.dns_transaction_udp(p)
378 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
380 def test_update_prereq_nonexisting_name(self):
381 "test update with a nonexisting name"
382 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
383 updates = []
385 name = self.get_dns_domain()
387 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
388 updates.append(u)
389 self.finish_name_packet(p, updates)
391 prereqs = []
392 r = dns.res_rec()
393 r.name = "idontexist.%s" % self.get_dns_domain()
394 r.rr_type = dns.DNS_QTYPE_TXT
395 r.rr_class = dns.DNS_QCLASS_ANY
396 r.ttl = 0
397 r.length = 0
398 prereqs.append(r)
400 p.ancount = len(prereqs)
401 p.answers = prereqs
403 response = self.dns_transaction_udp(p)
404 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
406 def test_update_add_txt_record(self):
407 "test adding records works"
408 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
409 updates = []
411 name = self.get_dns_domain()
413 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
414 updates.append(u)
415 self.finish_name_packet(p, updates)
417 updates = []
418 r = dns.res_rec()
419 r.name = "textrec.%s" % self.get_dns_domain()
420 r.rr_type = dns.DNS_QTYPE_TXT
421 r.rr_class = dns.DNS_QCLASS_IN
422 r.ttl = 900
423 r.length = 0xffff
424 rdata = dns.txt_record()
425 rdata.txt = '"This is a test"'
426 r.rdata = rdata
427 updates.append(r)
428 p.nscount = len(updates)
429 p.nsrecs = updates
431 response = self.dns_transaction_udp(p)
432 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
434 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
435 questions = []
437 name = "textrec.%s" % self.get_dns_domain()
438 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
439 questions.append(q)
441 self.finish_name_packet(p, questions)
442 response = self.dns_transaction_udp(p)
443 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
444 self.assertEquals(response.ancount, 1)
445 self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
447 def test_update_add_two_txt_records(self):
448 "test adding two txt records works"
449 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
450 updates = []
452 name = self.get_dns_domain()
454 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
455 updates.append(u)
456 self.finish_name_packet(p, updates)
458 updates = []
459 r = dns.res_rec()
460 r.name = "textrec2.%s" % self.get_dns_domain()
461 r.rr_type = dns.DNS_QTYPE_TXT
462 r.rr_class = dns.DNS_QCLASS_IN
463 r.ttl = 900
464 r.length = 0xffff
465 rdata = dns.txt_record()
466 rdata.txt = '"This is a test" "and this is a test, too"'
467 r.rdata = rdata
468 updates.append(r)
469 p.nscount = len(updates)
470 p.nsrecs = updates
472 response = self.dns_transaction_udp(p)
473 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
475 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
476 questions = []
478 name = "textrec2.%s" % self.get_dns_domain()
479 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
480 questions.append(q)
482 self.finish_name_packet(p, questions)
483 response = self.dns_transaction_udp(p)
484 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
485 self.assertEquals(response.ancount, 1)
486 self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
488 def test_delete_record(self):
489 "Test if deleting records works"
491 NAME = "deleterec.%s" % self.get_dns_domain()
493 # First, create a record to make sure we have a record to delete.
494 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
495 updates = []
497 name = self.get_dns_domain()
499 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
500 updates.append(u)
501 self.finish_name_packet(p, updates)
503 updates = []
504 r = dns.res_rec()
505 r.name = NAME
506 r.rr_type = dns.DNS_QTYPE_TXT
507 r.rr_class = dns.DNS_QCLASS_IN
508 r.ttl = 900
509 r.length = 0xffff
510 rdata = dns.txt_record()
511 rdata.txt = '"This is a test"'
512 r.rdata = rdata
513 updates.append(r)
514 p.nscount = len(updates)
515 p.nsrecs = updates
517 response = self.dns_transaction_udp(p)
518 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
520 # Now check the record is around
521 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
522 questions = []
523 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
524 questions.append(q)
526 self.finish_name_packet(p, questions)
527 response = self.dns_transaction_udp(p)
528 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
530 # Now delete the record
531 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
532 updates = []
534 name = self.get_dns_domain()
536 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
537 updates.append(u)
538 self.finish_name_packet(p, updates)
540 updates = []
541 r = dns.res_rec()
542 r.name = NAME
543 r.rr_type = dns.DNS_QTYPE_TXT
544 r.rr_class = dns.DNS_QCLASS_NONE
545 r.ttl = 0
546 r.length = 0xffff
547 rdata = dns.txt_record()
548 rdata.txt = '"This is a test"'
549 r.rdata = rdata
550 updates.append(r)
551 p.nscount = len(updates)
552 p.nsrecs = updates
554 response = self.dns_transaction_udp(p)
555 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
557 # And finally check it's gone
558 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
559 questions = []
561 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
562 questions.append(q)
564 self.finish_name_packet(p, questions)
565 response = self.dns_transaction_udp(p)
566 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
568 def test_readd_record(self):
569 "Test if adding, deleting and then readding a records works"
571 NAME = "readdrec.%s" % self.get_dns_domain()
573 # Create the record
574 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
575 updates = []
577 name = self.get_dns_domain()
579 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
580 updates.append(u)
581 self.finish_name_packet(p, updates)
583 updates = []
584 r = dns.res_rec()
585 r.name = NAME
586 r.rr_type = dns.DNS_QTYPE_TXT
587 r.rr_class = dns.DNS_QCLASS_IN
588 r.ttl = 900
589 r.length = 0xffff
590 rdata = dns.txt_record()
591 rdata.txt = '"This is a test"'
592 r.rdata = rdata
593 updates.append(r)
594 p.nscount = len(updates)
595 p.nsrecs = updates
597 response = self.dns_transaction_udp(p)
598 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
600 # Now check the record is around
601 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
602 questions = []
603 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
604 questions.append(q)
606 self.finish_name_packet(p, questions)
607 response = self.dns_transaction_udp(p)
608 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
610 # Now delete the record
611 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
612 updates = []
614 name = self.get_dns_domain()
616 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
617 updates.append(u)
618 self.finish_name_packet(p, updates)
620 updates = []
621 r = dns.res_rec()
622 r.name = NAME
623 r.rr_type = dns.DNS_QTYPE_TXT
624 r.rr_class = dns.DNS_QCLASS_NONE
625 r.ttl = 0
626 r.length = 0xffff
627 rdata = dns.txt_record()
628 rdata.txt = '"This is a test"'
629 r.rdata = rdata
630 updates.append(r)
631 p.nscount = len(updates)
632 p.nsrecs = updates
634 response = self.dns_transaction_udp(p)
635 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
637 # check it's gone
638 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
639 questions = []
641 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
642 questions.append(q)
644 self.finish_name_packet(p, questions)
645 response = self.dns_transaction_udp(p)
646 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
648 # recreate the record
649 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
650 updates = []
652 name = self.get_dns_domain()
654 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
655 updates.append(u)
656 self.finish_name_packet(p, updates)
658 updates = []
659 r = dns.res_rec()
660 r.name = NAME
661 r.rr_type = dns.DNS_QTYPE_TXT
662 r.rr_class = dns.DNS_QCLASS_IN
663 r.ttl = 900
664 r.length = 0xffff
665 rdata = dns.txt_record()
666 rdata.txt = '"This is a test"'
667 r.rdata = rdata
668 updates.append(r)
669 p.nscount = len(updates)
670 p.nsrecs = updates
672 response = self.dns_transaction_udp(p)
673 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
675 # Now check the record is around
676 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
677 questions = []
678 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
679 questions.append(q)
681 self.finish_name_packet(p, questions)
682 response = self.dns_transaction_udp(p)
683 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
685 def test_update_add_mx_record(self):
686 "test adding MX records works"
687 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
688 updates = []
690 name = self.get_dns_domain()
692 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
693 updates.append(u)
694 self.finish_name_packet(p, updates)
696 updates = []
697 r = dns.res_rec()
698 r.name = "%s" % self.get_dns_domain()
699 r.rr_type = dns.DNS_QTYPE_MX
700 r.rr_class = dns.DNS_QCLASS_IN
701 r.ttl = 900
702 r.length = 0xffff
703 rdata = dns.mx_record()
704 rdata.preference = 10
705 rdata.exchange = 'mail.%s' % self.get_dns_domain()
706 r.rdata = rdata
707 updates.append(r)
708 p.nscount = len(updates)
709 p.nsrecs = updates
711 response = self.dns_transaction_udp(p)
712 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
714 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
715 questions = []
717 name = "%s" % self.get_dns_domain()
718 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
719 questions.append(q)
721 self.finish_name_packet(p, questions)
722 response = self.dns_transaction_udp(p)
723 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
724 self.assertEqual(response.ancount, 1)
725 ans = response.answers[0]
726 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
727 self.assertEqual(ans.rdata.preference, 10)
728 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
731 class TestComplexQueries(DNSTest):
733 def setUp(self):
734 super(TestComplexQueries, self).setUp()
735 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
736 updates = []
738 name = self.get_dns_domain()
740 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
741 updates.append(u)
742 self.finish_name_packet(p, updates)
744 updates = []
745 r = dns.res_rec()
746 r.name = "cname_test.%s" % self.get_dns_domain()
747 r.rr_type = dns.DNS_QTYPE_CNAME
748 r.rr_class = dns.DNS_QCLASS_IN
749 r.ttl = 900
750 r.length = 0xffff
751 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
752 updates.append(r)
753 p.nscount = len(updates)
754 p.nsrecs = updates
756 response = self.dns_transaction_udp(p)
757 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
759 def tearDown(self):
760 super(TestComplexQueries, self).tearDown()
761 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
762 updates = []
764 name = self.get_dns_domain()
766 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
767 updates.append(u)
768 self.finish_name_packet(p, updates)
770 updates = []
771 r = dns.res_rec()
772 r.name = "cname_test.%s" % self.get_dns_domain()
773 r.rr_type = dns.DNS_QTYPE_CNAME
774 r.rr_class = dns.DNS_QCLASS_NONE
775 r.ttl = 0
776 r.length = 0xffff
777 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
778 updates.append(r)
779 p.nscount = len(updates)
780 p.nsrecs = updates
782 response = self.dns_transaction_udp(p)
783 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
785 def test_one_a_query(self):
786 "create a query packet containing one query record"
787 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
788 questions = []
790 name = "cname_test.%s" % self.get_dns_domain()
791 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
792 print "asking for ", q.name
793 questions.append(q)
795 self.finish_name_packet(p, questions)
796 response = self.dns_transaction_udp(p)
797 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
798 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
799 self.assertEquals(response.ancount, 2)
800 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
801 self.assertEquals(response.answers[0].rdata, "%s.%s" %
802 (os.getenv('SERVER'), self.get_dns_domain()))
803 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
804 self.assertEquals(response.answers[1].rdata,
805 os.getenv('SERVER_IP'))
807 class TestInvalidQueries(DNSTest):
809 def test_one_a_query(self):
810 "send 0 bytes follows by create a query packet containing one query record"
812 s = None
813 try:
814 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
815 s.connect((os.getenv('SERVER_IP'), 53))
816 s.send("", 0)
817 finally:
818 if s is not None:
819 s.close()
821 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
822 questions = []
824 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
825 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
826 print "asking for ", q.name
827 questions.append(q)
829 self.finish_name_packet(p, questions)
830 response = self.dns_transaction_udp(p)
831 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
832 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
833 self.assertEquals(response.ancount, 1)
834 self.assertEquals(response.answers[0].rdata,
835 os.getenv('SERVER_IP'))
837 def test_one_a_reply(self):
838 "send a reply instead of a query"
840 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
841 questions = []
843 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
844 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
845 print "asking for ", q.name
846 questions.append(q)
848 self.finish_name_packet(p, questions)
849 p.operation |= dns.DNS_FLAG_REPLY
850 s = None
851 try:
852 send_packet = ndr.ndr_pack(p)
853 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
854 host=os.getenv('SERVER_IP')
855 s.connect((host, 53))
856 tcp_packet = struct.pack('!H', len(send_packet))
857 tcp_packet += send_packet
858 s.send(tcp_packet, 0)
859 recv_packet = s.recv(0xffff + 2, 0)
860 self.assertEquals(0, len(recv_packet))
861 finally:
862 if s is not None:
863 s.close()
866 if __name__ == "__main__":
867 import unittest
868 unittest.main()