samba-tool: avoid mutable Command class values
[samba.git] / python / samba / netcmd / __init__.py
blob54e4a207a209050b6ede63dc8519680d1e929312
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009-2012
3 # Copyright (C) Theresa Halloran <theresahalloran@gmail.com> 2011
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import json
20 import optparse
21 import sys
22 import textwrap
23 import traceback
25 import samba
26 from ldb import ERR_INVALID_CREDENTIALS, LdbError
27 from samba import colour
28 from samba.auth import system_session
29 from samba.getopt import Option, OptionParser
30 from samba.logger import get_samba_logger
31 from samba.samdb import SamDB
32 from samba.dcerpc.security import SDDLValueError
34 from .encoders import JSONEncoder
class PlainHelpFormatter(optparse.IndentedHelpFormatter):
    """Help formatter that wraps text but keeps the author's newlines.

    Each newline-separated line of a description is wrapped on its own,
    so explicit line breaks survive formatting.
    """

    def format_description(self, description=""):
        """Wrap every line of *description* independently.

        The available width is the formatter width minus the current
        indent, and each output line starts with the current indent.
        The returned text always ends with a newline.
        """
        pad = " " * self.current_indent
        wrapper = textwrap.TextWrapper(
            width=self.width - self.current_indent,
            initial_indent=pad,
            subsequent_indent=pad)
        body = "\n".join(wrapper.fill(chunk)
                         for chunk in description.split('\n'))
        return body + "\n"

    def format_epilog(self, epilog):
        """Return *epilog* framed by newlines, or "" when it is empty."""
        if not epilog:
            return ""
        return "\n" + epilog + "\n"
