1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
7 from __future__
import absolute_import
, division
, print_function
, unicode_literals
16 from .decorators
import interruptable
17 from .compat
import ustr
18 from .compat
import PY2
19 from .compat
import PY3
20 from .compat
import WIN32
22 # /usr/include/stdlib.h
23 # #define EXIT_SUCCESS 0 /* Successful exit status. */
24 # #define EXIT_FAILURE 1 /* Failing exit status. */
28 # /usr/include/sysexits.h
29 # #define EX_USAGE 64 /* command line usage error */
30 # #define EX_NOINPUT 66 /* cannot open input */
31 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not in UTF-8; some others aren't in any known encoding.
# Remember that Git doesn't care about encodings (it stores binary data).
46 # <-- add encodings here
51 """Unicode string wrapper that remembers its encoding
53 UStr wraps unicode strings to provide the `encoding` attribute.
54 UStr is used when decoding strings of an unknown encoding.
55 In order to generate patches that contain the original byte sequences,
56 we must preserve the original encoding when calling decode()
57 so that it can later be used when reconstructing the original
62 def __new__(cls
, string
, encoding
):
64 if isinstance(string
, UStr
):
65 if encoding
!= string
.encoding
:
67 'Encoding conflict: %s vs. %s' % (string
.encoding
, encoding
)
71 obj
= ustr
.__new
__(cls
, string
)
72 obj
.encoding
= encoding
76 def decode_maybe(value
, encoding
, errors
='strict'):
77 """Decode a value when the "decode" method exists"""
78 if hasattr(value
, 'decode'):
79 result
= value
.decode(encoding
, errors
=errors
)
85 def decode(value
, encoding
=None, errors
='strict'):
86 """decode(encoded_string) returns an unencoded unicode string"""
89 elif isinstance(value
, ustr
):
90 result
= UStr(value
, ENCODING
)
91 elif encoding
== 'bytes':
96 encoding_tests
= _encoding_tests
98 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
100 for enc
in encoding_tests
:
102 decoded
= value
.decode(enc
, errors
)
103 result
= UStr(decoded
, enc
)
109 decoded
= value
.decode(ENCODING
, errors
='ignore')
110 result
= UStr(decoded
, ENCODING
)
115 def encode(string
, encoding
=None):
116 """encode(unencoded_string) returns a string encoded in utf-8"""
117 if not isinstance(string
, ustr
):
119 return string
.encode(encoding
or ENCODING
, 'replace')
122 def mkpath(path
, encoding
=None):
123 # The Windows API requires unicode strings regardless of python version
125 return decode(path
, encoding
=encoding
)
127 return encode(path
, encoding
=encoding
)
def decode_seq(seq, encoding=None):
    """Return a list holding each item of `seq` passed through decode()"""
    decoded_items = []
    for item in seq:
        decoded_items.append(decode(item, encoding=encoding))
    return decoded_items
def list2cmdline(cmd):
    """Decode every argument in `cmd` and join them via subprocess.list2cmdline()"""
    decoded_args = []
    for arg in cmd:
        decoded_args.append(decode(arg))
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open filename in 'rb' mode and return what xread() produces from it"""
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    # The handle is closed by the time the contents are handed back
    return contents
def write(path, contents, encoding=None):
    """Open path in 'wb' mode and write contents to it through xwrite()

    Returns whatever xwrite() returns.
    """
    with xopen(path, 'wb') as handle:
        result = xwrite(handle, contents, encoding=encoding)
    return result
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle and return the decoded data

    NOTE(review): retrying when interrupted is presumably provided by the
    @interruptable decorator described in the module docstring -- confirm
    that it is applied to this function.
    """
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode content and write it to a filehandle

    Returns the result of fh.write().

    NOTE(review): retrying when interrupted is presumably provided by the
    @interruptable decorator described in the module docstring -- confirm
    that it is applied to this function.
    """
    # Resolve the bound method first so the lookup order matches a nested call
    write_fn = fh.write
    payload = encode(content, encoding=encoding)
    return write_fn(payload)
