core: make getcwd() fail-safe
[git-cola.git] / cola / git.py
blobfa2a740ce47572ae01230f7f842b963b4c007e4d
1 from __future__ import division, absolute_import, unicode_literals
2 from functools import partial
3 import errno
4 import os
5 from os.path import join
6 import subprocess
7 import threading
9 from . import core
10 from .compat import int_types
11 from .compat import ustr
12 from .compat import WIN32
13 from .decorators import memoize
14 from .interaction import Interaction
# Settings read once from the environment at import time.
# GIT_COLA_TRACE selects the tracing mode used by Git.execute().
GIT_COLA_TRACE = core.getenv('GIT_COLA_TRACE', '')
# GIT is the git executable to run; it defaults to "git".
GIT = core.getenv('GIT_COLA_GIT', 'git')

# Positions within a (status, stdout, stderr) result tuple.
# NOTE(review): presumably used by callers to index the tuples returned by
# Git.execute() / Git.git() -- confirm against the call sites.
STATUS = 0
STDOUT = 1
STDERR = 2

# Object ID / SHA-1 related constants.

# The empty tree has a well-known, built-in object name in Git.
EMPTY_TREE_OID = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'

# Git's diff machinery reports all zeroes for modified files whose content
# exists in the worktree only.
MISSING_BLOB_OID = '0000000000000000000000000000000000000000'

# SHA-1 object IDs are 40 characters long.  Once Git moves away from SHA-1
# this has to be detected and updated at runtime so that both old and new
# versions of git are supported.
OID_LENGTH = 40

# Module-wide lock taken by Git.execute() around non-readonly commands.
_index_lock = threading.Lock()
def dashify(s):
    """Return ``s`` with every underscore turned into a dash."""
    pieces = s.split('_')
    return '-'.join(pieces)
def is_git_dir(git_dir):
    """Report whether ``git_dir`` looks like a Git directory.

    Modelled on is_git_directory() from git's setup.c.

    An empty or ``None`` path is never a Git directory.  A path qualifies
    as a candidate when it is a directory containing "objects" and "refs"
    directories, or when it contains both a "gitdir" and a "commondir"
    file.  A candidate is accepted when its "HEAD" entry is a file, or a
    symlink pointing into "refs/".  Anything that is not a candidate is
    checked with is_git_file() instead.
    """
    if not git_dir:
        return False

    # First layout: a directory holding "objects" and "refs".
    candidate = (
        core.isdir(git_dir)
        and core.isdir(join(git_dir, 'objects'))
        and core.isdir(join(git_dir, 'refs'))
    )
    if not candidate:
        # Second layout: "gitdir" and "commondir" files are both present.
        candidate = (
            core.isfile(join(git_dir, 'gitdir'))
            and core.isfile(join(git_dir, 'commondir'))
        )
    if not candidate:
        return is_git_file(git_dir)

    head = join(git_dir, 'HEAD')
    return (
        core.isfile(head)
        or (core.islink(head) and core.readlink(head).startswith('refs/'))
    )
def is_git_file(f):
    """Report whether ``f`` is an existing file whose name is ".git"."""
    is_file = core.isfile(f)
    return is_file and os.path.basename(f) == '.git'
def is_git_worktree(d):
    """Report whether directory ``d`` contains a valid ".git" entry."""
    dot_git = join(d, '.git')
    return is_git_dir(dot_git)
def is_git_repository(path):
    """Report whether ``path`` is a Git worktree or a Git directory."""
    as_worktree = is_git_worktree(path)
    if as_worktree:
        return as_worktree
    return is_git_dir(path)
def read_git_file(path):
    """Return the directory that a .git-file points at.

    `None` is returned when <path> is empty, is not a .git-file, or when
    its content does not begin with "gitdir: ".  A relative target is
    resolved against the folder containing <path> and normalized.

    """
    if not path or not is_git_file(path):
        return None

    prefix = 'gitdir: '
    contents = core.read(path).strip()
    if not contents.startswith(prefix):
        return None

    target = contents[len(prefix):]
    if target and not os.path.isabs(target):
        # Relative targets are relative to the .git-file's own folder.
        target = os.path.normpath(join(os.path.dirname(path), target))
    return target
