1 """This module provides core functions for handling Unicode and Unix quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
14 from .decorators
import interruptable
15 from .compat
import ustr
16 from .compat
import PY2
17 from .compat
import PY3
18 from .compat
import WIN32
20 # /usr/include/stdlib.h
21 # #define EXIT_SUCCESS 0 /* Successful exit status. */
22 # #define EXIT_FAILURE 1 /* Failing exit status. */
26 # /usr/include/sysexits.h
27 # #define EX_USAGE 64 /* command line usage error */
28 # #define EX_NOINPUT 66 /* cannot open input */
29 # #define EX_UNAVAILABLE 69 /* service unavailable */
# Some files are not in UTF-8; some others are not in any recognizable encoding.
# Remember that Git doesn't care about encodings (it stores binary data).
44 # <-- add encodings here
49 """Unicode string wrapper that remembers its encoding
51 UStr wraps Unicode strings to provide the `encoding` attribute.
52 UStr is used when decoding strings of an unknown encoding.
53 In order to generate patches that contain the original byte sequences,
54 we must preserve the original encoding when calling decode()
55 so that it can later be used when reconstructing the original
60 def __new__(cls
, string
, encoding
):
61 if isinstance(string
, UStr
):
62 if encoding
!= string
.encoding
:
63 raise ValueError(f
'Encoding conflict: {string.encoding} vs. {encoding}')
66 obj
= ustr
.__new
__(cls
, string
)
67 obj
.encoding
= encoding
71 def decode_maybe(value
, encoding
, errors
='strict'):
72 """Decode a value when the "decode" method exists"""
73 if hasattr(value
, 'decode'):
74 result
= value
.decode(encoding
, errors
=errors
)
80 def decode(value
, encoding
=None, errors
='strict'):
81 """decode(encoded_string) returns an un-encoded Unicode string"""
84 elif isinstance(value
, ustr
):
85 result
= UStr(value
, ENCODING
)
86 elif encoding
== 'bytes':
91 encoding_tests
= _encoding_tests
93 encoding_tests
= itertools
.chain([encoding
], _encoding_tests
)
95 for enc
in encoding_tests
:
97 decoded
= value
.decode(enc
, errors
)
98 result
= UStr(decoded
, enc
)
104 decoded
= value
.decode(ENCODING
, errors
='ignore')
105 result
= UStr(decoded
, ENCODING
)
110 def encode(string
, encoding
=None):
111 """encode(string) returns a byte string encoded to UTF-8"""
112 if not isinstance(string
, ustr
):
114 return string
.encode(encoding
or ENCODING
, 'replace')
117 def mkpath(path
, encoding
=None):
118 # The Windows API requires Unicode strings regardless of python version
120 return decode(path
, encoding
=encoding
)
122 return encode(path
, encoding
=encoding
)
def decode_seq(seq, encoding=None):
    """Return a list holding the decode()d form of every item in `seq`

    `encoding` is forwarded unchanged to each decode() call.
    """
    decoded_items = []
    for item in seq:
        decoded_items.append(decode(item, encoding=encoding))
    return decoded_items
