git-cola v4.3.1
[git-cola.git] / cola / utils.py
blobbaf7f9225f78f35e75bafe0ca82d2aa342fe5b31
1 """Miscellaneous utility functions"""
2 from __future__ import absolute_import, division, print_function, unicode_literals
3 import copy
4 import os
5 import random
6 import re
7 import shlex
8 import sys
9 import tempfile
10 import time
11 import traceback
13 from . import core
14 from . import compat
# Seed the shared random number generator once at import time.
# Calling seed() without an argument lets Python use the operating system's
# randomness source when one is available and fall back to the current time
# otherwise, so it is never weaker than seeding from hash(time.time()).
random.seed()
def asint(obj, default=0):
    """Make any value into an int, even if the cast fails

    :param obj: The value to convert using ``int()``.
    :param default: The value returned when `obj` cannot be converted.
    :returns: ``int(obj)``, or `default` when the conversion raises
        TypeError, ValueError or OverflowError.

    """
    try:
        value = int(obj)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is what int() raises for infinite floats.
        value = default
    return value
def clamp(value, low, high):
    """Restrict `value` so that it is no less than `low` and no more than `high`"""
    at_least_low = max(low, value)
    return min(high, at_least_low)
def epoch_millis():
    """Return the current time in whole milliseconds since the epoch"""
    seconds = time.time()
    return int(seconds * 1000)
def add_parents(paths):
    """Return a set holding every path plus all of its parent directories

    Runs of consecutive slashes in each path are collapsed to a single '/'
    before the path is added.

    """
    all_paths = set()
    for entry in paths:
        while '//' in entry:
            entry = entry.replace('//', '/')
        all_paths.add(entry)
        if '/' not in entry:
            # No directory component, so there are no parents to add.
            continue
        ancestor = dirname(entry)
        while ancestor:
            all_paths.add(ancestor)
            ancestor = dirname(ancestor)
    return all_paths
def format_exception(exc):
    """Build a (message, details) pair describing an exception for display

    The details are the formatted traceback of the exception that is
    currently being handled, as reported by ``sys.exc_info()``.  The message
    is ``exc.msg`` when that attribute exists and the decoded ``repr(exc)``
    otherwise.

    """
    exc_info = sys.exc_info()
    formatted = traceback.format_exception(*exc_info)
    details = '\n'.join(core.decode(line) for line in formatted)
    msg = exc.msg if hasattr(exc, 'msg') else core.decode(repr(exc))
    return (msg, details)
def sublist(values, remove):
    """Return a list of the entries in `values` that are not in `remove`

    The order of `values` is preserved and duplicates are kept.

    """
    return [entry for entry in values if entry not in remove]
# Compiled regular expressions, keyed by their pattern string.
__grep_cache = {}


def grep(pattern, items, squash=True):
    """Return the entries of `items` that match a regular expression

    Matching uses the compiled pattern's ``match()`` method, so the pattern
    is applied at the start of each entry.  Compiled patterns are kept in a
    module-level cache.

    :param pattern: Regular expression source string.
    :param items: A dict, or an iterable of strings.  When a dict is given
        its keys are matched.
    :param squash: If only one item matches, return just that item
    :returns: For dict input, a dict of the matching key/value pairs.
        Otherwise a list with one entry per match: the whole match when the
        pattern has no groups, the group itself when it has exactly one, or
        a list of the groups when it has several.

    """
    regex = __grep_cache.get(pattern)
    if regex is None:
        regex = __grep_cache[pattern] = re.compile(pattern)

    if isinstance(items, dict):
        return {key: items[key] for key in items if regex.match(key)}

    matched = []
    for entry in items:
        found = regex.match(entry)
        if not found:
            continue
        groups = found.groups()
        if len(groups) > 1:
            matched.append(list(groups))
        elif groups:
            matched.append(groups[0])
        else:
            matched.append(found.group(0))

    if squash and len(matched) == 1:
        return matched[0]
    return matched
