3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba
import socket
25 import samba
.ndr
as ndr
26 import samba
.dcerpc
.dns
as dns
27 from samba
.tests
import TestCase
29 class DNSTest(TestCase
):
# errstr(self, errcode): returns the entry of `string_codes` indexed by
# `errcode`, i.e. a readable form of a numeric error code.
# NOTE(review): the embedded listing numbers jump from 32 to 47, so the
# definition of `string_codes` is not visible in this view -- confirm its
# keys and values against the complete file.
31 def errstr(self
, errcode
):
32 "Return a readable error code"
47 return string_codes
[errcode
]
def assert_dns_rcode_equals(self, packet, rcode):
    """Assert that the return code carried by *packet* equals *rcode*.

    The return code is read from the low four bits of
    ``packet.operation`` (mask ``0x000F``).  When the codes differ, the
    failure message shows both of them rendered through ``self.errstr``.

    :param packet: object exposing an integer ``operation`` attribute
    :param rcode: the return code the caller expects
    """
    p_errcode = packet.operation & 0x000F
    # Build the message up front; both codes go through errstr exactly as
    # before, so an unknown code still surfaces as an errstr failure.
    message = "Expected RCODE %s, got %s" % (
        self.errstr(rcode), self.errstr(p_errcode))
    # assertEqual is the supported spelling; assertEquals is a deprecated
    # alias of the same unittest method.
    self.assertEqual(p_errcode, rcode, message)
# Helper: masks `packet.operation` with 0x7800 to obtain the opcode bits
# and checks, via self.assertEquals, that they equal `opcode`.
# NOTE(review): the failure-message format string ends in a backslash
# continuation, but the arguments that follow it (listing lines 60-61)
# are not visible in this view -- confirm against the complete file.
56 def assert_dns_opcode_equals(self
, packet
, opcode
):
57 "Helper function to check opcode"
58 p_opcode
= packet
.operation
& 0x7800
59 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" % \
# Helper for building a dns.name_packet; takes an `opcode` and an optional
# `qid` (default None).  The only statement visible here assigns
# random.randint(0x0, 0xffff) to `p.id`.
# NOTE(review): listing lines 64-65 and 67-69 are missing from this view,
# so the creation of `p`, how `opcode` and `qid` are used, and the return
# value cannot be confirmed.  `random` is not among the visible imports
# either -- verify against the complete file.
62 def make_name_packet(self
, opcode
, qid
=None):
63 "Helper creating a dns.name_packet"
66 p
.id = random
.randint(0x0, 0xffff)
def finish_name_packet(self, packet, questions):
    """Finalize a dns.name_packet by attaching its question section.

    Stores the length of *questions* in ``packet.qdcount`` and the
    sequence itself in ``packet.questions``.  The packet is modified in
    place and nothing is returned.

    :param packet: the packet object to complete
    :param questions: sized sequence of question records
    """
    # The count is computed first, so a value without a length raises
    # before the packet has been touched.
    packet.qdcount = len(questions)
    packet.questions = questions
# Helper creating a dns.name_question: instantiates dns.name_question()
# and sets its question_type to `qtype` and its question_class to `qclass`.
# NOTE(review): listing lines 79 and 82 are missing from this view, so
# where `name` is stored on the question and what the helper returns are
# not visible here (callers read `q.name`, so presumably it is assigned)
# -- confirm against the complete file.
76 def make_name_question(self
, name
, qtype
, qclass
):
77 "Helper creating a dns.name_question"
78 q
= dns
.name_question()
80 q
.question_type
= qtype
81 q
.question_class
= qclass
def get_dns_domain(self):
    """Return the DNS domain used by the tests, in lower case.

    The value comes from the ``REALM`` environment variable; when that
    variable is unset, ``example.com`` is used instead.
    """
    return os.getenv('REALM', 'example.com').lower()
