s4-python: Remove env from non-executable test scripts.
[Samba/vl.git] / source4 / scripting / python / samba / tests / dns.py
blobb1ea41613c7f63e6890250d365b7b12de295efae
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import struct
20 import random
21 from samba import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba.tests import TestCase
class DNSTest(TestCase):
    """Exercise a DNS server with hand-built query and update packets.

    The server under test is located through environment variables:
    DC_SERVER (host name), DC_SERVER_IP (expected A record and default
    transport target), the optional DC_SERVER_IPV6, and REALM (DNS domain,
    falling back to 'example.com').
    """

    def errstr(self, errcode):
        """Return a readable name for a DNS return code.

        Codes without an entry in the table are rendered as
        'UNKNOWN(<code>)' rather than raising IndexError, so that building
        an assertion message never hides the actual assertion failure.
        """
        string_codes = [
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        ]

        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        return "UNKNOWN(%s)" % (errcode,)

    def assert_dns_rcode_equals(self, packet, rcode):
        "Helper function to check return code"
        # The return code is the low four bits of the operation field.
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        "Helper function to check opcode"
        # The opcode is compared in place (mask 0x7800), not shifted down,
        # so 'opcode' must be given in the same pre-shifted form.
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Helper creating a dns.name_packet.

        opcode -- value stored in the packet's operation field
        qid    -- query id to use; a random 16-bit id is picked when None
        """
        p = dns.name_packet()
        if qid is None:
            p.id = random.randint(0x0, 0xffff)
        else:
            # Honour an explicitly requested id instead of dropping it.
            p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        "Helper to finalize a dns.name_packet"
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        "Helper creating a dns.name_question"
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        "Helper to get dns domain"
        return os.getenv('REALM', 'example.com').lower()

    def _dc_fqdn(self):
        "Helper returning '<DC_SERVER>.<dns domain>'"
        return "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())

    # NOTE(review): the 'host' defaults below are evaluated once, when the
    # class body is executed, so DC_SERVER_IP must already be set at import
    # time.  The signatures are kept as they were for existing callers.
    def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
        """Send a DNS packet over UDP to port 53 and return the parsed reply.

        packet -- dns.name_packet to send
        host   -- address of the server to contact

        The socket is always closed, even if packing, sending or parsing
        raises.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def dns_transaction_tcp(self, packet, host=os.getenv('DC_SERVER_IP')):
        """Send a DNS packet over TCP to port 53 and return the parsed reply.

        packet -- dns.name_packet to send
        host   -- address of the server to contact

        The request is prefixed with its length as a two-byte network-order
        integer; the first two bytes of the reply are skipped before
        parsing.  The socket is always closed.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            # NOTE(review): a single recv() assumes the whole reply arrives
            # in one read -- confirm this holds for samba.socket over TCP.
            recv_packet = s.recv(0xffff + 2, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
        finally:
            if s is not None:
                s.close()

    def test_one_a_query(self):
        "create a query packet containing one query record"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self._dc_fqdn()
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Two spaces keep the output identical to the former
        # 'print "asking for ", q.name' statement.
        print("asking for  %s" % (q.name,))
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self._dc_fqdn()
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Two spaces keep the output identical to the former
        # 'print "asking for ", q.name' statement.
        print("asking for  %s" % (q.name,))
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_tcp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))

    def test_two_queries(self):
        "create a query packet containing two query records"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self._dc_fqdn()
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        # More than one question in a packet is expected to be rejected.
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self._dc_fqdn()
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
        # Two spaces keep the output identical to the former
        # 'print "asking for ", q.name' statement.
        print("asking for  %s" % (q.name,))
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)

        # One answer for the IPv4 address, plus one more when the
        # environment also advertises an IPv6 address for the DC.
        num_answers = 1
        dc_ipv6 = os.getenv('DC_SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self._dc_fqdn()
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_NONE)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
#    def test_soa_hostname_query(self):
#        "create a SOA query for a hostname"
#        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
#        questions = []
#
#        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
#        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        questions.append(q)
#
#        self.finish_name_packet(p, questions)
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
#        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
#        # We don't get SOA records for single hosts
#        self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self._dc_fqdn()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()

        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        updates.append(u)
        self.finish_name_packet(p, updates)

        # The prerequisite travels in the packet's answer section.
        prereqs = []
        r = dns.res_rec()
        r.name = self._dc_fqdn()
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_NONE
        r.ttl = 1
        r.length = 0
        prereqs.append(r)

        p.ancount = len(prereqs)
        p.answers = prereqs

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

# I'd love to test this one, but it segfaults. :)
#    def test_update_prereq_with_non_null_length(self):
#        "test update with a non-null length"
#        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
#        updates = []
#
#        name = self.get_dns_domain()
#
#        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        updates.append(u)
#        self.finish_name_packet(p, updates)
#
#        prereqs = []
#        r = dns.res_rec()
#        r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
#        r.rr_type = dns.DNS_QTYPE_TXT
#        r.rr_class = dns.DNS_QCLASS_ANY
#        r.ttl = 0
#        r.length = 1
#        prereqs.append(r)
#
#        p.ancount = len(prereqs)
#        p.answers = prereqs
#
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()

        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        updates.append(u)
        self.finish_name_packet(p, updates)

        # The prerequisite travels in the packet's answer section.
        prereqs = []
        r = dns.res_rec()
        r.name = "idontexist.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_ANY
        r.ttl = 0
        r.length = 0
        prereqs.append(r)

        p.ancount = len(prereqs)
        p.answers = prereqs

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()

        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        updates.append(u)
        self.finish_name_packet(p, updates)

        # The record to add travels in the packet's nsrecs section.
        updates = []
        r = dns.res_rec()
        r.name = "textrec.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        r.rdata = dns.txt_record()
        r.rdata.txt = '"This is a test"'
        updates.append(r)
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to verify that it was stored.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "textrec.%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, '"This is a test"')

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()

        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        updates.append(u)
        self.finish_name_packet(p, updates)

        # The record to add travels in the packet's nsrecs section.
        updates = []
        r = dns.res_rec()
        r.name = "textrec2.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        r.rdata = dns.txt_record()
        r.rdata.txt = '"This is a test" "and this is a test, too"'
        updates.append(r)
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to verify that it was stored.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "textrec2.%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt,
                         '"This is a test" "and this is a test, too"')

    def test_delete_record(self):
        "Test if deleting records works"
        # NOTE(review): nothing in this test creates 'textrec'; it appears
        # to rely on test_update_add_txt_record having run first -- confirm
        # the runner's ordering, or create the record here.
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()

        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        updates.append(u)
        self.finish_name_packet(p, updates)

        # The record to delete travels in the packet's nsrecs section,
        # with class NONE and a TTL of 0.
        updates = []
        r = dns.res_rec()
        r.name = "textrec.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_NONE
        r.ttl = 0
        r.length = 0xffff
        r.rdata = dns.txt_record()
        r.rdata.txt = '"This is a test"'
        updates.append(r)
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # The name must no longer resolve once the record is gone.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "textrec.%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
if __name__ == "__main__":
    # Executed as a script: hand control to the standard unittest runner,
    # which discovers and runs the tests defined in this module.
    from unittest import main as run_unittests
    run_unittests()