Document a bug fixed by using standard library APIs.
[dput.git] / test / helper.py
blob4444202be9702a37008ee0c9fc1f636eb3928654
1 # -*- coding: utf-8; -*-
3 # test/helper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
19 if sys.version_info >= (3, 3):
20 import builtins
21 import unittest
22 import unittest.mock as mock
23 from io import StringIO as StringIO
24 import configparser
25 import collections.abc as collections_abc
26 elif sys.version_info >= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys.version_info >= (2, 7):
29 # Python 2 standard library.
30 import __builtin__ as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2 as unittest
33 # Third-party mock library.
34 import mock
35 # Python 2 standard library.
36 from StringIO import StringIO as BaseStringIO
37 import ConfigParser as configparser
38 import collections as collections_abc
39 else:
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
42 import os
43 import os.path
44 import io
45 import shutil
46 import tempfile
47 import pwd
48 import errno
49 import time
50 import signal
51 import subprocess
52 import functools
53 import itertools
54 import base64
55 import collections
56 import weakref
57 import shlex
59 __package__ = str("test")
60 __import__(__package__)
62 __metaclass__ = type
64 try:
65 # Python 2 types.
66 basestring
67 unicode
68 except NameError:
69 # Alias for Python 3 types.
70 basestring = str
71 unicode = str
def make_unique_slug(testcase):
    """ Make a unique slug for the test case.

        :param testcase: The `TestCase` instance; its `getUniqueString`
            method supplies the text to encode.
        :return: Text: the trailing 30 characters (or fewer) of the
            Base64 encoding of the unique string.

        """
    unique_bytes = testcase.getUniqueString().encode('utf-8')
    encoded_text = base64.b64encode(unique_bytes).decode('utf-8')
    return encoded_text[-30:]
try:
    StringIO
except NameError:
    # No `StringIO` was imported above (the Python 2 branch imports only
    # `BaseStringIO`), so build one with the extra file API we need.

    class StringIO(BaseStringIO, object):
        """ Text stream that also works as a context manager.

            Adds the context manager protocol and the `readable`,
            `writable`, `seekable` predicates to `BaseStringIO`.

            """

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            self.close()
            # Returning False lets any exception from the block propagate.
            return False

        def seekable(self):
            return True

        def readable(self):
            return True

        def writable(self):
            return True
def patch_stdout(testcase):
    """ Replace `sys.stdout` with a fake stream for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a new `StringIO`. The patch
        is stopped by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stdout_patcher = mock.patch.object(sys, "stdout", wraps=fake_stream)
    stdout_patcher.start()
    testcase.addCleanup(stdout_patcher.stop)
def patch_stderr(testcase):
    """ Replace `sys.stderr` with a fake stream for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a new `StringIO`. The patch
        is stopped by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stderr_patcher = mock.patch.object(sys, "stderr", wraps=fake_stream)
    stderr_patcher.start()
    testcase.addCleanup(stderr_patcher.stop)
def patch_signal_signal(testcase):
    """ Replace `signal.signal` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    signal_patcher = mock.patch.object(signal, "signal", autospec=True)
    signal_patcher.start()
    testcase.addCleanup(signal_patcher.stop)
class FakeSystemExit(Exception):
    """ Test double standing in for the `SystemExit` exception.

        Derives from `Exception`, so it can be raised and caught like
        any ordinary error.

        """
135 EXIT_STATUS_SUCCESS = 0
136 EXIT_STATUS_FAILURE = 1
137 EXIT_STATUS_COMMAND_NOT_FOUND = 127
def patch_sys_exit(testcase):
    """ Replace `sys.exit` with a mock that raises `FakeSystemExit`.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    fake_exit_error = FakeSystemExit()
    exit_patcher = mock.patch.object(
            sys, "exit", autospec=True, side_effect=fake_exit_error)
    exit_patcher.start()
    testcase.addCleanup(exit_patcher.stop)
