# Unix SMB/CIFS implementation.
# Copyright (C) Kai Blin <kai@samba.org> 2011
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import random
import struct

from samba import socket
import samba.ndr as ndr
import samba.dcerpc.dns as dns
from samba.tests import TestCase
class DNSTest(TestCase):
    """Wire-level tests that talk to a DNS server on port 53.

    The tests read their target from the environment:

    * ``DC_SERVER``      - short host name of the server under test
    * ``DC_SERVER_IP``   - its IPv4 address (also the default query target)
    * ``DC_SERVER_IPV6`` - its IPv6 address, optional
    * ``REALM``          - DNS domain (lower-cased; defaults to example.com)
    """

    # RCODE mnemonics indexed by their numeric value
    # (RFC 1035 section 4.1.1 and RFC 2136 section 2.2).
    _RCODE_NAMES = (
        "OK",
        "FORMERR",
        "SERVFAIL",
        "NXDOMAIN",
        "NOTIMP",
        "REFUSED",
        "YXDOMAIN",
        "YXRRSET",
        "NXRRSET",
        "NOTAUTH",
        "NOTZONE",
    )

    def errstr(self, errcode):
        "Return a readable error code"
        # This is only used to build assertion messages, so an unknown code
        # must not raise and mask the real failure.
        if 0 <= errcode < len(self._RCODE_NAMES):
            return self._RCODE_NAMES[errcode]
        return "UNKNOWN(%d)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        "Helper function to check return code"
        # The RCODE is the low four bits of the operation word.
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        "Helper function to check opcode"
        # The opcode is compared in place (mask 0x7800), i.e. unshifted.
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Helper creating a dns.name_packet

        :param opcode: value stored in the packet's operation word
        :param qid: query id to use; a random 16-bit id is chosen when None
        :return: the new packet with an empty question list
        """
        p = dns.name_packet()
        if qid is None:
            qid = random.randint(0x0, 0xffff)
        p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        "Helper to finalize a dns.name_packet"
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        "Helper creating a dns.name_question"
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        "Helper to get dns domain"
        return os.getenv('REALM', 'example.com').lower()

    def _server_fqdn(self):
        "Helper returning the fully qualified name of the server under test"
        return "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())

    def _make_query_packet(self, name, qtype, qclass):
        "Helper creating a QUERY packet that carries a single question"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, qclass)
        self.finish_name_packet(p, [q])
        return p

    def _make_update_packet(self):
        "Helper creating an UPDATE packet whose zone section is our domain"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])
        return p

    def _make_txt_rr(self, name, rr_class, ttl, length, txt=None):
        """Helper creating a TXT resource record

        NOTE(review): the record type (dns.res_rec) and its ttl/length
        fields are reconstructed from the surrounding tests - confirm
        against the dns IDL bindings.
        """
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        if txt is not None:
            r.rdata = dns.txt_record()
            r.rdata.txt = txt
        return r

    def _recv_exact(self, sock, count):
        "Helper reading exactly count bytes from a stream socket"
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining, 0)
            if not chunk:
                raise IOError("connection closed with %d of %d bytes missing"
                              % (remaining, count))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
        """send a DNS query and read the reply

        :param packet: dns.name_packet to send
        :param host: address of the server, port 53 is used
        :return: the reply unpacked as dns.name_packet
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            # Always release the socket, even if packing or the I/O failed.
            if s is not None:
                s.close()

    def dns_transaction_tcp(self, packet, host=os.getenv('DC_SERVER_IP')):
        """send a DNS query and read the reply

        Over TCP every message is preceded by a two byte, network order
        length field.

        :param packet: dns.name_packet to send
        :param host: address of the server, port 53 is used
        :return: the reply unpacked as dns.name_packet
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            # A stream socket may deliver the reply in pieces, so read the
            # length prefix first and then exactly that many bytes.
            (reply_len,) = struct.unpack('!H', self._recv_exact(s, 2))
            recv_packet = self._recv_exact(s, reply_len)
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def test_one_a_query(self):
        "create a query packet containing one query record"
        name = self._server_fqdn()
        p = self._make_query_packet(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for %s" % name)

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        name = self._server_fqdn()
        p = self._make_query_packet(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for %s" % name)

        response = self.dns_transaction_tcp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))

    def test_two_queries(self):
        "create a query packet containing two query records"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self._server_fqdn()
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        # More than one question in a single packet is rejected.
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        name = self._server_fqdn()
        p = self._make_query_packet(name, dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_IN)
        print("asking for %s" % name)

        response = self.dns_transaction_udp(p)

        # One answer for the IPv4 address, plus one more when the server
        # also has an IPv6 address configured.
        num_answers = 1
        dc_ipv6 = os.getenv('DC_SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        p = self._make_query_packet(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_NONE)

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
#    def test_soa_hostname_query(self):
#        "create a SOA query for a hostname"
#        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
#        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
#        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        questions.append(q)
#        self.finish_name_packet(p, questions)
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
#        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
#        # We don't get SOA records for single hosts
#        self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        p = self._make_query_packet(self.get_dns_domain(), dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self._server_fqdn()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_NONE)

        self.finish_name_packet(p, [u])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        p = self._make_update_packet()

        # NOTE(review): ttl=1 / length=0 are reconstructed from the test's
        # purpose (a prerequisite with a non-zero TTL) - confirm.
        r = self._make_txt_rr(self._server_fqdn(), dns.DNS_QCLASS_NONE,
                              ttl=1, length=0)
        prereqs = [r]

        # In an UPDATE packet the answer section carries the prerequisites.
        p.ancount = len(prereqs)
        p.answers = prereqs

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

# I'd love to test this one, but it segfaults. :)
#    def test_update_prereq_with_non_null_length(self):
#        "test update with a non-null length"
#        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
#        name = self.get_dns_domain()
#        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        self.finish_name_packet(p, updates)
#        r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
#        r.rr_type = dns.DNS_QTYPE_TXT
#        r.rr_class = dns.DNS_QCLASS_ANY
#        p.ancount = len(prereqs)
#        p.answers = prereqs
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        p = self._make_update_packet()

        # NOTE(review): ttl=0 / length=0 are reconstructed - confirm.
        r = self._make_txt_rr("idontexist.%s" % self.get_dns_domain(),
                              dns.DNS_QCLASS_ANY, ttl=0, length=0)
        prereqs = [r]

        p.ancount = len(prereqs)
        p.answers = prereqs

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        p = self._make_update_packet()

        # NOTE(review): ttl=900 / length=0xffff are reconstructed - confirm.
        r = self._make_txt_rr("textrec.%s" % self.get_dns_domain(),
                              dns.DNS_QCLASS_IN, ttl=900, length=0xffff,
                              txt='"This is a test"')
        updates = [r]

        # In an UPDATE packet the authority section carries the updates.
        # NOTE(review): field name nsrecs assumed from the IDL - confirm.
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to prove that it was stored.
        p = self._make_query_packet("textrec.%s" % self.get_dns_domain(),
                                    dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, '"This is a test"')

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        p = self._make_update_packet()

        # NOTE(review): ttl=900 / length=0xffff are reconstructed - confirm.
        r = self._make_txt_rr("textrec2.%s" % self.get_dns_domain(),
                              dns.DNS_QCLASS_IN, ttl=900, length=0xffff,
                              txt='"This is a test" "and this is a test, too"')
        updates = [r]

        # NOTE(review): field name nsrecs assumed from the IDL - confirm.
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        p = self._make_query_packet("textrec2.%s" % self.get_dns_domain(),
                                    dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt,
                         '"This is a test" "and this is a test, too"')

    def test_delete_record(self):
        "Test if deleting records works"
        p = self._make_update_packet()

        # Class NONE together with matching rdata asks for the deletion of
        # that one record (RFC 2136 section 2.5.4).
        # NOTE(review): ttl=0 / length=0xffff are reconstructed - confirm.
        r = self._make_txt_rr("textrec.%s" % self.get_dns_domain(),
                              dns.DNS_QCLASS_NONE, ttl=0, length=0xffff,
                              txt='"This is a test"')
        updates = [r]

        # NOTE(review): field name nsrecs assumed from the IDL - confirm.
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # The name must be gone afterwards.
        p = self._make_query_packet("textrec.%s" % self.get_dns_domain(),
                                    dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
471 if __name__
== "__main__":