Remove superfluous manipulation of import path.
[dput.git] / test / helper.py
blobbb4ee926a12c0972b3768c2b9da7edf461e8656a
1 # -*- coding: utf-8; -*-
3 # test/helper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Helper functionality for Dput test suite. """
12 from __future__ import (absolute_import, unicode_literals)
14 import sys
16 if sys.version_info >= (3, 3):
17 import builtins
18 import unittest
19 import unittest.mock as mock
20 from io import StringIO as StringIO
21 import configparser
22 import collections.abc as collections_abc
23 elif sys.version_info >= (3, 0):
24 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
25 elif sys.version_info >= (2, 7):
26 # Python 2 standard library.
27 import __builtin__ as builtins
28 # Third-party backport of Python 3 unittest improvements.
29 import unittest2 as unittest
30 # Third-party mock library.
31 import mock
32 # Python 2 standard library.
33 from StringIO import StringIO as BaseStringIO
34 import ConfigParser as configparser
35 import collections as collections_abc
36 else:
37 raise RuntimeError("Python earlier than 2.7 is not supported.")
39 import os
40 import os.path
41 import io
42 import shutil
43 import tempfile
44 import pwd
45 import errno
46 import time
47 import signal
48 import subprocess
49 import functools
50 import itertools
51 import base64
52 import collections
53 import weakref
54 import shlex
57 __metaclass__ = type
59 try:
60 # Python 2 types.
61 basestring
62 unicode
63 except NameError:
64 # Alias for Python 3 types.
65 basestring = str
66 unicode = str
69 def make_unique_slug(testcase):
70 """ Make a unique slug for the test case. """
71 text = base64.b64encode(
72 testcase.getUniqueString().encode('utf-8')
73 ).decode('utf-8')
74 result = text[-30:]
75 return result
78 try:
79 StringIO
80 except NameError:
81 # We don't yet have the StringIO we want. Create it.
83 class StringIO(BaseStringIO, object):
84 """ StringIO with a context manager. """
86 def __enter__(self):
87 return self
89 def __exit__(self, *args):
90 self.close()
91 return False
93 def readable(self):
94 return True
96 def writable(self):
97 return True
99 def seekable(self):
100 return True
103 def patch_stdout(testcase):
104 """ Patch `sys.stdout` for the specified test case. """
105 patcher = mock.patch.object(
106 sys, "stdout", wraps=StringIO())
107 patcher.start()
108 testcase.addCleanup(patcher.stop)
111 def patch_stderr(testcase):
112 """ Patch `sys.stderr` for the specified test case. """
113 patcher = mock.patch.object(
114 sys, "stderr", wraps=StringIO())
115 patcher.start()
116 testcase.addCleanup(patcher.stop)
119 def patch_signal_signal(testcase):
120 """ Patch `signal.signal` for the specified test case. """
121 func_patcher = mock.patch.object(signal, "signal", autospec=True)
122 func_patcher.start()
123 testcase.addCleanup(func_patcher.stop)
126 class FakeSystemExit(Exception):
127 """ Fake double for `SystemExit` exception. """
130 EXIT_STATUS_SUCCESS = 0
131 EXIT_STATUS_FAILURE = 1
132 EXIT_STATUS_COMMAND_NOT_FOUND = 127
135 def patch_sys_exit(testcase):
136 """ Patch `sys.exit` for the specified test case. """
137 func_patcher = mock.patch.object(
138 sys, "exit", autospec=True,
139 side_effect=FakeSystemExit())
140 func_patcher.start()
141 testcase.addCleanup(func_patcher.stop)
144 def patch_sys_argv(testcase):
145 """ Patch the `sys.argv` sequence for the test case. """
146 if not hasattr(testcase, 'progname'):
147 testcase.progname = make_unique_slug(testcase)
148 if not hasattr(testcase, 'sys_argv'):
149 testcase.sys_argv = [testcase.progname]
150 patcher = mock.patch.object(
151 sys, "argv",
152 new=list(testcase.sys_argv))
153 patcher.start()
154 testcase.addCleanup(patcher.stop)
157 def patch_system_interfaces(testcase):
158 """ Patch system interfaces that are disruptive to the test runner. """
159 patch_stdout(testcase)
160 patch_stderr(testcase)
161 patch_sys_exit(testcase)
162 patch_sys_argv(testcase)
165 def patch_time_time(testcase, values=None):
166 """ Patch the `time.time` function for the specified test case.
168 :param testcase: The `TestCase` instance for binding to the patch.
169 :param values: An iterable to provide return values.
170 :return: None.
173 if values is None:
174 values = itertools.count()
176 def generator_fake_time():
177 while True:
178 yield next(values)
180 func_patcher = mock.patch.object(time, "time", autospec=True)
181 func_patcher.start()
182 testcase.addCleanup(func_patcher.stop)
184 time.time.side_effect = generator_fake_time()
187 def patch_os_environ(testcase):
188 """ Patch the `os.environ` mapping. """
189 if not hasattr(testcase, 'os_environ'):
190 testcase.os_environ = {}
191 patcher = mock.patch.object(os, "environ", new=testcase.os_environ)
192 patcher.start()
193 testcase.addCleanup(patcher.stop)
196 def patch_os_getpid(testcase):
197 """ Patch `os.getpid` for the specified test case. """
198 func_patcher = mock.patch.object(os, "getpid", autospec=True)
199 func_patcher.start()
200 testcase.addCleanup(func_patcher.stop)
203 def patch_os_getuid(testcase):
204 """ Patch the `os.getuid` function. """
205 if not hasattr(testcase, 'os_getuid_return_value'):
206 testcase.os_getuid_return_value = testcase.getUniqueInteger()
207 func_patcher = mock.patch.object(
208 os, "getuid", autospec=True,
209 return_value=testcase.os_getuid_return_value)
210 func_patcher.start()
211 testcase.addCleanup(func_patcher.stop)
214 PasswdEntry = collections.namedtuple(
215 "PasswdEntry",
216 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
219 def patch_pwd_getpwuid(testcase):
220 """ Patch the `pwd.getpwuid` function. """
221 if not hasattr(testcase, 'pwd_getpwuid_return_value'):
222 testcase.pwd_getpwuid_return_value = PasswdEntry(
223 pw_name=make_unique_slug(testcase),
224 pw_passwd=make_unique_slug(testcase),
225 pw_uid=testcase.getUniqueInteger(),
226 pw_gid=testcase.getUniqueInteger(),
227 pw_gecos=testcase.getUniqueString(),
228 pw_dir=tempfile.mktemp(),
229 pw_shell=tempfile.mktemp())
230 if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd):
231 pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value)
232 else:
233 pwent = testcase.pwd_getpwuid_return_value
234 func_patcher = mock.patch.object(
235 pwd, "getpwuid", autospec=True,
236 return_value=pwent)
237 func_patcher.start()
238 testcase.addCleanup(func_patcher.stop)
241 def patch_os_path_exists(testcase):
242 """ Patch `os.path.exists` behaviour for this test case.
244 When the patched function is called, the registry of
245 `FileDouble` instances for this test case will be used to get
246 the instance for the path specified.
249 orig_os_path_exists = os.path.exists
251 def fake_os_path_exists(path):
252 registry = FileDouble.get_registry_for_testcase(testcase)
253 if path in registry:
254 file_double = registry[path]
255 result = file_double.os_path_exists_scenario.call_hook()
256 else:
257 result = orig_os_path_exists(path)
258 return result
260 func_patcher = mock.patch.object(
261 os.path, "exists", autospec=True,
262 side_effect=fake_os_path_exists)
263 func_patcher.start()
264 testcase.addCleanup(func_patcher.stop)
267 def patch_os_access(testcase):
268 """ Patch `os.access` behaviour for this test case.
270 When the patched function is called, the registry of
271 `FileDouble` instances for this test case will be used to get
272 the instance for the path specified.
275 orig_os_access = os.access
277 def fake_os_access(path, mode):
278 registry = FileDouble.get_registry_for_testcase(testcase)
279 if path in registry:
280 file_double = registry[path]
281 result = file_double.os_access_scenario.call_hook(mode)
282 else:
283 result = orig_os_access(path, mode)
284 return result
286 func_patcher = mock.patch.object(
287 os, "access", autospec=True,
288 side_effect=fake_os_access)
289 func_patcher.start()
290 testcase.addCleanup(func_patcher.stop)
293 StatResult = collections.namedtuple(
294 'StatResult', [
295 'st_mode',
296 'st_ino', 'st_dev', 'st_nlink',
297 'st_uid', 'st_gid',
298 'st_size',
299 'st_atime', 'st_mtime', 'st_ctime',
303 def patch_os_stat(testcase):
304 """ Patch `os.stat` behaviour for this test case.
306 When the patched function is called, the registry of
307 `FileDouble` instances for this test case will be used to get
308 the instance for the path specified.
311 orig_os_stat = os.stat
313 def fake_os_stat(path):
314 registry = FileDouble.get_registry_for_testcase(testcase)
315 if path in registry:
316 file_double = registry[path]
317 result = file_double.os_stat_scenario.call_hook()
318 else:
319 result = orig_os_stat(path)
320 return result
322 func_patcher = mock.patch.object(
323 os, "stat", autospec=True,
324 side_effect=fake_os_stat)
325 func_patcher.start()
326 testcase.addCleanup(func_patcher.stop)
329 def patch_os_lstat(testcase):
330 """ Patch `os.lstat` behaviour for this test case.
332 When the patched function is called, the registry of
333 `FileDouble` instances for this test case will be used to get
334 the instance for the path specified.
337 orig_os_lstat = os.lstat
339 def fake_os_lstat(path):
340 registry = FileDouble.get_registry_for_testcase(testcase)
341 if path in registry:
342 file_double = registry[path]
343 result = file_double.os_lstat_scenario.call_hook()
344 else:
345 result = orig_os_lstat(path)
346 return result
348 func_patcher = mock.patch.object(
349 os, "lstat", autospec=True,
350 side_effect=fake_os_lstat)
351 func_patcher.start()
352 testcase.addCleanup(func_patcher.stop)
355 def patch_os_unlink(testcase):
356 """ Patch `os.unlink` behaviour for this test case.
358 When the patched function is called, the registry of
359 `FileDouble` instances for this test case will be used to get
360 the instance for the path specified.
363 orig_os_unlink = os.unlink
365 def fake_os_unlink(path):
366 registry = FileDouble.get_registry_for_testcase(testcase)
367 if path in registry:
368 file_double = registry[path]
369 result = file_double.os_unlink_scenario.call_hook()
370 else:
371 result = orig_os_unlink(path)
372 return result
374 func_patcher = mock.patch.object(
375 os, "unlink", autospec=True,
376 side_effect=fake_os_unlink)
377 func_patcher.start()
378 testcase.addCleanup(func_patcher.stop)
381 def patch_os_rmdir(testcase):
382 """ Patch `os.rmdir` behaviour for this test case.
384 When the patched function is called, the registry of
385 `FileDouble` instances for this test case will be used to get
386 the instance for the path specified.
389 orig_os_rmdir = os.rmdir
391 def fake_os_rmdir(path):
392 registry = FileDouble.get_registry_for_testcase(testcase)
393 if path in registry:
394 file_double = registry[path]
395 result = file_double.os_rmdir_scenario.call_hook()
396 else:
397 result = orig_os_rmdir(path)
398 return result
400 func_patcher = mock.patch.object(
401 os, "rmdir", autospec=True,
402 side_effect=fake_os_rmdir)
403 func_patcher.start()
404 testcase.addCleanup(func_patcher.stop)
407 def patch_shutil_rmtree(testcase):
408 """ Patch `shutil.rmtree` behaviour for this test case.
410 When the patched function is called, the registry of
411 `FileDouble` instances for this test case will be used to get
412 the instance for the path specified.
415 orig_shutil_rmtree = os.rmdir
417 def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
418 registry = FileDouble.get_registry_for_testcase(testcase)
419 if path in registry:
420 file_double = registry[path]
421 result = file_double.shutil_rmtree_scenario.call_hook()
422 else:
423 result = orig_shutil_rmtree(path)
424 return result
426 func_patcher = mock.patch.object(
427 shutil, "rmtree", autospec=True,
428 side_effect=fake_shutil_rmtree)
429 func_patcher.start()
430 testcase.addCleanup(func_patcher.stop)
433 def patch_tempfile_mkdtemp(testcase):
434 """ Patch the `tempfile.mkdtemp` function for this test case. """
435 if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
436 testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())
438 double = testcase.tempfile_mkdtemp_file_double
439 double.set_os_unlink_scenario('okay')
440 double.set_os_rmdir_scenario('okay')
441 double.register_for_testcase(testcase)
443 func_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True)
444 func_patcher.start()
445 testcase.addCleanup(func_patcher.stop)
447 tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
450 try:
451 FileNotFoundError
452 FileExistsError
453 PermissionError
454 except NameError:
455 # Python 2 uses IOError.
456 def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
457 result_kwargs = init_kwargs
458 result_errno = errno_value
459 result_strerror = os.strerror(errno_value)
460 result_filename = None
461 if len(init_args) >= 3:
462 result_errno = init_args[0]
463 result_filename = init_args[2]
464 if 'errno' in init_kwargs:
465 result_errno = init_kwargs['errno']
466 del result_kwargs['errno']
467 if 'filename' in init_kwargs:
468 result_filename = init_kwargs['filename']
469 del result_kwargs['filename']
470 if len(init_args) >= 2:
471 result_strerror = init_args[1]
472 if 'strerror' in init_kwargs:
473 result_strerror = init_kwargs['strerror']
474 del result_kwargs['strerror']
475 result_args = (result_errno, result_strerror, result_filename)
476 return (result_args, result_kwargs)
478 class FileNotFoundError(IOError):
479 def __init__(self, *args, **kwargs):
480 (args, kwargs) = _ensure_ioerror_args(
481 args, kwargs, errno_value=errno.ENOENT)
482 super(FileNotFoundError, self).__init__(*args, **kwargs)
484 class FileExistsError(IOError):
485 def __init__(self, *args, **kwargs):
486 (args, kwargs) = _ensure_ioerror_args(
487 args, kwargs, errno_value=errno.EEXIST)
488 super(FileExistsError, self).__init__(*args, **kwargs)
490 class PermissionError(IOError):
491 def __init__(self, *args, **kwargs):
492 (args, kwargs) = _ensure_ioerror_args(
493 args, kwargs, errno_value=errno.EPERM)
494 super(PermissionError, self).__init__(*args, **kwargs)
497 def make_fake_file_scenarios(path=None):
498 """ Make a collection of scenarios for testing with fake files.
500 :path: The filesystem path of the fake file. If not specified,
501 a valid random path will be generated.
502 :return: A collection of scenarios for tests involving input files.
504 The collection is a mapping from scenario name to a dictionary of
505 scenario attributes.
509 if path is None:
510 file_path = tempfile.mktemp()
511 else:
512 file_path = path
514 fake_file_empty = StringIO()
515 fake_file_minimal = StringIO("Lorem ipsum.")
516 fake_file_large = StringIO("\n".join(
517 "ABCDEFGH" * 100
518 for __ in range(1000)))
520 default_scenario_params = {
521 'open_scenario_name': 'okay',
522 'file_double_params': dict(
523 path=file_path, fake_file=fake_file_minimal),
526 scenarios = {
527 'default': {},
528 'error-not-exist': {
529 'open_scenario_name': 'nonexist',
531 'error-exist': {
532 'open_scenario_name': 'exist_error',
534 'error-read-denied': {
535 'open_scenario_name': 'read_denied',
537 'not-found': {
538 'file_double_params': dict(
539 path=file_path, fake_file=fake_file_empty),
541 'exist-empty': {
542 'file_double_params': dict(
543 path=file_path, fake_file=fake_file_empty),
545 'exist-minimal': {
546 'file_double_params': dict(
547 path=file_path, fake_file=fake_file_minimal),
549 'exist-large': {
550 'file_double_params': dict(
551 path=file_path, fake_file=fake_file_large),
555 for (name, scenario) in scenarios.items():
556 params = default_scenario_params.copy()
557 params.update(scenario)
558 scenario.update(params)
559 scenario['file_double'] = FileDouble(**scenario['file_double_params'])
560 scenario['file_double'].set_open_scenario(params['open_scenario_name'])
561 scenario['fake_file_scenario_name'] = name
563 return scenarios
566 def get_file_doubles_from_fake_file_scenarios(scenarios):
567 """ Get the `FileDouble` instances from fake file scenarios.
569 :param scenarios: Collection of fake file scenarios.
570 :return: Collection of `FileDouble` instances.
573 doubles = set(
574 scenario['file_double']
575 for scenario in scenarios
576 if scenario['file_double'] is not None)
578 return doubles
581 def setup_file_double_behaviour(testcase, doubles=None):
582 """ Set up file double instances and behaviour.
584 :param testcase: The `TestCase` instance to modify.
585 :param doubles: Collection of `FileDouble` instances.
586 :return: None.
588 If `doubles` is ``None``, a default collection will be made
589 from the result of `make_fake_file_scenarios` result.
592 if doubles is None:
593 scenarios = make_fake_file_scenarios()
594 doubles = get_file_doubles_from_fake_file_scenarios(
595 scenarios.values())
597 for file_double in doubles:
598 file_double.register_for_testcase(testcase)
600 orig_open = builtins.open
602 def fake_open(path, mode='rt', buffering=-1):
603 registry = FileDouble.get_registry_for_testcase(testcase)
604 if path in registry:
605 file_double = registry[path]
606 result = file_double.builtins_open_scenario.call_hook(
607 mode, buffering)
608 else:
609 result = orig_open(path, mode, buffering)
610 return result
612 mock_open = mock.mock_open()
613 mock_open.side_effect = fake_open
615 func_patcher = mock.patch.object(
616 builtins, "open", new=mock_open)
617 func_patcher.start()
618 testcase.addCleanup(func_patcher.stop)
621 def setup_fake_file_fixtures(testcase):
622 """ Set up fixtures for fake file doubles.
624 :param testcase: The `TestCase` instance to modify.
625 :return: None.
628 scenarios = make_fake_file_scenarios()
629 testcase.fake_file_scenarios = scenarios
631 file_doubles = get_file_doubles_from_fake_file_scenarios(
632 scenarios.values())
633 setup_file_double_behaviour(testcase, file_doubles)
636 def set_fake_file_scenario(testcase, name):
637 """ Set the named fake file scenario for the test case. """
638 scenario = testcase.fake_file_scenarios[name]
639 testcase.fake_file_scenario = scenario
640 testcase.file_double = scenario['file_double']
641 testcase.file_double.register_for_testcase(testcase)
644 class TestDoubleFunctionScenario:
645 """ Scenario for fake behaviour of a specific function. """
647 def __init__(self, scenario_name, double):
648 self.scenario_name = scenario_name
649 self.double = double
651 self.call_hook = getattr(
652 self, "_hook_{name}".format(name=self.scenario_name))
654 def __repr__(self):
655 text = (
656 "<{class_name} instance: {id}"
657 " name: {name!r},"
658 " call_hook name: {hook_name!r}"
659 " double: {double!r}"
660 ">").format(
661 class_name=self.__class__.__name__, id=id(self),
662 name=self.scenario_name, double=self.double,
663 hook_name=self.call_hook.__name__)
664 return text
666 def __eq__(self, other):
667 result = True
668 if not self.scenario_name == other.scenario_name:
669 result = False
670 if not self.double == other.double:
671 result = False
672 if not self.call_hook.__name__ == other.call_hook.__name__:
673 result = False
674 return result
676 def __ne__(self, other):
677 result = not self.__eq__(other)
678 return result
681 class os_path_exists_scenario(TestDoubleFunctionScenario):
682 """ Scenario for `os.path.exists` behaviour. """
684 def _hook_exist(self):
685 return True
687 def _hook_not_exist(self):
688 return False
691 class os_access_scenario(TestDoubleFunctionScenario):
692 """ Scenario for `os.access` behaviour. """
694 def _hook_okay(self, mode):
695 return True
697 def _hook_not_exist(self, mode):
698 return False
700 def _hook_read_only(self, mode):
701 if mode & (os.W_OK | os.X_OK):
702 result = False
703 else:
704 result = True
705 return result
707 def _hook_denied(self, mode):
708 if mode & (os.R_OK | os.W_OK | os.X_OK):
709 result = False
710 else:
711 result = True
712 return result
715 class os_stat_scenario(TestDoubleFunctionScenario):
716 """ Scenario for `os.stat` behaviour. """
718 def _hook_okay(self):
719 return self.double.stat_result
721 def _hook_notfound_error(self):
722 raise FileNotFoundError(
723 self.double.path,
724 "No such file or directory: {path!r}".format(
725 path=self.double.path))
727 def _hook_denied_error(self):
728 raise PermissionError(
729 self.double.path,
730 "Permission denied")
733 class os_lstat_scenario(os_stat_scenario):
734 """ Scenario for `os.lstat` behaviour. """
737 class os_unlink_scenario(TestDoubleFunctionScenario):
738 """ Scenario for `os.unlink` behaviour. """
740 def _hook_okay(self):
741 return None
743 def _hook_nonexist(self):
744 error = FileNotFoundError(
745 self.double.path,
746 "No such file or directory: {path!r}".format(
747 path=self.double.path))
748 raise error
750 def _hook_denied(self):
751 error = PermissionError(
752 self.double.path,
753 "Permission denied")
754 raise error
757 class os_rmdir_scenario(TestDoubleFunctionScenario):
758 """ Scenario for `os.rmdir` behaviour. """
760 def _hook_okay(self):
761 return None
763 def _hook_nonexist(self):
764 error = FileNotFoundError(
765 self.double.path,
766 "No such file or directory: {path!r}".format(
767 path=self.double.path))
768 raise error
770 def _hook_denied(self):
771 error = PermissionError(
772 self.double.path,
773 "Permission denied")
774 raise error
777 class shutil_rmtree_scenario(TestDoubleFunctionScenario):
778 """ Scenario for `shutil.rmtree` behaviour. """
780 def _hook_okay(self):
781 return None
783 def _hook_nonexist(self):
784 error = FileNotFoundError(
785 self.double.path,
786 "No such file or directory: {path!r}".format(
787 path=self.double.path))
788 raise error
790 def _hook_denied(self):
791 error = PermissionError(
792 self.double.path,
793 "Permission denied")
794 raise error
797 class builtins_open_scenario(TestDoubleFunctionScenario):
798 """ Scenario for `builtins.open` behaviour. """
800 def _hook_okay(self, mode, buffering):
801 result = self.double.fake_file
802 return result
804 def _hook_nonexist(self, mode, buffering):
805 if mode.startswith('r'):
806 error = FileNotFoundError(
807 self.double.path,
808 "No such file or directory: {path!r}".format(
809 path=self.double.path))
810 raise error
811 result = self.double.fake_file
812 return result
814 def _hook_exist_error(self, mode, buffering):
815 if mode.startswith('w') or mode.startswith('a'):
816 error = FileExistsError(
817 self.double.path,
818 "File already exists: {path!r}".format(
819 path=self.double.path))
820 raise error
821 result = self.double.fake_file
822 return result
824 def _hook_read_denied(self, mode, buffering):
825 if mode.startswith('r'):
826 error = PermissionError(
827 self.double.path,
828 "Read denied on {path!r}".format(
829 path=self.double.path))
830 raise error
831 result = self.double.fake_file
832 return result
834 def _hook_write_denied(self, mode, buffering):
835 if mode.startswith('w') or mode.startswith('a'):
836 error = PermissionError(
837 self.double.path,
838 "Write denied on {path!r}".format(
839 path=self.double.path))
840 raise error
841 result = self.double.fake_file
842 return result
845 class TestDoubleWithRegistry:
846 """ Abstract base class for a test double with a test case registry. """
848 registry_class = NotImplemented
849 registries = NotImplemented
851 function_scenario_params_by_class = NotImplemented
853 def __new__(cls, *args, **kwargs):
854 superclass = super(TestDoubleWithRegistry, cls)
855 if superclass.__new__ is object.__new__:
856 # The ‘object’ implementation complains about extra arguments.
857 instance = superclass.__new__(cls)
858 else:
859 instance = superclass.__new__(cls, *args, **kwargs)
860 instance.make_set_scenario_methods()
862 return instance
864 def __init__(self, *args, **kwargs):
865 super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
866 self._set_method_per_scenario()
868 def _make_set_scenario_method(self, scenario_class, params):
869 def method(self, name):
870 scenario = scenario_class(name, double=self)
871 setattr(self, scenario_class.__name__, scenario)
872 method.__doc__ = (
873 """ Set the scenario for `{name}` behaviour. """
874 ).format(name=scenario_class.__name__)
875 method.__name__ = str(params['set_scenario_method_name'])
876 return method
878 def make_set_scenario_methods(self):
879 """ Make `set_<scenario_class_name>` methods on this class. """
880 for (function_scenario_class, function_scenario_params) in (
881 self.function_scenario_params_by_class.items()):
882 method = self._make_set_scenario_method(
883 function_scenario_class, function_scenario_params)
884 setattr(self.__class__, method.__name__, method)
885 function_scenario_params['set_scenario_method'] = method
887 def _set_method_per_scenario(self):
888 """ Set the method to be called for each scenario. """
889 for function_scenario_params in (
890 self.function_scenario_params_by_class.values()):
891 function_scenario_params['set_scenario_method'](
892 self, function_scenario_params['default_scenario_name'])
894 @classmethod
895 def get_registry_for_testcase(cls, testcase):
896 """ Get the FileDouble registry for the specified test case. """
897 # Key in a dict must be hashable.
898 key = (testcase.__class__, id(testcase))
899 registry = cls.registries.setdefault(key, cls.registry_class())
900 return registry
902 def get_registry_key(self):
903 """ Get the registry key for this double. """
904 raise NotImplementedError
906 def register_for_testcase(self, testcase):
907 """ Add this instance to registry for the specified testcase. """
908 registry = self.get_registry_for_testcase(testcase)
909 key = self.get_registry_key()
910 registry[key] = self
911 unregister_func = functools.partial(
912 self.unregister_for_testcase, testcase)
913 testcase.addCleanup(unregister_func)
915 def unregister_for_testcase(self, testcase):
916 """ Remove this instance from registry for the specified testcase. """
917 registry = self.get_registry_for_testcase(testcase)
918 key = self.get_registry_key()
919 if key in registry:
920 registry.pop(key)
923 def copy_fake_file(fake_file):
924 """ Make a copy of the StringIO instance. """
925 fake_file_type = StringIO
926 content = ""
927 if fake_file is not None:
928 fake_file_type = type(fake_file)
929 content = fake_file.getvalue()
930 assert issubclass(fake_file_type, object)
931 result = fake_file_type(content)
932 if hasattr(fake_file, 'encoding'):
933 if not hasattr(result, 'encoding'):
934 result.encoding = fake_file.encoding
935 return result
938 class FileDouble(TestDoubleWithRegistry):
939 """ A testing double for a file. """
941 registry_class = dict
942 registries = {}
944 function_scenario_params_by_class = {
945 os_path_exists_scenario: {
946 'default_scenario_name': 'not_exist',
947 'set_scenario_method_name': 'set_os_path_exists_scenario',
949 os_access_scenario: {
950 'default_scenario_name': 'okay',
951 'set_scenario_method_name': 'set_os_access_scenario',
953 os_stat_scenario: {
954 'default_scenario_name': 'okay',
955 'set_scenario_method_name': 'set_os_stat_scenario',
957 os_lstat_scenario: {
958 'default_scenario_name': 'okay',
959 'set_scenario_method_name': 'set_os_lstat_scenario',
961 builtins_open_scenario: {
962 'default_scenario_name': 'okay',
963 'set_scenario_method_name': 'set_open_scenario',
965 os_unlink_scenario: {
966 'default_scenario_name': 'okay',
967 'set_scenario_method_name': 'set_os_unlink_scenario',
969 os_rmdir_scenario: {
970 'default_scenario_name': 'okay',
971 'set_scenario_method_name': 'set_os_rmdir_scenario',
973 shutil_rmtree_scenario: {
974 'default_scenario_name': 'okay',
975 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
979 def __init__(self, path=None, fake_file=None, *args, **kwargs):
980 self.path = path
981 self.fake_file = copy_fake_file(fake_file)
982 self.fake_file.name = path
984 self._set_stat_result()
986 super(FileDouble, self).__init__(*args, **kwargs)
988 def _set_stat_result(self):
989 """ Set the `os.stat` result for this file. """
990 size = len(self.fake_file.getvalue())
991 self.stat_result = StatResult(
992 st_mode=0,
993 st_ino=None, st_dev=None, st_nlink=None,
994 st_uid=0, st_gid=0,
995 st_size=size,
996 st_atime=None, st_mtime=None, st_ctime=None,
999 def __repr__(self):
1000 text = "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1001 path=self.path, fake_file=self.fake_file)
1002 return text
1004 def get_registry_key(self):
1005 """ Get the registry key for this double. """
1006 result = self.path
1007 return result
1010 class os_popen_scenario(TestDoubleFunctionScenario):
1011 """ Scenario for `os.popen` behaviour. """
1013 stream_name_by_mode = {
1014 'w': 'stdin',
1015 'r': 'stdout',
1018 def _hook_success(self, argv, mode, buffering):
1019 stream_name = self.stream_name_by_mode[mode]
1020 stream_double = getattr(
1021 self.double, stream_name + '_double')
1022 result = stream_double.fake_file
1023 return result
1025 def _hook_failure(self, argv, mode, buffering):
1026 result = StringIO()
1027 return result
1029 def _hook_not_found(self, argv, mode, buffering):
1030 result = StringIO()
1031 return result
1034 class os_waitpid_scenario(TestDoubleFunctionScenario):
1035 """ Scenario for `os.waitpid` behaviour. """
1037 def _hook_success(self, pid, options):
1038 result = (pid, EXIT_STATUS_SUCCESS)
1039 return result
1041 def _hook_failure(self, pid, options):
1042 result = (pid, EXIT_STATUS_FAILURE)
1043 return result
1045 def _hook_not_found(self, pid, options):
1046 error = OSError(errno.ECHILD)
1047 raise error
1050 class os_system_scenario(TestDoubleFunctionScenario):
1051 """ Scenario for `os.system` behaviour. """
1053 def _hook_success(self, command):
1054 result = EXIT_STATUS_SUCCESS
1055 return result
1057 def _hook_failure(self, command):
1058 result = EXIT_STATUS_FAILURE
1059 return result
1061 def _hook_not_found(self, command):
1062 result = EXIT_STATUS_COMMAND_NOT_FOUND
1063 return result
1066 class os_spawnv_scenario(TestDoubleFunctionScenario):
1067 """ Scenario for `os.spawnv` behaviour. """
1069 def _hook_success(self, mode, file, args):
1070 result = EXIT_STATUS_SUCCESS
1071 return result
1073 def _hook_failure(self, mode, file, args):
1074 result = EXIT_STATUS_FAILURE
1075 return result
1077 def _hook_not_found(self, mode, file, args):
1078 result = EXIT_STATUS_COMMAND_NOT_FOUND
1079 return result
1082 ARG_ANY = object()
1083 ARG_MORE = object()
1086 class PopenDouble:
1087 """ A testing double for `subprocess.Popen`. """
1089 def __init__(self, args, *posargs, **kwargs):
1090 self.stdin = None
1091 self.stdout = None
1092 self.stderr = None
1093 self.pid = None
1094 self.returncode = None
1096 if kwargs.get('shell', False):
1097 self.argv = shlex.split(args)
1098 else:
1099 # The paramter is already a sequence of command-line arguments.
1100 self.argv = args
1102 def set_streams(self, subprocess_double, popen_kwargs):
1103 """ Set the streams on the `PopenDouble`.
1105 :param subprocess_double: The `SubprocessDouble` from
1106 which to get existing stream doubles.
1107 :param popen_kwargs: The keyword arguments to the
1108 `subprocess.Popen` call.
1109 :return: ``None``.
1112 for stream_name in (
1113 name for name in ['stdin', 'stdout', 'stderr']
1114 if name in popen_kwargs):
1115 stream_spec = popen_kwargs[stream_name]
1116 if stream_spec is subprocess.PIPE:
1117 stream_double = getattr(
1118 subprocess_double,
1119 "{name}_double".format(name=stream_name))
1120 stream_file = stream_double.fake_file
1121 elif stream_spec is subprocess.STDOUT:
1122 stream_file = subprocess_double.stdout_double.fake_file
1123 else:
1124 stream_file = stream_spec
1125 setattr(self, stream_name, stream_file)
1127 def wait(self):
1128 """ Wait for subprocess to terminate. """
1129 return self.returncode
1132 class subprocess_popen_scenario(TestDoubleFunctionScenario):
1133 """ Scenario for `subprocess.Popen` behaviour. """
1135 def _hook_success(self, testcase, args, *posargs, **kwargs):
1136 double = self.double.popen_double
1137 double.set_streams(self.double, kwargs)
1138 return double
1141 def patch_subprocess_popen(testcase):
1142 """ Patch `subprocess.Popen` constructor for this test case.
1144 :param testcase: The `TestCase` instance to modify.
1145 :return: None.
1147 When the patched function is called, the registry of
1148 `SubprocessDouble` instances for this test case will be used
1149 to get the instance for the program path specified.
1152 orig_subprocess_popen = subprocess.Popen
1154 def fake_subprocess_popen(args, *posargs, **kwargs):
1155 if kwargs.get('shell', False):
1156 argv = shlex.split(args)
1157 else:
1158 argv = args
1159 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1160 if argv in registry:
1161 subprocess_double = registry[argv]
1162 result = subprocess_double.subprocess_popen_scenario.call_hook(
1163 testcase, args, *posargs, **kwargs)
1164 else:
1165 result = orig_subprocess_popen(args, *posargs, **kwargs)
1166 return result
1168 func_patcher = mock.patch.object(
1169 subprocess, "Popen", autospec=True,
1170 side_effect=fake_subprocess_popen)
1171 func_patcher.start()
1172 testcase.addCleanup(func_patcher.stop)
1175 class subprocess_call_scenario(TestDoubleFunctionScenario):
1176 """ Scenario for `subprocess.call` behaviour. """
1178 def _hook_success(self, command):
1179 result = EXIT_STATUS_SUCCESS
1180 return result
1182 def _hook_failure(self, command):
1183 result = EXIT_STATUS_FAILURE
1184 return result
1186 def _hook_not_found(self, command):
1187 result = EXIT_STATUS_COMMAND_NOT_FOUND
1188 return result
1191 def patch_subprocess_call(testcase):
1192 """ Patch `subprocess.call` function for this test case.
1194 :param testcase: The `TestCase` instance to modify.
1195 :return: None.
1197 When the patched function is called, the registry of
1198 `SubprocessDouble` instances for this test case will be used
1199 to get the instance for the program path specified.
1202 orig_subprocess_call = subprocess.call
1204 def fake_subprocess_call(command, *posargs, **kwargs):
1205 if kwargs.get('shell', False):
1206 command_argv = shlex.split(command)
1207 else:
1208 command_argv = command
1209 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1210 if command_argv in registry:
1211 subprocess_double = registry[command_argv]
1212 result = subprocess_double.subprocess_call_scenario.call_hook(
1213 command)
1214 else:
1215 result = orig_subprocess_call(command, *posargs, **kwargs)
1216 return result
1218 func_patcher = mock.patch.object(
1219 subprocess, "call", autospec=True,
1220 side_effect=fake_subprocess_call)
1221 func_patcher.start()
1222 testcase.addCleanup(func_patcher.stop)
1225 class subprocess_check_call_scenario(TestDoubleFunctionScenario):
1226 """ Scenario for `subprocess.check_call` behaviour. """
1228 def _hook_success(self, command):
1229 return None
1231 def _hook_failure(self, command):
1232 result = EXIT_STATUS_FAILURE
1233 error = subprocess.CalledProcessError(result, command)
1234 raise error
1236 def _hook_not_found(self, command):
1237 result = EXIT_STATUS_COMMAND_NOT_FOUND
1238 error = subprocess.CalledProcessError(result, command)
1239 raise error
1242 def patch_subprocess_check_call(testcase):
1243 """ Patch `subprocess.check_call` function for this test case.
1245 :param testcase: The `TestCase` instance to modify.
1246 :return: None.
1248 When the patched function is called, the registry of
1249 `SubprocessDouble` instances for this test case will be used
1250 to get the instance for the program path specified.
1253 orig_subprocess_check_call = subprocess.check_call
1255 def fake_subprocess_check_call(command, *posargs, **kwargs):
1256 if kwargs.get('shell', False):
1257 command_argv = shlex.split(command)
1258 else:
1259 command_argv = command
1260 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1261 if command_argv in registry:
1262 subprocess_double = registry[command_argv]
1263 scenario = subprocess_double.subprocess_check_call_scenario
1264 result = scenario.call_hook(command)
1265 else:
1266 result = orig_subprocess_check_call(command, *posargs, **kwargs)
1267 return result
1269 func_patcher = mock.patch.object(
1270 subprocess, "check_call", autospec=True,
1271 side_effect=fake_subprocess_check_call)
1272 func_patcher.start()
1273 testcase.addCleanup(func_patcher.stop)
1276 class SubprocessDoubleRegistry(collections_abc.MutableMapping):
1277 """ Registry of `SubprocessDouble` instances by `argv`. """
1279 def __init__(self, *args, **kwargs):
1280 items = []
1281 if args:
1282 if isinstance(args[0], collections_abc.Mapping):
1283 items = args[0].items()
1284 if isinstance(args[0], collections_abc.Iterable):
1285 items = args[0]
1286 self._mapping = dict(items)
1288 def __repr__(self):
1289 text = "<{class_name} object: {mapping}>".format(
1290 class_name=self.__class__.__name__, mapping=self._mapping)
1291 return text
1293 def _match_argv(self, argv):
1294 """ Match the specified `argv` with our registered keys. """
1295 match = None
1296 if not isinstance(argv, collections_abc.Sequence):
1297 return match
1298 candidates = iter(self._mapping)
1299 while match is None:
1300 try:
1301 candidate = next(candidates)
1302 except StopIteration:
1303 break
1304 found = None
1305 if candidate == argv:
1306 # An exact match.
1307 found = True
1308 word_iter = enumerate(candidate)
1309 while found is None:
1310 try:
1311 (word_index, candidate_word) = next(word_iter)
1312 except StopIteration:
1313 break
1314 if candidate_word is ARG_MORE:
1315 # Candiate matches any remaining words. We have a match.
1316 found = True
1317 elif word_index > len(argv):
1318 # Candidate is too long for the specified argv.
1319 found = False
1320 elif candidate_word is ARG_ANY:
1321 # Candidate matches any word at this position.
1322 continue
1323 elif candidate_word == argv[word_index]:
1324 # Candidate matches the word at this position.
1325 continue
1326 else:
1327 # This candidate does not match.
1328 found = False
1329 if found is None:
1330 # Reached the end of the candidate without a mismatch.
1331 found = True
1332 if found:
1333 match = candidate
1334 return match
1336 def __getitem__(self, key):
1337 match = self._match_argv(key)
1338 if match is None:
1339 raise KeyError(key)
1340 result = self._mapping[match]
1341 return result
1343 def __setitem__(self, key, value):
1344 if key in self:
1345 del self[key]
1346 self._mapping[key] = value
1348 def __delitem__(self, key):
1349 match = self._match_argv(key)
1350 if match is not None:
1351 del self._mapping[match]
1353 def __iter__(self):
1354 return self._mapping.__iter__()
1356 def __len__(self):
1357 return self._mapping.__len__()
1360 class SubprocessDouble(TestDoubleWithRegistry):
1361 """ A testing double for a subprocess. """
1363 registry_class = SubprocessDoubleRegistry
1364 registries = {}
1366 double_by_pid = weakref.WeakValueDictionary()
1368 function_scenario_params_by_class = {
1369 subprocess_popen_scenario: {
1370 'default_scenario_name': 'success',
1371 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1373 subprocess_call_scenario: {
1374 'default_scenario_name': 'success',
1375 'set_scenario_method_name': 'set_subprocess_call_scenario',
1377 subprocess_check_call_scenario: {
1378 'default_scenario_name': 'success',
1379 'set_scenario_method_name':
1380 'set_subprocess_check_call_scenario',
1382 os_popen_scenario: {
1383 'default_scenario_name': 'success',
1384 'set_scenario_method_name': 'set_os_popen_scenario',
1386 os_waitpid_scenario: {
1387 'default_scenario_name': 'success',
1388 'set_scenario_method_name': 'set_os_waitpid_scenario',
1390 os_system_scenario: {
1391 'default_scenario_name': 'success',
1392 'set_scenario_method_name': 'set_os_system_scenario',
1394 os_spawnv_scenario: {
1395 'default_scenario_name': 'success',
1396 'set_scenario_method_name': 'set_os_spawnv_scenario',
1400 def __init__(self, path=None, argv=None, *args, **kwargs):
1401 if path is None:
1402 path = tempfile.mktemp()
1403 self.path = path
1405 if argv is None:
1406 command_name = os.path.basename(path)
1407 argv = [command_name]
1408 self.argv = argv
1410 self.pid = self._make_pid()
1411 self._register_by_pid()
1413 self.set_popen_double()
1415 stream_class = SubprocessDouble.stream_class
1416 for stream_name in ['stdin', 'stdout', 'stderr']:
1417 fake_file = stream_class()
1418 file_double = FileDouble(fake_file=fake_file)
1419 stream_double_name = '{name}_double'.format(name=stream_name)
1420 setattr(self, stream_double_name, file_double)
1422 super(SubprocessDouble, self).__init__(*args, **kwargs)
1424 def set_popen_double(self):
1425 """ Set the `PopenDouble` for this instance. """
1426 double = PopenDouble(self.argv)
1427 double.pid = self.pid
1429 self.popen_double = double
1431 def __repr__(self):
1432 text = (
1433 "<SubprocessDouble instance: {id}"
1434 " path: {path!r},"
1435 " argv: {argv!r}"
1436 " stdin_double: {stdin_double!r}"
1437 " stdout_double: {stdout_double!r}"
1438 " stderr_double: {stderr_double!r}"
1439 ">").format(
1440 id=id(self),
1441 path=self.path, argv=self.argv,
1442 stdin_double=self.stdin_double,
1443 stdout_double=self.stdout_double,
1444 stderr_double=self.stderr_double)
1445 return text
1447 @classmethod
1448 def _make_pid(cls):
1449 """ Make a unique PID for a subprocess. """
1450 for pid in itertools.count(1):
1451 yield pid
1453 def _register_by_pid(self):
1454 """ Register this subprocess by its PID. """
1455 self.__class__.double_by_pid[self.pid] = self
1457 def get_registry_key(self):
1458 """ Get the registry key for this double. """
1459 result = tuple(self.argv)
1460 return result
1462 stream_class = io.BytesIO
1463 stream_encoding = "utf-8"
1465 def set_stdin_content(self, text, bytes_encoding=stream_encoding):
1466 """ Set the content of the `stdin` stream for this double. """
1467 content = text.encode(bytes_encoding)
1468 fake_file = self.stream_class(content)
1469 self.stdin_double.fake_file = fake_file
1471 def set_stdout_content(self, text, bytes_encoding=stream_encoding):
1472 """ Set the content of the `stdout` stream for this double. """
1473 content = text.encode(bytes_encoding)
1474 fake_file = self.stream_class(content)
1475 self.stdout_double.fake_file = fake_file
1477 def set_stderr_content(self, text, bytes_encoding=stream_encoding):
1478 """ Set the content of the `stderr` stream for this double. """
1479 content = text.encode(bytes_encoding)
1480 fake_file = self.stream_class(content)
1481 self.stderr_double.fake_file = fake_file
1484 def make_fake_subprocess_scenarios(path=None):
1485 """ Make a collection of scenarios for testing with fake files.
1487 :path: The filesystem path of the fake program. If not specified,
1488 a valid random path will be generated.
1489 :return: A collection of scenarios for tests involving subprocesses.
1491 The collection is a mapping from scenario name to a dictionary of
1492 scenario attributes.
1495 if path is None:
1496 file_path = tempfile.mktemp()
1497 else:
1498 file_path = path
1500 default_scenario_params = {
1501 'return_value': EXIT_STATUS_SUCCESS,
1502 'program_path': file_path,
1503 'argv_after_command_name': [],
1506 scenarios = {
1507 'default': {},
1508 'not-found': {
1509 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
1513 for (name, scenario) in scenarios.items():
1514 params = default_scenario_params.copy()
1515 params.update(scenario)
1516 scenario.update(params)
1517 program_path = params['program_path']
1518 program_name = os.path.basename(params['program_path'])
1519 argv = [program_name]
1520 argv.extend(params['argv_after_command_name'])
1521 subprocess_double_params = dict(
1522 path=program_path,
1523 argv=argv,
1525 subprocess_double = SubprocessDouble(**subprocess_double_params)
1526 scenario['subprocess_double'] = subprocess_double
1527 scenario['fake_file_scenario_name'] = name
1529 return scenarios
1532 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
1533 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1535 :param scenarios: Collection of fake subprocess scenarios.
1536 :return: Collection of `SubprocessDouble` instances.
1539 doubles = set(
1540 scenario['subprocess_double']
1541 for scenario in scenarios
1542 if scenario['subprocess_double'] is not None)
1544 return doubles
1547 def setup_subprocess_double_behaviour(testcase, doubles=None):
1548 """ Set up subprocess double instances and behaviour.
1550 :param testcase: The `TestCase` instance to modify.
1551 :param doubles: Collection of `SubprocessDouble` instances.
1552 :return: None.
1554 If `doubles` is ``None``, a default collection will be made
1555 from the return value of `make_fake_subprocess_scenarios`.
1558 if doubles is None:
1559 scenarios = make_fake_subprocess_scenarios()
1560 doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
1561 scenarios.values())
1563 for double in doubles:
1564 double.register_for_testcase(testcase)
1567 def setup_fake_subprocess_fixtures(testcase):
1568 """ Set up fixtures for fake subprocess doubles.
1570 :param testcase: The `TestCase` instance to modify.
1571 :return: None.
1574 scenarios = make_fake_subprocess_scenarios()
1575 testcase.fake_subprocess_scenarios = scenarios
1577 doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
1578 scenarios.values())
1579 setup_subprocess_double_behaviour(testcase, doubles)
1582 def patch_os_popen(testcase):
1583 """ Patch `os.popen` behaviour for this test case.
1585 :param testcase: The `TestCase` instance to modify.
1586 :return: None.
1588 When the patched function is called, the registry of
1589 `SubprocessDouble` instances for this test case will be used
1590 to get the instance for the program path specified.
1593 orig_os_popen = os.popen
1595 def fake_os_popen(cmd, mode='r', buffering=-1):
1596 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1597 if isinstance(cmd, basestring):
1598 command_argv = shlex.split(cmd)
1599 else:
1600 command_argv = cmd
1601 if command_argv in registry:
1602 subprocess_double = registry[command_argv]
1603 result = subprocess_double.os_popen_scenario.call_hook(
1604 command_argv, mode, buffering)
1605 else:
1606 result = orig_os_popen(cmd, mode, buffering)
1607 return result
1609 func_patcher = mock.patch.object(
1610 os, "popen", autospec=True,
1611 side_effect=fake_os_popen)
1612 func_patcher.start()
1613 testcase.addCleanup(func_patcher.stop)
1616 def patch_os_waitpid(testcase):
1617 """ Patch `os.waitpid` behaviour for this test case.
1619 :param testcase: The `TestCase` instance to modify.
1620 :return: None.
1622 When the patched function is called, the registry of
1623 `SubprocessDouble` instances for this test case will be used
1624 to get the instance for the program path specified.
1627 orig_os_waitpid = os.waitpid
1629 def fake_os_waitpid(pid, options):
1630 registry = SubprocessDouble.double_by_pid
1631 if pid in registry:
1632 subprocess_double = registry[pid]
1633 result = subprocess_double.os_waitpid_scenario.call_hook(
1634 pid, options)
1635 else:
1636 result = orig_os_waitpid(pid, options)
1637 return result
1639 func_patcher = mock.patch.object(
1640 os, "waitpid", autospec=True,
1641 side_effect=fake_os_waitpid)
1642 func_patcher.start()
1643 testcase.addCleanup(func_patcher.stop)
1646 def patch_os_system(testcase):
1647 """ Patch `os.system` behaviour for this test case.
1649 :param testcase: The `TestCase` instance to modify.
1650 :return: None.
1652 When the patched function is called, the registry of
1653 `SubprocessDouble` instances for this test case will be used
1654 to get the instance for the program path specified.
1657 orig_os_system = os.system
1659 def fake_os_system(command):
1660 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1661 command_argv = shlex.split(command)
1662 if command_argv in registry:
1663 subprocess_double = registry[command_argv]
1664 result = subprocess_double.os_system_scenario.call_hook(
1665 command)
1666 else:
1667 result = orig_os_system(command)
1668 return result
1670 func_patcher = mock.patch.object(
1671 os, "system", autospec=True,
1672 side_effect=fake_os_system)
1673 func_patcher.start()
1674 testcase.addCleanup(func_patcher.stop)
1677 def patch_os_spawnv(testcase):
1678 """ Patch `os.spawnv` behaviour for this test case.
1680 :param testcase: The `TestCase` instance to modify.
1681 :return: None.
1683 When the patched function is called, the registry of
1684 `SubprocessDouble` instances for this test case will be used
1685 to get the instance for the program path specified.
1688 orig_os_spawnv = os.spawnv
1690 def fake_os_spawnv(mode, file, args):
1691 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1692 registry_key = tuple(args)
1693 if registry_key in registry:
1694 subprocess_double = registry[registry_key]
1695 result = subprocess_double.os_spawnv_scenario.call_hook(
1696 mode, file, args)
1697 else:
1698 result = orig_os_spawnv(mode, file, args)
1699 return result
1701 func_patcher = mock.patch.object(
1702 os, "spawnv", autospec=True,
1703 side_effect=fake_os_spawnv)
1704 func_patcher.start()
1705 testcase.addCleanup(func_patcher.stop)
1708 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1710 # This is free software: you may copy, modify, and/or distribute this work
1711 # under the terms of the GNU General Public License as published by the
1712 # Free Software Foundation; version 3 of that license or any later version.
1713 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1716 # Local variables:
1717 # coding: utf-8
1718 # mode: python
1719 # End:
1720 # vim: fileencoding=utf-8 filetype=python :