1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Catalyst.Net Ltd 2016
3 # Catalyst.Net's contribution was written by Douglas Bagnall
4 # <douglas.bagnall@catalyst.net.nz> and Garming Sam <garming@catalyst.net.nz>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 # Based on the EchoServer example from python docs
21 import socketserver
as SocketServer
24 from threading
import Timer
25 from samba
.dcerpc
import dns
26 import samba
.ndr
as ndr
35 print("\033[00;36m%s\033[00m" % msg
)
# Build the resource record used to answer `question`.
# The visible assignments copy the question's name onto `r` and mark the
# record as a CNAME (dns.DNS_QTYPE_CNAME) in the IN class
# (dns.DNS_QCLASS_IN).
# NOTE(review): the statements that create `r` and that return it are not
# visible in this extract (the embedded line numbers skip 43 and 47-51) --
# confirm against the complete file before changing this function.
# NOTE(review): `data` is not used by any visible statement.
42 def answer_question(data
, question
):
44 r
.name
= question
.name
45 r
.rr_type
= dns
.DNS_QTYPE_CNAME
46 r
.rr_class
= dns
.DNS_QCLASS_IN
class DnsHandler(SocketServer.BaseRequestHandler):
    """UDP request handler that answers DNS queries after an optional delay.

    A datagram of the form ``timeout <seconds>`` is a control message that
    sets the delay applied to later replies.  Any other datagram is treated
    as a packed DNS query and is answered from a timer thread once the
    current delay has elapsed.
    """

    def make_answer(self, data):
        """Return the packed reply for the packed DNS query ``data``.

        The query is unpacked, the record that ``answer_question`` produces
        for its first question is attached (when it returns one), the reply
        flag is set, and the packet is packed again.
        """
        packet = ndr.ndr_unpack(dns.name_packet, data)

        debug('answering this question:')
        debug(packet.__ndr_print__())

        answer = answer_question(packet, packet.questions[0])
        if answer is not None:
            packet.answers = [answer]
            # Keep the header's answer count in step with the answer list.
            # NOTE(review): assumes ndr_pack serialises `ancount` entries of
            # `answers` -- confirm against the dns IDL.
            packet.ancount = len(packet.answers)
            debug('the answer was: ')
            debug(packet.__ndr_print__())

        packet.operation |= dns.DNS_FLAG_REPLY

        return ndr.ndr_pack(packet)

    def really_handle(self, data, socket):
        """Build the reply for ``data`` and send it to the client on ``socket``."""
        reply = self.make_answer(data)
        socket.sendto(reply, self.client_address)

    def handle(self):
        """Process one datagram: update the reply delay or schedule a reply."""
        # A new handler instance is created for every request, so the delay
        # has to live outside the instance to carry over between datagrams.
        # NOTE(review): the initial value of the module-level `timeout` must
        # be defined outside this class -- confirm.
        global timeout

        data, socket = self.request
        debug("%s: %s wrote:" % (SERVER_ID, self.client_address[0]))

        m = re.match(br'^timeout\s+([\d.]+)$', data.strip())
        if m:
            # Control message: remember the new delay; there is no DNS
            # query here to answer.
            timeout = float(m.group(1))
            debug("timing out at %s" % timeout)
            return

        # Answer from a timer thread so the reply is delayed by `timeout`
        # seconds without blocking the server loop.
        t = Timer(timeout, self.really_handle, [data, socket])
        t.start()
class TestUDPServer(SocketServer.UDPServer):
    """UDP server whose address family follows the form of the bind host."""

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server on ``server_address``, a ``(host, port)`` pair.

        A host containing ``:`` is treated as an IPv6 literal; any other
        host is bound as IPv4.
        """
        # Choose the family before calling the base constructor, which is
        # where the socket is created from `address_family`.
        if ':' in server_address[0]:
            self.address_family = socket.AF_INET6
        else:
            self.address_family = socket.AF_INET
        super().__init__(server_address, RequestHandlerClass)
# Script entry: the command line carries exactly <host> <port> <server-id>.
# SERVER_ID is the name DnsHandler.handle prints in its debug line.
host, port, SERVER_ID = sys.argv[1:]
server = TestUDPServer(server_address=(host, int(port)),
                       RequestHandlerClass=DnsHandler)
# Serve requests until the process is stopped.
server.serve_forever()