‘dput.methods.scp’: Convert ‘print’ statements to ‘sys.stdout.write’ calls.
[dput.git] / test / helper.py
blob37095443b04c225aa8dc364355e4d0f3cfbe63ee
1 # -*- coding: utf-8; -*-
3 # test/helper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
19 if sys.version_info >= (3, 3):
20 import builtins
21 import unittest
22 import unittest.mock as mock
23 from io import StringIO as StringIO
24 import configparser
25 import collections.abc as collections_abc
26 elif sys.version_info >= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys.version_info >= (2, 7):
29 # Python 2 standard library.
30 import __builtin__ as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2 as unittest
33 # Third-party mock library.
34 import mock
35 # Python 2 standard library.
36 from StringIO import StringIO as BaseStringIO
37 import ConfigParser as configparser
38 import collections as collections_abc
39 else:
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
42 import os
43 import os.path
44 import shutil
45 import tempfile
46 import pwd
47 import errno
48 import time
49 import signal
50 import subprocess
51 import functools
52 import itertools
53 import base64
54 import collections
55 import weakref
56 import shlex
# Declare the containing package by name (as a native `str`, despite
# `unicode_literals`), then import that package.
__package__ = str("test")
__import__(__package__)

# On Python 2, a module-level `__metaclass__` makes classes that are
# declared without explicit bases new-style classes.
__metaclass__ = type

try:
    # Python 2 types.
    basestring
    unicode
except NameError:
    # Alias for Python 3 types.
    basestring = str
    unicode = str
def make_unique_slug(testcase):
    """ Make a unique slug for the test case.

        :param testcase: Object providing a `getUniqueString` method.
        :return: At most the final 30 characters of the Base64
            encoding of the test case's unique string.

        """
    unique_bytes = testcase.getUniqueString().encode('utf-8')
    encoded_text = base64.b64encode(unique_bytes).decode('utf-8')
    return encoded_text[-30:]
try:
    StringIO
except NameError:
    # No suitable `StringIO` was imported above; build one on top of
    # `BaseStringIO`.

    class StringIO(BaseStringIO, object):
        """ A `StringIO` usable in a ‘with’ statement. """

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            # Close the stream, and do not suppress any exception.
            self.close()
            return False
def patch_stdout(testcase):
    """ Patch `sys.stdout` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a fresh `StringIO`; the
        patch is undone by a cleanup registered on `testcase`.

        """
    fake_stdout = StringIO()
    stream_patcher = mock.patch.object(sys, 'stdout', wraps=fake_stdout)
    stream_patcher.start()
    testcase.addCleanup(stream_patcher.stop)
def patch_stderr(testcase):
    """ Patch `sys.stderr` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a fresh `StringIO`; the
        patch is undone by a cleanup registered on `testcase`.

        """
    fake_stderr = StringIO()
    stream_patcher = mock.patch.object(sys, 'stderr', wraps=fake_stderr)
    stream_patcher.start()
    testcase.addCleanup(stream_patcher.stop)
