1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Helper functionality for Dput test suite. """
12 from __future__
import (absolute_import
, unicode_literals
)
16 if sys
.version_info
>= (3, 3):
19 import unittest
.mock
as mock
20 from io
import StringIO
as StringIO
22 import collections
.abc
as collections_abc
23 elif sys
.version_info
>= (3, 0):
24 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
25 elif sys
.version_info
>= (2, 7):
26 # Python 2 standard library.
27 import __builtin__
as builtins
28 # Third-party backport of Python 3 unittest improvements.
29 import unittest2
as unittest
30 # Third-party mock library.
32 # Python 2 standard library.
33 from StringIO
import StringIO
as BaseStringIO
34 import ConfigParser
as configparser
35 import collections
as collections_abc
37 raise RuntimeError("Python earlier than 2.7 is not supported.")
64 # Alias for Python 3 types.
69 def make_unique_slug(testcase
):
70 """ Make a unique slug for the test case. """
71 text
= base64
.b64encode(
72 testcase
.getUniqueString().encode('utf-8')
81 # We don't yet have the StringIO we want. Create it.
83 class StringIO(BaseStringIO
, object):
84 """ StringIO with a context manager. """
89 def __exit__(self
, *args
):
103 def patch_stdout(testcase
):
104 """ Patch `sys.stdout` for the specified test case. """
105 patcher
= mock
.patch
.object(
106 sys
, "stdout", wraps
=StringIO())
108 testcase
.addCleanup(patcher
.stop
)
111 def patch_stderr(testcase
):
112 """ Patch `sys.stderr` for the specified test case. """
113 patcher
= mock
.patch
.object(
114 sys
, "stderr", wraps
=StringIO())
116 testcase
.addCleanup(patcher
.stop
)
119 def patch_signal_signal(testcase
):
120 """ Patch `signal.signal` for the specified test case. """
121 func_patcher
= mock
.patch
.object(signal
, "signal", autospec
=True)
123 testcase
.addCleanup(func_patcher
.stop
)
126 class FakeSystemExit(Exception):
127 """ Fake double for `SystemExit` exception. """
130 EXIT_STATUS_SUCCESS
= 0
131 EXIT_STATUS_FAILURE
= 1
132 EXIT_STATUS_COMMAND_NOT_FOUND
= 127
135 def patch_sys_exit(testcase
):
136 """ Patch `sys.exit` for the specified test case. """
137 func_patcher
= mock
.patch
.object(
138 sys
, "exit", autospec
=True,
139 side_effect
=FakeSystemExit())
141 testcase
.addCleanup(func_patcher
.stop
)
144 def patch_sys_argv(testcase
):
145 """ Patch the `sys.argv` sequence for the test case. """
146 if not hasattr(testcase
, 'progname'):
147 testcase
.progname
= make_unique_slug(testcase
)
148 if not hasattr(testcase
, 'sys_argv'):
149 testcase
.sys_argv
= [testcase
.progname
]
150 patcher
= mock
.patch
.object(
152 new
=list(testcase
.sys_argv
))
154 testcase
.addCleanup(patcher
.stop
)
157 def patch_system_interfaces(testcase
):
158 """ Patch system interfaces that are disruptive to the test runner. """
159 patch_stdout(testcase
)
160 patch_stderr(testcase
)
161 patch_sys_exit(testcase
)
162 patch_sys_argv(testcase
)
165 def patch_time_time(testcase
, values
=None):
166 """ Patch the `time.time` function for the specified test case.
168 :param testcase: The `TestCase` instance for binding to the patch.
169 :param values: An iterable to provide return values.
174 values
= itertools
.count()
176 def generator_fake_time():
180 func_patcher
= mock
.patch
.object(time
, "time", autospec
=True)
182 testcase
.addCleanup(func_patcher
.stop
)
184 time
.time
.side_effect
= generator_fake_time()
187 def patch_os_environ(testcase
):
188 """ Patch the `os.environ` mapping. """
189 if not hasattr(testcase
, 'os_environ'):
190 testcase
.os_environ
= {}
191 patcher
= mock
.patch
.object(os
, "environ", new
=testcase
.os_environ
)
193 testcase
.addCleanup(patcher
.stop
)
196 def patch_os_getpid(testcase
):
197 """ Patch `os.getpid` for the specified test case. """
198 func_patcher
= mock
.patch
.object(os
, "getpid", autospec
=True)
200 testcase
.addCleanup(func_patcher
.stop
)
203 def patch_os_getuid(testcase
):
204 """ Patch the `os.getuid` function. """
205 if not hasattr(testcase
, 'os_getuid_return_value'):
206 testcase
.os_getuid_return_value
= testcase
.getUniqueInteger()
207 func_patcher
= mock
.patch
.object(
208 os
, "getuid", autospec
=True,
209 return_value
=testcase
.os_getuid_return_value
)
211 testcase
.addCleanup(func_patcher
.stop
)
214 PasswdEntry
= collections
.namedtuple(
216 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
219 def patch_pwd_getpwuid(testcase
):
220 """ Patch the `pwd.getpwuid` function. """
221 if not hasattr(testcase
, 'pwd_getpwuid_return_value'):
222 testcase
.pwd_getpwuid_return_value
= PasswdEntry(
223 pw_name
=make_unique_slug(testcase
),
224 pw_passwd
=make_unique_slug(testcase
),
225 pw_uid
=testcase
.getUniqueInteger(),
226 pw_gid
=testcase
.getUniqueInteger(),
227 pw_gecos
=testcase
.getUniqueString(),
228 pw_dir
=tempfile
.mktemp(),
229 pw_shell
=tempfile
.mktemp())
230 if not isinstance(testcase
.pwd_getpwuid_return_value
, pwd
.struct_passwd
):
231 pwent
= pwd
.struct_passwd(testcase
.pwd_getpwuid_return_value
)
233 pwent
= testcase
.pwd_getpwuid_return_value
234 func_patcher
= mock
.patch
.object(
235 pwd
, "getpwuid", autospec
=True,
238 testcase
.addCleanup(func_patcher
.stop
)
241 def patch_os_path_exists(testcase
):
242 """ Patch `os.path.exists` behaviour for this test case.
244 When the patched function is called, the registry of
245 `FileDouble` instances for this test case will be used to get
246 the instance for the path specified.
249 orig_os_path_exists
= os
.path
.exists
251 def fake_os_path_exists(path
):
252 registry
= FileDouble
.get_registry_for_testcase(testcase
)
254 file_double
= registry
[path
]
255 result
= file_double
.os_path_exists_scenario
.call_hook()
257 result
= orig_os_path_exists(path
)
260 func_patcher
= mock
.patch
.object(
261 os
.path
, "exists", autospec
=True,
262 side_effect
=fake_os_path_exists
)
264 testcase
.addCleanup(func_patcher
.stop
)
267 def patch_os_access(testcase
):
268 """ Patch `os.access` behaviour for this test case.
270 When the patched function is called, the registry of
271 `FileDouble` instances for this test case will be used to get
272 the instance for the path specified.
275 orig_os_access
= os
.access
277 def fake_os_access(path
, mode
):
278 registry
= FileDouble
.get_registry_for_testcase(testcase
)
280 file_double
= registry
[path
]
281 result
= file_double
.os_access_scenario
.call_hook(mode
)
283 result
= orig_os_access(path
, mode
)
286 func_patcher
= mock
.patch
.object(
287 os
, "access", autospec
=True,
288 side_effect
=fake_os_access
)
290 testcase
.addCleanup(func_patcher
.stop
)
293 StatResult
= collections
.namedtuple(
296 'st_ino', 'st_dev', 'st_nlink',
299 'st_atime', 'st_mtime', 'st_ctime',
303 def patch_os_stat(testcase
):
304 """ Patch `os.stat` behaviour for this test case.
306 When the patched function is called, the registry of
307 `FileDouble` instances for this test case will be used to get
308 the instance for the path specified.
311 orig_os_stat
= os
.stat
313 def fake_os_stat(path
):
314 registry
= FileDouble
.get_registry_for_testcase(testcase
)
316 file_double
= registry
[path
]
317 result
= file_double
.os_stat_scenario
.call_hook()
319 result
= orig_os_stat(path
)
322 func_patcher
= mock
.patch
.object(
323 os
, "stat", autospec
=True,
324 side_effect
=fake_os_stat
)
326 testcase
.addCleanup(func_patcher
.stop
)
329 def patch_os_lstat(testcase
):
330 """ Patch `os.lstat` behaviour for this test case.
332 When the patched function is called, the registry of
333 `FileDouble` instances for this test case will be used to get
334 the instance for the path specified.
337 orig_os_lstat
= os
.lstat
339 def fake_os_lstat(path
):
340 registry
= FileDouble
.get_registry_for_testcase(testcase
)
342 file_double
= registry
[path
]
343 result
= file_double
.os_lstat_scenario
.call_hook()
345 result
= orig_os_lstat(path
)
348 func_patcher
= mock
.patch
.object(
349 os
, "lstat", autospec
=True,
350 side_effect
=fake_os_lstat
)
352 testcase
.addCleanup(func_patcher
.stop
)
355 def patch_os_unlink(testcase
):
356 """ Patch `os.unlink` behaviour for this test case.
358 When the patched function is called, the registry of
359 `FileDouble` instances for this test case will be used to get
360 the instance for the path specified.
363 orig_os_unlink
= os
.unlink
365 def fake_os_unlink(path
):
366 registry
= FileDouble
.get_registry_for_testcase(testcase
)
368 file_double
= registry
[path
]
369 result
= file_double
.os_unlink_scenario
.call_hook()
371 result
= orig_os_unlink(path
)
374 func_patcher
= mock
.patch
.object(
375 os
, "unlink", autospec
=True,
376 side_effect
=fake_os_unlink
)
378 testcase
.addCleanup(func_patcher
.stop
)
381 def patch_os_rmdir(testcase
):
382 """ Patch `os.rmdir` behaviour for this test case.
384 When the patched function is called, the registry of
385 `FileDouble` instances for this test case will be used to get
386 the instance for the path specified.
389 orig_os_rmdir
= os
.rmdir
391 def fake_os_rmdir(path
):
392 registry
= FileDouble
.get_registry_for_testcase(testcase
)
394 file_double
= registry
[path
]
395 result
= file_double
.os_rmdir_scenario
.call_hook()
397 result
= orig_os_rmdir(path
)
400 func_patcher
= mock
.patch
.object(
401 os
, "rmdir", autospec
=True,
402 side_effect
=fake_os_rmdir
)
404 testcase
.addCleanup(func_patcher
.stop
)
407 def patch_shutil_rmtree(testcase
):
408 """ Patch `shutil.rmtree` behaviour for this test case.
410 When the patched function is called, the registry of
411 `FileDouble` instances for this test case will be used to get
412 the instance for the path specified.
415 orig_shutil_rmtree
= os
.rmdir
417 def fake_shutil_rmtree(path
, ignore_errors
=False, onerror
=None):
418 registry
= FileDouble
.get_registry_for_testcase(testcase
)
420 file_double
= registry
[path
]
421 result
= file_double
.shutil_rmtree_scenario
.call_hook()
423 result
= orig_shutil_rmtree(path
)
426 func_patcher
= mock
.patch
.object(
427 shutil
, "rmtree", autospec
=True,
428 side_effect
=fake_shutil_rmtree
)
430 testcase
.addCleanup(func_patcher
.stop
)
433 def patch_tempfile_mkdtemp(testcase
):
434 """ Patch the `tempfile.mkdtemp` function for this test case. """
435 if not hasattr(testcase
, 'tempfile_mkdtemp_file_double'):
436 testcase
.tempfile_mkdtemp_file_double
= FileDouble(tempfile
.mktemp())
438 double
= testcase
.tempfile_mkdtemp_file_double
439 double
.set_os_unlink_scenario('okay')
440 double
.set_os_rmdir_scenario('okay')
441 double
.register_for_testcase(testcase
)
443 func_patcher
= mock
.patch
.object(tempfile
, "mkdtemp", autospec
=True)
445 testcase
.addCleanup(func_patcher
.stop
)
447 tempfile
.mkdtemp
.return_value
= testcase
.tempfile_mkdtemp_file_double
.path
455 # Python 2 uses IOError.
456 def _ensure_ioerror_args(init_args
, init_kwargs
, errno_value
):
457 result_kwargs
= init_kwargs
458 result_errno
= errno_value
459 result_strerror
= os
.strerror(errno_value
)
460 result_filename
= None
461 if len(init_args
) >= 3:
462 result_errno
= init_args
[0]
463 result_filename
= init_args
[2]
464 if 'errno' in init_kwargs
:
465 result_errno
= init_kwargs
['errno']
466 del result_kwargs
['errno']
467 if 'filename' in init_kwargs
:
468 result_filename
= init_kwargs
['filename']
469 del result_kwargs
['filename']
470 if len(init_args
) >= 2:
471 result_strerror
= init_args
[1]
472 if 'strerror' in init_kwargs
:
473 result_strerror
= init_kwargs
['strerror']
474 del result_kwargs
['strerror']
475 result_args
= (result_errno
, result_strerror
, result_filename
)
476 return (result_args
, result_kwargs
)
478 class FileNotFoundError(IOError):
479 def __init__(self
, *args
, **kwargs
):
480 (args
, kwargs
) = _ensure_ioerror_args(
481 args
, kwargs
, errno_value
=errno
.ENOENT
)
482 super(FileNotFoundError
, self
).__init
__(*args
, **kwargs
)
484 class FileExistsError(IOError):
485 def __init__(self
, *args
, **kwargs
):
486 (args
, kwargs
) = _ensure_ioerror_args(
487 args
, kwargs
, errno_value
=errno
.EEXIST
)
488 super(FileExistsError
, self
).__init
__(*args
, **kwargs
)
490 class PermissionError(IOError):
491 def __init__(self
, *args
, **kwargs
):
492 (args
, kwargs
) = _ensure_ioerror_args(
493 args
, kwargs
, errno_value
=errno
.EPERM
)
494 super(PermissionError
, self
).__init
__(*args
, **kwargs
)
497 def make_fake_file_scenarios(path
=None):
498 """ Make a collection of scenarios for testing with fake files.
500 :path: The filesystem path of the fake file. If not specified,
501 a valid random path will be generated.
502 :return: A collection of scenarios for tests involving input files.
504 The collection is a mapping from scenario name to a dictionary of
510 file_path
= tempfile
.mktemp()
514 fake_file_empty
= StringIO()
515 fake_file_minimal
= StringIO("Lorem ipsum.")
516 fake_file_large
= StringIO("\n".join(
518 for __
in range(1000)))
520 default_scenario_params
= {
521 'open_scenario_name': 'okay',
522 'file_double_params': dict(
523 path
=file_path
, fake_file
=fake_file_minimal
),
529 'open_scenario_name': 'nonexist',
532 'open_scenario_name': 'exist_error',
534 'error-read-denied': {
535 'open_scenario_name': 'read_denied',
538 'file_double_params': dict(
539 path
=file_path
, fake_file
=fake_file_empty
),
542 'file_double_params': dict(
543 path
=file_path
, fake_file
=fake_file_empty
),
546 'file_double_params': dict(
547 path
=file_path
, fake_file
=fake_file_minimal
),
550 'file_double_params': dict(
551 path
=file_path
, fake_file
=fake_file_large
),
555 for (name
, scenario
) in scenarios
.items():
556 params
= default_scenario_params
.copy()
557 params
.update(scenario
)
558 scenario
.update(params
)
559 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
560 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
561 scenario
['fake_file_scenario_name'] = name
566 def get_file_doubles_from_fake_file_scenarios(scenarios
):
567 """ Get the `FileDouble` instances from fake file scenarios.
569 :param scenarios: Collection of fake file scenarios.
570 :return: Collection of `FileDouble` instances.
574 scenario
['file_double']
575 for scenario
in scenarios
576 if scenario
['file_double'] is not None)
581 def setup_file_double_behaviour(testcase
, doubles
=None):
582 """ Set up file double instances and behaviour.
584 :param testcase: The `TestCase` instance to modify.
585 :param doubles: Collection of `FileDouble` instances.
588 If `doubles` is ``None``, a default collection will be made
589 from the result of `make_fake_file_scenarios` result.
593 scenarios
= make_fake_file_scenarios()
594 doubles
= get_file_doubles_from_fake_file_scenarios(
597 for file_double
in doubles
:
598 file_double
.register_for_testcase(testcase
)
600 orig_open
= builtins
.open
602 def fake_open(path
, mode
='rt', buffering
=-1):
603 registry
= FileDouble
.get_registry_for_testcase(testcase
)
605 file_double
= registry
[path
]
606 result
= file_double
.builtins_open_scenario
.call_hook(
609 result
= orig_open(path
, mode
, buffering
)
612 mock_open
= mock
.mock_open()
613 mock_open
.side_effect
= fake_open
615 func_patcher
= mock
.patch
.object(
616 builtins
, "open", new
=mock_open
)
618 testcase
.addCleanup(func_patcher
.stop
)
621 def setup_fake_file_fixtures(testcase
):
622 """ Set up fixtures for fake file doubles.
624 :param testcase: The `TestCase` instance to modify.
628 scenarios
= make_fake_file_scenarios()
629 testcase
.fake_file_scenarios
= scenarios
631 file_doubles
= get_file_doubles_from_fake_file_scenarios(
633 setup_file_double_behaviour(testcase
, file_doubles
)
636 def set_fake_file_scenario(testcase
, name
):
637 """ Set the named fake file scenario for the test case. """
638 scenario
= testcase
.fake_file_scenarios
[name
]
639 testcase
.fake_file_scenario
= scenario
640 testcase
.file_double
= scenario
['file_double']
641 testcase
.file_double
.register_for_testcase(testcase
)
644 class TestDoubleFunctionScenario
:
645 """ Scenario for fake behaviour of a specific function. """
647 def __init__(self
, scenario_name
, double
):
648 self
.scenario_name
= scenario_name
651 self
.call_hook
= getattr(
652 self
, "_hook_{name}".format(name
=self
.scenario_name
))
656 "<{class_name} instance: {id}"
658 " call_hook name: {hook_name!r}"
659 " double: {double!r}"
661 class_name
=self
.__class
__.__name
__, id=id(self
),
662 name
=self
.scenario_name
, double
=self
.double
,
663 hook_name
=self
.call_hook
.__name
__)
666 def __eq__(self
, other
):
668 if not self
.scenario_name
== other
.scenario_name
:
670 if not self
.double
== other
.double
:
672 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
676 def __ne__(self
, other
):
677 result
= not self
.__eq
__(other
)
681 class os_path_exists_scenario(TestDoubleFunctionScenario
):
682 """ Scenario for `os.path.exists` behaviour. """
684 def _hook_exist(self
):
687 def _hook_not_exist(self
):
691 class os_access_scenario(TestDoubleFunctionScenario
):
692 """ Scenario for `os.access` behaviour. """
694 def _hook_okay(self
, mode
):
697 def _hook_not_exist(self
, mode
):
700 def _hook_read_only(self
, mode
):
701 if mode
& (os
.W_OK | os
.X_OK
):
707 def _hook_denied(self
, mode
):
708 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
715 class os_stat_scenario(TestDoubleFunctionScenario
):
716 """ Scenario for `os.stat` behaviour. """
718 def _hook_okay(self
):
719 return self
.double
.stat_result
721 def _hook_notfound_error(self
):
722 raise FileNotFoundError(
724 "No such file or directory: {path!r}".format(
725 path
=self
.double
.path
))
727 def _hook_denied_error(self
):
728 raise PermissionError(
733 class os_lstat_scenario(os_stat_scenario
):
734 """ Scenario for `os.lstat` behaviour. """
737 class os_unlink_scenario(TestDoubleFunctionScenario
):
738 """ Scenario for `os.unlink` behaviour. """
740 def _hook_okay(self
):
743 def _hook_nonexist(self
):
744 error
= FileNotFoundError(
746 "No such file or directory: {path!r}".format(
747 path
=self
.double
.path
))
750 def _hook_denied(self
):
751 error
= PermissionError(
757 class os_rmdir_scenario(TestDoubleFunctionScenario
):
758 """ Scenario for `os.rmdir` behaviour. """
760 def _hook_okay(self
):
763 def _hook_nonexist(self
):
764 error
= FileNotFoundError(
766 "No such file or directory: {path!r}".format(
767 path
=self
.double
.path
))
770 def _hook_denied(self
):
771 error
= PermissionError(
777 class shutil_rmtree_scenario(TestDoubleFunctionScenario
):
778 """ Scenario for `shutil.rmtree` behaviour. """
780 def _hook_okay(self
):
783 def _hook_nonexist(self
):
784 error
= FileNotFoundError(
786 "No such file or directory: {path!r}".format(
787 path
=self
.double
.path
))
790 def _hook_denied(self
):
791 error
= PermissionError(
797 class builtins_open_scenario(TestDoubleFunctionScenario
):
798 """ Scenario for `builtins.open` behaviour. """
800 def _hook_okay(self
, mode
, buffering
):
801 result
= self
.double
.fake_file
804 def _hook_nonexist(self
, mode
, buffering
):
805 if mode
.startswith('r'):
806 error
= FileNotFoundError(
808 "No such file or directory: {path!r}".format(
809 path
=self
.double
.path
))
811 result
= self
.double
.fake_file
814 def _hook_exist_error(self
, mode
, buffering
):
815 if mode
.startswith('w') or mode
.startswith('a'):
816 error
= FileExistsError(
818 "File already exists: {path!r}".format(
819 path
=self
.double
.path
))
821 result
= self
.double
.fake_file
824 def _hook_read_denied(self
, mode
, buffering
):
825 if mode
.startswith('r'):
826 error
= PermissionError(
828 "Read denied on {path!r}".format(
829 path
=self
.double
.path
))
831 result
= self
.double
.fake_file
834 def _hook_write_denied(self
, mode
, buffering
):
835 if mode
.startswith('w') or mode
.startswith('a'):
836 error
= PermissionError(
838 "Write denied on {path!r}".format(
839 path
=self
.double
.path
))
841 result
= self
.double
.fake_file
845 class TestDoubleWithRegistry
:
846 """ Abstract base class for a test double with a test case registry. """
848 registry_class
= NotImplemented
849 registries
= NotImplemented
851 function_scenario_params_by_class
= NotImplemented
853 def __new__(cls
, *args
, **kwargs
):
854 superclass
= super(TestDoubleWithRegistry
, cls
)
855 if superclass
.__new
__ is object.__new
__:
856 # The ‘object’ implementation complains about extra arguments.
857 instance
= superclass
.__new
__(cls
)
859 instance
= superclass
.__new
__(cls
, *args
, **kwargs
)
860 instance
.make_set_scenario_methods()
864 def __init__(self
, *args
, **kwargs
):
865 super(TestDoubleWithRegistry
, self
).__init
__(*args
, **kwargs
)
866 self
._set
_method
_per
_scenario
()
868 def _make_set_scenario_method(self
, scenario_class
, params
):
869 def method(self
, name
):
870 scenario
= scenario_class(name
, double
=self
)
871 setattr(self
, scenario_class
.__name
__, scenario
)
873 """ Set the scenario for `{name}` behaviour. """
874 ).format(name
=scenario_class
.__name
__)
875 method
.__name
__ = str(params
['set_scenario_method_name'])
878 def make_set_scenario_methods(self
):
879 """ Make `set_<scenario_class_name>` methods on this class. """
880 for (function_scenario_class
, function_scenario_params
) in (
881 self
.function_scenario_params_by_class
.items()):
882 method
= self
._make
_set
_scenario
_method
(
883 function_scenario_class
, function_scenario_params
)
884 setattr(self
.__class
__, method
.__name
__, method
)
885 function_scenario_params
['set_scenario_method'] = method
887 def _set_method_per_scenario(self
):
888 """ Set the method to be called for each scenario. """
889 for function_scenario_params
in (
890 self
.function_scenario_params_by_class
.values()):
891 function_scenario_params
['set_scenario_method'](
892 self
, function_scenario_params
['default_scenario_name'])
895 def get_registry_for_testcase(cls
, testcase
):
896 """ Get the FileDouble registry for the specified test case. """
897 # Key in a dict must be hashable.
898 key
= (testcase
.__class
__, id(testcase
))
899 registry
= cls
.registries
.setdefault(key
, cls
.registry_class())
902 def get_registry_key(self
):
903 """ Get the registry key for this double. """
904 raise NotImplementedError
906 def register_for_testcase(self
, testcase
):
907 """ Add this instance to registry for the specified testcase. """
908 registry
= self
.get_registry_for_testcase(testcase
)
909 key
= self
.get_registry_key()
911 unregister_func
= functools
.partial(
912 self
.unregister_for_testcase
, testcase
)
913 testcase
.addCleanup(unregister_func
)
915 def unregister_for_testcase(self
, testcase
):
916 """ Remove this instance from registry for the specified testcase. """
917 registry
= self
.get_registry_for_testcase(testcase
)
918 key
= self
.get_registry_key()
923 def copy_fake_file(fake_file
):
924 """ Make a copy of the StringIO instance. """
925 fake_file_type
= StringIO
927 if fake_file
is not None:
928 fake_file_type
= type(fake_file
)
929 content
= fake_file
.getvalue()
930 assert issubclass(fake_file_type
, object)
931 result
= fake_file_type(content
)
932 if hasattr(fake_file
, 'encoding'):
933 if not hasattr(result
, 'encoding'):
934 result
.encoding
= fake_file
.encoding
938 class FileDouble(TestDoubleWithRegistry
):
939 """ A testing double for a file. """
941 registry_class
= dict
944 function_scenario_params_by_class
= {
945 os_path_exists_scenario
: {
946 'default_scenario_name': 'not_exist',
947 'set_scenario_method_name': 'set_os_path_exists_scenario',
949 os_access_scenario
: {
950 'default_scenario_name': 'okay',
951 'set_scenario_method_name': 'set_os_access_scenario',
954 'default_scenario_name': 'okay',
955 'set_scenario_method_name': 'set_os_stat_scenario',
958 'default_scenario_name': 'okay',
959 'set_scenario_method_name': 'set_os_lstat_scenario',
961 builtins_open_scenario
: {
962 'default_scenario_name': 'okay',
963 'set_scenario_method_name': 'set_open_scenario',
965 os_unlink_scenario
: {
966 'default_scenario_name': 'okay',
967 'set_scenario_method_name': 'set_os_unlink_scenario',
970 'default_scenario_name': 'okay',
971 'set_scenario_method_name': 'set_os_rmdir_scenario',
973 shutil_rmtree_scenario
: {
974 'default_scenario_name': 'okay',
975 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
979 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
981 self
.fake_file
= copy_fake_file(fake_file
)
982 self
.fake_file
.name
= path
984 self
._set
_stat
_result
()
986 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
988 def _set_stat_result(self
):
989 """ Set the `os.stat` result for this file. """
990 size
= len(self
.fake_file
.getvalue())
991 self
.stat_result
= StatResult(
993 st_ino
=None, st_dev
=None, st_nlink
=None,
996 st_atime
=None, st_mtime
=None, st_ctime
=None,
1000 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1001 path
=self
.path
, fake_file
=self
.fake_file
)
1004 def get_registry_key(self
):
1005 """ Get the registry key for this double. """
1010 class os_popen_scenario(TestDoubleFunctionScenario
):
1011 """ Scenario for `os.popen` behaviour. """
1013 stream_name_by_mode
= {
1018 def _hook_success(self
, argv
, mode
, buffering
):
1019 stream_name
= self
.stream_name_by_mode
[mode
]
1020 stream_double
= getattr(
1021 self
.double
, stream_name
+ '_double')
1022 result
= stream_double
.fake_file
1025 def _hook_failure(self
, argv
, mode
, buffering
):
1029 def _hook_not_found(self
, argv
, mode
, buffering
):
1034 class os_waitpid_scenario(TestDoubleFunctionScenario
):
1035 """ Scenario for `os.waitpid` behaviour. """
1037 def _hook_success(self
, pid
, options
):
1038 result
= (pid
, EXIT_STATUS_SUCCESS
)
1041 def _hook_failure(self
, pid
, options
):
1042 result
= (pid
, EXIT_STATUS_FAILURE
)
1045 def _hook_not_found(self
, pid
, options
):
1046 error
= OSError(errno
.ECHILD
)
1050 class os_system_scenario(TestDoubleFunctionScenario
):
1051 """ Scenario for `os.system` behaviour. """
1053 def _hook_success(self
, command
):
1054 result
= EXIT_STATUS_SUCCESS
1057 def _hook_failure(self
, command
):
1058 result
= EXIT_STATUS_FAILURE
1061 def _hook_not_found(self
, command
):
1062 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1066 class os_spawnv_scenario(TestDoubleFunctionScenario
):
1067 """ Scenario for `os.spawnv` behaviour. """
1069 def _hook_success(self
, mode
, file, args
):
1070 result
= EXIT_STATUS_SUCCESS
1073 def _hook_failure(self
, mode
, file, args
):
1074 result
= EXIT_STATUS_FAILURE
1077 def _hook_not_found(self
, mode
, file, args
):
1078 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1087 """ A testing double for `subprocess.Popen`. """
1089 def __init__(self
, args
, *posargs
, **kwargs
):
1094 self
.returncode
= None
1096 if kwargs
.get('shell', False):
1097 self
.argv
= shlex
.split(args
)
1099 # The paramter is already a sequence of command-line arguments.
1102 def set_streams(self
, subprocess_double
, popen_kwargs
):
1103 """ Set the streams on the `PopenDouble`.
1105 :param subprocess_double: The `SubprocessDouble` from
1106 which to get existing stream doubles.
1107 :param popen_kwargs: The keyword arguments to the
1108 `subprocess.Popen` call.
1112 for stream_name
in (
1113 name
for name
in ['stdin', 'stdout', 'stderr']
1114 if name
in popen_kwargs
):
1115 stream_spec
= popen_kwargs
[stream_name
]
1116 if stream_spec
is subprocess
.PIPE
:
1117 stream_double
= getattr(
1119 "{name}_double".format(name
=stream_name
))
1120 stream_file
= stream_double
.fake_file
1121 elif stream_spec
is subprocess
.STDOUT
:
1122 stream_file
= subprocess_double
.stdout_double
.fake_file
1124 stream_file
= stream_spec
1125 setattr(self
, stream_name
, stream_file
)
1128 """ Wait for subprocess to terminate. """
1129 return self
.returncode
1132 class subprocess_popen_scenario(TestDoubleFunctionScenario
):
1133 """ Scenario for `subprocess.Popen` behaviour. """
1135 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1136 double
= self
.double
.popen_double
1137 double
.set_streams(self
.double
, kwargs
)
1141 def patch_subprocess_popen(testcase
):
1142 """ Patch `subprocess.Popen` constructor for this test case.
1144 :param testcase: The `TestCase` instance to modify.
1147 When the patched function is called, the registry of
1148 `SubprocessDouble` instances for this test case will be used
1149 to get the instance for the program path specified.
1152 orig_subprocess_popen
= subprocess
.Popen
1154 def fake_subprocess_popen(args
, *posargs
, **kwargs
):
1155 if kwargs
.get('shell', False):
1156 argv
= shlex
.split(args
)
1159 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1160 if argv
in registry
:
1161 subprocess_double
= registry
[argv
]
1162 result
= subprocess_double
.subprocess_popen_scenario
.call_hook(
1163 testcase
, args
, *posargs
, **kwargs
)
1165 result
= orig_subprocess_popen(args
, *posargs
, **kwargs
)
1168 func_patcher
= mock
.patch
.object(
1169 subprocess
, "Popen", autospec
=True,
1170 side_effect
=fake_subprocess_popen
)
1171 func_patcher
.start()
1172 testcase
.addCleanup(func_patcher
.stop
)
1175 class subprocess_call_scenario(TestDoubleFunctionScenario
):
1176 """ Scenario for `subprocess.call` behaviour. """
1178 def _hook_success(self
, command
):
1179 result
= EXIT_STATUS_SUCCESS
1182 def _hook_failure(self
, command
):
1183 result
= EXIT_STATUS_FAILURE
1186 def _hook_not_found(self
, command
):
1187 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1191 def patch_subprocess_call(testcase
):
1192 """ Patch `subprocess.call` function for this test case.
1194 :param testcase: The `TestCase` instance to modify.
1197 When the patched function is called, the registry of
1198 `SubprocessDouble` instances for this test case will be used
1199 to get the instance for the program path specified.
1202 orig_subprocess_call
= subprocess
.call
1204 def fake_subprocess_call(command
, *posargs
, **kwargs
):
1205 if kwargs
.get('shell', False):
1206 command_argv
= shlex
.split(command
)
1208 command_argv
= command
1209 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1210 if command_argv
in registry
:
1211 subprocess_double
= registry
[command_argv
]
1212 result
= subprocess_double
.subprocess_call_scenario
.call_hook(
1215 result
= orig_subprocess_call(command
, *posargs
, **kwargs
)
1218 func_patcher
= mock
.patch
.object(
1219 subprocess
, "call", autospec
=True,
1220 side_effect
=fake_subprocess_call
)
1221 func_patcher
.start()
1222 testcase
.addCleanup(func_patcher
.stop
)
1225 class subprocess_check_call_scenario(TestDoubleFunctionScenario
):
1226 """ Scenario for `subprocess.check_call` behaviour. """
1228 def _hook_success(self
, command
):
1231 def _hook_failure(self
, command
):
1232 result
= EXIT_STATUS_FAILURE
1233 error
= subprocess
.CalledProcessError(result
, command
)
1236 def _hook_not_found(self
, command
):
1237 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1238 error
= subprocess
.CalledProcessError(result
, command
)
1242 def patch_subprocess_check_call(testcase
):
1243 """ Patch `subprocess.check_call` function for this test case.
1245 :param testcase: The `TestCase` instance to modify.
1248 When the patched function is called, the registry of
1249 `SubprocessDouble` instances for this test case will be used
1250 to get the instance for the program path specified.
1253 orig_subprocess_check_call
= subprocess
.check_call
1255 def fake_subprocess_check_call(command
, *posargs
, **kwargs
):
1256 if kwargs
.get('shell', False):
1257 command_argv
= shlex
.split(command
)
1259 command_argv
= command
1260 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1261 if command_argv
in registry
:
1262 subprocess_double
= registry
[command_argv
]
1263 scenario
= subprocess_double
.subprocess_check_call_scenario
1264 result
= scenario
.call_hook(command
)
1266 result
= orig_subprocess_check_call(command
, *posargs
, **kwargs
)
1269 func_patcher
= mock
.patch
.object(
1270 subprocess
, "check_call", autospec
=True,
1271 side_effect
=fake_subprocess_check_call
)
1272 func_patcher
.start()
1273 testcase
.addCleanup(func_patcher
.stop
)
1276 class SubprocessDoubleRegistry(collections_abc
.MutableMapping
):
1277 """ Registry of `SubprocessDouble` instances by `argv`. """
1279 def __init__(self
, *args
, **kwargs
):
1282 if isinstance(args
[0], collections_abc
.Mapping
):
1283 items
= args
[0].items()
1284 if isinstance(args
[0], collections_abc
.Iterable
):
1286 self
._mapping
= dict(items
)
1289 text
= "<{class_name} object: {mapping}>".format(
1290 class_name
=self
.__class
__.__name
__, mapping
=self
._mapping
)
1293 def _match_argv(self
, argv
):
1294 """ Match the specified `argv` with our registered keys. """
1296 if not isinstance(argv
, collections_abc
.Sequence
):
1298 candidates
= iter(self
._mapping
)
1299 while match
is None:
1301 candidate
= next(candidates
)
1302 except StopIteration:
1305 if candidate
== argv
:
1308 word_iter
= enumerate(candidate
)
1309 while found
is None:
1311 (word_index
, candidate_word
) = next(word_iter
)
1312 except StopIteration:
1314 if candidate_word
is ARG_MORE
:
1315 # Candiate matches any remaining words. We have a match.
1317 elif word_index
> len(argv
):
1318 # Candidate is too long for the specified argv.
1320 elif candidate_word
is ARG_ANY
:
1321 # Candidate matches any word at this position.
1323 elif candidate_word
== argv
[word_index
]:
1324 # Candidate matches the word at this position.
1327 # This candidate does not match.
1330 # Reached the end of the candidate without a mismatch.
1336 def __getitem__(self
, key
):
1337 match
= self
._match
_argv
(key
)
1340 result
= self
._mapping
[match
]
1343 def __setitem__(self
, key
, value
):
1346 self
._mapping
[key
] = value
1348 def __delitem__(self
, key
):
1349 match
= self
._match
_argv
(key
)
1350 if match
is not None:
1351 del self
._mapping
[match
]
1354 return self
._mapping
.__iter
__()
1357 return self
._mapping
.__len
__()
1360 class SubprocessDouble(TestDoubleWithRegistry
):
1361 """ A testing double for a subprocess. """
1363 registry_class
= SubprocessDoubleRegistry
1366 double_by_pid
= weakref
.WeakValueDictionary()
1368 function_scenario_params_by_class
= {
1369 subprocess_popen_scenario
: {
1370 'default_scenario_name': 'success',
1371 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1373 subprocess_call_scenario
: {
1374 'default_scenario_name': 'success',
1375 'set_scenario_method_name': 'set_subprocess_call_scenario',
1377 subprocess_check_call_scenario
: {
1378 'default_scenario_name': 'success',
1379 'set_scenario_method_name':
1380 'set_subprocess_check_call_scenario',
1382 os_popen_scenario
: {
1383 'default_scenario_name': 'success',
1384 'set_scenario_method_name': 'set_os_popen_scenario',
1386 os_waitpid_scenario
: {
1387 'default_scenario_name': 'success',
1388 'set_scenario_method_name': 'set_os_waitpid_scenario',
1390 os_system_scenario
: {
1391 'default_scenario_name': 'success',
1392 'set_scenario_method_name': 'set_os_system_scenario',
1394 os_spawnv_scenario
: {
1395 'default_scenario_name': 'success',
1396 'set_scenario_method_name': 'set_os_spawnv_scenario',
1400 def __init__(self
, path
=None, argv
=None, *args
, **kwargs
):
1402 path
= tempfile
.mktemp()
1406 command_name
= os
.path
.basename(path
)
1407 argv
= [command_name
]
1410 self
.pid
= self
._make
_pid
()
1411 self
._register
_by
_pid
()
1413 self
.set_popen_double()
1415 stream_class
= SubprocessDouble
.stream_class
1416 for stream_name
in ['stdin', 'stdout', 'stderr']:
1417 fake_file
= stream_class()
1418 file_double
= FileDouble(fake_file
=fake_file
)
1419 stream_double_name
= '{name}_double'.format(name
=stream_name
)
1420 setattr(self
, stream_double_name
, file_double
)
1422 super(SubprocessDouble
, self
).__init
__(*args
, **kwargs
)
1424 def set_popen_double(self
):
1425 """ Set the `PopenDouble` for this instance. """
1426 double
= PopenDouble(self
.argv
)
1427 double
.pid
= self
.pid
1429 self
.popen_double
= double
1433 "<SubprocessDouble instance: {id}"
1436 " stdin_double: {stdin_double!r}"
1437 " stdout_double: {stdout_double!r}"
1438 " stderr_double: {stderr_double!r}"
1441 path
=self
.path
, argv
=self
.argv
,
1442 stdin_double
=self
.stdin_double
,
1443 stdout_double
=self
.stdout_double
,
1444 stderr_double
=self
.stderr_double
)
1449 """ Make a unique PID for a subprocess. """
1450 for pid
in itertools
.count(1):
1453 def _register_by_pid(self
):
1454 """ Register this subprocess by its PID. """
1455 self
.__class
__.double_by_pid
[self
.pid
] = self
1457 def get_registry_key(self
):
1458 """ Get the registry key for this double. """
1459 result
= tuple(self
.argv
)
1462 stream_class
= io
.BytesIO
1463 stream_encoding
= "utf-8"
1465 def set_stdin_content(self
, text
, bytes_encoding
=stream_encoding
):
1466 """ Set the content of the `stdin` stream for this double. """
1467 content
= text
.encode(bytes_encoding
)
1468 fake_file
= self
.stream_class(content
)
1469 self
.stdin_double
.fake_file
= fake_file
1471 def set_stdout_content(self
, text
, bytes_encoding
=stream_encoding
):
1472 """ Set the content of the `stdout` stream for this double. """
1473 content
= text
.encode(bytes_encoding
)
1474 fake_file
= self
.stream_class(content
)
1475 self
.stdout_double
.fake_file
= fake_file
1477 def set_stderr_content(self
, text
, bytes_encoding
=stream_encoding
):
1478 """ Set the content of the `stderr` stream for this double. """
1479 content
= text
.encode(bytes_encoding
)
1480 fake_file
= self
.stream_class(content
)
1481 self
.stderr_double
.fake_file
= fake_file
1484 def make_fake_subprocess_scenarios(path
=None):
1485 """ Make a collection of scenarios for testing with fake files.
1487 :path: The filesystem path of the fake program. If not specified,
1488 a valid random path will be generated.
1489 :return: A collection of scenarios for tests involving subprocesses.
1491 The collection is a mapping from scenario name to a dictionary of
1492 scenario attributes.
1496 file_path
= tempfile
.mktemp()
1500 default_scenario_params
= {
1501 'return_value': EXIT_STATUS_SUCCESS
,
1502 'program_path': file_path
,
1503 'argv_after_command_name': [],
1509 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND
,
1513 for (name
, scenario
) in scenarios
.items():
1514 params
= default_scenario_params
.copy()
1515 params
.update(scenario
)
1516 scenario
.update(params
)
1517 program_path
= params
['program_path']
1518 program_name
= os
.path
.basename(params
['program_path'])
1519 argv
= [program_name
]
1520 argv
.extend(params
['argv_after_command_name'])
1521 subprocess_double_params
= dict(
1525 subprocess_double
= SubprocessDouble(**subprocess_double_params
)
1526 scenario
['subprocess_double'] = subprocess_double
1527 scenario
['fake_file_scenario_name'] = name
1532 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios
):
1533 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1535 :param scenarios: Collection of fake subprocess scenarios.
1536 :return: Collection of `SubprocessDouble` instances.
1540 scenario
['subprocess_double']
1541 for scenario
in scenarios
1542 if scenario
['subprocess_double'] is not None)
1547 def setup_subprocess_double_behaviour(testcase
, doubles
=None):
1548 """ Set up subprocess double instances and behaviour.
1550 :param testcase: The `TestCase` instance to modify.
1551 :param doubles: Collection of `SubprocessDouble` instances.
1554 If `doubles` is ``None``, a default collection will be made
1555 from the return value of `make_fake_subprocess_scenarios`.
1559 scenarios
= make_fake_subprocess_scenarios()
1560 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1563 for double
in doubles
:
1564 double
.register_for_testcase(testcase
)
1567 def setup_fake_subprocess_fixtures(testcase
):
1568 """ Set up fixtures for fake subprocess doubles.
1570 :param testcase: The `TestCase` instance to modify.
1574 scenarios
= make_fake_subprocess_scenarios()
1575 testcase
.fake_subprocess_scenarios
= scenarios
1577 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1579 setup_subprocess_double_behaviour(testcase
, doubles
)
1582 def patch_os_popen(testcase
):
1583 """ Patch `os.popen` behaviour for this test case.
1585 :param testcase: The `TestCase` instance to modify.
1588 When the patched function is called, the registry of
1589 `SubprocessDouble` instances for this test case will be used
1590 to get the instance for the program path specified.
1593 orig_os_popen
= os
.popen
1595 def fake_os_popen(cmd
, mode
='r', buffering
=-1):
1596 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1597 if isinstance(cmd
, basestring
):
1598 command_argv
= shlex
.split(cmd
)
1601 if command_argv
in registry
:
1602 subprocess_double
= registry
[command_argv
]
1603 result
= subprocess_double
.os_popen_scenario
.call_hook(
1604 command_argv
, mode
, buffering
)
1606 result
= orig_os_popen(cmd
, mode
, buffering
)
1609 func_patcher
= mock
.patch
.object(
1610 os
, "popen", autospec
=True,
1611 side_effect
=fake_os_popen
)
1612 func_patcher
.start()
1613 testcase
.addCleanup(func_patcher
.stop
)
1616 def patch_os_waitpid(testcase
):
1617 """ Patch `os.waitpid` behaviour for this test case.
1619 :param testcase: The `TestCase` instance to modify.
1622 When the patched function is called, the registry of
1623 `SubprocessDouble` instances for this test case will be used
1624 to get the instance for the program path specified.
1627 orig_os_waitpid
= os
.waitpid
1629 def fake_os_waitpid(pid
, options
):
1630 registry
= SubprocessDouble
.double_by_pid
1632 subprocess_double
= registry
[pid
]
1633 result
= subprocess_double
.os_waitpid_scenario
.call_hook(
1636 result
= orig_os_waitpid(pid
, options
)
1639 func_patcher
= mock
.patch
.object(
1640 os
, "waitpid", autospec
=True,
1641 side_effect
=fake_os_waitpid
)
1642 func_patcher
.start()
1643 testcase
.addCleanup(func_patcher
.stop
)
1646 def patch_os_system(testcase
):
1647 """ Patch `os.system` behaviour for this test case.
1649 :param testcase: The `TestCase` instance to modify.
1652 When the patched function is called, the registry of
1653 `SubprocessDouble` instances for this test case will be used
1654 to get the instance for the program path specified.
1657 orig_os_system
= os
.system
1659 def fake_os_system(command
):
1660 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1661 command_argv
= shlex
.split(command
)
1662 if command_argv
in registry
:
1663 subprocess_double
= registry
[command_argv
]
1664 result
= subprocess_double
.os_system_scenario
.call_hook(
1667 result
= orig_os_system(command
)
1670 func_patcher
= mock
.patch
.object(
1671 os
, "system", autospec
=True,
1672 side_effect
=fake_os_system
)
1673 func_patcher
.start()
1674 testcase
.addCleanup(func_patcher
.stop
)
1677 def patch_os_spawnv(testcase
):
1678 """ Patch `os.spawnv` behaviour for this test case.
1680 :param testcase: The `TestCase` instance to modify.
1683 When the patched function is called, the registry of
1684 `SubprocessDouble` instances for this test case will be used
1685 to get the instance for the program path specified.
1688 orig_os_spawnv
= os
.spawnv
1690 def fake_os_spawnv(mode
, file, args
):
1691 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1692 registry_key
= tuple(args
)
1693 if registry_key
in registry
:
1694 subprocess_double
= registry
[registry_key
]
1695 result
= subprocess_double
.os_spawnv_scenario
.call_hook(
1698 result
= orig_os_spawnv(mode
, file, args
)
1701 func_patcher
= mock
.patch
.object(
1702 os
, "spawnv", autospec
=True,
1703 side_effect
=fake_os_spawnv
)
1704 func_patcher
.start()
1705 testcase
.addCleanup(func_patcher
.stop
)
1708 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1710 # This is free software: you may copy, modify, and/or distribute this work
1711 # under the terms of the GNU General Public License as published by the
1712 # Free Software Foundation; version 3 of that license or any later version.
1713 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1720 # vim: fileencoding=utf-8 filetype=python :