s4 dns: Check more of the returned values for the A query
[Samba/gebeck_regimport.git] / source4 / scripting / python / samba / tests / dns.py
blobad0b55a7f8ad9e9f9a670a58e1be8baea909d328
1 #!/usr/bin/env python
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
29 class DNSTest(TestCase):
31 def assert_dns_rcode_equals(self, packet, rcode):
32 "Helper function to check return code"
33 p_errcode = packet.operation & 0x000F
34 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
35 (rcode, p_errcode))
37 def assert_dns_opcode_equals(self, packet, opcode):
38 "Helper function to check opcode"
39 p_opcode = packet.operation & 0x7800
40 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
41 (opcode, p_opcode))
43 def make_name_packet(self, opcode, qid=None):
44 "Helper creating a dns.name_packet"
45 p = dns.name_packet()
46 if qid is None:
47 p.id = random.randint(0x0, 0xffff)
48 p.operation = opcode
49 p.questions = []
50 return p
52 def finish_name_packet(self, packet, questions):
53 "Helper to finalize a dns.name_packet"
54 packet.qdcount = len(questions)
55 packet.questions = questions
57 def make_name_question(self, name, qtype, qclass):
58 "Helper creating a dns.name_question"
59 q = dns.name_question()
60 q.name = name
61 q.question_type = qtype
62 q.question_class = qclass
63 return q
65 def get_dns_domain(self):
66 "Helper to get dns domain"
67 return os.getenv('REALM', 'example.com').lower()
69 def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
70 "send a DNS query and read the reply"
71 s = None
72 try:
73 send_packet = ndr.ndr_pack(packet)
74 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
75 s.connect((host, 53))
76 s.send(send_packet, 0)
77 recv_packet = s.recv(2048, 0)
78 return ndr.ndr_unpack(dns.name_packet, recv_packet)
79 finally:
80 if s is not None:
81 s.close()
83 def test_one_a_query(self):
84 "create a query packet containing one query record"
85 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
86 questions = []
88 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
89 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
90 print "asking for ", q.name
91 questions.append(q)
93 self.finish_name_packet(p, questions)
94 response = self.dns_transaction_udp(p)
95 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97 self.assertEquals(response.ancount, 1)
98 self.assertEquals(response.answers[0].rdata,
99 os.getenv('DC_SERVER_IP'))
101 def test_two_queries(self):
102 "create a query packet containing two query records"
103 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104 questions = []
106 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
107 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
108 questions.append(q)
110 name = "%s.%s" % ('bogusname', self.get_dns_domain())
111 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
112 questions.append(q)
114 self.finish_name_packet(p, questions)
115 response = self.dns_transaction_udp(p)
116 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
119 if __name__ == "__main__":
120 import unittest
121 unittest.main()