class Paths(object):
    """Git repository paths of interest

    Attributes:
        git_dir: the Git directory, or a falsy value when unknown.
        git_file: the .git-file that pointed at git_dir, when there is one.
        worktree: the worktree directory, or a falsy value when unknown.
        common_dir: the directory named by git_dir's "commondir" file.
    """

    def __init__(self, git_dir=None, git_file=None,
                 worktree=None, common_dir=None):
        """Store the given paths, discarding a git_dir that is not valid."""
        if git_dir and not is_git_dir(git_dir):
            git_dir = None
        self.git_dir = git_dir
        self.git_file = git_file
        self.worktree = worktree
        self.common_dir = common_dir

    def get(self, path):
        """Fill in the missing paths by searching upwards from ``path``.

        The search walks from ``path`` towards the filesystem root and
        stops at any directory listed in GIT_CEILING_DIRECTORIES.  Values
        that were already set (e.g. through the constructor) are kept.
        When the resulting git_dir is a .git-file it is resolved to the
        directory it points at, and that directory's "commondir" file,
        when present and non-empty, provides common_dir.

        :param path: directory to start searching from; may be falsy.
        :returns: self, so that ``Paths().get(path)`` can be chained.
        """
        ceiling_dirs = set()
        ceiling = core.getenv('GIT_CEILING_DIRECTORIES')
        if ceiling:
            # The list uses the platform's path-list separator: ':' on
            # POSIX, ';' on Windows where ':' is part of absolute paths.
            ceiling_dirs.update(
                [x for x in ceiling.split(os.pathsep) if x])

        if path:
            path = core.abspath(path)

        if not self.git_dir or not self.worktree:
            # Search for a .git directory
            while path:
                if path in ceiling_dirs:
                    break
                if is_git_dir(path):
                    if not self.git_dir:
                        self.git_dir = path
                    basename = os.path.basename(path)
                    if not self.worktree and basename == '.git':
                        self.worktree = os.path.dirname(path)
                # We are either in a bare repository, or someone set GIT_DIR
                # but did not set GIT_WORK_TREE.
                if self.git_dir:
                    if not self.worktree:
                        basename = os.path.basename(self.git_dir)
                        if basename == '.git':
                            self.worktree = os.path.dirname(self.git_dir)
                        elif path and not is_git_dir(path):
                            self.worktree = path
                    break
                gitpath = join(path, '.git')
                if is_git_dir(gitpath):
                    if not self.git_dir:
                        self.git_dir = gitpath
                    if not self.worktree:
                        self.worktree = path
                    break
                # Move up one level; an empty tail means there is nothing
                # left to strip, so the search is over.
                path, dummy = os.path.split(path)
                if not dummy:
                    break

        if self.git_dir:
            git_dir_path = read_git_file(self.git_dir)
            if git_dir_path:
                # git_dir was a .git-file: remember it and follow it.
                self.git_file = self.git_dir
                self.git_dir = git_dir_path

                commondir_file = join(git_dir_path, 'commondir')
                if core.exists(commondir_file):
                    common_path = core.read(commondir_file).strip()
                    if common_path:
                        if os.path.isabs(common_path):
                            common_dir = common_path
                        else:
                            # Relative entries are relative to git_dir.
                            common_dir = join(git_dir_path, common_path)
                            common_dir = os.path.normpath(common_dir)
                        self.common_dir = common_dir
        # usage: Paths().get()
        return self
def find_git_directory(path):
    """Perform Git repository discovery

    The search starts at ``path`` and is seeded with the GIT_DIR and
    GIT_WORK_TREE environment variables.  The populated Paths object is
    returned.

    """
    env_git_dir = core.getenv('GIT_DIR')
    env_worktree = core.getenv('GIT_WORK_TREE')
    paths = Paths(git_dir=env_git_dir, worktree=env_worktree)
    return paths.get(path)
