1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__
import (absolute_import
, unicode_literals
)
19 if sys
.version_info
>= (3, 3):
22 import unittest
.mock
as mock
23 from io
import StringIO
as StringIO
25 import collections
.abc
as collections_abc
26 elif sys
.version_info
>= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys
.version_info
>= (2, 7):
29 # Python 2 standard library.
30 import __builtin__
as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2
as unittest
33 # Third-party mock library.
35 # Python 2 standard library.
36 from StringIO
import StringIO
as BaseStringIO
37 import ConfigParser
as configparser
38 import collections
as collections_abc
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
59 __package__
= str("test")
60 __import__(__package__
)
69 # Alias for Python 3 types.
74 def make_unique_slug(testcase
):
75 """ Make a unique slug for the test case. """
76 text
= base64
.b64encode(
77 testcase
.getUniqueString().encode('utf-8')
86 # We don't yet have the StringIO we want. Create it.
88 class StringIO(BaseStringIO
, object):
89 """ StringIO with a context manager. """
94 def __exit__(self
, *args
):
108 def patch_stdout(testcase
):
109 """ Patch `sys.stdout` for the specified test case. """
110 patcher
= mock
.patch
.object(
111 sys
, "stdout", wraps
=StringIO())
113 testcase
.addCleanup(patcher
.stop
)
116 def patch_stderr(testcase
):
117 """ Patch `sys.stderr` for the specified test case. """
118 patcher
= mock
.patch
.object(
119 sys
, "stderr", wraps
=StringIO())
121 testcase
.addCleanup(patcher
.stop
)
124 def patch_signal_signal(testcase
):
125 """ Patch `signal.signal` for the specified test case. """
126 func_patcher
= mock
.patch
.object(signal
, "signal", autospec
=True)
128 testcase
.addCleanup(func_patcher
.stop
)
131 class FakeSystemExit(Exception):
132 """ Fake double for `SystemExit` exception. """
135 EXIT_STATUS_SUCCESS
= 0
136 EXIT_STATUS_FAILURE
= 1
137 EXIT_STATUS_COMMAND_NOT_FOUND
= 127
140 def patch_sys_exit(testcase
):
141 """ Patch `sys.exit` for the specified test case. """
142 func_patcher
= mock
.patch
.object(
143 sys
, "exit", autospec
=True,
144 side_effect
=FakeSystemExit())
146 testcase
.addCleanup(func_patcher
.stop
)
149 def patch_sys_argv(testcase
):
150 """ Patch the `sys.argv` sequence for the test case. """
151 if not hasattr(testcase
, 'progname'):
152 testcase
.progname
= make_unique_slug(testcase
)
153 if not hasattr(testcase
, 'sys_argv'):
154 testcase
.sys_argv
= [testcase
.progname
]
155 patcher
= mock
.patch
.object(
157 new
=list(testcase
.sys_argv
))
159 testcase
.addCleanup(patcher
.stop
)
162 def patch_system_interfaces(testcase
):
163 """ Patch system interfaces that are disruptive to the test runner. """
164 patch_stdout(testcase
)
165 patch_stderr(testcase
)
166 patch_sys_exit(testcase
)
167 patch_sys_argv(testcase
)
170 def patch_time_time(testcase
, values
=None):
171 """ Patch the `time.time` function for the specified test case.
173 :param testcase: The `TestCase` instance for binding to the patch.
174 :param values: An iterable to provide return values.
179 values
= itertools
.count()
181 def generator_fake_time():
185 func_patcher
= mock
.patch
.object(time
, "time", autospec
=True)
187 testcase
.addCleanup(func_patcher
.stop
)
189 time
.time
.side_effect
= generator_fake_time()
192 def patch_os_environ(testcase
):
193 """ Patch the `os.environ` mapping. """
194 if not hasattr(testcase
, 'os_environ'):
195 testcase
.os_environ
= {}
196 patcher
= mock
.patch
.object(os
, "environ", new
=testcase
.os_environ
)
198 testcase
.addCleanup(patcher
.stop
)
201 def patch_os_getpid(testcase
):
202 """ Patch `os.getpid` for the specified test case. """
203 func_patcher
= mock
.patch
.object(os
, "getpid", autospec
=True)
205 testcase
.addCleanup(func_patcher
.stop
)
208 def patch_os_getuid(testcase
):
209 """ Patch the `os.getuid` function. """
210 if not hasattr(testcase
, 'os_getuid_return_value'):
211 testcase
.os_getuid_return_value
= testcase
.getUniqueInteger()
212 func_patcher
= mock
.patch
.object(
213 os
, "getuid", autospec
=True,
214 return_value
=testcase
.os_getuid_return_value
)
216 testcase
.addCleanup(func_patcher
.stop
)
219 PasswdEntry
= collections
.namedtuple(
221 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
224 def patch_pwd_getpwuid(testcase
):
225 """ Patch the `pwd.getpwuid` function. """
226 if not hasattr(testcase
, 'pwd_getpwuid_return_value'):
227 testcase
.pwd_getpwuid_return_value
= PasswdEntry(
228 pw_name
=make_unique_slug(testcase
),
229 pw_passwd
=make_unique_slug(testcase
),
230 pw_uid
=testcase
.getUniqueInteger(),
231 pw_gid
=testcase
.getUniqueInteger(),
232 pw_gecos
=testcase
.getUniqueString(),
233 pw_dir
=tempfile
.mktemp(),
234 pw_shell
=tempfile
.mktemp())
235 if not isinstance(testcase
.pwd_getpwuid_return_value
, pwd
.struct_passwd
):
236 pwent
= pwd
.struct_passwd(testcase
.pwd_getpwuid_return_value
)
238 pwent
= testcase
.pwd_getpwuid_return_value
239 func_patcher
= mock
.patch
.object(
240 pwd
, "getpwuid", autospec
=True,
243 testcase
.addCleanup(func_patcher
.stop
)
246 def patch_os_path_exists(testcase
):
247 """ Patch `os.path.exists` behaviour for this test case.
249 When the patched function is called, the registry of
250 `FileDouble` instances for this test case will be used to get
251 the instance for the path specified.
254 orig_os_path_exists
= os
.path
.exists
256 def fake_os_path_exists(path
):
257 registry
= FileDouble
.get_registry_for_testcase(testcase
)
259 file_double
= registry
[path
]
260 result
= file_double
.os_path_exists_scenario
.call_hook()
262 result
= orig_os_path_exists(path
)
265 func_patcher
= mock
.patch
.object(
266 os
.path
, "exists", autospec
=True,
267 side_effect
=fake_os_path_exists
)
269 testcase
.addCleanup(func_patcher
.stop
)
272 def patch_os_access(testcase
):
273 """ Patch `os.access` behaviour for this test case.
275 When the patched function is called, the registry of
276 `FileDouble` instances for this test case will be used to get
277 the instance for the path specified.
280 orig_os_access
= os
.access
282 def fake_os_access(path
, mode
):
283 registry
= FileDouble
.get_registry_for_testcase(testcase
)
285 file_double
= registry
[path
]
286 result
= file_double
.os_access_scenario
.call_hook(mode
)
288 result
= orig_os_access(path
, mode
)
291 func_patcher
= mock
.patch
.object(
292 os
, "access", autospec
=True,
293 side_effect
=fake_os_access
)
295 testcase
.addCleanup(func_patcher
.stop
)
298 StatResult
= collections
.namedtuple(
301 'st_ino', 'st_dev', 'st_nlink',
304 'st_atime', 'st_mtime', 'st_ctime',
308 def patch_os_stat(testcase
):
309 """ Patch `os.stat` behaviour for this test case.
311 When the patched function is called, the registry of
312 `FileDouble` instances for this test case will be used to get
313 the instance for the path specified.
316 orig_os_stat
= os
.stat
318 def fake_os_stat(path
):
319 registry
= FileDouble
.get_registry_for_testcase(testcase
)
321 file_double
= registry
[path
]
322 result
= file_double
.os_stat_scenario
.call_hook()
324 result
= orig_os_stat(path
)
327 func_patcher
= mock
.patch
.object(
328 os
, "stat", autospec
=True,
329 side_effect
=fake_os_stat
)
331 testcase
.addCleanup(func_patcher
.stop
)
334 def patch_os_lstat(testcase
):
335 """ Patch `os.lstat` behaviour for this test case.
337 When the patched function is called, the registry of
338 `FileDouble` instances for this test case will be used to get
339 the instance for the path specified.
342 orig_os_lstat
= os
.lstat
344 def fake_os_lstat(path
):
345 registry
= FileDouble
.get_registry_for_testcase(testcase
)
347 file_double
= registry
[path
]
348 result
= file_double
.os_lstat_scenario
.call_hook()
350 result
= orig_os_lstat(path
)
353 func_patcher
= mock
.patch
.object(
354 os
, "lstat", autospec
=True,
355 side_effect
=fake_os_lstat
)
357 testcase
.addCleanup(func_patcher
.stop
)
360 def patch_os_unlink(testcase
):
361 """ Patch `os.unlink` behaviour for this test case.
363 When the patched function is called, the registry of
364 `FileDouble` instances for this test case will be used to get
365 the instance for the path specified.
368 orig_os_unlink
= os
.unlink
370 def fake_os_unlink(path
):
371 registry
= FileDouble
.get_registry_for_testcase(testcase
)
373 file_double
= registry
[path
]
374 result
= file_double
.os_unlink_scenario
.call_hook()
376 result
= orig_os_unlink(path
)
379 func_patcher
= mock
.patch
.object(
380 os
, "unlink", autospec
=True,
381 side_effect
=fake_os_unlink
)
383 testcase
.addCleanup(func_patcher
.stop
)
386 def patch_os_rmdir(testcase
):
387 """ Patch `os.rmdir` behaviour for this test case.
389 When the patched function is called, the registry of
390 `FileDouble` instances for this test case will be used to get
391 the instance for the path specified.
394 orig_os_rmdir
= os
.rmdir
396 def fake_os_rmdir(path
):
397 registry
= FileDouble
.get_registry_for_testcase(testcase
)
399 file_double
= registry
[path
]
400 result
= file_double
.os_rmdir_scenario
.call_hook()
402 result
= orig_os_rmdir(path
)
405 func_patcher
= mock
.patch
.object(
406 os
, "rmdir", autospec
=True,
407 side_effect
=fake_os_rmdir
)
409 testcase
.addCleanup(func_patcher
.stop
)
412 def patch_shutil_rmtree(testcase
):
413 """ Patch `shutil.rmtree` behaviour for this test case.
415 When the patched function is called, the registry of
416 `FileDouble` instances for this test case will be used to get
417 the instance for the path specified.
420 orig_shutil_rmtree
= os
.rmdir
422 def fake_shutil_rmtree(path
, ignore_errors
=False, onerror
=None):
423 registry
= FileDouble
.get_registry_for_testcase(testcase
)
425 file_double
= registry
[path
]
426 result
= file_double
.shutil_rmtree_scenario
.call_hook()
428 result
= orig_shutil_rmtree(path
)
431 func_patcher
= mock
.patch
.object(
432 shutil
, "rmtree", autospec
=True,
433 side_effect
=fake_shutil_rmtree
)
435 testcase
.addCleanup(func_patcher
.stop
)
438 def patch_tempfile_mkdtemp(testcase
):
439 """ Patch the `tempfile.mkdtemp` function for this test case. """
440 if not hasattr(testcase
, 'tempfile_mkdtemp_file_double'):
441 testcase
.tempfile_mkdtemp_file_double
= FileDouble(tempfile
.mktemp())
443 double
= testcase
.tempfile_mkdtemp_file_double
444 double
.set_os_unlink_scenario('okay')
445 double
.set_os_rmdir_scenario('okay')
446 double
.register_for_testcase(testcase
)
448 func_patcher
= mock
.patch
.object(tempfile
, "mkdtemp", autospec
=True)
450 testcase
.addCleanup(func_patcher
.stop
)
452 tempfile
.mkdtemp
.return_value
= testcase
.tempfile_mkdtemp_file_double
.path
460 # Python 2 uses IOError.
461 def _ensure_ioerror_args(init_args
, init_kwargs
, errno_value
):
462 result_kwargs
= init_kwargs
463 result_errno
= errno_value
464 result_strerror
= os
.strerror(errno_value
)
465 result_filename
= None
466 if len(init_args
) >= 3:
467 result_errno
= init_args
[0]
468 result_filename
= init_args
[2]
469 if 'errno' in init_kwargs
:
470 result_errno
= init_kwargs
['errno']
471 del result_kwargs
['errno']
472 if 'filename' in init_kwargs
:
473 result_filename
= init_kwargs
['filename']
474 del result_kwargs
['filename']
475 if len(init_args
) >= 2:
476 result_strerror
= init_args
[1]
477 if 'strerror' in init_kwargs
:
478 result_strerror
= init_kwargs
['strerror']
479 del result_kwargs
['strerror']
480 result_args
= (result_errno
, result_strerror
, result_filename
)
481 return (result_args
, result_kwargs
)
483 class FileNotFoundError(IOError):
484 def __init__(self
, *args
, **kwargs
):
485 (args
, kwargs
) = _ensure_ioerror_args(
486 args
, kwargs
, errno_value
=errno
.ENOENT
)
487 super(FileNotFoundError
, self
).__init
__(*args
, **kwargs
)
489 class FileExistsError(IOError):
490 def __init__(self
, *args
, **kwargs
):
491 (args
, kwargs
) = _ensure_ioerror_args(
492 args
, kwargs
, errno_value
=errno
.EEXIST
)
493 super(FileExistsError
, self
).__init
__(*args
, **kwargs
)
495 class PermissionError(IOError):
496 def __init__(self
, *args
, **kwargs
):
497 (args
, kwargs
) = _ensure_ioerror_args(
498 args
, kwargs
, errno_value
=errno
.EPERM
)
499 super(PermissionError
, self
).__init
__(*args
, **kwargs
)
502 def make_fake_file_scenarios(path
=None):
503 """ Make a collection of scenarios for testing with fake files.
505 :path: The filesystem path of the fake file. If not specified,
506 a valid random path will be generated.
507 :return: A collection of scenarios for tests involving input files.
509 The collection is a mapping from scenario name to a dictionary of
515 file_path
= tempfile
.mktemp()
519 fake_file_empty
= StringIO()
520 fake_file_minimal
= StringIO("Lorem ipsum.")
521 fake_file_large
= StringIO("\n".join(
523 for __
in range(1000)))
525 default_scenario_params
= {
526 'open_scenario_name': 'okay',
527 'file_double_params': dict(
528 path
=file_path
, fake_file
=fake_file_minimal
),
534 'open_scenario_name': 'nonexist',
537 'open_scenario_name': 'exist_error',
539 'error-read-denied': {
540 'open_scenario_name': 'read_denied',
543 'file_double_params': dict(
544 path
=file_path
, fake_file
=fake_file_empty
),
547 'file_double_params': dict(
548 path
=file_path
, fake_file
=fake_file_empty
),
551 'file_double_params': dict(
552 path
=file_path
, fake_file
=fake_file_minimal
),
555 'file_double_params': dict(
556 path
=file_path
, fake_file
=fake_file_large
),
560 for (name
, scenario
) in scenarios
.items():
561 params
= default_scenario_params
.copy()
562 params
.update(scenario
)
563 scenario
.update(params
)
564 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
565 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
566 scenario
['fake_file_scenario_name'] = name
571 def get_file_doubles_from_fake_file_scenarios(scenarios
):
572 """ Get the `FileDouble` instances from fake file scenarios.
574 :param scenarios: Collection of fake file scenarios.
575 :return: Collection of `FileDouble` instances.
579 scenario
['file_double']
580 for scenario
in scenarios
581 if scenario
['file_double'] is not None)
586 def setup_file_double_behaviour(testcase
, doubles
=None):
587 """ Set up file double instances and behaviour.
589 :param testcase: The `TestCase` instance to modify.
590 :param doubles: Collection of `FileDouble` instances.
593 If `doubles` is ``None``, a default collection will be made
594 from the result of `make_fake_file_scenarios` result.
598 scenarios
= make_fake_file_scenarios()
599 doubles
= get_file_doubles_from_fake_file_scenarios(
602 for file_double
in doubles
:
603 file_double
.register_for_testcase(testcase
)
605 orig_open
= builtins
.open
607 def fake_open(path
, mode
='rt', buffering
=-1):
608 registry
= FileDouble
.get_registry_for_testcase(testcase
)
610 file_double
= registry
[path
]
611 result
= file_double
.builtins_open_scenario
.call_hook(
614 result
= orig_open(path
, mode
, buffering
)
617 mock_open
= mock
.mock_open()
618 mock_open
.side_effect
= fake_open
620 func_patcher
= mock
.patch
.object(
621 builtins
, "open", new
=mock_open
)
623 testcase
.addCleanup(func_patcher
.stop
)
626 def setup_fake_file_fixtures(testcase
):
627 """ Set up fixtures for fake file doubles.
629 :param testcase: The `TestCase` instance to modify.
633 scenarios
= make_fake_file_scenarios()
634 testcase
.fake_file_scenarios
= scenarios
636 file_doubles
= get_file_doubles_from_fake_file_scenarios(
638 setup_file_double_behaviour(testcase
, file_doubles
)
641 def set_fake_file_scenario(testcase
, name
):
642 """ Set the named fake file scenario for the test case. """
643 scenario
= testcase
.fake_file_scenarios
[name
]
644 testcase
.fake_file_scenario
= scenario
645 testcase
.file_double
= scenario
['file_double']
646 testcase
.file_double
.register_for_testcase(testcase
)
649 class TestDoubleFunctionScenario
:
650 """ Scenario for fake behaviour of a specific function. """
652 def __init__(self
, scenario_name
, double
):
653 self
.scenario_name
= scenario_name
656 self
.call_hook
= getattr(
657 self
, "_hook_{name}".format(name
=self
.scenario_name
))
661 "<{class_name} instance: {id}"
663 " call_hook name: {hook_name!r}"
664 " double: {double!r}"
666 class_name
=self
.__class
__.__name
__, id=id(self
),
667 name
=self
.scenario_name
, double
=self
.double
,
668 hook_name
=self
.call_hook
.__name
__)
671 def __eq__(self
, other
):
673 if not self
.scenario_name
== other
.scenario_name
:
675 if not self
.double
== other
.double
:
677 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
681 def __ne__(self
, other
):
682 result
= not self
.__eq
__(other
)
686 class os_path_exists_scenario(TestDoubleFunctionScenario
):
687 """ Scenario for `os.path.exists` behaviour. """
689 def _hook_exist(self
):
692 def _hook_not_exist(self
):
696 class os_access_scenario(TestDoubleFunctionScenario
):
697 """ Scenario for `os.access` behaviour. """
699 def _hook_okay(self
, mode
):
702 def _hook_not_exist(self
, mode
):
705 def _hook_read_only(self
, mode
):
706 if mode
& (os
.W_OK | os
.X_OK
):
712 def _hook_denied(self
, mode
):
713 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
720 class os_stat_scenario(TestDoubleFunctionScenario
):
721 """ Scenario for `os.stat` behaviour. """
723 def _hook_okay(self
):
724 return self
.double
.stat_result
726 def _hook_notfound_error(self
):
727 raise FileNotFoundError(
729 "No such file or directory: {path!r}".format(
730 path
=self
.double
.path
))
732 def _hook_denied_error(self
):
733 raise PermissionError(
738 class os_lstat_scenario(os_stat_scenario
):
739 """ Scenario for `os.lstat` behaviour. """
742 class os_unlink_scenario(TestDoubleFunctionScenario
):
743 """ Scenario for `os.unlink` behaviour. """
745 def _hook_okay(self
):
748 def _hook_nonexist(self
):
749 error
= FileNotFoundError(
751 "No such file or directory: {path!r}".format(
752 path
=self
.double
.path
))
755 def _hook_denied(self
):
756 error
= PermissionError(
762 class os_rmdir_scenario(TestDoubleFunctionScenario
):
763 """ Scenario for `os.rmdir` behaviour. """
765 def _hook_okay(self
):
768 def _hook_nonexist(self
):
769 error
= FileNotFoundError(
771 "No such file or directory: {path!r}".format(
772 path
=self
.double
.path
))
775 def _hook_denied(self
):
776 error
= PermissionError(
782 class shutil_rmtree_scenario(TestDoubleFunctionScenario
):
783 """ Scenario for `shutil.rmtree` behaviour. """
785 def _hook_okay(self
):
788 def _hook_nonexist(self
):
789 error
= FileNotFoundError(
791 "No such file or directory: {path!r}".format(
792 path
=self
.double
.path
))
795 def _hook_denied(self
):
796 error
= PermissionError(
802 class builtins_open_scenario(TestDoubleFunctionScenario
):
803 """ Scenario for `builtins.open` behaviour. """
805 def _hook_okay(self
, mode
, buffering
):
806 result
= self
.double
.fake_file
809 def _hook_nonexist(self
, mode
, buffering
):
810 if mode
.startswith('r'):
811 error
= FileNotFoundError(
813 "No such file or directory: {path!r}".format(
814 path
=self
.double
.path
))
816 result
= self
.double
.fake_file
819 def _hook_exist_error(self
, mode
, buffering
):
820 if mode
.startswith('w') or mode
.startswith('a'):
821 error
= FileExistsError(
823 "File already exists: {path!r}".format(
824 path
=self
.double
.path
))
826 result
= self
.double
.fake_file
829 def _hook_read_denied(self
, mode
, buffering
):
830 if mode
.startswith('r'):
831 error
= PermissionError(
833 "Read denied on {path!r}".format(
834 path
=self
.double
.path
))
836 result
= self
.double
.fake_file
839 def _hook_write_denied(self
, mode
, buffering
):
840 if mode
.startswith('w') or mode
.startswith('a'):
841 error
= PermissionError(
843 "Write denied on {path!r}".format(
844 path
=self
.double
.path
))
846 result
= self
.double
.fake_file
850 class TestDoubleWithRegistry
:
851 """ Abstract base class for a test double with a test case registry. """
853 registry_class
= NotImplemented
854 registries
= NotImplemented
856 function_scenario_params_by_class
= NotImplemented
858 def __new__(cls
, *args
, **kwargs
):
859 superclass
= super(TestDoubleWithRegistry
, cls
)
860 if superclass
.__new
__ is object.__new
__:
861 # The ‘object’ implementation complains about extra arguments.
862 instance
= superclass
.__new
__(cls
)
864 instance
= superclass
.__new
__(cls
, *args
, **kwargs
)
865 instance
.make_set_scenario_methods()
869 def __init__(self
, *args
, **kwargs
):
870 super(TestDoubleWithRegistry
, self
).__init
__(*args
, **kwargs
)
871 self
._set
_method
_per
_scenario
()
873 def _make_set_scenario_method(self
, scenario_class
, params
):
874 def method(self
, name
):
875 scenario
= scenario_class(name
, double
=self
)
876 setattr(self
, scenario_class
.__name
__, scenario
)
878 """ Set the scenario for `{name}` behaviour. """
879 ).format(name
=scenario_class
.__name
__)
880 method
.__name
__ = str(params
['set_scenario_method_name'])
883 def make_set_scenario_methods(self
):
884 """ Make `set_<scenario_class_name>` methods on this class. """
885 for (function_scenario_class
, function_scenario_params
) in (
886 self
.function_scenario_params_by_class
.items()):
887 method
= self
._make
_set
_scenario
_method
(
888 function_scenario_class
, function_scenario_params
)
889 setattr(self
.__class
__, method
.__name
__, method
)
890 function_scenario_params
['set_scenario_method'] = method
892 def _set_method_per_scenario(self
):
893 """ Set the method to be called for each scenario. """
894 for function_scenario_params
in (
895 self
.function_scenario_params_by_class
.values()):
896 function_scenario_params
['set_scenario_method'](
897 self
, function_scenario_params
['default_scenario_name'])
900 def get_registry_for_testcase(cls
, testcase
):
901 """ Get the FileDouble registry for the specified test case. """
902 # Key in a dict must be hashable.
903 key
= (testcase
.__class
__, id(testcase
))
904 registry
= cls
.registries
.setdefault(key
, cls
.registry_class())
907 def get_registry_key(self
):
908 """ Get the registry key for this double. """
909 raise NotImplementedError
911 def register_for_testcase(self
, testcase
):
912 """ Add this instance to registry for the specified testcase. """
913 registry
= self
.get_registry_for_testcase(testcase
)
914 key
= self
.get_registry_key()
916 unregister_func
= functools
.partial(
917 self
.unregister_for_testcase
, testcase
)
918 testcase
.addCleanup(unregister_func
)
920 def unregister_for_testcase(self
, testcase
):
921 """ Remove this instance from registry for the specified testcase. """
922 registry
= self
.get_registry_for_testcase(testcase
)
923 key
= self
.get_registry_key()
928 def copy_fake_file(fake_file
):
929 """ Make a copy of the StringIO instance. """
930 fake_file_type
= StringIO
932 if fake_file
is not None:
933 fake_file_type
= type(fake_file
)
934 content
= fake_file
.getvalue()
935 assert issubclass(fake_file_type
, object)
936 result
= fake_file_type(content
)
937 if hasattr(fake_file
, 'encoding'):
938 if not hasattr(result
, 'encoding'):
939 result
.encoding
= fake_file
.encoding
943 class FileDouble(TestDoubleWithRegistry
):
944 """ A testing double for a file. """
946 registry_class
= dict
949 function_scenario_params_by_class
= {
950 os_path_exists_scenario
: {
951 'default_scenario_name': 'not_exist',
952 'set_scenario_method_name': 'set_os_path_exists_scenario',
954 os_access_scenario
: {
955 'default_scenario_name': 'okay',
956 'set_scenario_method_name': 'set_os_access_scenario',
959 'default_scenario_name': 'okay',
960 'set_scenario_method_name': 'set_os_stat_scenario',
963 'default_scenario_name': 'okay',
964 'set_scenario_method_name': 'set_os_lstat_scenario',
966 builtins_open_scenario
: {
967 'default_scenario_name': 'okay',
968 'set_scenario_method_name': 'set_open_scenario',
970 os_unlink_scenario
: {
971 'default_scenario_name': 'okay',
972 'set_scenario_method_name': 'set_os_unlink_scenario',
975 'default_scenario_name': 'okay',
976 'set_scenario_method_name': 'set_os_rmdir_scenario',
978 shutil_rmtree_scenario
: {
979 'default_scenario_name': 'okay',
980 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
984 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
986 self
.fake_file
= copy_fake_file(fake_file
)
987 self
.fake_file
.name
= path
989 self
._set
_stat
_result
()
991 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
993 def _set_stat_result(self
):
994 """ Set the `os.stat` result for this file. """
995 size
= len(self
.fake_file
.getvalue())
996 self
.stat_result
= StatResult(
998 st_ino
=None, st_dev
=None, st_nlink
=None,
1001 st_atime
=None, st_mtime
=None, st_ctime
=None,
1005 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1006 path
=self
.path
, fake_file
=self
.fake_file
)
1009 def get_registry_key(self
):
1010 """ Get the registry key for this double. """
1015 class os_popen_scenario(TestDoubleFunctionScenario
):
1016 """ Scenario for `os.popen` behaviour. """
1018 stream_name_by_mode
= {
1023 def _hook_success(self
, argv
, mode
, buffering
):
1024 stream_name
= self
.stream_name_by_mode
[mode
]
1025 stream_double
= getattr(
1026 self
.double
, stream_name
+ '_double')
1027 result
= stream_double
.fake_file
1030 def _hook_failure(self
, argv
, mode
, buffering
):
1034 def _hook_not_found(self
, argv
, mode
, buffering
):
1039 class os_waitpid_scenario(TestDoubleFunctionScenario
):
1040 """ Scenario for `os.waitpid` behaviour. """
1042 def _hook_success(self
, pid
, options
):
1043 result
= (pid
, EXIT_STATUS_SUCCESS
)
1046 def _hook_failure(self
, pid
, options
):
1047 result
= (pid
, EXIT_STATUS_FAILURE
)
1050 def _hook_not_found(self
, pid
, options
):
1051 error
= OSError(errno
.ECHILD
)
1055 class os_system_scenario(TestDoubleFunctionScenario
):
1056 """ Scenario for `os.system` behaviour. """
1058 def _hook_success(self
, command
):
1059 result
= EXIT_STATUS_SUCCESS
1062 def _hook_failure(self
, command
):
1063 result
= EXIT_STATUS_FAILURE
1066 def _hook_not_found(self
, command
):
1067 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1071 class os_spawnv_scenario(TestDoubleFunctionScenario
):
1072 """ Scenario for `os.spawnv` behaviour. """
1074 def _hook_success(self
, mode
, file, args
):
1075 result
= EXIT_STATUS_SUCCESS
1078 def _hook_failure(self
, mode
, file, args
):
1079 result
= EXIT_STATUS_FAILURE
1082 def _hook_not_found(self
, mode
, file, args
):
1083 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1092 """ A testing double for `subprocess.Popen`. """
1094 def __init__(self
, args
, *posargs
, **kwargs
):
1099 self
.returncode
= None
1101 if kwargs
.get('shell', False):
1102 self
.argv
= shlex
.split(args
)
1104 # The paramter is already a sequence of command-line arguments.
1107 def set_streams(self
, subprocess_double
, popen_kwargs
):
1108 """ Set the streams on the `PopenDouble`.
1110 :param subprocess_double: The `SubprocessDouble` from
1111 which to get existing stream doubles.
1112 :param popen_kwargs: The keyword arguments to the
1113 `subprocess.Popen` call.
1117 for stream_name
in (
1118 name
for name
in ['stdin', 'stdout', 'stderr']
1119 if name
in popen_kwargs
):
1120 stream_spec
= popen_kwargs
[stream_name
]
1121 if stream_spec
is subprocess
.PIPE
:
1122 stream_double
= getattr(
1124 "{name}_double".format(name
=stream_name
))
1125 stream_file
= stream_double
.fake_file
1126 elif stream_spec
is subprocess
.STDOUT
:
1127 stream_file
= subprocess_double
.stdout_double
.fake_file
1129 stream_file
= stream_spec
1130 setattr(self
, stream_name
, stream_file
)
1133 """ Wait for subprocess to terminate. """
1134 return self
.returncode
1137 class subprocess_popen_scenario(TestDoubleFunctionScenario
):
1138 """ Scenario for `subprocess.Popen` behaviour. """
1140 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1141 double
= self
.double
.popen_double
1142 double
.set_streams(self
.double
, kwargs
)
1146 def patch_subprocess_popen(testcase
):
1147 """ Patch `subprocess.Popen` constructor for this test case.
1149 :param testcase: The `TestCase` instance to modify.
1152 When the patched function is called, the registry of
1153 `SubprocessDouble` instances for this test case will be used
1154 to get the instance for the program path specified.
1157 orig_subprocess_popen
= subprocess
.Popen
1159 def fake_subprocess_popen(args
, *posargs
, **kwargs
):
1160 if kwargs
.get('shell', False):
1161 argv
= shlex
.split(args
)
1164 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1165 if argv
in registry
:
1166 subprocess_double
= registry
[argv
]
1167 result
= subprocess_double
.subprocess_popen_scenario
.call_hook(
1168 testcase
, args
, *posargs
, **kwargs
)
1170 result
= orig_subprocess_popen(args
, *posargs
, **kwargs
)
1173 func_patcher
= mock
.patch
.object(
1174 subprocess
, "Popen", autospec
=True,
1175 side_effect
=fake_subprocess_popen
)
1176 func_patcher
.start()
1177 testcase
.addCleanup(func_patcher
.stop
)
1180 class subprocess_call_scenario(TestDoubleFunctionScenario
):
1181 """ Scenario for `subprocess.call` behaviour. """
1183 def _hook_success(self
, command
):
1184 result
= EXIT_STATUS_SUCCESS
1187 def _hook_failure(self
, command
):
1188 result
= EXIT_STATUS_FAILURE
1191 def _hook_not_found(self
, command
):
1192 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1196 def patch_subprocess_call(testcase
):
1197 """ Patch `subprocess.call` function for this test case.
1199 :param testcase: The `TestCase` instance to modify.
1202 When the patched function is called, the registry of
1203 `SubprocessDouble` instances for this test case will be used
1204 to get the instance for the program path specified.
1207 orig_subprocess_call
= subprocess
.call
1209 def fake_subprocess_call(command
, *posargs
, **kwargs
):
1210 if kwargs
.get('shell', False):
1211 command_argv
= shlex
.split(command
)
1213 command_argv
= command
1214 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1215 if command_argv
in registry
:
1216 subprocess_double
= registry
[command_argv
]
1217 result
= subprocess_double
.subprocess_call_scenario
.call_hook(
1220 result
= orig_subprocess_call(command
, *posargs
, **kwargs
)
1223 func_patcher
= mock
.patch
.object(
1224 subprocess
, "call", autospec
=True,
1225 side_effect
=fake_subprocess_call
)
1226 func_patcher
.start()
1227 testcase
.addCleanup(func_patcher
.stop
)
1230 class subprocess_check_call_scenario(TestDoubleFunctionScenario
):
1231 """ Scenario for `subprocess.check_call` behaviour. """
1233 def _hook_success(self
, command
):
1236 def _hook_failure(self
, command
):
1237 result
= EXIT_STATUS_FAILURE
1238 error
= subprocess
.CalledProcessError(result
, command
)
1241 def _hook_not_found(self
, command
):
1242 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1243 error
= subprocess
.CalledProcessError(result
, command
)
1247 def patch_subprocess_check_call(testcase
):
1248 """ Patch `subprocess.check_call` function for this test case.
1250 :param testcase: The `TestCase` instance to modify.
1253 When the patched function is called, the registry of
1254 `SubprocessDouble` instances for this test case will be used
1255 to get the instance for the program path specified.
1258 orig_subprocess_check_call
= subprocess
.check_call
1260 def fake_subprocess_check_call(command
, *posargs
, **kwargs
):
1261 if kwargs
.get('shell', False):
1262 command_argv
= shlex
.split(command
)
1264 command_argv
= command
1265 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1266 if command_argv
in registry
:
1267 subprocess_double
= registry
[command_argv
]
1268 scenario
= subprocess_double
.subprocess_check_call_scenario
1269 result
= scenario
.call_hook(command
)
1271 result
= orig_subprocess_check_call(command
, *posargs
, **kwargs
)
1274 func_patcher
= mock
.patch
.object(
1275 subprocess
, "check_call", autospec
=True,
1276 side_effect
=fake_subprocess_check_call
)
1277 func_patcher
.start()
1278 testcase
.addCleanup(func_patcher
.stop
)
1281 class SubprocessDoubleRegistry(collections_abc
.MutableMapping
):
1282 """ Registry of `SubprocessDouble` instances by `argv`. """
1284 def __init__(self
, *args
, **kwargs
):
1287 if isinstance(args
[0], collections_abc
.Mapping
):
1288 items
= args
[0].items()
1289 if isinstance(args
[0], collections_abc
.Iterable
):
1291 self
._mapping
= dict(items
)
1294 text
= "<{class_name} object: {mapping}>".format(
1295 class_name
=self
.__class
__.__name
__, mapping
=self
._mapping
)
1298 def _match_argv(self
, argv
):
1299 """ Match the specified `argv` with our registered keys. """
1301 if not isinstance(argv
, collections_abc
.Sequence
):
1303 candidates
= iter(self
._mapping
)
1304 while match
is None:
1306 candidate
= next(candidates
)
1307 except StopIteration:
1310 if candidate
== argv
:
1313 word_iter
= enumerate(candidate
)
1314 while found
is None:
1316 (word_index
, candidate_word
) = next(word_iter
)
1317 except StopIteration:
1319 if candidate_word
is ARG_MORE
:
1320 # Candiate matches any remaining words. We have a match.
1322 elif word_index
> len(argv
):
1323 # Candidate is too long for the specified argv.
1325 elif candidate_word
is ARG_ANY
:
1326 # Candidate matches any word at this position.
1328 elif candidate_word
== argv
[word_index
]:
1329 # Candidate matches the word at this position.
1332 # This candidate does not match.
1335 # Reached the end of the candidate without a mismatch.
1341 def __getitem__(self
, key
):
1342 match
= self
._match
_argv
(key
)
1345 result
= self
._mapping
[match
]
1348 def __setitem__(self
, key
, value
):
1351 self
._mapping
[key
] = value
1353 def __delitem__(self
, key
):
1354 match
= self
._match
_argv
(key
)
1355 if match
is not None:
1356 del self
._mapping
[match
]
1359 return self
._mapping
.__iter
__()
1362 return self
._mapping
.__len
__()
1365 class SubprocessDouble(TestDoubleWithRegistry
):
1366 """ A testing double for a subprocess. """
1368 registry_class
= SubprocessDoubleRegistry
1371 double_by_pid
= weakref
.WeakValueDictionary()
1373 function_scenario_params_by_class
= {
1374 subprocess_popen_scenario
: {
1375 'default_scenario_name': 'success',
1376 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1378 subprocess_call_scenario
: {
1379 'default_scenario_name': 'success',
1380 'set_scenario_method_name': 'set_subprocess_call_scenario',
1382 subprocess_check_call_scenario
: {
1383 'default_scenario_name': 'success',
1384 'set_scenario_method_name':
1385 'set_subprocess_check_call_scenario',
1387 os_popen_scenario
: {
1388 'default_scenario_name': 'success',
1389 'set_scenario_method_name': 'set_os_popen_scenario',
1391 os_waitpid_scenario
: {
1392 'default_scenario_name': 'success',
1393 'set_scenario_method_name': 'set_os_waitpid_scenario',
1395 os_system_scenario
: {
1396 'default_scenario_name': 'success',
1397 'set_scenario_method_name': 'set_os_system_scenario',
1399 os_spawnv_scenario
: {
1400 'default_scenario_name': 'success',
1401 'set_scenario_method_name': 'set_os_spawnv_scenario',
1405 def __init__(self
, path
=None, argv
=None, *args
, **kwargs
):
1407 path
= tempfile
.mktemp()
1411 command_name
= os
.path
.basename(path
)
1412 argv
= [command_name
]
1415 self
.pid
= self
._make
_pid
()
1416 self
._register
_by
_pid
()
1418 self
.set_popen_double()
1420 stream_class
= SubprocessDouble
.stream_class
1421 for stream_name
in ['stdin', 'stdout', 'stderr']:
1422 fake_file
= stream_class()
1423 file_double
= FileDouble(fake_file
=fake_file
)
1424 stream_double_name
= '{name}_double'.format(name
=stream_name
)
1425 setattr(self
, stream_double_name
, file_double
)
1427 super(SubprocessDouble
, self
).__init
__(*args
, **kwargs
)
1429 def set_popen_double(self
):
1430 """ Set the `PopenDouble` for this instance. """
1431 double
= PopenDouble(self
.argv
)
1432 double
.pid
= self
.pid
1434 self
.popen_double
= double
1438 "<SubprocessDouble instance: {id}"
1441 " stdin_double: {stdin_double!r}"
1442 " stdout_double: {stdout_double!r}"
1443 " stderr_double: {stderr_double!r}"
1446 path
=self
.path
, argv
=self
.argv
,
1447 stdin_double
=self
.stdin_double
,
1448 stdout_double
=self
.stdout_double
,
1449 stderr_double
=self
.stderr_double
)
1454 """ Make a unique PID for a subprocess. """
1455 for pid
in itertools
.count(1):
1458 def _register_by_pid(self
):
1459 """ Register this subprocess by its PID. """
1460 self
.__class
__.double_by_pid
[self
.pid
] = self
1462 def get_registry_key(self
):
1463 """ Get the registry key for this double. """
1464 result
= tuple(self
.argv
)
1467 stream_class
= io
.BytesIO
1468 stream_encoding
= "utf-8"
1470 def set_stdin_content(self
, text
, bytes_encoding
=stream_encoding
):
1471 """ Set the content of the `stdin` stream for this double. """
1472 content
= text
.encode(bytes_encoding
)
1473 fake_file
= self
.stream_class(content
)
1474 self
.stdin_double
.fake_file
= fake_file
1476 def set_stdout_content(self
, text
, bytes_encoding
=stream_encoding
):
1477 """ Set the content of the `stdout` stream for this double. """
1478 content
= text
.encode(bytes_encoding
)
1479 fake_file
= self
.stream_class(content
)
1480 self
.stdout_double
.fake_file
= fake_file
1482 def set_stderr_content(self
, text
, bytes_encoding
=stream_encoding
):
1483 """ Set the content of the `stderr` stream for this double. """
1484 content
= text
.encode(bytes_encoding
)
1485 fake_file
= self
.stream_class(content
)
1486 self
.stderr_double
.fake_file
= fake_file
1489 def make_fake_subprocess_scenarios(path
=None):
1490 """ Make a collection of scenarios for testing with fake files.
1492 :path: The filesystem path of the fake program. If not specified,
1493 a valid random path will be generated.
1494 :return: A collection of scenarios for tests involving subprocesses.
1496 The collection is a mapping from scenario name to a dictionary of
1497 scenario attributes.
1501 file_path
= tempfile
.mktemp()
1505 default_scenario_params
= {
1506 'return_value': EXIT_STATUS_SUCCESS
,
1507 'program_path': file_path
,
1508 'argv_after_command_name': [],
1514 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND
,
1518 for (name
, scenario
) in scenarios
.items():
1519 params
= default_scenario_params
.copy()
1520 params
.update(scenario
)
1521 scenario
.update(params
)
1522 program_path
= params
['program_path']
1523 program_name
= os
.path
.basename(params
['program_path'])
1524 argv
= [program_name
]
1525 argv
.extend(params
['argv_after_command_name'])
1526 subprocess_double_params
= dict(
1530 subprocess_double
= SubprocessDouble(**subprocess_double_params
)
1531 scenario
['subprocess_double'] = subprocess_double
1532 scenario
['fake_file_scenario_name'] = name
1537 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios
):
1538 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1540 :param scenarios: Collection of fake subprocess scenarios.
1541 :return: Collection of `SubprocessDouble` instances.
1545 scenario
['subprocess_double']
1546 for scenario
in scenarios
1547 if scenario
['subprocess_double'] is not None)
1552 def setup_subprocess_double_behaviour(testcase
, doubles
=None):
1553 """ Set up subprocess double instances and behaviour.
1555 :param testcase: The `TestCase` instance to modify.
1556 :param doubles: Collection of `SubprocessDouble` instances.
1559 If `doubles` is ``None``, a default collection will be made
1560 from the return value of `make_fake_subprocess_scenarios`.
1564 scenarios
= make_fake_subprocess_scenarios()
1565 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1568 for double
in doubles
:
1569 double
.register_for_testcase(testcase
)
1572 def setup_fake_subprocess_fixtures(testcase
):
1573 """ Set up fixtures for fake subprocess doubles.
1575 :param testcase: The `TestCase` instance to modify.
1579 scenarios
= make_fake_subprocess_scenarios()
1580 testcase
.fake_subprocess_scenarios
= scenarios
1582 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1584 setup_subprocess_double_behaviour(testcase
, doubles
)
1587 def patch_os_popen(testcase
):
1588 """ Patch `os.popen` behaviour for this test case.
1590 :param testcase: The `TestCase` instance to modify.
1593 When the patched function is called, the registry of
1594 `SubprocessDouble` instances for this test case will be used
1595 to get the instance for the program path specified.
1598 orig_os_popen
= os
.popen
1600 def fake_os_popen(cmd
, mode
='r', buffering
=-1):
1601 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1602 if isinstance(cmd
, basestring
):
1603 command_argv
= shlex
.split(cmd
)
1606 if command_argv
in registry
:
1607 subprocess_double
= registry
[command_argv
]
1608 result
= subprocess_double
.os_popen_scenario
.call_hook(
1609 command_argv
, mode
, buffering
)
1611 result
= orig_os_popen(cmd
, mode
, buffering
)
1614 func_patcher
= mock
.patch
.object(
1615 os
, "popen", autospec
=True,
1616 side_effect
=fake_os_popen
)
1617 func_patcher
.start()
1618 testcase
.addCleanup(func_patcher
.stop
)
1621 def patch_os_waitpid(testcase
):
1622 """ Patch `os.waitpid` behaviour for this test case.
1624 :param testcase: The `TestCase` instance to modify.
1627 When the patched function is called, the registry of
1628 `SubprocessDouble` instances for this test case will be used
1629 to get the instance for the program path specified.
1632 orig_os_waitpid
= os
.waitpid
1634 def fake_os_waitpid(pid
, options
):
1635 registry
= SubprocessDouble
.double_by_pid
1637 subprocess_double
= registry
[pid
]
1638 result
= subprocess_double
.os_waitpid_scenario
.call_hook(
1641 result
= orig_os_waitpid(pid
, options
)
1644 func_patcher
= mock
.patch
.object(
1645 os
, "waitpid", autospec
=True,
1646 side_effect
=fake_os_waitpid
)
1647 func_patcher
.start()
1648 testcase
.addCleanup(func_patcher
.stop
)
1651 def patch_os_system(testcase
):
1652 """ Patch `os.system` behaviour for this test case.
1654 :param testcase: The `TestCase` instance to modify.
1657 When the patched function is called, the registry of
1658 `SubprocessDouble` instances for this test case will be used
1659 to get the instance for the program path specified.
1662 orig_os_system
= os
.system
1664 def fake_os_system(command
):
1665 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1666 command_argv
= shlex
.split(command
)
1667 if command_argv
in registry
:
1668 subprocess_double
= registry
[command_argv
]
1669 result
= subprocess_double
.os_system_scenario
.call_hook(
1672 result
= orig_os_system(command
)
1675 func_patcher
= mock
.patch
.object(
1676 os
, "system", autospec
=True,
1677 side_effect
=fake_os_system
)
1678 func_patcher
.start()
1679 testcase
.addCleanup(func_patcher
.stop
)
1682 def patch_os_spawnv(testcase
):
1683 """ Patch `os.spawnv` behaviour for this test case.
1685 :param testcase: The `TestCase` instance to modify.
1688 When the patched function is called, the registry of
1689 `SubprocessDouble` instances for this test case will be used
1690 to get the instance for the program path specified.
1693 orig_os_spawnv
= os
.spawnv
1695 def fake_os_spawnv(mode
, file, args
):
1696 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1697 registry_key
= tuple(args
)
1698 if registry_key
in registry
:
1699 subprocess_double
= registry
[registry_key
]
1700 result
= subprocess_double
.os_spawnv_scenario
.call_hook(
1703 result
= orig_os_spawnv(mode
, file, args
)
1706 func_patcher
= mock
.patch
.object(
1707 os
, "spawnv", autospec
=True,
1708 side_effect
=fake_os_spawnv
)
1709 func_patcher
.start()
1710 testcase
.addCleanup(func_patcher
.stop
)
1717 # vim: fileencoding=utf-8 filetype=python :