Use ‘io.BytesIO’ for streams of ‘SubprocessDouble’.
[dput.git] / test / helper.py
blobd1756169c78261eb54910eb3afb2f4a311d03011
1 # -*- coding: utf-8; -*-
3 # test/helper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
19 if sys.version_info >= (3, 3):
20 import builtins
21 import unittest
22 import unittest.mock as mock
23 from io import StringIO as StringIO
24 import configparser
25 import collections.abc as collections_abc
26 elif sys.version_info >= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys.version_info >= (2, 7):
29 # Python 2 standard library.
30 import __builtin__ as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2 as unittest
33 # Third-party mock library.
34 import mock
35 # Python 2 standard library.
36 from StringIO import StringIO as BaseStringIO
37 import ConfigParser as configparser
38 import collections as collections_abc
39 else:
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
42 import os
43 import os.path
44 import io
45 import shutil
46 import tempfile
47 import pwd
48 import errno
49 import time
50 import signal
51 import subprocess
52 import functools
53 import itertools
54 import base64
55 import collections
56 import weakref
57 import shlex
# Name the package this module belongs to, then import that package.
__package__ = str("test")
__import__(__package__)

# On Python 2, a module-level `__metaclass__` makes classes that are
# declared without explicit bases new-style classes.
__metaclass__ = type

try:
    # Python 2 types.
    basestring
    unicode
except NameError:
    # Alias for Python 3 types.
    basestring = str
    unicode = str
def make_unique_slug(testcase):
    """ Make a unique slug for the test case.

        :param testcase: Object providing a `getUniqueString` method.
        :return: The last 30 characters of the Base64 encoding of the
            UTF-8 encoded unique string.

        """
    unique_bytes = testcase.getUniqueString().encode('utf-8')
    encoded_text = base64.b64encode(unique_bytes).decode('utf-8')
    return encoded_text[-30:]
try:
    StringIO
except NameError:
    # No suitable `StringIO` was imported above; build one on top of
    # the Python 2 `StringIO.StringIO` class.

    class StringIO(BaseStringIO, object):
        """ StringIO usable as a context manager.

            Also reports itself as readable, writable, and seekable.

            """

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            # Close the stream; returning False lets any exception
            # continue to propagate.
            self.close()
            return False

        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True
def patch_stdout(testcase):
    """ Patch `sys.stdout` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock that wraps a new `StringIO` instance.
        The patch is stopped by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stdout_patcher = mock.patch.object(sys, 'stdout', wraps=fake_stream)
    stdout_patcher.start()
    testcase.addCleanup(stdout_patcher.stop)
def patch_stderr(testcase):
    """ Patch `sys.stderr` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock that wraps a new `StringIO` instance.
        The patch is stopped by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stderr_patcher = mock.patch.object(sys, 'stderr', wraps=fake_stream)
    stderr_patcher.start()
    testcase.addCleanup(stderr_patcher.stop)