def basename(path):
    """
    Return the part of `path` that follows the final '/'

    This mirrors os.path.basename() but only treats '/' as a separator,
    because git's output always uses '/' regardless of platform.

    """
    _, _, tail = path.rpartition('/')
    return tail
def strip_one(path):
    """Remove the leading directory component from a path

    Leading and trailing slashes are stripped first.  A path with a single
    component is returned unchanged apart from that stripping.

    """
    trimmed = path.strip('/')
    head, slash, rest = trimmed.partition('/')
    if slash:
        return rest
    return head
def dirname(path, current_dir=''):
    """
    Return the part of `path` that precedes the final '/'

    This mirrors os.path.dirname() but only treats '/' as a separator,
    because git's output always uses '/' regardless of platform.

    Runs of consecutive slashes are collapsed before splitting, and
    `current_dir` is returned when the path contains no '/' at all.

    """
    while '//' in path:
        path = path.replace('//', '/')
    head, slash, _ = path.rpartition('/')
    if not slash:
        return current_dir
    return head
def splitpath(path):
    """Return the list of '/'-separated components of a path"""
    components = path.split('/')
    return components
def split(name):
    """Break a path-like name into a "(head, tail)" tuple

    The "tail" is everything after the final slash and the "head" is
    everything before it.  The "head" may be empty.

    This behaves like os.path.split() except that '/' is the only delimiter.

    >>> split('a/b/c')
    ('a/b', 'c')

    >>> split('xyz')
    ('', 'xyz')

    """
    head = dirname(name)
    tail = basename(name)
    return (head, tail)
def join(*paths):
    """Combine path components with '/' on every platform

    >>> join('a', 'b', 'c')
    'a/b/c'

    """
    separator = '/'
    return separator.join(paths)
def normalize_slash(value):
    """Trim outer slashes and collapse repeated slashes in a string

    >>> normalize_slash('///Meow///Cat///')
    'Meow/Cat'

    """
    value = value.strip('/')
    # Each pass halves the runs of slashes; repeat until none remain.
    while '//' in value:
        value = value.replace('//', '/')
    return value
def pathjoin(paths):
    """Combine a list of path components with '/' on every platform

    >>> pathjoin(['a', 'b', 'c'])
    'a/b/c'

    """
    joined = join(*paths)
    return joined
def pathset(path):
    """Return the path and each of its leading sub-paths, shortest first

    >>> pathset('foo/bar/baz') == ['foo', 'foo/bar', 'foo/bar/baz']
    True

    """
    parts = splitpath(path)
    # The entry for `count` components is those components joined by '/'.
    return ['/'.join(parts[:count]) for count in range(1, len(parts) + 1)]
def select_directory(paths):
    """Choose a directory for a list of paths

    The first entry that is a directory is returned.  When no entry is a
    directory the parent directory of the first path is returned.  The
    current directory is used when `paths` is empty or that parent is empty.

    """
    if paths:
        for candidate in paths:
            if core.isdir(candidate):
                return candidate
        parent = os.path.dirname(paths[0])
        if parent:
            return parent
    return core.getcwd()
def strip_prefix(prefix, string):
    """Return `string` with the leading `prefix` removed

    An assertion fails when `string` does not start with `prefix`.

    """
    assert string.startswith(prefix)
    prefix_length = len(prefix)
    return string[prefix_length:]
def sanitize(value):
    """Replace spaces, tabs and shell metacharacters in a string with '_'"""
    metachars = """ \t!@#$%^&*()\\;,<>"'[]{}~|"""
    for metachar in metachars:
        value = value.replace(metachar, '_')
    return value
def tablength(word, tabwidth):
    """Return the length of a word, counting each tab as `tabwidth` characters

    >>> tablength("\\t\\t\\t\\tX", 8)
    33

    """
    tab_count = word.count('\t')
    other_chars = len(word) - tab_count
    return other_chars + tab_count * tabwidth
