Signature files are not input to the command; remove the attempt to match.
[dput.git] / test / helper.py
blobab6e496150ddb6d9b249751688467824bf78125e
1 # -*- coding: utf-8; -*-
3 # test/helper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Helper functionality for Dput test suite. """
12 from __future__ import (absolute_import, unicode_literals)
14 import sys
16 if sys.version_info >= (3, 3):
17 import builtins
18 import collections.abc as collections_abc
19 import configparser
20 from io import StringIO as StringIO
21 import unittest
22 import unittest.mock as mock
23 elif sys.version_info >= (3, 0):
24 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
25 elif sys.version_info >= (2, 7):
26 # Python 2 standard library.
27 import __builtin__ as builtins
28 import collections as collections_abc
29 import ConfigParser as configparser
30 from StringIO import StringIO as BaseStringIO
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2 as unittest
33 # Third-party mock library.
34 import mock
35 else:
36 raise RuntimeError("Python earlier than 2.7 is not supported.")
38 import base64
39 import collections
40 import errno
41 import functools
42 import io
43 import itertools
44 import os
45 import os.path
46 import pwd
47 import shlex
48 import shutil
49 import signal
50 import subprocess
51 import tempfile
52 import time
53 import weakref
56 __metaclass__ = type
# Make the Python 2 text type names available on every supported Python.
try:
    # Python 2 provides both names as builtins.
    (basestring, unicode)
except NameError:
    # Python 3 has a single text type; alias both names to it.
    basestring = unicode = str
def make_unique_slug(testcase):
    """ Make a unique slug for the test case.

        :param testcase: Object providing a `getUniqueString` method.
        :return: The final 30 characters of the Base64 encoding of
            the unique string (UTF-8 encoded).

        """
    unique_bytes = testcase.getUniqueString().encode('utf-8')
    encoded_text = base64.b64encode(unique_bytes).decode('utf-8')
    return encoded_text[-30:]
try:
    StringIO
except NameError:
    # Only the Python 2 base class was imported. Build the `StringIO`
    # the test suite expects on top of it.

    class StringIO(BaseStringIO, object):
        """ StringIO usable as a context manager, with `io` style queries. """

        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            # Close the stream; never suppress an exception.
            self.close()
            return False
def patch_stdout(testcase):
    """ Patch `sys.stdout` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a fresh `StringIO`; the
        patch is stopped by a cleanup registered on `testcase`.

        """
    stdout_patcher = mock.patch.object(sys, "stdout", wraps=StringIO())
    stdout_patcher.start()
    testcase.addCleanup(stdout_patcher.stop)
def patch_stderr(testcase):
    """ Patch `sys.stderr` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a fresh `StringIO`; the
        patch is stopped by a cleanup registered on `testcase`.

        """
    stderr_patcher = mock.patch.object(sys, "stderr", wraps=StringIO())
    stderr_patcher.start()
    testcase.addCleanup(stderr_patcher.stop)
