fetch: add ability to fetch into a remote tracking branch
[git-cola.git] / cola / git.py
blob8bbd3208f9641c2c9d35c1998ffc10df008f9e51
1 from functools import partial
2 import errno
3 import os
4 from os.path import join
5 import subprocess
6 import threading
7 import time
9 from . import core
10 from .compat import int_types
11 from .compat import ustr
12 from .compat import WIN32
13 from .decorators import memoize
14 from .interaction import Interaction
# Tracing mode read from the environment.  Git.execute() recognizes 'trace',
# 'full', and treats any other non-empty value as a terse trace.
GIT_COLA_TRACE = core.getenv('GIT_COLA_TRACE', '')
# Git executable to run; overridable through GIT_COLA_GIT, defaults to 'git'.
GIT = core.getenv('GIT_COLA_GIT', 'git')
# Positions of the fields in the (status, out, err) tuples returned by
# Git.execute().
STATUS = 0
STDOUT = 1
STDERR = 2
# Object ID / SHA-1-related constants
# Git's empty tree is a built-in constant object name.
EMPTY_TREE_OID = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
# Git's diff machinery returns zeroes for modified files whose content exists
# in the worktree only.
MISSING_BLOB_OID = '0000000000000000000000000000000000000000'
# Git's SHA-1 object IDs are 40 characters long.
# This will need to change when Git moves away from SHA-1.
# When that happens we'll have to detect and update this at runtime in
# order to support both old and new git.
OID_LENGTH = 40

# Held by Git.execute() around every command that is not flagged read-only.
_index_lock = threading.Lock()
def dashify(value):
    """Return ``value`` with every underscore replaced by a dash.

    This maps Python-friendly names such as ``cherry_pick`` onto the
    spelling git expects on its command line (``cherry-pick``).
    """
    return value.replace('_', '-')
def is_git_dir(git_dir):
    """From git's setup.c:is_git_directory().

    :param git_dir: candidate path; ``None`` and ``''`` are never git dirs.
    :returns: True when ``git_dir`` looks like a git directory, or when it
        is a ``.git`` file (see ``is_git_file``).
    """
    if not git_dir:
        return False

    # Two layouts are accepted.  `and` binds tighter than `or`, so the
    # grouping below is spelled out explicitly:
    #   1. a directory that contains `objects/` and `refs/` directories, or
    #   2. a path that contains `gitdir` and `commondir` files (the files git
    #      places in the admin directory of a linked worktree).
    looks_like_git_dir = (
        core.isdir(git_dir)
        and core.isdir(join(git_dir, 'objects'))
        and core.isdir(join(git_dir, 'refs'))
    ) or (
        core.isfile(join(git_dir, 'gitdir'))
        and core.isfile(join(git_dir, 'commondir'))
    )
    if not looks_like_git_dir:
        # Not a directory layout; it may still be a `.git` file.
        return is_git_file(git_dir)

    # HEAD must be a file, or a symlink pointing into refs/.
    headref = join(git_dir, 'HEAD')
    return core.isfile(headref) or (
        core.islink(headref) and core.readlink(headref).startswith('refs/')
    )
def is_git_file(filename):
    """Return True when ``filename`` is a file (per ``core.isfile``) named ``.git``."""
    return core.isfile(filename) and os.path.basename(filename) == '.git'
def is_git_worktree(dirname):
    """Return True when ``dirname`` contains a ``.git`` entry accepted by ``is_git_dir``."""
    return is_git_dir(join(dirname, '.git'))
def is_git_repository(path):
    """Return True when ``path`` is a worktree (has a ``.git`` entry) or is itself a git directory."""
    return is_git_worktree(path) or is_git_dir(path)
def read_git_file(path):
    """Read the path from a .git-file

    `None` is returned when <path> is not a .git-file, or when its content
    does not start with the ``gitdir: `` header.

    A relative target is resolved against the directory that contains the
    .git-file and normalized; an absolute target is returned as written.
    """
    if not path or not is_git_file(path):
        return None

    header = 'gitdir: '
    data = core.read(path).strip()
    if not data.startswith(header):
        return None

    result = data[len(header):]
    if result and not os.path.isabs(result):
        # Relative targets are relative to the .git-file's own directory.
        result = os.path.normpath(join(os.path.dirname(path), result))
    return result
