1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba
import socket
22 import samba
.ndr
as ndr
23 import samba
.dcerpc
.dns
as dns
24 from samba
.tests
import TestCase
26 class DNSTest(TestCase
):
28 def errstr(self
, errcode
):
29 "Return a readable error code"
44 return string_codes
[errcode
]
47 def assert_dns_rcode_equals(self
, packet
, rcode
):
48 "Helper function to check return code"
49 p_errcode
= packet
.operation
& 0x000F
50 self
.assertEquals(p_errcode
, rcode
, "Expected RCODE %s, got %s" % \
51 (self
.errstr(rcode
), self
.errstr(p_errcode
)))
53 def assert_dns_opcode_equals(self
, packet
, opcode
):
54 "Helper function to check opcode"
55 p_opcode
= packet
.operation
& 0x7800
56 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" % \
59 def make_name_packet(self
, opcode
, qid
=None):
60 "Helper creating a dns.name_packet"
63 p
.id = random
.randint(0x0, 0xffff)
68 def finish_name_packet(self
, packet
, questions
):
69 "Helper to finalize a dns.name_packet"
70 packet
.qdcount
= len(questions
)
71 packet
.questions
= questions
73 def make_name_question(self
, name
, qtype
, qclass
):
74 "Helper creating a dns.name_question"
75 q
= dns
.name_question()
77 q
.question_type
= qtype
78 q
.question_class
= qclass
81 def get_dns_domain(self
):
82 "Helper to get dns domain"
83 return os
.getenv('REALM', 'example.com').lower()
85 def dns_transaction_udp(self
, packet
, host
=os
.getenv('DC_SERVER_IP')):
86 "send a DNS query and read the reply"
89 send_packet
= ndr
.ndr_pack(packet
)
90 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
92 s
.send(send_packet
, 0)
93 recv_packet
= s
.recv(2048, 0)
94 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
99 def dns_transaction_tcp(self
, packet
, host
=os
.getenv('DC_SERVER_IP')):
100 "send a DNS query and read the reply"
103 send_packet
= ndr
.ndr_pack(packet
)
104 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
105 s
.connect((host
, 53))
106 tcp_packet
= struct
.pack('!H', len(send_packet
))
107 tcp_packet
+= send_packet
108 s
.send(tcp_packet
, 0)
109 recv_packet
= s
.recv(0xffff + 2, 0)
110 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
[2:])
115 def test_one_a_query(self
):
116 "create a query packet containing one query record"
117 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
120 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
121 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
122 print "asking for ", q
.name
125 self
.finish_name_packet(p
, questions
)
126 response
= self
.dns_transaction_udp(p
)
127 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
128 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
129 self
.assertEquals(response
.ancount
, 1)
130 self
.assertEquals(response
.answers
[0].rdata
,
131 os
.getenv('DC_SERVER_IP'))
133 def test_one_a_query_tcp(self
):
134 "create a query packet containing one query record via TCP"
135 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
138 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
139 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
140 print "asking for ", q
.name
143 self
.finish_name_packet(p
, questions
)
144 response
= self
.dns_transaction_tcp(p
)
145 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
146 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
147 self
.assertEquals(response
.ancount
, 1)
148 self
.assertEquals(response
.answers
[0].rdata
,
149 os
.getenv('DC_SERVER_IP'))
151 def test_two_queries(self
):
152 "create a query packet containing two query records"
153 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
156 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
157 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
160 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
161 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
164 self
.finish_name_packet(p
, questions
)
165 response
= self
.dns_transaction_udp(p
)
166 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
168 def test_qtype_all_query(self
):
169 "create a QTYPE_ALL query"
170 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
173 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
174 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
175 print "asking for ", q
.name
178 self
.finish_name_packet(p
, questions
)
179 response
= self
.dns_transaction_udp(p
)
182 dc_ipv6
= os
.getenv('DC_SERVER_IPV6')
183 if dc_ipv6
is not None:
186 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
187 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
188 self
.assertEquals(response
.ancount
, num_answers
)
189 self
.assertEquals(response
.answers
[0].rdata
,
190 os
.getenv('DC_SERVER_IP'))
191 if dc_ipv6
is not None:
192 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
194 def test_qclass_none_query(self
):
195 "create a QCLASS_NONE query"
196 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
199 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
200 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_NONE
)
203 self
.finish_name_packet(p
, questions
)
204 response
= self
.dns_transaction_udp(p
)
205 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
207 # Only returns an authority section entry in BIND and Win DNS
208 # FIXME: Enable one Samba implements this feature
209 # def test_soa_hostname_query(self):
210 # "create a SOA query for a hostname"
211 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
214 # name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
215 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
216 # questions.append(q)
218 # self.finish_name_packet(p, questions)
219 # response = self.dns_transaction_udp(p)
220 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
221 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
222 # # We don't get SOA records for single hosts
223 # self.assertEquals(response.ancount, 0)
225 def test_soa_domain_query(self
):
226 "create a SOA query for a domain"
227 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
230 name
= self
.get_dns_domain()
231 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
234 self
.finish_name_packet(p
, questions
)
235 response
= self
.dns_transaction_udp(p
)
236 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
237 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
238 self
.assertEquals(response
.ancount
, 1)
240 def test_two_updates(self
):
241 "create two update requests"
242 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
245 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
246 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
249 name
= self
.get_dns_domain()
250 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
253 self
.finish_name_packet(p
, updates
)
254 response
= self
.dns_transaction_udp(p
)
255 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
257 def test_update_wrong_qclass(self
):
258 "create update with DNS_QCLASS_NONE"
259 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
262 name
= self
.get_dns_domain()
263 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
266 self
.finish_name_packet(p
, updates
)
267 response
= self
.dns_transaction_udp(p
)
268 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
270 def test_update_prereq_with_non_null_ttl(self
):
271 "test update with a non-null TTL"
272 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
275 name
= self
.get_dns_domain()
277 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
279 self
.finish_name_packet(p
, updates
)
283 r
.name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
284 r
.rr_type
= dns
.DNS_QTYPE_TXT
285 r
.rr_class
= dns
.DNS_QCLASS_NONE
290 p
.ancount
= len(prereqs
)
293 response
= self
.dns_transaction_udp(p
)
294 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
296 # I'd love to test this one, but it segfaults. :)
297 # def test_update_prereq_with_non_null_length(self):
298 # "test update with a non-null length"
299 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
302 # name = self.get_dns_domain()
304 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
306 # self.finish_name_packet(p, updates)
310 # r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
311 # r.rr_type = dns.DNS_QTYPE_TXT
312 # r.rr_class = dns.DNS_QCLASS_ANY
317 # p.ancount = len(prereqs)
318 # p.answers = prereqs
320 # response = self.dns_transaction_udp(p)
321 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
323 def test_update_prereq_nonexisting_name(self
):
324 "test update with a nonexisting name"
325 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
328 name
= self
.get_dns_domain()
330 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
332 self
.finish_name_packet(p
, updates
)
336 r
.name
= "idontexist.%s" % self
.get_dns_domain()
337 r
.rr_type
= dns
.DNS_QTYPE_TXT
338 r
.rr_class
= dns
.DNS_QCLASS_ANY
343 p
.ancount
= len(prereqs
)
346 response
= self
.dns_transaction_udp(p
)
347 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
349 def test_update_add_txt_record(self
):
350 "test adding records works"
351 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
354 name
= self
.get_dns_domain()
356 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
358 self
.finish_name_packet(p
, updates
)
362 r
.name
= "textrec.%s" % self
.get_dns_domain()
363 r
.rr_type
= dns
.DNS_QTYPE_TXT
364 r
.rr_class
= dns
.DNS_QCLASS_IN
367 r
.rdata
= dns
.txt_record()
368 r
.rdata
.txt
= '"This is a test"'
370 p
.nscount
= len(updates
)
373 response
= self
.dns_transaction_udp(p
)
374 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
376 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
379 name
= "textrec.%s" % self
.get_dns_domain()
380 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
383 self
.finish_name_packet(p
, questions
)
384 response
= self
.dns_transaction_udp(p
)
385 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
386 self
.assertEquals(response
.ancount
, 1)
387 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test"')
389 def test_update_add_two_txt_records(self
):
390 "test adding two txt records works"
391 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
394 name
= self
.get_dns_domain()
396 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
398 self
.finish_name_packet(p
, updates
)
402 r
.name
= "textrec2.%s" % self
.get_dns_domain()
403 r
.rr_type
= dns
.DNS_QTYPE_TXT
404 r
.rr_class
= dns
.DNS_QCLASS_IN
407 r
.rdata
= dns
.txt_record()
408 r
.rdata
.txt
= '"This is a test" "and this is a test, too"'
410 p
.nscount
= len(updates
)
413 response
= self
.dns_transaction_udp(p
)
414 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
416 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
419 name
= "textrec2.%s" % self
.get_dns_domain()
420 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
423 self
.finish_name_packet(p
, questions
)
424 response
= self
.dns_transaction_udp(p
)
425 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
426 self
.assertEquals(response
.ancount
, 1)
427 self
.assertEquals(response
.answers
[0].rdata
.txt
, '"This is a test" "and this is a test, too"')
430 def test_delete_record(self
):
431 "Test if deleting records works"
432 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
435 name
= self
.get_dns_domain()
437 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
439 self
.finish_name_packet(p
, updates
)
443 r
.name
= "textrec.%s" % self
.get_dns_domain()
444 r
.rr_type
= dns
.DNS_QTYPE_TXT
445 r
.rr_class
= dns
.DNS_QCLASS_NONE
448 r
.rdata
= dns
.txt_record()
449 r
.rdata
.txt
= '"This is a test"'
451 p
.nscount
= len(updates
)
454 response
= self
.dns_transaction_udp(p
)
455 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
457 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
460 name
= "textrec.%s" % self
.get_dns_domain()
461 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
464 self
.finish_name_packet(p
, questions
)
465 response
= self
.dns_transaction_udp(p
)
466 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
469 if __name__
== "__main__":