class Git(object):
    """
    The Git class manages communication with the Git binary

    Unknown attributes are turned into git subcommands by __getattr__(),
    so ``Git().rev_parse(...)`` runs ``git rev-parse ...`` through git().
    """

    def __init__(self):
        """Discover the repository paths for the current directory."""
        self.paths = Paths()

        self._valid = {}  #: Store the result of is_git_dir() for performance
        self.set_worktree(core.getcwd())

    # pylint: disable=no-self-use
    def is_git_repository(self, path):
        """Report whether ``path`` is a Git worktree or Git directory."""
        return is_git_repository(path)

    def getcwd(self):
        """Return the working directory used by git()"""
        return self.paths.worktree or self.paths.git_dir

    def set_worktree(self, path):
        """Run repository discovery from ``path``; return the worktree."""
        path = core.decode(path)
        self.paths = find_git_directory(path)
        return self.paths.worktree

    def worktree(self):
        """Return the worktree, re-running discovery when it is unset."""
        if not self.paths.worktree:
            path = core.abspath(core.getcwd())
            self.paths = find_git_directory(path)
        return self.paths.worktree

    def is_valid(self):
        """Is this a valid git repository?

        Cache the result to avoid hitting the filesystem.

        """
        git_dir = self.paths.git_dir
        try:
            valid = bool(git_dir) and self._valid[git_dir]
        except KeyError:
            # First query for this git_dir: check it and remember the answer.
            valid = self._valid[git_dir] = is_git_dir(git_dir)

        return valid

    def git_path(self, *paths):
        """Return a path inside the Git directory, or None without one.

        When a common directory is known and the path does not exist
        under git_dir, the same path under the common directory is
        returned instead, provided that it exists there.
        """
        result = None
        if self.paths.git_dir:
            result = join(self.paths.git_dir, *paths)
        if result and self.paths.common_dir and not core.exists(result):
            common_result = join(self.paths.common_dir, *paths)
            if core.exists(common_result):
                result = common_result
        return result

    def git_dir(self):
        """Return the Git directory, re-running discovery when unset."""
        if not self.paths.git_dir:
            path = core.abspath(core.getcwd())
            self.paths = find_git_directory(path)
        return self.paths.git_dir

    def __getattr__(self, name):
        """Create, cache and return a callable for the subcommand ``name``.

        Only invoked for attributes that do not exist yet.  The callable
        is stored on the instance so later lookups bypass this method.
        """
        git_cmd = partial(self.git, name)
        setattr(self, name, git_cmd)
        return git_cmd

    @staticmethod
    def execute(command,
                _cwd=None,
                _decode=True,
                _encoding=None,
                _raw=False,
                _stdin=None,
                _stderr=subprocess.PIPE,
                _stdout=subprocess.PIPE,
                _readonly=False,
                _no_win32_startupinfo=False):
        """
        Execute a command and returns its output

        :param command: argument list to execute.
        :param _cwd: working directory, defaults to the current directory.
        :param _decode: whether to decode output, defaults to True.
            This method accepts the flag but does not forward it to
            core.run_command().
        :param _encoding: default encoding, defaults to None (utf-8).
        :param _raw: do not strip trailing whitespace.
        :param _stdin: optional stdin filehandle.
        :param _stderr: stderr argument passed on to core.run_command().
        :param _stdout: stdout argument passed on to core.run_command().
        :param _readonly: skip the module-wide index lock when True.
        :param _no_win32_startupinfo: passed on to core.run_command().
        :returns (status, out, err): exit status, stdout, stderr

        """
        # Allow the user to have the command executed in their working dir.
        if not _cwd:
            _cwd = core.getcwd()

        extra = {}

        if hasattr(os, 'setsid'):
            # SSH uses the SSH_ASKPASS variable only if the process is really
            # detached from the TTY (stdin redirection and setting the
            # SSH_ASKPASS environment variable is not enough).  To detach a
            # process from the console it should fork and call os.setsid().
            extra['preexec_fn'] = os.setsid

        # Start the process
        # Guard against thread-unsafe .git/index.lock files
        if not _readonly:
            _index_lock.acquire()
        try:
            status, out, err = core.run_command(
                command, cwd=_cwd, encoding=_encoding,
                stdin=_stdin, stdout=_stdout, stderr=_stderr,
                no_win32_startupinfo=_no_win32_startupinfo, **extra)
        finally:
            # Let the next thread in
            if not _readonly:
                _index_lock.release()

        if not _raw and out is not None:
            out = core.UStr(out.rstrip('\n'), out.encoding)

        cola_trace = GIT_COLA_TRACE
        if cola_trace == 'trace':
            msg = 'trace: ' + core.list2cmdline(command)
            Interaction.log_status(status, msg, '')
        elif cola_trace == 'full':
            if out or err:
                core.print_stderr(
                    "%s -> %d: '%s' '%s'"
                    % (' '.join(command), status, out, err))
            else:
                core.print_stderr("%s -> %d" % (' '.join(command), status))
        elif cola_trace:
            core.print_stderr(' '.join(command))

        # Allow access to the command's status code
        return (status, out, err)

    def git(self, cmd, *args, **kwargs):
        """Run ``git <cmd>`` and return its (status, out, err) tuple.

        Keyword arguments named in ``execute_kwargs`` below are passed to
        execute(); the remaining ones become command-line options through
        transform_kwargs().  Positional arguments are appended after the
        options.  ``_cwd`` defaults to getcwd().

        When execute() raises OSError an error tuple with status 1 is
        returned.  The exception is re-raised only on Windows, for
        ENOENT, when git is in fact installed.
        """
        # Handle optional arguments prior to calling transform_kwargs
        # otherwise they'll end up in args, which is bad.
        _kwargs = dict(_cwd=self.getcwd())
        execute_kwargs = (
            '_cwd',
            '_decode',
            '_encoding',
            '_stdin',
            '_stdout',
            '_stderr',
            '_raw',
            '_readonly',
            '_no_win32_startupinfo',
        )

        for kwarg in execute_kwargs:
            if kwarg in kwargs:
                _kwargs[kwarg] = kwargs.pop(kwarg)

        # Prepare the argument list
        git_args = [
            GIT,
            '-c', 'diff.suppressBlankEmpty=false',
            '-c', 'log.showSignature=false',
            dashify(cmd),
        ]
        opt_args = transform_kwargs(**kwargs)
        call = git_args + opt_args
        call.extend(args)
        try:
            result = self.execute(call, **_kwargs)
        except OSError as e:
            if WIN32 and e.errno == errno.ENOENT:
                # see if git exists at all. on win32 it can fail with ENOENT in
                # case of argv overflow. we should be safe from that but use
                # defensive coding for the worst-case scenario. On UNIX
                # we have ENAMETOOLONG but that doesn't exist on Windows.
                if _git_is_installed():
                    # A bare raise keeps the original traceback intact.
                    raise
                _print_win32_git_hint()
            result = (1, '', "error: unable to execute '%s'" % GIT)
        return result