def _shell_split_py2(value):
    """Split a command line on Python2, where shlex.split() needs bytes

    The value is encoded before splitting and every argument is decoded
    again, so a list of unicode strings is returned.  Plain whitespace
    splitting is used when shlex rejects the value with a ValueError.

    """
    encoded = core.encode(value)
    try:
        args = shlex.split(encoded)
    except ValueError:
        args = core.encode(value).strip().split()
    return [core.decode(arg) for arg in args]
def _shell_split_py3(value):
    """Split a command line on Python3, where shlex.split() takes unicode

    The arguments returned by shlex are already unicode strings.  Plain
    whitespace splitting of the decoded value is used when shlex rejects
    the value with a ValueError.

    """
    try:
        return shlex.split(value)
    except ValueError:
        return core.decode(value).strip().split()
def shell_split(value):
    """Split a command line into a list of arguments

    Dispatches to the Python2 or Python3 implementation; only Python2 needs
    the value encoded before splitting and decoded afterwards.

    """
    splitter = _shell_split_py2 if compat.PY2 else _shell_split_py3
    return splitter(value)
def tmp_filename(label, suffix=''):
    """Create an empty temporary file and return its path

    The file name starts with "git-cola-" followed by `label`, with any
    slashes and backslashes in the label replaced by dashes.  The file is
    not deleted when it is closed.

    """
    safe_label = label.replace('/', '-').replace('\\', '-')
    prefix = 'git-cola-' + safe_label + '-'
    with tempfile.NamedTemporaryFile(
        prefix=prefix, suffix=suffix, delete=False
    ) as handle:
        return handle.name
def is_linux():
    """Return True when sys.platform begins with 'linux'"""
    platform = sys.platform
    return platform.startswith('linux')
def is_debian():
    """Return True when /usr/bin/apt-get exists on this machine"""
    apt_get = '/usr/bin/apt-get'
    return os.path.exists(apt_get)
def is_darwin():
    """Return True when sys.platform is 'darwin' (macOS)"""
    platform = sys.platform
    return platform == 'darwin'
def is_win32():
    """Return True when sys.platform is 'win32' or 'cygwin'"""
    windows_platforms = ('win32', 'cygwin')
    return sys.platform in windows_platforms
def launch_default_app(paths):
    """Open the specified paths using the platform's default application

    On win32 every path is handed to ``os.startfile`` when that function
    exists.  Elsewhere a single launcher process is forked with all of the
    paths as its arguments: ``open`` on macOS and ``xdg-open`` otherwise.

    """
    if is_win32():
        # Nothing is launched when this Python build has no os.startfile.
        if hasattr(os, 'startfile'):
            for path in paths:
                os.startfile(path)  # pylint: disable=no-member
        return

    launcher = 'open' if is_darwin() else 'xdg-open'
    core.fork([launcher] + paths)
def expandpath(path):
    """Expand environment $variables and a leading ~user/ in a path

    Variables are expanded first; the home directory is only expanded when
    the result starts with '~'.

    """
    expanded = os.path.expandvars(path)
    if not expanded.startswith('~'):
        return expanded
    return os.path.expanduser(expanded)
class Group(object):
    """Treat several objects as one by relaying method calls to all of them"""

    def __init__(self, *members):
        self._members = members

    def __getattr__(self, name):
        """Create the relay function for the method called `name`

        This only runs when `name` is not already an attribute of the group.
        The relay is stored on the instance so later lookups of the same
        name find it directly.  Calling the relay invokes the method on each
        member in order with the same arguments; return values are
        discarded.

        """

        def relay(*args, **kwargs):
            for member in self._members:
                getattr(member, name)(*args, **kwargs)

        setattr(self, name, relay)
        return relay
