1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
7 from __future__
import absolute_import
, division
, print_function
, unicode_literals
16 from .decorators
import interruptable
17 from .compat
import ustr
18 from .compat
import PY2
19 from .compat
import PY3
20 from .compat
import WIN32
22 # /usr/include/stdlib.h
23 # #define EXIT_SUCCESS 0 /* Successful exit status. */
24 # #define EXIT_FAILURE 1 /* Failing exit status. */
28 # /usr/include/sysexits.h
29 # #define EX_USAGE 64 /* command line usage error */
30 # #define EX_NOINPUT 66 /* cannot open input */
31 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not encoded in UTF-8, and others are not in any recognizable encoding.
# Remember that Git doesn't care about encodings (it stores binary data).
46 # <-- add encodings here
51 """Unicode string wrapper that remembers its encoding
53 UStr wraps unicode strings to provide the `encoding` attribute.
54 UStr is used when decoding strings of an unknown encoding.
55 In order to generate patches that contain the original byte sequences,
56 we must preserve the original encoding when calling decode()
57 so that it can later be used when reconstructing the original
62 def __new__(cls
, string
, encoding
):
64 if isinstance(string
, UStr
):
65 if encoding
!= string
.encoding
:
67 'Encoding conflict: %s vs. %s' % (string
.encoding
, encoding
)
71 obj
= ustr
.__new
__(cls
, string
)
72 obj
.encoding
= encoding
76 def decode_maybe(value
, encoding
, errors
='strict'):
77 """Decode a value when the "decode" method exists"""
78 if hasattr(value
, 'decode'):
79 result
= value
.decode(encoding
, errors
=errors
)
85 def decode(value
, encoding
=None, errors
='strict'):
86 """decode(encoded_string) returns an unencoded unicode string"""
89 elif isinstance(value
, ustr
):
90 result
= UStr(value
, ENCODING
)
91 elif encoding
== 'bytes':
96 encoding_tests
= _encoding_tests
98 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
100 for enc
in encoding_tests
:
102 decoded
= value
.decode(enc
, errors
)
103 result
= UStr(decoded
, enc
)
109 decoded
= value
.decode(ENCODING
, errors
='ignore')
110 result
= UStr(decoded
, ENCODING
)
115 def encode(string
, encoding
=None):
116 """encode(unencoded_string) returns a string encoded in utf-8"""
117 if not isinstance(string
, ustr
):
119 return string
.encode(encoding
or ENCODING
, 'replace')
122 def mkpath(path
, encoding
=None):
123 # The Windows API requires unicode strings regardless of python version
125 return decode(path
, encoding
=encoding
)
127 return encode(path
, encoding
=encoding
)
def decode_seq(seq, encoding=None):
    """Return a list holding the decoded form of every item in `seq`.

    Each item is passed through decode() with the same `encoding` keyword.
    """
    decoded_items = []
    for item in seq:
        decoded_items.append(decode(item, encoding=encoding))
    return decoded_items
def list2cmdline(cmd):
    """Join the arguments in `cmd` into a single command-line string.

    Every argument goes through decode() before the resulting list is
    handed to subprocess.list2cmdline().
    """
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return what xread() produces.

    `size`, `encoding` and `errors` are forwarded unchanged to xread().
    The file handle is closed when the `with` block exits.
    """
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
145 def write(path
, contents
, encoding
=None, append
=False):
146 """Writes a unicode string to a file"""
151 with
xopen(path
, mode
) as fh
:
152 return xwrite(fh
, contents
, encoding
=encoding
)
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read up to `size` from the filehandle and decode the result.

    The data returned by fh.read() is passed to decode() together with
    `encoding` and `errors`.

    NOTE(review): retrying on interruption is not done in this body; it is
    presumably supplied by a decorator applied to this function -- confirm.
    """
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to the filehandle.

    Returns whatever fh.write() returns.

    NOTE(review): retrying on interruption is not done in this body; it is
    presumably supplied by a decorator applied to this function -- confirm.
    """
    write_fn = fh.write
    payload = encode(content, encoding=encoding)
    return write_fn(payload)