def list2cmdline(cmd):
    """Join the arguments in `cmd` into a single command-line string

    Every argument is passed through decode() before the list is handed
    to subprocess.list2cmdline().
    """
    decoded_args = []
    for arg in cmd:
        decoded_args.append(decode(arg))
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return its decoded contents

    `size`, `encoding` and `errors` are forwarded to xread(), which reads
    from the open handle and decodes the bytes.
    """
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
140 def write(path
, contents
, encoding
=None, append
=False):
141 """Writes a Unicode string to a file"""
146 with
xopen(path
, mode
) as fh
:
147 return xwrite(fh
, contents
, encoding
=encoding
)
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a file handle and return the decoded data

    `size` is passed to `fh.read()`; `encoding` and `errors` are passed to
    decode().
    NOTE(review): retrying on interruption is expected to come from the
    @interruptable decorator -- confirm it is applied to this function.
    """
    raw = fh.read(size)
    return decode(raw, encoding=encoding, errors=errors)
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a file handle

    Returns whatever `fh.write()` returns.
    NOTE(review): retrying on interruption is expected to come from the
    @interruptable decorator -- confirm it is applied to this function.
    """
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
164 """Wait on a subprocess and retry when interrupted"""
def readline(fh, encoding=None):
    """Read one line from a file handle and return it decoded"""
    line = fh.readline()
    return decode(line, encoding=encoding)
178 universal_newlines
=False,
179 stdin
=subprocess
.PIPE
,
180 stdout
=subprocess
.PIPE
,
181 no_win32_startupinfo
=False,
182 stderr
=subprocess
.PIPE
,
185 """Start the given command, and return a subprocess object.
187 This provides a simpler interface to the subprocess module.
190 env
= extra
.pop('env', None)
191 if add_env
is not None:
192 env
= os
.environ
.copy()
195 # Python3 on windows always goes through list2cmdline() internally inside
196 # of subprocess.py so we must provide Unicode strings here otherwise
197 # Python3 breaks when bytes are provided.
199 # Additionally, the preferred usage on Python3 is to pass Unicode
200 # strings to subprocess. Python will automatically encode into the
201 # default encoding (UTF-8) when it gets Unicode strings.
202 shell
= extra
.get('shell', False)
203 cmd
= prep_for_subprocess(cmd
, shell
=shell
)
205 if WIN32
and cwd
== getcwd():
206 # Windows cannot deal with passing a cwd that contains Unicode
207 # but we luckily can pass None when the supplied cwd is the same
208 # as our current directory and get the same effect.
209 # Not doing this causes Unicode encoding errors when launching
217 # If git-cola is invoked on Windows using "start pythonw git-cola",
218 # a console window will briefly flash on the screen each time
219 # git-cola invokes git, which is very annoying. The code below
220 # prevents this by ensuring that any window will be hidden.
221 startupinfo
= subprocess
.STARTUPINFO()
222 startupinfo
.dwFlags |
= subprocess
.STARTF_USESHOWWINDOW
223 extra
['startupinfo'] = startupinfo
225 if WIN32
and not no_win32_startupinfo
:
226 CREATE_NO_WINDOW
= 0x08000000
227 extra
['creationflags'] = CREATE_NO_WINDOW
229 # Use line buffering when in text/universal_newlines mode,
230 # otherwise use the system default buffer size.
231 bufsize
= 1 if universal_newlines
else -1
232 return subprocess
.Popen(
240 universal_newlines
=universal_newlines
,
245 def prep_for_subprocess(cmd
, shell
=False):
246 """Decode on Python3, encode on Python2"""
247 # See the comment in start_command()
255 cmd
= [decode(c
) for c
in cmd
]
257 cmd
= [encode(c
) for c
in cmd
]
def communicate(proc):
    """Call `proc.communicate()` and return its result unchanged"""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors).
    The `encoding` keyword argument is consumed here and used to decode the
    captured output; all other arguments are passed on to start_command().

    """
    encoding = kwargs.pop('encoding', None)
    proc = start_command(cmd, *args, **kwargs)
    stdout, stderr = communicate(proc)
    stdout = decode(stdout, encoding=encoding)
    stderr = decode(stderr, encoding=encoding)
    status = proc.returncode
    # Falsy results (e.g. None or empty output) become an empty UStr so
    # callers always receive a string carrying an encoding.
    if not stdout:
        stdout = UStr('', ENCODING)
    if not stderr:
        stderr = UStr('', ENCODING)
    return (status, stdout, stderr)
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    # Every argument goes through encode() before being handed to Popen.
    argv = list(map(encode, args))
    proc = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return proc.pid
290 def _fork_win32(args
, cwd
=None, shell
=False):
291 """Launch a background process using crazy win32 voodoo."""
292 # This is probably wrong, but it works. Windows.. Wow.
293 if args
[0] == 'git-dag':
294 # win32 can't exec python scripts
295 args
= [sys
.executable
] + args
298 args
[0] = _win32_find_exe(args
[0])
301 # see comment in start_command()
302 argv
= [decode(arg
) for arg
in args
]
304 argv
= [encode(arg
) for arg
in args
]
306 DETACHED_PROCESS
= 0x00000008 # Amazing!
307 return subprocess
.Popen(
308 argv
, cwd
=cwd
, creationflags
=DETACHED_PROCESS
, shell
=shell
312 def _win32_find_exe(exe
):
313 """Find the actual file for a Windows executable.
315 This function goes through the same process that the Windows shell uses to
316 locate an executable, taking into account the PATH and PATHEXT environment
317 variables. This allows us to avoid passing shell=True to subprocess.Popen.
320 https://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
323 # try the argument itself
325 # if argument does not have an extension, also try it with each of the
326 # extensions specified in PATHEXT
328 extensions
= getenv('PATHEXT', '').split(os
.pathsep
)
329 candidates
.extend([(exe
+ ext
) for ext
in extensions
if ext
.startswith('.')])
330 # search the current directory first
331 for candidate
in candidates
:
332 if exists(candidate
):
334 # if the argument does not include a path separator, search each of the
335 # directories on the PATH
336 if not os
.path
.dirname(exe
):
337 for path
in getenv('PATH').split(os
.pathsep
):
339 for candidate
in candidates
:
340 full_path
= os
.path
.join(path
, candidate
)
341 if exists(full_path
):
343 # not found, punt and return the argument unchanged
347 # Portability wrappers
348 if sys
.platform
in {'win32', 'cygwin'}:
354 def _decorator_noop(x
):
358 def wrap(action
, func
, decorator
=None):
359 """Wrap arguments with `action`, optionally decorate the result"""
360 if decorator
is None:
361 decorator
= _decorator_noop
363 @functools.wraps(func
)
364 def wrapped(*args
, **kwargs
):
365 return decorator(func(action(*args
, **kwargs
)))
370 def decorate(decorator
, func
):
371 """Decorate the result of `func` with `action`"""
373 @functools.wraps(func
)
374 def decorated(*args
, **kwargs
):
375 return decorator(func(*args
, **kwargs
))
def getenv(name, default=None):
    """Return the environment variable `name` passed through decode()

    `default` is handed to os.getenv() and is decoded in the same way when
    the variable is unset.
    """
    raw_value = os.getenv(name, default)
    return decode(raw_value)
