s4 dns: Test deleting records and fix a small bu
[Samba/gebeck_regimport.git] / source4 / scripting / python / samba / tests / dns.py
blobff973bde058d16f7d8b05fa6fc65c04a87693298
1 #!/usr/bin/env python
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import os
21 import struct
22 import random
23 from samba import socket
24 import samba.ndr as ndr
25 import samba.dcerpc.dns as dns
26 from samba.tests import TestCase
28 class DNSTest(TestCase):
def errstr(self, errcode):
    """Return a readable error code

    errcode is a numeric DNS response code.  Values that have no entry
    in the table below are rendered as "UNKNOWN(<n>)" rather than raising
    IndexError, so that building an assertion message can never hide the
    failure that is being reported.
    """
    string_codes = (
        "OK",
        "FORMERR",
        "SERVFAIL",
        "NXDOMAIN",
        "NOTIMP",
        "REFUSED",
        "YXDOMAIN",
        "YXRRSET",
        "NXRRSET",
        "NOTAUTH",
        "NOTZONE",
    )

    # Explicit range check: a negative value must not index from the end.
    if 0 <= errcode < len(string_codes):
        return string_codes[errcode]
    return "UNKNOWN(%d)" % errcode
def assert_dns_rcode_equals(self, packet, rcode):
    """Helper function to check return code

    Fails the test unless the low four bits of packet.operation equal
    rcode.  The failure message names both codes via self.errstr().
    """
    p_errcode = packet.operation & 0x000F
    # assertEqual is the supported spelling; assertEquals is a deprecated
    # alias.
    self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                     (self.errstr(rcode), self.errstr(p_errcode)))
def assert_dns_opcode_equals(self, packet, opcode):
    """Helper function to check opcode

    Fails the test unless packet.operation masked with 0x7800 equals
    opcode.  The comparison is made on the masked, unshifted value.
    """
    p_opcode = packet.operation & 0x7800
    # assertEqual is the supported spelling; assertEquals is a deprecated
    # alias.
    self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                     (opcode, p_opcode))
def make_name_packet(self, opcode, qid=None):
    """Helper creating a dns.name_packet

    opcode is stored in the packet's operation field.  qid is the
    transaction id to use; when it is None a random value between 0 and
    0xffff is chosen.  The packet is returned with an empty question
    list.
    """
    p = dns.name_packet()
    if qid is None:
        qid = random.randint(0x0, 0xffff)
    # Always assign the id: a caller-supplied qid used to be dropped
    # because the assignment only happened in the "qid is None" branch.
    p.id = qid
    p.operation = opcode
    p.questions = []
    return p
def finish_name_packet(self, packet, questions):
    """Helper to finalize a dns.name_packet

    Stores the question list on the packet and records its length in
    the packet's qdcount field.
    """
    n_questions = len(questions)
    packet.questions = questions
    packet.qdcount = n_questions
def make_name_question(self, name, qtype, qclass):
    """Helper creating a dns.name_question

    Returns a new dns.name_question whose name, question_type and
    question_class fields are set from the three arguments.
    """
    question = dns.name_question()
    fields = (
        ("name", name),
        ("question_type", qtype),
        ("question_class", qclass),
    )
    for attr, value in fields:
        setattr(question, attr, value)
    return question
def get_dns_domain(self):
    """Helper to get dns domain

    Returns the REALM environment variable in lower case, or
    'example.com' when REALM is not set.
    """
    realm = os.environ.get('REALM', 'example.com')
    return realm.lower()
def dns_transaction_udp(self, packet, host=None):
    """send a DNS query and read the reply

    packet is packed with ndr.ndr_pack() and sent over a datagram socket
    connected to port 53 of host.  One reply of up to 2048 bytes is read
    and returned unpacked as a dns.name_packet.

    host defaults to the DC_SERVER_IP environment variable.  It is looked
    up on every call rather than once when the method is defined, so the
    value in effect at call time is the one used.
    """
    if host is None:
        host = os.getenv('DC_SERVER_IP')

    s = None
    try:
        send_packet = ndr.ndr_pack(packet)
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
        s.connect((host, 53))
        s.send(send_packet, 0)
        recv_packet = s.recv(2048, 0)
        return ndr.ndr_unpack(dns.name_packet, recv_packet)
    finally:
        # Close the socket even when packing, sending or unpacking fails.
        if s is not None:
            s.close()