169 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read one line from the filehandle and return it through decode()."""
    line = fh.readline()
    return decode(line, encoding=encoding)
183 universal_newlines
=False,
184 stdin
=subprocess
.PIPE
,
185 stdout
=subprocess
.PIPE
,
186 no_win32_startupinfo
=False,
187 stderr
=subprocess
.PIPE
,
190 """Start the given command, and return a subprocess object.
192 This provides a simpler interface to the subprocess module.
195 env
= extra
.pop('env', None)
196 if add_env
is not None:
197 env
= os
.environ
.copy()
200 # Python3 on windows always goes through list2cmdline() internally inside
201 # of subprocess.py so we must provide unicode strings here otherwise
202 # Python3 breaks when bytes are provided.
204 # Additionally, the preferred usage on Python3 is to pass unicode
205 # strings to subprocess. Python will automatically encode into the
206 # default encoding (utf-8) when it gets unicode strings.
207 shell
= extra
.get('shell', False)
208 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
210 if WIN32
and cwd
== getcwd():
211 # Windows cannot deal with passing a cwd that contains unicode
212 # but we luckily can pass None when the supplied cwd is the same
213 # as our current directory and get the same effect.
214 # Not doing this causes unicode encoding errors when launching
222 # If git-cola is invoked on Windows using "start pythonw git-cola",
223 # a console window will briefly flash on the screen each time
224 # git-cola invokes git, which is very annoying. The code below
225 # prevents this by ensuring that any window will be hidden.
226 startupinfo
= subprocess
.STARTUPINFO()
227 startupinfo
.dwFlags
= subprocess
.STARTF_USESHOWWINDOW
228 startupinfo
.wShowWindow
= subprocess
.SW_HIDE
229 extra
['startupinfo'] = startupinfo
231 if WIN32
and not no_win32_startupinfo
:
232 CREATE_NO_WINDOW
= 0x08000000
233 extra
['creationflags'] = CREATE_NO_WINDOW
235 # Use line buffering when in text/universal_newlines mode,
236 # otherwise use the system default buffer size.
237 bufsize
= 1 if universal_newlines
else -1
238 return subprocess
.Popen(
246 universal_newlines
=universal_newlines
,
251 def prep_for_subprocess(cmd
, shell
=False):
252 """Decode on Python3, encode on Python2"""
253 # See the comment in start_command()
261 cmd
= [decode(c
) for c
in cmd
]
263 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call proc.communicate() and return its result unchanged."""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run `cmd` to completion and return (exit_code, output, errors).

    The optional `encoding` keyword is removed from `kwargs` and used when
    decoding the captured output and errors. All remaining arguments are
    passed on to start_command().

    A falsy output or errors value is replaced by an empty UStr tagged
    with the default ENCODING.
    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)
    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)
    exit_code = process.returncode
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)
    return (exit_code, output, errors)
def _fork_posix(args, cwd=None, shell=False):
    """Start `args` as a child process and return its pid.

    Each argument is passed through encode() before subprocess.Popen is
    called; the Popen object itself is not returned or waited on.
    """
    encoded_args = list(map(encode, args))
    child = subprocess.Popen(encoded_args, cwd=cwd, shell=shell)
    return child.pid
296 def _fork_win32(args
, cwd
=None, shell
=False):
297 """Launch a background process using crazy win32 voodoo."""
298 # This is probably wrong, but it works. Windows.. wow.
299 if args
[0] == 'git-dag':
300 # win32 can't exec python scripts
301 args
= [sys
.executable
] + args
304 args
[0] = _win32_find_exe(args
[0])
307 # see comment in start_command()
308 argv
= [decode(arg
) for arg
in args
]
310 argv
= [encode(arg
) for arg
in args
]
312 DETACHED_PROCESS
= 0x00000008 # Amazing!
313 return subprocess
.Popen(
314 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
318 def _win32_find_exe(exe
):
319 """Find the actual file for a Windows executable.
321 This function goes through the same process that the Windows shell uses to
322 locate an executable, taking into account the PATH and PATHEXT environment
323 variables. This allows us to avoid passing shell=True to subprocess.Popen.
326 http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
329 # try the argument itself
331 # if argument does not have an extension, also try it with each of the
332 # extensions specified in PATHEXT
334 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
335 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
336 # search the current directory first
337 for candidate
in candidates
:
338 if exists(candidate
):
340 # if the argument does not include a path separator, search each of the
341 # directories on the PATH
342 if not os
.path
.dirname(exe
):
343 for path
in getenv('PATH').split(os
.pathsep
):
345 for candidate
in candidates
:
346 full_path
= os
.path
.join(path
, candidate
)
347 if exists(full_path
):
349 # not found, punt and return the argument unchanged
353 # Portability wrappers
354 if sys
.platform
in {'win32', 'cygwin'}:
360 def _decorator_noop(x
):
364 def wrap(action
, func
, decorator
=None):
365 """Wrap arguments with `action`, optionally decorate the result"""
366 if decorator
is None:
367 decorator
= _decorator_noop
369 @functools.wraps(func
)
370 def wrapped(*args
, **kwargs
):
371 return decorator(func(action(*args
, **kwargs
)))
376 def decorate(decorator
, func
):
377 """Decorate the result of `func` with `action`"""
379 @functools.wraps(func
)
380 def decorated(*args
, **kwargs
):
381 return decorator(func(*args
, **kwargs
))
def getenv(name, default=None):
    """Look up `name` via os.getenv() and return the value through decode()."""
    raw_value = os.getenv(name, default)
    return decode(raw_value)
390 def guess_mimetype(filename
):
391 """Robustly guess a filename's mimetype"""
394 mimetype
= mimetypes
.guess_type(filename
)[0]
395 except UnicodeEncodeError:
396 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
397 except (TypeError, ValueError):
398 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open `path` with the given `mode` after converting it with mkpath().

    `encoding` is used only by mkpath() to convert the path; it is not
    passed to open().
    """
    # pylint: disable=unspecified-encoding
    target = mkpath(path, encoding=encoding)
    return open(target, mode)
def open_append(path, encoding=None):
    """Open `path` in append mode ('a') as utf-8 text.

    `encoding` is used only by mkpath() to convert the path; the file
    itself is always opened with encoding='utf-8'.
    """
    target = mkpath(path, encoding=encoding)
    return open(target, 'a', encoding='utf-8')
def open_read(path, encoding=None):
    """Open `path` in read mode ('rt') as utf-8 text.

    `encoding` is used only by mkpath() to convert the path; the file
    itself is always opened with encoding='utf-8'.
    """
    target = mkpath(path, encoding=encoding)
    return open(target, 'rt', encoding='utf-8')
def open_write(path, encoding=None):
    """Open `path` in write mode ('wt') as utf-8 text.

    `encoding` is used only by mkpath() to convert the path; the file
    itself is always opened with encoding='utf-8'.
    """
    target = mkpath(path, encoding=encoding)
    return open(target, 'wt', encoding='utf-8')
426 def print_stdout(msg
, linesep
='\n'):
429 msg
= encode(msg
, encoding
=ENCODING
)
430 sys
.stdout
.write(msg
)
433 def print_stderr(msg
, linesep
='\n'):
436 msg
= encode(msg
, encoding
=ENCODING
)
437 sys
.stderr
.write(msg
)
440 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
441 print_stderr(msg
, linesep
=linesep
)
447 return platform
.node()
# Path helpers built with wrap(): the call arguments are first passed through
# the given action (mkpath, or encode for expanduser), then handed to the os
# function. When decorator=decode is given, the os function's result is
# passed through decode() before being returned.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
455 if hasattr(os
, 'getcwdu'):
456 # pylint: disable=no-member
459 getcwd
= decorate(decode
, os
.getcwd
)
464 # NOTE: find_executable() is originally from the stdlib, but starting with
465 # python3.7 the stdlib no longer bundles distutils.
466 def _find_executable(executable
, path
=None):
467 """Tries to find 'executable' in the directories listed in 'path'.
469 A string listing directories separated by 'os.pathsep'; defaults to
470 os.environ['PATH']. Returns the complete filename or None if not found.
473 path
= os
.environ
['PATH']
475 paths
= path
.split(os
.pathsep
)
476 _
, ext
= os
.path
.splitext(executable
)
478 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
479 executable
= executable
+ '.exe'
481 if not os
.path
.isfile(executable
):
482 for dirname
in paths
:
483 filename
= os
.path
.join(dirname
, executable
)
484 if os
.path
.isfile(filename
):
485 # the file exists, we have a shot at spawn working
493 """Force writing of everything to disk. No-op on systems without os.sync()"""
494 if hasattr(os
, 'sync'):
def rename(old, new):
    """Rename `old` to `new` via os.rename().

    Both paths are converted with mkpath() first so that non-ascii file
    paths are handled.
    """
    source = mkpath(old)
    destination = mkpath(new)
    os.rename(source, destination)
504 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
506 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem predicates and directory helpers: the argument is converted
# with mkpath() before the os function is called. listdir additionally
# passes its result through decode_seq().
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)
513 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
514 except AttributeError:
516 def _readlink_noop(p
):
519 readlink
= _readlink_noop
# More mkpath()-wrapped os functions. realpath and relpath return paths, so
# their results are passed through decode(); remove, stat and unlink return
# the os function's result unchanged.
realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
527 walk
= wrap(mkpath
, os
.walk
)