s4:gensec/gssapi: use gensec_gssapi_max_{input,wrapped}_size() for all backends
[Samba.git] / python / samba / tests / dns.py
blob92ac876ff356402a7777751b501f08f6d64b900a
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import struct
20 import random
21 import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba import credentials, param
25 from samba.tests import TestCase
26 from samba.dcerpc import dnsp, dnsserver
# Translation table used by DNSTest.hexdump(): a character maps to itself
# when repr() renders it unescaped (the repr is then exactly 3 characters
# long, quotes included) and to '.' otherwise.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])
class DNSTest(TestCase):
    "Base class with helpers to build DNS packets and talk to the server"

    def errstr(self, errcode):
        """Return a readable error code

        errcode indexes the table of RCODE names below.  Values without an
        entry yield "UNKNOWN(<errcode>)" instead of raising IndexError, so
        that a failing assertion can still format its message.
        """
        string_codes = [
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        ]
        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        return "UNKNOWN(%d)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        "Helper function to check return code"
        # The RCODE is the low four bits of the operation word.
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        "Helper function to check opcode"
        # The opcode is compared in place (masked, not shifted).
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Helper creating a dns.name_packet

        opcode is stored as the packet's operation word.  qid becomes the
        packet id; when it is None a random 16 bit id is chosen.
        """
        p = dns.name_packet()
        if qid is None:
            p.id = random.randint(0x0, 0xffff)
        else:
            # A caller-supplied id must actually end up in the packet.
            p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        "Helper to finalize a dns.name_packet"
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        "Helper creating a dns.name_question"
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        "Helper to get dns domain"
        # Lower-cased $REALM, falling back to 'example.com' when unset.
        return os.getenv('REALM', 'example.com').lower()

    def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
        """send a DNS query and read the reply

        The packet is NDR-packed, sent to port 53 of host over UDP and at
        most 2048 bytes of reply are unpacked into a dns.name_packet, which
        is returned.  With dump set, both datagrams are printed as hex dumps.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            if dump:
                print(self.hexdump(recv_packet))
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def _recv_exact(self, sock, count):
        "Helper reading count bytes from sock, fewer only if the peer closes"
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
        """send a DNS query and read the reply

        Same as dns_transaction_udp() but over TCP: the packed query is
        prefixed with its length as a 16 bit big-endian integer, and the
        reply is read as a 2 byte length prefix followed by that many bytes.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            # A stream socket may accept or deliver only part of a message
            # per call, so send everything and read the framed reply fully.
            s.sendall(tcp_packet)
            recv_packet = self._recv_exact(s, 2)
            if len(recv_packet) == 2:
                (reply_len,) = struct.unpack('!H', recv_packet)
                recv_packet += self._recv_exact(s, reply_len)
            if dump:
                print(self.hexdump(recv_packet))
            return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
        finally:
            if s is not None:
                s.close()

    def hexdump(self, src, length=8):
        """Return a hex dump of src with length bytes per line

        Every line holds the offset, the bytes in hex and the same bytes
        passed through the module-level FILTER translation table.
        """
        N = 0
        result = ''
        while src:
            s, src = src[:length], src[length:]
            hexa = ' '.join(["%02X" % ord(x) for x in s])
            s = s.translate(FILTER)
            result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
            N += length
        return result
class TestSimpleQueries(DNSTest):
    "Single-step query tests"

    def _server_fqdn(self):
        "Helper returning $SERVER qualified with the dns domain"
        return "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())

    def _query_packet(self, specs, announce=False):
        """Helper building a QUERY packet

        specs is a sequence of (name, qtype, qclass) tuples, one per
        question.  With announce set, every question name is printed.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []
        for (name, qtype, qclass) in specs:
            q = self.make_name_question(name, qtype, qclass)
            if announce:
                print("asking for  %s" % q.name)
            questions.append(q)
        self.finish_name_packet(p, questions)
        return p

    def test_one_a_query(self):
        "create a query packet containing one query record"
        p = self._query_packet(
            [(self._server_fqdn(), dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)],
            announce=True)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        p = self._query_packet(
            [(self._server_fqdn(), dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)],
            announce=True)
        response = self.dns_transaction_tcp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_mx_query(self):
        "create a query packet causing an empty RCODE_OK answer"
        p = self._query_packet(
            [(self._server_fqdn(), dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)],
            announce=True)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # The same question for a name that is expected not to exist.
        name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        p = self._query_packet(
            [(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)],
            announce=True)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        "create a query packet containing two query records"
        bogus = "%s.%s" % ('bogusname', self.get_dns_domain())
        p = self._query_packet([
            (self._server_fqdn(), dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
            (bogus, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
        ])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        p = self._query_packet(
            [(self._server_fqdn(), dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)],
            announce=True)
        response = self.dns_transaction_udp(p)

        # One answer for $SERVER_IP, one more when $SERVER_IPV6 is set.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        p = self._query_packet(
            [(self._server_fqdn(), dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
#    def test_soa_hostname_query(self):
#        "create a SOA query for a hostname"
#        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
#        questions = []
#
#        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        questions.append(q)
#
#        self.finish_name_packet(p, questions)
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
#        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
#        # We don't get SOA records for single hosts
#        self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        p = self._query_packet(
            [(self.get_dns_domain(), dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
class TestDNSUpdates(DNSTest):
    "Tests sending DNS UPDATE packets"

    def _zone_update_packet(self):
        "Helper creating an UPDATE packet whose only question is SOA/IN for the dns domain"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])
        return p

    def _make_record(self, name, rr_type, rr_class, ttl, length, rdata=None):
        "Helper creating a dns.res_rec; rdata is only assigned when given"
        r = dns.res_rec()
        r.name = name
        r.rr_type = rr_type
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        if rdata is not None:
            r.rdata = rdata
        return r

    def _make_txt_record(self, name, rr_class, ttl, txt):
        "Helper creating a TXT dns.res_rec with length 0xffff"
        rdata = dns.txt_record()
        rdata.txt = txt
        return self._make_record(name, dns.DNS_QTYPE_TXT, rr_class, ttl,
                                 0xffff, rdata)

    def _check_prereq(self, prereq, rcode):
        "Helper sending an UPDATE with one prerequisite record and checking the RCODE"
        p = self._zone_update_packet()
        prereqs = [prereq]
        # Prerequisites travel in the answer section of the packet.
        p.ancount = len(prereqs)
        p.answers = prereqs
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, rcode)

    def _apply_update(self, record):
        "Helper sending an UPDATE with one update record and expecting RCODE_OK"
        p = self._zone_update_packet()
        updates = [record]
        # Update records travel in the nsrecs section of the packet.
        p.nscount = len(updates)
        p.nsrecs = updates
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _query(self, name, qtype, rcode):
        "Helper sending a one-question IN query, checking the RCODE and returning the response"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, rcode)
        return response

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
        self.finish_name_packet(p, [u])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        r = self._make_record(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_NONE,
                              1, 0)
        self._check_prereq(r, dns.DNS_RCODE_FORMERR)

# I'd love to test this one, but it segfaults. :)
#    def test_update_prereq_with_non_null_length(self):
#        "test update with a non-null length"
#        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
#        updates = []
#
#        name = self.get_dns_domain()
#
#        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        updates.append(u)
#        self.finish_name_packet(p, updates)
#
#        prereqs = []
#        r = dns.res_rec()
#        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        r.rr_type = dns.DNS_QTYPE_TXT
#        r.rr_class = dns.DNS_QCLASS_ANY
#        r.ttl = 0
#        r.length = 1
#        prereqs.append(r)
#
#        p.ancount = len(prereqs)
#        p.answers = prereqs
#
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        name = "idontexist.%s" % self.get_dns_domain()
        r = self._make_record(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_ANY,
                              0, 0)
        self._check_prereq(r, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        name = "textrec.%s" % self.get_dns_domain()
        txt = '"This is a test"'
        self._apply_update(
            self._make_txt_record(name, dns.DNS_QCLASS_IN, 900, txt))

        response = self._query(name, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, txt)

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        name = "textrec2.%s" % self.get_dns_domain()
        txt = '"This is a test" "and this is a test, too"'
        self._apply_update(
            self._make_txt_record(name, dns.DNS_QCLASS_IN, 900, txt))

        response = self._query(name, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, txt)

    def test_delete_record(self):
        "Test if deleting records works"
        NAME = "deleterec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        # First, create a record to make sure we have a record to delete.
        self._apply_update(
            self._make_txt_record(NAME, dns.DNS_QCLASS_IN, 900, txt))

        # Now check the record is around
        self._query(NAME, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_OK)

        # Now delete the record
        self._apply_update(
            self._make_txt_record(NAME, dns.DNS_QCLASS_NONE, 0, txt))

        # And finally check it's gone
        self._query(NAME, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        "Test if adding, deleting and then readding a records works"
        NAME = "readdrec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        # Create the record
        self._apply_update(
            self._make_txt_record(NAME, dns.DNS_QCLASS_IN, 900, txt))

        # Now check the record is around
        self._query(NAME, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_OK)

        # Now delete the record
        self._apply_update(
            self._make_txt_record(NAME, dns.DNS_QCLASS_NONE, 0, txt))

        # check it's gone
        self._query(NAME, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        self._apply_update(
            self._make_txt_record(NAME, dns.DNS_QCLASS_IN, 900, txt))

        # Now check the record is around
        self._query(NAME, dns.DNS_QTYPE_TXT, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        "test adding MX records works"
        name = "%s" % self.get_dns_domain()
        exchange = 'mail.%s' % self.get_dns_domain()

        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = exchange
        self._apply_update(
            self._make_record(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN,
                              900, 0xffff, rdata))

        response = self._query(name, dns.DNS_QTYPE_MX, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, exchange)
class TestComplexQueries(DNSTest):
    "Query tests against a CNAME record created in setUp and removed in tearDown"

    def _cname_update(self, rr_class, ttl):
        """Helper sending an UPDATE for the cname_test record

        The record is a CNAME for "cname_test.<dns domain>" whose rdata is
        $SERVER qualified with the dns domain; rr_class and ttl are taken
        from the arguments.  The server is expected to answer RCODE_OK.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])

        r = dns.res_rec()
        r.name = "cname_test.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_CNAME
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        updates = [r]
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def setUp(self):
        "add the cname_test record (class IN, ttl 900)"
        super(TestComplexQueries, self).setUp()
        self._cname_update(dns.DNS_QCLASS_IN, 900)

    def tearDown(self):
        "send the matching class NONE, ttl 0 update for the cname_test record"
        super(TestComplexQueries, self).tearDown()
        self._cname_update(dns.DNS_QCLASS_NONE, 0)

    def test_one_a_query(self):
        "create a query packet containing one query record"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = "cname_test.%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)
        self.finish_name_packet(p, [q])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # Expect the CNAME itself followed by the A record it points to.
        self.assertEqual(response.ancount, 2)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].rdata, "%s.%s" %
                         (os.getenv('SERVER'), self.get_dns_domain()))
        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
        self.assertEqual(response.answers[1].rdata,
                         os.getenv('SERVER_IP'))
class TestInvalidQueries(DNSTest):
    "Tests sending input that is not a well-formed query"

    def test_one_a_query(self):
        "send 0 bytes follows by create a query packet containing one query record"

        # An empty datagram first; the regular query below must still work.
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((os.getenv('SERVER_IP'), 53))
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)
        self.finish_name_packet(p, [q])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_a_reply(self):
        "send a reply instead of a query"

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)
        self.finish_name_packet(p, [q])

        # Flag the packet as a reply although it is sent as a request.
        p.operation |= dns.DNS_FLAG_REPLY
        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            host = os.getenv('SERVER_IP')
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            # sendall() because send() may transmit only part of the data.
            s.sendall(tcp_packet)
            recv_packet = s.recv(0xffff + 2, 0)
            # No data is expected back: recv() must return an empty result.
            self.assertEqual(0, len(recv_packet))
        finally:
            if s is not None:
                s.close()
class TestZones(DNSTest):
    "Tests creating and deleting a zone through the dnsserver RPC connection"

    def get_loadparm(self):
        "Helper loading the configuration file named by $SMB_CONF_PATH"
        lp = param.LoadParm()
        lp.load(os.getenv("SMB_CONF_PATH"))
        return lp

    def get_credentials(self, lp):
        "Helper returning machine account credentials set up from lp"
        creds = credentials.Credentials()
        creds.guess(lp)
        creds.set_machine_account(lp)
        creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
        return creds

    def setUp(self):
        "open the RPC connection to $SERVER_IP used by the zone helpers"
        super(TestZones, self).setUp()
        self.lp = self.get_loadparm()
        self.creds = self.get_credentials(self.lp)
        self.server = os.getenv("SERVER_IP")
        self.zone = "test.lan"
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
                                            self.lp, self.creds)

    def tearDown(self):
        "delete the test zone, tolerating that it is already gone"
        super(TestZones, self).tearDown()
        try:
            self.delete_zone(self.zone)
        except RuntimeError as e:
            (num, _) = e.args
            if num != 9601:  # WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
                raise

    def create_zone(self, zone):
        "Helper creating a primary zone named zone via the ZoneCreate operation"
        zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
        zone_create.pszZoneName = zone
        zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
        zone_create.fAging = 0
        zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
        # NOTE(review): the copy of this file that was reviewed had lost one
        # line after the first and one after the fourth argument of this
        # call.  Both are restored as 0 (dwSettingFlags and dwContext in the
        # MS-DNSP signature) - confirm against the pristine file.
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server,
                                       None,
                                       0,
                                       'ZoneCreate',
                                       dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                                       zone_create)

    def delete_zone(self, zone):
        "Helper deleting zone via the DeleteZoneFromDs operation"
        # NOTE(review): the copy of this file that was reviewed had lost one
        # line after the first and one after the fourth argument of this
        # call.  Both are restored as 0 (dwSettingFlags and dwContext in the
        # MS-DNSP signature) - confirm against the pristine file.
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server,
                                       zone,
                                       0,
                                       'DeleteZoneFromDs',
                                       dnsserver.DNSSRV_TYPEID_NULL,
                                       None)

    def test_soa_query(self):
        "SOA queries for the zone before creating, after creating and after deleting it"
        # Use the zone that tearDown() cleans up, so a failure between
        # create_zone() and delete_zone() cannot leave another zone behind.
        zone = self.zone
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        self.create_zone(zone)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

        self.delete_zone(zone)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    from unittest import main as run_suite
    run_suite()