s4 dns: Add a first test case
[Samba/gebeck_regimport.git] / source4 / scripting / python / samba / tests / dns.py
blob25505d946327e6a081c06a9b3ff3021410b9a9f1
1 #!/usr/bin/env python
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
class DNSTest(TestCase):
    """Send hand-built DNS packets to a server over UDP and check the replies.

    The target is taken from the environment: ``DC_SERVER`` (host name used
    in the queries), ``DC_SERVER_IP`` (address the packets are sent to) and
    ``REALM`` (DNS domain, defaulting to ``example.com``).
    """

    def assert_dns_rcode_equals(self, packet, rcode):
        """Assert that the return code of `packet` equals `rcode`.

        The return code is read from the low four bits of
        ``packet.operation``.
        """
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (rcode, p_errcode))

    def assert_dns_opcode_equals(self, packet, opcode):
        """Assert that the opcode of `packet` equals `opcode`.

        ``packet.operation`` is masked with 0x7800 but not shifted, so
        `opcode` must already be positioned inside that mask.
        """
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Create a dns.name_packet with no questions.

        :param opcode: value stored in the packet's ``operation`` field
        :param qid: packet id to use; when None a random 16-bit id is chosen
        :return: the new packet
        """
        p = dns.name_packet()
        if qid is None:
            qid = random.randint(0x0, 0xffff)
        # Always assign the id so that an explicitly requested qid is
        # honoured rather than silently dropped.
        p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        """Attach `questions` to `packet` and set its question count.

        :param packet: packet to modify in place
        :param questions: list of questions to store on the packet
        """
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        """Create a dns.name_question.

        :param name: name to ask about
        :param qtype: value for the ``question_type`` field
        :param qclass: value for the ``question_class`` field
        :return: the new question
        """
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        """Return the lower-cased DNS domain.

        Read from the ``REALM`` environment variable, falling back to
        ``example.com`` when it is not set.
        """
        return os.getenv('REALM', 'example.com').lower()

    def test_one_a_query(self):
        """Send one A query for the server's own name.

        Expects a reply with DNS_RCODE_OK and the query opcode.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Single-argument call form: valid on Python 2 and Python 3 alike.
        print("asking for  %s" % (q.name,))
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)

    def test_two_queries(self):
        """Send two A queries in a single packet.

        Expects the reply to carry DNS_RCODE_FORMERR.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def dns_transaction_udp(self, packet, host=None):
        """Send `packet` over UDP to port 53 and return the unpacked reply.

        :param packet: packet to serialise with ndr.ndr_pack and send
        :param host: address to send to; when None the ``DC_SERVER_IP``
            environment variable is read at call time
        :return: the reply, unpacked as a dns.name_packet
        """
        if host is None:
            # Looked up per call rather than as a default argument, which
            # would be evaluated only once when the class is defined.
            host = os.getenv('DC_SERVER_IP')
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            # Close the socket even when packing, sending or receiving fails.
            if s is not None:
                s.close()

if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner, which
    # discovers and runs the tests defined in this module.
    from unittest import main as run_tests
    run_tests()