def patch_signal_signal(testcase):
    """ Patch `signal.signal` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    signal_patcher = mock.patch.object(signal, "signal", autospec=True)
    signal_patcher.start()
    testcase.addCleanup(signal_patcher.stop)
class FakeSystemExit(Exception):
    """ Fake double for the `SystemExit` exception.

        Derives from `Exception`, unlike the real `SystemExit`.

        """
    pass
# Exit status values returned by the fake process scenarios.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
EXIT_STATUS_COMMAND_NOT_FOUND = 127
def patch_sys_exit(testcase):
    """ Patch `sys.exit` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        While patched, calling `sys.exit` raises `FakeSystemExit`.

        """
    exit_patcher = mock.patch.object(
            sys, "exit",
            autospec=True, side_effect=FakeSystemExit())
    exit_patcher.start()
    testcase.addCleanup(exit_patcher.stop)
def patch_sys_argv(testcase):
    """ Patch the `sys.argv` sequence for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Missing `progname` and `sys_argv` attributes are created on
        `testcase`. The patched `sys.argv` is a copy of
        `testcase.sys_argv`.

        """
    if not hasattr(testcase, 'progname'):
        testcase.progname = make_unique_slug(testcase)
    if not hasattr(testcase, 'sys_argv'):
        testcase.sys_argv = [testcase.progname]

    fake_argv = list(testcase.sys_argv)
    argv_patcher = mock.patch.object(sys, "argv", new=fake_argv)
    argv_patcher.start()
    testcase.addCleanup(argv_patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: None.

        Applies, in order, the patches for `sys.stdout`, `sys.stderr`,
        `sys.exit` and `sys.argv`.

        """
    patch_functions = (
            patch_stdout,
            patch_stderr,
            patch_sys_exit,
            patch_sys_argv,
            )
    for patch_function in patch_functions:
        patch_function(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. If not
            specified, an unbounded count starting at zero is used.
        :return: None.

        Each call to the patched `time.time` returns the next item
        from `values`. Once `values` is exhausted, a further call
        raises `StopIteration` from the mock.

        """
    if values is None:
        values = itertools.count()

    def generator_fake_time():
        # Iterate with `for` instead of calling `next(values)`. This
        # accepts any iterable (a list is not an iterator), and avoids
        # a `StopIteration` escaping the generator body, which Python
        # 3.7+ converts to `RuntimeError`.
        for value in values:
            yield value

    func_patcher = mock.patch.object(time, "time", autospec=True)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    time.time.side_effect = generator_fake_time()
def patch_os_environ(testcase):
    """ Patch the `os.environ` mapping.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        `testcase.os_environ` is used as the replacement mapping; an
        empty dict is created there if the attribute is missing.

        """
    if not hasattr(testcase, 'os_environ'):
        testcase.os_environ = {}

    environ_patcher = mock.patch.object(
            os, "environ", new=testcase.os_environ)
    environ_patcher.start()
    testcase.addCleanup(environ_patcher.stop)
def patch_os_getpid(testcase):
    """ Patch `os.getpid` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    getpid_patcher = mock.patch.object(os, "getpid", autospec=True)
    getpid_patcher.start()
    testcase.addCleanup(getpid_patcher.stop)
def patch_os_getuid(testcase):
    """ Patch the `os.getuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The fake returns `testcase.os_getuid_return_value`; if that
        attribute is missing it is set from `getUniqueInteger`.

        """
    if not hasattr(testcase, 'os_getuid_return_value'):
        testcase.os_getuid_return_value = testcase.getUniqueInteger()

    fake_uid = testcase.os_getuid_return_value
    getuid_patcher = mock.patch.object(
            os, "getuid", autospec=True, return_value=fake_uid)
    getuid_patcher.start()
    testcase.addCleanup(getuid_patcher.stop)
# Record type with the same field names, in order, as a `pwd` entry.
PasswdEntry = collections.namedtuple(
        "PasswdEntry", [
            "pw_name",
            "pw_passwd",
            "pw_uid",
            "pw_gid",
            "pw_gecos",
            "pw_dir",
            "pw_shell",
            ])
def patch_pwd_getpwuid(testcase):
    """ Patch the `pwd.getpwuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The fake returns `testcase.pwd_getpwuid_return_value`,
        converted to `pwd.struct_passwd` if it is not one already. If
        the attribute is missing, a `PasswdEntry` of unique values is
        created and stored there first.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())

    pwent = testcase.pwd_getpwuid_return_value
    if not isinstance(pwent, pwd.struct_passwd):
        pwent = pwd.struct_passwd(pwent)

    getpwuid_patcher = mock.patch.object(
            pwd, "getpwuid", autospec=True, return_value=pwent)
    getpwuid_patcher.start()
    testcase.addCleanup(getpwuid_patcher.stop)
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case is consulted. A
        registered path is answered by that double's
        `os_path_exists_scenario`; any other path is passed to the
        real `os.path.exists`.

        """
    real_exists = os.path.exists

    def fake_os_path_exists(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_exists(path)
        return registry[path].os_path_exists_scenario.call_hook()

    exists_patcher = mock.patch.object(
            os.path, "exists",
            autospec=True, side_effect=fake_os_path_exists)
    exists_patcher.start()
    testcase.addCleanup(exists_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case is consulted. A
        registered path is answered by that double's
        `os_access_scenario` (given `mode`); any other path is passed
        to the real `os.access`.

        """
    real_access = os.access

    def fake_os_access(path, mode):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_access(path, mode)
        return registry[path].os_access_scenario.call_hook(mode)

    access_patcher = mock.patch.object(
            os, "access",
            autospec=True, side_effect=fake_os_access)
    access_patcher.start()
    testcase.addCleanup(access_patcher.stop)
# Record type carrying the fields used for fake `os.stat` results.
StatResult = collections.namedtuple(
        'StatResult', (
            'st_mode',
            'st_ino',
            'st_dev',
            'st_nlink',
            'st_uid',
            'st_gid',
            'st_size',
            'st_atime',
            'st_mtime',
            'st_ctime',
            ))
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case is consulted. A
        registered path is answered by that double's
        `os_stat_scenario`; any other path is passed to the real
        `os.stat`.

        """
    real_stat = os.stat

    def fake_os_stat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_stat(path)
        return registry[path].os_stat_scenario.call_hook()

    stat_patcher = mock.patch.object(
            os, "stat",
            autospec=True, side_effect=fake_os_stat)
    stat_patcher.start()
    testcase.addCleanup(stat_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case is consulted. A
        registered path is answered by that double's
        `os_lstat_scenario`; any other path is passed to the real
        `os.lstat`.

        """
    real_lstat = os.lstat

    def fake_os_lstat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_lstat(path)
        return registry[path].os_lstat_scenario.call_hook()

    lstat_patcher = mock.patch.object(
            os, "lstat",
            autospec=True, side_effect=fake_os_lstat)
    lstat_patcher.start()
    testcase.addCleanup(lstat_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case is consulted. A
        registered path is handled by that double's
        `os_unlink_scenario`; any other path is passed to the real
        `os.unlink`.

        """
    real_unlink = os.unlink

    def fake_os_unlink(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_unlink(path)
        return registry[path].os_unlink_scenario.call_hook()

    unlink_patcher = mock.patch.object(
            os, "unlink",
            autospec=True, side_effect=fake_os_unlink)
    unlink_patcher.start()
    testcase.addCleanup(unlink_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case is consulted. A
        registered path is handled by that double's
        `os_rmdir_scenario`; any other path is passed to the real
        `os.rmdir`.

        """
    real_rmdir = os.rmdir

    def fake_os_rmdir(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return real_rmdir(path)
        return registry[path].os_rmdir_scenario.call_hook()

    rmdir_patcher = mock.patch.object(
            os, "rmdir",
            autospec=True, side_effect=fake_os_rmdir)
    rmdir_patcher.start()
    testcase.addCleanup(rmdir_patcher.stop)
def patch_shutil_rmtree(testcase):
    """ Patch `shutil.rmtree` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A registered path is
        handled by that double's `shutil_rmtree_scenario`; any other
        path is passed, with all its arguments, to the real
        `shutil.rmtree`.

        """
    # Keep the real `shutil.rmtree` (not `os.rmdir`) for the fallback.
    orig_shutil_rmtree = shutil.rmtree

    def fake_shutil_rmtree(path, *args, **kwargs):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.shutil_rmtree_scenario.call_hook()
        else:
            # Forward every argument so that `ignore_errors`,
            # `onerror` and any other option the caller gave still
            # take effect.
            result = orig_shutil_rmtree(path, *args, **kwargs)
        return result

    func_patcher = mock.patch.object(
            shutil, "rmtree", autospec=True,
            side_effect=fake_shutil_rmtree)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Patch the `tempfile.mkdtemp` function for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The fake returns the path of
        `testcase.tempfile_mkdtemp_file_double`, which is created as a
        `FileDouble` if missing. That double gets the 'okay' scenario
        for `os.unlink` and `os.rmdir`, and is registered for the test
        case.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    dir_double = testcase.tempfile_mkdtemp_file_double
    for set_scenario in (
            dir_double.set_os_unlink_scenario,
            dir_double.set_os_rmdir_scenario):
        set_scenario('okay')
    dir_double.register_for_testcase(testcase)

    mkdtemp_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True)
    mkdtemp_patcher.start()
    testcase.addCleanup(mkdtemp_patcher.stop)

    tempfile.mkdtemp.return_value = dir_double.path
try:
    (FileNotFoundError, FileExistsError, PermissionError)
except NameError:
    # Python 2 has none of these exception types; it uses IOError.
    # Define stand-ins derived from IOError.

    def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
        """ Normalise constructor arguments for an `IOError` subclass.

            :param init_args: Positional arguments given to the constructor.
            :param init_kwargs: Keyword arguments given to the
                constructor. The keys 'errno', 'strerror' and
                'filename' are removed from this mapping.
            :param errno_value: Default error number for the exception.
            :return: Tuple (`args`, `kwargs`); `args` is the tuple
                (errno, strerror, filename), `kwargs` is the remainder
                of `init_kwargs`.

            """
        remaining_kwargs = init_kwargs
        positional_count = len(init_args)

        # Error number and filename come from the positional arguments
        # only when all three are given.
        if positional_count >= 3:
            error_number = init_args[0]
            filename = init_args[2]
        else:
            error_number = errno_value
            filename = None
        if 'errno' in init_kwargs:
            error_number = remaining_kwargs.pop('errno')
        if 'filename' in init_kwargs:
            filename = remaining_kwargs.pop('filename')

        # The message defaults to the text for the default error number.
        message = os.strerror(errno_value)
        if positional_count >= 2:
            message = init_args[1]
        if 'strerror' in init_kwargs:
            message = remaining_kwargs.pop('strerror')

        return ((error_number, message, filename), remaining_kwargs)

    class FileNotFoundError(IOError):
        """ Stand-in for the Python 3 `FileNotFoundError` (ENOENT). """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.ENOENT)
            super(FileNotFoundError, self).__init__(*args, **kwargs)

    class FileExistsError(IOError):
        """ Stand-in for the Python 3 `FileExistsError` (EEXIST). """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EEXIST)
            super(FileExistsError, self).__init__(*args, **kwargs)

    class PermissionError(IOError):
        """ Stand-in for the Python 3 `PermissionError` (EPERM). """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EPERM)
            super(PermissionError, self).__init__(*args, **kwargs)
def make_fake_file_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :param path: The filesystem path of the fake file. If not
            specified, a path is generated with `tempfile.mktemp`.
        :return: A collection of scenarios for tests involving input files.

        The collection is a mapping from scenario name to a dictionary
        of scenario attributes. Every scenario gains the keys
        'file_double' (a `FileDouble` with its open scenario set) and
        'fake_file_scenario_name', plus any default it did not
        override.

        """
    file_path = tempfile.mktemp() if path is None else path

    fake_file_empty = StringIO()
    fake_file_minimal = StringIO("Lorem ipsum.")
    large_lines = ("ABCDEFGH" * 100 for __ in range(1000))
    fake_file_large = StringIO("\n".join(large_lines))

    default_scenario_params = {
            'open_scenario_name': 'okay',
            'file_double_params': dict(
                path=file_path, fake_file=fake_file_minimal),
            }

    scenarios = {
            'default': {},
            'error-not-exist': {
                'open_scenario_name': 'nonexist',
                },
            'error-exist': {
                'open_scenario_name': 'exist_error',
                },
            'error-read-denied': {
                'open_scenario_name': 'read_denied',
                },
            'not-found': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_empty),
                },
            'exist-empty': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_empty),
                },
            'exist-minimal': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_minimal),
                },
            'exist-large': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_large),
                },
            }

    for (name, scenario) in scenarios.items():
        # Fill in the defaults without overriding the scenario's own values.
        effective_params = default_scenario_params.copy()
        effective_params.update(scenario)
        scenario.update(effective_params)

        file_double = FileDouble(**scenario['file_double_params'])
        scenario['file_double'] = file_double
        file_double.set_open_scenario(effective_params['open_scenario_name'])
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_file_doubles_from_fake_file_scenarios(scenarios):
    """ Get the `FileDouble` instances from fake file scenarios.

        :param scenarios: Collection of fake file scenarios; each is a
            mapping with a 'file_double' item.
        :return: Set of the 'file_double' values, omitting any that
            is ``None``.

        """
    return {
            scenario['file_double']
            for scenario in scenarios
            if scenario['file_double'] is not None}
