1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
7 from __future__
import absolute_import
, division
, print_function
, unicode_literals
16 from .decorators
import interruptable
17 from .compat
import ustr
18 from .compat
import PY2
19 from .compat
import PY3
20 from .compat
import WIN32
22 # /usr/include/stdlib.h
23 # #define EXIT_SUCCESS 0 /* Successful exit status. */
24 # #define EXIT_FAILURE 1 /* Failing exit status. */
28 # /usr/include/sysexits.h
29 # #define EX_USAGE 64 /* command line usage error */
30 # #define EX_NOINPUT 66 /* cannot open input */
31 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not in UTF-8; some others aren't in any known encoding.
# Remember that Git doesn't care about encodings (it stores binary data).
46 # <-- add encodings here
51 """Unicode string wrapper that remembers its encoding
53 UStr wraps unicode strings to provide the `encoding` attribute.
54 UStr is used when decoding strings of an unknown encoding.
55 In order to generate patches that contain the original byte sequences,
56 we must preserve the original encoding when calling decode()
57 so that it can later be used when reconstructing the original
62 def __new__(cls
, string
, encoding
):
64 if isinstance(string
, UStr
):
65 if encoding
!= string
.encoding
:
67 'Encoding conflict: %s vs. %s' % (string
.encoding
, encoding
)
71 obj
= ustr
.__new
__(cls
, string
)
72 obj
.encoding
= encoding
def decode_maybe(value, encoding, errors='strict'):
    """Decode a value when the "decode" method exists

    Objects exposing a ``decode`` method (e.g. bytes) are decoded with
    `encoding` using the `errors` policy.  Any other value is handed back
    unchanged so callers can pass already-decoded or non-string data.
    """
    if hasattr(value, 'decode'):
        result = value.decode(encoding, errors=errors)
    else:
        # Nothing to decode: pass the value through untouched
        result = value
    return result
85 def decode(value
, encoding
=None, errors
='strict'):
86 """decode(encoded_string) returns an unencoded unicode string"""
89 elif isinstance(value
, ustr
):
90 result
= UStr(value
, ENCODING
)
91 elif encoding
== 'bytes':
96 encoding_tests
= _encoding_tests
98 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
100 for enc
in encoding_tests
:
102 decoded
= value
.decode(enc
, errors
)
103 result
= UStr(decoded
, enc
)
109 decoded
= value
.decode(ENCODING
, errors
='ignore')
110 result
= UStr(decoded
, ENCODING
)
def encode(string, encoding=None):
    """encode(unencoded_string) returns an encoded byte string

    Unicode strings are encoded with `encoding`, falling back to the
    module-level ENCODING when no encoding is given.  Characters that cannot
    be represented are substituted ('replace' error policy).  Values that
    are not unicode strings are returned unchanged.
    """
    if not isinstance(string, ustr):
        # Already encoded (or not a string at all): nothing to do
        return string
    return string.encode(encoding or ENCODING, 'replace')
def mkpath(path, encoding=None):
    """Convert `path` into the string type used for filesystem calls

    On Windows the path is decoded into unicode; elsewhere it is encoded
    into bytes.  `encoding` is forwarded to decode() / encode().
    """
    # The Windows API requires unicode strings regardless of python version
    if WIN32:
        return decode(path, encoding=encoding)
    # Every other platform receives the path as encoded bytes
    return encode(path, encoding=encoding)
def decode_seq(seq, encoding=None):
    """Return a list holding the decoded form of every item in `seq`"""
    decoded_items = [decode(item, encoding=encoding) for item in seq]
    return decoded_items
def list2cmdline(cmd):
    """Decode each argument and join them with subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return its decoded contents

    `size`, `encoding` and `errors` are forwarded to xread().
    """
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Open `path` in binary mode and write the encoded `contents` to it

    Returns whatever xwrite() returns.
    """
    with xopen(path, 'wb') as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle and decode what was read

    NOTE(review): retrying on interruption is expected to come from the
    `interruptable` decorator rather than from this body -- confirm it is
    applied where this function is defined.
    """
    raw_data = fh.read(size)
    return decode(raw_data, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a filehandle

    Returns the result of `fh.write()`.

    NOTE(review): retrying on interruption is expected to come from the
    `interruptable` decorator rather than from this body -- confirm it is
    applied where this function is defined.
    """
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
165 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read a single line from `fh` and return it decoded"""
    raw_line = fh.readline()
    return decode(raw_line, encoding=encoding)
179 universal_newlines
=False,
180 stdin
=subprocess
.PIPE
,
181 stdout
=subprocess
.PIPE
,
182 no_win32_startupinfo
=False,
183 stderr
=subprocess
.PIPE
,
186 """Start the given command, and return a subprocess object.
188 This provides a simpler interface to the subprocess module.
191 env
= extra
.pop('env', None)
192 if add_env
is not None:
193 env
= os
.environ
.copy()
196 # Python3 on windows always goes through list2cmdline() internally inside
197 # of subprocess.py so we must provide unicode strings here otherwise
198 # Python3 breaks when bytes are provided.
200 # Additionally, the preferred usage on Python3 is to pass unicode
201 # strings to subprocess. Python will automatically encode into the
202 # default encoding (utf-8) when it gets unicode strings.
203 shell
= extra
.get('shell', False)
204 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
206 if WIN32
and cwd
== getcwd():
207 # Windows cannot deal with passing a cwd that contains unicode
208 # but we luckily can pass None when the supplied cwd is the same
209 # as our current directory and get the same effect.
210 # Not doing this causes unicode encoding errors when launching
218 # If git-cola is invoked on Windows using "start pythonw git-cola",
219 # a console window will briefly flash on the screen each time
220 # git-cola invokes git, which is very annoying. The code below
221 # prevents this by ensuring that any window will be hidden.
222 startupinfo
= subprocess
.STARTUPINFO()
223 startupinfo
.dwFlags
= subprocess
.STARTF_USESHOWWINDOW
224 startupinfo
.wShowWindow
= subprocess
.SW_HIDE
225 extra
['startupinfo'] = startupinfo
227 if WIN32
and not no_win32_startupinfo
:
228 CREATE_NO_WINDOW
= 0x08000000
229 extra
['creationflags'] = CREATE_NO_WINDOW
231 # Use line buffering when in text/universal_newlines mode,
232 # otherwise use the system default buffer size.
233 bufsize
= 1 if universal_newlines
else -1
234 return subprocess
.Popen(
242 universal_newlines
=universal_newlines
,
247 def prep_for_subprocess(cmd
, shell
=False):
248 """Decode on Python3, encode on Python2"""
249 # See the comment in start_command()
257 cmd
= [decode(c
) for c
in cmd
]
259 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call `proc.communicate()` and return its result unchanged"""
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This is a convenience layer over start_command() and communicate().
    The result is a 3-tuple: (exit_code, output, errors).  The `encoding`
    keyword is consumed here and used to decode both streams; every other
    argument is passed on to start_command().
    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)

    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)
    exit_code = process.returncode

    # Falsy streams are reported as empty UStr values
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)
    return (exit_code, output, errors)
def _fork_posix(args, cwd=None, shell=False):
    """Start a process without waiting for it and return its pid."""
    command = [encode(item) for item in args]
    child = subprocess.Popen(command, cwd=cwd, shell=shell)
    return child.pid
292 def _fork_win32(args
, cwd
=None, shell
=False):
293 """Launch a background process using crazy win32 voodoo."""
294 # This is probably wrong, but it works. Windows.. wow.
295 if args
[0] == 'git-dag':
296 # win32 can't exec python scripts
297 args
= [sys
.executable
] + args
300 args
[0] = _win32_find_exe(args
[0])
303 # see comment in start_command()
304 argv
= [decode(arg
) for arg
in args
]
306 argv
= [encode(arg
) for arg
in args
]
308 DETACHED_PROCESS
= 0x00000008 # Amazing!
309 return subprocess
.Popen(
310 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
314 def _win32_find_exe(exe
):
315 """Find the actual file for a Windows executable.
317 This function goes through the same process that the Windows shell uses to
318 locate an executable, taking into account the PATH and PATHEXT environment
319 variables. This allows us to avoid passing shell=True to subprocess.Popen.
322 http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
325 # try the argument itself
327 # if argument does not have an extension, also try it with each of the
328 # extensions specified in PATHEXT
330 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
331 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
332 # search the current directory first
333 for candidate
in candidates
:
334 if exists(candidate
):
336 # if the argument does not include a path separator, search each of the
337 # directories on the PATH
338 if not os
.path
.dirname(exe
):
339 for path
in getenv('PATH').split(os
.pathsep
):
341 for candidate
in candidates
:
342 full_path
= os
.path
.join(path
, candidate
)
343 if exists(full_path
):
345 # not found, punt and return the argument unchanged
349 # Portability wrappers
350 if sys
.platform
in {'win32', 'cygwin'}:
356 def _decorator_noop(x
):
def wrap(action, func, decorator=None):
    """Wrap arguments with `action`, optionally decorate the result

    Returns a callable that passes its arguments to `action`, feeds the
    outcome to `func`, and pipes the result of `func` through `decorator`.
    When `decorator` is None, `_decorator_noop` is used instead.
    """
    if decorator is None:
        decorator = _decorator_noop

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        return decorator(func(action(*args, **kwargs)))

    # Hand the composed callable back to the caller
    return wrapped
def decorate(decorator, func):
    """Decorate the result of `func` with `decorator`

    Returns a callable that forwards its arguments to `func` unchanged and
    passes the return value of `func` through `decorator`.
    """

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        return decorator(func(*args, **kwargs))

    # Hand the composed callable back to the caller
    return decorated
def getenv(name, default=None):
    """Look up `name` with os.getenv() and return the decoded value"""
    raw_value = os.getenv(name, default)
    return decode(raw_value)
386 def guess_mimetype(filename
):
387 """Robustly guess a filename's mimetype"""
390 mimetype
= mimetypes
.guess_type(filename
)[0]
391 except UnicodeEncodeError:
392 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
393 except (TypeError, ValueError):
394 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open a file with the specified mode

    The path is decoded into unicode on Windows and encoded into bytes on Unix.
    `encoding` is applied to the path through mkpath(); it is not passed to
    open().
    """
    native_path = mkpath(path, encoding=encoding)
    # pylint: disable=unspecified-encoding
    return open(native_path, mode)
def open_append(path, encoding=None):
    """Return a utf-8 text-mode file handle opened for appending

    `encoding` only affects how the path is converted by mkpath().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'a', encoding='utf-8')
