1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__
import (absolute_import
, unicode_literals
)
19 if sys
.version_info
>= (3, 3):
22 import unittest
.mock
as mock
23 from io
import StringIO
as StringIO
25 import collections
.abc
as collections_abc
26 elif sys
.version_info
>= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys
.version_info
>= (2, 7):
29 # Python 2 standard library.
30 import __builtin__
as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2
as unittest
33 # Third-party mock library.
35 # Python 2 standard library.
36 from StringIO
import StringIO
as BaseStringIO
37 import ConfigParser
as configparser
38 import collections
as collections_abc
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
57 __package__
= str("test")
58 __import__(__package__
)
63 def make_unique_slug(testcase
):
64 """ Make a unique slug for the test case. """
65 text
= base64
.b64encode(
66 testcase
.getUniqueString().encode('utf-8')
75 # We don't yet have the StringIO we want. Create it.
77 class StringIO(BaseStringIO
, object):
78 """ StringIO with a context manager. """
83 def __exit__(self
, *args
):
88 def patch_stdout(testcase
):
89 """ Patch `sys.stdout` for the specified test case. """
90 patcher
= mock
.patch
.object(
91 sys
, 'stdout', wraps
=StringIO())
93 testcase
.addCleanup(patcher
.stop
)
96 def patch_stderr(testcase
):
97 """ Patch `sys.stderr` for the specified test case. """
98 patcher
= mock
.patch
.object(
99 sys
, 'stderr', wraps
=StringIO())
101 testcase
.addCleanup(patcher
.stop
)
104 def patch_signal_signal(testcase
):
105 """ Patch `signal.signal` for the specified test case. """
106 func_patcher
= mock
.patch
.object(signal
, 'signal')
108 testcase
.addCleanup(func_patcher
.stop
)
class FakeSystemExit(Exception):
    """ Test double standing in for the `SystemExit` exception.

        Derives from `Exception`, not from `SystemExit`.
        """
# Exit status values used as results by the fake subprocess scenarios.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
# NOTE(review): 127 is presumably the shell's “command not found” status.
EXIT_STATUS_COMMAND_NOT_FOUND = 127
120 def patch_sys_exit(testcase
):
121 """ Patch `sys.exit` for the specified test case. """
122 func_patcher
= mock
.patch
.object(
124 side_effect
=FakeSystemExit())
126 testcase
.addCleanup(func_patcher
.stop
)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: ``None``.

        Applies, in this order, the patches made by `patch_stdout`,
        `patch_stderr`, and `patch_sys_exit`.
        """
    for apply_patch in (patch_stdout, patch_stderr, patch_sys_exit):
        apply_patch(testcase)
136 def patch_time_time(testcase
, values
=None):
137 """ Patch the `time.time` function for the specified test case.
139 :param testcase: The `TestCase` instance for binding to the patch.
140 :param values: An iterable to provide return values.
145 values
= itertools
.count()
147 def generator_fake_time():
151 func_patcher
= mock
.patch
.object(time
, "time")
153 testcase
.addCleanup(func_patcher
.stop
)
155 time
.time
.side_effect
= generator_fake_time()
158 def patch_os_environ(testcase
):
159 """ Patch the `os.environ` mapping. """
160 if not hasattr(testcase
, 'os_environ'):
161 testcase
.os_environ
= {}
162 patcher
= mock
.patch
.object(os
, "environ", new
=testcase
.os_environ
)
164 testcase
.addCleanup(patcher
.stop
)
167 def patch_os_getpid(testcase
):
168 """ Patch `os.getpid` for the specified test case. """
169 func_patcher
= mock
.patch
.object(os
, 'getpid')
171 testcase
.addCleanup(func_patcher
.stop
)
174 def patch_os_getuid(testcase
):
175 """ Patch the `os.getuid` function. """
176 if not hasattr(testcase
, 'os_getuid_return_value'):
177 testcase
.os_getuid_return_value
= testcase
.getUniqueInteger()
178 func_patcher
= mock
.patch
.object(
179 os
, "getuid", return_value
=testcase
.os_getuid_return_value
)
181 testcase
.addCleanup(func_patcher
.stop
)
184 PasswdEntry
= collections
.namedtuple(
186 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
189 def patch_pwd_getpwuid(testcase
):
190 """ Patch the `pwd.getpwuid` function. """
191 if not hasattr(testcase
, 'pwd_getpwuid_return_value'):
192 testcase
.pwd_getpwuid_return_value
= PasswdEntry(
193 pw_name
=make_unique_slug(testcase
),
194 pw_passwd
=make_unique_slug(testcase
),
195 pw_uid
=testcase
.getUniqueInteger(),
196 pw_gid
=testcase
.getUniqueInteger(),
197 pw_gecos
=testcase
.getUniqueString(),
198 pw_dir
=tempfile
.mktemp(),
199 pw_shell
=tempfile
.mktemp())
200 if not isinstance(testcase
.pwd_getpwuid_return_value
, pwd
.struct_passwd
):
201 pwent
= pwd
.struct_passwd(testcase
.pwd_getpwuid_return_value
)
203 pwent
= testcase
.pwd_getpwuid_return_value
204 func_patcher
= mock
.patch
.object(pwd
, "getpwuid", return_value
=pwent
)
206 testcase
.addCleanup(func_patcher
.stop
)
209 def patch_os_path_exists(testcase
):
210 """ Patch `os.path.exists` behaviour for this test case.
212 When the patched function is called, the registry of
213 `FileDouble` instances for this test case will be used to get
214 the instance for the path specified.
217 orig_os_path_exists
= os
.path
.exists
219 def fake_os_path_exists(path
):
220 registry
= FileDouble
.get_registry_for_testcase(testcase
)
222 file_double
= registry
[path
]
223 result
= file_double
.os_path_exists_scenario
.call_hook()
225 result
= orig_os_path_exists(path
)
228 func_patcher
= mock
.patch
.object(
229 os
.path
, 'exists', side_effect
=fake_os_path_exists
)
231 testcase
.addCleanup(func_patcher
.stop
)
234 def patch_os_access(testcase
):
235 """ Patch `os.access` behaviour for this test case.
237 When the patched function is called, the registry of
238 `FileDouble` instances for this test case will be used to get
239 the instance for the path specified.
242 orig_os_access
= os
.access
244 def fake_os_access(path
, mode
):
245 registry
= FileDouble
.get_registry_for_testcase(testcase
)
247 file_double
= registry
[path
]
248 result
= file_double
.os_access_scenario
.call_hook(mode
)
250 result
= orig_os_access(path
, mode
)
253 func_patcher
= mock
.patch
.object(
254 os
, 'access', side_effect
=fake_os_access
)
256 testcase
.addCleanup(func_patcher
.stop
)
259 StatResult
= collections
.namedtuple(
262 'st_ino', 'st_dev', 'st_nlink',
265 'st_atime', 'st_mtime', 'st_ctime',
269 def patch_os_stat(testcase
):
270 """ Patch `os.stat` behaviour for this test case.
272 When the patched function is called, the registry of
273 `FileDouble` instances for this test case will be used to get
274 the instance for the path specified.
277 orig_os_stat
= os
.stat
279 def fake_os_stat(path
):
280 registry
= FileDouble
.get_registry_for_testcase(testcase
)
282 file_double
= registry
[path
]
283 result
= file_double
.os_stat_scenario
.call_hook()
285 result
= orig_os_stat(path
)
288 func_patcher
= mock
.patch
.object(
289 os
, 'stat', side_effect
=fake_os_stat
)
291 testcase
.addCleanup(func_patcher
.stop
)
294 def patch_os_lstat(testcase
):
295 """ Patch `os.lstat` behaviour for this test case.
297 When the patched function is called, the registry of
298 `FileDouble` instances for this test case will be used to get
299 the instance for the path specified.
302 orig_os_lstat
= os
.lstat
304 def fake_os_lstat(path
):
305 registry
= FileDouble
.get_registry_for_testcase(testcase
)
307 file_double
= registry
[path
]
308 result
= file_double
.os_lstat_scenario
.call_hook()
310 result
= orig_os_lstat(path
)
313 func_patcher
= mock
.patch
.object(
314 os
, 'lstat', side_effect
=fake_os_lstat
)
316 testcase
.addCleanup(func_patcher
.stop
)
319 def patch_os_unlink(testcase
):
320 """ Patch `os.unlink` behaviour for this test case.
322 When the patched function is called, the registry of
323 `FileDouble` instances for this test case will be used to get
324 the instance for the path specified.
327 orig_os_unlink
= os
.unlink
329 def fake_os_unlink(path
):
330 registry
= FileDouble
.get_registry_for_testcase(testcase
)
332 file_double
= registry
[path
]
333 result
= file_double
.os_unlink_scenario
.call_hook()
335 result
= orig_os_unlink(path
)
338 func_patcher
= mock
.patch
.object(
339 os
, 'unlink', side_effect
=fake_os_unlink
)
341 testcase
.addCleanup(func_patcher
.stop
)
344 def patch_os_rmdir(testcase
):
345 """ Patch `os.rmdir` behaviour for this test case.
347 When the patched function is called, the registry of
348 `FileDouble` instances for this test case will be used to get
349 the instance for the path specified.
352 orig_os_rmdir
= os
.rmdir
354 def fake_os_rmdir(path
):
355 registry
= FileDouble
.get_registry_for_testcase(testcase
)
357 file_double
= registry
[path
]
358 result
= file_double
.os_rmdir_scenario
.call_hook()
360 result
= orig_os_rmdir(path
)
363 func_patcher
= mock
.patch
.object(
364 os
, 'rmdir', side_effect
=fake_os_rmdir
)
366 testcase
.addCleanup(func_patcher
.stop
)
369 def patch_tempfile_mkdtemp(testcase
):
370 """ Patch the `tempfile.mkdtemp` function for this test case. """
371 if not hasattr(testcase
, 'tempfile_mkdtemp_file_double'):
372 testcase
.tempfile_mkdtemp_file_double
= FileDouble(tempfile
.mktemp())
374 double
= testcase
.tempfile_mkdtemp_file_double
375 double
.set_os_unlink_scenario('okay')
376 double
.set_os_rmdir_scenario('okay')
377 double
.register_for_testcase(testcase
)
379 func_patcher
= mock
.patch
.object(tempfile
, "mkdtemp")
381 testcase
.addCleanup(func_patcher
.stop
)
383 tempfile
.mkdtemp
.return_value
= testcase
.tempfile_mkdtemp_file_double
.path
391 # Python 2 uses IOError.
def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
    """ Normalise exception arguments to the `IOError` argument triple.

        :param init_args: Positional arguments given to the exception.
        :param init_kwargs: Keyword arguments given to the exception.
            This mapping is not modified.
        :param errno_value: The `errno` code to use when none is given.
        :return: Tuple (`args`, `kwargs`). The `args` value is the
            triple (`errno`, `strerror`, `filename`); the `kwargs`
            value is a new dict of the remaining keyword arguments,
            with ``errno``, ``strerror``, and ``filename`` removed.

        The first positional argument is used as the `errno` only when
        at least three positional arguments are given; the third is
        then the `filename`. The second positional argument, when
        present, is the `strerror`. A keyword argument takes
        precedence over the corresponding positional argument.

        The default `strerror` is `os.strerror(errno_value)`, and the
        default `filename` is ``None``.

        """
    # Work on a copy, so the caller's mapping is left untouched.
    result_kwargs = dict(init_kwargs)

    result_errno = errno_value
    result_strerror = os.strerror(errno_value)
    result_filename = None

    if len(init_args) >= 3:
        result_errno = init_args[0]
        result_filename = init_args[2]
    if len(init_args) >= 2:
        result_strerror = init_args[1]

    # Keyword values override whatever the positional arguments gave.
    result_errno = result_kwargs.pop('errno', result_errno)
    result_strerror = result_kwargs.pop('strerror', result_strerror)
    result_filename = result_kwargs.pop('filename', result_filename)

    result_args = (result_errno, result_strerror, result_filename)
    return (result_args, result_kwargs)
class FileNotFoundError(IOError):
    """ `IOError` subclass standing in for `FileNotFoundError`.

        Unless the arguments specify otherwise, the `errno` is
        `errno.ENOENT`.
        """

    def __init__(self, *args, **kwargs):
        normalised = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.ENOENT)
        (ioerror_args, ioerror_kwargs) = normalised
        super(FileNotFoundError, self).__init__(
                *ioerror_args, **ioerror_kwargs)
class FileExistsError(IOError):
    """ `IOError` subclass standing in for `FileExistsError`.

        Unless the arguments specify otherwise, the `errno` is
        `errno.EEXIST`.
        """

    def __init__(self, *args, **kwargs):
        normalised = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.EEXIST)
        (ioerror_args, ioerror_kwargs) = normalised
        super(FileExistsError, self).__init__(
                *ioerror_args, **ioerror_kwargs)
class PermissionError(IOError):
    """ `IOError` subclass standing in for `PermissionError`.

        Unless the arguments specify otherwise, the `errno` is
        `errno.EPERM`.
        """

    def __init__(self, *args, **kwargs):
        normalised = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.EPERM)
        (ioerror_args, ioerror_kwargs) = normalised
        super(PermissionError, self).__init__(
                *ioerror_args, **ioerror_kwargs)