def setup_file_double_behaviour(testcase, doubles=None):
    """ Set up file double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `FileDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the result of `make_fake_file_scenarios`.

        Each double is registered for the test case, then
        `builtins.open` is patched. Opening a registered path is
        handled by that double's `builtins_open_scenario`; any other
        path is passed, with all its arguments, to the real `open`.

        """
    if doubles is None:
        scenarios = make_fake_file_scenarios()
        doubles = get_file_doubles_from_fake_file_scenarios(
                scenarios.values())

    for file_double in doubles:
        file_double.register_for_testcase(testcase)

    orig_open = builtins.open

    def fake_open(path, mode='rt', buffering=-1, *args, **kwargs):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            # The scenario hooks take only the mode and buffering.
            result = file_double.builtins_open_scenario.call_hook(
                    mode, buffering)
        else:
            # Forward any further arguments (e.g. `encoding`) so that
            # unrelated `open` calls keep working while patched.
            result = orig_open(path, mode, buffering, *args, **kwargs)
        return result

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    func_patcher = mock.patch.object(
            builtins, "open", new=mock_open)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def setup_fake_file_fixtures(testcase):
    """ Set up fixtures for fake file doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Stores fresh scenarios as `testcase.fake_file_scenarios` and
        sets up file double behaviour for their `FileDouble` instances.

        """
    testcase.fake_file_scenarios = make_fake_file_scenarios()

    scenario_doubles = get_file_doubles_from_fake_file_scenarios(
            testcase.fake_file_scenarios.values())
    setup_file_double_behaviour(testcase, scenario_doubles)
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify; it must
            already have a `fake_file_scenarios` mapping.
        :param name: Key of the scenario to select.
        :return: None.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        then registers that double for the test case.

        """
    selected = testcase.fake_file_scenarios[name]
    testcase.fake_file_scenario = selected
    testcase.file_double = selected['file_double']
    testcase.file_double.register_for_testcase(testcase)
class TestDoubleFunctionScenario:
    """ Scenario for fake behaviour of a specific function.

        The scenario name selects the method `_hook_<scenario_name>`
        on the instance; that bound method is available as the
        `call_hook` attribute. Subclasses define the hook methods.

        """

    def __init__(self, scenario_name, double):
        """ Initialise the scenario.

            :param scenario_name: Name selecting the `_hook_<name>` method.
            :param double: The test double this scenario belongs to.
            :raises AttributeError: If no hook exists for the name.

            """
        self.scenario_name = scenario_name
        self.double = double

        self.call_hook = getattr(
                self, "_hook_{name}".format(name=self.scenario_name))

    def __repr__(self):
        text = (
                "<{class_name} instance: {id}"
                " name: {name!r},"
                " call_hook name: {hook_name!r}"
                " double: {double!r}"
                ">").format(
                    class_name=self.__class__.__name__, id=id(self),
                    name=self.scenario_name, double=self.double,
                    hook_name=self.call_hook.__name__)
        return text

    def __eq__(self, other):
        """ Compare by scenario name, double, and hook name.

            Returns ``NotImplemented`` when `other` lacks the compared
            attributes, so Python can try the reflected comparison
            instead of this method raising `AttributeError`.

            """
        try:
            other_name = other.scenario_name
            other_double = other.double
            other_hook_name = other.call_hook.__name__
        except AttributeError:
            return NotImplemented
        result = bool(
                self.scenario_name == other_name
                and self.double == other_double
                and self.call_hook.__name__ == other_hook_name)
        return result

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            # Negating `NotImplemented` would give a wrong answer.
            return result
        return not result
class os_path_exists_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.path.exists` behaviour.

        Scenario names: 'exist', 'not_exist'.

        """

    def _hook_exist(self):
        """ The path exists. """
        return True

    def _hook_not_exist(self):
        """ The path does not exist. """
        return False
