1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
15 from .decorators
import interruptable
16 from .compat
import ustr
17 from .compat
import PY2
18 from .compat
import PY3
19 from .compat
import WIN32
21 # /usr/include/stdlib.h
22 # #define EXIT_SUCCESS 0 /* Successful exit status. */
23 # #define EXIT_FAILURE 1 /* Failing exit status. */
27 # /usr/include/sysexits.h
28 # #define EX_USAGE 64 /* command line usage error */
29 # #define EX_NOINPUT 66 /* cannot open input */
30 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not in UTF-8; others aren't in any encoding at all.
# Remember that Git doesn't care about encodings (it stores binary data).
45 # <-- add encodings here
50 """Unicode string wrapper that remembers its encoding
52 UStr wraps unicode strings to provide the `encoding` attribute.
53 UStr is used when decoding strings of an unknown encoding.
54 In order to generate patches that contain the original byte sequences,
55 we must preserve the original encoding when calling decode()
56 so that it can later be used when reconstructing the original
61 def __new__(cls
, string
, encoding
):
62 if isinstance(string
, UStr
):
63 if encoding
!= string
.encoding
:
64 raise ValueError(f
'Encoding conflict: {string.encoding} vs. {encoding}')
67 obj
= ustr
.__new
__(cls
, string
)
68 obj
.encoding
= encoding
72 def decode_maybe(value
, encoding
, errors
='strict'):
73 """Decode a value when the "decode" method exists"""
74 if hasattr(value
, 'decode'):
75 result
= value
.decode(encoding
, errors
=errors
)
81 def decode(value
, encoding
=None, errors
='strict'):
82 """decode(encoded_string) returns an unencoded unicode string"""
85 elif isinstance(value
, ustr
):
86 result
= UStr(value
, ENCODING
)
87 elif encoding
== 'bytes':
92 encoding_tests
= _encoding_tests
94 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
96 for enc
in encoding_tests
:
98 decoded
= value
.decode(enc
, errors
)
99 result
= UStr(decoded
, enc
)
105 decoded
= value
.decode(ENCODING
, errors
='ignore')
106 result
= UStr(decoded
, ENCODING
)
111 def encode(string
, encoding
=None):
112 """encode(unencoded_string) returns a string encoded in utf-8"""
113 if not isinstance(string
, ustr
):
115 return string
.encode(encoding
or ENCODING
, 'replace')
118 def mkpath(path
, encoding
=None):
119 # The Windows API requires unicode strings regardless of python version
121 return decode(path
, encoding
=encoding
)
123 return encode(path
, encoding
=encoding
)
def decode_seq(seq, encoding=None):
    """Return a list holding each item of `seq` passed through decode()

    `encoding` is forwarded unchanged to every decode() call.
    """
    decoded_items = []
    for item in seq:
        decoded_items.append(decode(item, encoding=encoding))
    return decoded_items
def list2cmdline(cmd):
    """Decode every argument in `cmd` and join them with subprocess.list2cmdline()"""
    decoded_args = list(map(decode, cmd))
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return its decoded contents

    `size`, `encoding` and `errors` are forwarded unchanged to xread().
    """
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
141 def write(path
, contents
, encoding
=None, append
=False):
142 """Writes a unicode string to a file"""
147 with
xopen(path
, mode
) as fh
:
148 return xwrite(fh
, contents
, encoding
=encoding
)
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle and return the decoded data

    `size` is handed to fh.read(); `encoding` and `errors` go to decode().

    NOTE(review): the retry-when-interrupted behavior is not implemented in
    this body; presumably a decorator supplies it -- confirm.
    """
    raw_data = fh.read(size)
    return decode(raw_data, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a filehandle

    Returns whatever fh.write() returns.

    NOTE(review): the retry-when-interrupted behavior is not implemented in
    this body; presumably a decorator supplies it -- confirm.
    """
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
165 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read a single line from a filehandle and return it decoded"""
    raw_line = fh.readline()
    return decode(raw_line, encoding=encoding)
179 universal_newlines
=False,
180 stdin
=subprocess
.PIPE
,
181 stdout
=subprocess
.PIPE
,
182 no_win32_startupinfo
=False,
183 stderr
=subprocess
.PIPE
,
186 """Start the given command, and return a subprocess object.
188 This provides a simpler interface to the subprocess module.
191 env
= extra
.pop('env', None)
192 if add_env
is not None:
193 env
= os
.environ
.copy()
196 # Python3 on windows always goes through list2cmdline() internally inside
197 # of subprocess.py so we must provide unicode strings here otherwise
198 # Python3 breaks when bytes are provided.
200 # Additionally, the preferred usage on Python3 is to pass unicode
201 # strings to subprocess. Python will automatically encode into the
202 # default encoding (utf-8) when it gets unicode strings.
203 shell
= extra
.get('shell', False)
204 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
206 if WIN32
and cwd
== getcwd():
207 # Windows cannot deal with passing a cwd that contains unicode
208 # but we luckily can pass None when the supplied cwd is the same
209 # as our current directory and get the same effect.
210 # Not doing this causes unicode encoding errors when launching
218 # If git-cola is invoked on Windows using "start pythonw git-cola",
219 # a console window will briefly flash on the screen each time
220 # git-cola invokes git, which is very annoying. The code below
221 # prevents this by ensuring that any window will be hidden.
222 startupinfo
= subprocess
.STARTUPINFO()
223 startupinfo
.dwFlags |
= subprocess
.STARTF_USESHOWWINDOW
224 extra
['startupinfo'] = startupinfo
226 if WIN32
and not no_win32_startupinfo
:
227 CREATE_NO_WINDOW
= 0x08000000
228 extra
['creationflags'] = CREATE_NO_WINDOW
230 # Use line buffering when in text/universal_newlines mode,
231 # otherwise use the system default buffer size.
232 bufsize
= 1 if universal_newlines
else -1
233 return subprocess
.Popen(
241 universal_newlines
=universal_newlines
,
246 def prep_for_subprocess(cmd
, shell
=False):
247 """Decode on Python3, encode on Python2"""
248 # See the comment in start_command()
256 cmd
= [decode(c
) for c
in cmd
]
258 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call communicate() on `proc` and return its result unchanged"""
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run a command to completion and return (exit_code, output, errors)

    The `encoding` keyword argument is consumed here and used to decode the
    captured output and errors; all remaining arguments are passed on to
    start_command().  Falsy output or errors are replaced by an empty UStr
    tagged with ENCODING.

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)

    decoded_output = decode(raw_output, encoding=encoding)
    decoded_errors = decode(raw_errors, encoding=encoding)
    status = process.returncode

    # Substitute an empty UStr for anything falsy (e.g. None or '')
    if not decoded_output:
        decoded_output = UStr('', ENCODING)
    if not decoded_errors:
        decoded_errors = UStr('', ENCODING)
    return (status, decoded_output, decoded_errors)
def _fork_posix(args, cwd=None, shell=False):
    """Start `args` as a background process and return its pid

    Every argument is passed through encode() before being handed to
    subprocess.Popen(); the process is not waited on.
    """
    argv = list(map(encode, args))
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
291 def _fork_win32(args
, cwd
=None, shell
=False):
292 """Launch a background process using crazy win32 voodoo."""
293 # This is probably wrong, but it works. Windows.. wow.
294 if args
[0] == 'git-dag':
295 # win32 can't exec python scripts
296 args
= [sys
.executable
] + args
299 args
[0] = _win32_find_exe(args
[0])
302 # see comment in start_command()
303 argv
= [decode(arg
) for arg
in args
]
305 argv
= [encode(arg
) for arg
in args
]
307 DETACHED_PROCESS
= 0x00000008 # Amazing!
308 return subprocess
.Popen(
309 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
313 def _win32_find_exe(exe
):
314 """Find the actual file for a Windows executable.
316 This function goes through the same process that the Windows shell uses to
317 locate an executable, taking into account the PATH and PATHEXT environment
318 variables. This allows us to avoid passing shell=True to subprocess.Popen.
321 http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
324 # try the argument itself
326 # if argument does not have an extension, also try it with each of the
327 # extensions specified in PATHEXT
329 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
330 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
331 # search the current directory first
332 for candidate
in candidates
:
333 if exists(candidate
):
335 # if the argument does not include a path separator, search each of the
336 # directories on the PATH
337 if not os
.path
.dirname(exe
):
338 for path
in getenv('PATH').split(os
.pathsep
):
340 for candidate
in candidates
:
341 full_path
= os
.path
.join(path
, candidate
)
342 if exists(full_path
):
344 # not found, punt and return the argument unchanged
348 # Portability wrappers
349 if sys
.platform
in {'win32', 'cygwin'}:
355 def _decorator_noop(x
):
359 def wrap(action
, func
, decorator
=None):
360 """Wrap arguments with `action`, optionally decorate the result"""
361 if decorator
is None:
362 decorator
= _decorator_noop
364 @functools.wraps(func
)
365 def wrapped(*args
, **kwargs
):
366 return decorator(func(action(*args
, **kwargs
)))
371 def decorate(decorator
, func
):
372 """Decorate the result of `func` with `action`"""
374 @functools.wraps(func
)
375 def decorated(*args
, **kwargs
):
376 return decorator(func(*args
, **kwargs
))
def getenv(name, default=None):
    """Look up an environment variable and return the value via decode()

    `default` is what os.getenv() yields when `name` is unset; it is passed
    through decode() as well.
    """
    raw_value = os.getenv(name, default)
    return decode(raw_value)
385 def guess_mimetype(filename
):
386 """Robustly guess a filename's mimetype"""
389 mimetype
= mimetypes
.guess_type(filename
)[0]
390 except UnicodeEncodeError:
391 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
392 except (TypeError, ValueError):
393 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open `path` with the given mode after converting it with mkpath()

    `encoding` is only used for the path conversion; it is not passed to
    open().  The path is decoded into unicode on Windows and encoded into
    bytes on Unix.

    """
    native_path = mkpath(path, encoding=encoding)
    # pylint: disable=unspecified-encoding
    return open(native_path, mode)
def open_append(path, encoding=None):
    """Return a utf-8 text-mode filehandle opened for appending to `path`

    `encoding` only affects how the path itself is converted by mkpath().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode='a', encoding='utf-8')