384 def guess_mimetype(filename
):
385 """Robustly guess a filename's mimetype"""
388 mimetype
= mimetypes
.guess_type(filename
)[0]
389 except UnicodeEncodeError:
390 mimetype
= mimetypes
.guess_type(encode(filename
))[0]
391 except (TypeError, ValueError):
392 mimetype
= mimetypes
.guess_type(decode(filename
))[0]
def xopen(path, mode='r', encoding=None):
    """Open a file with the specified mode

    The path is converted with mkpath(): decoded into Unicode on Windows and
    encoded into bytes on Unix.  `encoding` is only used for that path
    conversion; it is not passed to open().
    """
    native_path = mkpath(path, encoding=encoding)
    # pylint: disable=unspecified-encoding
    return open(native_path, mode)
def open_append(path, encoding=None):
    """Open a file for appending in UTF-8 text mode

    `encoding` applies to the path conversion done by mkpath(); the file
    contents are always opened as UTF-8.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'a', encoding='utf-8')
def open_read(path, encoding=None):
    """Open a file for reading in UTF-8 text mode

    `encoding` applies to the path conversion done by mkpath(); the file
    contents are always opened as UTF-8.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, encoding='utf-8')
def open_write(path, encoding=None):
    """Open a file for writing in UTF-8 text mode

    `encoding` applies to the path conversion done by mkpath(); the file
    contents are always opened as UTF-8.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'w', encoding='utf-8')
420 def print_stdout(msg
, linesep
='\n'):
423 msg
= encode(msg
, encoding
=ENCODING
)
424 sys
.stdout
.write(msg
)
427 def print_stderr(msg
, linesep
='\n'):
430 msg
= encode(msg
, encoding
=ENCODING
)
431 sys
.stderr
.write(msg
)
434 def error(msg
, status
=EXIT_FAILURE
, linesep
='\n'):
435 print_stderr(msg
, linesep
=linesep
)
441 return platform
.node()
# Path helpers built with wrap(): the arguments are converted by the first
# callable (mkpath or encode) before the os function runs, and the result is
# passed through `decorator` when one is given.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
449 if hasattr(os
, 'getcwdu'):
450 # pylint: disable=no-member
453 getcwd
= decorate(decode
, os
.getcwd
)
458 # NOTE: find_executable() is originally from the stdlib, but starting with
459 # python3.7 the stdlib no longer bundles distutils.
460 def _find_executable(executable
, path
=None):
461 """Tries to find 'executable' in the directories listed in 'path'.
463 A string listing directories separated by 'os.pathsep'; defaults to
464 os.environ['PATH']. Returns the complete filename or None if not found.
467 path
= os
.environ
['PATH']
469 paths
= path
.split(os
.pathsep
)
470 _
, ext
= os
.path
.splitext(executable
)
472 if (sys
.platform
== 'win32') and (ext
!= '.exe'):
473 executable
= executable
+ '.exe'
475 if not os
.path
.isfile(executable
):
476 for dirname
in paths
:
477 filename
= os
.path
.join(dirname
, executable
)
478 if os
.path
.isfile(filename
):
479 # the file exists, we have a shot at spawn working
487 """Force writing of everything to disk. No-op on systems without os.sync()"""
488 if hasattr(os
, 'sync'):
def rename(old, new):
    """Rename a path. Transform arguments to handle non-ASCII file paths"""
    source = mkpath(old)
    target = mkpath(new)
    os.rename(source, target)
498 find_executable
= wrap(mkpath
, _find_executable
, decorator
=decode
)
500 find_executable
= wrap(decode
, _find_executable
, decorator
=decode
)
# Filesystem queries: the path argument goes through mkpath() first.
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
# Directory entries are decoded one by one via decode_seq().
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)
507 readlink
= wrap(mkpath
, os
.readlink
, decorator
=decode
)
508 except AttributeError:
510 def _readlink_noop(p
):
513 readlink
= _readlink_noop
# Path-returning wrappers decode their result; the others return the os
# function's result as-is.
realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
521 walk
= wrap(mkpath
, os
.walk
)