class Paths:
    """Git repository paths of interest"""

    def __init__(self, git_dir=None, git_file=None, worktree=None, common_dir=None):
        """Store the known paths.

        A ``git_dir`` that does not pass ``is_git_dir()`` is discarded so
        that ``get()`` searches for a real one.
        """
        if git_dir and not is_git_dir(git_dir):
            git_dir = None
        # Path of the git directory (or None until discovered).
        self.git_dir = git_dir
        # Path of the `.git` file when git_dir was reached through one.
        self.git_file = git_file
        # Path of the working tree (or None, e.g. for bare repositories).
        self.worktree = worktree
        # Directory named by the git dir's `commondir` file, when present.
        self.common_dir = common_dir

    def get(self, path):
        """Search for git worktrees and bare repositories

        :param path: directory where the upward search starts.
        :returns: ``self``, so that ``Paths(...).get(path)`` can be chained.
        """
        if not self.git_dir or not self.worktree:
            # Directories listed in GIT_CEILING_DIRECTORIES stop the search.
            ceiling = core.getenv('GIT_CEILING_DIRECTORIES')
            if ceiling:
                ceiling_dirs = {x for x in ceiling.split(os.pathsep) if x}
            else:
                ceiling_dirs = set()
            if path:
                path = core.abspath(path)
            self._search_for_git(path, ceiling_dirs)

        if self.git_dir:
            # When git_dir is really a `.git` file, follow it to the directory
            # it names and remember the file separately.
            git_dir_path = read_git_file(self.git_dir)
            if git_dir_path:
                self.git_file = self.git_dir
                self.git_dir = git_dir_path

                commondir_file = join(git_dir_path, 'commondir')
                if core.exists(commondir_file):
                    common_path = core.read(commondir_file).strip()
                    if common_path:
                        if os.path.isabs(common_path):
                            common_dir = common_path
                        else:
                            # Relative entries are relative to the git dir.
                            common_dir = join(git_dir_path, common_path)
                            common_dir = os.path.normpath(common_dir)
                        self.common_dir = common_dir
        # usage: Paths().get()
        return self

    def _search_for_git(self, path, ceiling_dirs):
        """Search for git repositories located at path or above"""
        while path:
            if path in ceiling_dirs:
                break
            if is_git_dir(path):
                if not self.git_dir:
                    self.git_dir = path
                basename = os.path.basename(path)
                if not self.worktree and basename == '.git':
                    self.worktree = os.path.dirname(path)
            # We are either in a bare repository, or someone set GIT_DIR
            # but did not set GIT_WORK_TREE.
            if self.git_dir:
                if not self.worktree:
                    basename = os.path.basename(self.git_dir)
                    if basename == '.git':
                        self.worktree = os.path.dirname(self.git_dir)
                    elif path and not is_git_dir(path):
                        self.worktree = path
                break
            gitpath = join(path, '.git')
            if is_git_dir(gitpath):
                if not self.git_dir:
                    self.git_dir = gitpath
                if not self.worktree:
                    self.worktree = path
                break
            # Step up one directory; stop once the root has been reached
            # (os.path.split() then yields an empty tail).
            path, dummy = os.path.split(path)
            if not dummy:
                break
def find_git_directory(path):
    """Perform Git repository discovery

    The ``GIT_DIR`` and ``GIT_WORK_TREE`` environment variables seed the
    result; whatever is still unknown is searched for starting at ``path``.

    :returns: the populated ``Paths`` object.
    """
    return Paths(
        git_dir=core.getenv('GIT_DIR'), worktree=core.getenv('GIT_WORK_TREE')
    ).get(path)