class os_access_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.access` behaviour.

        Scenario names: 'okay', 'not_exist', 'read_only', 'denied'.
        Each hook takes the `mode` given to `os.access`.

        """

    def _hook_okay(self, mode):
        """ Every access mode is granted. """
        return True

    def _hook_not_exist(self, mode):
        """ Every access mode is refused. """
        return False

    def _hook_read_only(self, mode):
        """ Refuse any mode that asks for write or execute access. """
        forbidden_bits = os.W_OK | os.X_OK
        return not (mode & forbidden_bits)

    def _hook_denied(self, mode):
        """ Refuse any mode that asks for read, write or execute access. """
        forbidden_bits = os.R_OK | os.W_OK | os.X_OK
        return not (mode & forbidden_bits)
class os_stat_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.stat` behaviour.

        Scenario names: 'okay', 'notfound_error', 'denied_error'.

        """

    def _hook_okay(self):
        """ Return the `stat_result` of the double. """
        return self.double.stat_result

    def _hook_notfound_error(self):
        """ Raise `FileNotFoundError` for the double's path. """
        error = FileNotFoundError(
                self.double.path,
                "No such file or directory: {path!r}".format(
                    path=self.double.path))
        raise error

    def _hook_denied_error(self):
        """ Raise `PermissionError` for the double's path. """
        error = PermissionError(
                self.double.path,
                "Permission denied")
        raise error
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Uses the same hooks as `os_stat_scenario`.

        """
    pass
class os_unlink_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.unlink` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        """ The removal succeeds. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        raise FileNotFoundError(
                self.double.path,
                "No such file or directory: {path!r}".format(
                    path=self.double.path))

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(
                self.double.path,
                "Permission denied")