def patch_sys_argv(testcase):
    """ Replace the `sys.argv` sequence for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Missing fixture attributes are filled in first: `progname` gets
        a unique slug, and `sys_argv` becomes a one-item list holding
        `progname`. `sys.argv` is then patched with a copy of
        `testcase.sys_argv`.

        """
    if not hasattr(testcase, 'progname'):
        testcase.progname = make_unique_slug(testcase)
    if not hasattr(testcase, 'sys_argv'):
        testcase.sys_argv = [testcase.progname]

    # Patch in a copy, so code under test cannot alter the fixture list.
    argv_copy = list(testcase.sys_argv)
    argv_patcher = mock.patch.object(sys, "argv", new=argv_copy)
    argv_patcher.start()
    testcase.addCleanup(argv_patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that would disrupt the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: None.

        Applies, in order, the patches for `sys.stdout`, `sys.stderr`,
        `sys.exit`, and `sys.argv`.

        """
    patch_functions = (
            patch_stdout,
            patch_stderr,
            patch_sys_exit,
            patch_sys_argv,
            )
    for patch_function in patch_functions:
        patch_function(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. Any
            iterable is accepted (a list, a generator, …); the default
            is an endless count upward from 0.
        :return: None.

        Each call to the patched `time.time` returns the next item
        from `values`. When `values` is exhausted, the next call raises
        `StopIteration`, as for any mock with an iterable `side_effect`.

        """
    if values is None:
        values = itertools.count()

    func_patcher = mock.patch.object(time, "time", autospec=True)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    # Get an explicit iterator: a plain sequence does not support
    # `next`, and a generator that re-yields `next(values)` would turn
    # exhaustion into `RuntimeError` under PEP 479.
    time.time.side_effect = iter(values)
def patch_os_environ(testcase):
    """ Replace the `os.environ` mapping for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is `testcase.os_environ`, which is created as
        an empty dictionary if the test case does not already have it.

        """
    if not hasattr(testcase, 'os_environ'):
        testcase.os_environ = {}
    fake_environ = testcase.os_environ
    environ_patcher = mock.patch.object(os, "environ", new=fake_environ)
    environ_patcher.start()
    testcase.addCleanup(environ_patcher.stop)
def patch_os_getpid(testcase):
    """ Replace `os.getpid` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    getpid_patcher = mock.patch.object(os, "getpid", autospec=True)
    getpid_patcher.start()
    testcase.addCleanup(getpid_patcher.stop)
def patch_os_getuid(testcase):
    """ Replace the `os.getuid` function for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The mock returns `testcase.os_getuid_return_value`, which is
        set to a unique integer if the test case does not have it yet.

        """
    if not hasattr(testcase, 'os_getuid_return_value'):
        testcase.os_getuid_return_value = testcase.getUniqueInteger()
    fake_uid = testcase.os_getuid_return_value
    getuid_patcher = mock.patch.object(
            os, "getuid", autospec=True, return_value=fake_uid)
    getuid_patcher.start()
    testcase.addCleanup(getuid_patcher.stop)
219 PasswdEntry = collections.namedtuple(
220 "PasswdEntry",
221 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
def patch_pwd_getpwuid(testcase):
    """ Replace the `pwd.getpwuid` function for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The mock returns `testcase.pwd_getpwuid_return_value`. If the
        test case lacks that attribute, a `PasswdEntry` of unique
        values is created for it. A value that is not already a
        `pwd.struct_passwd` is converted to one for the mock's return
        value; the attribute itself is left as it was.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())

    entry = testcase.pwd_getpwuid_return_value
    if isinstance(entry, pwd.struct_passwd):
        passwd_entry = entry
    else:
        passwd_entry = pwd.struct_passwd(entry)

    getpwuid_patcher = mock.patch.object(
            pwd, "getpwuid", autospec=True, return_value=passwd_entry)
    getpwuid_patcher.start()
    testcase.addCleanup(getpwuid_patcher.stop)
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path found in this test case's `FileDouble` registry is
        answered by that double's `os_path_exists_scenario` hook. Any
        other path is passed to the real `os.path.exists`.

        """
    real_os_path_exists = os.path.exists

    def exists_via_registry(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_path_exists(path)
        return doubles[path].os_path_exists_scenario.call_hook()

    exists_patcher = mock.patch.object(
            os.path, "exists", autospec=True,
            side_effect=exists_via_registry)
    exists_patcher.start()
    testcase.addCleanup(exists_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path found in this test case's `FileDouble` registry is
        answered by that double's `os_access_scenario` hook, which
        receives the requested `mode`. Any other path is passed to the
        real `os.access`.

        """
    real_os_access = os.access

    def access_via_registry(path, mode):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_access(path, mode)
        return doubles[path].os_access_scenario.call_hook(mode)

    access_patcher = mock.patch.object(
            os, "access", autospec=True,
            side_effect=access_via_registry)
    access_patcher.start()
    testcase.addCleanup(access_patcher.stop)
