1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
7 from __future__
import division
, absolute_import
, unicode_literals
16 from .decorators
import interruptable
17 from .compat
import ustr
18 from .compat
import PY2
19 from .compat
import PY3
20 from .compat
import WIN32
22 # /usr/include/stdlib.h
23 # #define EXIT_SUCCESS 0 /* Successful exit status. */
24 # #define EXIT_FAILURE 1 /* Failing exit status. */
28 # /usr/include/sysexits.h
29 # #define EX_USAGE 64 /* command line usage error */
30 # #define EX_NOINPUT 66 /* cannot open input */
31 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not in UTF-8; some others are not in any recognizable encoding.
# Remember that Git doesn't care about encodings (it stores binary data).
46 # <-- add encodings here
51 """Unicode string wrapper that remembers its encoding
53 UStr wraps unicode strings to provide the `encoding` attribute.
54 UStr is used when decoding strings of an unknown encoding.
55 In order to generate patches that contain the original byte sequences,
56 we must preserve the original encoding when calling decode()
57 so that it can later be used when reconstructing the original
62 def __new__(cls
, string
, encoding
):
64 if isinstance(string
, UStr
):
65 if encoding
!= string
.encoding
:
67 'Encoding conflict: %s vs. %s' % (string
.encoding
, encoding
)
71 obj
= ustr
.__new
__(cls
, string
)
72 obj
.encoding
= encoding
76 def decode_maybe(value
, encoding
, errors
='strict'):
77 """Decode a value when the "decode" method exists"""
78 if hasattr(value
, 'decode'):
79 result
= value
.decode(encoding
, errors
=errors
)
85 def decode(value
, encoding
=None, errors
='strict'):
86 """decode(encoded_string) returns an unencoded unicode string"""
89 elif isinstance(value
, ustr
):
90 result
= UStr(value
, ENCODING
)
91 elif encoding
== 'bytes':
96 encoding_tests
= _encoding_tests
98 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
100 for enc
in encoding_tests
:
102 decoded
= value
.decode(enc
, errors
)
103 result
= UStr(decoded
, enc
)
109 decoded
= value
.decode(ENCODING
, errors
='ignore')
110 result
= UStr(decoded
, ENCODING
)
115 def encode(string
, encoding
=None):
116 """encode(unencoded_string) returns a string encoded in utf-8"""
117 if not isinstance(string
, ustr
):
119 return string
.encode(encoding
or ENCODING
, 'replace')
122 def mkpath(path
, encoding
=None):
123 # The Windows API requires unicode strings regardless of python version
125 return decode(path
, encoding
=encoding
)
127 return encode(path
, encoding
=encoding
)
def list2cmdline(cmd):
    """Join the command arguments into a single command-line string.

    Every argument is passed through decode() first, and the resulting
    list is handed to subprocess.list2cmdline().
    """
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Read filename and return its contents.

    The file is opened in binary mode via xopen() and the data is read
    and decoded by xread(), which receives `size`, `encoding` and
    `errors` unchanged.
    """
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Write a unicode string to the file at `path`.

    The file is opened in binary mode via xopen(); xwrite() encodes and
    writes `contents`, and its return value is passed back to the caller.
    """
    with xopen(path, 'wb') as handle:
        result = xwrite(handle, contents, encoding=encoding)
    return result
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle and return the decoded data.

    `size` is forwarded to fh.read(); `encoding` and `errors` are
    forwarded to decode().

    NOTE(review): the retry-when-interrupted behavior is expected to come
    from the `interruptable` decorator, which is not part of this block --
    confirm that it is applied.
    """
    raw = fh.read(size)
    return decode(raw, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a filehandle.

    Returns whatever fh.write() returns.

    NOTE(review): the retry-when-interrupted behavior is expected to come
    from the `interruptable` decorator, which is not part of this block --
    confirm that it is applied.
    """
    write_bytes = fh.write
    return write_bytes(encode(content, encoding=encoding))
160 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read a single line from a filehandle and return it decoded."""
    line = fh.readline()
    return decode(line, encoding=encoding)
174 universal_newlines
=False,
175 stdin
=subprocess
.PIPE
,
176 stdout
=subprocess
.PIPE
,
177 no_win32_startupinfo
=False,
178 stderr
=subprocess
.PIPE
,
181 """Start the given command, and return a subprocess object.
183 This provides a simpler interface to the subprocess module.
186 env
= extra
.pop('env', None)
187 if add_env
is not None:
188 env
= os
.environ
.copy()
191 # Python3 on windows always goes through list2cmdline() internally inside
192 # of subprocess.py so we must provide unicode strings here otherwise
193 # Python3 breaks when bytes are provided.
195 # Additionally, the preferred usage on Python3 is to pass unicode
196 # strings to subprocess. Python will automatically encode into the
197 # default encoding (utf-8) when it gets unicode strings.
198 shell
= extra
.get('shell', False)
199 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
201 if WIN32
and cwd
== getcwd():
202 # Windows cannot deal with passing a cwd that contains unicode
203 # but we luckily can pass None when the supplied cwd is the same
204 # as our current directory and get the same effect.
205 # Not doing this causes unicode encoding errors when launching
213 # If git-cola is invoked on Windows using "start pythonw git-cola",
214 # a console window will briefly flash on the screen each time
215 # git-cola invokes git, which is very annoying. The code below
216 # prevents this by ensuring that any window will be hidden.
217 startupinfo
= subprocess
.STARTUPINFO()
218 startupinfo
.dwFlags
= subprocess
.STARTF_USESHOWWINDOW
219 startupinfo
.wShowWindow
= subprocess
.SW_HIDE
220 extra
['startupinfo'] = startupinfo
222 if WIN32
and not no_win32_startupinfo
:
223 CREATE_NO_WINDOW
= 0x08000000
224 extra
['creationflags'] = CREATE_NO_WINDOW
226 # Use line buffering when in text/universal_newlines mode,
227 # otherwise use the system default buffer size.
228 bufsize
= 1 if universal_newlines
else -1
229 return subprocess
.Popen(
237 universal_newlines
=universal_newlines
,
242 def prep_for_subprocess(cmd
, shell
=False):
243 """Decode on Python3, encode on Python2"""
244 # See the comment in start_command()
252 cmd
= [decode(c
) for c
in cmd
]
254 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call communicate() on `proc` and return its result unchanged."""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)

    The optional `encoding` keyword argument is removed from `kwargs` and
    used when decoding the captured output and errors.  All of the other
    arguments are passed on to start_command().  Falsy output or errors
    are replaced by an empty UStr tagged with ENCODING.

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)
    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)
    exit_code = process.returncode
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)
    return (exit_code, output, errors)
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid.

    Every argument is passed through encode() before being handed to
    subprocess.Popen(); `cwd` and `shell` are forwarded unchanged.
    """
    argv = [encode(item) for item in args]
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
287 def _fork_win32(args
, cwd
=None, shell
=False):
288 """Launch a background process using crazy win32 voodoo."""
289 # This is probably wrong, but it works. Windows.. wow.
290 if args
[0] == 'git-dag':
291 # win32 can't exec python scripts
292 args
= [sys
.executable
] + args
295 args
[0] = _win32_find_exe(args
[0])
298 # see comment in start_command()
299 argv
= [decode(arg
) for arg
in args
]
301 argv
= [encode(arg
) for arg
in args
]
303 DETACHED_PROCESS
= 0x00000008 # Amazing!
304 return subprocess
.Popen(
305 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
309 def _win32_find_exe(exe
):
310 """Find the actual file for a Windows executable.
312 This function goes through the same process that the Windows shell uses to
313 locate an executable, taking into account the PATH and PATHEXT environment
314 variables. This allows us to avoid passing shell=True to subprocess.Popen.
317 http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
320 # try the argument itself
322 # if argument does not have an extension, also try it with each of the
323 # extensions specified in PATHEXT
325 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
326 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
327 # search the current directory first
328 for candidate
in candidates
:
329 if exists(candidate
):
331 # if the argument does not include a path separator, search each of the
332 # directories on the PATH
333 if not os
.path
.dirname(exe
):
334 for path
in getenv('PATH').split(os
.pathsep
):
336 for candidate
in candidates
:
337 full_path
= os
.path
.join(path
, candidate
)
338 if exists(full_path
):
340 # not found, punt and return the argument unchanged
344 # Portability wrappers
345 if sys
.platform
== 'win32' or sys
.platform
== 'cygwin':
351 def _decorator_noop(x
):
355 def wrap(action
, fn
, decorator
=None):
356 """Wrap arguments with `action`, optionally decorate the result"""
357 if decorator
is None:
358 decorator
= _decorator_noop
361 def wrapped(*args
, **kwargs
):
362 return decorator(fn(action(*args
, **kwargs
)))
367 def decorate(decorator
, fn
):
    """Decorate the result of `fn` with `decorator`"""