def patch_signal_signal(testcase):
    """ Patch `signal.signal` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    signal_patcher = mock.patch.object(signal, 'signal')
    signal_patcher.start()
    testcase.addCleanup(signal_patcher.stop)
class FakeSystemExit(Exception):
    """ Fake double for the `SystemExit` exception.

        Derives from `Exception`, not from `SystemExit`.

        """
# Exit status values returned by the process-related scenario hooks.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
# Returned by the `_hook_not_found` hooks of the command-running scenarios.
EXIT_STATUS_COMMAND_NOT_FOUND = 127
def patch_sys_exit(testcase):
    """ Patch `sys.exit` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Calling the patched function raises a `FakeSystemExit` instance.

        """
    fake_exit_error = FakeSystemExit()
    exit_patcher = mock.patch.object(
            sys, 'exit', side_effect=fake_exit_error)
    exit_patcher.start()
    testcase.addCleanup(exit_patcher.stop)
def patch_sys_argv(testcase):
    """ Patch the `sys.argv` sequence for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        If `testcase` lacks `progname`, a unique slug is assigned to it.
        If `testcase` lacks `sys_argv`, it is set to a list containing
        only `progname`. The patched `sys.argv` is a copy of
        `testcase.sys_argv`.

        """
    if not hasattr(testcase, 'progname'):
        testcase.progname = make_unique_slug(testcase)
    if not hasattr(testcase, 'sys_argv'):
        testcase.sys_argv = [testcase.progname]
    fake_argv = list(testcase.sys_argv)
    argv_patcher = mock.patch.object(sys, "argv", new=fake_argv)
    argv_patcher.start()
    testcase.addCleanup(argv_patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: None.

        Patches, in order: `sys.stdout`, `sys.stderr`, `sys.exit`,
        `sys.argv`.

        """
    for patch_func in (
            patch_stdout, patch_stderr, patch_sys_exit, patch_sys_argv):
        patch_func(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. If not
            specified, an unbounded count starting from 0 is used.
        :return: None.

        Each call to the patched `time.time` returns the next item
        from `values`. When `values` is exhausted, the call raises
        `StopIteration`.

        """
    if values is None:
        values = itertools.count()

    def generator_fake_time():
        # Iterate with ‘for’ so that any iterable (not only an iterator)
        # is accepted, and so that exhaustion ends the generator cleanly
        # instead of leaking ‘StopIteration’ from inside its body.
        for value in values:
            yield value

    func_patcher = mock.patch.object(time, "time")
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    time.time.side_effect = generator_fake_time()
def patch_os_environ(testcase):
    """ Patch the `os.environ` mapping.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is `testcase.os_environ`; if `testcase` lacks
        that attribute it is first set to an empty dict.

        """
    if not hasattr(testcase, 'os_environ'):
        testcase.os_environ = {}
    fake_environ = testcase.os_environ
    environ_patcher = mock.patch.object(os, "environ", new=fake_environ)
    environ_patcher.start()
    testcase.addCleanup(environ_patcher.stop)
def patch_os_getpid(testcase):
    """ Patch `os.getpid` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    getpid_patcher = mock.patch.object(os, 'getpid')
    getpid_patcher.start()
    testcase.addCleanup(getpid_patcher.stop)
def patch_os_getuid(testcase):
    """ Patch the `os.getuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The patched function returns `testcase.os_getuid_return_value`;
        if `testcase` lacks that attribute it is first set from
        `testcase.getUniqueInteger()`.

        """
    if not hasattr(testcase, 'os_getuid_return_value'):
        testcase.os_getuid_return_value = testcase.getUniqueInteger()
    fake_uid = testcase.os_getuid_return_value
    getuid_patcher = mock.patch.object(os, "getuid", return_value=fake_uid)
    getuid_patcher.start()
    testcase.addCleanup(getuid_patcher.stop)
# Named tuple with the same field names as a `pwd.struct_passwd` record.
PasswdEntry = collections.namedtuple(
        "PasswdEntry", [
            "pw_name", "pw_passwd",
            "pw_uid", "pw_gid",
            "pw_gecos", "pw_dir", "pw_shell",
            ])
def patch_pwd_getpwuid(testcase):
    """ Patch the `pwd.getpwuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The patched function returns a `pwd.struct_passwd` built from
        `testcase.pwd_getpwuid_return_value`. If `testcase` lacks that
        attribute, a `PasswdEntry` of generated values is assigned first.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())
    candidate = testcase.pwd_getpwuid_return_value
    if isinstance(candidate, pwd.struct_passwd):
        pwent = candidate
    else:
        # Convert any other sequence to the real record type.
        pwent = pwd.struct_passwd(candidate)
    getpwuid_patcher = mock.patch.object(
            pwd, "getpwuid", return_value=pwent)
    getpwuid_patcher.start()
    testcase.addCleanup(getpwuid_patcher.stop)
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed to the original `os.path.exists`.

        """
    orig_os_path_exists = os.path.exists

    def fake_os_path_exists(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_os_path_exists(path)
        return registry[path].os_path_exists_scenario.call_hook()

    exists_patcher = mock.patch.object(
            os.path, 'exists', side_effect=fake_os_path_exists)
    exists_patcher.start()
    testcase.addCleanup(exists_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed to the original `os.access`.

        """
    orig_os_access = os.access

    def fake_os_access(path, mode):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_os_access(path, mode)
        return registry[path].os_access_scenario.call_hook(mode)

    access_patcher = mock.patch.object(
            os, 'access', side_effect=fake_os_access)
    access_patcher.start()
    testcase.addCleanup(access_patcher.stop)
# Named tuple with a subset of the field names of an `os.stat` result.
StatResult = collections.namedtuple(
        'StatResult', (
            'st_mode '
            'st_ino st_dev st_nlink '
            'st_uid st_gid '
            'st_size '
            'st_atime st_mtime st_ctime'))
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed to the original `os.stat`.

        """
    orig_os_stat = os.stat

    def fake_os_stat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_os_stat(path)
        return registry[path].os_stat_scenario.call_hook()

    stat_patcher = mock.patch.object(
            os, 'stat', side_effect=fake_os_stat)
    stat_patcher.start()
    testcase.addCleanup(stat_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed to the original `os.lstat`.

        """
    orig_os_lstat = os.lstat

    def fake_os_lstat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_os_lstat(path)
        return registry[path].os_lstat_scenario.call_hook()

    lstat_patcher = mock.patch.object(
            os, 'lstat', side_effect=fake_os_lstat)
    lstat_patcher.start()
    testcase.addCleanup(lstat_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed to the original `os.unlink`.

        """
    orig_os_unlink = os.unlink

    def fake_os_unlink(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_os_unlink(path)
        return registry[path].os_unlink_scenario.call_hook()

    unlink_patcher = mock.patch.object(
            os, 'unlink', side_effect=fake_os_unlink)
    unlink_patcher.start()
    testcase.addCleanup(unlink_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed to the original `os.rmdir`.

        """
    orig_os_rmdir = os.rmdir

    def fake_os_rmdir(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_os_rmdir(path)
        return registry[path].os_rmdir_scenario.call_hook()

    rmdir_patcher = mock.patch.object(
            os, 'rmdir', side_effect=fake_os_rmdir)
    rmdir_patcher.start()
    testcase.addCleanup(rmdir_patcher.stop)
def patch_shutil_rmtree(testcase):
    """ Patch `shutil.rmtree` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. A path that is not in the
        registry is passed, along with `ignore_errors` and `onerror`,
        to the original `shutil.rmtree`.

        """
    # Keep the real ‘shutil.rmtree’ (not ‘os.rmdir’), so unregistered
    # paths keep recursive-removal behaviour.
    orig_shutil_rmtree = shutil.rmtree

    def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.shutil_rmtree_scenario.call_hook()
        else:
            result = orig_shutil_rmtree(
                    path, ignore_errors=ignore_errors, onerror=onerror)
        return result

    func_patcher = mock.patch.object(
            shutil, 'rmtree', side_effect=fake_shutil_rmtree)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Patch the `tempfile.mkdtemp` function for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The patched function returns the path of
        `testcase.tempfile_mkdtemp_file_double`; if `testcase` lacks that
        attribute, a `FileDouble` with a generated path is assigned
        first. The double's `os.unlink` and `os.rmdir` scenarios are set
        to 'okay' and the double is registered for the test case.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    dir_double = testcase.tempfile_mkdtemp_file_double
    dir_double.set_os_unlink_scenario('okay')
    dir_double.set_os_rmdir_scenario('okay')
    dir_double.register_for_testcase(testcase)

    mkdtemp_patcher = mock.patch.object(tempfile, "mkdtemp")
    mkdtemp_patcher.start()
    testcase.addCleanup(mkdtemp_patcher.stop)

    tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
try:
    FileNotFoundError
    FileExistsError
    PermissionError
except NameError:
    # Python 2 uses IOError.
    def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
        """ Normalise constructor arguments for an `IOError` subclass.

            :param init_args: Positional arguments given to the constructor.
            :param init_kwargs: Keyword arguments given to the constructor.
                The 'errno', 'filename', 'strerror' items are removed
                from this mapping in place.
            :param errno_value: The default error number.
            :return: A tuple (`args`, `kwargs`) where `args` is
                (errno, strerror, filename).

            """
        remaining_kwargs = init_kwargs
        error_number = errno_value
        message = os.strerror(errno_value)
        filename = None
        # Positional error number and filename are used only when at
        # least three positional arguments are given.
        if len(init_args) >= 3:
            (error_number, filename) = (init_args[0], init_args[2])
        if 'errno' in init_kwargs:
            error_number = remaining_kwargs.pop('errno')
        if 'filename' in init_kwargs:
            filename = remaining_kwargs.pop('filename')
        if len(init_args) >= 2:
            message = init_args[1]
        if 'strerror' in init_kwargs:
            message = remaining_kwargs.pop('strerror')
        return ((error_number, message, filename), remaining_kwargs)

    class FileNotFoundError(IOError):
        """ Stand-in for the Python 3 exception; defaults to ENOENT. """

        def __init__(self, *args, **kwargs):
            (init_args, init_kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.ENOENT)
            super(FileNotFoundError, self).__init__(
                    *init_args, **init_kwargs)

    class FileExistsError(IOError):
        """ Stand-in for the Python 3 exception; defaults to EEXIST. """

        def __init__(self, *args, **kwargs):
            (init_args, init_kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EEXIST)
            super(FileExistsError, self).__init__(
                    *init_args, **init_kwargs)

    class PermissionError(IOError):
        """ Stand-in for the Python 3 exception; defaults to EPERM. """

        def __init__(self, *args, **kwargs):
            (init_args, init_kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EPERM)
            super(PermissionError, self).__init__(
                    *init_args, **init_kwargs)
def make_fake_file_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :path: The filesystem path of the fake file. If not specified,
            a path will be generated with `tempfile.mktemp`.
        :return: A collection of scenarios for tests involving input files.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Each scenario is completed with the default
        parameters, a `file_double` made from its 'file_double_params',
        and its own name as 'fake_file_scenario_name'.

        """
    file_path = tempfile.mktemp() if path is None else path

    fake_file_empty = StringIO()
    fake_file_minimal = StringIO("Lorem ipsum.")
    large_lines = ("ABCDEFGH" * 100 for __ in range(1000))
    fake_file_large = StringIO("\n".join(large_lines))

    default_scenario_params = {
            'open_scenario_name': 'okay',
            'file_double_params': dict(
                path=file_path, fake_file=fake_file_minimal),
            }

    scenarios = {
            'default': {},
            'error-not-exist': {
                'open_scenario_name': 'nonexist',
                },
            'error-exist': {
                'open_scenario_name': 'exist_error',
                },
            'error-read-denied': {
                'open_scenario_name': 'read_denied',
                },
            'not-found': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_empty),
                },
            'exist-empty': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_empty),
                },
            'exist-minimal': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_minimal),
                },
            'exist-large': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_large),
                },
            }

    for (name, scenario) in scenarios.items():
        # Scenario-specific values take precedence over the defaults.
        merged_params = dict(default_scenario_params)
        merged_params.update(scenario)
        scenario.update(merged_params)
        file_double = FileDouble(**scenario['file_double_params'])
        scenario['file_double'] = file_double
        file_double.set_open_scenario(merged_params['open_scenario_name'])
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_file_doubles_from_fake_file_scenarios(scenarios):
    """ Get the `FileDouble` instances from fake file scenarios.

        :param scenarios: Collection of fake file scenarios.
        :return: Set of the 'file_double' items of the scenarios,
            omitting any that are ``None``.

        """
    doubles = set()
    for scenario in scenarios:
        file_double = scenario['file_double']
        if file_double is not None:
            doubles.add(file_double)

    return doubles