class Proxy(object):
    """Wrap an object and override attributes

    Keyword arguments given to the constructor are stored on the proxy
    itself, so they take precedence over attributes of the wrapped object
    with the same name.  Every other attribute lookup is forwarded to the
    wrapped object.

    """

    def __init__(self, obj, **overrides):
        self._obj = obj
        for k, v in overrides.items():
            setattr(self, k, v)

    def __getattr__(self, name):
        """Forward attributes that are missing on the proxy to the wrapped object

        :raises AttributeError: when the wrapped object does not have the
            attribute, or when the proxy has no wrapped object yet.

        """
        # __getattr__ only runs after normal lookup has failed.  If "_obj"
        # itself is missing (for example on the blank instance that
        # copy.copy() creates before restoring state) then reading self._obj
        # below would re-enter this method without end.  Report it as an
        # ordinary missing attribute instead.
        if name == '_obj':
            raise AttributeError(name)
        return getattr(self._obj, name)
def slice_func(input_items, map_func):
    """Call `map_func` on successive slices of `input_items`

    This exists because of "errno: Argument list too long"

    :param input_items: The items to process.  They are copied first, so
        the caller's object is not modified by the slicing.
    :param map_func: Called with each slice; must return a
        ``(status, out, err)`` tuple.
    :returns: A ``(status, out, err)`` tuple.  `out` and `err` are the
        per-slice values joined with newlines.  `status` starts at zero; a
        negative slice status is combined using min() and any other slice
        status using max().

    """
    # include/linux/binfmts.h in the Linux source tree described its limit
    # like this:
    #
    #   /*
    #    * MAX_ARG_PAGES defines the number of pages allocated for arguments
    #    * and envelope for the new program. 32 should suffice, this gives
    #    * a maximum env+arg of 128kB w/4KB pages!
    #    */
    #   #define MAX_ARG_PAGES 32
    #
    # The slice size is a heuristic that keeps the number of slices small.
    # Asking "getconf ARG_MAX" would allow a better guess, but it costs an
    # extra command that is not available on Windows and is probably not
    # worth the complexity.
    #
    # For reference, getconf ARG_MAX reported 262144 on Mac OS X Mountain
    # Lion and 2097152 on Debian/Linux-x86_64, so the hard-coded budget
    # below is safely under both of those real-world values.

    # 4K pages x 32 MAX_ARG_PAGES, quartered to leave room for the environment
    max_arg_len = (32 * 4096) // 4
    max_filename_len = 256
    size = max_arg_len // max_filename_len

    status = 0
    outs = []
    errs = []

    remaining = copy.copy(input_items)
    while remaining:
        chunk = remaining[:size]
        stat, out, err = map_func(chunk)
        status = min(stat, status) if stat < 0 else max(stat, status)
        outs.append(out)
        errs.append(err)
        remaining = remaining[size:]

    return (status, '\n'.join(outs), '\n'.join(errs))
class Sequence(object):
    """Wrap a sequence and offer an index() lookup that takes a default"""

    def __init__(self, sequence):
        self.sequence = sequence

    def index(self, item, default=-1):
        """Return the position of `item`, or `default` when it is not present"""
        try:
            return self.sequence.index(item)
        except ValueError:
            return default

    def __getitem__(self, idx):
        """Return the wrapped sequence's entry at `idx`"""
        return self.sequence[idx]
def catch_runtime_error(func, *args, **kwargs):
    """Call `func` and report whether it completed without a RuntimeError

    Catch RuntimeError to avoid tracebacks during application shutdown.

    :returns: ``(True, result)`` when the call succeeds and
        ``(False, None)`` when it raises RuntimeError.  Other exceptions
        are not caught.

    """
    # Signals and callbacks can fire while the application is shutting down,
    # for example when exiting while background tasks are still processing.
    # Turning the failure into a no-op result guards against that.
    try:
        result = func(*args, **kwargs)
    except RuntimeError:
        return (False, None)
    return (True, result)