s4 dns: Update prerequisite checking conforming to RFC
[Samba/gebeck_regimport.git] / source4 / scripting / python / samba / tests / dns.py
blobca9edbf50009c7e5dca4b9eb60774985d15617ca
1 #!/usr/bin/env python
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
29 class DNSTest(TestCase):
31 def errstr(self, errcode):
32 "Return a readable error code"
33 string_codes = [
34 "OK",
35 "FORMERR",
36 "SERVFAIL",
37 "NXDOMAIN",
38 "NOTIMP",
39 "REFUSED",
40 "YXDOMAIN",
41 "YXRRSET",
42 "NXRRSET",
43 "NOTAUTH",
44 "NOTZONE",
47 return string_codes[errcode]
50 def assert_dns_rcode_equals(self, packet, rcode):
51 "Helper function to check return code"
52 p_errcode = packet.operation & 0x000F
53 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
54 (self.errstr(rcode), self.errstr(p_errcode)))
56 def assert_dns_opcode_equals(self, packet, opcode):
57 "Helper function to check opcode"
58 p_opcode = packet.operation & 0x7800
59 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
60 (opcode, p_opcode))
62 def make_name_packet(self, opcode, qid=None):
63 "Helper creating a dns.name_packet"
64 p = dns.name_packet()
65 if qid is None:
66 p.id = random.randint(0x0, 0xffff)
67 p.operation = opcode
68 p.questions = []
69 return p
71 def finish_name_packet(self, packet, questions):
72 "Helper to finalize a dns.name_packet"
73 packet.qdcount = len(questions)
74 packet.questions = questions
76 def make_name_question(self, name, qtype, qclass):
77 "Helper creating a dns.name_question"
78 q = dns.name_question()
79 q.name = name
80 q.question_type = qtype
81 q.question_class = qclass
82 return q
84 def get_dns_domain(self):
85 "Helper to get dns domain"
86 return os.getenv('REALM', 'example.com').lower()
88 def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
89 "send a DNS query and read the reply"
90 s = None
91 try:
92 send_packet = ndr.ndr_pack(packet)
93 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
94 s.connect((host, 53))
95 s.send(send_packet, 0)
96 recv_packet = s.recv(2048, 0)
97 return ndr.ndr_unpack(dns.name_packet, recv_packet)
98 finally:
99 if s is not None:
100 s.close()
102 def test_one_a_query(self):
103 "create a query packet containing one query record"
104 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
105 questions = []
107 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
108 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
109 print "asking for ", q.name
110 questions.append(q)
112 self.finish_name_packet(p, questions)
113 response = self.dns_transaction_udp(p)
114 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116 self.assertEquals(response.ancount, 1)
117 self.assertEquals(response.answers[0].rdata,
118 os.getenv('DC_SERVER_IP'))
120 def test_two_queries(self):
121 "create a query packet containing two query records"
122 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
123 questions = []
125 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
126 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
127 questions.append(q)
129 name = "%s.%s" % ('bogusname', self.get_dns_domain())
130 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
131 questions.append(q)
133 self.finish_name_packet(p, questions)
134 response = self.dns_transaction_udp(p)
135 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
137 def test_qtype_all_query(self):
138 "create a QTYPE_ALL query"
139 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
140 questions = []
142 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
143 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
144 print "asking for ", q.name
145 questions.append(q)
147 self.finish_name_packet(p, questions)
148 response = self.dns_transaction_udp(p)
150 num_answers = 1
151 dc_ipv6 = os.getenv('DC_SERVER_IPV6')
152 if dc_ipv6 is not None:
153 num_answers += 1
155 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
156 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
157 self.assertEquals(response.ancount, num_answers)
158 self.assertEquals(response.answers[0].rdata,
159 os.getenv('DC_SERVER_IP'))
160 if dc_ipv6 is not None:
161 self.assertEquals(response.answers[1].rdata, dc_ipv6)
163 def test_qclass_none_query(self):
164 "create a QCLASS_NONE query"
165 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
166 questions = []
168 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
169 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
170 questions.append(q)
172 self.finish_name_packet(p, questions)
173 response = self.dns_transaction_udp(p)
174 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
176 # Only returns an authority section entry in BIND and Win DNS
177 # FIXME: Enable one Samba implements this feature
178 # def test_soa_hostname_query(self):
179 # "create a SOA query for a hostname"
180 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
181 # questions = []
183 # name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
184 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
185 # questions.append(q)
187 # self.finish_name_packet(p, questions)
188 # response = self.dns_transaction_udp(p)
189 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
191 # # We don't get SOA records for single hosts
192 # self.assertEquals(response.ancount, 0)
194 def test_soa_domain_query(self):
195 "create a SOA query for a domain"
196 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
197 questions = []
199 name = self.get_dns_domain()
200 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
201 questions.append(q)
203 self.finish_name_packet(p, questions)
204 response = self.dns_transaction_udp(p)
205 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
206 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
207 self.assertEquals(response.ancount, 1)
209 def test_two_updates(self):
210 "create two update requests"
211 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
212 updates = []
214 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
215 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
216 updates.append(u)
218 name = self.get_dns_domain()
219 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
220 updates.append(u)
222 self.finish_name_packet(p, updates)
223 response = self.dns_transaction_udp(p)
224 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
226 def test_update_wrong_qclass(self):
227 "create update with DNS_QCLASS_NONE"
228 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
229 updates = []
231 name = self.get_dns_domain()
232 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
233 updates.append(u)
235 self.finish_name_packet(p, updates)
236 response = self.dns_transaction_udp(p)
237 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
239 def test_update_prereq_with_non_null_ttl(self):
240 "test update with a non-null TTL"
241 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
242 updates = []
244 name = self.get_dns_domain()
246 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
247 updates.append(u)
248 self.finish_name_packet(p, updates)
250 prereqs = []
251 r = dns.res_rec()
252 r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
253 r.rr_type = dns.DNS_QTYPE_TXT
254 r.rr_class = dns.DNS_QCLASS_NONE
255 r.ttl = 1
256 r.length = 0
257 prereqs.append(r)
259 p.ancount = len(prereqs)
260 p.answers = prereqs
262 response = self.dns_transaction_udp(p)
263 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
265 # I'd love to test this one, but it segfaults. :)
266 # def test_update_prereq_with_non_null_length(self):
267 # "test update with a non-null length"
268 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
269 # updates = []
271 # name = self.get_dns_domain()
273 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
274 # updates.append(u)
275 # self.finish_name_packet(p, updates)
277 # prereqs = []
278 # r = dns.res_rec()
279 # r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
280 # r.rr_type = dns.DNS_QTYPE_TXT
281 # r.rr_class = dns.DNS_QCLASS_ANY
282 # r.ttl = 0
283 # r.length = 1
284 # prereqs.append(r)
286 # p.ancount = len(prereqs)
287 # p.answers = prereqs
289 # response = self.dns_transaction_udp(p)
290 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
292 def test_update_prereq_nonexisting_name(self):
293 "test update with a non-null TTL"
294 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
295 updates = []
297 name = self.get_dns_domain()
299 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
300 updates.append(u)
301 self.finish_name_packet(p, updates)
303 prereqs = []
304 r = dns.res_rec()
305 r.name = "idontexist.%s" % self.get_dns_domain()
306 r.rr_type = dns.DNS_QTYPE_TXT
307 r.rr_class = dns.DNS_QCLASS_ANY
308 r.ttl = 0
309 r.length = 0
310 prereqs.append(r)
312 p.ancount = len(prereqs)
313 p.answers = prereqs
315 response = self.dns_transaction_udp(p)
316 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
318 if __name__ == "__main__":
319 import unittest
320 unittest.main()