class os_rmdir_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.rmdir` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        """ The removal succeeds. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        raise FileNotFoundError(
                self.double.path,
                "No such file or directory: {path!r}".format(
                    path=self.double.path))

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(
                self.double.path,
                "Permission denied")
class shutil_rmtree_scenario(TestDoubleFunctionScenario):
    """ Scenario for `shutil.rmtree` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        """ The removal succeeds. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        raise FileNotFoundError(
                self.double.path,
                "No such file or directory: {path!r}".format(
                    path=self.double.path))

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(
                self.double.path,
                "Permission denied")
class builtins_open_scenario(TestDoubleFunctionScenario):
    """ Scenario for `builtins.open` behaviour.

        Scenario names: 'okay', 'nonexist', 'exist_error',
        'read_denied', 'write_denied'. Each hook takes the `mode` and
        `buffering` given to `open`, and returns the double's
        `fake_file` unless it raises.

        """

    def _hook_okay(self, mode, buffering):
        """ Opening always succeeds. """
        return self.double.fake_file

    def _hook_nonexist(self, mode, buffering):
        """ Raise `FileNotFoundError` when opening for reading. """
        if mode.startswith('r'):
            raise FileNotFoundError(
                    self.double.path,
                    "No such file or directory: {path!r}".format(
                        path=self.double.path))
        return self.double.fake_file

    def _hook_exist_error(self, mode, buffering):
        """ Raise `FileExistsError` when opening to write or append. """
        if mode.startswith(('w', 'a')):
            raise FileExistsError(
                    self.double.path,
                    "File already exists: {path!r}".format(
                        path=self.double.path))
        return self.double.fake_file

    def _hook_read_denied(self, mode, buffering):
        """ Raise `PermissionError` when opening for reading. """
        if mode.startswith('r'):
            raise PermissionError(
                    self.double.path,
                    "Read denied on {path!r}".format(
                        path=self.double.path))
        return self.double.fake_file

    def _hook_write_denied(self, mode, buffering):
        """ Raise `PermissionError` when opening to write or append. """
        if mode.startswith(('w', 'a')):
            raise PermissionError(
                    self.double.path,
                    "Write denied on {path!r}".format(
                        path=self.double.path))
        return self.double.fake_file
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        Subclasses provide:

        * `registry_class`: Callable making a new, empty registry.
        * `registries`: Mapping from test case key to registry.
        * `function_scenario_params_by_class`: Mapping from scenario
          class to a dict with the keys 'default_scenario_name' and
          'set_scenario_method_name'.
        * `get_registry_key`: Method returning this double's key.

        """

    registry_class = NotImplemented
    registries = NotImplemented

    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        parent = super(TestDoubleWithRegistry, cls)
        if parent.__new__ is not object.__new__:
            instance = parent.__new__(cls, *args, **kwargs)
        else:
            # The ‘object’ implementation complains about extra arguments.
            instance = parent.__new__(cls)
        # The setter methods must exist before `__init__` uses them.
        instance.make_set_scenario_methods()

        return instance

    def __init__(self, *args, **kwargs):
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the method that sets a `scenario_class` scenario.

            The method stores a new scenario instance on the double,
            in the attribute named after `scenario_class`.

            """
        def method(self, name):
            scenario = scenario_class(name, double=self)
            setattr(self, scenario_class.__name__, scenario)

        docstring_template = """ Set the scenario for `{name}` behaviour. """
        method.__doc__ = docstring_template.format(
                name=scenario_class.__name__)
        method.__name__ = str(params['set_scenario_method_name'])
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        params_by_class = self.function_scenario_params_by_class
        for (scenario_class, scenario_params) in params_by_class.items():
            method = self._make_set_scenario_method(
                    scenario_class, scenario_params)
            setattr(self.__class__, method.__name__, method)
            # Remembered for `_set_method_per_scenario`.
            scenario_params['set_scenario_method'] = method

    def _set_method_per_scenario(self):
        """ Set the method to be called for each scenario. """
        all_params = self.function_scenario_params_by_class.values()
        for scenario_params in all_params:
            set_scenario = scenario_params['set_scenario_method']
            set_scenario(self, scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get the FileDouble registry for the specified test case. """
        # Key in a dict must be hashable.
        key = (testcase.__class__, id(testcase))
        return cls.registries.setdefault(key, cls.registry_class())

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        registry[self.get_registry_key()] = self
        # Undo the registration when the test case cleans up.
        testcase.addCleanup(
                functools.partial(self.unregister_for_testcase, testcase))

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        key = self.get_registry_key()
        if key in registry:
            registry.pop(key)
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The file-like object to copy, or ``None``.
        :return: A new object of the same type as `fake_file`, with
            the same content; an empty `StringIO` if `fake_file` is
            ``None``.

        """
    if fake_file is None:
        (fake_file_type, content) = (StringIO, "")
    else:
        (fake_file_type, content) = (type(fake_file), fake_file.getvalue())
    assert issubclass(fake_file_type, object)

    result = fake_file_type(content)
    # Carry over an `encoding` attribute the new object lacks.
    if hasattr(fake_file, 'encoding') and not hasattr(result, 'encoding'):
        result.encoding = fake_file.encoding
    return result
class FileDouble(TestDoubleWithRegistry):
    """ A testing double for a file.

        Registered by `path`. Holds a copy of the fake file content
        and a `stat_result` whose size matches that content.

        """

    registry_class = dict
    registries = {}

    function_scenario_params_by_class = {
            os_path_exists_scenario: {
                'default_scenario_name': 'not_exist',
                'set_scenario_method_name': 'set_os_path_exists_scenario',
                },
            os_access_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_access_scenario',
                },
            os_stat_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_stat_scenario',
                },
            os_lstat_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_lstat_scenario',
                },
            builtins_open_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_open_scenario',
                },
            os_unlink_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_unlink_scenario',
                },
            os_rmdir_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_rmdir_scenario',
                },
            shutil_rmtree_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_shutil_rmtree_scenario',
                },
            }

    def __init__(self, path=None, fake_file=None, *args, **kwargs):
        """ Initialise the double.

            :param path: The filesystem path this double stands for.
            :param fake_file: File-like object whose content is copied;
                ``None`` gives an empty fake file.

            """
        self.path = path
        # Work on a copy, so the given object is not modified.
        self.fake_file = copy_fake_file(fake_file)
        self.fake_file.name = path

        self._set_stat_result()

        super(FileDouble, self).__init__(*args, **kwargs)

    def _set_stat_result(self):
        """ Set the `os.stat` result for this file. """
        content_size = len(self.fake_file.getvalue())
        self.stat_result = StatResult(
                st_mode=0,
                st_ino=None, st_dev=None, st_nlink=None,
                st_uid=0, st_gid=0,
                st_size=content_size,
                st_atime=None, st_mtime=None, st_ctime=None,
                )

    def __repr__(self):
        return "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
                path=self.path, fake_file=self.fake_file)

    def get_registry_key(self):
        """ Get the registry key for this double. """
        return self.path
class os_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.popen` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        takes the `argv`, `mode` and `buffering` of the call.

        """

    # Which stream of the double a given `os.popen` mode connects to.
    stream_name_by_mode = {
            'w': 'stdin',
            'r': 'stdout',
            }

    def _hook_success(self, argv, mode, buffering):
        """ Return the fake file of the double's stream for `mode`. """
        attribute_name = self.stream_name_by_mode[mode] + '_double'
        stream_double = getattr(self.double, attribute_name)
        return stream_double.fake_file

    def _hook_failure(self, argv, mode, buffering):
        """ Return a new, empty `StringIO`. """
        return StringIO()

    def _hook_not_found(self, argv, mode, buffering):
        """ Return a new, empty `StringIO`. """
        return StringIO()