165 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read a single line from a filehandle and return it decoded"""
    line = fh.readline()
    return decode(line, encoding=encoding)
179 universal_newlines
=False,
180 stdin
=subprocess
.PIPE
,
181 stdout
=subprocess
.PIPE
,
182 no_win32_startupinfo
=False,
183 stderr
=subprocess
.PIPE
,
186 """Start the given command, and return a subprocess object.
188 This provides a simpler interface to the subprocess module.
191 env
= extra
.pop('env', None)
192 if add_env
is not None:
193 env
= os
.environ
.copy()
196 # Python3 on windows always goes through list2cmdline() internally inside
197 # of subprocess.py so we must provide unicode strings here otherwise
198 # Python3 breaks when bytes are provided.
200 # Additionally, the preferred usage on Python3 is to pass unicode
201 # strings to subprocess. Python will automatically encode into the
202 # default encoding (utf-8) when it gets unicode strings.
203 shell
= extra
.get('shell', False)
204 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
206 if WIN32
and cwd
== getcwd():
207 # Windows cannot deal with passing a cwd that contains unicode
208 # but we luckily can pass None when the supplied cwd is the same
209 # as our current directory and get the same effect.
210 # Not doing this causes unicode encoding errors when launching
218 # If git-cola is invoked on Windows using "start pythonw git-cola",
219 # a console window will briefly flash on the screen each time
220 # git-cola invokes git, which is very annoying. The code below
221 # prevents this by ensuring that any window will be hidden.
222 startupinfo
= subprocess
.STARTUPINFO()
223 startupinfo
.dwFlags
= subprocess
.STARTF_USESHOWWINDOW
224 startupinfo
.wShowWindow
= subprocess
.SW_HIDE
225 extra
['startupinfo'] = startupinfo
227 if WIN32
and not no_win32_startupinfo
:
228 CREATE_NO_WINDOW
= 0x08000000
229 extra
['creationflags'] = CREATE_NO_WINDOW
231 # Use line buffering when in text/universal_newlines mode,
232 # otherwise use the system default buffer size.
233 bufsize
= 1 if universal_newlines
else -1
234 return subprocess
.Popen(
242 universal_newlines
=universal_newlines
,
247 def prep_for_subprocess(cmd
, shell
=False):
248 """Decode on Python3, encode on Python2"""
249 # See the comment in start_command()
257 cmd
= [decode(c
) for c
in cmd
]
259 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call proc.communicate() and return its result unchanged"""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    An optional `encoding` keyword is consumed here and used to decode the
    output; every other argument is passed on to start_command().

    """
    # 'encoding' must be removed before kwargs are forwarded to start_command()
    encoding = kwargs.pop('encoding', None)
    proc = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(proc)

    stdout = decode(raw_output, encoding=encoding)
    stderr = decode(raw_errors, encoding=encoding)

    # Falsy results (None or empty) are replaced by an empty UStr
    if not stdout:
        stdout = UStr('', ENCODING)
    if not stderr:
        stderr = UStr('', ENCODING)
    return (proc.returncode, stdout, stderr)
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    encoded_args = []
    for arg in args:
        encoded_args.append(encode(arg))
    child = subprocess.Popen(encoded_args, cwd=cwd, shell=shell)
    # The Popen object is not waited on; only the pid is handed back
    return child.pid
292 def _fork_win32(args
, cwd
=None, shell
=False):
293 """Launch a background process using crazy win32 voodoo."""
294 # This is probably wrong, but it works. Windows.. wow.
295 if args
[0] == 'git-dag':
296 # win32 can't exec python scripts
297 args
= [sys
.executable
] + args
300 args
[0] = _win32_find_exe(args
[0])
303 # see comment in start_command()
304 argv
= [decode(arg
) for arg
in args
]
306 argv
= [encode(arg
) for arg
in args
]
308 DETACHED_PROCESS
= 0x00000008 # Amazing!
309 return subprocess
.Popen(
310 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
314 def _win32_find_exe(exe
):
315 """Find the actual file for a Windows executable.
317 This function goes through the same process that the Windows shell uses to
318 locate an executable, taking into account the PATH and PATHEXT environment
319 variables. This allows us to avoid passing shell=True to subprocess.Popen.
322 http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
325 # try the argument itself
327 # if argument does not have an extension, also try it with each of the
328 # extensions specified in PATHEXT
330 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
331 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
332 # search the current directory first
333 for candidate
in candidates
:
334 if exists(candidate
):
336 # if the argument does not include a path separator, search each of the
337 # directories on the PATH
338 if not os
.path
.dirname(exe
):
339 for path
in getenv('PATH').split(os
.pathsep
):
341 for candidate
in candidates
:
342 full_path
= os
.path
.join(path
, candidate
)
343 if exists(full_path
):
345 # not found, punt and return the argument unchanged
349 # Portability wrappers
350 if sys
.platform
in {'win32', 'cygwin'}:
356 def _decorator_noop(x
):
360 def wrap(action
, fn
, decorator
=None):
361 """Wrap arguments with `action`, optionally decorate the result"""
362 if decorator
is None:
363 decorator
= _decorator_noop
366 def wrapped(*args
, **kwargs
):
367 return decorator(fn(action(*args
, **kwargs
)))
372 def decorate(decorator
, fn
):
    """Decorate the result of `fn` with `decorator`"""
376 def decorated(*args
, **kwargs
):
377 return decorator(fn(*args
, **kwargs
))
def getenv(name, default=None):
    """Look up an environment variable via os.getenv() and return it decoded"""
    value = os.getenv(name, default)
    return decode(value)
386 def guess_mimetype(filename
):
387 """Robustly guess a filename's mimetype"""
390 mimetype
= mimetypes
.guess_type(filename
)[0]
391 except UnicodeEncodeError:
392 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
393 except (TypeError, ValueError):
394 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open a file after converting its path with mkpath()

    `encoding` is only used for the path conversion; it is not passed
    to open().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
402 def print_stdout(msg
, linesep
='\n'):
405 msg
= encode(msg
, encoding
=ENCODING
)
406 sys
.stdout
.write(msg
)
409 def print_stderr(msg
, linesep
='\n'):
412 msg
= encode(msg
, encoding
=ENCODING
)
413 sys
.stderr
.write(msg
)
416 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
417 print_stderr(msg
, linesep
=linesep
)
423 return platform
.node()
# Path wrappers built with wrap(): the call arguments are first passed through
# the first callable (mkpath or encode), the os function is called on the
# result, and the return value goes through `decorator` when one is given.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
# expanduser converts its argument with encode rather than mkpath
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
431 if hasattr(os
, 'getcwdu'):
432 # pylint: disable=no-member
435 getcwd
= decorate(decode
, os
.getcwd
)
440 # NOTE: find_executable() is originally from the stdlib, but starting with
441 # python3.7 the stdlib no longer bundles distutils.
442 def _find_executable(executable
, path
=None):
443 """Tries to find 'executable' in the directories listed in 'path'.
445 A string listing directories separated by 'os.pathsep'; defaults to
446 os.environ['PATH']. Returns the complete filename or None if not found.
449 path
= os
.environ
['PATH']
451 paths
= path
.split(os
.pathsep
)
452 _
, ext
= os
.path
.splitext(executable
)
454 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
455 executable
= executable
+ '.exe'
457 if not os
.path
.isfile(executable
):
459 f
= os
.path
.join(p
, executable
)
460 if os
.path
.isfile(f
):
461 # the file exists, we have a shot at spawn working
469 """Force writing of everything to disk. No-op on systems without os.sync()"""
470 if hasattr(os
, 'sync'):
def rename(old, new):
    """Rename a path. Both names go through mkpath() to handle non-ascii file paths"""
    source = mkpath(old)
    target = mkpath(new)
    os.rename(source, target)
480 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
482 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem wrappers built with wrap(): the path argument is converted by
# mkpath before the os function is called.
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
# The directory listing is passed through decode_seq before it is returned
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)
489 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
490 except AttributeError:
492 def _readlink_noop(p
):
495 readlink
= _readlink_noop
# Filesystem wrappers built with wrap(): the path argument is converted by
# mkpath before the os function is called; realpath and relpath also pass
# their result through decode.
realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
# No decorator is given, so the values produced by os.walk are returned as-is
walk = wrap(mkpath, os.walk)