dev: format code using "garden fmt" (black)
[git-cola.git] / cola / git.py
blob7981ea454682c4e80c01a2f6f1b8f2a877730c1e
1 from __future__ import absolute_import, division, print_function, unicode_literals
2 from functools import partial
3 import errno
4 import os
5 from os.path import join
6 import subprocess
7 import threading
8 import time
10 from . import core
11 from .compat import int_types
12 from .compat import ustr
13 from .compat import WIN32
14 from .decorators import memoize
15 from .interaction import Interaction
18 GIT_COLA_TRACE = core.getenv('GIT_COLA_TRACE', '')
19 GIT = core.getenv('GIT_COLA_GIT', 'git')
20 STATUS = 0
21 STDOUT = 1
22 STDERR = 2
24 # Object ID / SHA1-related constants
25 # Git's empty tree is a built-in constant object name.
26 EMPTY_TREE_OID = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
27 # Git's diff machinery returns zeroes for modified files whose content exists
28 # in the worktree only.
29 MISSING_BLOB_OID = '0000000000000000000000000000000000000000'
30 # Git's SHA-1 object IDs are 40 characters long.
31 # This will need to change when Git moves away from SHA-1.
32 # When that happens we'll have to detect and update this at runtime in
33 # order to support both old and new git.
34 OID_LENGTH = 40
36 _index_lock = threading.Lock()
39 def dashify(value):
40 return value.replace('_', '-')
43 def is_git_dir(git_dir):
44 """From git's setup.c:is_git_directory()."""
45 result = False
46 if git_dir:
47 headref = join(git_dir, 'HEAD')
49 if (
50 core.isdir(git_dir)
51 and (
52 core.isdir(join(git_dir, 'objects'))
53 and core.isdir(join(git_dir, 'refs'))
55 or (
56 core.isfile(join(git_dir, 'gitdir'))
57 and core.isfile(join(git_dir, 'commondir'))
60 result = core.isfile(headref) or (
61 core.islink(headref) and core.readlink(headref).startswith('refs/')
63 else:
64 result = is_git_file(git_dir)
66 return result
69 def is_git_file(filename):
70 return core.isfile(filename) and os.path.basename(filename) == '.git'
73 def is_git_worktree(dirname):
74 return is_git_dir(join(dirname, '.git'))
77 def is_git_repository(path):
78 return is_git_worktree(path) or is_git_dir(path)
81 def read_git_file(path):
82 """Read the path from a .git-file
84 `None` is returned when <path> is not a .git-file.
86 """
87 result = None
88 if path and is_git_file(path):
89 header = 'gitdir: '
90 data = core.read(path).strip()
91 if data.startswith(header):
92 result = data[len(header) :]
93 if result and not os.path.isabs(result):
94 path_folder = os.path.dirname(path)
95 repo_relative = join(path_folder, result)
96 result = os.path.normpath(repo_relative)
97 return result
100 class Paths(object):
101 """Git repository paths of interest"""
103 def __init__(self, git_dir=None, git_file=None, worktree=None, common_dir=None):
104 if git_dir and not is_git_dir(git_dir):
105 git_dir = None
106 self.git_dir = git_dir
107 self.git_file = git_file
108 self.worktree = worktree
109 self.common_dir = common_dir
111 def get(self, path):
112 ceiling_dirs = set()
113 ceiling = core.getenv('GIT_CEILING_DIRECTORIES')
114 if ceiling:
115 ceiling_dirs.update([x for x in ceiling.split(':') if x])
117 if path:
118 path = core.abspath(path)
120 if not self.git_dir or not self.worktree:
121 # Search for a .git directory
122 while path:
123 if path in ceiling_dirs:
124 break
125 if is_git_dir(path):
126 if not self.git_dir:
127 self.git_dir = path
128 basename = os.path.basename(path)
129 if not self.worktree and basename == '.git':
130 self.worktree = os.path.dirname(path)
131 # We are either in a bare repository, or someone set GIT_DIR
132 # but did not set GIT_WORK_TREE.
133 if self.git_dir:
134 if not self.worktree:
135 basename = os.path.basename(self.git_dir)
136 if basename == '.git':
137 self.worktree = os.path.dirname(self.git_dir)
138 elif path and not is_git_dir(path):
139 self.worktree = path
140 break
141 gitpath = join(path, '.git')
142 if is_git_dir(gitpath):
143 if not self.git_dir:
144 self.git_dir = gitpath
145 if not self.worktree:
146 self.worktree = path
147 break
148 path, dummy = os.path.split(path)
149 if not dummy:
150 break
152 if self.git_dir:
153 git_dir_path = read_git_file(self.git_dir)
154 if git_dir_path:
155 self.git_file = self.git_dir
156 self.git_dir = git_dir_path
158 commondir_file = join(git_dir_path, 'commondir')
159 if core.exists(commondir_file):
160 common_path = core.read(commondir_file).strip()
161 if common_path:
162 if os.path.isabs(common_path):
163 common_dir = common_path
164 else:
165 common_dir = join(git_dir_path, common_path)
166 common_dir = os.path.normpath(common_dir)
167 self.common_dir = common_dir
168 # usage: Paths().get()
169 return self
172 def find_git_directory(path):
173 """Perform Git repository discovery"""
174 return Paths(
175 git_dir=core.getenv('GIT_DIR'), worktree=core.getenv('GIT_WORK_TREE')
176 ).get(path)
179 class Git(object):
181 The Git class manages communication with the Git binary
184 def __init__(self):
185 self.paths = Paths()
187 self._valid = {} #: Store the result of is_git_dir() for performance
188 self.set_worktree(core.getcwd())
190 def is_git_repository(self, path):
191 return is_git_repository(path)
193 def getcwd(self):
194 """Return the working directory used by git()"""
195 return self.paths.worktree or self.paths.git_dir
197 def set_worktree(self, path):
198 path = core.decode(path)
199 self.paths = find_git_directory(path)
200 return self.paths.worktree
202 def worktree(self):
203 if not self.paths.worktree:
204 path = core.abspath(core.getcwd())
205 self.paths = find_git_directory(path)
206 return self.paths.worktree
208 def is_valid(self):
209 """Is this a valid git repository?
211 Cache the result to avoid hitting the filesystem.
214 git_dir = self.paths.git_dir
215 try:
216 valid = bool(git_dir) and self._valid[git_dir]
217 except KeyError:
218 valid = self._valid[git_dir] = is_git_dir(git_dir)
220 return valid
222 def git_path(self, *paths):
223 result = None
224 if self.paths.git_dir:
225 result = join(self.paths.git_dir, *paths)
226 if result and self.paths.common_dir and not core.exists(result):
227 common_result = join(self.paths.common_dir, *paths)
228 if core.exists(common_result):
229 result = common_result
230 return result
232 def git_dir(self):
233 if not self.paths.git_dir:
234 path = core.abspath(core.getcwd())
235 self.paths = find_git_directory(path)
236 return self.paths.git_dir
238 def __getattr__(self, name):
239 git_cmd = partial(self.git, name)
240 setattr(self, name, git_cmd)
241 return git_cmd
243 @staticmethod
244 def execute(
245 command,
246 _cwd=None,
247 _decode=True,
248 _encoding=None,
249 _raw=False,
250 _stdin=None,
251 _stderr=subprocess.PIPE,
252 _stdout=subprocess.PIPE,
253 _readonly=False,
254 _no_win32_startupinfo=False,
257 Execute a command and returns its output
259 :param command: argument list to execute.
260 :param _cwd: working directory, defaults to the current directory.
261 :param _decode: whether to decode output, defaults to True.
262 :param _encoding: default encoding, defaults to None (utf-8).
263 :param _readonly: avoid taking the index lock. Assume the command is read-only.
264 :param _raw: do not strip trailing whitespace.
265 :param _stdin: optional stdin filehandle.
266 :returns (status, out, err): exit status, stdout, stderr
269 # Allow the user to have the command executed in their working dir.
270 if not _cwd:
271 _cwd = core.getcwd()
273 extra = {}
275 if hasattr(os, 'setsid'):
276 # SSH uses the SSH_ASKPASS variable only if the process is really
277 # detached from the TTY (stdin redirection and setting the
278 # SSH_ASKPASS environment variable is not enough). To detach a
279 # process from the console it should fork and call os.setsid().
280 extra['preexec_fn'] = os.setsid
282 start_time = time.time()
284 # Start the process
285 # Guard against thread-unsafe .git/index.lock files
286 if not _readonly:
287 _index_lock.acquire() # pylint: disable=consider-using-with
288 try:
289 status, out, err = core.run_command(
290 command,
291 cwd=_cwd,
292 encoding=_encoding,
293 stdin=_stdin,
294 stdout=_stdout,
295 stderr=_stderr,
296 no_win32_startupinfo=_no_win32_startupinfo,
297 **extra
299 finally:
300 # Let the next thread in
301 if not _readonly:
302 _index_lock.release()
304 end_time = time.time()
305 elapsed_time = abs(end_time - start_time)
307 if not _raw and out is not None:
308 out = core.UStr(out.rstrip('\n'), out.encoding)
310 cola_trace = GIT_COLA_TRACE
311 if cola_trace == 'trace':
312 msg = 'trace: %.3fs: %s' % (elapsed_time, core.list2cmdline(command))
313 Interaction.log_status(status, msg, '')
314 elif cola_trace == 'full':
315 if out or err:
316 core.print_stderr(
317 "# %.3fs: %s -> %d: '%s' '%s'"
318 % (elapsed_time, ' '.join(command), status, out, err)
320 else:
321 core.print_stderr(
322 '# %.3fs: %s -> %d' % (elapsed_time, ' '.join(command), status)
324 elif cola_trace:
325 core.print_stderr('# %.3fs: %s' % (elapsed_time, ' '.join(command)))
327 # Allow access to the command's status code
328 return (status, out, err)
330 def git(self, cmd, *args, **kwargs):
331 # Handle optional arguments prior to calling transform_kwargs
332 # otherwise they'll end up in args, which is bad.
333 _kwargs = {'_cwd': self.getcwd()}
334 execute_kwargs = (
335 '_cwd',
336 '_decode',
337 '_encoding',
338 '_stdin',
339 '_stdout',
340 '_stderr',
341 '_raw',
342 '_readonly',
343 '_no_win32_startupinfo',
346 for kwarg in execute_kwargs:
347 if kwarg in kwargs:
348 _kwargs[kwarg] = kwargs.pop(kwarg)
350 # Prepare the argument list
351 git_args = [
352 GIT,
353 '-c',
354 'diff.suppressBlankEmpty=false',
355 '-c',
356 'log.showSignature=false',
357 dashify(cmd),
359 opt_args = transform_kwargs(**kwargs)
360 call = git_args + opt_args
361 call.extend(args)
362 try:
363 result = self.execute(call, **_kwargs)
364 except OSError as exc:
365 if WIN32 and exc.errno == errno.ENOENT:
366 # see if git exists at all. on win32 it can fail with ENOENT in
367 # case of argv overflow. we should be safe from that but use
368 # defensive coding for the worst-case scenario. On UNIX
369 # we have ENAMETOOLONG but that doesn't exist on Windows.
370 if _git_is_installed():
371 raise exc
372 _print_win32_git_hint()
373 result = (1, '', "error: unable to execute '%s'" % GIT)
374 return result
377 def _git_is_installed():
378 """Return True if git is installed"""
379 # On win32 Git commands can fail with ENOENT in case of argv overflow. we
380 # should be safe from that but use defensive coding for the worst-case
381 # scenario. On UNIX we have ENAMETOOLONG but that doesn't exist on
382 # Windows.
383 try:
384 status, _, _ = Git.execute([GIT, '--version'])
385 result = status == 0
386 except OSError:
387 result = False
388 return result
391 def transform_kwargs(**kwargs):
392 """Transform kwargs into git command line options
394 Callers can assume the following behavior:
396 Passing foo=None ignores foo, so that callers can
397 use default values of None that are ignored unless
398 set explicitly.
400 Passing foo=False ignore foo, for the same reason.
402 Passing foo={string-or-number} results in ['--foo=<value>']
403 in the resulting arguments.
406 args = []
407 types_to_stringify = (ustr, float, str) + int_types
409 for k, value in kwargs.items():
410 if len(k) == 1:
411 dashes = '-'
412 equals = ''
413 else:
414 dashes = '--'
415 equals = '='
416 # isinstance(False, int) is True, so we have to check bool first
417 if isinstance(value, bool):
418 if value:
419 args.append('%s%s' % (dashes, dashify(k)))
420 # else: pass # False is ignored; flag=False inhibits --flag
421 elif isinstance(value, types_to_stringify):
422 args.append('%s%s%s%s' % (dashes, dashify(k), equals, value))
424 return args
427 def win32_git_error_hint():
428 return (
429 '\n'
430 'NOTE: If you have Git installed in a custom location, e.g.\n'
431 'C:\\Tools\\Git, then you can create a file at\n'
432 '~/.config/git-cola/git-bindir with following text\n'
433 'and git-cola will add the specified location to your $PATH\n'
434 'automatically when starting cola:\n'
435 '\n'
436 r'C:\Tools\Git\bin'
440 @memoize
441 def _print_win32_git_hint():
442 hint = '\n' + win32_git_error_hint() + '\n'
443 core.print_stderr("error: unable to execute 'git'" + hint)
446 def create():
447 """Create Git instances
449 >>> git = create()
450 >>> status, out, err = git.version()
451 >>> 'git' == out[:3].lower()
452 True
455 return Git()