3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba
import socket
25 import samba
.ndr
as ndr
26 import samba
.dcerpc
.dns
as dns
27 from samba
.tests
import TestCase
29 class DNSTest(TestCase
):
31 def errstr(self
, errcode
):
32 "Return a readable error code"
47 return string_codes
[errcode
]
def assert_dns_rcode_equals(self, packet, rcode):
    """Assert that the RCODE carried by a DNS packet equals ``rcode``.

    The response code is read from the low four bits of
    ``packet.operation``.  On mismatch the failure message names both
    codes using ``self.errstr``.

    packet -- object exposing an integer ``operation`` attribute
    rcode  -- the expected response code
    """
    # Mask off everything except the 4-bit RCODE field.
    actual_rcode = packet.operation & 0x000F
    # The message is built before the comparison, so errstr() is called
    # for both codes even when they match (same as before).
    message = "Expected RCODE %s, got %s" % (
        self.errstr(rcode), self.errstr(actual_rcode))
    # assertEqual replaces the deprecated assertEquals alias, which newer
    # unittest releases no longer provide.
    self.assertEqual(actual_rcode, rcode, message)
56 def assert_dns_opcode_equals(self
, packet
, opcode
):
57 "Helper function to check opcode"
58 p_opcode
= packet
.operation
& 0x7800
59 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" % \
62 def make_name_packet(self
, opcode
, qid
=None):
63 "Helper creating a dns.name_packet"
66 p
.id = random
.randint(0x0, 0xffff)
def finish_name_packet(self, packet, questions):
    """Attach ``questions`` to ``packet`` and record how many there are.

    Sets ``packet.qdcount`` to the number of entries, then stores the
    sequence itself on ``packet.questions``.  Returns nothing.
    """
    question_count = len(questions)
    packet.qdcount = question_count
    packet.questions = questions
76 def make_name_question(self
, name
, qtype
, qclass
):
77 "Helper creating a dns.name_question"
78 q
= dns
.name_question()
80 q
.question_type
= qtype
81 q
.question_class
= qclass
def get_dns_domain(self):
    """Return the DNS domain under test, in lower case.

    The value comes from the ``REALM`` environment variable; when that
    is unset, 'example.com' is used instead.
    """
    realm = os.environ.get('REALM', 'example.com')
    return realm.lower()
88 def dns_transaction_udp(self
, packet
, host
=os
.getenv('DC_SERVER_IP')):
89 "send a DNS query and read the reply"
92 send_packet
= ndr
.ndr_pack(packet
)
93 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
95 s
.send(send_packet
, 0)
96 recv_packet
= s
.recv(2048, 0)
97 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
102 def test_one_a_query(self
):
103 "create a query packet containing one query record"
104 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
107 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
108 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
109 print "asking for ", q
.name
112 self
.finish_name_packet(p
, questions
)
113 response
= self
.dns_transaction_udp(p
)
114 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
115 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
116 self
.assertEquals(response
.ancount
, 1)
117 self
.assertEquals(response
.answers
[0].rdata
,
118 os
.getenv('DC_SERVER_IP'))
120 def test_two_queries(self
):
121 "create a query packet containing two query records"
122 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
125 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
126 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
129 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
130 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
133 self
.finish_name_packet(p
, questions
)
134 response
= self
.dns_transaction_udp(p
)
135 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
137 def test_qtype_all_query(self
):
138 "create a QTYPE_ALL query"
139 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
142 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
143 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
144 print "asking for ", q
.name
147 self
.finish_name_packet(p
, questions
)
148 response
= self
.dns_transaction_udp(p
)
151 dc_ipv6
= os
.getenv('DC_SERVER_IPV6')
152 if dc_ipv6
is not None:
155 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
156 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
157 self
.assertEquals(response
.ancount
, num_answers
)
158 self
.assertEquals(response
.answers
[0].rdata
,
159 os
.getenv('DC_SERVER_IP'))
160 if dc_ipv6
is not None:
161 self
.assertEquals(response
.answers
[1].rdata
, dc_ipv6
)
163 def test_qclass_none_query(self
):
164 "create a QCLASS_NONE query"
165 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
168 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
169 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_NONE
)
172 self
.finish_name_packet(p
, questions
)
173 response
= self
.dns_transaction_udp(p
)
174 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
176 # Only returns an authority section entry in BIND and Win DNS
177 # FIXME: Enable once Samba implements this feature
178 # def test_soa_hostname_query(self):
179 # "create a SOA query for a hostname"
180 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
183 # name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
184 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
185 # questions.append(q)
187 # self.finish_name_packet(p, questions)
188 # response = self.dns_transaction_udp(p)
189 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
191 # # We don't get SOA records for single hosts
192 # self.assertEquals(response.ancount, 0)
194 def test_soa_domain_query(self
):
195 "create a SOA query for a domain"
196 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
199 name
= self
.get_dns_domain()
200 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
203 self
.finish_name_packet(p
, questions
)
204 response
= self
.dns_transaction_udp(p
)
205 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
206 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
207 self
.assertEquals(response
.ancount
, 1)
209 def test_two_updates(self
):
210 "create two update requests"
211 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
214 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
215 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
218 name
= self
.get_dns_domain()
219 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
222 self
.finish_name_packet(p
, updates
)
223 response
= self
.dns_transaction_udp(p
)
224 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
226 def test_update_wrong_qclass(self
):
227 "create update with DNS_QCLASS_NONE"
228 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
231 name
= self
.get_dns_domain()
232 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
235 self
.finish_name_packet(p
, updates
)
236 response
= self
.dns_transaction_udp(p
)
237 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
239 def test_update_prereq_with_non_null_ttl(self
):
240 "test update with a non-null TTL"
241 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
244 name
= self
.get_dns_domain()
246 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
248 self
.finish_name_packet(p
, updates
)
252 r
.name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
253 r
.rr_type
= dns
.DNS_QTYPE_TXT
254 r
.rr_class
= dns
.DNS_QCLASS_NONE
259 p
.ancount
= len(prereqs
)
262 response
= self
.dns_transaction_udp(p
)
263 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
265 # I'd love to test this one, but it segfaults. :)
266 # def test_update_prereq_with_non_null_length(self):
267 # "test update with a non-null length"
268 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
271 # name = self.get_dns_domain()
273 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
275 # self.finish_name_packet(p, updates)
279 # r.name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
280 # r.rr_type = dns.DNS_QTYPE_TXT
281 # r.rr_class = dns.DNS_QCLASS_ANY
286 # p.ancount = len(prereqs)
287 # p.answers = prereqs
289 # response = self.dns_transaction_udp(p)
290 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
292 def test_update_prereq_nonexisting_name(self
):
293 "test update with a non-null TTL"
294 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
297 name
= self
.get_dns_domain()
299 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
301 self
.finish_name_packet(p
, updates
)
305 r
.name
= "idontexist.%s" % self
.get_dns_domain()
306 r
.rr_type
= dns
.DNS_QTYPE_TXT
307 r
.rr_class
= dns
.DNS_QCLASS_ANY
312 p
.ancount
= len(prereqs
)
315 response
= self
.dns_transaction_udp(p
)
316 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
318 if __name__
== "__main__":