class os_waitpid_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        takes the `pid` and `options` given to `os.waitpid`.

        """

    def _hook_success(self, pid, options):
        """ Return `pid` paired with the success status. """
        result = (pid, EXIT_STATUS_SUCCESS)
        return result

    def _hook_failure(self, pid, options):
        """ Return `pid` paired with the failure status. """
        result = (pid, EXIT_STATUS_FAILURE)
        return result

    def _hook_not_found(self, pid, options):
        """ Raise `OSError` with the ECHILD error number. """
        # `OSError` sets its `errno` attribute only when given at
        # least two arguments; a lone argument leaves it as ``None``.
        error = OSError(errno.ECHILD, os.strerror(errno.ECHILD))
        raise error
class os_system_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        takes the `command` and returns an exit status.

        """

    def _hook_success(self, command):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        return EXIT_STATUS_COMMAND_NOT_FOUND
class os_spawnv_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        takes the `mode`, `file` and `args` of the call and returns an
        exit status.

        """

    def _hook_success(self, mode, file, args):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, mode, file, args):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, mode, file, args):
        return EXIT_STATUS_COMMAND_NOT_FOUND
# Unique sentinel objects; each is distinguishable only by identity.
ARG_ANY = object()
ARG_MORE = object()
class PopenDouble:
    """ A testing double for `subprocess.Popen`.

        Attributes `stdin`, `stdout`, `stderr`, `pid` and `returncode`
        all start as ``None``; `argv` holds the command-line arguments.

        """

    def __init__(self, args, *posargs, **kwargs):
        """ Initialise the double.

            :param args: The command; a string to be split when
                ``shell`` is true, otherwise the argument sequence.

            """
        for attribute_name in (
                'stdin', 'stdout', 'stderr', 'pid', 'returncode'):
            setattr(self, attribute_name, None)

        # Without ``shell``, the parameter is already a sequence of
        # command-line arguments.
        use_shell = kwargs.get('shell', False)
        self.argv = shlex.split(args) if use_shell else args

    def set_streams(self, subprocess_double, popen_kwargs):
        """ Set the streams on the `PopenDouble`.

            :param subprocess_double: The `SubprocessDouble` from
                which to get existing stream doubles.
            :param popen_kwargs: The keyword arguments to the
                `subprocess.Popen` call.
            :return: ``None``.

            Only streams named in `popen_kwargs` are set.

            """
        for stream_name in ['stdin', 'stdout', 'stderr']:
            if stream_name not in popen_kwargs:
                continue
            stream_spec = popen_kwargs[stream_name]
            if stream_spec is subprocess.PIPE:
                # A pipe connects to the matching stream double.
                stream_double = getattr(
                        subprocess_double,
                        "{name}_double".format(name=stream_name))
                stream_file = stream_double.fake_file
            elif stream_spec is subprocess.STDOUT:
                # Redirection to standard output.
                stream_file = subprocess_double.stdout_double.fake_file
            else:
                # Anything else is used as the stream itself.
                stream_file = stream_spec
            setattr(self, stream_name, stream_file)

    def wait(self):
        """ Wait for subprocess to terminate. """
        return self.returncode
class subprocess_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.Popen` behaviour.

        Scenario names: 'success'.

        """

    def _hook_success(self, testcase, args, *posargs, **kwargs):
        """ Return the `popen_double`, with streams set from `kwargs`. """
        popen_double = self.double.popen_double
        popen_double.set_streams(self.double, kwargs)
        return popen_double
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched constructor is called, the command is looked
        up in the registry of `SubprocessDouble` instances for this
        test case. A registered command is handled by that double's
        `subprocess_popen_scenario`; any other command is passed to
        the real `subprocess.Popen`.

        """
    real_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        """ Dispatch to a registered double, else to the real `Popen`. """
        # A shell command arrives as one string; split it into words
        # so it can be compared with the registered `argv` keys.
        argv = shlex.split(args) if kwargs.get('shell', False) else args
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if argv not in registry:
            return real_popen(args, *posargs, **kwargs)
        scenario = registry[argv].subprocess_popen_scenario
        return scenario.call_hook(testcase, args, *posargs, **kwargs)

    patcher = mock.patch.object(
            subprocess, "Popen", autospec=True,
            side_effect=fake_subprocess_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.call` behaviour. """

    def _hook_success(self, command):
        """ Give the exit status for a successful command. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Give the exit status for a failed command. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Give the exit status for a command that was not found. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
def patch_subprocess_call(testcase):
    """ Patch `subprocess.call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the command is looked up
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `subprocess_call_scenario`; any other command is passed to
        the real `subprocess.call`.

        """
    real_call = subprocess.call

    def fake_subprocess_call(command, *posargs, **kwargs):
        """ Dispatch to a registered double, else to the real `call`. """
        shell_requested = kwargs.get('shell', False)
        # A shell command arrives as one string; split it into words
        # so it can be compared with the registered `argv` keys.
        command_argv = shlex.split(command) if shell_requested else command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if command_argv not in registry:
            return real_call(command, *posargs, **kwargs)
        scenario = registry[command_argv].subprocess_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            subprocess, "call", autospec=True,
            side_effect=fake_subprocess_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_check_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.check_call` behaviour. """

    def _hook_success(self, command):
        """ Succeed without raising; the result is ``None``. """
        # NOTE(review): the real `subprocess.check_call` returns 0 on
        # success; confirm that ``None`` is intended here.
        return None

    def _hook_failure(self, command):
        """ Raise `CalledProcessError` with the failure exit status. """
        raise subprocess.CalledProcessError(EXIT_STATUS_FAILURE, command)

    def _hook_not_found(self, command):
        """ Raise `CalledProcessError` with the not-found exit status. """
        raise subprocess.CalledProcessError(
                EXIT_STATUS_COMMAND_NOT_FOUND, command)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the command is looked up
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `subprocess_check_call_scenario`; any other command is passed
        to the real `subprocess.check_call`.

        """
    real_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        """ Dispatch to a registered double, else to the real function. """
        shell_requested = kwargs.get('shell', False)
        # A shell command arrives as one string; split it into words
        # so it can be compared with the registered `argv` keys.
        command_argv = shlex.split(command) if shell_requested else command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if command_argv not in registry:
            return real_check_call(command, *posargs, **kwargs)
        scenario = registry[command_argv].subprocess_check_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            subprocess, "check_call", autospec=True,
            side_effect=fake_subprocess_check_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Keys are sequences of command-line words. A key may contain
        the sentinel `ARG_ANY`, which matches any single word at that
        position, or `ARG_MORE`, which matches all remaining words
        (including none). A key with no `ARG_MORE` also matches any
        `argv` that begins with the key's words.

        Lookup, membership test, and deletion all act on the first
        registered key that matches the specified `argv`.

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry, optionally with initial items.

            The first positional argument, if any, may be a mapping
            or an iterable of (key, value) pairs. Other positional
            arguments and all keyword arguments are ignored.

            """
        items = []
        if args:
            source = args[0]
            if isinstance(source, collections_abc.Mapping):
                items = source.items()
            elif isinstance(source, collections_abc.Iterable):
                items = source
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
                class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Determine whether registered key `candidate` matches `argv`.

            :param candidate: A registered key; a sequence of words.
            :param argv: The sequence of words being looked up.
            :return: ``True`` if `candidate` matches, else ``False``.

            """
        if candidate == argv:
            # An exact match.
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words, even none.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv: there
                # is no word at this position to compare, not even
                # for `ARG_ANY`.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word == argv[word_index]:
                # Candidate matches the word at this position.
                continue
            # This candidate does not match.
            return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The sequence of command-line words to match.
            :return: The first registered key that matches `argv`,
                or ``None`` if no key matches or `argv` is not a
                sequence.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        """ Get the value whose registered key matches `key`.

            :raises KeyError: If no registered key matches.

            """
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        result = self._mapping[match]
        return result

    def __setitem__(self, key, value):
        """ Register `value` under `key`.

            Any existing entry whose key matches `key` is removed
            first, even if its key is not equal to `key`.

            """
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        """ Remove the entry whose registered key matches `key`.

            Unlike a plain `dict`, a `key` that matches nothing is
            silently ignored.

            """
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return self._mapping.__iter__()

    def __len__(self):
        return self._mapping.__len__()
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess.

        Each instance records the program `path` and `argv`, has a
        unique integer `pid`, a `PopenDouble` (`popen_double`), and a
        `FileDouble` for each of the standard streams (`stdin_double`,
        `stdout_double`, `stderr_double`).

        """

    registry_class = SubprocessDoubleRegistry
    registries = {}

    # All instances, keyed by PID. Values are held weakly, so this
    # mapping does not itself keep a double alive.
    double_by_pid = weakref.WeakValueDictionary()

    # Source of unique PIDs, shared by every instance.
    _pid_sequence = itertools.count(1)

    # Class of the fake file used for each stream, and the encoding
    # used by default to convert text content to bytes.
    stream_class = io.BytesIO
    stream_encoding = "utf-8"

    function_scenario_params_by_class = {
            subprocess_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_popen_scenario',
                },
            subprocess_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_call_scenario',
                },
            subprocess_check_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name':
                    'set_subprocess_check_call_scenario',
                },
            os_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_popen_scenario',
                },
            os_waitpid_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_waitpid_scenario',
                },
            os_system_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_system_scenario',
                },
            os_spawnv_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_spawnv_scenario',
                },
            }

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the subprocess double.

            :param path: The filesystem path of the fake program. If
                ``None``, a temporary path name is generated.
            :param argv: The command-line words of the subprocess. If
                ``None``, a single word is used: the base name of
                `path`.

            Remaining arguments are passed to the superclass.

            """
        if path is None:
            # Only a path name is wanted; no file is created.
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        stream_class = SubprocessDouble.stream_class
        for stream_name in ['stdin', 'stdout', 'stderr']:
            fake_file = stream_class()
            file_double = FileDouble(fake_file=fake_file)
            stream_double_name = '{name}_double'.format(name=stream_name)
            setattr(self, stream_double_name, file_double)

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        text = (
                "<SubprocessDouble instance: {id}"
                " path: {path!r},"
                " argv: {argv!r}"
                " stdin_double: {stdin_double!r}"
                " stdout_double: {stdout_double!r}"
                " stderr_double: {stderr_double!r}"
                ">").format(
                    id=id(self),
                    path=self.path, argv=self.argv,
                    stdin_double=self.stdin_double,
                    stdout_double=self.stdout_double,
                    stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: The next integer from the sequence shared by all
                instances, starting at 1.

            """
        # Draw from one shared counter. A generator function here
        # would hand back a new generator object on every call
        # instead of a PID value.
        return next(cls._pid_sequence)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double. """
        result = tuple(self.argv)
        return result

    def _set_stream_content(self, stream_name, text, bytes_encoding):
        """ Replace the fake file of the named stream double.

            :param stream_name: One of 'stdin', 'stdout', 'stderr'.
            :param text: The text content for the stream.
            :param bytes_encoding: The encoding with which to convert
                `text` to bytes.
            :return: ``None``.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        stream_double = getattr(
                self, '{name}_double'.format(name=stream_name))
        stream_double.fake_file = fake_file

    def set_stdin_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdin` stream for this double. """
        self._set_stream_content('stdin', text, bytes_encoding)

    def set_stdout_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdout` stream for this double. """
        self._set_stream_content('stdout', text, bytes_encoding)

    def set_stderr_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stderr` stream for this double. """
        self._set_stream_content('stderr', text, bytes_encoding)
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake subprocesses.

        :param path: The filesystem path of the fake program. If not
            specified, a temporary path name will be generated.
        :return: A collection of scenarios for tests involving
            subprocesses.

        The collection is a mapping from scenario name to a dictionary
        of scenario attributes. Each scenario gains a
        'subprocess_double' item, a new `SubprocessDouble` made from
        the scenario's program path and arguments.

        """
    file_path = tempfile.mktemp() if path is None else path

    default_scenario_params = {
            'return_value': EXIT_STATUS_SUCCESS,
            'program_path': file_path,
            'argv_after_command_name': [],
            }

    scenarios = {
            'default': {},
            'not-found': {
                'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
                },
            }

    for (name, scenario) in scenarios.items():
        # Fill in the defaults, letting the scenario's own values win.
        params = default_scenario_params.copy()
        params.update(scenario)
        scenario.update(params)

        program_path = params['program_path']
        command_name = os.path.basename(program_path)
        argv = [command_name] + list(params['argv_after_command_name'])

        scenario['subprocess_double'] = SubprocessDouble(
                path=program_path, argv=argv)
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios;
            each is a mapping with a 'subprocess_double' item.
        :return: Set of the 'subprocess_double' values that are not
            ``None``.

        """
    doubles = set()
    for scenario in scenarios:
        candidate = scenario['subprocess_double']
        if candidate is not None:
            doubles.add(candidate)

    return doubles
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        Every double is registered for `testcase`. If `doubles` is
        ``None``, a default collection will be made from the return
        value of `make_fake_subprocess_scenarios`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios from `make_fake_subprocess_scenarios` are stored
        as `testcase.fake_subprocess_scenarios`, and their subprocess
        doubles are set up for the test case.

        """
    testcase.fake_subprocess_scenarios = make_fake_subprocess_scenarios()

    scenario_doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            testcase.fake_subprocess_scenarios.values())
    setup_subprocess_double_behaviour(testcase, scenario_doubles)
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the command is looked up
        in the registry of `SubprocessDouble` instances for this test
        case. A registered command is handled by that double's
        `os_popen_scenario`; any other command is passed to the real
        `os.popen`.

        """
    real_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        """ Dispatch to a registered double, else to the real `popen`. """
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # A command string is split into words so it can be compared
        # with the registered `argv` keys.
        command_argv = (
                shlex.split(cmd) if isinstance(cmd, basestring) else cmd)
        if command_argv not in registry:
            return real_os_popen(cmd, mode, buffering)
        scenario = registry[command_argv].os_popen_scenario
        return scenario.call_hook(command_argv, mode, buffering)

    patcher = mock.patch.object(
            os, "popen", autospec=True,
            side_effect=fake_os_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the PID is looked up in
        `SubprocessDouble.double_by_pid`. A registered PID is handled
        by that double's `os_waitpid_scenario`; any other PID is
        passed to the real `os.waitpid`.

        """
    real_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        """ Dispatch to a registered double, else to the real `waitpid`. """
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            return real_os_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    patcher = mock.patch.object(
            os, "waitpid", autospec=True,
            side_effect=fake_os_waitpid)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the command is split
        into words and looked up in the registry of `SubprocessDouble`
        instances for this test case. A registered command is handled
        by that double's `os_system_scenario`; any other command is
        passed to the real `os.system`.

        """
    real_os_system = os.system

    def fake_os_system(command):
        """ Dispatch to a registered double, else to the real `system`. """
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        command_argv = shlex.split(command)
        if command_argv not in registry:
            return real_os_system(command)
        scenario = registry[command_argv].os_system_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            os, "system", autospec=True,
            side_effect=fake_os_system)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the `args` are looked up
        in the registry of `SubprocessDouble` instances for this test
        case. Registered arguments are handled by that double's
        `os_spawnv_scenario`; any others are passed to the real
        `os.spawnv`.

        """
    real_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        """ Dispatch to a registered double, else to the real `spawnv`. """
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        registry_key = tuple(args)
        if registry_key not in registry:
            return real_os_spawnv(mode, file, args)
        scenario = registry[registry_key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    patcher = mock.patch.object(
            os, "spawnv", autospec=True,
            side_effect=fake_os_spawnv)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1707 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1709 # This is free software: you may copy, modify, and/or distribute this work
1710 # under the terms of the GNU General Public License as published by the
1711 # Free Software Foundation; version 3 of that license or any later version.
1712 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1715 # Local variables:
1716 # coding: utf-8
1717 # mode: python
1718 # End:
1719 # vim: fileencoding=utf-8 filetype=python :