def _git_is_installed():
    """Return True if git is installed"""
    # Git commands on win32 may fail with ENOENT when argv overflows.  That
    # should not happen here, but the check is kept as defensive coding for
    # the worst-case scenario.  UNIX reports ENAMETOOLONG for this, which
    # does not exist on Windows.
    try:
        status, _, _ = Git.execute([GIT, '--version'])
    except OSError:
        return False
    else:
        return status == 0
def transform_kwargs(**kwargs):
    """Transform kwargs into git command line options

    Callers can rely on the following rules:

    foo=None is skipped, which lets callers use None as a default
    that has no effect unless a value is set explicitly.

    foo=False is skipped as well, for the same reason.

    foo=True results in a bare ['--foo'] flag.

    foo={string-or-number} results in ['--foo=<value>'].

    Single-letter names use one dash and no "=": f='x' gives ['-fx'].
    Underscores in names are converted to dashes.

    """
    stringifiable = (ustr, float, str) + int_types
    options = []

    for name, value in kwargs.items():
        is_short = len(name) == 1
        prefix = '-' if is_short else '--'
        separator = '' if is_short else '='

        # bool is a subclass of int, so it must be tested before numbers.
        if isinstance(value, bool):
            if value:
                options.append('%s%s' % (prefix, dashify(name)))
            # False is ignored; flag=False inhibits --flag
            continue

        if isinstance(value, stringifiable):
            options.append(
                '%s%s%s%s' % (prefix, dashify(name), separator, value))

    return options
def win32_git_error_hint():
    """Return the help text shown when git cannot be found on Windows."""
    lines = [
        '',
        'NOTE: If you have Git installed in a custom location, e.g.',
        'C:\\Tools\\Git, then you can create a file at',
        '~/.config/git-cola/git-bindir with following text',
        'and git-cola will add the specified location to your $PATH',
        'automatically when starting cola:',
        '',
        r'C:\Tools\Git\bin',
    ]
    return '\n'.join(lines)
@memoize
def _print_win32_git_hint():
    """Write the "unable to execute git" error and hint to stderr."""
    message = ''.join([
        "error: unable to execute 'git'",
        '\n',
        win32_git_error_hint(),
        '\n',
    ])
    core.print_stderr(message)
def create():
    """Build and return a new Git instance

    >>> git = create()
    >>> status, out, err = git.version()
    >>> 'git' == out[:3].lower()
    True

    """
    instance = Git()
    return instance