s4 dns: Test QCLASS_NONE query
[Samba/gebeck_regimport.git] / source4 / scripting / python / samba / tests / dns.py
blobdf10b715efac807d0a3e62ca259639cd1649759e
1 #!/usr/bin/env python
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
29 class DNSTest(TestCase):
31 def assert_dns_rcode_equals(self, packet, rcode):
32 "Helper function to check return code"
33 p_errcode = packet.operation & 0x000F
34 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
35 (rcode, p_errcode))
37 def assert_dns_opcode_equals(self, packet, opcode):
38 "Helper function to check opcode"
39 p_opcode = packet.operation & 0x7800
40 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
41 (opcode, p_opcode))
43 def make_name_packet(self, opcode, qid=None):
44 "Helper creating a dns.name_packet"
45 p = dns.name_packet()
46 if qid is None:
47 p.id = random.randint(0x0, 0xffff)
48 p.operation = opcode
49 p.questions = []
50 return p
52 def finish_name_packet(self, packet, questions):
53 "Helper to finalize a dns.name_packet"
54 packet.qdcount = len(questions)
55 packet.questions = questions
57 def make_name_question(self, name, qtype, qclass):
58 "Helper creating a dns.name_question"
59 q = dns.name_question()
60 q.name = name
61 q.question_type = qtype
62 q.question_class = qclass
63 return q
65 def get_dns_domain(self):
66 "Helper to get dns domain"
67 return os.getenv('REALM', 'example.com').lower()
69 def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
70 "send a DNS query and read the reply"
71 s = None
72 try:
73 send_packet = ndr.ndr_pack(packet)
74 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
75 s.connect((host, 53))
76 s.send(send_packet, 0)
77 recv_packet = s.recv(2048, 0)
78 return ndr.ndr_unpack(dns.name_packet, recv_packet)
79 finally:
80 if s is not None:
81 s.close()
83 def test_one_a_query(self):
84 "create a query packet containing one query record"
85 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
86 questions = []
88 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
89 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
90 print "asking for ", q.name
91 questions.append(q)
93 self.finish_name_packet(p, questions)
94 response = self.dns_transaction_udp(p)
95 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97 self.assertEquals(response.ancount, 1)
98 self.assertEquals(response.answers[0].rdata,
99 os.getenv('DC_SERVER_IP'))
101 def test_two_queries(self):
102 "create a query packet containing two query records"
103 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104 questions = []
106 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
107 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
108 questions.append(q)
110 name = "%s.%s" % ('bogusname', self.get_dns_domain())
111 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
112 questions.append(q)
114 self.finish_name_packet(p, questions)
115 response = self.dns_transaction_udp(p)
116 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
118 def test_qtype_all_query(self):
119 "create a QTYPE_ALL query"
120 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
121 questions = []
123 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
124 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
125 print "asking for ", q.name
126 questions.append(q)
128 self.finish_name_packet(p, questions)
129 response = self.dns_transaction_udp(p)
131 num_answers = 1
132 dc_ipv6 = os.getenv('DC_SERVER_IPV6')
133 if dc_ipv6 is not None:
134 num_answers += 1
136 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
137 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
138 self.assertEquals(response.ancount, num_answers)
139 self.assertEquals(response.answers[0].rdata,
140 os.getenv('DC_SERVER_IP'))
141 if dc_ipv6 is not None:
142 self.assertEquals(response.answers[1].rdata, dc_ipv6)
144 def test_qclass_none_query(self):
145 "create a QCLASS_NONE query"
146 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
147 questions = []
149 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
150 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
151 questions.append(q)
153 self.finish_name_packet(p, questions)
154 response = self.dns_transaction_udp(p)
155 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
158 if __name__ == "__main__":
159 import unittest
160 unittest.main()