def dns_transaction_tcp(self, packet, host=None):
    """send a DNS query and read the reply

    packet is packed with ndr.ndr_pack(), prefixed with its length as a
    16-bit big-endian integer and sent over a stream socket connected to
    port 53 of host.  The reply, minus its own two-byte length prefix, is
    returned unpacked as a dns.name_packet.

    host defaults to the DC_SERVER_IP environment variable, looked up on
    every call rather than once when the method is defined.
    """
    if host is None:
        host = os.getenv('DC_SERVER_IP')

    s = None
    try:
        send_packet = ndr.ndr_pack(packet)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect((host, 53))
        tcp_packet = struct.pack('!H', len(send_packet))
        tcp_packet += send_packet
        s.send(tcp_packet, 0)

        # A stream socket may hand the reply over in several pieces, so
        # keep reading until the number of bytes announced by the length
        # prefix has arrived instead of trusting a single recv().
        recv_packet = s.recv(0xffff + 2, 0)
        while True:
            if len(recv_packet) >= 2:
                wanted = 2 + struct.unpack('!H', recv_packet[:2])[0]
                if len(recv_packet) >= wanted:
                    recv_packet = recv_packet[:wanted]
                    break
            chunk = s.recv(0xffff + 2, 0)
            if not chunk:
                # Nothing more to read; let ndr_unpack judge what we have.
                break
            recv_packet += chunk

        return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
    finally:
        # Close the socket even when packing, sending or unpacking fails.
        if s is not None:
            s.close()
def test_one_a_query(self):
    """create a query packet containing one query record

    Asks over UDP for the A record of DC_SERVER in the test domain and
    expects exactly one answer whose rdata equals DC_SERVER_IP.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
    q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
    # Single parenthesised argument: valid both as a Python 2 print
    # statement and as a Python 3 function call, with unchanged output.
    print("asking for  %s" % q.name)
    questions.append(q)

    self.finish_name_packet(p, questions)
    response = self.dns_transaction_udp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.ancount, 1)
    self.assertEqual(response.answers[0].rdata,
                     os.getenv('DC_SERVER_IP'))
def test_one_a_query_tcp(self):
    """create a query packet containing one query record via TCP

    Asks over TCP for the A record of DC_SERVER in the test domain and
    expects exactly one answer whose rdata equals DC_SERVER_IP.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
    q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
    # Single parenthesised argument: valid both as a Python 2 print
    # statement and as a Python 3 function call, with unchanged output.
    print("asking for  %s" % q.name)
    questions.append(q)

    self.finish_name_packet(p, questions)
    response = self.dns_transaction_tcp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.ancount, 1)
    self.assertEqual(response.answers[0].rdata,
                     os.getenv('DC_SERVER_IP'))
def test_two_queries(self):
    """create a query packet containing two query records

    Puts two A questions (the DC_SERVER host and 'bogusname') into one
    query packet and expects the server to answer FORMERR.
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    domain = self.get_dns_domain()

    hosts = (os.getenv('DC_SERVER'), 'bogusname')
    questions = [
        self.make_name_question("%s.%s" % (host, domain),
                                dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        for host in hosts
    ]

    self.finish_name_packet(packet, questions)
    reply = self.dns_transaction_udp(packet)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_FORMERR)
def test_qtype_all_query(self):
    """create a QTYPE_ALL query

    Asks for all records of DC_SERVER in the test domain.  One answer
    (DC_SERVER_IP) is expected, plus a second one equal to
    DC_SERVER_IPV6 when that environment variable is set.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
    q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
    # Single parenthesised argument: valid both as a Python 2 print
    # statement and as a Python 3 function call, with unchanged output.
    print("asking for  %s" % q.name)
    questions.append(q)

    self.finish_name_packet(p, questions)
    response = self.dns_transaction_udp(p)

    num_answers = 1
    dc_ipv6 = os.getenv('DC_SERVER_IPV6')
    if dc_ipv6 is not None:
        num_answers += 1

    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.ancount, num_answers)
    self.assertEqual(response.answers[0].rdata,
                     os.getenv('DC_SERVER_IP'))
    if dc_ipv6 is not None:
        self.assertEqual(response.answers[1].rdata, dc_ipv6)