class Git:
    """
    The Git class manages communication with the Git binary

    Unknown attributes are resolved to git subcommands, so ``git.status()``
    runs ``git status`` through ``Git.git()``.
    """

    def __init__(self):
        self.paths = Paths()

        self._valid = {}  #: Store the result of is_git_dir() for performance
        self.set_worktree(core.getcwd())

    def is_git_repository(self, path):
        """Return True when ``path`` is a git worktree or git directory."""
        return is_git_repository(path)

    def getcwd(self):
        """Return the working directory used by git()"""
        return self.paths.worktree or self.paths.git_dir

    def set_worktree(self, path):
        """Rediscover the repository paths from ``path`` and return the worktree."""
        path = core.decode(path)
        self.paths = find_git_directory(path)
        return self.paths.worktree

    def worktree(self):
        """Return the worktree, discovering it from the current directory if unset."""
        if not self.paths.worktree:
            path = core.abspath(core.getcwd())
            self.paths = find_git_directory(path)
        return self.paths.worktree

    def is_valid(self):
        """Is this a valid git repository?

        Cache the result to avoid hitting the filesystem.

        """
        git_dir = self.paths.git_dir
        try:
            valid = bool(git_dir) and self._valid[git_dir]
        except KeyError:
            valid = self._valid[git_dir] = is_git_dir(git_dir)

        return valid

    def git_path(self, *paths):
        """Return ``paths`` joined onto the git directory, or None without one.

        When a common directory is known and the joined path does not exist
        under the git directory, the equivalent path under the common
        directory is returned instead, provided that it exists.
        """
        result = None
        if self.paths.git_dir:
            result = join(self.paths.git_dir, *paths)
        if result and self.paths.common_dir and not core.exists(result):
            common_result = join(self.paths.common_dir, *paths)
            if core.exists(common_result):
                result = common_result
        return result

    def git_dir(self):
        """Return the git directory, discovering it from the current directory if unset."""
        if not self.paths.git_dir:
            path = core.abspath(core.getcwd())
            self.paths = find_git_directory(path)
        return self.paths.git_dir

    def __getattr__(self, name):
        """Resolve unknown attributes to git subcommands.

        ``__getattr__`` only runs when normal lookup fails.  The resulting
        callable is stored on the instance so later lookups bypass this hook.

        :raises AttributeError: for dunder names, so that protocol probes
            (``copy``, ``pickle``, ``hasattr(obj, '__iter__')`` ...) are not
            mistaken for git commands.
        """
        if name.startswith('__'):
            raise AttributeError(name)
        git_cmd = partial(self.git, name)
        setattr(self, name, git_cmd)
        return git_cmd

    @staticmethod
    def execute(
        command,
        _cwd=None,
        _decode=True,
        _encoding=None,
        _raw=False,
        _stdin=None,
        _stderr=subprocess.PIPE,
        _stdout=subprocess.PIPE,
        _readonly=False,
        _no_win32_startupinfo=False,
    ):
        """
        Execute a command and returns its output

        :param command: argument list to execute.
        :param _cwd: working directory, defaults to the current directory.
        :param _decode: whether to decode output, defaults to True.
            NOTE(review): accepted for compatibility but not referenced in
            this function body.
        :param _encoding: default encoding, defaults to None (utf-8).
        :param _readonly: avoid taking the index lock. Assume the command is read-only.
        :param _raw: do not strip trailing newlines from stdout.
        :param _stdin: optional stdin filehandle.
        :param _stderr: stderr target forwarded to ``core.run_command``.
        :param _stdout: stdout target forwarded to ``core.run_command``.
        :param _no_win32_startupinfo: forwarded to ``core.run_command``.
        :returns (status, out, err): exit status, stdout, stderr
        """

        # Allow the user to have the command executed in their working dir.
        if not _cwd:
            _cwd = core.getcwd()

        extra = {}

        if hasattr(os, 'setsid'):
            # SSH uses the SSH_ASKPASS variable only if the process is really
            # detached from the TTY (stdin redirection and setting the
            # SSH_ASKPASS environment variable is not enough). To detach a
            # process from the console it should fork and call os.setsid().
            extra['preexec_fn'] = os.setsid

        start_time = time.time()

        # Start the process
        # Guard against thread-unsafe .git/index.lock files
        if not _readonly:
            _index_lock.acquire()
        try:
            status, out, err = core.run_command(
                command,
                cwd=_cwd,
                encoding=_encoding,
                stdin=_stdin,
                stdout=_stdout,
                stderr=_stderr,
                no_win32_startupinfo=_no_win32_startupinfo,
                **extra,
            )
        finally:
            # Let the next thread in
            if not _readonly:
                _index_lock.release()

        end_time = time.time()
        elapsed_time = abs(end_time - start_time)

        if not _raw and out is not None:
            out = core.UStr(out.rstrip('\n'), out.encoding)

        cola_trace = GIT_COLA_TRACE
        if cola_trace == 'trace':
            msg = f'trace: {elapsed_time:.3f}s: {core.list2cmdline(command)}'
            Interaction.log_status(status, msg, '')
        elif cola_trace == 'full':
            if out or err:
                core.print_stderr(
                    "# %.3fs: %s -> %d: '%s' '%s'"
                    % (elapsed_time, ' '.join(command), status, out, err)
                )
            else:
                core.print_stderr(
                    '# %.3fs: %s -> %d' % (elapsed_time, ' '.join(command), status)
                )
        elif cola_trace:
            core.print_stderr('# {:.3f}s: {}'.format(elapsed_time, ' '.join(command)))

        # Allow access to the command's status code
        return (status, out, err)

    def git(self, cmd, *args, **kwargs):
        """Run ``git <cmd>`` and return ``(status, out, err)``.

        Keyword arguments named after ``execute()`` parameters are passed to
        ``execute()``; the remaining keyword arguments become command-line
        options via ``transform_kwargs()`` and are placed before ``args``.

        When the command cannot be started (``OSError``) the result is
        ``(1, '', "error: unable to execute '<git>'")``.  On Windows an
        ``ENOENT`` failure is re-raised if git itself can still be run.
        """
        # Handle optional arguments prior to calling transform_kwargs
        # otherwise they'll end up in args, which is bad.
        _kwargs = {'_cwd': self.getcwd()}
        execute_kwargs = (
            '_cwd',
            '_decode',
            '_encoding',
            '_stdin',
            '_stdout',
            '_stderr',
            '_raw',
            '_readonly',
            '_no_win32_startupinfo',
        )

        for kwarg in execute_kwargs:
            if kwarg in kwargs:
                _kwargs[kwarg] = kwargs.pop(kwarg)

        # Prepare the argument list
        git_args = [
            GIT,
            '-c',
            'diff.suppressBlankEmpty=false',
            '-c',
            'log.showSignature=false',
            dashify(cmd),
        ]
        opt_args = transform_kwargs(**kwargs)
        call = git_args + opt_args
        call.extend(args)
        try:
            result = self.execute(call, **_kwargs)
        except OSError as exc:
            if WIN32 and exc.errno == errno.ENOENT:
                # see if git exists at all. On win32 it can fail with ENOENT in
                # case of argv overflow. We should be safe from that but use
                # defensive coding for the worst-case scenario. On UNIX
                # we have ENAMETOOLONG but that doesn't exist on Windows.
                if _git_is_installed():
                    raise
                _print_win32_git_hint()
            result = (1, '', "error: unable to execute '%s'" % GIT)
        return result