def patch_signal_signal(testcase):
    """ Patch `signal.signal` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    signal_patcher = mock.patch.object(signal, 'signal')
    signal_patcher.start()
    testcase.addCleanup(signal_patcher.stop)
class FakeSystemExit(Exception):
    """ Test double standing in for the `SystemExit` exception. """
# Exit status values returned by the fake process scenarios in this module.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
# NOTE(review): presumably mirrors the shell's ‘command not found’
# status — confirm.
EXIT_STATUS_COMMAND_NOT_FOUND = 127
def patch_sys_exit(testcase):
    """ Patch `sys.exit` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Calling the patched `sys.exit` raises a `FakeSystemExit`
        instance instead of exiting.

        """
    fake_exit_error = FakeSystemExit()
    exit_patcher = mock.patch.object(
            sys, 'exit', side_effect=fake_exit_error)
    exit_patcher.start()
    testcase.addCleanup(exit_patcher.stop)
def patch_sys_argv(testcase):
    """ Patch the `sys.argv` sequence for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Missing `progname` and `sys_argv` attributes are created on
        `testcase`. The patched `sys.argv` is a copy of
        `testcase.sys_argv`.

        """
    if not hasattr(testcase, 'progname'):
        testcase.progname = make_unique_slug(testcase)
    if not hasattr(testcase, 'sys_argv'):
        testcase.sys_argv = [testcase.progname]

    fake_argv = list(testcase.sys_argv)
    argv_patcher = mock.patch.object(sys, "argv", new=fake_argv)
    argv_patcher.start()
    testcase.addCleanup(argv_patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: None.

        Patches, in order: `sys.stdout`, `sys.stderr`, `sys.exit`,
        and `sys.argv`.

        """
    patch_funcs = (
            patch_stdout,
            patch_stderr,
            patch_sys_exit,
            patch_sys_argv,
            )
    for patch_func in patch_funcs:
        patch_func(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. If not
            specified, successive calls return 0, 1, 2, ….
        :return: None.

        Each call to the patched `time.time` returns the next item
        from `values`. When `values` is exhausted, the call raises
        `StopIteration`.

        """
    if values is None:
        values = itertools.count()

    func_patcher = mock.patch.object(time, "time")
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    # Use `iter` so that any iterable (e.g. a list) is accepted, not
    # only an iterator; calling `next` directly on a list would fail.
    time.time.side_effect = iter(values)
def patch_os_environ(testcase):
    """ Patch the `os.environ` mapping.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement mapping is `testcase.os_environ`, which is
        created as an empty dictionary if not already set.

        """
    if not hasattr(testcase, 'os_environ'):
        testcase.os_environ = {}

    environ_patcher = mock.patch.object(
            os, "environ", new=testcase.os_environ)
    environ_patcher.start()
    testcase.addCleanup(environ_patcher.stop)
def patch_os_getpid(testcase):
    """ Patch `os.getpid` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    getpid_patcher = mock.patch.object(os, 'getpid')
    getpid_patcher.start()
    testcase.addCleanup(getpid_patcher.stop)
def patch_os_getuid(testcase):
    """ Patch the `os.getuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The fake returns `testcase.os_getuid_return_value`, which is
        set from `testcase.getUniqueInteger()` if not already present.

        """
    if not hasattr(testcase, 'os_getuid_return_value'):
        testcase.os_getuid_return_value = testcase.getUniqueInteger()

    fake_uid = testcase.os_getuid_return_value
    getuid_patcher = mock.patch.object(os, "getuid", return_value=fake_uid)
    getuid_patcher.start()
    testcase.addCleanup(getuid_patcher.stop)
# Record with the same field names as a `pwd` database entry.
PasswdEntry = collections.namedtuple(
        "PasswdEntry", [
            'pw_name', 'pw_passwd',
            'pw_uid', 'pw_gid',
            'pw_gecos',
            'pw_dir', 'pw_shell',
            ])
def patch_pwd_getpwuid(testcase):
    """ Patch the `pwd.getpwuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The fake returns `testcase.pwd_getpwuid_return_value`,
        converted to `pwd.struct_passwd` if it is not one already. If
        the attribute is missing, a `PasswdEntry` of generated values
        is stored on `testcase` first.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())

    pwent = testcase.pwd_getpwuid_return_value
    if not isinstance(pwent, pwd.struct_passwd):
        pwent = pwd.struct_passwd(pwent)

    getpwuid_patcher = mock.patch.object(
            pwd, "getpwuid", return_value=pwent)
    getpwuid_patcher.start()
    testcase.addCleanup(getpwuid_patcher.stop)
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path registered as a `FileDouble` for this test case is
        answered by that double's `os_path_exists_scenario`; any other
        path is passed to the real `os.path.exists`.

        """
    real_exists = os.path.exists

    def fake_os_path_exists(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_exists(path)
        return registry[path].os_path_exists_scenario.call_hook()

    exists_patcher = mock.patch.object(
            os.path, 'exists', side_effect=fake_os_path_exists)
    exists_patcher.start()
    testcase.addCleanup(exists_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path registered as a `FileDouble` for this test case is
        answered by that double's `os_access_scenario`; any other
        path is passed to the real `os.access`.

        """
    real_access = os.access

    def fake_os_access(path, mode):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_access(path, mode)
        return registry[path].os_access_scenario.call_hook(mode)

    access_patcher = mock.patch.object(
            os, 'access', side_effect=fake_os_access)
    access_patcher.start()
    testcase.addCleanup(access_patcher.stop)
# Record with the ten field names used for a fake `os.stat` result.
StatResult = collections.namedtuple(
        'StatResult',
        'st_mode'
        ' st_ino st_dev st_nlink'
        ' st_uid st_gid'
        ' st_size'
        ' st_atime st_mtime st_ctime')
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path registered as a `FileDouble` for this test case is
        answered by that double's `os_stat_scenario`; any other path
        is passed to the real `os.stat`.

        """
    real_stat = os.stat

    def fake_os_stat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_stat(path)
        return registry[path].os_stat_scenario.call_hook()

    stat_patcher = mock.patch.object(
            os, 'stat', side_effect=fake_os_stat)
    stat_patcher.start()
    testcase.addCleanup(stat_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path registered as a `FileDouble` for this test case is
        answered by that double's `os_lstat_scenario`; any other path
        is passed to the real `os.lstat`.

        """
    real_lstat = os.lstat

    def fake_os_lstat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_lstat(path)
        return registry[path].os_lstat_scenario.call_hook()

    lstat_patcher = mock.patch.object(
            os, 'lstat', side_effect=fake_os_lstat)
    lstat_patcher.start()
    testcase.addCleanup(lstat_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path registered as a `FileDouble` for this test case is
        handled by that double's `os_unlink_scenario`; any other path
        is passed to the real `os.unlink`.

        """
    real_unlink = os.unlink

    def fake_os_unlink(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_unlink(path)
        return registry[path].os_unlink_scenario.call_hook()

    unlink_patcher = mock.patch.object(
            os, 'unlink', side_effect=fake_os_unlink)
    unlink_patcher.start()
    testcase.addCleanup(unlink_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path registered as a `FileDouble` for this test case is
        handled by that double's `os_rmdir_scenario`; any other path
        is passed to the real `os.rmdir`.

        """
    real_rmdir = os.rmdir

    def fake_os_rmdir(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_rmdir(path)
        return registry[path].os_rmdir_scenario.call_hook()

    rmdir_patcher = mock.patch.object(
            os, 'rmdir', side_effect=fake_os_rmdir)
    rmdir_patcher.start()
    testcase.addCleanup(rmdir_patcher.stop)
def patch_shutil_rmtree(testcase):
    """ Patch `shutil.rmtree` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not
        registered is passed, together with the `ignore_errors` and
        `onerror` arguments, to the real `shutil.rmtree`.

        """
    # Capture the real `shutil.rmtree` (not `os.rmdir`) before patching.
    orig_shutil_rmtree = shutil.rmtree

    def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.shutil_rmtree_scenario.call_hook()
        else:
            # Forward the caller's error handling options unchanged.
            result = orig_shutil_rmtree(
                    path, ignore_errors=ignore_errors, onerror=onerror)
        return result

    func_patcher = mock.patch.object(
            shutil, 'rmtree', side_effect=fake_shutil_rmtree)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Patch the `tempfile.mkdtemp` function for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The fake returns the path of
        `testcase.tempfile_mkdtemp_file_double`, which is created if
        missing. That double gets ‘okay’ scenarios for `os.unlink` and
        `os.rmdir`, and is registered for the test case.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    double = testcase.tempfile_mkdtemp_file_double
    for set_scenario in (
            double.set_os_unlink_scenario,
            double.set_os_rmdir_scenario):
        set_scenario('okay')
    double.register_for_testcase(testcase)

    mkdtemp_patcher = mock.patch.object(tempfile, "mkdtemp")
    mkdtemp_patcher.start()
    testcase.addCleanup(mkdtemp_patcher.stop)

    tempfile.mkdtemp.return_value = double.path
try:
    # Referencing the names raises `NameError` where they are not builtins.
    FileNotFoundError
    FileExistsError
    PermissionError
except NameError:
    # Python 2 uses IOError.
    def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
        """ Normalise constructor arguments for an `IOError` subclass.

            :param init_args: Positional arguments given to the constructor.
            :param init_kwargs: Keyword arguments given to the
                constructor. This mapping is modified in place: the
                ‘errno’, ‘strerror’, ‘filename’ items are removed.
            :param errno_value: Default error number for the exception.
            :return: A pair (`args`, `kwargs`), where `args` is the
                tuple (errno, strerror, filename).

            Defaults are `errno_value`, its `os.strerror` text, and
            ``None``. Positional values replace the defaults, and
            keyword values replace positional values.

            """
        # Same object as `init_kwargs`, not a copy.
        result_kwargs = init_kwargs
        result_errno = errno_value
        result_strerror = os.strerror(errno_value)
        result_filename = None
        # Positional errno and filename are used only when at least
        # three positional arguments are given.
        if len(init_args) >= 3:
            result_errno = init_args[0]
            result_filename = init_args[2]
        if 'errno' in init_kwargs:
            result_errno = init_kwargs['errno']
            del result_kwargs['errno']
        if 'filename' in init_kwargs:
            result_filename = init_kwargs['filename']
            del result_kwargs['filename']
        # The second positional argument is always taken as the message.
        if len(init_args) >= 2:
            result_strerror = init_args[1]
        if 'strerror' in init_kwargs:
            result_strerror = init_kwargs['strerror']
            del result_kwargs['strerror']
        result_args = (result_errno, result_strerror, result_filename)
        return (result_args, result_kwargs)

    class FileNotFoundError(IOError):
        """ `IOError` whose error number defaults to `errno.ENOENT`. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.ENOENT)
            super(FileNotFoundError, self).__init__(*args, **kwargs)

    class FileExistsError(IOError):
        """ `IOError` whose error number defaults to `errno.EEXIST`. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EEXIST)
            super(FileExistsError, self).__init__(*args, **kwargs)

    class PermissionError(IOError):
        """ `IOError` whose error number defaults to `errno.EPERM`. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EPERM)
            super(PermissionError, self).__init__(*args, **kwargs)
def make_fake_file_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :param path: The filesystem path of the fake file. If not
            specified, a path is generated with `tempfile.mktemp`.
        :return: A mapping from scenario name to a dictionary of
            scenario attributes.

        Every scenario dictionary ends up with the items
        ‘open_scenario_name’, ‘file_double_params’, ‘file_double’ (a
        new `FileDouble`), and ‘fake_file_scenario_name’.

        """
    file_path = tempfile.mktemp() if path is None else path

    empty_file = StringIO()
    minimal_file = StringIO("Lorem ipsum.")
    large_file_line = "ABCDEFGH" * 100
    large_file = StringIO("\n".join(
            large_file_line for __ in range(1000)))

    def double_params(fake_file):
        """ Make `FileDouble` parameters for the fake file. """
        return dict(path=file_path, fake_file=fake_file)

    default_scenario_params = {
            'open_scenario_name': 'okay',
            'file_double_params': double_params(minimal_file),
            }

    scenarios = {
            'default': {},
            'error-not-exist': {'open_scenario_name': 'nonexist'},
            'error-exist': {'open_scenario_name': 'exist_error'},
            'error-read-denied': {'open_scenario_name': 'read_denied'},
            'not-found': {'file_double_params': double_params(empty_file)},
            'exist-empty': {'file_double_params': double_params(empty_file)},
            'exist-minimal': {
                'file_double_params': double_params(minimal_file)},
            'exist-large': {'file_double_params': double_params(large_file)},
            }

    for (name, scenario) in scenarios.items():
        # Fill in each parameter the scenario does not itself specify.
        for (param_name, param_value) in default_scenario_params.items():
            scenario.setdefault(param_name, param_value)
        scenario['file_double'] = FileDouble(**scenario['file_double_params'])
        scenario['file_double'].set_open_scenario(
                scenario['open_scenario_name'])
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_file_doubles_from_fake_file_scenarios(scenarios):
    """ Get the `FileDouble` instances from fake file scenarios.

        :param scenarios: Collection of fake file scenarios; each is a
            mapping with a ‘file_double’ item.
        :return: Set of the ‘file_double’ values that are not ``None``.

        """
    return {
            scenario['file_double']
            for scenario in scenarios
            if scenario['file_double'] is not None}
def setup_file_double_behaviour(testcase, doubles=None):
    """ Set up file double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `FileDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection is made from
        the result of `make_fake_file_scenarios`.

        Each double is registered for the test case, and the builtin
        `open` is patched: a registered path is handled by its
        double's `builtins_open_scenario`, any other path by the real
        `open`.

        """
    if doubles is None:
        default_scenarios = make_fake_file_scenarios()
        doubles = get_file_doubles_from_fake_file_scenarios(
                default_scenarios.values())

    for file_double in doubles:
        file_double.register_for_testcase(testcase)

    real_open = builtins.open

    def fake_open(path, mode='rt', buffering=-1):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_open(path, mode, buffering)
        open_scenario = registry[path].builtins_open_scenario
        return open_scenario.call_hook(mode, buffering)

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    open_patcher = mock.patch.object(builtins, "open", new=mock_open)
    open_patcher.start()
    testcase.addCleanup(open_patcher.stop)
def setup_fake_file_fixtures(testcase):
    """ Set up fixtures for fake file doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios from `make_fake_file_scenarios` are stored as
        `testcase.fake_file_scenarios`, and their file doubles are set
        up with `setup_file_double_behaviour`.

        """
    testcase.fake_file_scenarios = make_fake_file_scenarios()

    scenario_doubles = get_file_doubles_from_fake_file_scenarios(
            testcase.fake_file_scenarios.values())
    setup_file_double_behaviour(testcase, scenario_doubles)
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify.
        :param name: Key of the scenario in `testcase.fake_file_scenarios`.
        :return: None.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        then registers the file double for the test case.

        """
    chosen_scenario = testcase.fake_file_scenarios[name]
    testcase.fake_file_scenario = chosen_scenario
    testcase.file_double = chosen_scenario['file_double']
    testcase.file_double.register_for_testcase(testcase)
class TestDoubleFunctionScenario:
    """ Scenario for fake behaviour of a specific function.

        The scenario name selects the behaviour: the instance's
        `call_hook` attribute is bound to the method named
        `_hook_<scenario_name>`. Subclasses define those methods.

        """

    def __init__(self, scenario_name, double):
        """ Initialise the scenario.

            :param scenario_name: Name selecting the `_hook_<name>` method.
            :param double: The test double this scenario belongs to.
            :raises AttributeError: If no matching hook method exists.

            """
        self.scenario_name = scenario_name
        self.double = double

        self.call_hook = getattr(
                self, "_hook_{name}".format(name=self.scenario_name))

    def __repr__(self):
        text = (
                "<{class_name} instance: {id}"
                " name: {name!r},"
                " call_hook name: {hook_name!r}"
                " double: {double!r}"
                ">").format(
                    class_name=self.__class__.__name__, id=id(self),
                    name=self.scenario_name, double=self.double,
                    hook_name=self.call_hook.__name__)
        return text

    def __eq__(self, other):
        """ Compare by scenario name, double, and hook method name. """
        try:
            other_name = other.scenario_name
            other_double = other.double
            other_hook_name = other.call_hook.__name__
        except AttributeError:
            # Not a scenario-like object: let Python try the reflected
            # comparison instead of failing with `AttributeError`.
            return NotImplemented
        result = (
                self.scenario_name == other_name
                and self.double == other_double
                and self.call_hook.__name__ == other_hook_name)
        return bool(result)

    def __ne__(self, other):
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return NotImplemented
        return not equal
class os_path_exists_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.path.exists` behaviour.

        Scenario names: ‘exist’, ‘not_exist’.

        """

    def _hook_exist(self):
        # The path is reported as existing.
        return True

    def _hook_not_exist(self):
        # The path is reported as missing.
        return False
class os_access_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.access` behaviour.

        Scenario names: ‘okay’, ‘not_exist’, ‘read_only’, ‘denied’.
        Each hook takes the access `mode` and returns a boolean.

        """

    def _hook_okay(self, mode):
        return True

    def _hook_not_exist(self, mode):
        return False

    def _hook_read_only(self, mode):
        # Refuse any request that includes write or execute access.
        forbidden_bits = os.W_OK | os.X_OK
        return not (mode & forbidden_bits)

    def _hook_denied(self, mode):
        # Refuse any request that includes read, write or execute access.
        forbidden_bits = os.R_OK | os.W_OK | os.X_OK
        return not (mode & forbidden_bits)
class os_stat_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.stat` behaviour.

        Scenario names: ‘okay’, ‘notfound_error’, ‘denied_error’.

        """

    def _hook_okay(self):
        # Return the `stat_result` attribute of the double.
        return self.double.stat_result

    def _hook_notfound_error(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied_error(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Uses the same hooks as `os_stat_scenario`.

        """
class os_unlink_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.unlink` behaviour.

        Scenario names: ‘okay’, ‘nonexist’, ‘denied’.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class os_rmdir_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.rmdir` behaviour.

        Scenario names: ‘okay’, ‘nonexist’, ‘denied’.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class shutil_rmtree_scenario(TestDoubleFunctionScenario):
    """ Scenario for `shutil.rmtree` behaviour.

        Scenario names: ‘okay’, ‘nonexist’, ‘denied’.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class builtins_open_scenario(TestDoubleFunctionScenario):
    """ Scenario for `builtins.open` behaviour.

        Scenario names: ‘okay’, ‘nonexist’, ‘exist_error’,
        ‘read_denied’, ‘write_denied’.

        Each hook takes the `mode` and `buffering` arguments of the
        `open` call. When a hook does not raise, it returns the
        `fake_file` of the double.

        """

    def _hook_okay(self, mode, buffering):
        return self.double.fake_file

    def _hook_nonexist(self, mode, buffering):
        # Only reading fails for a non-existent file.
        if not mode.startswith('r'):
            return self.double.fake_file
        raise FileNotFoundError(
                self.double.path,
                "No such file or directory: {path!r}".format(
                    path=self.double.path))

    def _hook_exist_error(self, mode, buffering):
        # Only writing or appending fails.
        if not (mode.startswith('w') or mode.startswith('a')):
            return self.double.fake_file
        raise FileExistsError(
                self.double.path,
                "File already exists: {path!r}".format(
                    path=self.double.path))

    def _hook_read_denied(self, mode, buffering):
        # Only reading fails.
        if not mode.startswith('r'):
            return self.double.fake_file
        raise PermissionError(
                self.double.path,
                "Read denied on {path!r}".format(
                    path=self.double.path))

    def _hook_write_denied(self, mode, buffering):
        # Only writing or appending fails.
        if not (mode.startswith('w') or mode.startswith('a')):
            return self.double.fake_file
        raise PermissionError(
                self.double.path,
                "Write denied on {path!r}".format(
                    path=self.double.path))
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        A subclass sets these class attributes:

        * `registry_class`: Callable that makes a new, empty registry.
        * `registries`: Mapping from test case key to registry.
        * `function_scenario_params_by_class`: Mapping from scenario
          class to a dictionary with the items ‘default_scenario_name’
          and ‘set_scenario_method_name’.

        """

    registry_class = NotImplemented
    registries = NotImplemented

    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        parent = super(TestDoubleWithRegistry, cls)
        if parent.__new__ is object.__new__:
            # The ‘object’ implementation complains about extra arguments.
            (new_args, new_kwargs) = ((), {})
        else:
            (new_args, new_kwargs) = (args, kwargs)
        instance = parent.__new__(cls, *new_args, **new_kwargs)
        instance.make_set_scenario_methods()

        return instance

    def __init__(self, *args, **kwargs):
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the function that sets a `scenario_class` scenario. """
        scenario_attr_name = scenario_class.__name__

        def method(self, name):
            # Store the scenario under the name of the scenario class.
            setattr(
                    self, scenario_attr_name,
                    scenario_class(name, double=self))

        method.__doc__ = (
                """ Set the scenario for `{name}` behaviour. """
                ).format(name=scenario_attr_name)
        method.__name__ = str(params['set_scenario_method_name'])
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        params_by_class = self.function_scenario_params_by_class
        for (scenario_class, scenario_params) in params_by_class.items():
            method = self._make_set_scenario_method(
                    scenario_class, scenario_params)
            setattr(self.__class__, method.__name__, method)
            # Remember the function, so the default scenario can be set.
            scenario_params['set_scenario_method'] = method

    def _set_method_per_scenario(self):
        """ Set the method to be called for each scenario. """
        all_params = self.function_scenario_params_by_class.values()
        for scenario_params in all_params:
            set_scenario = scenario_params['set_scenario_method']
            set_scenario(self, scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get the FileDouble registry for the specified test case. """
        # Key in a dict must be hashable.
        testcase_key = (testcase.__class__, id(testcase))
        return cls.registries.setdefault(testcase_key, cls.registry_class())

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        registry[self.get_registry_key()] = self
        # Remove the instance again when the test case is cleaned up.
        testcase.addCleanup(
                functools.partial(self.unregister_for_testcase, testcase))

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        key = self.get_registry_key()
        if key not in registry:
            return
        registry.pop(key)
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The file-like object to copy, or ``None``.
        :return: A new instance of the same type as `fake_file`, with
            the same content; or an empty `StringIO` if `fake_file`
            is ``None``.

        """
    if fake_file is None:
        (fake_file_type, content) = (StringIO, "")
    else:
        (fake_file_type, content) = (type(fake_file), fake_file.getvalue())
    assert issubclass(fake_file_type, object)

    result = fake_file_type(content)
    # Carry over ‘encoding’ only when the copy does not already have one.
    if hasattr(fake_file, 'encoding') and not hasattr(result, 'encoding'):
        result.encoding = fake_file.encoding
    return result
class FileDouble(TestDoubleWithRegistry):
    """ A testing double for a file.

        Instances are registered per test case, keyed by `path`.

        """

    registry_class = dict
    registries = {}

    # Default scenario, and name of the scenario setter method, for
    # each function scenario class.
    function_scenario_params_by_class = {
            os_path_exists_scenario: {
                'default_scenario_name': 'not_exist',
                'set_scenario_method_name': 'set_os_path_exists_scenario',
                },
            os_access_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_access_scenario',
                },
            os_stat_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_stat_scenario',
                },
            os_lstat_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_lstat_scenario',
                },
            builtins_open_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_open_scenario',
                },
            os_unlink_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_unlink_scenario',
                },
            os_rmdir_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_rmdir_scenario',
                },
            shutil_rmtree_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_shutil_rmtree_scenario',
                },
            }

    def __init__(self, path=None, fake_file=None, *args, **kwargs):
        """ Initialise the file double.

            :param path: The filesystem path of the fake file.
            :param fake_file: File-like object whose content is copied
                for this double; ``None`` for an empty file.

            """
        self.path = path
        # Work on a copy, named after the path.
        file_copy = copy_fake_file(fake_file)
        self.fake_file = file_copy
        file_copy.name = path

        self._set_stat_result()

        super(FileDouble, self).__init__(*args, **kwargs)

    def _set_stat_result(self):
        """ Set the `os.stat` result for this file. """
        content = self.fake_file.getvalue()
        self.stat_result = StatResult(
                st_mode=0,
                st_ino=None, st_dev=None, st_nlink=None,
                st_uid=0, st_gid=0,
                st_size=len(content),
                st_atime=None, st_mtime=None, st_ctime=None,
                )

    def __repr__(self):
        return "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
                path=self.path, fake_file=self.fake_file)

    def get_registry_key(self):
        """ Get the registry key for this double. """
        return self.path
class os_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.popen` behaviour.

        Scenario names: ‘success’, ‘failure’, ‘not_found’.

        """

    # Name of the double's stream to use for each `os.popen` mode.
    stream_name_by_mode = {
            'w': 'stdin',
            'r': 'stdout',
            }

    def _hook_success(self, argv, mode, buffering):
        stream_double_name = self.stream_name_by_mode[mode] + '_double'
        stream_double = getattr(self.double, stream_double_name)
        return stream_double.fake_file

    def _hook_failure(self, argv, mode, buffering):
        # Return a new, empty stream.
        return StringIO()

    def _hook_not_found(self, argv, mode, buffering):
        # Return a new, empty stream.
        return StringIO()
class os_waitpid_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        Scenario names: ‘success’, ‘failure’, ‘not_found’.

        """

    def _hook_success(self, pid, options):
        return (pid, EXIT_STATUS_SUCCESS)

    def _hook_failure(self, pid, options):
        return (pid, EXIT_STATUS_FAILURE)

    def _hook_not_found(self, pid, options):
        raise OSError(errno.ECHILD)
class os_system_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Scenario names: ‘success’, ‘failure’, ‘not_found’. Each hook
        returns the corresponding exit status.

        """

    def _hook_success(self, command):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        return EXIT_STATUS_COMMAND_NOT_FOUND
class os_spawnv_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Scenario names: ‘success’, ‘failure’, ‘not_found’. Each hook
        returns the corresponding exit status.

        """

    def _hook_success(self, mode, file, args):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, mode, file, args):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, mode, file, args):
        return EXIT_STATUS_COMMAND_NOT_FOUND
# Unique sentinel objects.
# NOTE(review): presumably placeholders for matching command-line
# arguments; their use is not visible in this part of the file — confirm.
ARG_ANY = object()
ARG_MORE = object()
class PopenDouble:
    """ A testing double for `subprocess.Popen`. """

    def __init__(self, args, *posargs, **kwargs):
        """ Initialise the double from `subprocess.Popen` arguments.

            :param args: The command: a command line string if the
                `shell` keyword argument is true, otherwise a sequence
                of command-line arguments.

            """
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.pid = None
        self.returncode = None

        # Without `shell`, the parameter is already a sequence of
        # command-line arguments.
        use_shell = kwargs.get('shell', False)
        self.argv = shlex.split(args) if use_shell else args

    def set_streams(self, subprocess_double, popen_kwargs):
        """ Set the streams on the `PopenDouble`.

            :param subprocess_double: The `SubprocessDouble` from
                which to get existing stream doubles.
            :param popen_kwargs: The keyword arguments to the
                `subprocess.Popen` call.
            :return: ``None``.

            """
        for stream_name in ['stdin', 'stdout', 'stderr']:
            if stream_name not in popen_kwargs:
                # Leave a stream that was not requested unchanged.
                continue
            stream_spec = popen_kwargs[stream_name]
            if stream_spec is subprocess.PIPE:
                stream_double = getattr(
                        subprocess_double,
                        "{name}_double".format(name=stream_name))
                stream_file = stream_double.fake_file
            elif stream_spec is subprocess.STDOUT:
                stream_file = subprocess_double.stdout_double.fake_file
            else:
                # The caller supplied the stream object itself.
                stream_file = stream_spec
            setattr(self, stream_name, stream_file)

    def wait(self):
        """ Wait for subprocess to terminate. """
        return self.returncode
class subprocess_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.Popen` behaviour.

        Scenario names: ‘success’.

        """

    def _hook_success(self, testcase, args, *posargs, **kwargs):
        # Set streams on the `popen_double` of the double, then return it.
        popen_double = self.double.popen_double
        popen_double.set_streams(self.double, kwargs)
        return popen_double
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case is searched
        for the command's argument list. A registered command is
        handled by its double's `subprocess_popen_scenario`; any other
        command is passed to the real `subprocess.Popen`.

        """
    real_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        # A shell command line is split into arguments for the lookup.
        use_shell = kwargs.get('shell', False)
        argv = shlex.split(args) if use_shell else args
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if argv not in registry:
            return real_popen(args, *posargs, **kwargs)
        popen_scenario = registry[argv].subprocess_popen_scenario
        return popen_scenario.call_hook(testcase, args, *posargs, **kwargs)

    popen_patcher = mock.patch.object(
            subprocess, "Popen", side_effect=fake_subprocess_popen)
    popen_patcher.start()
    testcase.addCleanup(popen_patcher.stop)
class subprocess_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.call` behaviour.

        Each hook receives the command and returns the exit status
        that the faked `subprocess.call` should report.

        """

    def _hook_success(self, command):
        """ Return the exit status for a successful command. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Return the exit status for a failed command. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Return the exit status for a command that was not found. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
def patch_subprocess_call(testcase):
    """ Patch `subprocess.call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        A call whose command line matches an entry in the registry of
        `SubprocessDouble` instances for this test case is handled by
        that double's `subprocess_call_scenario`. Any other call is
        passed through to the real `subprocess.call`. The patch is
        removed by a cleanup registered on `testcase`.

        """
    real_call = subprocess.call

    def fake_subprocess_call(command, *posargs, **kwargs):
        # A shell command is a single string; split it into words so
        # it can be compared with the registered `argv` keys.
        use_shell = kwargs.get('shell', False)
        command_argv = shlex.split(command) if use_shell else command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if command_argv not in registry:
            return real_call(command, *posargs, **kwargs)
        scenario = registry[command_argv].subprocess_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        subprocess, "call", side_effect=fake_subprocess_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_check_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.check_call` behaviour.

        The success hook returns ``None``; the other hooks raise
        `subprocess.CalledProcessError` with the matching exit status.

        """

    def _hook_success(self, command):
        """ Return ``None`` for a successful command. """
        return None

    def _hook_failure(self, command):
        """ Raise `CalledProcessError` for a failed command.

            :raises subprocess.CalledProcessError: Always, carrying
                `EXIT_STATUS_FAILURE` and the `command`.

            """
        raise subprocess.CalledProcessError(EXIT_STATUS_FAILURE, command)

    def _hook_not_found(self, command):
        """ Raise `CalledProcessError` for a command that was not found.

            :raises subprocess.CalledProcessError: Always, carrying
                `EXIT_STATUS_COMMAND_NOT_FOUND` and the `command`.

            """
        raise subprocess.CalledProcessError(
            EXIT_STATUS_COMMAND_NOT_FOUND, command)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        A call whose command line matches an entry in the registry of
        `SubprocessDouble` instances for this test case is handled by
        that double's `subprocess_check_call_scenario`. Any other call
        is passed through to the real `subprocess.check_call`. The
        patch is removed by a cleanup registered on `testcase`.

        """
    real_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        # A shell command is a single string; split it into words so
        # it can be compared with the registered `argv` keys.
        use_shell = kwargs.get('shell', False)
        command_argv = shlex.split(command) if use_shell else command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if command_argv not in registry:
            return real_check_call(command, *posargs, **kwargs)
        scenario = registry[command_argv].subprocess_check_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        subprocess, "check_call", side_effect=fake_subprocess_check_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Keys are command-line patterns: sequences of words which may
        contain the `ARG_ANY` and `ARG_MORE` sentinels. Lookup,
        membership, and deletion match the specified `argv` against
        the registered patterns rather than comparing keys exactly.

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry.

            :param args: Optionally, one mapping or one iterable of
                (key, value) pairs supplying the initial items.
            :param kwargs: Ignored.

            """
        items = []
        if args:
            initial = args[0]
            # A mapping is also iterable, so test for it first and do
            # not let the iterable case override its items.
            if isinstance(initial, collections_abc.Mapping):
                items = initial.items()
            elif isinstance(initial, collections_abc.Iterable):
                items = initial
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
            class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Determine whether pattern `candidate` matches `argv`.

            :param candidate: A registered key (sequence of words).
            :param argv: The command line to test, as a sequence.
            :return: ``True`` if `candidate` matches, else ``False``.

            A candidate shorter than `argv` matches when all its words
            match; that is, it matches as a prefix.

            """
        if candidate == argv:
            # An exact match.
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words (even none).
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv: there
                # is no word at this position to compare with.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word == argv[word_index]:
                # Candidate matches the word at this position.
                continue
            # This candidate does not match.
            return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The command line to look up.
            :return: The first registered key that matches `argv`, or
                ``None`` if `argv` is not a sequence or nothing matches.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        """ Get the value whose registered pattern matches `key`.

            :raises KeyError: If no registered pattern matches.

            """
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        return self._mapping[match]

    def __setitem__(self, key, value):
        """ Register `value` under `key`.

            Any existing entry whose pattern matches `key` is removed
            first.

            """
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        """ Remove the entry whose pattern matches `key`, if any.

            No error is raised when nothing matches.

            """
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return iter(self._mapping)

    def __len__(self):
        return len(self._mapping)
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess.

        Each instance has a program `path`, a command line `argv`, a
        unique integer `pid`, a `PopenDouble`, and `FileDouble`
        instances for its `stdin`, `stdout`, and `stderr` streams.

        """

    registry_class = SubprocessDoubleRegistry
    registries = {}

    # Instances by PID. Weak references, so that registration here
    # does not by itself keep a double alive.
    double_by_pid = weakref.WeakValueDictionary()

    # Source of PIDs, shared by all instances so that every PID is
    # unique among them.
    _pid_counter = itertools.count(1)

    function_scenario_params_by_class = {
            subprocess_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_popen_scenario',
                },
            subprocess_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_call_scenario',
                },
            subprocess_check_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name':
                    'set_subprocess_check_call_scenario',
                },
            os_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_popen_scenario',
                },
            os_waitpid_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_waitpid_scenario',
                },
            os_system_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_system_scenario',
                },
            os_spawnv_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_spawnv_scenario',
                },
            }

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the subprocess double.

            :param path: The filesystem path of the fake program. If
                ``None``, a temporary path name is generated.
            :param argv: The command line of the fake program. If
                ``None``, it is the base name of `path` alone.
            :param args: Positional arguments for the superclass.
            :param kwargs: Keyword arguments for the superclass.

            """
        if path is None:
            # Only a path name is needed; no file is created.
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        self.stdin_double = FileDouble()
        self.stdout_double = FileDouble()
        self.stderr_double = FileDouble()

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        text = (
                "<SubprocessDouble instance: {id}"
                " path: {path!r},"
                " argv: {argv!r}"
                " stdin_double: {stdin_double!r}"
                " stdout_double: {stdout_double!r}"
                " stderr_double: {stderr_double!r}"
                ">").format(
                    id=id(self),
                    path=self.path, argv=self.argv,
                    stdin_double=self.stdin_double,
                    stdout_double=self.stdout_double,
                    stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: The next integer from the class's PID counter.

            """
        # Take the next value from the shared counter. A generator
        # function here would hand the caller a new generator object
        # instead of an integer PID.
        return next(cls._pid_counter)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double.

            :return: The `argv` of this double, as a tuple.

            """
        result = tuple(self.argv)
        return result

    def set_stdin_content(self, text):
        """ Set the content of the `stdin` stream for this double. """
        self.stdin_double.fake_file = StringIO(text)

    def set_stdout_content(self, text):
        """ Set the content of the `stdout` stream for this double. """
        self.stdout_double.fake_file = StringIO(text)

    def set_stderr_content(self, text):
        """ Set the content of the `stderr` stream for this double. """
        self.stderr_double.fake_file = StringIO(text)
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake subprocesses.

        :param path: The filesystem path of the fake program. If not
            specified, a temporary path name will be generated.
        :return: A collection of scenarios for tests involving
            subprocesses.

        The collection is a mapping from scenario name to a dictionary
        of scenario attributes. Each scenario dictionary also receives
        a `SubprocessDouble` under the 'subprocess_double' key.

        """
    file_path = tempfile.mktemp() if path is None else path

    default_scenario_params = {
            'return_value': EXIT_STATUS_SUCCESS,
            'program_path': file_path,
            'argv_after_command_name': [],
            }

    scenarios = {
            'default': {},
            'not-found': {
                'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
                },
            }

    for (scenario_name, scenario) in scenarios.items():
        # Overlay the scenario's own settings on the defaults, then
        # store the complete set back into the scenario.
        merged_params = default_scenario_params.copy()
        merged_params.update(scenario)
        scenario.update(merged_params)

        program_path = merged_params['program_path']
        command_line = [os.path.basename(program_path)]
        command_line.extend(merged_params['argv_after_command_name'])

        scenario['subprocess_double'] = SubprocessDouble(
            path=program_path, argv=command_line)
        scenario['fake_file_scenario_name'] = scenario_name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios.
        :return: Set of the 'subprocess_double' items of `scenarios`,
            omitting any that are ``None``.

        """
    return {
        scenario['subprocess_double']
        for scenario in scenarios
        if scenario['subprocess_double'] is not None}
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        Every double is registered for `testcase`. If `doubles` is
        ``None``, a default collection will be made from the return
        value of `make_fake_subprocess_scenarios`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios are stored on `testcase` as the attribute
        `fake_subprocess_scenarios`, and their subprocess doubles are
        set up for `testcase`.

        """
    fake_scenarios = make_fake_subprocess_scenarios()
    testcase.fake_subprocess_scenarios = fake_scenarios

    setup_subprocess_double_behaviour(
        testcase,
        get_subprocess_doubles_from_fake_subprocess_scenarios(
            fake_scenarios.values()))
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        A call whose command line matches an entry in the registry of
        `SubprocessDouble` instances for this test case is handled by
        that double's `os_popen_scenario`. Any other call is passed
        through to the real `os.popen`. The patch is removed by a
        cleanup registered on `testcase`.

        """
    real_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # A command given as text is split into words so it can be
        # compared with the registered `argv` keys.
        command_argv = (
            shlex.split(cmd) if isinstance(cmd, basestring) else cmd)
        if command_argv not in registry:
            return real_os_popen(cmd, mode, buffering)
        scenario = registry[command_argv].os_popen_scenario
        return scenario.call_hook(command_argv, mode, buffering)

    patcher = mock.patch.object(
        os, "popen", side_effect=fake_os_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        A call whose `pid` is registered in
        `SubprocessDouble.double_by_pid` is handled by that double's
        `os_waitpid_scenario`. Any other call is passed through to the
        real `os.waitpid`. The patch is removed by a cleanup
        registered on `testcase`.

        """
    real_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            return real_os_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    patcher = mock.patch.object(
        os, "waitpid", side_effect=fake_os_waitpid)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        A call whose command line matches an entry in the registry of
        `SubprocessDouble` instances for this test case is handled by
        that double's `os_system_scenario`. Any other call is passed
        through to the real `os.system`. The patch is removed by a
        cleanup registered on `testcase`.

        """
    real_os_system = os.system

    def fake_os_system(command):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # The command is a single string; split it into words so it
        # can be compared with the registered `argv` keys.
        command_argv = shlex.split(command)
        if command_argv not in registry:
            return real_os_system(command)
        scenario = registry[command_argv].os_system_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        os, "system", side_effect=fake_os_system)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        A call whose `args` match an entry in the registry of
        `SubprocessDouble` instances for this test case is handled by
        that double's `os_spawnv_scenario`. Any other call is passed
        through to the real `os.spawnv`. The patch is removed by a
        cleanup registered on `testcase`.

        """
    real_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # The registry is searched with the arguments as a tuple.
        argv_key = tuple(args)
        if argv_key not in registry:
            return real_os_spawnv(mode, file, args)
        scenario = registry[argv_key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    patcher = mock.patch.object(
        os, "spawnv", side_effect=fake_os_spawnv)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1673 # Local variables:
1674 # coding: utf-8
1675 # mode: python
1676 # End:
1677 # vim: fileencoding=utf-8 filetype=python :