# Sends `packet` as a DNS query over UDP and returns the decoded reply.
# Visible steps: the packet is serialised with ndr.ndr_pack, an
# AF_INET / SOCK_DGRAM socket is created, the bytes are sent, up to 2048
# bytes are received, and the result is unpacked as a dns.name_packet.
# The default for `host` is os.getenv('DC_SERVER_IP'); Python evaluates a
# default expression once, when the def statement runs.
# NOTE(review): listing lines 90-91, 94 and everything after 97 are
# missing from this view, so how the socket is pointed at `host` and
# whether it is closed cannot be confirmed here.
88 def dns_transaction_udp(self
, packet
, host
=os
.getenv('DC_SERVER_IP')):
89 "send a DNS query and read the reply"
92 send_packet
= ndr
.ndr_pack(packet
)
93 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
95 s
.send(send_packet
, 0)
96 recv_packet
= s
.recv(2048, 0)
97 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
# Sends `packet` as a DNS query over TCP to (host, 53) and returns the
# decoded reply.  The NDR-packed packet is prefixed with its length packed
# as '!H' (2-byte network-order unsigned short) before sending.  The reply
# is read with one recv of up to 0xffff + 2 bytes, and its first two bytes
# are dropped before unpacking as a dns.name_packet.
# The default for `host` is os.getenv('DC_SERVER_IP'); Python evaluates a
# default expression once, when the def statement runs.
# NOTE(review): a single recv on a stream socket is not guaranteed to
# return the whole reply -- worth checking.  Listing lines 104-105 and
# everything after 113 are missing from this view, so any surrounding
# error handling or socket cleanup cannot be confirmed here.
102 def dns_transaction_tcp(self
, packet
, host
=os
.getenv('DC_SERVER_IP')):
103 "send a DNS query and read the reply"
106 send_packet
= ndr
.ndr_pack(packet
)
107 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
108 s
.connect((host
, 53))
109 tcp_packet
= struct
.pack('!H', len(send_packet
))
110 tcp_packet
+= send_packet
111 s
.send(tcp_packet
, 0)
112 recv_packet
= s
.recv(0xffff + 2, 0)
113 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
[2:])
# Test: one A query over UDP.  Builds a DNS_OPCODE_QUERY packet and a
# DNS_QTYPE_A / DNS_QCLASS_IN question for "<DC_SERVER>.<dns domain>",
# sends it with dns_transaction_udp, then expects RCODE OK, opcode QUERY,
# exactly one answer, and that answer's rdata equal to the DC_SERVER_IP
# environment variable.
# NOTE(review): `questions` is handed to finish_name_packet, but the
# lines that build it (listing 121-122, 126-127) are missing from this
# view.  The `print` statement form is Python 2 syntax.
118 def test_one_a_query(self
):
119 "create a query packet containing one query record"
120 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
123 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
124 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
125 print "asking for ", q
.name
128 self
.finish_name_packet(p
, questions
)
129 response
= self
.dns_transaction_udp(p
)
130 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
131 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
132 self
.assertEquals(response
.ancount
, 1)
133 self
.assertEquals(response
.answers
[0].rdata
,
134 os
.getenv('DC_SERVER_IP'))
# Test: one A query over TCP.  Same shape as the UDP single-query test,
# but the packet is sent with dns_transaction_tcp.  Expects RCODE OK,
# opcode QUERY, exactly one answer, and that answer's rdata equal to the
# DC_SERVER_IP environment variable.
# NOTE(review): `questions` is handed to finish_name_packet, but the
# lines that build it (listing 139-140, 144-145) are missing from this
# view.  The `print` statement form is Python 2 syntax.
136 def test_one_a_query_tcp(self
):
137 "create a query packet containing one query record via TCP"
138 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
141 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
142 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
143 print "asking for ", q
.name
146 self
.finish_name_packet(p
, questions
)
147 response
= self
.dns_transaction_tcp(p
)
148 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
149 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
150 self
.assertEquals(response
.ancount
, 1)
151 self
.assertEquals(response
.answers
[0].rdata
,
152 os
.getenv('DC_SERVER_IP'))
# Test: a query packet with two question records.  Creates A/IN questions
# for "<DC_SERVER>.<dns domain>" and for "bogusname.<dns domain>", sends
# the packet over UDP and expects the reply's RCODE to be FORMERR.
# NOTE(review): `q` is reassigned by the second make_name_question call
# and `questions` is passed to finish_name_packet, but the lines that
# collect the questions (listing 157-158, 161-162, 165-166) are missing
# from this view.
154 def test_two_queries(self
):
155 "create a query packet containing two query records"
156 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
159 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
160 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
163 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
164 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
167 self
.finish_name_packet(p
, questions
)
168 response
= self
.dns_transaction_udp(p
)
169 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
# Test: a DNS_QTYPE_ALL / DNS_QCLASS_IN query for
# "<DC_SERVER>.<dns domain>" over UDP.  Expects RCODE OK, opcode QUERY,
# `response.ancount` equal to `num_answers`, and the first answer's rdata
# equal to DC_SERVER_IP.  When the DC_SERVER_IPV6 environment variable is
# set, the second answer's rdata must equal it.
# NOTE(review): the assignment of `num_answers`, the body of the first
# `if dc_ipv6 is not None:` and the construction of `questions` fall on
# listing lines that are missing from this view (174-175, 179-180,
# 183-184, 187-188).  The `print` statement form is Python 2 syntax.
171 def test_qtype_all_query(self
):
172 "create a QTYPE_ALL query"
173 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
176 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
177 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
178 print "asking for ", q
.name
181 self
.finish_name_packet(p
, questions
)
182 response
= self
.dns_transaction_udp(p
)
185 dc_ipv6
= os
.getenv('DC_SERVER_IPV6')
186 if dc_ipv6
is not None:
189 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
190 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
191 self
.assertEquals(response
.ancount
, num_answers
)
192 self
.assertEquals(response
.answers
[0].rdata
,
193 os
.getenv('DC_SERVER_IP'))
194 if dc_ipv6
is not None:
195 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
# Test: a DNS_QTYPE_ALL query with class DNS_QCLASS_NONE for
# "<DC_SERVER>.<dns domain>" over UDP; the reply's RCODE is expected to
# be NOTIMP.
# NOTE(review): `questions` is handed to finish_name_packet, but the
# lines that build it (listing 200-201, 204-205) are missing from this
# view.
197 def test_qclass_none_query(self
):
198 "create a QCLASS_NONE query"
199 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
202 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
203 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_NONE
)
206 self
.finish_name_packet(p
, questions
)
207 response
= self
.dns_transaction_udp(p
)
208 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
210 # Only returns an authority section entry in BIND and Win DNS
211 # FIXME: Enable once Samba implements this feature
212 # def test_soa_hostname_query(self):
213 # "create a SOA query for a hostname"
214 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
217 # name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
218 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
219 # questions.append(q)
221 # self.finish_name_packet(p, questions)
222 # response = self.dns_transaction_udp(p)
223 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
224 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
225 # # We don't get SOA records for single hosts
226 # self.assertEquals(response.ancount, 0)
# Test: a DNS_QTYPE_SOA / DNS_QCLASS_IN query for the DNS domain itself
# (self.get_dns_domain()) over UDP.  Expects RCODE OK, opcode QUERY and
# exactly one answer.
# NOTE(review): `questions` is handed to finish_name_packet, but the
# lines that build it (listing 231-232, 235-236) are missing from this
# view.
228 def test_soa_domain_query(self
):
229 "create a SOA query for a domain"
230 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
233 name
= self
.get_dns_domain()
234 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
237 self
.finish_name_packet(p
, questions
)
238 response
= self
.dns_transaction_udp(p
)
239 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
240 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
241 self
.assertEquals(response
.ancount
, 1)
# Test: a DNS_OPCODE_UPDATE packet carrying two entries.  Creates A/IN
# name questions for "<DC_SERVER>.<dns domain>" and for the DNS domain,
# passes `updates` to finish_name_packet, sends over UDP and expects the
# reply's RCODE to be FORMERR.
# NOTE(review): `u` is reassigned by the second make_name_question call,
# and the lines that build `updates` (listing 246-247, 250-251, 254-255)
# are missing from this view.
243 def test_two_updates(self
):
244 "create two update requests"
245 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
248 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
249 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
252 name
= self
.get_dns_domain()
253 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
256 self
.finish_name_packet(p
, updates
)
257 response
= self
.dns_transaction_udp(p
)
258 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
# Test: a DNS_OPCODE_UPDATE packet whose entry for the DNS domain uses
# DNS_QTYPE_A with class DNS_QCLASS_NONE; sent over UDP, the reply's
# RCODE is expected to be NOTIMP.
# NOTE(review): `updates` is handed to finish_name_packet, but the lines
# that build it (listing 263-264, 267-268) are missing from this view.
260 def test_update_wrong_qclass(self
):
261 "create update with DNS_QCLASS_NONE"
262 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
265 name
= self
.get_dns_domain()
266 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
269 self
.finish_name_packet(p
, updates
)
270 response
= self
.dns_transaction_udp(p
)
271 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
# Test: an update whose prerequisite record is rejected.  Builds a
# DNS_OPCODE_UPDATE packet with a SOA/IN entry for the DNS domain, then
# fills a record `r` named "<DC_SERVER>.<dns domain>" with rr_type
# DNS_QTYPE_TXT and rr_class DNS_QCLASS_NONE, sets p.ancount to
# len(prereqs), sends over UDP and expects RCODE FORMERR.
# NOTE(review): the creation of `r`, `updates` and `prereqs`, the TTL the
# docstring refers to, and the assignment of the prerequisites to the
# packet all fall on listing lines missing from this view (276-277, 279,
# 281, 283-285, 289-292, 294-295).
273 def test_update_prereq_with_non_null_ttl(self
):
274 "test update with a non-null TTL"
275 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
278 name
= self
.get_dns_domain()
280 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
282 self
.finish_name_packet(p
, updates
)
286 r
.name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
287 r
.rr_type
= dns
.DNS_QTYPE_TXT
288 r
.rr_class
= dns
.DNS_QCLASS_NONE
293 p
.ancount
= len(prereqs
)
296 response
= self
.dns_transaction_udp(p
)
297 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
299 # I'd love to test this one, but it segfaults. :)
300 # def test_update_prereq_with_non_null_length(self):
301 # "test update with a non-null length"
302 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
305 # name = self.get_dns_domain()
307 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
309 # self.finish_name_packet(p, updates)
313 # r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
314 # r.rr_type = dns.DNS_QTYPE_TXT
315 # r.rr_class = dns.DNS_QCLASS_ANY
320 # p.ancount = len(prereqs)
321 # p.answers = prereqs
323 # response = self.dns_transaction_udp(p)
324 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
# Test: an update whose prerequisite names a record "idontexist.<dns
# domain>".  Builds a DNS_OPCODE_UPDATE packet with a SOA/IN entry for the
# DNS domain, fills a record `r` with rr_type DNS_QTYPE_TXT and rr_class
# DNS_QCLASS_ANY, sets p.ancount to len(prereqs), sends over UDP and
# expects RCODE NXRRSET.
# NOTE(review): the docstring text is identical to the one in
# test_update_prereq_with_non_null_ttl and does not match this method's
# name -- looks like a copy-paste leftover.  The creation of `r`,
# `updates` and `prereqs` and the assignment of the prerequisites to the
# packet fall on listing lines missing from this view (329-330, 332, 334,
# 336-338, 342-345, 347-348).
326 def test_update_prereq_nonexisting_name(self
):
327 "test update with a non-null TTL"
328 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
331 name
= self
.get_dns_domain()
333 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
335 self
.finish_name_packet(p
, updates
)
339 r
.name
= "idontexist.%s" % self
.get_dns_domain()
340 r
.rr_type
= dns
.DNS_QTYPE_TXT
341 r
.rr_class
= dns
.DNS_QCLASS_ANY
346 p
.ancount
= len(prereqs
)
349 response
= self
.dns_transaction_udp(p
)
350 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
# Test: add a TXT record through an update, then read it back.
# Phase 1: builds a DNS_OPCODE_UPDATE packet with a SOA/IN entry for the
# DNS domain, fills a record `r` named "textrec.<dns domain>" (TXT / IN)
# whose rdata is a dns.txt_record() with txt '"This is a test"', sets
# p.nscount to len(updates), sends over UDP and expects RCODE OK.
# Phase 2: sends a TXT/IN query for the same name and expects RCODE OK,
# exactly one answer, and that answer's rdata.txt equal to the same text.
# NOTE(review): `updates` is passed to finish_name_packet and later used
# for p.nscount; the lines that create `r`, fill `updates` and
# `questions`, and attach the records to the packet are missing from this
# view (gaps in the listing numbering between 354 and 386).
352 def test_update_add_txt_record(self
):
353 "test adding records works"
354 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
357 name
= self
.get_dns_domain()
359 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
361 self
.finish_name_packet(p
, updates
)
365 r
.name
= "textrec.%s" % self
.get_dns_domain()
366 r
.rr_type
= dns
.DNS_QTYPE_TXT
367 r
.rr_class
= dns
.DNS_QCLASS_IN
370 r
.rdata
= dns
.txt_record()
371 r
.rdata
.txt
= '"This is a test"'
373 p
.nscount
= len(updates
)
376 response
= self
.dns_transaction_udp(p
)
377 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
379 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
382 name
= "textrec.%s" % self
.get_dns_domain()
383 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
386 self
.finish_name_packet(p
, questions
)
387 response
= self
.dns_transaction_udp(p
)
388 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
389 self
.assertEquals(response
.ancount
, 1)
390 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test"')
# Test: add a TXT record holding two quoted strings, then read it back.
# Phase 1: builds a DNS_OPCODE_UPDATE packet with a SOA/IN entry for the
# DNS domain, fills a record `r` named "textrec2.<dns domain>" (TXT / IN)
# whose rdata is a dns.txt_record() with txt
# '"This is a test" "and this is a test, too"', sets p.nscount to
# len(updates), sends over UDP and expects RCODE OK.
# Phase 2: sends a TXT/IN query for the same name and expects RCODE OK,
# exactly one answer, and that answer's rdata.txt equal to the same text.
# NOTE(review): the lines that create `r`, fill `updates` and
# `questions`, and attach the records to the packet are missing from this
# view (gaps in the listing numbering between 394 and 426).
392 def test_update_add_two_txt_records(self
):
393 "test adding two txt records works"
394 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
397 name
= self
.get_dns_domain()
399 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
401 self
.finish_name_packet(p
, updates
)
405 r
.name
= "textrec2.%s" % self
.get_dns_domain()
406 r
.rr_type
= dns
.DNS_QTYPE_TXT
407 r
.rr_class
= dns
.DNS_QCLASS_IN
410 r
.rdata
= dns
.txt_record()
411 r
.rdata
.txt
= '"This is a test" "and this is a test, too"'
413 p
.nscount
= len(updates
)
416 response
= self
.dns_transaction_udp(p
)
417 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
419 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
422 name
= "textrec2.%s" % self
.get_dns_domain()
423 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
426 self
.finish_name_packet(p
, questions
)
427 response
= self
.dns_transaction_udp(p
)
428 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
429 self
.assertEquals(response
.ancount
, 1)
430 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test" "and this is a test, too"')
433 if __name__
== "__main__":