def test_qclass_none_query(self):
    """create a QCLASS_NONE query

    Sends a QTYPE_ALL question with class NONE for the DC_SERVER host
    and expects the server to answer NOTIMP.
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)

    fqdn = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
    question = self.make_name_question(fqdn, dns.DNS_QTYPE_ALL,
                                       dns.DNS_QCLASS_NONE)

    self.finish_name_packet(packet, [question])
    reply = self.dns_transaction_udp(packet)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_NOTIMP)
209 # Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
211 # def test_soa_hostname_query(self):
212 # "create a SOA query for a hostname"
213 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
214 # questions = []
216 # name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
217 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
218 # questions.append(q)
220 # self.finish_name_packet(p, questions)
221 # response = self.dns_transaction_udp(p)
222 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
223 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
224 # # We don't get SOA records for single hosts
225 # self.assertEquals(response.ancount, 0)
def test_soa_domain_query(self):
    """create a SOA query for a domain

    Asks for the SOA record of the test domain itself and expects
    exactly one answer.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    name = self.get_dns_domain()
    q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    questions.append(q)

    self.finish_name_packet(p, questions)
    response = self.dns_transaction_udp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.ancount, 1)
def test_two_updates(self):
    """create two update requests

    Puts two entries (the DC_SERVER host name and the bare domain) into
    the question list of an UPDATE packet and expects FORMERR.
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    domain = self.get_dns_domain()

    names = ("%s.%s" % (os.getenv('DC_SERVER'), domain), domain)
    entries = [
        self.make_name_question(n, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        for n in names
    ]

    self.finish_name_packet(packet, entries)
    reply = self.dns_transaction_udp(packet)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_FORMERR)
def test_update_wrong_qclass(self):
    """create update with DNS_QCLASS_NONE

    Sends an UPDATE whose only question-list entry uses class NONE and
    expects the server to answer NOTIMP.
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

    entry = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)

    self.finish_name_packet(packet, [entry])
    reply = self.dns_transaction_udp(packet)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_NOTIMP)
def test_update_prereq_with_non_null_ttl(self):
    """test update with a non-null TTL

    Sends an UPDATE for the test domain carrying one TXT record of class
    NONE with ttl 1 in the answers list, and expects FORMERR.
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    domain = self.get_dns_domain()

    zone = self.make_name_question(domain, dns.DNS_QTYPE_SOA,
                                   dns.DNS_QCLASS_IN)
    self.finish_name_packet(packet, [zone])

    prereq = dns.res_rec()
    prereq.name = "%s.%s" % (os.getenv('DC_SERVER'), domain)
    prereq.rr_type = dns.DNS_QTYPE_TXT
    prereq.rr_class = dns.DNS_QCLASS_NONE
    prereq.ttl = 1
    prereq.length = 0

    # The record is carried in the packet's answers list.
    packet.answers = [prereq]
    packet.ancount = 1

    reply = self.dns_transaction_udp(packet)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_FORMERR)
298 # I'd love to test this one, but it segfaults. :)
299 # def test_update_prereq_with_non_null_length(self):
300 # "test update with a non-null length"
301 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
302 # updates = []
304 # name = self.get_dns_domain()
306 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
307 # updates.append(u)
308 # self.finish_name_packet(p, updates)
310 # prereqs = []
311 # r = dns.res_rec()
312 # r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
313 # r.rr_type = dns.DNS_QTYPE_TXT
314 # r.rr_class = dns.DNS_QCLASS_ANY
315 # r.ttl = 0
316 # r.length = 1
317 # prereqs.append(r)
319 # p.ancount = len(prereqs)
320 # p.answers = prereqs
322 # response = self.dns_transaction_udp(p)
323 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
def test_update_prereq_nonexisting_name(self):
    """test update with a nonexisting name

    Sends an UPDATE for the test domain carrying one TXT record of class
    ANY for 'idontexist.<domain>' in the answers list, and expects
    NXRRSET.
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    domain = self.get_dns_domain()

    zone = self.make_name_question(domain, dns.DNS_QTYPE_SOA,
                                   dns.DNS_QCLASS_IN)
    self.finish_name_packet(packet, [zone])

    prereq = dns.res_rec()
    prereq.name = "idontexist.%s" % domain
    prereq.rr_type = dns.DNS_QTYPE_TXT
    prereq.rr_class = dns.DNS_QCLASS_ANY
    prereq.ttl = 0
    prereq.length = 0

    # The record is carried in the packet's answers list.
    packet.answers = [prereq]
    packet.ancount = 1

    reply = self.dns_transaction_udp(packet)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_NXRRSET)