class Command(object):
    """A samba-tool command."""

    def _get_short_description(self):
        """Return the first line of the class docstring.

        A class without a docstring (``__doc__`` is None, which is what a
        subclass gets when it does not define its own) or with an empty
        one yields "" instead of raising.
        """
        doc_lines = (self.__doc__ or "").splitlines()
        if not doc_lines:
            return ""
        return doc_lines[0].rstrip("\n")

    short_description = property(_get_short_description)

    def _get_full_description(self):
        """Return the whole docstring with the body dedented.

        The first line is kept as is; the remaining lines are passed
        through textwrap.dedent. A missing or empty docstring yields "".
        """
        doc = self.__doc__
        if not doc:
            return ""
        lines = doc.split("\n")
        return lines[0] + "\n" + textwrap.dedent("\n".join(lines[1:]))

    full_description = property(_get_full_description)

    def _get_name(self):
        """Return the class name with any leading "cmd_" removed."""
        name = self.__class__.__name__
        if name.startswith("cmd_"):
            return name[4:]
        return name

    name = property(_get_name)

    # synopsis must be defined in all subclasses in order to provide the
    # command usage
    synopsis = None
    takes_args = ()
    takes_options = ()
    # Maps a keyword name to an option group factory (called with the
    # parser in _create_parser). This dict is shared by every class that
    # does not override it, so subclasses should assign their own dict
    # rather than mutate this one.
    takes_optiongroups = {}

    hidden = False
    use_colour = True
    requested_colour = None

    # Filled in by _run() just before run() is called.
    raw_argv = None
    raw_args = None
    raw_kwargs = None

    def _set_files(self, outf=None, errf=None):
        """Set the output and error streams.

        A value of None leaves the corresponding stream untouched.
        """
        if outf is not None:
            self.outf = outf
        if errf is not None:
            self.errf = errf

    def __init__(self, outf=sys.stdout, errf=sys.stderr):
        # NOTE: the defaults are bound once, when this class body is
        # evaluated, so a later replacement of sys.stdout/sys.stderr is
        # not picked up automatically; pass the streams explicitly then.
        self._set_files(outf, errf)

    def usage(self, prog=None):
        """Print the usage line of this command via its option parser."""
        parser, _ = self._create_parser(prog)
        parser.print_usage()

    def _print_error(self, msg, evalue=None, klass=None):
        """Write an "ERROR" line to errf.

        :param msg: the main message
        :param evalue: optional detail appended after " - "
        :param klass: optional tag shown in parentheses after "ERROR"
        """
        err = colour.c_DARK_RED("ERROR")
        klass = '' if klass is None else f'({klass})'

        if evalue is None:
            print(f"{err}{klass}: {msg}", file=self.errf)
        else:
            print(f"{err}{klass}: {msg} - {evalue}", file=self.errf)

    def _print_sddl_value_error(self, e):
        """Report an SDDL error with a caret marking the reported offset.

        The exception args are unpacked as (generic message, specific
        message, position, SDDL string).
        """
        generic_msg, specific_msg, position, sddl = e.args
        print(f"{colour.c_DARK_RED('ERROR')}: {generic_msg}\n",
              file=self.errf)
        print(f' {sddl}', file=self.errf)
        # If the SDDL contains non-ascii characters, the byte offset
        # provided by the exception won't agree with the visual offset
        # because those characters will be encoded as multiple bytes.
        #
        # To account for this we'll attempt to measure the string
        # length of the specified number of bytes. That is not quite
        # the same as the visual length, because the SDDL could
        # contain zero-width, full-width, or combining characters, but
        # it is closer.
        try:
            position = len((sddl.encode()[:position]).decode())
        except ValueError:
            # use the original position
            pass

        print(f"{colour.c_DARK_YELLOW('^'):>{position + 2}}", file=self.errf)
        print(f' {specific_msg}', file=self.errf)

    def ldb_connect(self, hostopts, sambaopts, credopts):
        """Helper to connect to Ldb database using command line opts."""
        lp = sambaopts.get_loadparm()
        creds = credopts.get_credentials(lp)
        return SamDB(hostopts.H, credentials=creds,
                     session_info=system_session(lp), lp=lp)

    def print_json(self, data):
        """Print json on the screen using consistent formatting and sorting.

        A custom JSONEncoder class is used to help with serializing unknown
        objects such as Dn for example.
        """
        json.dump(data, self.outf, cls=JSONEncoder, indent=2, sort_keys=True)
        self.outf.write("\n")

    def show_command_error(self, e):
        """Display a command error on errf.

        For a CommandError the message, inner exception and exception
        info stored on it are used and no traceback is forced. For any
        other exception the current sys.exc_info() is used, the message
        is "uncaught exception" and a traceback is forced unless a more
        specific branch below turns it off. A traceback is also printed
        whenever the samba debug level is 3 or more.
        """
        if isinstance(e, CommandError):
            (etype, evalue, etraceback) = e.exception_info
            inner_exception = e.inner_exception
            message = e.message
            force_traceback = False
        else:
            (etype, evalue, etraceback) = sys.exc_info()
            inner_exception = e
            message = "uncaught exception"
            force_traceback = True

        if isinstance(e, optparse.OptParseError):
            print(evalue, file=self.errf)
            self.usage()
            force_traceback = False

        elif isinstance(inner_exception, LdbError):
            (ldb_ecode, ldb_emsg) = inner_exception.args
            if ldb_ecode == ERR_INVALID_CREDENTIALS:
                print("Invalid username or password", file=self.errf)
                force_traceback = False
            elif ldb_emsg == 'LDAP client internal error: NT_STATUS_NETWORK_UNREACHABLE':
                print("Could not reach remote server", file=self.errf)
                force_traceback = False
            elif ldb_emsg.startswith("Unable to open tdb "):
                self._print_error(message, ldb_emsg, 'ldb')
                force_traceback = False
            else:
                self._print_error(message, ldb_emsg, 'ldb')

        elif isinstance(inner_exception, SDDLValueError):
            self._print_sddl_value_error(inner_exception)
            force_traceback = False

        elif isinstance(inner_exception, AssertionError):
            self._print_error(message, klass='assert')
            force_traceback = True
        elif isinstance(inner_exception, RuntimeError):
            self._print_error(message, evalue, 'runtime')
        elif type(inner_exception) is Exception:
            # Deliberately an exact type match: only a bare Exception,
            # not a subclass, is reported this way.
            self._print_error(message, evalue, 'exception')
            force_traceback = True
        elif inner_exception is None:
            self._print_error(message)
        else:
            self._print_error(message, evalue, str(etype))

        if force_traceback or samba.get_debug_level() >= 3:
            traceback.print_tb(etraceback, file=self.errf)

    def _create_parser(self, prog=None, epilog=None):
        """Build the option parser for this command.

        :return: (parser, optiongroups) where optiongroups maps each
            name in takes_optiongroups to the group instance that was
            created for and added to the parser.
        """
        parser = OptionParser(
            usage=self.synopsis,
            description=self.full_description,
            formatter=PlainHelpFormatter(),
            prog=prog,
            epilog=epilog,
            option_class=Option)
        parser.add_options(self.takes_options)
        optiongroups = {}
        for name in sorted(self.takes_optiongroups.keys()):
            optiongroup = self.takes_optiongroups[name]
            optiongroups[name] = optiongroup(parser)
            parser.add_option_group(optiongroups[name])
        if self.use_colour:
            parser.add_option("--color",
                              help="use colour if available (default: auto)",
                              metavar="always|never|auto",
                              default="auto")

        return parser, optiongroups

    def message(self, text):
        """Write *text* followed by a newline to outf."""
        self.outf.write(text + "\n")

    def _resolve(self, path, *argv, outf=None, errf=None):
        """This is a leaf node, the command that will actually run."""
        self._set_files(outf, errf)
        self.command_name = path
        return (self, argv)

    def _run(self, *argv):
        """Parse *argv*, check the argument count and call run().

        :return: the value returned by run(), or -1 when option parsing
            fails, the number of positional arguments is out of range,
            or run() raises an exception (which is reported through
            show_command_error).
        """
        parser, optiongroups = self._create_parser(self.command_name)

        # Handle possible validation errors raised by parser
        try:
            opts, args = parser.parse_args(list(argv))
        except Exception as e:
            self.show_command_error(e)
            return -1

        # Filter out options from option groups; run() receives the
        # option group objects themselves instead.
        kwargs = dict(opts.__dict__)
        for option_group in parser.option_groups:
            for option in option_group.option_list:
                if option.dest is not None and option.dest in kwargs:
                    del kwargs[option.dest]
        kwargs.update(optiongroups)

        if self.use_colour:
            self.apply_colour_choice(kwargs.pop('color', 'auto'))

        # Check for a min and max number of allowed arguments, whenever
        # possible
        # The suffix "?" means zero or one occurrence
        # The suffix "+" means at least one occurrence
        # The suffix "*" means zero or more occurrences
        min_args = 0
        max_args = 0
        undetermined_max_args = False
        for arg in self.takes_args:
            if arg[-1] != "?" and arg[-1] != "*":
                min_args += 1
            if arg[-1] == "+" or arg[-1] == "*":
                undetermined_max_args = True
            else:
                max_args += 1
        if (len(args) < min_args) or (not undetermined_max_args and len(args) > max_args):
            parser.print_usage()
            return -1

        self.raw_argv = list(argv)
        self.raw_args = args
        self.raw_kwargs = kwargs

        try:
            return self.run(*args, **kwargs)
        except Exception as e:
            self.show_command_error(e)
            return -1

    def run(self, *args, **kwargs):
        """Run the command. This should be overridden by all subclasses."""
        raise NotImplementedError(f"'{self.command_name}' run method not implemented")

    def get_logger(self, name="", verbose=False, quiet=False, **kwargs):
        """Get a logger object.

        The logger writes to errf; an empty *name* falls back to the
        command name.
        """
        return get_samba_logger(
            name=name or self.name, stream=self.errf,
            verbose=verbose, quiet=quiet,
            **kwargs)

    def apply_colour_choice(self, requested):
        """Heuristics to work out whether the user wants colour output, from a
        --color=always|never|auto option. This alters the ANSI 16 bit colour
        "constants" in the colour module to be either real colours or empty
        strings.

        The requested value is remembered in requested_colour. A
        ValueError from the colour module is turned into a CommandError.
        """
        self.requested_colour = requested
        try:
            colour.colour_if_wanted(self.outf,
                                    self.errf,
                                    hint=requested)
        except ValueError:
            raise CommandError(f"Unknown --color option: {requested} "
                               "please choose from always|never|auto")
