docs: remove references to jaraco.packaging.sphinx
[git-cola.git] / cola / utils.py
blob7ce1de8e50481d4e49e6044f27793d4822771b5f
1 """Miscellaneous utility functions"""
2 import copy
3 import os
4 import random
5 import re
6 import shlex
7 import sys
8 import tempfile
9 import time
10 import traceback
12 from . import core
13 from . import compat
15 random.seed(hash(time.time()))
18 def asint(obj, default=0):
19 """Make any value into an int, even if the cast fails"""
20 try:
21 value = int(obj)
22 except (TypeError, ValueError):
23 value = default
24 return value
27 def clamp(value, low, high):
28 """Clamp a value to the specified range"""
29 return min(high, max(low, value))
32 def epoch_millis():
33 return int(time.time() * 1000)
36 def add_parents(paths):
37 """Iterate over each item in the set and add its parent directories."""
38 all_paths = set()
39 for path in paths:
40 while '//' in path:
41 path = path.replace('//', '/')
42 all_paths.add(path)
43 if '/' in path:
44 parent_dir = dirname(path)
45 while parent_dir:
46 all_paths.add(parent_dir)
47 parent_dir = dirname(parent_dir)
48 return all_paths
51 def format_exception(exc):
52 """Format an exception object for display"""
53 exc_type, exc_value, exc_tb = sys.exc_info()
54 details = traceback.format_exception(exc_type, exc_value, exc_tb)
55 details = '\n'.join(map(core.decode, details))
56 if hasattr(exc, 'msg'):
57 msg = exc.msg
58 else:
59 msg = core.decode(repr(exc))
60 return (msg, details)
63 def sublist(values, remove):
64 """Subtracts list b from list a and returns the resulting list."""
65 # conceptually, c = a - b
66 result = []
67 for item in values:
68 if item not in remove:
69 result.append(item)
70 return result
73 __grep_cache = {}
76 def grep(pattern, items, squash=True):
77 """Greps a list for items that match a pattern
79 :param squash: If only one item matches, return just that item
80 :returns: List of matching items
82 """
83 isdict = isinstance(items, dict)
84 if pattern in __grep_cache:
85 regex = __grep_cache[pattern]
86 else:
87 regex = __grep_cache[pattern] = re.compile(pattern)
88 matched = []
89 matchdict = {}
90 for item in items:
91 match = regex.match(item)
92 if not match:
93 continue
94 groups = match.groups()
95 if not groups:
96 subitems = match.group(0)
97 else:
98 if len(groups) == 1:
99 subitems = groups[0]
100 else:
101 subitems = list(groups)
102 if isdict:
103 matchdict[item] = items[item]
104 else:
105 matched.append(subitems)
107 if isdict:
108 result = matchdict
109 elif squash and len(matched) == 1:
110 result = matched[0]
111 else:
112 result = matched
114 return result
117 def basename(path):
119 An os.path.basename() implementation that always uses '/'
121 Avoid os.path.basename because git's output always
122 uses '/' regardless of platform.
125 return path.rsplit('/', 1)[-1]
128 def strip_one(path):
129 """Strip one level of directory"""
130 return path.strip('/').split('/', 1)[-1]
133 def dirname(path, current_dir=''):
135 An os.path.dirname() implementation that always uses '/'
137 Avoid os.path.dirname because git's output always
138 uses '/' regardless of platform.
141 while '//' in path:
142 path = path.replace('//', '/')
143 path_dirname = path.rsplit('/', 1)[0]
144 if path_dirname == path:
145 return current_dir
146 return path.rsplit('/', 1)[0]
149 def splitpath(path):
150 """Split paths using '/' regardless of platform"""
151 return path.split('/')
154 def split(name):
155 """Split a path-like name. Returns tuple "(head, tail)" where "tail" is
156 everything after the final slash. The "head" may be empty.
158 This is the same as os.path.split() but only uses '/' as the delimiter.
160 >>> split('a/b/c')
161 ('a/b', 'c')
163 >>> split('xyz')
164 ('', 'xyz')
167 return (dirname(name), basename(name))
170 def join(*paths):
171 """Join paths using '/' regardless of platform
173 >>> join('a', 'b', 'c')
174 'a/b/c'
177 return '/'.join(paths)
180 def normalize_slash(value):
181 """Strip and normalize slashes in a string
183 >>> normalize_slash('///Meow///Cat///')
184 'Meow/Cat'
187 value = value.strip('/')
188 new_value = value.replace('//', '/')
189 while new_value != value:
190 value = new_value
191 new_value = value.replace('//', '/')
192 return value
195 def pathjoin(paths):
196 """Join a list of paths using '/' regardless of platform
198 >>> pathjoin(['a', 'b', 'c'])
199 'a/b/c'
202 return join(*paths)
205 def pathset(path):
206 """Return all of the path components for the specified path
208 >>> pathset('foo/bar/baz') == ['foo', 'foo/bar', 'foo/bar/baz']
209 True
212 result = []
213 parts = splitpath(path)
214 prefix = ''
215 for part in parts:
216 result.append(prefix + part)
217 prefix += part + '/'
219 return result
222 def select_directory(paths):
223 """Return the first directory in a list of paths"""
224 if not paths:
225 return core.getcwd()
227 for path in paths:
228 if core.isdir(path):
229 return path
231 return os.path.dirname(paths[0]) or core.getcwd()
234 def strip_prefix(prefix, string):
235 """Return string, without the prefix. Blow up if string doesn't
236 start with prefix."""
237 assert string.startswith(prefix)
238 return string[len(prefix) :]
241 def tablength(word, tabwidth):
242 """Return length of a word taking tabs into account
244 >>> tablength("\\t\\t\\t\\tX", 8)
248 return len(word.replace('\t', '')) + word.count('\t') * tabwidth
251 def _shell_split_py2(value):
252 """Python2 requires bytes inputs to shlex.split(). Returns [unicode]"""
253 try:
254 result = shlex.split(core.encode(value))
255 except ValueError:
256 result = core.encode(value).strip().split()
257 # Decode to unicode strings
258 return [core.decode(arg) for arg in result]
261 def _shell_split_py3(value):
262 """Python3 requires unicode inputs to shlex.split(). Converts to unicode"""
263 try:
264 result = shlex.split(value)
265 except ValueError:
266 result = core.decode(value).strip().split()
267 # Already unicode
268 return result
271 def shell_split(value):
272 if compat.PY2:
273 # Encode before calling split()
274 values = _shell_split_py2(value)
275 else:
276 # Python3 does not need the encode/decode dance
277 values = _shell_split_py3(value)
278 return values
281 def tmp_filename(label, suffix=''):
282 label = 'git-cola-' + label.replace('/', '-').replace('\\', '-')
283 with tempfile.NamedTemporaryFile(
284 prefix=label + '-', suffix=suffix, delete=False
285 ) as handle:
286 return handle.name
289 def is_linux():
290 """Is this a linux machine?"""
291 return sys.platform.startswith('linux')
294 def is_debian():
295 """Is it debian?"""
296 return os.path.exists('/usr/bin/apt-get')
299 def is_darwin():
300 """Return True on OSX."""
301 return sys.platform == 'darwin'
304 def is_win32():
305 """Return True on win32"""
306 return sys.platform in {'win32', 'cygwin'}
309 def launch_default_app(paths):
310 """Execute the default application on the specified paths"""
311 if is_win32():
312 for path in paths:
313 if hasattr(os, 'startfile'):
314 os.startfile(path) # pylint: disable=no-member
315 return
317 if is_darwin():
318 launcher = 'open'
319 else:
320 launcher = 'xdg-open'
322 core.fork([launcher] + paths)
325 def expandpath(path):
326 """Expand ~user/ and environment $variables"""
327 path = os.path.expandvars(path)
328 if path.startswith('~'):
329 path = os.path.expanduser(path)
330 return path
333 class Group:
334 """Operate on a collection of objects as a single unit"""
336 def __init__(self, *members):
337 self._members = members
339 def __getattr__(self, name):
340 """Return a function that relays calls to the group"""
342 def relay(*args, **kwargs):
343 for member in self._members:
344 method = getattr(member, name)
345 method(*args, **kwargs)
347 setattr(self, name, relay)
348 return relay
351 class Proxy:
352 """Wrap an object and override attributes"""
354 def __init__(self, obj, **overrides):
355 self._obj = obj
356 for k, v in overrides.items():
357 setattr(self, k, v)
359 def __getattr__(self, name):
360 return getattr(self._obj, name)
363 def slice_func(input_items, map_func):
364 """Slice input_items and call `map_func` over every slice
366 This exists because of "errno: Argument list too long"
369 # This comment appeared near the top of include/linux/binfmts.h
370 # in the Linux source tree:
372 # /*
373 # * MAX_ARG_PAGES defines the number of pages allocated for arguments
374 # * and envelope for the new program. 32 should suffice, this gives
375 # * a maximum env+arg of 128kB w/4KB pages!
376 # */
377 # #define MAX_ARG_PAGES 32
379 # 'size' is a heuristic to keep things highly performant by minimizing
380 # the number of slices. If we wanted it to run as few commands as
381 # possible we could call "getconf ARG_MAX" and make a better guess,
382 # but it's probably not worth the complexity (and the extra call to
383 # getconf that we can't do on Windows anyways).
385 # In my testing, getconf ARG_MAX on Mac OS X Mountain Lion reported
386 # 262144 and Debian/Linux-x86_64 reported 2097152.
388 # The hard-coded max_arg_len value is safely below both of these
389 # real-world values.
391 # 4K pages x 32 MAX_ARG_PAGES
392 max_arg_len = (32 * 4096) // 4 # allow plenty of space for the environment
393 max_filename_len = 256
394 size = max_arg_len // max_filename_len
396 status = 0
397 outs = []
398 errs = []
400 items = copy.copy(input_items)
401 while items:
402 stat, out, err = map_func(items[:size])
403 if stat < 0:
404 status = min(stat, status)
405 else:
406 status = max(stat, status)
407 outs.append(out)
408 errs.append(err)
409 items = items[size:]
411 return (status, '\n'.join(outs), '\n'.join(errs))
414 class Sequence:
415 def __init__(self, sequence):
416 self.sequence = sequence
418 def index(self, item, default=-1):
419 try:
420 idx = self.sequence.index(item)
421 except ValueError:
422 idx = default
423 return idx
425 def __getitem__(self, idx):
426 return self.sequence[idx]
429 def catch_runtime_error(func, *args, **kwargs):
430 """Run the function safely.
432 Catch RuntimeError to avoid tracebacks during application shutdown.
435 # Signals and callbacks can sometimes get triggered during application shutdown.
436 # This can happen when exiting while background tasks are still processing.
437 # Guard against this by making this operation a no-op.
438 try:
439 valid = True
440 result = func(*args, **kwargs)
441 except RuntimeError:
442 valid = False
443 result = None
444 return (valid, result)