def open_read(path, encoding=None):
    """Return a utf-8 text-mode file handle opened for reading

    `encoding` only affects how the path is converted by mkpath().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'rt', encoding='utf-8')
def open_write(path, encoding=None):
    """Return a utf-8 text-mode file handle opened for writing

    `encoding` only affects how the path is converted by mkpath().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'wt', encoding='utf-8')
422 def print_stdout(msg
, linesep
='\n'):
425 msg
= encode(msg
, encoding
=ENCODING
)
426 sys
.stdout
.write(msg
)
429 def print_stderr(msg
, linesep
='\n'):
432 msg
= encode(msg
, encoding
=ENCODING
)
433 sys
.stderr
.write(msg
)
436 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
437 print_stderr(msg
, linesep
=linesep
)
443 return platform
.node()
# Path helpers built with wrap(): the argument is converted by mkpath()
# (or encode() for expanduser) before the os function runs, and results
# are decoded where a decorator is given.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
451 if hasattr(os
, 'getcwdu'):
452 # pylint: disable=no-member
455 getcwd
= decorate(decode
, os
.getcwd
)
460 # NOTE: find_executable() is originally from the stdlib, but starting with
461 # python3.7 the stdlib no longer bundles distutils.
462 def _find_executable(executable
, path
=None):
463 """Tries to find 'executable' in the directories listed in 'path'.
465 A string listing directories separated by 'os.pathsep'; defaults to
466 os.environ['PATH']. Returns the complete filename or None if not found.
469 path
= os
.environ
['PATH']
471 paths
= path
.split(os
.pathsep
)
472 _
, ext
= os
.path
.splitext(executable
)
474 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
475 executable
= executable
+ '.exe'
477 if not os
.path
.isfile(executable
):
478 for dirname
in paths
:
479 filename
= os
.path
.join(dirname
, executable
)
480 if os
.path
.isfile(filename
):
481 # the file exists, we have a shot at spawn working
489 """Force writing of everything to disk. No-op on systems without os.sync()"""
490 if hasattr(os
, 'sync'):
def rename(old, new):
    """Rename a path. Transform arguments to handle non-ascii file paths"""
    source = mkpath(old)
    target = mkpath(new)
    os.rename(source, target)
500 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
502 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem predicates and directory helpers: each argument is converted
# by mkpath() first; listdir() results are decoded with decode_seq().
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)
509 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
510 except AttributeError:
512 def _readlink_noop(p
):
515 readlink
= _readlink_noop
# Remaining os / os.path wrappers: arguments are converted by mkpath();
# the path-returning helpers additionally decode their result.
realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)