class SuperCommand(Command):
    """A samba-tool command with subcommands."""

    synopsis = "%prog <subcommand>"

    # Maps a subcommand name to the Command instance that handles it.
    subcommands = {}

    def _resolve(self, path, *args, outf=None, errf=None):
        """This is an internal node. We need to consume one of the args and
        find the relevant child, returning an instance of that Command.

        If there are no children, this SuperCommand will be returned
        and its _run() will do a --help like thing.

        :return: (command, args) where command is the node whose _run()
            should be called with args.
        """
        self.command_name = path
        self._set_files(outf, errf)

        # We collect up certain option arguments and pass them to the
        # leaf, which is why we iterate over args, though we really
        # expect to return in the first iteration.
        deferred_args = []

        for i, a in enumerate(args):
            if a in self.subcommands:
                sub_args = args[i + 1:] + tuple(deferred_args)
                sub_path = f'{path} {a}'

                sub = self.subcommands[a]
                return sub._resolve(sub_path, *sub_args, outf=outf, errf=errf)

            elif a in ['--help', 'help', None, '-h', '-V', '--version']:
                # we pass these to the leaf node.
                if a == 'help':
                    a = '--help'
                deferred_args.append(a)
                continue

            # they are talking nonsense
            print("%s: no such subcommand: %s\n" % (path, a), file=self.outf)
            return (self, [])

        # We didn't find a subcommand, but maybe we found e.g. --version
        print("%s: missing subcommand\n" % (path), file=self.outf)
        return (self, deferred_args)

    def _run(self, *argv):
        """Print help listing the non-hidden subcommands and return -1."""
        epilog = "\nAvailable subcommands:\n"

        subcmds = sorted(self.subcommands.keys())
        # default=0 keeps the help working for a SuperCommand that has no
        # subcommands at all; a bare max() of an empty sequence raises
        # ValueError.
        max_length = max((len(c) for c in subcmds), default=0)
        for cmd_name in subcmds:
            cmd = self.subcommands[cmd_name]
            if cmd.hidden:
                continue
            # A negative width left-justifies the name in its column.
            epilog += " %*s - %s\n" % (
                -max_length, cmd_name, cmd.short_description)

        epilog += ("For more help on a specific subcommand, please type: "
                   f"{self.command_name} <subcommand> (-h|--help)\n")

        parser, _ = self._create_parser(self.command_name, epilog=epilog)
        # The parsed result is not needed; the call is made for the
        # parser's own handling of the arguments.
        parser.parse_args(list(argv))

        # note: if argv had --help, parser.parse_args() will have
        # already done the .print_help() and attempted to exit with
        # return code 0, so we won't get here.
        parser.print_help()
        return -1
class CommandError(Exception):
    """An exception class for samba-tool Command errors."""

    def __init__(self, message, inner_exception=None):
        """Record the message and the exception context.

        :param message: text describing the failure
        :param inner_exception: optional underlying exception

        exception_info is the sys.exc_info() triple at construction
        time, so creating the error inside an ``except`` block captures
        the exception being handled there.
        """
        self.message = message
        self.inner_exception = inner_exception
        self.exception_info = sys.exc_info()

    def __repr__(self):
        # An f-string rather than "%s" % message: the latter breaks when
        # the message happens to be a tuple.
        return f"CommandError({self.message})"