298 StatResult = collections.namedtuple(
299 'StatResult', [
300 'st_mode',
301 'st_ino', 'st_dev', 'st_nlink',
302 'st_uid', 'st_gid',
303 'st_size',
304 'st_atime', 'st_mtime', 'st_ctime',
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path found in this test case's `FileDouble` registry is
        answered by that double's `os_stat_scenario` hook. Any other
        path is passed to the real `os.stat`.

        """
    real_os_stat = os.stat

    def stat_via_registry(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_stat(path)
        return doubles[path].os_stat_scenario.call_hook()

    stat_patcher = mock.patch.object(
            os, "stat", autospec=True,
            side_effect=stat_via_registry)
    stat_patcher.start()
    testcase.addCleanup(stat_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path found in this test case's `FileDouble` registry is
        answered by that double's `os_lstat_scenario` hook. Any other
        path is passed to the real `os.lstat`.

        """
    real_os_lstat = os.lstat

    def lstat_via_registry(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_lstat(path)
        return doubles[path].os_lstat_scenario.call_hook()

    lstat_patcher = mock.patch.object(
            os, "lstat", autospec=True,
            side_effect=lstat_via_registry)
    lstat_patcher.start()
    testcase.addCleanup(lstat_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path found in this test case's `FileDouble` registry is
        handled by that double's `os_unlink_scenario` hook. Any other
        path is passed to the real `os.unlink`.

        """
    real_os_unlink = os.unlink

    def unlink_via_registry(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_unlink(path)
        return doubles[path].os_unlink_scenario.call_hook()

    unlink_patcher = mock.patch.object(
            os, "unlink", autospec=True,
            side_effect=unlink_via_registry)
    unlink_patcher.start()
    testcase.addCleanup(unlink_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path found in this test case's `FileDouble` registry is
        handled by that double's `os_rmdir_scenario` hook. Any other
        path is passed to the real `os.rmdir`.

        """
    real_os_rmdir = os.rmdir

    def rmdir_via_registry(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_rmdir(path)
        return doubles[path].os_rmdir_scenario.call_hook()

    rmdir_patcher = mock.patch.object(
            os, "rmdir", autospec=True,
            side_effect=rmdir_via_registry)
    rmdir_patcher.start()
    testcase.addCleanup(rmdir_patcher.stop)
def patch_shutil_rmtree(testcase):
    """ Patch `shutil.rmtree` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A registered path is
        handled by that double's `shutil_rmtree_scenario` hook. Any
        other path is passed, with all the caller's other arguments,
        to the real `shutil.rmtree`.

        """
    # Keep the genuine `shutil.rmtree` (not `os.rmdir`, which cannot
    # remove a directory that still has content).
    orig_shutil_rmtree = shutil.rmtree

    def fake_shutil_rmtree(path, *args, **kwargs):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.shutil_rmtree_scenario.call_hook()
        else:
            # Forward the optional arguments (`ignore_errors`,
            # `onerror`, …) so the fallback behaves as the caller asked.
            result = orig_shutil_rmtree(path, *args, **kwargs)
        return result

    func_patcher = mock.patch.object(
            shutil, "rmtree", autospec=True,
            side_effect=fake_shutil_rmtree)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Replace the `tempfile.mkdtemp` function for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The mock returns the path of
        `testcase.tempfile_mkdtemp_file_double`, which is created with
        a new temporary path if the test case does not have it yet.
        That double gets the 'okay' scenario for `os.unlink` and
        `os.rmdir`, and is registered for the test case.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    dir_double = testcase.tempfile_mkdtemp_file_double
    for set_scenario in (
            dir_double.set_os_unlink_scenario,
            dir_double.set_os_rmdir_scenario):
        set_scenario('okay')
    dir_double.register_for_testcase(testcase)

    mkdtemp_patcher = mock.patch.object(
            tempfile, "mkdtemp", autospec=True,
            return_value=testcase.tempfile_mkdtemp_file_double.path)
    mkdtemp_patcher.start()
    testcase.addCleanup(mkdtemp_patcher.stop)
try:
    FileNotFoundError
    FileExistsError
    PermissionError
except NameError:
    # These exception classes are not built in (Python 2). Define
    # stand-ins derived from `IOError`.

    def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
        """ Normalise constructor arguments for an `IOError` subclass.

            :param init_args: Positional arguments given to the constructor.
            :param init_kwargs: Keyword arguments given to the
                constructor. The 'errno', 'filename', 'strerror' items
                are removed from this same mapping.
            :param errno_value: The error number to use when the
                arguments do not specify one.
            :return: Tuple (`args`, `kwargs`), where `args` is the
                tuple (errno, strerror, filename) and `kwargs` is
                `init_kwargs` without the consumed items.

            """
        error_number = errno_value
        error_text = os.strerror(errno_value)
        error_filename = None

        # Positional values first; a keyword value then overrides.
        if len(init_args) >= 3:
            (error_number, error_filename) = (init_args[0], init_args[2])
        if 'errno' in init_kwargs:
            error_number = init_kwargs.pop('errno')
        if 'filename' in init_kwargs:
            error_filename = init_kwargs.pop('filename')
        if len(init_args) >= 2:
            error_text = init_args[1]
        if 'strerror' in init_kwargs:
            error_text = init_kwargs.pop('strerror')

        return ((error_number, error_text, error_filename), init_kwargs)

    class FileNotFoundError(IOError):
        """ Stand-in for the built-in; defaults to `errno.ENOENT`. """

        def __init__(self, *args, **kwargs):
            (init_args, init_kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.ENOENT)
            super(FileNotFoundError, self).__init__(
                    *init_args, **init_kwargs)

    class FileExistsError(IOError):
        """ Stand-in for the built-in; defaults to `errno.EEXIST`. """

        def __init__(self, *args, **kwargs):
            (init_args, init_kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EEXIST)
            super(FileExistsError, self).__init__(
                    *init_args, **init_kwargs)

    class PermissionError(IOError):
        """ Stand-in for the built-in; defaults to `errno.EPERM`. """

        def __init__(self, *args, **kwargs):
            (init_args, init_kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EPERM)
            super(PermissionError, self).__init__(
                    *init_args, **init_kwargs)
def make_fake_file_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :param path: The filesystem path of the fake file. If not
            specified, a temporary path name will be generated.
        :return: A collection of scenarios for tests involving input files.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Each scenario is completed from the default
        parameters, then gains a 'file_double' (a new `FileDouble` with
        its open scenario set) and a 'fake_file_scenario_name'.

        """
    file_path = tempfile.mktemp() if path is None else path

    empty_file = StringIO()
    minimal_file = StringIO("Lorem ipsum.")
    large_file = StringIO("\n".join(
            "ABCDEFGH" * 100
            for __ in range(1000)))

    default_params = {
            'open_scenario_name': 'okay',
            'file_double_params': dict(
                path=file_path, fake_file=minimal_file),
            }

    scenarios = {
            'default': {},
            'error-not-exist': {
                'open_scenario_name': 'nonexist',
                },
            'error-exist': {
                'open_scenario_name': 'exist_error',
                },
            'error-read-denied': {
                'open_scenario_name': 'read_denied',
                },
            'not-found': {
                'file_double_params': dict(
                    path=file_path, fake_file=empty_file),
                },
            'exist-empty': {
                'file_double_params': dict(
                    path=file_path, fake_file=empty_file),
                },
            'exist-minimal': {
                'file_double_params': dict(
                    path=file_path, fake_file=minimal_file),
                },
            'exist-large': {
                'file_double_params': dict(
                    path=file_path, fake_file=large_file),
                },
            }

    for (scenario_name, scenario) in scenarios.items():
        # Defaults first, then the scenario's own items take precedence.
        merged_params = dict(default_params)
        merged_params.update(scenario)
        scenario.update(merged_params)

        file_double = FileDouble(**scenario['file_double_params'])
        scenario['file_double'] = file_double
        file_double.set_open_scenario(merged_params['open_scenario_name'])
        scenario['fake_file_scenario_name'] = scenario_name

    return scenarios
def get_file_doubles_from_fake_file_scenarios(scenarios):
    """ Get the `FileDouble` instances from fake file scenarios.

        :param scenarios: Collection of fake file scenarios; each is a
            mapping with a 'file_double' item.
        :return: Set of the 'file_double' values that are not ``None``.

        """
    return {
            scenario['file_double']
            for scenario in scenarios
            if scenario['file_double'] is not None}
def setup_file_double_behaviour(testcase, doubles=None):
    """ Set up file double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `FileDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the result of `make_fake_file_scenarios`.

        Every double is registered for the test case, then the
        built-in `open` is patched. Opening a registered path calls
        that double's `builtins_open_scenario` hook with the `mode`
        and `buffering`. Opening any other path calls the real `open`
        with all the caller's arguments.

        """
    if doubles is None:
        scenarios = make_fake_file_scenarios()
        doubles = get_file_doubles_from_fake_file_scenarios(
                scenarios.values())

    for file_double in doubles:
        file_double.register_for_testcase(testcase)

    orig_open = builtins.open

    def fake_open(path, mode='rt', buffering=-1, *args, **kwargs):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.builtins_open_scenario.call_hook(
                    mode, buffering)
        else:
            # Pass on everything the caller specified (`encoding`,
            # `errors`, `newline`, …), so the real `open` behaves as
            # it would without the patch.
            result = orig_open(path, mode, buffering, *args, **kwargs)
        return result

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    func_patcher = mock.patch.object(
            builtins, "open", new=mock_open)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def setup_fake_file_fixtures(testcase):
    """ Set up fixtures for fake file doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Makes a new set of fake file scenarios, stores it as
        `testcase.fake_file_scenarios`, and sets up file double
        behaviour for the doubles in those scenarios.

        """
    testcase.fake_file_scenarios = make_fake_file_scenarios()
    scenario_doubles = get_file_doubles_from_fake_file_scenarios(
            testcase.fake_file_scenarios.values())
    setup_file_double_behaviour(testcase, scenario_doubles)
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify; it must
            already have the `fake_file_scenarios` mapping.
        :param name: The key of the scenario in that mapping.
        :return: None.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        and registers that file double for the test case.

        """
    chosen_scenario = testcase.fake_file_scenarios[name]
    testcase.fake_file_scenario = chosen_scenario
    testcase.file_double = chosen_scenario['file_double']
    testcase.file_double.register_for_testcase(testcase)
649 class TestDoubleFunctionScenario:
650 """ Scenario for fake behaviour of a specific function. """
652 def __init__(self, scenario_name, double):
653 self.scenario_name = scenario_name
654 self.double = double
656 self.call_hook = getattr(
657 self, "_hook_{name}".format(name=self.scenario_name))
659 def __repr__(self):
660 text = (
661 "<{class_name} instance: {id}"
662 " name: {name!r},"
663 " call_hook name: {hook_name!r}"
664 " double: {double!r}"
665 ">").format(
666 class_name=self.__class__.__name__, id=id(self),
667 name=self.scenario_name, double=self.double,
668 hook_name=self.call_hook.__name__)
669 return text
671 def __eq__(self, other):
672 result = True
673 if not self.scenario_name == other.scenario_name:
674 result = False
675 if not self.double == other.double:
676 result = False
677 if not self.call_hook.__name__ == other.call_hook.__name__:
678 result = False
679 return result
681 def __ne__(self, other):
682 result = not self.__eq__(other)
683 return result
class os_path_exists_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.path.exists` behaviour.

        Scenario names: 'exist', 'not_exist'.

        """

    def _hook_not_exist(self):
        return False

    def _hook_exist(self):
        return True
class os_access_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.access` behaviour.

        Scenario names: 'okay', 'not_exist', 'read_only', 'denied'.
        Each hook receives the requested access `mode`.

        """

    def _hook_okay(self, mode):
        return True

    def _hook_not_exist(self, mode):
        return False

    def _hook_read_only(self, mode):
        # Refuse any request that includes write or execute access.
        refused_modes = os.W_OK | os.X_OK
        return not (mode & refused_modes)

    def _hook_denied(self, mode):
        # Refuse any request that includes read, write, or execute access.
        refused_modes = os.R_OK | os.W_OK | os.X_OK
        return not (mode & refused_modes)
class os_stat_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.stat` behaviour.

        Scenario names: 'okay', 'notfound_error', 'denied_error'.

        """

    def _hook_okay(self):
        return self.double.stat_result

    def _hook_notfound_error(self):
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied_error(self):
        path = self.double.path
        raise PermissionError(path, "Permission denied")
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Inherits every hook from `os_stat_scenario` unchanged.

        """
class os_unlink_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.unlink` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied(self):
        path = self.double.path
        raise PermissionError(path, "Permission denied")
class os_rmdir_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.rmdir` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied(self):
        path = self.double.path
        raise PermissionError(path, "Permission denied")
class shutil_rmtree_scenario(TestDoubleFunctionScenario):
    """ Scenario for `shutil.rmtree` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied(self):
        path = self.double.path
        raise PermissionError(path, "Permission denied")
class builtins_open_scenario(TestDoubleFunctionScenario):
    """ Scenario for `builtins.open` behaviour.

        Scenario names: 'okay', 'nonexist', 'exist_error',
        'read_denied', 'write_denied'. Each hook receives the `mode`
        and `buffering` of the call, and returns the double's fake
        file unless the scenario raises an error for that mode.

        """

    def _hook_okay(self, mode, buffering):
        return self.double.fake_file

    def _hook_nonexist(self, mode, buffering):
        path = self.double.path
        if mode.startswith('r'):
            raise FileNotFoundError(
                    path,
                    "No such file or directory: {path!r}".format(path=path))
        return self.double.fake_file

    def _hook_exist_error(self, mode, buffering):
        path = self.double.path
        if mode.startswith(('w', 'a')):
            raise FileExistsError(
                    path,
                    "File already exists: {path!r}".format(path=path))
        return self.double.fake_file

    def _hook_read_denied(self, mode, buffering):
        path = self.double.path
        if mode.startswith('r'):
            raise PermissionError(
                    path,
                    "Read denied on {path!r}".format(path=path))
        return self.double.fake_file

    def _hook_write_denied(self, mode, buffering):
        path = self.double.path
        if mode.startswith(('w', 'a')):
            raise PermissionError(
                    path,
                    "Write denied on {path!r}".format(path=path))
        return self.double.fake_file
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        A concrete subclass sets these class attributes:

        * `registry_class`: Called with no arguments to make the
          registry for one test case.
        * `registries`: Mapping that holds the registry of each test case.
        * `function_scenario_params_by_class`: Mapping from scenario
          class to a dictionary with the 'default_scenario_name' and
          'set_scenario_method_name' items.

        It also implements `get_registry_key`.

        """

    registry_class = NotImplemented
    registries = NotImplemented

    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        base_new = super(TestDoubleWithRegistry, cls).__new__
        if base_new is object.__new__:
            # The ‘object’ implementation complains about extra arguments.
            instance = base_new(cls)
        else:
            instance = base_new(cls, *args, **kwargs)
        instance.make_set_scenario_methods()

        return instance

    def __init__(self, *args, **kwargs):
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the method that sets a `scenario_class` scenario. """

        def method(self, name):
            # Stored on the double under the scenario class's own name.
            setattr(
                    self, scenario_class.__name__,
                    scenario_class(name, double=self))

        method.__name__ = str(params['set_scenario_method_name'])
        method.__doc__ = (
                """ Set the scenario for `{name}` behaviour. """
                ).format(name=scenario_class.__name__)
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        params_by_class = self.function_scenario_params_by_class
        for (scenario_class, scenario_params) in params_by_class.items():
            set_method = self._make_set_scenario_method(
                    scenario_class, scenario_params)
            setattr(self.__class__, set_method.__name__, set_method)
            scenario_params['set_scenario_method'] = set_method

    def _set_method_per_scenario(self):
        """ Set the default scenario for each scenario class. """
        params_by_class = self.function_scenario_params_by_class
        for scenario_params in params_by_class.values():
            set_method = scenario_params['set_scenario_method']
            set_method(self, scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get this class's registry for the specified test case.

            The registry is created on first request for the test case.

            """
        # Key in a dict must be hashable.
        registry_key = (testcase.__class__, id(testcase))
        return cls.registries.setdefault(registry_key, cls.registry_class())

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase.

            Also registers a cleanup on `testcase` that removes it again.

            """
        self.get_registry_for_testcase(testcase)[
                self.get_registry_key()] = self
        testcase.addCleanup(
                functools.partial(self.unregister_for_testcase, testcase))

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        registry_key = self.get_registry_key()
        if registry_key in registry:
            registry.pop(registry_key)
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The stream to copy, or ``None``.
        :return: A new stream of the same type as `fake_file`, with
            the same content; or an empty `StringIO` if `fake_file`
            is ``None``.

        If `fake_file` has an `encoding` attribute and the new stream
        does not, the attribute is copied to the new stream.

        """
    if fake_file is None:
        (stream_type, stream_content) = (StringIO, "")
    else:
        (stream_type, stream_content) = (
                type(fake_file), fake_file.getvalue())
    assert issubclass(stream_type, object)

    duplicate = stream_type(stream_content)
    if hasattr(fake_file, 'encoding') and not hasattr(duplicate, 'encoding'):
        duplicate.encoding = fake_file.encoding
    return duplicate
class FileDouble(TestDoubleWithRegistry):
    """ A testing double for a file.

        Holds the file's `path`, a `fake_file` stream with its
        content, and a `stat_result`. Doubles are registered by path.

        """

    registry_class = dict
    registries = {}

    # One entry per faked function: (scenario class, name of the
    # default scenario, name of the generated ‘set scenario’ method).
    function_scenario_params_by_class = dict(
            (scenario_class, {
                'default_scenario_name': default_scenario_name,
                'set_scenario_method_name': set_scenario_method_name,
                })
            for (
                scenario_class,
                default_scenario_name,
                set_scenario_method_name) in [
                (os_path_exists_scenario, 'not_exist',
                    'set_os_path_exists_scenario'),
                (os_access_scenario, 'okay',
                    'set_os_access_scenario'),
                (os_stat_scenario, 'okay',
                    'set_os_stat_scenario'),
                (os_lstat_scenario, 'okay',
                    'set_os_lstat_scenario'),
                (builtins_open_scenario, 'okay',
                    'set_open_scenario'),
                (os_unlink_scenario, 'okay',
                    'set_os_unlink_scenario'),
                (os_rmdir_scenario, 'okay',
                    'set_os_rmdir_scenario'),
                (shutil_rmtree_scenario, 'okay',
                    'set_shutil_rmtree_scenario'),
                ])

    def __init__(self, path=None, fake_file=None, *args, **kwargs):
        """ Set up the double.

            :param path: The filesystem path this double stands for.
            :param fake_file: Stream whose content is copied into the
                double's own `fake_file`; ``None`` means no content.

            """
        self.path = path
        own_fake_file = copy_fake_file(fake_file)
        own_fake_file.name = path
        self.fake_file = own_fake_file

        self._set_stat_result()

        super(FileDouble, self).__init__(*args, **kwargs)

    def _set_stat_result(self):
        """ Set the `os.stat` result for this file. """
        content_length = len(self.fake_file.getvalue())
        self.stat_result = StatResult(
                st_mode=0,
                st_ino=None, st_dev=None, st_nlink=None,
                st_uid=0, st_gid=0,
                st_size=content_length,
                st_atime=None, st_mtime=None, st_ctime=None,
                )

    def __repr__(self):
        return "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
                path=self.path, fake_file=self.fake_file)

    def get_registry_key(self):
        """ Get the registry key for this double: its path. """
        return self.path
class os_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.popen` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `argv`, `mode`, and `buffering` of the call.

        """

    # Which stream of the double the caller gets, by `os.popen` mode.
    stream_name_by_mode = {
            'w': 'stdin',
            'r': 'stdout',
            }

    def _hook_success(self, argv, mode, buffering):
        double_attr_name = self.stream_name_by_mode[mode] + '_double'
        return getattr(self.double, double_attr_name).fake_file

    def _hook_failure(self, argv, mode, buffering):
        return StringIO()

    def _hook_not_found(self, argv, mode, buffering):
        return StringIO()
class os_waitpid_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `pid` and `options` of the call.

        """

    def _hook_success(self, pid, options):
        return (pid, EXIT_STATUS_SUCCESS)

    def _hook_failure(self, pid, options):
        return (pid, EXIT_STATUS_FAILURE)

    def _hook_not_found(self, pid, options):
        raise OSError(errno.ECHILD)
class os_system_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `command` and returns an exit status.

        """

    def _hook_success(self, command):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        return EXIT_STATUS_COMMAND_NOT_FOUND
class os_spawnv_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `mode`, `file`, and `args` of the call, and
        returns an exit status.

        """

    def _hook_success(self, mode, file, args):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, mode, file, args):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, mode, file, args):
        return EXIT_STATUS_COMMAND_NOT_FOUND
1087 ARG_ANY = object()
1088 ARG_MORE = object()
class PopenDouble:
    """ A testing double for `subprocess.Popen`.

        Has the `stdin`, `stdout`, `stderr`, `pid`, `returncode`
        attributes (all initially ``None``), and `argv`: the command
        line as a sequence of arguments.

        """

    def __init__(self, args, *posargs, **kwargs):
        """ Set up the double.

            :param args: The command line. Split with `shlex.split`
                if the `shell` keyword argument is true; otherwise
                used unchanged.

            """
        for attr_name in ('stdin', 'stdout', 'stderr', 'pid', 'returncode'):
            setattr(self, attr_name, None)

        if not kwargs.get('shell', False):
            # The parameter is already a sequence of command-line arguments.
            self.argv = args
        else:
            self.argv = shlex.split(args)

    def set_streams(self, subprocess_double, popen_kwargs):
        """ Set the streams on the `PopenDouble`.

            :param subprocess_double: The `SubprocessDouble` from
                which to get existing stream doubles.
            :param popen_kwargs: The keyword arguments to the
                `subprocess.Popen` call.
            :return: ``None``.

            Only the streams named in `popen_kwargs` are set. A
            `subprocess.PIPE` value selects the fake file of the
            matching stream double; `subprocess.STDOUT` selects the
            fake file of the stdout double; any other value is used
            as the stream itself.

            """
        for stream_name in ['stdin', 'stdout', 'stderr']:
            if stream_name not in popen_kwargs:
                continue
            requested = popen_kwargs[stream_name]
            if requested is subprocess.PIPE:
                double_attr_name = "{name}_double".format(name=stream_name)
                chosen_stream = getattr(
                        subprocess_double, double_attr_name).fake_file
            elif requested is subprocess.STDOUT:
                chosen_stream = subprocess_double.stdout_double.fake_file
            else:
                chosen_stream = requested
            setattr(self, stream_name, chosen_stream)

    def wait(self):
        """ Wait for subprocess to terminate; return `returncode`. """
        return self.returncode
class subprocess_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.Popen` behaviour. """

    def _hook_success(self, testcase, args, *posargs, **kwargs):
        """ Return the `PopenDouble` of this scenario's double.

            The streams of the `PopenDouble` are first set according to
            the keyword arguments of the `subprocess.Popen` call.

            """
        subprocess_double = self.double
        popen_double = subprocess_double.popen_double
        popen_double.set_streams(subprocess_double, kwargs)
        return popen_double
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, a call is looked up (by its command-line words)
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `subprocess_popen_scenario`; any other command is passed to
        the original `subprocess.Popen`.

        """
    real_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        # A shell command arrives as one string; split it into words.
        argv = shlex.split(args) if kwargs.get('shell', False) else args
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if argv not in registry:
            return real_popen(args, *posargs, **kwargs)
        scenario = registry[argv].subprocess_popen_scenario
        return scenario.call_hook(testcase, args, *posargs, **kwargs)

    patcher = mock.patch.object(
            subprocess, "Popen", autospec=True,
            side_effect=fake_subprocess_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.call` behaviour.

        Every hook accepts the `command`, does not examine it, and
        returns a fixed exit status.

        """

    def _hook_success(self, command):
        """ Return the exit status signalling success. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Return the exit status signalling failure. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Return the exit status signalling command not found. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
def patch_subprocess_call(testcase):
    """ Patch `subprocess.call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, a call is looked up (by its command-line words)
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `subprocess_call_scenario`; any other command is passed to the
        original `subprocess.call`.

        """
    real_call = subprocess.call

    def fake_subprocess_call(command, *posargs, **kwargs):
        # A shell command arrives as one string; split it into words.
        if kwargs.get('shell', False):
            lookup_key = shlex.split(command)
        else:
            lookup_key = command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_key not in registry:
            return real_call(command, *posargs, **kwargs)
        scenario = registry[lookup_key].subprocess_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            subprocess, "call", autospec=True,
            side_effect=fake_subprocess_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_check_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.check_call` behaviour.

        The success hook returns ``None``; the other hooks raise
        `subprocess.CalledProcessError` with a fixed exit status.

        """

    def _hook_success(self, command):
        """ Return ``None``, without examining `command`. """
        return None

    def _hook_failure(self, command):
        """ Raise `CalledProcessError` with the failure status. """
        raise subprocess.CalledProcessError(EXIT_STATUS_FAILURE, command)

    def _hook_not_found(self, command):
        """ Raise `CalledProcessError` with the not-found status. """
        raise subprocess.CalledProcessError(
                EXIT_STATUS_COMMAND_NOT_FOUND, command)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, a call is looked up (by its command-line words)
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `subprocess_check_call_scenario`; any other command is passed
        to the original `subprocess.check_call`.

        """
    real_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        # A shell command arrives as one string; split it into words.
        if kwargs.get('shell', False):
            lookup_key = shlex.split(command)
        else:
            lookup_key = command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_key not in registry:
            return real_check_call(command, *posargs, **kwargs)
        scenario = registry[lookup_key].subprocess_check_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            subprocess, "check_call", autospec=True,
            side_effect=fake_subprocess_check_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Keys are sequences of command-line words. Looking up an `argv`
        does not require an exact key: the first registered key that
        matches is used. A registered key matches when it is equal to
        the `argv`, or when each of its words matches the word at the
        same position in the `argv`, where:

        * `ARG_ANY` matches any single word at that position;
        * `ARG_MORE` matches all remaining words (including none);
        * a registered key that ends before the `argv` does still
          matches (the key acts as a prefix).

        A registered key that needs more words than the `argv` has
        does not match.

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry, optionally from existing items.

            :param args: If given, the first positional argument is a
                mapping, or an iterable of (key, value) pairs, from
                which to populate the registry.
            :param kwargs: Accepted but not used.

            """
        items = []
        if args:
            source = args[0]
            if isinstance(source, collections_abc.Mapping):
                items = source.items()
            elif isinstance(source, collections_abc.Iterable):
                items = source
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
                class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Return ``True`` iff registered key `candidate` matches `argv`. """
        if candidate == argv:
            # An exact match.
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words. We have a match.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv: there
                # is no word at this position to compare with.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word == argv[word_index]:
                # Candidate matches the word at this position.
                continue
            # This candidate does not match.
            return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The sequence of command-line words to match.
            :return: The first registered key that matches `argv`, or
                ``None`` if no key matches or `argv` is not a sequence.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        result = self._mapping[match]
        return result

    def __setitem__(self, key, value):
        # Remove the registered key (if any) that already matches, so
        # the new entry replaces it instead of being shadowed by it.
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        # Deleting a key that matches nothing is silently ignored.
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return self._mapping.__iter__()

    def __len__(self):
        return self._mapping.__len__()
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess.

        Each instance has a `path`, an `argv`, a unique integer `pid`,
        a `PopenDouble` (`popen_double`), and one `FileDouble` for each
        of the standard streams (`stdin_double`, `stdout_double`,
        `stderr_double`).

        """

    registry_class = SubprocessDoubleRegistry
    registries = {}

    # Every instance, by its PID. Values are held weakly, so an entry
    # goes away when its double is no longer referenced elsewhere.
    double_by_pid = weakref.WeakValueDictionary()

    # Source of PIDs, shared by all instances so each PID is unique.
    _pid_counter = itertools.count(1)

    function_scenario_params_by_class = {
            subprocess_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_popen_scenario',
                },
            subprocess_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_call_scenario',
                },
            subprocess_check_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name':
                    'set_subprocess_check_call_scenario',
                },
            os_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_popen_scenario',
                },
            os_waitpid_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_waitpid_scenario',
                },
            os_system_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_system_scenario',
                },
            os_spawnv_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_spawnv_scenario',
                },
            }

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the double.

            :param path: The filesystem path of the fake program. If
                ``None``, a temporary file name is generated (no file
                is created).
            :param argv: The command-line words for the subprocess. If
                ``None``, a list containing only the base name of
                `path`.
            :param args: Positional arguments for the superclass.
            :param kwargs: Keyword arguments for the superclass.

            """
        if path is None:
            # Only a plausible name is needed; the file is never made.
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        stream_class = SubprocessDouble.stream_class
        for stream_name in ['stdin', 'stdout', 'stderr']:
            fake_file = stream_class()
            file_double = FileDouble(fake_file=fake_file)
            stream_double_name = '{name}_double'.format(name=stream_name)
            setattr(self, stream_double_name, file_double)

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        text = (
                "<SubprocessDouble instance: {id}"
                " path: {path!r},"
                " argv: {argv!r}"
                " stdin_double: {stdin_double!r}"
                " stdout_double: {stdout_double!r}"
                " stderr_double: {stderr_double!r}"
                ">").format(
                    id=id(self),
                    path=self.path, argv=self.argv,
                    stdin_double=self.stdin_double,
                    stdout_double=self.stdout_double,
                    stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: The next integer from the shared PID counter.

            """
        # Must return the value itself. A `yield` here would make this
        # a generator function, and every call would then return a new
        # generator object instead of a PID.
        return next(cls._pid_counter)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double. """
        result = tuple(self.argv)
        return result

    # The type of fake file used for each stream, and the encoding
    # used to convert text content to bytes for it.
    stream_class = io.BytesIO
    stream_encoding = "utf-8"

    def _set_stream_content(self, stream_name, text, bytes_encoding):
        """ Replace the fake file of the named stream double.

            :param stream_name: One of 'stdin', 'stdout', 'stderr'.
            :param text: The text content for the stream.
            :param bytes_encoding: The encoding to convert `text` to
                bytes.
            :return: ``None``.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        stream_double = getattr(
                self, "{name}_double".format(name=stream_name))
        stream_double.fake_file = fake_file

    def set_stdin_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdin` stream for this double. """
        self._set_stream_content('stdin', text, bytes_encoding)

    def set_stdout_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdout` stream for this double. """
        self._set_stream_content('stdout', text, bytes_encoding)

    def set_stderr_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stderr` stream for this double. """
        self._set_stream_content('stderr', text, bytes_encoding)
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :path: The filesystem path of the fake program. If not specified,
            a valid random path will be generated.
        :return: A collection of scenarios for tests involving subprocesses.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Each scenario is filled in with the default
        attributes it does not override, plus a `subprocess_double` and
        a `fake_file_scenario_name`.

        """
    file_path = tempfile.mktemp() if path is None else path

    default_scenario_params = {
            'return_value': EXIT_STATUS_SUCCESS,
            'program_path': file_path,
            'argv_after_command_name': [],
            }

    scenarios = {
            'default': {},
            'not-found': {
                'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
                },
            }

    for (scenario_name, scenario) in scenarios.items():
        # Defaults first, then the scenario's own overrides on top.
        merged_params = dict(default_scenario_params)
        merged_params.update(scenario)
        scenario.update(merged_params)

        program_path = merged_params['program_path']
        argv = [os.path.basename(program_path)]
        argv.extend(merged_params['argv_after_command_name'])

        scenario['subprocess_double'] = SubprocessDouble(
                path=program_path, argv=argv)
        scenario['fake_file_scenario_name'] = scenario_name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios.
        :return: Collection of `SubprocessDouble` instances.

        The result is a set; a scenario whose `subprocess_double` is
        ``None`` contributes nothing.

        """
    return {
            scenario['subprocess_double']
            for scenario in scenarios
            if scenario['subprocess_double'] is not None}
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the return value of `make_fake_subprocess_scenarios`.
        Each double is then registered for the `testcase`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios made are stored on the test case as the
        `fake_subprocess_scenarios` attribute, and their doubles are
        set up for the test case.

        """
    testcase.fake_subprocess_scenarios = make_fake_subprocess_scenarios()

    scenario_doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            testcase.fake_subprocess_scenarios.values())
    setup_subprocess_double_behaviour(testcase, scenario_doubles)
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, a call is looked up (by its command-line words)
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `os_popen_scenario`; any other command is passed to the
        original `os.popen`.

        """
    real_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # A text command line is split into words for the lookup.
        is_command_line = isinstance(cmd, basestring)
        command_argv = shlex.split(cmd) if is_command_line else cmd
        if command_argv not in registry:
            return real_os_popen(cmd, mode, buffering)
        scenario = registry[command_argv].os_popen_scenario
        return scenario.call_hook(command_argv, mode, buffering)

    patcher = mock.patch.object(
            os, "popen", autospec=True,
            side_effect=fake_os_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, the PID of a call is looked up in
        `SubprocessDouble.double_by_pid`. A registered PID is handled
        by that double's `os_waitpid_scenario`; any other PID is passed
        to the original `os.waitpid`.

        """
    real_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            return real_os_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    patcher = mock.patch.object(
            os, "waitpid", autospec=True,
            side_effect=fake_os_waitpid)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, a call is looked up (by the words of its
        command line) in the registry of `SubprocessDouble` instances
        for this test case. A registered command is handled by that
        double's `os_system_scenario`; any other command is passed to
        the original `os.system`.

        """
    real_os_system = os.system

    def fake_os_system(command):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        lookup_key = shlex.split(command)
        if lookup_key not in registry:
            return real_os_system(command)
        scenario = registry[lookup_key].os_system_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            os, "system", autospec=True,
            side_effect=fake_os_system)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While patched, the `args` of a call (as a tuple) are looked up
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `os_spawnv_scenario`; any other command is passed to the
        original `os.spawnv`.

        """
    real_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        lookup_key = tuple(args)
        if lookup_key not in registry:
            return real_os_spawnv(mode, file, args)
        scenario = registry[lookup_key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    patcher = mock.patch.object(
            os, "spawnv", autospec=True,
            side_effect=fake_os_spawnv)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1713 # Local variables:
1714 # coding: utf-8
1715 # mode: python
1716 # End:
1717 # vim: fileencoding=utf-8 filetype=python :