tests/dns_forwarder: Add testing for DNS forwarding
[Samba.git] / python / samba / tests / dns_forwarder_helpers / server.py
blobe302485ea1be7fd5effcd6c2b47b8613007ac3b1
# Unix SMB/CIFS implementation.
# Copyright (C) Catalyst.Net Ltd 2016
# Catalyst.Net's contribution was written by Douglas Bagnall
# <douglas.bagnall@catalyst.net.nz> and Garming Sam <garming@catalyst.net.nz>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Based on the EchoServer example from python docs
20 import SocketServer
21 import time
22 import sys
23 from threading import Timer
24 from samba.dcerpc import dns
25 import samba.ndr as ndr
26 import random
27 import re
# Set to True to have debug() print its messages; it is silent otherwise.
VERBOSE = False


def debug(msg):
    """Print *msg* to stdout, wrapped in cyan ANSI colour codes, if VERBOSE.

    Does nothing when the module-level VERBOSE flag is false.  stdout is
    flushed before and after the message is written.

    :param msg: value interpolated into the coloured line with ``%s``.
    """
    if VERBOSE:
        sys.stdout.flush()
        # The parenthesised single-argument form prints exactly the same
        # text under the Python 2 print statement and the Python 3 print
        # function, so the helper no longer depends on Python 2 syntax.
        print("\033[00;36m%s\033[00m" % msg)
        sys.stdout.flush()
# Delay passed to threading.Timer before a query is answered.  It starts at
# zero and is overwritten by DnsHandler.handle() when a "timeout <number>"
# datagram arrives.
timeout = 0


def answer_question(data, question):
    """Return the canned resource record used to answer *question*.

    The record echoes the question's name, is typed as an IN-class CNAME
    with a TTL of 900, and carries the module-level SERVER_ID as its rdata.

    :param data: the unpacked request packet; accepted but not used here.
    :param question: the question being answered; only ``name`` is read.
    :return: a freshly built ``dns.res_rec``.
    """
    record = dns.res_rec()
    record.name = question.name
    # NOTE(review): rr_type is deliberately assigned before rdata; the
    # bindings presumably interpret rdata according to rr_type -- confirm
    # before reordering these assignments.
    record.rr_type = dns.DNS_QTYPE_CNAME
    record.rr_class = dns.DNS_QCLASS_IN
    record.ttl = 900
    # NOTE(review): 0xffff looks like a placeholder length -- confirm
    # whether ndr_pack recomputes it.
    record.length = 0xffff
    record.rdata = SERVER_ID
    return record
class DnsHandler(SocketServer.BaseRequestHandler):
    """Datagram handler that answers DNS queries with a canned record.

    Each incoming datagram is either:

    * a control message of the form ``timeout <number>``, which stores
      ``<number>`` in the module-level ``timeout`` and is not answered, or
    * anything else, which is treated as a DNS query and answered via
      answer_question() once ``timeout`` seconds have elapsed.
    """

    # Control-message pattern.  A raw string keeps the pattern text exactly
    # as before while avoiding invalid string escapes (\s, \d) in a normal
    # literal; it is compiled once rather than on every datagram.
    _TIMEOUT_RE = re.compile(r'^timeout\s+([\d.]+)$')

    def make_answer(self, data):
        """Unpack the raw query *data* and return the packed reply.

        The first question is passed to answer_question(); if that returns
        a record, it becomes the packet's only answer.  The reply flag is
        always set on the returned packet.

        :param data: the raw bytes of the received DNS packet.
        :return: the NDR-packed reply packet.
        """
        packet = ndr.ndr_unpack(dns.name_packet, data)

        debug('answering this question:')
        debug(packet.__ndr_print__())

        answer = answer_question(packet, packet.questions[0])
        if answer is not None:
            answers = [answer]
            packet.answers = answers
            # Keep the count in step with the list we just installed rather
            # than incrementing whatever count the request carried.
            packet.ancount = len(answers)
            debug('the answer was: ')
            debug(packet.__ndr_print__())

        packet.operation |= dns.DNS_FLAG_REPLY

        return ndr.ndr_pack(packet)

    def really_handle(self, data, socket):
        """Build the reply for *data* and send it back to the client.

        Called from the Timer started in handle().

        :param data: the raw bytes of the received DNS packet.
        :param socket: the server socket the reply is sent on.
        """
        answer = self.make_answer(data)
        socket.sendto(answer, self.client_address)

    def handle(self):
        """Process one datagram: update the delay or schedule a reply."""
        global timeout

        data, socket = self.request
        debug("%s: %s wrote:" % (SERVER_ID, self.client_address[0]))

        m = self._TIMEOUT_RE.match(data.strip())
        if m:
            try:
                timeout = float(m.group(1))
            except ValueError:
                # The pattern also admits strings such as "1.2.3" that are
                # not valid floats; ignore them and keep the current delay
                # instead of raising inside the request handler.
                debug("ignoring malformed timeout %r" % m.group(1))
                return
            debug("timing out at %s" % timeout)
            return

        # Answer from a Timer so the reply is delayed by the current
        # timeout without blocking this handler.
        t = Timer(timeout, self.really_handle, [data, socket])
        t.start()
def main():
    """Run the test DNS server until the process is interrupted.

    Expects exactly three command-line arguments: HOST PORT SERVER_ID.
    HOST and PORT give the UDP address to bind; SERVER_ID is stored in the
    module-level global of the same name, from which the answers are built.

    :raises SystemExit: with a usage message if the argument count is wrong.
    :raises ValueError: if PORT is not an integer.
    """
    global SERVER_ID
    args = sys.argv[1:]
    if len(args) != 3:
        # A clear usage message instead of an opaque tuple-unpacking
        # ValueError; the exit status is still non-zero.
        sys.exit("usage: %s HOST PORT SERVER_ID" % sys.argv[0])
    host, port, SERVER_ID = args
    server = SocketServer.UDPServer((host, int(port)), DnsHandler)
    try:
        server.serve_forever()
    finally:
        # Release the listening socket however serve_forever() ends.
        server.server_close()


# Only start serving when executed as a script, so that importing this
# module does not block in serve_forever().
if __name__ == "__main__":
    main()