371 def decorated(*args
, **kwargs
):
372 return decorator(fn(*args
, **kwargs
))
def getenv(name, default=None):
    """Look up the environment variable `name` and pass it through decode().

    `default` is forwarded to os.getenv() and is decoded as well when the
    variable is not set.
    """
    value = os.getenv(name, default)
    return decode(value)
381 def guess_mimetype(filename
):
382 """Robustly guess a filename's mimetype"""
385 mimetype
= mimetypes
.guess_type(filename
)[0]
386 except UnicodeEncodeError:
387 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
388 except (TypeError, ValueError):
389 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open `path` with the builtin open() after converting it via mkpath().

    `encoding` is only forwarded to mkpath() for the path conversion; it
    is not passed on to open().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
397 def print_stdout(msg
, linesep
='\n'):
400 msg
= encode(msg
, encoding
=ENCODING
)
401 sys
.stdout
.write(msg
)
404 def print_stderr(msg
, linesep
='\n'):
407 msg
= encode(msg
, encoding
=ENCODING
)
408 sys
.stderr
.write(msg
)
411 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
412 print_stderr(msg
, linesep
=linesep
)
418 return platform
.node()
# Path helpers built with wrap(): the arguments are converted by the first
# callable before the os function runs, and the result is passed through
# decode() where a decorator is given.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
# expanduser converts its argument with encode() rather than mkpath().
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
426 if hasattr(os
, 'getcwdu'):
427 # pylint: disable=no-member
430 getcwd
= decorate(decode
, os
.getcwd
)
435 # NOTE: find_executable() is originally from the stdlib, but starting with
436 # python3.7 the stdlib no longer bundles distutils.
437 def _find_executable(executable
, path
=None):
438 """Tries to find 'executable' in the directories listed in 'path'.
440 A string listing directories separated by 'os.pathsep'; defaults to
441 os.environ['PATH']. Returns the complete filename or None if not found.
444 path
= os
.environ
['PATH']
446 paths
= path
.split(os
.pathsep
)
447 _
, ext
= os
.path
.splitext(executable
)
449 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
450 executable
= executable
+ '.exe'
452 if not os
.path
.isfile(executable
):
454 f
= os
.path
.join(p
, executable
)
455 if os
.path
.isfile(f
):
456 # the file exists, we have a shot at spawn working
464 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
466 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem predicates and makedirs: the path argument is converted by
# mkpath() and the os function's result is returned as-is.
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
makedirs = wrap(mkpath, os.makedirs)
472 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
473 except AttributeError:
475 def _readlink_noop(p
):
478 readlink
= _readlink_noop
# Path-returning helpers: the argument is converted by mkpath() and the
# result is passed through decode().
realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
# The remaining helpers return the os function's result unchanged.
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)