def setup_file_double_behaviour(testcase, doubles=None):
    """ Set up file double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `FileDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the result of `make_fake_file_scenarios` result.

        Every double is registered for the test case, and
        `builtins.open` is patched so that a registered path is handled
        by the double's open scenario; any other path is passed to the
        original `open`.

        """
    if doubles is None:
        default_scenarios = make_fake_file_scenarios()
        doubles = get_file_doubles_from_fake_file_scenarios(
                default_scenarios.values())

    for file_double in doubles:
        file_double.register_for_testcase(testcase)

    orig_open = builtins.open

    def fake_open(path, mode='rt', buffering=-1):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path not in registry:
            return orig_open(path, mode, buffering)
        open_scenario = registry[path].builtins_open_scenario
        return open_scenario.call_hook(mode, buffering)

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    open_patcher = mock.patch.object(builtins, "open", new=mock_open)
    open_patcher.start()
    testcase.addCleanup(open_patcher.stop)
def setup_fake_file_fixtures(testcase):
    """ Set up fixtures for fake file doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Stores new fake file scenarios as `testcase.fake_file_scenarios`
        and sets up file double behaviour for their `FileDouble`
        instances.

        """
    testcase.fake_file_scenarios = make_fake_file_scenarios()

    setup_file_double_behaviour(
            testcase,
            get_file_doubles_from_fake_file_scenarios(
                testcase.fake_file_scenarios.values()))
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify.
        :param name: Key of the scenario in `testcase.fake_file_scenarios`.
        :return: None.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        then registers the file double for the test case.

        """
    chosen_scenario = testcase.fake_file_scenarios[name]
    file_double = chosen_scenario['file_double']
    testcase.fake_file_scenario = chosen_scenario
    testcase.file_double = file_double
    testcase.file_double.register_for_testcase(testcase)
class TestDoubleFunctionScenario:
    """ Scenario for fake behaviour of a specific function.

        The `call_hook` attribute is the bound method named
        ``_hook_<scenario_name>``; subclasses provide those methods.

        """

    def __init__(self, scenario_name, double):
        """ Initialise the scenario.

            :param scenario_name: Name selecting the ``_hook_<name>``
                method to use as `call_hook`.
            :param double: The test double this scenario belongs to.
            :raises AttributeError: If no matching hook method exists.

            """
        self.scenario_name = scenario_name
        self.double = double

        self.call_hook = getattr(
                self, "_hook_{name}".format(name=self.scenario_name))

    def __repr__(self):
        text = (
                "<{class_name} instance: {id}"
                " name: {name!r},"
                " call_hook name: {hook_name!r}"
                " double: {double!r}"
                ">").format(
                    class_name=self.__class__.__name__, id=id(self),
                    name=self.scenario_name, double=self.double,
                    hook_name=self.call_hook.__name__)
        return text

    def __eq__(self, other):
        """ Compare by scenario name, double, and hook name.

            Returns ``NotImplemented`` when `other` lacks the compared
            attributes, so comparison with an unrelated object yields
            “not equal” instead of raising `AttributeError`.

            """
        try:
            other_scenario_name = other.scenario_name
            other_double = other.double
            other_hook_name = other.call_hook.__name__
        except AttributeError:
            return NotImplemented
        result = True
        if not self.scenario_name == other_scenario_name:
            result = False
        if not self.double == other_double:
            result = False
        if not self.call_hook.__name__ == other_hook_name:
            result = False
        return result

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            # Let the interpreter fall back to its default comparison.
            return result
        return not result
class os_path_exists_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.path.exists` behaviour.

        Hooks: 'exist' returns ``True``; 'not_exist' returns ``False``.

        """

    def _hook_not_exist(self):
        return False

    def _hook_exist(self):
        return True
