1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
7 from __future__
import division
, absolute_import
, unicode_literals
16 from .decorators
import interruptable
17 from .compat
import ustr
18 from .compat
import PY2
19 from .compat
import PY3
20 from .compat
import WIN32
22 # /usr/include/stdlib.h
23 # #define EXIT_SUCCESS 0 /* Successful exit status. */
24 # #define EXIT_FAILURE 1 /* Failing exit status. */
28 # /usr/include/sysexits.h
29 # #define EX_USAGE 64 /* command line usage error */
30 # #define EX_NOINPUT 66 /* cannot open input */
31 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not in UTF-8; some others aren't in any known encoding.
# Remember that Git doesn't care about encodings (it stores binary data)
46 # <-- add encodings here
51 """Unicode string wrapper that remembers its encoding
53 UStr wraps unicode strings to provide the `encoding` attribute.
54 UStr is used when decoding strings of an unknown encoding.
55 In order to generate patches that contain the original byte sequences,
56 we must preserve the original encoding when calling decode()
57 so that it can later be used when reconstructing the original
62 def __new__(cls
, string
, encoding
):
64 if isinstance(string
, UStr
):
65 if encoding
!= string
.encoding
:
67 'Encoding conflict: %s vs. %s' % (string
.encoding
, encoding
)
71 obj
= ustr
.__new
__(cls
, string
)
72 obj
.encoding
= encoding
76 def decode_maybe(value
, encoding
, errors
='strict'):
77 """Decode a value when the "decode" method exists"""
78 if hasattr(value
, 'decode'):
79 result
= value
.decode(encoding
, errors
=errors
)
85 def decode(value
, encoding
=None, errors
='strict'):
86 """decode(encoded_string) returns an unencoded unicode string
90 elif isinstance(value
, ustr
):
91 result
= UStr(value
, ENCODING
)
92 elif encoding
== 'bytes':
97 encoding_tests
= _encoding_tests
99 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
101 for enc
in encoding_tests
:
103 decoded
= value
.decode(enc
, errors
)
104 result
= UStr(decoded
, enc
)
110 decoded
= value
.decode(ENCODING
, errors
='ignore')
111 result
= UStr(decoded
, ENCODING
)
116 def encode(string
, encoding
=None):
117 """encode(unencoded_string) returns a string encoded in utf-8
119 if not isinstance(string
, ustr
):
121 return string
.encode(encoding
or ENCODING
, 'replace')
124 def mkpath(path
, encoding
=None):
125 # The Windows API requires unicode strings regardless of python version
127 return decode(path
, encoding
=encoding
)
129 return encode(path
, encoding
=encoding
)
def list2cmdline(cmd):
    """Return the arguments in `cmd` joined into one command-line string.

    Each argument is passed through decode() before the list is handed to
    subprocess.list2cmdline().
    """
    decoded_args = list(map(decode, cmd))
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Return the decoded contents of `filename`.

    The file is opened in binary mode with xopen(); `size`, `encoding` and
    `errors` are forwarded unchanged to xread().
    """
    with xopen(filename, 'rb') as stream:
        contents = xread(stream, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Write `contents` to the file at `path`.

    The file is opened in binary mode with xopen() and the data is written
    through xwrite(), whose return value is passed back to the caller.
    """
    with xopen(path, 'wb') as stream:
        written = xwrite(stream, contents, encoding=encoding)
    return written
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle and return the decoded data.

    `size` is passed to fh.read(); `encoding` and `errors` are passed to
    decode().  Per the module docstring, retrying after an interrupted
    system call is the job of the @interruptable decorator.
    """
    raw = fh.read(size)
    return decode(raw, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a filehandle.

    Returns whatever fh.write() returns.  Per the module docstring, retrying
    after an interrupted system call is the job of the @interruptable
    decorator.
    """
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
162 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read a single line from `fh` and return it passed through decode()."""
    raw_line = fh.readline()
    return decode(raw_line, encoding=encoding)
176 universal_newlines
=False,
177 stdin
=subprocess
.PIPE
,
178 stdout
=subprocess
.PIPE
,
179 no_win32_startupinfo
=False,
180 stderr
=subprocess
.PIPE
,
183 """Start the given command, and return a subprocess object.
185 This provides a simpler interface to the subprocess module.
188 env
= extra
.pop('env', None)
189 if add_env
is not None:
190 env
= os
.environ
.copy()
193 # Python3 on windows always goes through list2cmdline() internally inside
194 # of subprocess.py so we must provide unicode strings here otherwise
195 # Python3 breaks when bytes are provided.
197 # Additionally, the preferred usage on Python3 is to pass unicode
198 # strings to subprocess. Python will automatically encode into the
199 # default encoding (utf-8) when it gets unicode strings.
200 shell
= extra
.get('shell', False)
201 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
203 if WIN32
and cwd
== getcwd():
204 # Windows cannot deal with passing a cwd that contains unicode
205 # but we luckily can pass None when the supplied cwd is the same
206 # as our current directory and get the same effect.
207 # Not doing this causes unicode encoding errors when launching
215 # If git-cola is invoked on Windows using "start pythonw git-cola",
216 # a console window will briefly flash on the screen each time
217 # git-cola invokes git, which is very annoying. The code below
218 # prevents this by ensuring that any window will be hidden.
219 startupinfo
= subprocess
.STARTUPINFO()
220 startupinfo
.dwFlags
= subprocess
.STARTF_USESHOWWINDOW
221 startupinfo
.wShowWindow
= subprocess
.SW_HIDE
222 extra
['startupinfo'] = startupinfo
224 if WIN32
and not no_win32_startupinfo
:
225 CREATE_NO_WINDOW
= 0x08000000
226 extra
['creationflags'] = CREATE_NO_WINDOW
228 # Use line buffering when in text/universal_newlines mode,
229 # otherwise use the system default buffer size.
230 bufsize
= 1 if universal_newlines
else -1
231 return subprocess
.Popen(
239 universal_newlines
=universal_newlines
,
244 def prep_for_subprocess(cmd
, shell
=False):
245 """Decode on Python3, encode on Python2"""
246 # See the comment in start_command()
254 cmd
= [decode(c
) for c
in cmd
]
256 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call `proc.communicate()` and return its result unchanged."""
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors).
    An optional `encoding` keyword is consumed here and used to decode the
    output; all other arguments are passed on to start_command().

    """
    encoding = kwargs.pop('encoding', None)
    proc = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(proc)

    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)

    # Substitute an empty UStr for falsy output so callers always receive
    # an object that carries an `encoding` attribute.
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)

    return (proc.returncode, output, errors)
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid.

    Every argument is passed through encode() before being given to
    subprocess.Popen(); the process is not waited on.
    """
    argv = list(map(encode, args))
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
289 def _fork_win32(args
, cwd
=None, shell
=False):
290 """Launch a background process using crazy win32 voodoo."""
291 # This is probably wrong, but it works. Windows.. wow.
292 if args
[0] == 'git-dag':
293 # win32 can't exec python scripts
294 args
= [sys
.executable
] + args
297 args
[0] = _win32_find_exe(args
[0])
300 # see comment in start_command()
301 argv
= [decode(arg
) for arg
in args
]
303 argv
= [encode(arg
) for arg
in args
]
305 DETACHED_PROCESS
= 0x00000008 # Amazing!
306 return subprocess
.Popen(
307 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
311 def _win32_find_exe(exe
):
312 """Find the actual file for a Windows executable.
314 This function goes through the same process that the Windows shell uses to
315 locate an executable, taking into account the PATH and PATHEXT environment
316 variables. This allows us to avoid passing shell=True to subprocess.Popen.
319 http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
322 # try the argument itself
324 # if argument does not have an extension, also try it with each of the
325 # extensions specified in PATHEXT
327 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
328 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
329 # search the current directory first
330 for candidate
in candidates
:
331 if exists(candidate
):
333 # if the argument does not include a path separator, search each of the
334 # directories on the PATH
335 if not os
.path
.dirname(exe
):
336 for path
in getenv('PATH').split(os
.pathsep
):
338 for candidate
in candidates
:
339 full_path
= os
.path
.join(path
, candidate
)
340 if exists(full_path
):
342 # not found, punt and return the argument unchanged
346 # Portability wrappers
347 if sys
.platform
== 'win32' or sys
.platform
== 'cygwin':
353 def _decorator_noop(x
):
357 def wrap(action
, fn
, decorator
=None):
358 """Wrap arguments with `action`, optionally decorate the result"""
359 if decorator
is None:
360 decorator
= _decorator_noop
363 def wrapped(*args
, **kwargs
):
364 return decorator(fn(action(*args
, **kwargs
)))
369 def decorate(decorator
, fn
):
    """Decorate the result of `fn` with `decorator`"""
373 def decorated(*args
, **kwargs
):
374 return decorator(fn(*args
, **kwargs
))
def getenv(name, default=None):
    """Return environment variable `name`, passed through decode().

    `default` is handed to os.getenv() and is decoded the same way when the
    variable is not set.
    """
    raw_value = os.getenv(name, default)
    return decode(raw_value)
383 def guess_mimetype(filename
):
384 """Robustly guess a filename's mimetype"""
387 mimetype
= mimetypes
.guess_type(filename
)[0]
388 except UnicodeEncodeError:
389 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
390 except (TypeError, ValueError):
391 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open `path` with the builtin open() after converting it with mkpath().

    `encoding` is only used for converting the path; it is not passed to
    open() itself.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
399 def print_stdout(msg
, linesep
='\n'):
402 msg
= encode(msg
, encoding
=ENCODING
)
403 sys
.stdout
.write(msg
)
406 def print_stderr(msg
, linesep
='\n'):
409 msg
= encode(msg
, encoding
=ENCODING
)
410 sys
.stderr
.write(msg
)
413 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
414 print_stderr(msg
, linesep
=linesep
)
420 return platform
.node()
# Path helpers: arguments go through the named `action` before reaching the
# os-level function; results go through `decorator` when one is given.
abspath = wrap(action=mkpath, fn=os.path.abspath, decorator=decode)
chdir = wrap(action=mkpath, fn=os.chdir)
exists = wrap(action=mkpath, fn=os.path.exists)
expanduser = wrap(action=encode, fn=os.path.expanduser, decorator=decode)
428 if hasattr(os
, 'getcwdu'):
429 # pylint: disable=no-member
432 getcwd
= decorate(decode
, os
.getcwd
)
437 # NOTE: find_executable() is originally from the stdlib, but starting with
438 # python3.7 the stdlib no longer bundles distutils.
439 def _find_executable(executable
, path
=None):
440 """Tries to find 'executable' in the directories listed in 'path'.
442 A string listing directories separated by 'os.pathsep'; defaults to
443 os.environ['PATH']. Returns the complete filename or None if not found.
446 path
= os
.environ
['PATH']
448 paths
= path
.split(os
.pathsep
)
449 _
, ext
= os
.path
.splitext(executable
)
451 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
452 executable
= executable
+ '.exe'
454 if not os
.path
.isfile(executable
):
456 f
= os
.path
.join(p
, executable
)
457 if os
.path
.isfile(f
):
458 # the file exists, we have a shot at spawn working
466 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
468 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem predicates and makedirs: the path argument is converted with
# mkpath() before the os-level function is called; results are returned as-is.
isdir = wrap(action=mkpath, fn=os.path.isdir)
isfile = wrap(action=mkpath, fn=os.path.isfile)
islink = wrap(action=mkpath, fn=os.path.islink)
makedirs = wrap(action=mkpath, fn=os.makedirs)
474 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
475 except AttributeError:
477 def _readlink_noop(p
):
480 readlink
= _readlink_noop
# Remaining os wrappers: the path argument is converted with mkpath();
# realpath/relpath additionally pass their result through decode().
realpath = wrap(action=mkpath, fn=os.path.realpath, decorator=decode)
relpath = wrap(action=mkpath, fn=os.path.relpath, decorator=decode)
stat = wrap(action=mkpath, fn=os.stat)
unlink = wrap(action=mkpath, fn=os.unlink)
walk = wrap(action=mkpath, fn=os.walk)