netcmd: add optparse validators and Range validator
[Samba.git] / python / samba / netcmd / __init__.py
blob096ba096d48ed432f0edc9091f19879c0f5081a5
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009-2012
3 # Copyright (C) Theresa Halloran <theresahalloran@gmail.com> 2011
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import optparse
20 import samba
21 from samba import colour
22 from samba.getopt import SambaOption, OptionError
23 from samba.logger import get_samba_logger
24 from ldb import LdbError, ERR_INVALID_CREDENTIALS
25 import sys
26 import traceback
27 import textwrap
29 from .validators import ValidationError
32 class Option(SambaOption):
33 ATTRS = SambaOption.ATTRS + ["validators"]
34 SUPPRESS_HELP = optparse.SUPPRESS_HELP
36 def run_validators(self, opt, value):
37 """Runs the list of validators on the current option.
39 If the validator raises ValidationError, turn that into CommandError
40 which gives nicer output.
41 """
42 validators = getattr(self, "validators") or []
44 for validator in validators:
45 try:
46 validator(opt, value)
47 except ValidationError as e:
48 raise CommandError(e)
50 def convert_value(self, opt, value):
51 """Override convert_value to run validators just after.
53 This can also be done in process() but there we would have to
54 replace the entire method.
55 """
56 value = super().convert_value(opt, value)
57 self.run_validators(opt, value)
58 return value
61 # This help formatter does text wrapping and preserves newlines
64 class PlainHelpFormatter(optparse.IndentedHelpFormatter):
65 def format_description(self, description=""):
66 desc_width = self.width - self.current_indent
67 indent = " " * self.current_indent
68 paragraphs = description.split('\n')
69 wrapped_paragraphs = [
70 textwrap.fill(p,
71 desc_width,
72 initial_indent=indent,
73 subsequent_indent=indent)
74 for p in paragraphs]
75 result = "\n".join(wrapped_paragraphs) + "\n"
76 return result
78 def format_epilog(self, epilog):
79 if epilog:
80 return "\n" + epilog + "\n"
81 else:
82 return ""
85 class Command(object):
86 """A samba-tool command."""
88 def _get_short_description(self):
89 return self.__doc__.splitlines()[0].rstrip("\n")
91 short_description = property(_get_short_description)
93 def _get_full_description(self):
94 lines = self.__doc__.split("\n")
95 return lines[0] + "\n" + textwrap.dedent("\n".join(lines[1:]))
97 full_description = property(_get_full_description)
99 def _get_name(self):
100 name = self.__class__.__name__
101 if name.startswith("cmd_"):
102 return name[4:]
103 return name
105 name = property(_get_name)
107 # synopsis must be defined in all subclasses in order to provide the
108 # command usage
109 synopsis = None
110 takes_args = []
111 takes_options = []
112 takes_optiongroups = {}
114 hidden = False
115 use_colour = True
116 requested_colour = None
118 raw_argv = None
119 raw_args = None
120 raw_kwargs = None
122 def _set_files(self, outf=None, errf=None):
123 if outf is not None:
124 self.outf = outf
125 if errf is not None:
126 self.errf = errf
128 def __init__(self, outf=sys.stdout, errf=sys.stderr):
129 self._set_files(outf, errf)
131 def usage(self, prog=None):
132 parser, _ = self._create_parser(prog)
133 parser.print_usage()
135 def _print_error(self, msg, evalue=None, klass=None):
136 err = colour.c_DARK_RED("ERROR")
137 klass = '' if klass is None else f'({klass})'
139 if evalue is None:
140 print(f"{err}{klass}: {msg}", file=self.errf)
141 else:
142 print(f"{err}{klass}: {msg} - {evalue}", file=self.errf)
144 def show_command_error(self, e):
145 '''display a command error'''
146 if isinstance(e, CommandError):
147 (etype, evalue, etraceback) = e.exception_info
148 inner_exception = e.inner_exception
149 message = e.message
150 force_traceback = False
151 else:
152 (etype, evalue, etraceback) = sys.exc_info()
153 inner_exception = e
154 message = "uncaught exception"
155 force_traceback = True
157 if isinstance(e, OptionError):
158 print(evalue, file=self.errf)
159 self.usage()
160 force_traceback = False
162 elif isinstance(inner_exception, LdbError):
163 (ldb_ecode, ldb_emsg) = inner_exception.args
164 if ldb_ecode == ERR_INVALID_CREDENTIALS:
165 print("Invalid username or password", file=self.errf)
166 force_traceback = False
167 elif ldb_emsg == 'LDAP client internal error: NT_STATUS_NETWORK_UNREACHABLE':
168 print("Could not reach remote server", file=self.errf)
169 force_traceback = False
170 elif ldb_emsg.startswith("Unable to open tdb "):
171 self._print_error(message, ldb_emsg, 'ldb')
172 force_traceback = False
173 else:
174 self._print_error(message, ldb_emsg, 'ldb')
176 elif isinstance(inner_exception, AssertionError):
177 self._print_error(message, klass='assert')
178 force_traceback = True
179 elif isinstance(inner_exception, RuntimeError):
180 self._print_error(message, evalue, 'runtime')
181 elif type(inner_exception) is Exception:
182 self._print_error(message, evalue, 'exception')
183 force_traceback = True
184 elif inner_exception is None:
185 self._print_error(message)
186 else:
187 self._print_error(message, evalue, str(etype))
189 if force_traceback or samba.get_debug_level() >= 3:
190 traceback.print_tb(etraceback, file=self.errf)
192 def _create_parser(self, prog=None, epilog=None):
193 parser = optparse.OptionParser(
194 usage=self.synopsis,
195 description=self.full_description,
196 formatter=PlainHelpFormatter(),
197 prog=prog, epilog=epilog)
198 parser.add_options(self.takes_options)
199 optiongroups = {}
200 for name in sorted(self.takes_optiongroups.keys()):
201 optiongroup = self.takes_optiongroups[name]
202 optiongroups[name] = optiongroup(parser)
203 parser.add_option_group(optiongroups[name])
204 if self.use_colour:
205 parser.add_option("--color",
206 help="use colour if available (default: auto)",
207 metavar="always|never|auto",
208 default="auto")
210 return parser, optiongroups
212 def message(self, text):
213 self.outf.write(text + "\n")
215 def _resolve(self, path, *argv, outf=None, errf=None):
216 """This is a leaf node, the command that will actually run."""
217 self._set_files(outf, errf)
218 self.command_name = path
219 return (self, argv)
221 def _run(self, *argv):
222 parser, optiongroups = self._create_parser(self.command_name)
223 opts, args = parser.parse_args(list(argv))
224 # Filter out options from option groups
225 kwargs = dict(opts.__dict__)
226 for option_group in parser.option_groups:
227 for option in option_group.option_list:
228 if option.dest is not None:
229 del kwargs[option.dest]
230 kwargs.update(optiongroups)
232 if self.use_colour:
233 self.apply_colour_choice(kwargs.pop('color', 'auto'))
235 # Check for a min a max number of allowed arguments, whenever possible
236 # The suffix "?" means zero or one occurrence
237 # The suffix "+" means at least one occurrence
238 # The suffix "*" means zero or more occurrences
239 min_args = 0
240 max_args = 0
241 undetermined_max_args = False
242 for i, arg in enumerate(self.takes_args):
243 if arg[-1] != "?" and arg[-1] != "*":
244 min_args += 1
245 if arg[-1] == "+" or arg[-1] == "*":
246 undetermined_max_args = True
247 else:
248 max_args += 1
249 if (len(args) < min_args) or (not undetermined_max_args and len(args) > max_args):
250 parser.print_usage()
251 return -1
253 self.raw_argv = list(argv)
254 self.raw_args = args
255 self.raw_kwargs = kwargs
257 try:
258 return self.run(*args, **kwargs)
259 except Exception as e:
260 self.show_command_error(e)
261 return -1
263 def run(self, *args, **kwargs):
264 """Run the command. This should be overridden by all subclasses."""
265 raise NotImplementedError(f"'{self.command_name}' run method not implemented")
267 def get_logger(self, name="", verbose=False, quiet=False, **kwargs):
268 """Get a logger object."""
269 return get_samba_logger(
270 name=name or self.name, stream=self.errf,
271 verbose=verbose, quiet=quiet,
272 **kwargs)
274 def apply_colour_choice(self, requested):
275 """Heuristics to work out whether the user wants colour output, from a
276 --color=yes|no|auto option. This alters the ANSI 16 bit colour
277 "constants" in the colour module to be either real colours or empty
278 strings.
280 self.requested_colour = requested
281 try:
282 colour.colour_if_wanted(self.outf,
283 self.errf,
284 hint=requested)
285 except ValueError as e:
286 raise CommandError(f"Unknown --color option: {requested} "
287 "please choose from always|never|auto")
290 class SuperCommand(Command):
291 """A samba-tool command with subcommands."""
293 synopsis = "%prog <subcommand>"
295 subcommands = {}
297 def _resolve(self, path, *args, outf=None, errf=None):
298 """This is an internal node. We need to consume one of the args and
299 find the relevant child, returning an instance of that Command.
301 If there are no children, this SuperCommand will be returned
302 and its _run() will do a --help like thing.
304 self.command_name = path
305 self._set_files(outf, errf)
307 # We collect up certain option arguments and pass them to the
308 # leaf, which is why we iterate over args, though we really
309 # expect to return in the first iteration.
310 deferred_args = []
312 for i, a in enumerate(args):
313 if a in self.subcommands:
314 sub_args = args[i + 1:] + tuple(deferred_args)
315 sub_path = f'{path} {a}'
317 sub = self.subcommands[a]
318 return sub._resolve(sub_path, *sub_args, outf=outf, errf=errf)
320 elif a in [ '--help', 'help', None, '-h', '-V', '--version' ]:
321 # we pass these to the leaf node.
322 if a == 'help':
323 a = '--help'
324 deferred_args.append(a)
325 continue
327 # they are talking nonsense
328 print("%s: no such subcommand: %s\n" % (path, a), file=self.outf)
329 return (self, [])
331 # We didn't find a subcommand, but maybe we found e.g. --version
332 print("%s: missing subcommand\n" % (path), file=self.outf)
333 return (self, deferred_args)
335 def _run(self, *argv):
336 epilog = "\nAvailable subcommands:\n"
338 subcmds = sorted(self.subcommands.keys())
339 max_length = max([len(c) for c in subcmds])
340 for cmd_name in subcmds:
341 cmd = self.subcommands[cmd_name]
342 if cmd.hidden:
343 continue
344 epilog += " %*s - %s\n" % (
345 -max_length, cmd_name, cmd.short_description)
347 epilog += ("For more help on a specific subcommand, please type: "
348 f"{self.command_name} <subcommand> (-h|--help)\n")
350 parser, optiongroups = self._create_parser(self.command_name, epilog=epilog)
351 opts, args = parser.parse_args(list(argv))
353 # note: if argv had --help, parser.parse_args() will have
354 # already done the .print_help() and attempted to exit with
355 # return code 0, so we won't get here.
356 parser.print_help()
357 return -1
360 class CommandError(Exception):
361 """An exception class for samba-tool Command errors."""
363 def __init__(self, message, inner_exception=None):
364 self.message = message
365 self.inner_exception = inner_exception
366 self.exception_info = sys.exc_info()
368 def __repr__(self):
369 return "CommandError(%s)" % self.message