def _git_is_installed():
    """Return True if git is installed

    "Installed" means that ``<git> --version`` can be started and exits
    with status 0.
    """
    # On win32 Git commands can fail with ENOENT in case of argv overflow. We
    # should be safe from that but use defensive coding for the worst-case
    # scenario. On UNIX we have ENAMETOOLONG but that doesn't exist on
    # Windows.
    try:
        status, _, _ = Git.execute([GIT, '--version'])
    except OSError:
        return False
    return status == 0
def transform_kwargs(**kwargs):
    """Transform kwargs into git command line options

    Callers can assume the following behavior:

    Passing foo=None ignores foo, so that callers can
    use default values of None that are ignored unless
    set explicitly.

    Passing foo=False ignores foo, for the same reason.

    Passing foo=True results in ['--foo'].

    Passing foo={string-or-number} results in ['--foo=<value>']
    in the resulting arguments.

    Single-letter names use the short form: f=True results in ['-f']
    and n=5 results in ['-n5'].  Underscores in names become dashes.
    Values of any other type are ignored.

    """
    args = []
    types_to_stringify = (ustr, float, str) + int_types

    for k, value in kwargs.items():
        if len(k) == 1:
            dashes = '-'
            equals = ''
        else:
            dashes = '--'
            equals = '='
        # isinstance(False, int) is True, so we have to check bool first
        if isinstance(value, bool):
            if value:
                args.append(f'{dashes}{dashify(k)}')
            # else: pass  # False is ignored; flag=False inhibits --flag
        elif isinstance(value, types_to_stringify):
            args.append(f'{dashes}{dashify(k)}{equals}{value}')

    return args
def win32_git_error_hint():
    """Return the help text shown on Windows when git cannot be executed.

    The text explains how to point git-cola at a Git installation in a
    custom location through the ``~/.config/git-cola/git-bindir`` file.
    """
    return (
        '\n'
        'NOTE: If you have Git installed in a custom location, e.g.\n'
        'C:\\Tools\\Git, then you can create a file at\n'
        '~/.config/git-cola/git-bindir with following text\n'
        'and git-cola will add the specified location to your $PATH\n'
        'automatically when starting cola:\n'
        '\n'
        r'C:\Tools\Git\bin'
    )
@memoize
def _print_win32_git_hint():
    """Print the "unable to execute" error and the Windows setup hint to stderr.

    The message names the configured ``GIT`` executable so that it agrees
    with the error string returned by ``Git.git()``.
    """
    hint = '\n' + win32_git_error_hint() + '\n'
    core.print_stderr("error: unable to execute '%s'" % GIT + hint)
def create():
    """Create Git instances

    >>> git = create()
    >>> status, out, err = git.version()
    >>> 'git' == out[:3].lower()
    True

    """
    return Git()