class os_access_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.access` behaviour.

        Each hook takes the requested access `mode` and returns a
        boolean.

        """

    def _hook_okay(self, mode):
        return True

    def _hook_not_exist(self, mode):
        return False

    def _hook_read_only(self, mode):
        # Refuse any request that includes write or execute access.
        refused_bits = mode & (os.W_OK | os.X_OK)
        return not refused_bits

    def _hook_denied(self, mode):
        # Refuse any request that includes read, write, or execute access.
        refused_bits = mode & (os.R_OK | os.W_OK | os.X_OK)
        return not refused_bits
class os_stat_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.stat` behaviour.

        Hooks: 'okay' returns the double's `stat_result`;
        'notfound_error' raises `FileNotFoundError`;
        'denied_error' raises `PermissionError`.

        """

    def _hook_okay(self):
        return self.double.stat_result

    def _hook_notfound_error(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied_error(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Inherits every hook from `os_stat_scenario` unchanged.

        """
class os_unlink_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.unlink` behaviour.

        Hooks: 'okay' returns ``None``; 'nonexist' raises
        `FileNotFoundError`; 'denied' raises `PermissionError`.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class os_rmdir_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.rmdir` behaviour.

        Hooks: 'okay' returns ``None``; 'nonexist' raises
        `FileNotFoundError`; 'denied' raises `PermissionError`.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class shutil_rmtree_scenario(TestDoubleFunctionScenario):
    """ Scenario for `shutil.rmtree` behaviour.

        Hooks: 'okay' returns ``None``; 'nonexist' raises
        `FileNotFoundError`; 'denied' raises `PermissionError`.

        """

    def _hook_okay(self):
        return None

    def _hook_nonexist(self):
        message = "No such file or directory: {path!r}".format(
                path=self.double.path)
        raise FileNotFoundError(self.double.path, message)

    def _hook_denied(self):
        message = "Permission denied"
        raise PermissionError(self.double.path, message)