def open_read(path, encoding=None):
    """Return a utf-8 text-mode filehandle opened for reading `path`

    `encoding` only affects how the path itself is converted by mkpath().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode='r', encoding='utf-8')
def open_write(path, encoding=None):
    """Return a utf-8 text-mode filehandle opened for writing `path`

    `encoding` only affects how the path itself is converted by mkpath().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode='w', encoding='utf-8')
421 def print_stdout(msg
, linesep
='\n'):
424 msg
= encode(msg
, encoding
=ENCODING
)
425 sys
.stdout
.write(msg
)
428 def print_stderr(msg
, linesep
='\n'):
431 msg
= encode(msg
, encoding
=ENCODING
)
432 sys
.stderr
.write(msg
)
435 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
436 print_stderr(msg
, linesep
=linesep
)
442 return platform
.node()
# Path helpers built with wrap(): the call arguments are first converted by
# the given action (mkpath, or encode for expanduser), the os function is
# called with that value, and the result is passed through decode() where a
# decorator is supplied.
# NOTE(review): wrap() hands *all* call arguments to the action, so these
# helpers appear to expect a single path argument -- confirm against callers.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
450 if hasattr(os
, 'getcwdu'):
451 # pylint: disable=no-member
454 getcwd
= decorate(decode
, os
.getcwd
)
459 # NOTE: find_executable() is originally from the stdlib, but starting with
460 # python3.7 the stdlib no longer bundles distutils.
461 def _find_executable(executable
, path
=None):
462 """Tries to find 'executable' in the directories listed in 'path'.
464 A string listing directories separated by 'os.pathsep'; defaults to
465 os.environ['PATH']. Returns the complete filename or None if not found.
468 path
= os
.environ
['PATH']
470 paths
= path
.split(os
.pathsep
)
471 _
, ext
= os
.path
.splitext(executable
)
473 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
474 executable
= executable
+ '.exe'
476 if not os
.path
.isfile(executable
):
477 for dirname
in paths
:
478 filename
= os
.path
.join(dirname
, executable
)
479 if os
.path
.isfile(filename
):
480 # the file exists, we have a shot at spawn working
488 """Force writing of everything to disk. No-op on systems without os.sync()"""
489 if hasattr(os
, 'sync'):
def rename(old, new):
    """Rename `old` to `new`, converting both via mkpath() for non-ascii paths"""
    source = mkpath(old)
    destination = mkpath(new)
    os.rename(source, destination)
499 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
501 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem helpers built with wrap(): the call arguments are converted by
# mkpath() before the os function runs.  listdir additionally passes its
# result through decode_seq().
# NOTE(review): wrap() hands *all* call arguments to mkpath(), so a second
# positional argument (e.g. a mode for makedirs) would be received as
# mkpath's `encoding` -- confirm callers only pass a path.
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)
508 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
509 except AttributeError:
511 def _readlink_noop(p
):
514 readlink
= _readlink_noop
# More wrap()-based helpers: arguments are converted by mkpath() before the
# os function runs; realpath and relpath also pass their result through
# decode().
# NOTE(review): wrap() hands *all* call arguments to mkpath(), so relpath's
# optional `start` argument would be received as mkpath's `encoding` --
# confirm callers only pass a single path.
realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
522 walk
= wrap(mkpath
, os
.walk
)