3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba
import socket
25 import samba
.ndr
as ndr
26 import samba
.dcerpc
.dns
as dns
27 from samba
.tests
import TestCase
29 class DNSTest(TestCase
):
31 def assert_dns_rcode_equals(self
, packet
, rcode
):
32 "Helper function to check return code"
33 p_errcode
= packet
.operation
& 0x000F
34 self
.assertEquals(p_errcode
, rcode
, "Expected RCODE %s, got %s" % \
37 def assert_dns_opcode_equals(self
, packet
, opcode
):
38 "Helper function to check opcode"
39 p_opcode
= packet
.operation
& 0x7800
40 self
.assertEquals(p_opcode
, opcode
, "Expected OPCODE %s, got %s" % \
43 def make_name_packet(self
, opcode
, qid
=None):
44 "Helper creating a dns.name_packet"
47 p
.id = random
.randint(0x0, 0xffff)
52 def finish_name_packet(self
, packet
, questions
):
53 "Helper to finalize a dns.name_packet"
54 packet
.qdcount
= len(questions
)
55 packet
.questions
= questions
57 def make_name_question(self
, name
, qtype
, qclass
):
58 "Helper creating a dns.name_question"
59 q
= dns
.name_question()
61 q
.question_type
= qtype
62 q
.question_class
= qclass
65 def get_dns_domain(self
):
66 "Helper to get dns domain"
67 return os
.getenv('REALM', 'example.com').lower()
69 def dns_transaction_udp(self
, packet
, host
=os
.getenv('DC_SERVER_IP')):
70 "send a DNS query and read the reply"
73 send_packet
= ndr
.ndr_pack(packet
)
74 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
76 s
.send(send_packet
, 0)
77 recv_packet
= s
.recv(2048, 0)
78 return ndr
.ndr_unpack(dns
.name_packet
, recv_packet
)
83 def test_one_a_query(self
):
84 "create a query packet containing one query record"
85 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
88 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
89 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
90 print "asking for ", q
.name
93 self
.finish_name_packet(p
, questions
)
94 response
= self
.dns_transaction_udp(p
)
95 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
96 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
97 self
.assertEquals(response
.ancount
, 1)
98 self
.assertEquals(response
.answers
[0].rdata
,
99 os
.getenv('DC_SERVER_IP'))
101 def test_two_queries(self
):
102 "create a query packet containing two query records"
103 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
106 name
= "%s.%s" % (os
.getenv('DC_SERVER'), self
.get_dns_domain())
107 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
110 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
111 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
114 self
.finish_name_packet(p
, questions
)
115 response
= self
.dns_transaction_udp(p
)
116 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
119 if __name__
== "__main__":