class builtins_open_scenario(TestDoubleFunctionScenario):
    """ Scenario for `builtins.open` behaviour.

        Each hook takes the `mode` and `buffering` arguments of the
        `open` call. A hook either raises an error for the modes it
        rejects, or returns the double's `fake_file`.

        """

    def _hook_okay(self, mode, buffering):
        return self.double.fake_file

    def _hook_nonexist(self, mode, buffering):
        if mode.startswith('r'):
            message = "No such file or directory: {path!r}".format(
                    path=self.double.path)
            raise FileNotFoundError(self.double.path, message)
        return self.double.fake_file

    def _hook_exist_error(self, mode, buffering):
        if mode.startswith(('w', 'a')):
            message = "File already exists: {path!r}".format(
                    path=self.double.path)
            raise FileExistsError(self.double.path, message)
        return self.double.fake_file

    def _hook_read_denied(self, mode, buffering):
        if mode.startswith('r'):
            message = "Read denied on {path!r}".format(
                    path=self.double.path)
            raise PermissionError(self.double.path, message)
        return self.double.fake_file

    def _hook_write_denied(self, mode, buffering):
        if mode.startswith(('w', 'a')):
            message = "Write denied on {path!r}".format(
                    path=self.double.path)
            raise PermissionError(self.double.path, message)
        return self.double.fake_file
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        Subclasses set `registry_class`, `registries`, and
        `function_scenario_params_by_class`, and implement
        `get_registry_key`.

        """

    # Factory for one registry; called with no arguments.
    registry_class = NotImplemented
    # Mapping from test case key to that test case's registry.
    registries = NotImplemented

    # Mapping from scenario class to a dict of parameters with keys
    # 'default_scenario_name' and 'set_scenario_method_name'.
    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        parent = super(TestDoubleWithRegistry, cls)
        if parent.__new__ is object.__new__:
            # The ‘object’ implementation complains about extra arguments.
            instance = parent.__new__(cls)
        else:
            instance = parent.__new__(cls, *args, **kwargs)
        instance.make_set_scenario_methods()

        return instance

    def __init__(self, *args, **kwargs):
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the function that sets a `scenario_class` scenario. """
        def method(self, name):
            # Store the new scenario under the scenario class's name.
            setattr(
                    self, scenario_class.__name__,
                    scenario_class(name, double=self))
        doc_template = """ Set the scenario for `{name}` behaviour. """
        method.__doc__ = doc_template.format(name=scenario_class.__name__)
        method.__name__ = str(params['set_scenario_method_name'])
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        params_by_class = self.function_scenario_params_by_class
        for (scenario_class, scenario_params) in params_by_class.items():
            method = self._make_set_scenario_method(
                    scenario_class, scenario_params)
            setattr(self.__class__, method.__name__, method)
            scenario_params['set_scenario_method'] = method

    def _set_method_per_scenario(self):
        """ Set each function scenario to its default scenario name. """
        params_by_class = self.function_scenario_params_by_class
        for scenario_params in params_by_class.values():
            set_scenario = scenario_params['set_scenario_method']
            set_scenario(self, scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get this class's registry for the specified test case.

            A new registry is stored for the test case if none exists.

            """
        # Key in a dict must be hashable.
        registry_key = (testcase.__class__, id(testcase))
        return cls.registries.setdefault(registry_key, cls.registry_class())

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase.

            Also registers a cleanup on `testcase` that removes it again.

            """
        registry = self.get_registry_for_testcase(testcase)
        registry[self.get_registry_key()] = self
        testcase.addCleanup(
                functools.partial(self.unregister_for_testcase, testcase))

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        double_key = self.get_registry_key()
        if double_key in registry:
            registry.pop(double_key)
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The stream to copy, or ``None``.
        :return: A new instance of the same type as `fake_file`, made
            from its `getvalue()` content. If `fake_file` is ``None``,
            an empty `StringIO`.

        """
    if fake_file is None:
        fake_file_type = StringIO
        content = ""
    else:
        fake_file_type = type(fake_file)
        content = fake_file.getvalue()
    assert issubclass(fake_file_type, object)
    result = fake_file_type(content)
    # Carry the encoding over only when the copy does not have one.
    if hasattr(fake_file, 'encoding') and not hasattr(result, 'encoding'):
        result.encoding = fake_file.encoding
    return result
class FileDouble(TestDoubleWithRegistry):
    """ A testing double for a file.

        Holds a filesystem `path`, a `fake_file` stream, and a
        `stat_result`. Registered under its `path`.

        """

    registry_class = dict
    registries = {}

    function_scenario_params_by_class = {
            os_path_exists_scenario: {
                'default_scenario_name': 'not_exist',
                'set_scenario_method_name': 'set_os_path_exists_scenario',
                },
            os_access_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_access_scenario',
                },
            os_stat_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_stat_scenario',
                },
            os_lstat_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_lstat_scenario',
                },
            builtins_open_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_open_scenario',
                },
            os_unlink_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_unlink_scenario',
                },
            os_rmdir_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_os_rmdir_scenario',
                },
            shutil_rmtree_scenario: {
                'default_scenario_name': 'okay',
                'set_scenario_method_name': 'set_shutil_rmtree_scenario',
                },
            }

    def __init__(self, path=None, fake_file=None, *args, **kwargs):
        """ Initialise the file double.

            :param path: The filesystem path of the fake file.
            :param fake_file: Stream whose content is copied for this
                double; ``None`` gives an empty stream.

            """
        self.path = path
        # Work on a copy, and give the copy the file's name.
        stream_copy = copy_fake_file(fake_file)
        stream_copy.name = path
        self.fake_file = stream_copy

        self._set_stat_result()

        super(FileDouble, self).__init__(*args, **kwargs)

    def _set_stat_result(self):
        """ Set the `os.stat` result for this file. """
        content = self.fake_file.getvalue()
        self.stat_result = StatResult(
                st_mode=0,
                st_ino=None, st_dev=None, st_nlink=None,
                st_uid=0, st_gid=0,
                st_size=len(content),
                st_atime=None, st_mtime=None, st_ctime=None,
                )

    def __repr__(self):
        template = "FileDouble(path={path!r}, fake_file={fake_file!r})"
        return template.format(path=self.path, fake_file=self.fake_file)

    def get_registry_key(self):
        """ Get the registry key for this double. """
        return self.path
class os_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.popen` behaviour.

        The 'success' hook returns the fake file of the double's stream
        matching the mode; the 'failure' and 'not_found' hooks return a
        new empty `StringIO`.

        """

    # Which stream of the double each `os.popen` mode connects to.
    stream_name_by_mode = {
            'w': 'stdin',
            'r': 'stdout',
            }

    def _hook_success(self, argv, mode, buffering):
        double_attr_name = self.stream_name_by_mode[mode] + '_double'
        stream_double = getattr(self.double, double_attr_name)
        return stream_double.fake_file

    def _hook_failure(self, argv, mode, buffering):
        return StringIO()

    def _hook_not_found(self, argv, mode, buffering):
        return StringIO()
class os_waitpid_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        The 'success' and 'failure' hooks return a (pid, status) tuple;
        the 'not_found' hook raises `OSError` with `errno.ECHILD`.

        """

    def _hook_success(self, pid, options):
        result = (pid, EXIT_STATUS_SUCCESS)
        return result

    def _hook_failure(self, pid, options):
        result = (pid, EXIT_STATUS_FAILURE)
        return result

    def _hook_not_found(self, pid, options):
        # ‘OSError’ sets its ‘errno’ attribute only when constructed
        # with at least two arguments (errno, strerror).
        error = OSError(errno.ECHILD, os.strerror(errno.ECHILD))
        raise error
class os_system_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Each hook takes the `command` and returns an exit status.

        """

    def _hook_success(self, command):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        return EXIT_STATUS_COMMAND_NOT_FOUND
class os_spawnv_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Each hook takes the `mode`, `file`, and `args` of the call and
        returns an exit status.

        """

    def _hook_success(self, mode, file, args):
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, mode, file, args):
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, mode, file, args):
        return EXIT_STATUS_COMMAND_NOT_FOUND
# Unique sentinel objects, distinguishable only by identity.
# NOTE(review): presumably placeholders for matching command-line
# arguments (any one argument / any further arguments); the code that
# uses them is not visible here, so confirm before relying on this.
ARG_ANY = object()
ARG_MORE = object()
class PopenDouble:
    """ A testing double for `subprocess.Popen`.

        Has `stdin`, `stdout`, `stderr`, `pid`, and `returncode`
        attributes (all initially ``None``) and the `argv` of the
        command.

        """

    def __init__(self, args, *posargs, **kwargs):
        """ Initialise the double from `subprocess.Popen` arguments.

            :param args: The command; a command-line string when the
                `shell` keyword argument is true, otherwise a sequence
                of command-line arguments.

            """
        (self.stdin, self.stdout, self.stderr) = (None, None, None)
        self.pid = None
        self.returncode = None

        if not kwargs.get('shell', False):
            # The parameter is already a sequence of command-line arguments.
            self.argv = args
        else:
            self.argv = shlex.split(args)

    def set_streams(self, subprocess_double, popen_kwargs):
        """ Set the streams on the `PopenDouble`.

            :param subprocess_double: The `SubprocessDouble` from
                which to get existing stream doubles.
            :param popen_kwargs: The keyword arguments to the
                `subprocess.Popen` call.
            :return: ``None``.

            Only the streams named in `popen_kwargs` are set.

            """
        for stream_name in ['stdin', 'stdout', 'stderr']:
            if stream_name not in popen_kwargs:
                continue
            stream_spec = popen_kwargs[stream_name]
            if stream_spec is subprocess.PIPE:
                # Use the fake file of the matching stream double.
                double_attr_name = "{name}_double".format(name=stream_name)
                stream_file = getattr(
                        subprocess_double, double_attr_name).fake_file
            elif stream_spec is subprocess.STDOUT:
                stream_file = subprocess_double.stdout_double.fake_file
            else:
                # Anything else is used as the stream itself.
                stream_file = stream_spec
            setattr(self, stream_name, stream_file)

    def wait(self):
        """ Wait for subprocess to terminate. """
        return self.returncode
class subprocess_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.Popen` behaviour.

        The 'success' hook sets the streams on the double's
        `popen_double` from the keyword arguments, then returns it.

        """

    def _hook_success(self, testcase, args, *posargs, **kwargs):
        popen_double = self.double.popen_double
        popen_double.set_streams(self.double, kwargs)
        return popen_double
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified.

        """
    # Keep the real constructor, for commands with no registered double.
    real_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        """ Dispatch to a registered double, else to the real `Popen`. """
        lookup_argv = (
            shlex.split(args) if kwargs.get('shell', False) else args)
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_argv not in doubles:
            return real_popen(args, *posargs, **kwargs)
        scenario = doubles[lookup_argv].subprocess_popen_scenario
        return scenario.call_hook(testcase, args, *posargs, **kwargs)

    patcher = mock.patch.object(
        subprocess, "Popen", side_effect=fake_subprocess_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.call` behaviour. """

    def _hook_success(self, command):
        """ Return the success exit status. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Return the failure exit status. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Return the “command not found” exit status. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
def patch_subprocess_call(testcase):
    """ Patch `subprocess.call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified.

        """
    # Keep the real function, for commands with no registered double.
    real_call = subprocess.call

    def fake_subprocess_call(command, *posargs, **kwargs):
        """ Dispatch to a registered double, else to the real `call`. """
        lookup_argv = (
            shlex.split(command) if kwargs.get('shell', False)
            else command)
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_argv not in doubles:
            return real_call(command, *posargs, **kwargs)
        scenario = doubles[lookup_argv].subprocess_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        subprocess, "call", side_effect=fake_subprocess_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_check_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.check_call` behaviour. """

    def _hook_success(self, command):
        """ Return ``None``, indicating the command succeeded. """
        return None

    def _hook_failure(self, command):
        """ Raise `CalledProcessError` with the failure exit status. """
        raise subprocess.CalledProcessError(EXIT_STATUS_FAILURE, command)

    def _hook_not_found(self, command):
        """ Raise `CalledProcessError` with the “not found” status. """
        raise subprocess.CalledProcessError(
            EXIT_STATUS_COMMAND_NOT_FOUND, command)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified.

        """
    # Keep the real function, for commands with no registered double.
    real_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        """ Dispatch to a registered double, else to `check_call`. """
        lookup_argv = (
            shlex.split(command) if kwargs.get('shell', False)
            else command)
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_argv not in doubles:
            return real_check_call(command, *posargs, **kwargs)
        scenario = doubles[lookup_argv].subprocess_check_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        subprocess, "check_call", side_effect=fake_subprocess_check_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Keys are sequences of command-line words. A key may contain
        the sentinels `ARG_ANY` (matches exactly one word at that
        position) and `ARG_MORE` (matches all remaining words).

        Lookup is by pattern match, not plain equality: a registered
        key matches a specified `argv` when every word of the key
        matches the word at the same position. A key that is shorter
        than the `argv` therefore matches as a prefix. The first
        matching key, in the iteration order of the underlying `dict`,
        is used.

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry, optionally from existing items.

            :param args: If present, the first argument is a mapping,
                or an iterable of (key, value) pairs, with which to
                populate the registry.
            :param kwargs: Accepted but not used.

            """
        items = []
        if args:
            # Test for a mapping first: every mapping is also iterable.
            if isinstance(args[0], collections_abc.Mapping):
                items = args[0].items()
            elif isinstance(args[0], collections_abc.Iterable):
                items = args[0]
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
            class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Determine whether the key `candidate` matches `argv`.

            :param candidate: A registered key (sequence of words).
            :param argv: The sequence of words to test.
            :return: ``True`` if `candidate` matches, else ``False``.

            """
        if candidate == argv:
            # An exact match.
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words. We have a match.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv: there
                # is no word at this position to compare against.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word == argv[word_index]:
                # Candidate matches the word at this position.
                continue
            # This candidate does not match.
            return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The sequence of command-line words to match.
            :return: The first registered key which matches, or
                ``None`` if no key matches or `argv` is not a
                sequence.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        """ Get the value whose registered key matches `key`.

            :raises KeyError: If no registered key matches.

            """
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        result = self._mapping[match]
        return result

    def __setitem__(self, key, value):
        """ Register `value` under `key`.

            Any existing entry whose key matches `key` is removed
            first.

            """
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        """ Remove the entry whose registered key matches `key`.

            If no registered key matches, nothing is done and no
            error is raised.

            """
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return self._mapping.__iter__()

    def __len__(self):
        return self._mapping.__len__()
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess.

        Each instance has a fake program `path`, the `argv` by which
        it is registered, a unique fake `pid`, a `PopenDouble`, and a
        `FileDouble` for each of the `stdin`, `stdout`, `stderr`
        streams (attributes `stdin_double`, `stdout_double`,
        `stderr_double`).

        """

    registry_class = SubprocessDoubleRegistry
    registries = {}

    # Doubles indexed by their fake PID. The references are weak, so
    # an entry vanishes once nothing else refers to the double.
    double_by_pid = weakref.WeakValueDictionary()

    # Source of fake PIDs: a single counter shared by all instances,
    # so that every double gets a distinct integer.
    _pid_counter = itertools.count(1)

    function_scenario_params_by_class = {
            subprocess_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_popen_scenario',
                },
            subprocess_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_call_scenario',
                },
            subprocess_check_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name':
                    'set_subprocess_check_call_scenario',
                },
            os_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_popen_scenario',
                },
            os_waitpid_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_waitpid_scenario',
                },
            os_system_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_system_scenario',
                },
            os_spawnv_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_spawnv_scenario',
                },
            }

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the subprocess double.

            :param path: Filesystem path of the fake program. If
                ``None``, a path is generated with `tempfile.mktemp`.
            :param argv: Sequence of command-line words. If ``None``,
                a single-item list of the base name of `path`.
            :param args: Positional arguments for the superclass.
            :param kwargs: Keyword arguments for the superclass.

            """
        if path is None:
            # Only a plausible, unused path name is wanted here; this
            # code does not create any file at that path.
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        # Look up via the instance, consistent with the
        # `set_*_content` methods, so a subclass may override it.
        stream_class = self.stream_class
        for stream_name in ['stdin', 'stdout', 'stderr']:
            fake_file = stream_class()
            file_double = FileDouble(fake_file=fake_file)
            stream_double_name = '{name}_double'.format(name=stream_name)
            setattr(self, stream_double_name, file_double)

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        text = (
                "<SubprocessDouble instance: {id}"
                " path: {path!r},"
                " argv: {argv!r}"
                " stdin_double: {stdin_double!r}"
                " stdout_double: {stdout_double!r}"
                " stderr_double: {stderr_double!r}"
                ">").format(
                    id=id(self),
                    path=self.path, argv=self.argv,
                    stdin_double=self.stdin_double,
                    stdout_double=self.stdout_double,
                    stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: The next unused integer from the shared counter.

            """
        return next(cls._pid_counter)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double.

            :return: The `argv` of this double, as a tuple.

            """
        result = tuple(self.argv)
        return result

    # The fake streams hold bytes; text content is encoded on the way
    # in, by default with `stream_encoding`.
    stream_class = io.BytesIO
    stream_encoding = "utf-8"

    def set_stdin_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdin` stream for this double.

            :param text: The text content for the stream.
            :param bytes_encoding: Encoding used to convert `text`
                to bytes.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        self.stdin_double.fake_file = fake_file

    def set_stdout_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdout` stream for this double.

            :param text: The text content for the stream.
            :param bytes_encoding: Encoding used to convert `text`
                to bytes.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        self.stdout_double.fake_file = fake_file

    def set_stderr_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stderr` stream for this double.

            :param text: The text content for the stream.
            :param bytes_encoding: Encoding used to convert `text`
                to bytes.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        self.stderr_double.fake_file = fake_file
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake subprocesses.

        :param path: The filesystem path of the fake program. If not
            specified, a valid random path will be generated.
        :return: A collection of scenarios for tests involving
            subprocesses.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Each scenario gains a `SubprocessDouble`
        under the key 'subprocess_double', and its own name under the
        key 'fake_file_scenario_name'.

        """
    file_path = tempfile.mktemp() if path is None else path

    default_scenario_params = {
            'return_value': EXIT_STATUS_SUCCESS,
            'program_path': file_path,
            'argv_after_command_name': [],
            }

    scenarios = {
            'default': {},
            'not-found': {
                'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
                },
            }

    for (name, scenario) in scenarios.items():
        # Scenario-specific values take precedence over the defaults.
        params = dict(default_scenario_params, **scenario)
        scenario.update(params)

        program_path = params['program_path']
        argv = (
                [os.path.basename(program_path)]
                + list(params['argv_after_command_name']))
        scenario['subprocess_double'] = SubprocessDouble(
                path=program_path, argv=argv)
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios.
        :return: Collection of `SubprocessDouble` instances.

        The result is a `set`; scenarios whose 'subprocess_double'
        item is ``None`` contribute nothing.

        """
    doubles = set()
    for scenario in scenarios:
        if scenario['subprocess_double'] is not None:
            doubles.add(scenario['subprocess_double'])

    return doubles
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the return value of `make_fake_subprocess_scenarios`.

        Every double is registered for `testcase`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios are stored on the test case as the attribute
        `fake_subprocess_scenarios`, and their doubles are registered
        for the test case.

        """
    testcase.fake_subprocess_scenarios = make_fake_subprocess_scenarios()

    scenario_doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            testcase.fake_subprocess_scenarios.values())
    setup_subprocess_double_behaviour(testcase, scenario_doubles)
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified.

        """
    # Keep the real function, for commands with no registered double.
    real_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        """ Dispatch to a registered double, else to the real `popen`. """
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        # A command given as text is split into words for the lookup.
        lookup_argv = shlex.split(cmd) if isinstance(cmd, basestring) else cmd
        if lookup_argv not in doubles:
            return real_os_popen(cmd, mode, buffering)
        scenario = doubles[lookup_argv].os_popen_scenario
        return scenario.call_hook(lookup_argv, mode, buffering)

    patcher = mock.patch.object(
        os, "popen", side_effect=fake_os_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the `SubprocessDouble`
        registered under the specified PID, if any, will be used;
        otherwise the real `os.waitpid` is called.

        """
    # Keep the real function, for PIDs with no registered double.
    real_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        """ Dispatch to the double for `pid`, else to real `waitpid`. """
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            return real_os_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    patcher = mock.patch.object(
        os, "waitpid", side_effect=fake_os_waitpid)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified.

        """
    # Keep the real function, for commands with no registered double.
    real_os_system = os.system

    def fake_os_system(command):
        """ Dispatch to a registered double, else to the real `system`. """
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        # The command is text; split it into words for the lookup.
        lookup_argv = shlex.split(command)
        if lookup_argv not in doubles:
            return real_os_system(command)
        scenario = doubles[lookup_argv].os_system_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        os, "system", side_effect=fake_os_system)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified.

        """
    # Keep the real function, for commands with no registered double.
    real_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        """ Dispatch to a registered double, else to the real `spawnv`. """
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        lookup_key = tuple(args)
        if lookup_key not in doubles:
            return real_os_spawnv(mode, file, args)
        scenario = doubles[lookup_key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    patcher = mock.patch.object(
        os, "spawnv", side_effect=fake_os_spawnv)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1695 # Local variables:
1696 # coding: utf-8
1697 # mode: python
1698 # End:
1699 # vim: fileencoding=utf-8 filetype=python :