433 def make_fake_file_scenarios(path
=None):
434 """ Make a collection of scenarios for testing with fake files.
436 :path: The filesystem path of the fake file. If not specified,
437 a valid random path will be generated.
438 :return: A collection of scenarios for tests involving input files.
440 The collection is a mapping from scenario name to a dictionary of
446 file_path
= tempfile
.mktemp()
450 fake_file_empty
= StringIO()
451 fake_file_minimal
= StringIO("Lorem ipsum.")
452 fake_file_large
= StringIO("\n".join(
454 for __
in range(1000)))
456 default_scenario_params
= {
457 'open_scenario_name': 'okay',
458 'file_double_params': dict(
459 path
=file_path
, fake_file
=fake_file_minimal
),
465 'open_scenario_name': 'nonexist',
468 'open_scenario_name': 'exist_error',
470 'error-read-denied': {
471 'open_scenario_name': 'read_denied',
474 'file_double_params': dict(
475 path
=file_path
, fake_file
=fake_file_empty
),
478 'file_double_params': dict(
479 path
=file_path
, fake_file
=fake_file_empty
),
482 'file_double_params': dict(
483 path
=file_path
, fake_file
=fake_file_minimal
),
486 'file_double_params': dict(
487 path
=file_path
, fake_file
=fake_file_large
),
491 for (name
, scenario
) in scenarios
.items():
492 params
= default_scenario_params
.copy()
493 params
.update(scenario
)
494 scenario
.update(params
)
495 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
496 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
497 scenario
['fake_file_scenario_name'] = name
502 def get_file_doubles_from_fake_file_scenarios(scenarios
):
503 """ Get the `FileDouble` instances from fake file scenarios.
505 :param scenarios: Collection of fake file scenarios.
506 :return: Collection of `FileDouble` instances.
510 scenario
['file_double']
511 for scenario
in scenarios
512 if scenario
['file_double'] is not None)
517 def setup_file_double_behaviour(testcase
, doubles
=None):
518 """ Set up file double instances and behaviour.
520 :param testcase: The `TestCase` instance to modify.
521 :param doubles: Collection of `FileDouble` instances.
524 If `doubles` is ``None``, a default collection will be made
525 from the result of `make_fake_file_scenarios` result.
529 scenarios
= make_fake_file_scenarios()
530 doubles
= get_file_doubles_from_fake_file_scenarios(
533 for file_double
in doubles
:
534 file_double
.register_for_testcase(testcase
)
536 orig_open
= builtins
.open
538 def fake_open(path
, mode
='rt', buffering
=-1):
539 registry
= FileDouble
.get_registry_for_testcase(testcase
)
541 file_double
= registry
[path
]
542 result
= file_double
.builtins_open_scenario
.call_hook(
545 result
= orig_open(path
, mode
, buffering
)
548 mock_open
= mock
.mock_open()
549 mock_open
.side_effect
= fake_open
551 func_patcher
= mock
.patch
.object(
555 testcase
.addCleanup(func_patcher
.stop
)
558 def setup_fake_file_fixtures(testcase
):
559 """ Set up fixtures for fake file doubles.
561 :param testcase: The `TestCase` instance to modify.
565 scenarios
= make_fake_file_scenarios()
566 testcase
.fake_file_scenarios
= scenarios
568 file_doubles
= get_file_doubles_from_fake_file_scenarios(
570 setup_file_double_behaviour(testcase
, file_doubles
)
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify. Its
            `fake_file_scenarios` mapping is looked up by `name`.
        :param name: The key of the scenario to select.
        :return: ``None``.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        then registers the file double for the test case.
        """
    selected = testcase.fake_file_scenarios[name]
    double = selected['file_double']
    testcase.fake_file_scenario = selected
    testcase.file_double = double
    double.register_for_testcase(testcase)
581 class FileFunctionScenario
:
582 """ Scenario for fake behaviour of a specific file-related function. """
584 def __init__(self
, scenario_name
, double
):
585 self
.scenario_name
= scenario_name
588 self
.call_hook
= getattr(
589 self
, "_hook_{name}".format(name
=self
.scenario_name
))
593 "<FileFunctionScenario instance: {id}"
595 " call_hook name: {hook_name!r}"
596 " double: {double!r}"
599 name
=self
.scenario_name
, double
=self
.double
,
600 hook_name
=self
.call_hook
.__name
__)
603 def __eq__(self
, other
):
605 if not self
.scenario_name
== other
.scenario_name
:
607 if not self
.double
== other
.double
:
609 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
613 def __ne__(self
, other
):
614 result
= not self
.__eq
__(other
)
618 class os_path_exists_scenario(FileFunctionScenario
):
619 """ Scenario for `os.path.exists` behaviour. """
621 def _hook_exist(self
):
624 def _hook_not_exist(self
):
628 class os_access_scenario(FileFunctionScenario
):
629 """ Scenario for `os.access` behaviour. """
631 def _hook_okay(self
, mode
):
634 def _hook_not_exist(self
, mode
):
637 def _hook_read_only(self
, mode
):
638 if mode
& (os
.W_OK | os
.X_OK
):
644 def _hook_denied(self
, mode
):
645 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
652 class os_stat_scenario(FileFunctionScenario
):
653 """ Scenario for `os.stat` behaviour. """
655 def _hook_okay(self
):
656 return self
.double
.stat_result
658 def _hook_notfound_error(self
):
659 raise FileNotFoundError(
661 "No such file or directory: {path!r}".format(
662 path
=self
.double
.path
))
664 def _hook_denied_error(self
):
665 raise PermissionError(
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Adds nothing to `os_stat_scenario`; every hook is inherited.
        """
674 class os_unlink_scenario(FileFunctionScenario
):
675 """ Scenario for `os.unlink` behaviour. """
677 def _hook_okay(self
):
680 def _hook_nonexist(self
):
681 error
= FileNotFoundError(
683 "No such file or directory: {path!r}".format(
684 path
=self
.double
.path
))
687 def _hook_denied(self
):
688 error
= PermissionError(
694 class os_rmdir_scenario(FileFunctionScenario
):
695 """ Scenario for `os.rmdir` behaviour. """
697 def _hook_okay(self
):
700 def _hook_nonexist(self
):
701 error
= FileNotFoundError(
703 "No such file or directory: {path!r}".format(
704 path
=self
.double
.path
))
707 def _hook_denied(self
):
708 error
= PermissionError(
714 class builtins_open_scenario(FileFunctionScenario
):
715 """ Scenario for `builtins.open` behaviour. """
717 def _hook_okay(self
, mode
, buffering
):
718 result
= self
.double
.fake_file
721 def _hook_nonexist(self
, mode
, buffering
):
722 if mode
.startswith('r'):
723 error
= FileNotFoundError(
725 "No such file or directory: {path!r}".format(
726 path
=self
.double
.path
))
728 result
= self
.double
.fake_file
731 def _hook_exist_error(self
, mode
, buffering
):
732 if mode
.startswith('w') or mode
.startswith('a'):
733 error
= FileExistsError(
735 "File already exists: {path!r}".format(
736 path
=self
.double
.path
))
738 result
= self
.double
.fake_file
741 def _hook_read_denied(self
, mode
, buffering
):
742 if mode
.startswith('r'):
743 error
= PermissionError(
745 "Read denied on {path!r}".format(
746 path
=self
.double
.path
))
748 result
= self
.double
.fake_file
751 def _hook_write_denied(self
, mode
, buffering
):
752 if mode
.startswith('w') or mode
.startswith('a'):
753 error
= PermissionError(
755 "Write denied on {path!r}".format(
756 path
=self
.double
.path
))
758 result
= self
.double
.fake_file
762 class TestDoubleWithRegistry
:
763 """ Abstract base class for a test double with a test case registry. """
765 registry_class
= NotImplemented
766 registries
= NotImplemented
768 function_scenario_params_by_class
= NotImplemented
770 def __new__(cls
, *args
, **kwargs
):
771 superclass
= super(TestDoubleWithRegistry
, cls
)
772 if superclass
.__new
__ is object.__new
__:
773 # The ‘object’ implementation complains about extra arguments.
774 instance
= superclass
.__new
__(cls
)
776 instance
= superclass
.__new
__(cls
, *args
, **kwargs
)
777 instance
.make_set_scenario_methods()
def __init__(self, *args, **kwargs):
    """ Initialise the double, then set each function's scenario.

        All arguments are passed through to the superclass initialiser.
        """
    base = super(TestDoubleWithRegistry, self)
    base.__init__(*args, **kwargs)
    self._set_method_per_scenario()
785 def _make_set_scenario_method(self
, scenario_class
, params
):
786 def method(self
, name
):
787 scenario
= scenario_class(name
, double
=self
)
788 setattr(self
, scenario_class
.__name
__, scenario
)
790 """ Set the scenario for `{name}` behaviour. """
791 ).format(name
=scenario_class
.__name
__)
792 method
.__name
__ = str(params
['set_scenario_method_name'])
def make_set_scenario_methods(self):
    """ Make `set_<scenario_class_name>` methods on this class.

        For each entry in `function_scenario_params_by_class`, a
        method is made, set as an attribute of this instance's class
        under the method's own name, and recorded in that entry's
        parameters as ``set_scenario_method``.
        """
    owner_class = self.__class__
    params_by_class = self.function_scenario_params_by_class
    for (scenario_class, scenario_params) in params_by_class.items():
        setter = self._make_set_scenario_method(
                scenario_class, scenario_params)
        setattr(owner_class, setter.__name__, setter)
        scenario_params['set_scenario_method'] = setter
def _set_method_per_scenario(self):
    """ Set the method to be called for each scenario.

        Calls every recorded ``set_scenario_method`` on this instance,
        passing the matching ``default_scenario_name``.
        """
    for params in self.function_scenario_params_by_class.values():
        set_scenario = params['set_scenario_method']
        default_name = params['default_scenario_name']
        set_scenario(self, default_name)
812 def get_registry_for_testcase(cls
, testcase
):
813 """ Get the FileDouble registry for the specified test case. """
814 # Key in a dict must be hashable.
815 key
= (testcase
.__class
__, id(testcase
))
816 registry
= cls
.registries
.setdefault(key
, cls
.registry_class())
def get_registry_key(self):
    """ Get the registry key for this double.

        This implementation always raises `NotImplementedError`.
        """
    raise NotImplementedError
823 def register_for_testcase(self
, testcase
):
824 """ Add this instance to registry for the specified testcase. """
825 registry
= self
.get_registry_for_testcase(testcase
)
826 key
= self
.get_registry_key()
828 unregister_func
= functools
.partial(
829 self
.unregister_for_testcase
, testcase
)
830 testcase
.addCleanup(unregister_func
)
832 def unregister_for_testcase(self
, testcase
):
833 """ Remove this instance from registry for the specified testcase. """
834 registry
= self
.get_registry_for_testcase(testcase
)
835 key
= self
.get_registry_key()
840 def copy_fake_file(fake_file
):
841 """ Make a copy of the StringIO instance. """
842 fake_file_type
= StringIO
844 if fake_file
is not None:
845 fake_file_type
= type(fake_file
)
846 content
= fake_file
.getvalue()
847 assert issubclass(fake_file_type
, object)
848 result
= fake_file_type(content
)
852 class FileDouble(TestDoubleWithRegistry
):
853 """ A testing double for a file. """
855 registry_class
= dict
858 function_scenario_params_by_class
= {
859 os_path_exists_scenario
: {
860 'default_scenario_name': 'not_exist',
861 'set_scenario_method_name': 'set_os_path_exists_scenario',
863 os_access_scenario
: {
864 'default_scenario_name': 'okay',
865 'set_scenario_method_name': 'set_os_access_scenario',
868 'default_scenario_name': 'okay',
869 'set_scenario_method_name': 'set_os_stat_scenario',
872 'default_scenario_name': 'okay',
873 'set_scenario_method_name': 'set_os_lstat_scenario',
875 builtins_open_scenario
: {
876 'default_scenario_name': 'okay',
877 'set_scenario_method_name': 'set_open_scenario',
879 os_unlink_scenario
: {
880 'default_scenario_name': 'okay',
881 'set_scenario_method_name': 'set_os_unlink_scenario',
884 'default_scenario_name': 'okay',
885 'set_scenario_method_name': 'set_os_rmdir_scenario',
889 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
891 self
.fake_file
= copy_fake_file(fake_file
)
892 self
.fake_file
.name
= path
894 self
._set
_stat
_result
()
896 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
898 def _set_stat_result(self
):
899 """ Set the `os.stat` result for this file. """
900 size
= len(self
.fake_file
.getvalue())
901 self
.stat_result
= StatResult(
903 st_ino
=None, st_dev
=None, st_nlink
=None,
906 st_atime
=None, st_mtime
=None, st_ctime
=None,
910 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
911 path
=self
.path
, fake_file
=self
.fake_file
)
914 def get_registry_key(self
):
915 """ Get the registry key for this double. """
920 class SubprocessFunctionScenario
:
921 """ Scenario for fake behaviour of a specific subprocess function. """
923 def __init__(self
, scenario_name
, double
):
924 self
.scenario_name
= scenario_name
925 self
.subprocess_double
= double
927 self
.call_hook
= getattr(
928 self
, "_hook_{name}".format(name
=self
.scenario_name
))
932 "<SubprocessFunctionScenario instance: {id}"
934 " call_hook name: {hook_name!r}"
935 " subprocess_double: {double!r}"
938 name
=self
.scenario_name
, double
=self
.subprocess_double
,
939 hook_name
=self
.call_hook
.__name
__)
942 def __eq__(self
, other
):
944 if not self
.scenario_name
== other
.scenario_name
:
946 if not self
.subprocess_double
== other
.subprocess_double
:
948 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
952 def __ne__(self
, other
):
953 result
= not self
.__eq
__(other
)
957 class os_popen_scenario(SubprocessFunctionScenario
):
958 """ Scenario for `os.popen` behaviour. """
960 stream_name_by_mode
= {
965 def _hook_success(self
, cmd
, mode
, buffering
):
966 stream_name
= self
.stream_name_by_mode
[mode
]
967 stream_double
= getattr(
968 self
.subprocess_double
, stream_name
+ '_double')
969 result
= stream_double
.fake_file
972 def _hook_failure(self
, cmd
, mode
, buffering
):
976 def _hook_not_found(self
, cmd
, mode
, buffering
):
981 class os_waitpid_scenario(SubprocessFunctionScenario
):
982 """ Scenario for `os.waitpid` behaviour. """
984 def _hook_success(self
, pid
, options
):
985 result
= (pid
, EXIT_STATUS_SUCCESS
)
988 def _hook_failure(self
, pid
, options
):
989 result
= (pid
, EXIT_STATUS_FAILURE
)
992 def _hook_not_found(self
, pid
, options
):
993 error
= OSError(errno
.ECHILD
)
997 class os_system_scenario(SubprocessFunctionScenario
):
998 """ Scenario for `os.system` behaviour. """
1000 def _hook_success(self
, command
):
1001 result
= EXIT_STATUS_SUCCESS
1004 def _hook_failure(self
, command
):
1005 result
= EXIT_STATUS_FAILURE
1008 def _hook_not_found(self
, command
):
1009 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1013 class os_spawnv_scenario(SubprocessFunctionScenario
):
1014 """ Scenario for `os.spawnv` behaviour. """
1016 def _hook_success(self
, mode
, file, args
):
1017 result
= EXIT_STATUS_SUCCESS
1020 def _hook_failure(self
, mode
, file, args
):
1021 result
= EXIT_STATUS_FAILURE
1024 def _hook_not_found(self
, mode
, file, args
):
1025 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1034 """ A testing double for `subprocess.Popen`. """
1036 def __init__(self
, args
, *posargs
, **kwargs
):
1041 self
.returncode
= None
1043 def set_streams(self
, subprocess_double
, popen_kwargs
):
1044 """ Set the streams on the `PopenDouble`.
1046 :param subprocess_double: The `SubprocessDouble` from
1047 which to get existing stream doubles.
1048 :param popen_kwargs: The keyword arguments to the
1049 `subprocess.Popen` call.
1053 for stream_name
in (
1054 name
for name
in ['stdin', 'stdout', 'stderr']
1055 if name
in popen_kwargs
):
1056 stream_spec
= popen_kwargs
[stream_name
]
1057 if stream_spec
is subprocess
.PIPE
:
1058 stream_double
= getattr(
1060 "{name}_double".format(name
=stream_name
))
1061 stream_file
= stream_double
.fake_file
1062 elif stream_spec
is subprocess
.STDOUT
:
1063 stream_file
= subprocess_double
.stdout_double
.fake_file
1065 stream_file
= stream_spec
1066 setattr(self
, stream_name
, stream_file
)
1069 class subprocess_popen_scenario(SubprocessFunctionScenario
):
1070 """ Scenario for `subprocess.Popen` behaviour. """
1072 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1073 double
= self
.subprocess_double
.popen_double
1074 double
.set_streams(self
.subprocess_double
, kwargs
)
1078 def patch_subprocess_popen(testcase
):
1079 """ Patch `subprocess.Popen` constructor for this test case.
1081 :param testcase: The `TestCase` instance to modify.
1084 When the patched function is called, the registry of
1085 `SubprocessDouble` instances for this test case will be used
1086 to get the instance for the program path specified.
1089 orig_subprocess_popen
= subprocess
.Popen
1091 def fake_subprocess_popen(args
, *posargs
, **kwargs
):
1092 if kwargs
.get('shell', False):
1093 argv
= shlex
.split(args
)
1096 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1097 if argv
in registry
:
1098 subprocess_double
= registry
[argv
]
1099 result
= subprocess_double
.subprocess_popen_scenario
.call_hook(
1100 testcase
, args
, *posargs
, **kwargs
)
1102 result
= orig_subprocess_popen(args
, *posargs
, **kwargs
)
1105 func_patcher
= mock
.patch
.object(
1106 subprocess
, "Popen", side_effect
=fake_subprocess_popen
)
1107 func_patcher
.start()
1108 testcase
.addCleanup(func_patcher
.stop
)
1111 class SubprocessDoubleRegistry(collections_abc
.MutableMapping
):
1112 """ Registry of `SubprocessDouble` instances by `argv`. """
1114 def __init__(self
, *args
, **kwargs
):
1117 if isinstance(args
[0], collections_abc
.Mapping
):
1118 items
= args
[0].items()
1119 if isinstance(args
[0], collections_abc
.Iterable
):
1121 self
._mapping
= dict(items
)
1124 text
= "<{class_name} object: {mapping}>".format(
1125 class_name
=self
.__class
__.__name
__, mapping
=self
._mapping
)
1128 def _match_argv(self
, argv
):
1129 """ Match the specified `argv` with our registered keys. """
1131 if not isinstance(argv
, collections_abc
.Sequence
):
1133 candidates
= iter(self
._mapping
)
1134 while match
is None:
1136 candidate
= next(candidates
)
1137 except StopIteration:
1140 if candidate
== argv
:
1143 word_iter
= enumerate(candidate
)
1144 while found
is None:
1146 (word_index
, candidate_word
) = next(word_iter
)
1147 except StopIteration:
1149 if candidate_word
is ARG_MORE
:
1150 # Candidate matches any remaining words. We have a match.
1152 elif word_index
> len(argv
):
1153 # Candidate is too long for the specified argv.
1155 elif candidate_word
is ARG_ANY
:
1156 # Candidate matches any word at this position.
1158 elif candidate_word
== argv
[word_index
]:
1159 # Candidate matches the word at this position.
1162 # This candidate does not match.
1165 # Reached the end of the candidate without a mismatch.
1171 def __getitem__(self
, key
):
1172 match
= self
._match
_argv
(key
)
1175 result
= self
._mapping
[match
]
1178 def __setitem__(self
, key
, value
):
1181 self
._mapping
[key
] = value
def __delitem__(self, key):
    """ Remove the item whose registered key matches `key`.

        When `_match_argv` finds no match, nothing is removed and no
        error is raised.
        """
    matched_key = self._match_argv(key)
    if matched_key is None:
        return
    del self._mapping[matched_key]
1189 return self
._mapping
.__iter
__()
1192 return self
._mapping
.__len
__()
1195 class SubprocessDouble(TestDoubleWithRegistry
):
1196 """ A testing double for a subprocess. """
1198 registry_class
= SubprocessDoubleRegistry
1201 double_by_pid
= weakref
.WeakValueDictionary()
1203 function_scenario_params_by_class
= {
1204 subprocess_popen_scenario
: {
1205 'default_scenario_name': 'success',
1206 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1208 os_popen_scenario
: {
1209 'default_scenario_name': 'success',
1210 'set_scenario_method_name': 'set_os_popen_scenario',
1212 os_waitpid_scenario
: {
1213 'default_scenario_name': 'success',
1214 'set_scenario_method_name': 'set_os_waitpid_scenario',
1216 os_system_scenario
: {
1217 'default_scenario_name': 'success',
1218 'set_scenario_method_name': 'set_os_system_scenario',
1220 os_spawnv_scenario
: {
1221 'default_scenario_name': 'success',
1222 'set_scenario_method_name': 'set_os_spawnv_scenario',
1226 def __init__(self
, path
=None, argv
=None, *args
, **kwargs
):
1228 path
= tempfile
.mktemp()
1232 command_name
= os
.path
.basename(path
)
1233 argv
= [command_name
]
1236 self
.pid
= self
._make
_pid
()
1237 self
._register
_by
_pid
()
1239 self
.set_popen_double()
1241 self
.stdin_double
= FileDouble()
1242 self
.stdout_double
= FileDouble()
1243 self
.stderr_double
= FileDouble()
1245 super(SubprocessDouble
, self
).__init
__(*args
, **kwargs
)
1247 def set_popen_double(self
):
1248 """ Set the `PopenDouble` for this instance. """
1249 double
= PopenDouble(self
.argv
)
1250 double
.pid
= self
.pid
1252 self
.popen_double
= double
1256 "<SubprocessDouble instance: {id}"
1259 " stdin_double: {stdin_double!r}"
1260 " stdout_double: {stdout_double!r}"
1261 " stderr_double: {stderr_double!r}"
1264 path
=self
.path
, argv
=self
.argv
,
1265 stdin_double
=self
.stdin_double
,
1266 stdout_double
=self
.stdout_double
,
1267 stderr_double
=self
.stderr_double
)
1272 """ Make a unique PID for a subprocess. """
1273 for pid
in itertools
.count(1):
def _register_by_pid(self):
    """ Register this subprocess by its PID.

        The entry is stored in the `double_by_pid` mapping of this
        instance's class, keyed by `self.pid`.
        """
    pid_registry = self.__class__.double_by_pid
    pid_registry[self.pid] = self
1280 def get_registry_key(self
):
1281 """ Get the registry key for this double. """
1282 result
= tuple(self
.argv
)
def set_stdin_content(self, text):
    """ Set the content of the `stdin` stream for this double.

        :param text: The content for a new `StringIO`, which replaces
            the `fake_file` of `self.stdin_double`.
        """
    stream = StringIO(text)
    self.stdin_double.fake_file = stream
def set_stdout_content(self, text):
    """ Replace the fake `stdout` stream of this double.

        :param text: The text the new stream will contain.
        :return: ``None``.

        A new `StringIO` holding `text` becomes the `fake_file` of
        this double's `stdout_double`.

        """
    stream = StringIO(text)
    self.stdout_double.fake_file = stream
def set_stderr_content(self, text):
    """ Replace the fake `stderr` stream of this double.

        :param text: The text the new stream will contain.
        :return: ``None``.

        A new `StringIO` holding `text` becomes the `fake_file` of
        this double's `stderr_double`.

        """
    stream = StringIO(text)
    self.stderr_double.fake_file = stream
1298 def make_fake_subprocess_scenarios(path
=None):
1299 """ Make a collection of scenarios for testing with fake files.
1301 :path: The filesystem path of the fake program. If not specified,
1302 a valid random path will be generated.
1303 :return: A collection of scenarios for tests involving subprocesses.
1305 The collection is a mapping from scenario name to a dictionary of
1306 scenario attributes.
1310 file_path
= tempfile
.mktemp()
1314 default_scenario_params
= {
1315 'return_value': EXIT_STATUS_SUCCESS
,
1316 'program_path': file_path
,
1317 'argv_after_command_name': [],
1323 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND
,
1327 for (name
, scenario
) in scenarios
.items():
1328 params
= default_scenario_params
.copy()
1329 params
.update(scenario
)
1330 scenario
.update(params
)
1331 program_path
= params
['program_path']
1332 program_name
= os
.path
.basename(params
['program_path'])
1333 argv
= [program_name
]
1334 argv
.extend(params
['argv_after_command_name'])
1335 subprocess_double_params
= dict(
1339 subprocess_double
= SubprocessDouble(**subprocess_double_params
)
1340 scenario
['subprocess_double'] = subprocess_double
1341 scenario
['fake_file_scenario_name'] = name
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Collect the `SubprocessDouble` instances from the scenarios.

        :param scenarios: Collection of fake subprocess scenarios.
            Each item is indexed by the key ``'subprocess_double'``.
        :return: Collection of `SubprocessDouble` instances.

        A scenario whose ``'subprocess_double'`` value is ``None``
        contributes nothing to the result.

        """
    doubles = set()
    for scenario in scenarios:
        subprocess_double = scenario['subprocess_double']
        if subprocess_double is not None:
            doubles.add(subprocess_double)

    return doubles
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Register subprocess doubles for use by a test case.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: ``None``.

        If `doubles` is ``None``, a default collection will be made
        from the return value of `make_fake_subprocess_scenarios`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Give the test case a set of fake subprocess scenarios.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        The scenarios made by `make_fake_subprocess_scenarios` are
        stored on `testcase` as `fake_subprocess_scenarios`, and the
        subprocess doubles they contain are registered for `testcase`.

        """
    fake_scenarios = make_fake_subprocess_scenarios()
    testcase.fake_subprocess_scenarios = fake_scenarios

    setup_subprocess_double_behaviour(
            testcase,
            get_subprocess_doubles_from_fake_subprocess_scenarios(
                fake_scenarios.values()))
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the command line specified. If no
        double is registered for that command line, the call is
        passed through to the original `os.popen`.

        The patch is stopped by a cleanup added to `testcase`.

        """
    orig_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # Registry keys are tuples (see `get_registry_key`). The list
        # returned by `shlex.split` never equals a tuple key, and is
        # unhashable, so convert it before the lookup.
        registry_key = tuple(shlex.split(cmd))
        if registry_key in registry:
            subprocess_double = registry[registry_key]
            result = subprocess_double.os_popen_scenario.call_hook(
                    cmd, mode, buffering)
        else:
            result = orig_os_popen(cmd, mode, buffering)
        return result

    func_patcher = mock.patch.object(
            os, "popen", side_effect=fake_os_popen)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_waitpid(testcase):
    """ Replace `os.waitpid` with a fake for the duration of the test.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the fake is called, the PID is looked up in
        `SubprocessDouble.double_by_pid`. If a double is found, its
        `os_waitpid_scenario` hook supplies the result; otherwise the
        call is passed through to the original `os.waitpid`.

        The patch is stopped by a cleanup added to `testcase`.

        """
    orig_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            # Not a PID of any known double: defer to the real function.
            return orig_os_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    patcher = mock.patch.object(
            os, "waitpid", side_effect=fake_os_waitpid)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the command line specified. If no
        double is registered for that command line, the call is
        passed through to the original `os.system`.

        The patch is stopped by a cleanup added to `testcase`.

        """
    orig_os_system = os.system

    def fake_os_system(command):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # Registry keys are tuples (see `get_registry_key`). The list
        # returned by `shlex.split` never equals a tuple key, and is
        # unhashable, so convert it before the lookup.
        registry_key = tuple(shlex.split(command))
        if registry_key in registry:
            subprocess_double = registry[registry_key]
            result = subprocess_double.os_system_scenario.call_hook(
                    command)
        else:
            result = orig_os_system(command)
        return result

    func_patcher = mock.patch.object(
            os, "system", side_effect=fake_os_system)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_spawnv(testcase):
    """ Replace `os.spawnv` with a fake for the duration of the test.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the fake is called, the registry of `SubprocessDouble`
        instances for this test case is searched using the `args`
        sequence, converted to a tuple, as the key. If a double is
        found, its `os_spawnv_scenario` hook supplies the result;
        otherwise the call is passed through to the original
        `os.spawnv`.

        The patch is stopped by a cleanup added to `testcase`.

        """
    orig_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        key = tuple(args)
        if key not in doubles:
            # No double for this command line: defer to the real function.
            return orig_os_spawnv(mode, file, args)
        scenario = doubles[key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    patcher = mock.patch.object(
            os, "spawnv", side_effect=fake_os_spawnv)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1519 # vim: fileencoding=utf-8 filetype=python :