def test_update_add_txt_record(self):
    """test adding records works

    Sends an UPDATE that adds a TXT record for 'textrec.<domain>', then
    queries that name and checks that the stored text comes back.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    updates = []

    name = self.get_dns_domain()

    u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    updates.append(u)
    self.finish_name_packet(p, updates)

    # The record to add is carried in the packet's nsrecs list.
    updates = []
    r = dns.res_rec()
    r.name = "textrec.%s" % self.get_dns_domain()
    r.rr_type = dns.DNS_QTYPE_TXT
    r.rr_class = dns.DNS_QCLASS_IN
    r.ttl = 900
    r.length = 0xffff
    r.rdata = dns.txt_record()
    r.rdata.txt = '"This is a test"'
    updates.append(r)
    p.nscount = len(updates)
    p.nsrecs = updates

    response = self.dns_transaction_udp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    # Read the record back.
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    name = "textrec.%s" % self.get_dns_domain()
    q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
    questions.append(q)

    self.finish_name_packet(p, questions)
    response = self.dns_transaction_udp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.ancount, 1)
    self.assertEqual(response.answers[0].rdata.txt, '"This is a test"')
def test_update_add_two_txt_records(self):
    """test adding two txt records works

    Sends an UPDATE that adds a TXT record holding two quoted strings for
    'textrec2.<domain>', then queries that name and checks that the same
    text comes back.
    """
    p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    updates = []

    name = self.get_dns_domain()

    u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    updates.append(u)
    self.finish_name_packet(p, updates)

    # The record to add is carried in the packet's nsrecs list.
    updates = []
    r = dns.res_rec()
    r.name = "textrec2.%s" % self.get_dns_domain()
    r.rr_type = dns.DNS_QTYPE_TXT
    r.rr_class = dns.DNS_QCLASS_IN
    r.ttl = 900
    r.length = 0xffff
    r.rdata = dns.txt_record()
    r.rdata.txt = '"This is a test" "and this is a test, too"'
    updates.append(r)
    p.nscount = len(updates)
    p.nsrecs = updates

    response = self.dns_transaction_udp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    # Read the record back.
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    name = "textrec2.%s" % self.get_dns_domain()
    q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
    questions.append(q)

    self.finish_name_packet(p, questions)
    response = self.dns_transaction_udp(p)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(response.ancount, 1)
    self.assertEqual(response.answers[0].rdata.txt,
                     '"This is a test" "and this is a test, too"')
def test_delete_record(self):
    """Test if deleting records works

    The TXT record for 'textrec.<domain>' is first added and looked up,
    so the deletion is exercised against a record that is known to exist
    and the test does not depend on another test having created it.  The
    record is then deleted and a final lookup must return NXDOMAIN.
    """
    zone = self.get_dns_domain()
    name = "textrec.%s" % zone
    txt = '"This is a test"'

    def send_update(rr_class, ttl):
        "Send an UPDATE carrying the TXT record and return the reply"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(zone, dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])

        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = dns.txt_record()
        r.rdata.txt = txt
        updates = [r]
        p.nscount = len(updates)
        p.nsrecs = updates
        return self.dns_transaction_udp(p)

    def query_txt():
        "Query the TXT record and return the reply"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        return self.dns_transaction_udp(p)

    # Make sure there is a record to delete (class IN, TTL 900 adds it).
    response = send_update(dns.DNS_QCLASS_IN, 900)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    response = query_txt()
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    # Same record with class NONE and TTL 0 is the delete request.
    response = send_update(dns.DNS_QCLASS_NONE, 0)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    # The name must be gone afterwards.
    response = query_txt()
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
if __name__ == "__main__":
    # Run as a script: hand control to unittest's command-line runner.
    import unittest
    unittest.main()