1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__
import (absolute_import
, unicode_literals
)
19 if sys
.version_info
>= (3, 3):
22 import unittest
.mock
as mock
23 from io
import StringIO
as StringIO
25 import collections
.abc
as collections_abc
26 elif sys
.version_info
>= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys
.version_info
>= (2, 7):
29 # Python 2 standard library.
30 import __builtin__
as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2
as unittest
33 # Third-party mock library.
35 # Python 2 standard library.
36 from StringIO
import StringIO
as BaseStringIO
37 import ConfigParser
as configparser
38 import collections
as collections_abc
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
59 __package__
= str("test")
60 __import__(__package__
)
69 # Alias for Python 3 types.
74 def make_unique_slug(testcase
):
75 """ Make a unique slug for the test case. """
76 text
= base64
.b64encode(
77 testcase
.getUniqueString().encode('utf-8')
86 # We don't yet have the StringIO we want. Create it.
88 class StringIO(BaseStringIO
, object):
89 """ StringIO with a context manager. """
94 def __exit__(self
, *args
):
108 def patch_stdout(testcase
):
109 """ Patch `sys.stdout` for the specified test case. """
110 patcher
= mock
.patch
.object(
111 sys
, 'stdout', wraps
=StringIO())
113 testcase
.addCleanup(patcher
.stop
)
116 def patch_stderr(testcase
):
117 """ Patch `sys.stderr` for the specified test case. """
118 patcher
= mock
.patch
.object(
119 sys
, 'stderr', wraps
=StringIO())
121 testcase
.addCleanup(patcher
.stop
)
124 def patch_signal_signal(testcase
):
125 """ Patch `signal.signal` for the specified test case. """
126 func_patcher
= mock
.patch
.object(signal
, 'signal')
128 testcase
.addCleanup(func_patcher
.stop
)
131 class FakeSystemExit(Exception):
132 """ Fake double for `SystemExit` exception. """
135 EXIT_STATUS_SUCCESS
= 0
136 EXIT_STATUS_FAILURE
= 1
137 EXIT_STATUS_COMMAND_NOT_FOUND
= 127
140 def patch_sys_exit(testcase
):
141 """ Patch `sys.exit` for the specified test case. """
142 func_patcher
= mock
.patch
.object(
144 side_effect
=FakeSystemExit())
146 testcase
.addCleanup(func_patcher
.stop
)
149 def patch_sys_argv(testcase
):
150 """ Patch the `sys.argv` sequence for the test case. """
151 if not hasattr(testcase
, 'progname'):
152 testcase
.progname
= make_unique_slug(testcase
)
153 if not hasattr(testcase
, 'sys_argv'):
154 testcase
.sys_argv
= [testcase
.progname
]
155 patcher
= mock
.patch
.object(sys
, "argv", new
=list(testcase
.sys_argv
))
157 testcase
.addCleanup(patcher
.stop
)
160 def patch_system_interfaces(testcase
):
161 """ Patch system interfaces that are disruptive to the test runner. """
162 patch_stdout(testcase
)
163 patch_stderr(testcase
)
164 patch_sys_exit(testcase
)
165 patch_sys_argv(testcase
)
168 def patch_time_time(testcase
, values
=None):
169 """ Patch the `time.time` function for the specified test case.
171 :param testcase: The `TestCase` instance for binding to the patch.
172 :param values: An iterable to provide return values.
177 values
= itertools
.count()
179 def generator_fake_time():
183 func_patcher
= mock
.patch
.object(time
, "time")
185 testcase
.addCleanup(func_patcher
.stop
)
187 time
.time
.side_effect
= generator_fake_time()
190 def patch_os_environ(testcase
):
191 """ Patch the `os.environ` mapping. """
192 if not hasattr(testcase
, 'os_environ'):
193 testcase
.os_environ
= {}
194 patcher
= mock
.patch
.object(os
, "environ", new
=testcase
.os_environ
)
196 testcase
.addCleanup(patcher
.stop
)
199 def patch_os_getpid(testcase
):
200 """ Patch `os.getpid` for the specified test case. """
201 func_patcher
= mock
.patch
.object(os
, 'getpid')
203 testcase
.addCleanup(func_patcher
.stop
)
206 def patch_os_getuid(testcase
):
207 """ Patch the `os.getuid` function. """
208 if not hasattr(testcase
, 'os_getuid_return_value'):
209 testcase
.os_getuid_return_value
= testcase
.getUniqueInteger()
210 func_patcher
= mock
.patch
.object(
211 os
, "getuid", return_value
=testcase
.os_getuid_return_value
)
213 testcase
.addCleanup(func_patcher
.stop
)
216 PasswdEntry
= collections
.namedtuple(
218 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
221 def patch_pwd_getpwuid(testcase
):
222 """ Patch the `pwd.getpwuid` function. """
223 if not hasattr(testcase
, 'pwd_getpwuid_return_value'):
224 testcase
.pwd_getpwuid_return_value
= PasswdEntry(
225 pw_name
=make_unique_slug(testcase
),
226 pw_passwd
=make_unique_slug(testcase
),
227 pw_uid
=testcase
.getUniqueInteger(),
228 pw_gid
=testcase
.getUniqueInteger(),
229 pw_gecos
=testcase
.getUniqueString(),
230 pw_dir
=tempfile
.mktemp(),
231 pw_shell
=tempfile
.mktemp())
232 if not isinstance(testcase
.pwd_getpwuid_return_value
, pwd
.struct_passwd
):
233 pwent
= pwd
.struct_passwd(testcase
.pwd_getpwuid_return_value
)
235 pwent
= testcase
.pwd_getpwuid_return_value
236 func_patcher
= mock
.patch
.object(pwd
, "getpwuid", return_value
=pwent
)
238 testcase
.addCleanup(func_patcher
.stop
)
241 def patch_os_path_exists(testcase
):
242 """ Patch `os.path.exists` behaviour for this test case.
244 When the patched function is called, the registry of
245 `FileDouble` instances for this test case will be used to get
246 the instance for the path specified.
249 orig_os_path_exists
= os
.path
.exists
251 def fake_os_path_exists(path
):
252 registry
= FileDouble
.get_registry_for_testcase(testcase
)
254 file_double
= registry
[path
]
255 result
= file_double
.os_path_exists_scenario
.call_hook()
257 result
= orig_os_path_exists(path
)
260 func_patcher
= mock
.patch
.object(
261 os
.path
, 'exists', side_effect
=fake_os_path_exists
)
263 testcase
.addCleanup(func_patcher
.stop
)
266 def patch_os_access(testcase
):
267 """ Patch `os.access` behaviour for this test case.
269 When the patched function is called, the registry of
270 `FileDouble` instances for this test case will be used to get
271 the instance for the path specified.
274 orig_os_access
= os
.access
276 def fake_os_access(path
, mode
):
277 registry
= FileDouble
.get_registry_for_testcase(testcase
)
279 file_double
= registry
[path
]
280 result
= file_double
.os_access_scenario
.call_hook(mode
)
282 result
= orig_os_access(path
, mode
)
285 func_patcher
= mock
.patch
.object(
286 os
, 'access', side_effect
=fake_os_access
)
288 testcase
.addCleanup(func_patcher
.stop
)
291 StatResult
= collections
.namedtuple(
294 'st_ino', 'st_dev', 'st_nlink',
297 'st_atime', 'st_mtime', 'st_ctime',
301 def patch_os_stat(testcase
):
302 """ Patch `os.stat` behaviour for this test case.
304 When the patched function is called, the registry of
305 `FileDouble` instances for this test case will be used to get
306 the instance for the path specified.
309 orig_os_stat
= os
.stat
311 def fake_os_stat(path
):
312 registry
= FileDouble
.get_registry_for_testcase(testcase
)
314 file_double
= registry
[path
]
315 result
= file_double
.os_stat_scenario
.call_hook()
317 result
= orig_os_stat(path
)
320 func_patcher
= mock
.patch
.object(
321 os
, 'stat', side_effect
=fake_os_stat
)
323 testcase
.addCleanup(func_patcher
.stop
)
326 def patch_os_lstat(testcase
):
327 """ Patch `os.lstat` behaviour for this test case.
329 When the patched function is called, the registry of
330 `FileDouble` instances for this test case will be used to get
331 the instance for the path specified.
334 orig_os_lstat
= os
.lstat
336 def fake_os_lstat(path
):
337 registry
= FileDouble
.get_registry_for_testcase(testcase
)
339 file_double
= registry
[path
]
340 result
= file_double
.os_lstat_scenario
.call_hook()
342 result
= orig_os_lstat(path
)
345 func_patcher
= mock
.patch
.object(
346 os
, 'lstat', side_effect
=fake_os_lstat
)
348 testcase
.addCleanup(func_patcher
.stop
)
351 def patch_os_unlink(testcase
):
352 """ Patch `os.unlink` behaviour for this test case.
354 When the patched function is called, the registry of
355 `FileDouble` instances for this test case will be used to get
356 the instance for the path specified.
359 orig_os_unlink
= os
.unlink
361 def fake_os_unlink(path
):
362 registry
= FileDouble
.get_registry_for_testcase(testcase
)
364 file_double
= registry
[path
]
365 result
= file_double
.os_unlink_scenario
.call_hook()
367 result
= orig_os_unlink(path
)
370 func_patcher
= mock
.patch
.object(
371 os
, 'unlink', side_effect
=fake_os_unlink
)
373 testcase
.addCleanup(func_patcher
.stop
)
376 def patch_os_rmdir(testcase
):
377 """ Patch `os.rmdir` behaviour for this test case.
379 When the patched function is called, the registry of
380 `FileDouble` instances for this test case will be used to get
381 the instance for the path specified.
384 orig_os_rmdir
= os
.rmdir
386 def fake_os_rmdir(path
):
387 registry
= FileDouble
.get_registry_for_testcase(testcase
)
389 file_double
= registry
[path
]
390 result
= file_double
.os_rmdir_scenario
.call_hook()
392 result
= orig_os_rmdir(path
)
395 func_patcher
= mock
.patch
.object(
396 os
, 'rmdir', side_effect
=fake_os_rmdir
)
398 testcase
.addCleanup(func_patcher
.stop
)
401 def patch_shutil_rmtree(testcase
):
402 """ Patch `shutil.rmtree` behaviour for this test case.
404 When the patched function is called, the registry of
405 `FileDouble` instances for this test case will be used to get
406 the instance for the path specified.
409 orig_shutil_rmtree
= os
.rmdir
411 def fake_shutil_rmtree(path
, ignore_errors
=False, onerror
=None):
412 registry
= FileDouble
.get_registry_for_testcase(testcase
)
414 file_double
= registry
[path
]
415 result
= file_double
.shutil_rmtree_scenario
.call_hook()
417 result
= orig_shutil_rmtree(path
)
420 func_patcher
= mock
.patch
.object(
421 shutil
, 'rmtree', side_effect
=fake_shutil_rmtree
)
423 testcase
.addCleanup(func_patcher
.stop
)
426 def patch_tempfile_mkdtemp(testcase
):
427 """ Patch the `tempfile.mkdtemp` function for this test case. """
428 if not hasattr(testcase
, 'tempfile_mkdtemp_file_double'):
429 testcase
.tempfile_mkdtemp_file_double
= FileDouble(tempfile
.mktemp())
431 double
= testcase
.tempfile_mkdtemp_file_double
432 double
.set_os_unlink_scenario('okay')
433 double
.set_os_rmdir_scenario('okay')
434 double
.register_for_testcase(testcase
)
436 func_patcher
= mock
.patch
.object(tempfile
, "mkdtemp")
438 testcase
.addCleanup(func_patcher
.stop
)
440 tempfile
.mkdtemp
.return_value
= testcase
.tempfile_mkdtemp_file_double
.path
448 # Python 2 uses IOError.
449 def _ensure_ioerror_args(init_args
, init_kwargs
, errno_value
):
450 result_kwargs
= init_kwargs
451 result_errno
= errno_value
452 result_strerror
= os
.strerror(errno_value
)
453 result_filename
= None
454 if len(init_args
) >= 3:
455 result_errno
= init_args
[0]
456 result_filename
= init_args
[2]
457 if 'errno' in init_kwargs
:
458 result_errno
= init_kwargs
['errno']
459 del result_kwargs
['errno']
460 if 'filename' in init_kwargs
:
461 result_filename
= init_kwargs
['filename']
462 del result_kwargs
['filename']
463 if len(init_args
) >= 2:
464 result_strerror
= init_args
[1]
465 if 'strerror' in init_kwargs
:
466 result_strerror
= init_kwargs
['strerror']
467 del result_kwargs
['strerror']
468 result_args
= (result_errno
, result_strerror
, result_filename
)
469 return (result_args
, result_kwargs
)
471 class FileNotFoundError(IOError):
472 def __init__(self
, *args
, **kwargs
):
473 (args
, kwargs
) = _ensure_ioerror_args(
474 args
, kwargs
, errno_value
=errno
.ENOENT
)
475 super(FileNotFoundError
, self
).__init
__(*args
, **kwargs
)
477 class FileExistsError(IOError):
478 def __init__(self
, *args
, **kwargs
):
479 (args
, kwargs
) = _ensure_ioerror_args(
480 args
, kwargs
, errno_value
=errno
.EEXIST
)
481 super(FileExistsError
, self
).__init
__(*args
, **kwargs
)
483 class PermissionError(IOError):
484 def __init__(self
, *args
, **kwargs
):
485 (args
, kwargs
) = _ensure_ioerror_args(
486 args
, kwargs
, errno_value
=errno
.EPERM
)
487 super(PermissionError
, self
).__init
__(*args
, **kwargs
)
490 def make_fake_file_scenarios(path
=None):
491 """ Make a collection of scenarios for testing with fake files.
493 :path: The filesystem path of the fake file. If not specified,
494 a valid random path will be generated.
495 :return: A collection of scenarios for tests involving input files.
497 The collection is a mapping from scenario name to a dictionary of
503 file_path
= tempfile
.mktemp()
507 fake_file_empty
= StringIO()
508 fake_file_minimal
= StringIO("Lorem ipsum.")
509 fake_file_large
= StringIO("\n".join(
511 for __
in range(1000)))
513 default_scenario_params
= {
514 'open_scenario_name': 'okay',
515 'file_double_params': dict(
516 path
=file_path
, fake_file
=fake_file_minimal
),
522 'open_scenario_name': 'nonexist',
525 'open_scenario_name': 'exist_error',
527 'error-read-denied': {
528 'open_scenario_name': 'read_denied',
531 'file_double_params': dict(
532 path
=file_path
, fake_file
=fake_file_empty
),
535 'file_double_params': dict(
536 path
=file_path
, fake_file
=fake_file_empty
),
539 'file_double_params': dict(
540 path
=file_path
, fake_file
=fake_file_minimal
),
543 'file_double_params': dict(
544 path
=file_path
, fake_file
=fake_file_large
),
548 for (name
, scenario
) in scenarios
.items():
549 params
= default_scenario_params
.copy()
550 params
.update(scenario
)
551 scenario
.update(params
)
552 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
553 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
554 scenario
['fake_file_scenario_name'] = name
559 def get_file_doubles_from_fake_file_scenarios(scenarios
):
560 """ Get the `FileDouble` instances from fake file scenarios.
562 :param scenarios: Collection of fake file scenarios.
563 :return: Collection of `FileDouble` instances.
567 scenario
['file_double']
568 for scenario
in scenarios
569 if scenario
['file_double'] is not None)
574 def setup_file_double_behaviour(testcase
, doubles
=None):
575 """ Set up file double instances and behaviour.
577 :param testcase: The `TestCase` instance to modify.
578 :param doubles: Collection of `FileDouble` instances.
581 If `doubles` is ``None``, a default collection will be made
582 from the result of `make_fake_file_scenarios` result.
586 scenarios
= make_fake_file_scenarios()
587 doubles
= get_file_doubles_from_fake_file_scenarios(
590 for file_double
in doubles
:
591 file_double
.register_for_testcase(testcase
)
593 orig_open
= builtins
.open
595 def fake_open(path
, mode
='rt', buffering
=-1):
596 registry
= FileDouble
.get_registry_for_testcase(testcase
)
598 file_double
= registry
[path
]
599 result
= file_double
.builtins_open_scenario
.call_hook(
602 result
= orig_open(path
, mode
, buffering
)
605 mock_open
= mock
.mock_open()
606 mock_open
.side_effect
= fake_open
608 func_patcher
= mock
.patch
.object(
612 testcase
.addCleanup(func_patcher
.stop
)
615 def setup_fake_file_fixtures(testcase
):
616 """ Set up fixtures for fake file doubles.
618 :param testcase: The `TestCase` instance to modify.
622 scenarios
= make_fake_file_scenarios()
623 testcase
.fake_file_scenarios
= scenarios
625 file_doubles
= get_file_doubles_from_fake_file_scenarios(
627 setup_file_double_behaviour(testcase
, file_doubles
)
630 def set_fake_file_scenario(testcase
, name
):
631 """ Set the named fake file scenario for the test case. """
632 scenario
= testcase
.fake_file_scenarios
[name
]
633 testcase
.fake_file_scenario
= scenario
634 testcase
.file_double
= scenario
['file_double']
635 testcase
.file_double
.register_for_testcase(testcase
)
638 class TestDoubleFunctionScenario
:
639 """ Scenario for fake behaviour of a specific function. """
641 def __init__(self
, scenario_name
, double
):
642 self
.scenario_name
= scenario_name
645 self
.call_hook
= getattr(
646 self
, "_hook_{name}".format(name
=self
.scenario_name
))
650 "<{class_name} instance: {id}"
652 " call_hook name: {hook_name!r}"
653 " double: {double!r}"
655 class_name
=self
.__class
__.__name
__, id=id(self
),
656 name
=self
.scenario_name
, double
=self
.double
,
657 hook_name
=self
.call_hook
.__name
__)
660 def __eq__(self
, other
):
662 if not self
.scenario_name
== other
.scenario_name
:
664 if not self
.double
== other
.double
:
666 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
670 def __ne__(self
, other
):
671 result
= not self
.__eq
__(other
)
675 class os_path_exists_scenario(TestDoubleFunctionScenario
):
676 """ Scenario for `os.path.exists` behaviour. """
678 def _hook_exist(self
):
681 def _hook_not_exist(self
):
685 class os_access_scenario(TestDoubleFunctionScenario
):
686 """ Scenario for `os.access` behaviour. """
688 def _hook_okay(self
, mode
):
691 def _hook_not_exist(self
, mode
):
694 def _hook_read_only(self
, mode
):
695 if mode
& (os
.W_OK | os
.X_OK
):
701 def _hook_denied(self
, mode
):
702 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
709 class os_stat_scenario(TestDoubleFunctionScenario
):
710 """ Scenario for `os.stat` behaviour. """
712 def _hook_okay(self
):
713 return self
.double
.stat_result
715 def _hook_notfound_error(self
):
716 raise FileNotFoundError(
718 "No such file or directory: {path!r}".format(
719 path
=self
.double
.path
))
721 def _hook_denied_error(self
):
722 raise PermissionError(
727 class os_lstat_scenario(os_stat_scenario
):
728 """ Scenario for `os.lstat` behaviour. """
731 class os_unlink_scenario(TestDoubleFunctionScenario
):
732 """ Scenario for `os.unlink` behaviour. """
734 def _hook_okay(self
):
737 def _hook_nonexist(self
):
738 error
= FileNotFoundError(
740 "No such file or directory: {path!r}".format(
741 path
=self
.double
.path
))
744 def _hook_denied(self
):
745 error
= PermissionError(
751 class os_rmdir_scenario(TestDoubleFunctionScenario
):
752 """ Scenario for `os.rmdir` behaviour. """
754 def _hook_okay(self
):
757 def _hook_nonexist(self
):
758 error
= FileNotFoundError(
760 "No such file or directory: {path!r}".format(
761 path
=self
.double
.path
))
764 def _hook_denied(self
):
765 error
= PermissionError(
771 class shutil_rmtree_scenario(TestDoubleFunctionScenario
):
772 """ Scenario for `shutil.rmtree` behaviour. """
774 def _hook_okay(self
):
777 def _hook_nonexist(self
):
778 error
= FileNotFoundError(
780 "No such file or directory: {path!r}".format(
781 path
=self
.double
.path
))
784 def _hook_denied(self
):
785 error
= PermissionError(
791 class builtins_open_scenario(TestDoubleFunctionScenario
):
792 """ Scenario for `builtins.open` behaviour. """
794 def _hook_okay(self
, mode
, buffering
):
795 result
= self
.double
.fake_file
798 def _hook_nonexist(self
, mode
, buffering
):
799 if mode
.startswith('r'):
800 error
= FileNotFoundError(
802 "No such file or directory: {path!r}".format(
803 path
=self
.double
.path
))
805 result
= self
.double
.fake_file
808 def _hook_exist_error(self
, mode
, buffering
):
809 if mode
.startswith('w') or mode
.startswith('a'):
810 error
= FileExistsError(
812 "File already exists: {path!r}".format(
813 path
=self
.double
.path
))
815 result
= self
.double
.fake_file
818 def _hook_read_denied(self
, mode
, buffering
):
819 if mode
.startswith('r'):
820 error
= PermissionError(
822 "Read denied on {path!r}".format(
823 path
=self
.double
.path
))
825 result
= self
.double
.fake_file
828 def _hook_write_denied(self
, mode
, buffering
):
829 if mode
.startswith('w') or mode
.startswith('a'):
830 error
= PermissionError(
832 "Write denied on {path!r}".format(
833 path
=self
.double
.path
))
835 result
= self
.double
.fake_file
839 class TestDoubleWithRegistry
:
840 """ Abstract base class for a test double with a test case registry. """
842 registry_class
= NotImplemented
843 registries
= NotImplemented
845 function_scenario_params_by_class
= NotImplemented
847 def __new__(cls
, *args
, **kwargs
):
848 superclass
= super(TestDoubleWithRegistry
, cls
)
849 if superclass
.__new
__ is object.__new
__:
850 # The ‘object’ implementation complains about extra arguments.
851 instance
= superclass
.__new
__(cls
)
853 instance
= superclass
.__new
__(cls
, *args
, **kwargs
)
854 instance
.make_set_scenario_methods()
858 def __init__(self
, *args
, **kwargs
):
859 super(TestDoubleWithRegistry
, self
).__init
__(*args
, **kwargs
)
860 self
._set
_method
_per
_scenario
()
862 def _make_set_scenario_method(self
, scenario_class
, params
):
863 def method(self
, name
):
864 scenario
= scenario_class(name
, double
=self
)
865 setattr(self
, scenario_class
.__name
__, scenario
)
867 """ Set the scenario for `{name}` behaviour. """
868 ).format(name
=scenario_class
.__name
__)
869 method
.__name
__ = str(params
['set_scenario_method_name'])
872 def make_set_scenario_methods(self
):
873 """ Make `set_<scenario_class_name>` methods on this class. """
874 for (function_scenario_class
, function_scenario_params
) in (
875 self
.function_scenario_params_by_class
.items()):
876 method
= self
._make
_set
_scenario
_method
(
877 function_scenario_class
, function_scenario_params
)
878 setattr(self
.__class
__, method
.__name
__, method
)
879 function_scenario_params
['set_scenario_method'] = method
881 def _set_method_per_scenario(self
):
882 """ Set the method to be called for each scenario. """
883 for function_scenario_params
in (
884 self
.function_scenario_params_by_class
.values()):
885 function_scenario_params
['set_scenario_method'](
886 self
, function_scenario_params
['default_scenario_name'])
889 def get_registry_for_testcase(cls
, testcase
):
890 """ Get the FileDouble registry for the specified test case. """
891 # Key in a dict must be hashable.
892 key
= (testcase
.__class
__, id(testcase
))
893 registry
= cls
.registries
.setdefault(key
, cls
.registry_class())
896 def get_registry_key(self
):
897 """ Get the registry key for this double. """
898 raise NotImplementedError
900 def register_for_testcase(self
, testcase
):
901 """ Add this instance to registry for the specified testcase. """
902 registry
= self
.get_registry_for_testcase(testcase
)
903 key
= self
.get_registry_key()
905 unregister_func
= functools
.partial(
906 self
.unregister_for_testcase
, testcase
)
907 testcase
.addCleanup(unregister_func
)
909 def unregister_for_testcase(self
, testcase
):
910 """ Remove this instance from registry for the specified testcase. """
911 registry
= self
.get_registry_for_testcase(testcase
)
912 key
= self
.get_registry_key()
917 def copy_fake_file(fake_file
):
918 """ Make a copy of the StringIO instance. """
919 fake_file_type
= StringIO
921 if fake_file
is not None:
922 fake_file_type
= type(fake_file
)
923 content
= fake_file
.getvalue()
924 assert issubclass(fake_file_type
, object)
925 result
= fake_file_type(content
)
926 if hasattr(fake_file
, 'encoding'):
927 if not hasattr(result
, 'encoding'):
928 result
.encoding
= fake_file
.encoding
932 class FileDouble(TestDoubleWithRegistry
):
933 """ A testing double for a file. """
935 registry_class
= dict
938 function_scenario_params_by_class
= {
939 os_path_exists_scenario
: {
940 'default_scenario_name': 'not_exist',
941 'set_scenario_method_name': 'set_os_path_exists_scenario',
943 os_access_scenario
: {
944 'default_scenario_name': 'okay',
945 'set_scenario_method_name': 'set_os_access_scenario',
948 'default_scenario_name': 'okay',
949 'set_scenario_method_name': 'set_os_stat_scenario',
952 'default_scenario_name': 'okay',
953 'set_scenario_method_name': 'set_os_lstat_scenario',
955 builtins_open_scenario
: {
956 'default_scenario_name': 'okay',
957 'set_scenario_method_name': 'set_open_scenario',
959 os_unlink_scenario
: {
960 'default_scenario_name': 'okay',
961 'set_scenario_method_name': 'set_os_unlink_scenario',
964 'default_scenario_name': 'okay',
965 'set_scenario_method_name': 'set_os_rmdir_scenario',
967 shutil_rmtree_scenario
: {
968 'default_scenario_name': 'okay',
969 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
973 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
975 self
.fake_file
= copy_fake_file(fake_file
)
976 self
.fake_file
.name
= path
978 self
._set
_stat
_result
()
980 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
982 def _set_stat_result(self
):
983 """ Set the `os.stat` result for this file. """
984 size
= len(self
.fake_file
.getvalue())
985 self
.stat_result
= StatResult(
987 st_ino
=None, st_dev
=None, st_nlink
=None,
990 st_atime
=None, st_mtime
=None, st_ctime
=None,
994 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
995 path
=self
.path
, fake_file
=self
.fake_file
)
998 def get_registry_key(self
):
999 """ Get the registry key for this double. """
1004 class os_popen_scenario(TestDoubleFunctionScenario
):
1005 """ Scenario for `os.popen` behaviour. """
1007 stream_name_by_mode
= {
1012 def _hook_success(self
, argv
, mode
, buffering
):
1013 stream_name
= self
.stream_name_by_mode
[mode
]
1014 stream_double
= getattr(
1015 self
.double
, stream_name
+ '_double')
1016 result
= stream_double
.fake_file
1019 def _hook_failure(self
, argv
, mode
, buffering
):
1023 def _hook_not_found(self
, argv
, mode
, buffering
):
1028 class os_waitpid_scenario(TestDoubleFunctionScenario
):
1029 """ Scenario for `os.waitpid` behaviour. """
1031 def _hook_success(self
, pid
, options
):
1032 result
= (pid
, EXIT_STATUS_SUCCESS
)
1035 def _hook_failure(self
, pid
, options
):
1036 result
= (pid
, EXIT_STATUS_FAILURE
)
1039 def _hook_not_found(self
, pid
, options
):
1040 error
= OSError(errno
.ECHILD
)
1044 class os_system_scenario(TestDoubleFunctionScenario
):
1045 """ Scenario for `os.system` behaviour. """
1047 def _hook_success(self
, command
):
1048 result
= EXIT_STATUS_SUCCESS
1051 def _hook_failure(self
, command
):
1052 result
= EXIT_STATUS_FAILURE
1055 def _hook_not_found(self
, command
):
1056 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1060 class os_spawnv_scenario(TestDoubleFunctionScenario
):
1061 """ Scenario for `os.spawnv` behaviour. """
1063 def _hook_success(self
, mode
, file, args
):
1064 result
= EXIT_STATUS_SUCCESS
1067 def _hook_failure(self
, mode
, file, args
):
1068 result
= EXIT_STATUS_FAILURE
1071 def _hook_not_found(self
, mode
, file, args
):
1072 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1081 """ A testing double for `subprocess.Popen`. """
1083 def __init__(self
, args
, *posargs
, **kwargs
):
1088 self
.returncode
= None
1090 if kwargs
.get('shell', False):
1091 self
.argv
= shlex
.split(args
)
1093 # The paramter is already a sequence of command-line arguments.
1096 def set_streams(self
, subprocess_double
, popen_kwargs
):
1097 """ Set the streams on the `PopenDouble`.
1099 :param subprocess_double: The `SubprocessDouble` from
1100 which to get existing stream doubles.
1101 :param popen_kwargs: The keyword arguments to the
1102 `subprocess.Popen` call.
1106 for stream_name
in (
1107 name
for name
in ['stdin', 'stdout', 'stderr']
1108 if name
in popen_kwargs
):
1109 stream_spec
= popen_kwargs
[stream_name
]
1110 if stream_spec
is subprocess
.PIPE
:
1111 stream_double
= getattr(
1113 "{name}_double".format(name
=stream_name
))
1114 stream_file
= stream_double
.fake_file
1115 elif stream_spec
is subprocess
.STDOUT
:
1116 stream_file
= subprocess_double
.stdout_double
.fake_file
1118 stream_file
= stream_spec
1119 setattr(self
, stream_name
, stream_file
)
1122 """ Wait for subprocess to terminate. """
1123 return self
.returncode
1126 class subprocess_popen_scenario(TestDoubleFunctionScenario
):
1127 """ Scenario for `subprocess.Popen` behaviour. """
1129 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1130 double
= self
.double
.popen_double
1131 double
.set_streams(self
.double
, kwargs
)
1135 def patch_subprocess_popen(testcase
):
1136 """ Patch `subprocess.Popen` constructor for this test case.
1138 :param testcase: The `TestCase` instance to modify.
1141 When the patched function is called, the registry of
1142 `SubprocessDouble` instances for this test case will be used
1143 to get the instance for the program path specified.
1146 orig_subprocess_popen
= subprocess
.Popen
1148 def fake_subprocess_popen(args
, *posargs
, **kwargs
):
1149 if kwargs
.get('shell', False):
1150 argv
= shlex
.split(args
)
1153 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1154 if argv
in registry
:
1155 subprocess_double
= registry
[argv
]
1156 result
= subprocess_double
.subprocess_popen_scenario
.call_hook(
1157 testcase
, args
, *posargs
, **kwargs
)
1159 result
= orig_subprocess_popen(args
, *posargs
, **kwargs
)
1162 func_patcher
= mock
.patch
.object(
1163 subprocess
, "Popen", side_effect
=fake_subprocess_popen
)
1164 func_patcher
.start()
1165 testcase
.addCleanup(func_patcher
.stop
)
1168 class subprocess_call_scenario(TestDoubleFunctionScenario
):
1169 """ Scenario for `subprocess.call` behaviour. """
1171 def _hook_success(self
, command
):
1172 result
= EXIT_STATUS_SUCCESS
1175 def _hook_failure(self
, command
):
1176 result
= EXIT_STATUS_FAILURE
1179 def _hook_not_found(self
, command
):
1180 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1184 def patch_subprocess_call(testcase
):
1185 """ Patch `subprocess.call` function for this test case.
1187 :param testcase: The `TestCase` instance to modify.
1190 When the patched function is called, the registry of
1191 `SubprocessDouble` instances for this test case will be used
1192 to get the instance for the program path specified.
1195 orig_subprocess_call
= subprocess
.call
1197 def fake_subprocess_call(command
, *posargs
, **kwargs
):
1198 if kwargs
.get('shell', False):
1199 command_argv
= shlex
.split(command
)
1201 command_argv
= command
1202 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1203 if command_argv
in registry
:
1204 subprocess_double
= registry
[command_argv
]
1205 result
= subprocess_double
.subprocess_call_scenario
.call_hook(
1208 result
= orig_subprocess_call(command
, *posargs
, **kwargs
)
1211 func_patcher
= mock
.patch
.object(
1212 subprocess
, "call", side_effect
=fake_subprocess_call
)
1213 func_patcher
.start()
1214 testcase
.addCleanup(func_patcher
.stop
)
1217 class subprocess_check_call_scenario(TestDoubleFunctionScenario
):
1218 """ Scenario for `subprocess.check_call` behaviour. """
1220 def _hook_success(self
, command
):
1223 def _hook_failure(self
, command
):
1224 result
= EXIT_STATUS_FAILURE
1225 error
= subprocess
.CalledProcessError(result
, command
)
1228 def _hook_not_found(self
, command
):
1229 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1230 error
= subprocess
.CalledProcessError(result
, command
)
1234 def patch_subprocess_check_call(testcase
):
1235 """ Patch `subprocess.check_call` function for this test case.
1237 :param testcase: The `TestCase` instance to modify.
1240 When the patched function is called, the registry of
1241 `SubprocessDouble` instances for this test case will be used
1242 to get the instance for the program path specified.
1245 orig_subprocess_check_call
= subprocess
.check_call
1247 def fake_subprocess_check_call(command
, *posargs
, **kwargs
):
1248 if kwargs
.get('shell', False):
1249 command_argv
= shlex
.split(command
)
1251 command_argv
= command
1252 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1253 if command_argv
in registry
:
1254 subprocess_double
= registry
[command_argv
]
1255 scenario
= subprocess_double
.subprocess_check_call_scenario
1256 result
= scenario
.call_hook(command
)
1258 result
= orig_subprocess_check_call(command
, *posargs
, **kwargs
)
1261 func_patcher
= mock
.patch
.object(
1262 subprocess
, "check_call", side_effect
=fake_subprocess_check_call
)
1263 func_patcher
.start()
1264 testcase
.addCleanup(func_patcher
.stop
)
1267 class SubprocessDoubleRegistry(collections_abc
.MutableMapping
):
1268 """ Registry of `SubprocessDouble` instances by `argv`. """
1270 def __init__(self
, *args
, **kwargs
):
1273 if isinstance(args
[0], collections_abc
.Mapping
):
1274 items
= args
[0].items()
1275 if isinstance(args
[0], collections_abc
.Iterable
):
1277 self
._mapping
= dict(items
)
1280 text
= "<{class_name} object: {mapping}>".format(
1281 class_name
=self
.__class
__.__name
__, mapping
=self
._mapping
)
1284 def _match_argv(self
, argv
):
1285 """ Match the specified `argv` with our registered keys. """
1287 if not isinstance(argv
, collections_abc
.Sequence
):
1289 candidates
= iter(self
._mapping
)
1290 while match
is None:
1292 candidate
= next(candidates
)
1293 except StopIteration:
1296 if candidate
== argv
:
1299 word_iter
= enumerate(candidate
)
1300 while found
is None:
1302 (word_index
, candidate_word
) = next(word_iter
)
1303 except StopIteration:
1305 if candidate_word
is ARG_MORE
:
1306 # Candiate matches any remaining words. We have a match.
1308 elif word_index
> len(argv
):
1309 # Candidate is too long for the specified argv.
1311 elif candidate_word
is ARG_ANY
:
1312 # Candidate matches any word at this position.
1314 elif candidate_word
== argv
[word_index
]:
1315 # Candidate matches the word at this position.
1318 # This candidate does not match.
1321 # Reached the end of the candidate without a mismatch.
1327 def __getitem__(self
, key
):
1328 match
= self
._match
_argv
(key
)
1331 result
= self
._mapping
[match
]
1334 def __setitem__(self
, key
, value
):
1337 self
._mapping
[key
] = value
1339 def __delitem__(self
, key
):
1340 match
= self
._match
_argv
(key
)
1341 if match
is not None:
1342 del self
._mapping
[match
]
1345 return self
._mapping
.__iter
__()
1348 return self
._mapping
.__len
__()
1351 class SubprocessDouble(TestDoubleWithRegistry
):
1352 """ A testing double for a subprocess. """
1354 registry_class
= SubprocessDoubleRegistry
1357 double_by_pid
= weakref
.WeakValueDictionary()
1359 function_scenario_params_by_class
= {
1360 subprocess_popen_scenario
: {
1361 'default_scenario_name': 'success',
1362 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1364 subprocess_call_scenario
: {
1365 'default_scenario_name': 'success',
1366 'set_scenario_method_name': 'set_subprocess_call_scenario',
1368 subprocess_check_call_scenario
: {
1369 'default_scenario_name': 'success',
1370 'set_scenario_method_name':
1371 'set_subprocess_check_call_scenario',
1373 os_popen_scenario
: {
1374 'default_scenario_name': 'success',
1375 'set_scenario_method_name': 'set_os_popen_scenario',
1377 os_waitpid_scenario
: {
1378 'default_scenario_name': 'success',
1379 'set_scenario_method_name': 'set_os_waitpid_scenario',
1381 os_system_scenario
: {
1382 'default_scenario_name': 'success',
1383 'set_scenario_method_name': 'set_os_system_scenario',
1385 os_spawnv_scenario
: {
1386 'default_scenario_name': 'success',
1387 'set_scenario_method_name': 'set_os_spawnv_scenario',
1391 def __init__(self
, path
=None, argv
=None, *args
, **kwargs
):
1393 path
= tempfile
.mktemp()
1397 command_name
= os
.path
.basename(path
)
1398 argv
= [command_name
]
1401 self
.pid
= self
._make
_pid
()
1402 self
._register
_by
_pid
()
1404 self
.set_popen_double()
1406 stream_class
= SubprocessDouble
.stream_class
1407 for stream_name
in ['stdin', 'stdout', 'stderr']:
1408 fake_file
= stream_class()
1409 file_double
= FileDouble(fake_file
=fake_file
)
1410 stream_double_name
= '{name}_double'.format(name
=stream_name
)
1411 setattr(self
, stream_double_name
, file_double
)
1413 super(SubprocessDouble
, self
).__init
__(*args
, **kwargs
)
1415 def set_popen_double(self
):
1416 """ Set the `PopenDouble` for this instance. """
1417 double
= PopenDouble(self
.argv
)
1418 double
.pid
= self
.pid
1420 self
.popen_double
= double
1424 "<SubprocessDouble instance: {id}"
1427 " stdin_double: {stdin_double!r}"
1428 " stdout_double: {stdout_double!r}"
1429 " stderr_double: {stderr_double!r}"
1432 path
=self
.path
, argv
=self
.argv
,
1433 stdin_double
=self
.stdin_double
,
1434 stdout_double
=self
.stdout_double
,
1435 stderr_double
=self
.stderr_double
)
1440 """ Make a unique PID for a subprocess. """
1441 for pid
in itertools
.count(1):
1444 def _register_by_pid(self
):
1445 """ Register this subprocess by its PID. """
1446 self
.__class
__.double_by_pid
[self
.pid
] = self
1448 def get_registry_key(self
):
1449 """ Get the registry key for this double. """
1450 result
= tuple(self
.argv
)
1453 stream_class
= io
.BytesIO
1454 stream_encoding
= "utf-8"
1456 def set_stdin_content(self
, text
, bytes_encoding
=stream_encoding
):
1457 """ Set the content of the `stdin` stream for this double. """
1458 content
= text
.encode(bytes_encoding
)
1459 fake_file
= self
.stream_class(content
)
1460 self
.stdin_double
.fake_file
= fake_file
1462 def set_stdout_content(self
, text
, bytes_encoding
=stream_encoding
):
1463 """ Set the content of the `stdout` stream for this double. """
1464 content
= text
.encode(bytes_encoding
)
1465 fake_file
= self
.stream_class(content
)
1466 self
.stdout_double
.fake_file
= fake_file
1468 def set_stderr_content(self
, text
, bytes_encoding
=stream_encoding
):
1469 """ Set the content of the `stderr` stream for this double. """
1470 content
= text
.encode(bytes_encoding
)
1471 fake_file
= self
.stream_class(content
)
1472 self
.stderr_double
.fake_file
= fake_file
1475 def make_fake_subprocess_scenarios(path
=None):
1476 """ Make a collection of scenarios for testing with fake files.
1478 :path: The filesystem path of the fake program. If not specified,
1479 a valid random path will be generated.
1480 :return: A collection of scenarios for tests involving subprocesses.
1482 The collection is a mapping from scenario name to a dictionary of
1483 scenario attributes.
1487 file_path
= tempfile
.mktemp()
1491 default_scenario_params
= {
1492 'return_value': EXIT_STATUS_SUCCESS
,
1493 'program_path': file_path
,
1494 'argv_after_command_name': [],
1500 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND
,
1504 for (name
, scenario
) in scenarios
.items():
1505 params
= default_scenario_params
.copy()
1506 params
.update(scenario
)
1507 scenario
.update(params
)
1508 program_path
= params
['program_path']
1509 program_name
= os
.path
.basename(params
['program_path'])
1510 argv
= [program_name
]
1511 argv
.extend(params
['argv_after_command_name'])
1512 subprocess_double_params
= dict(
1516 subprocess_double
= SubprocessDouble(**subprocess_double_params
)
1517 scenario
['subprocess_double'] = subprocess_double
1518 scenario
['fake_file_scenario_name'] = name
1523 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios
):
1524 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1526 :param scenarios: Collection of fake subprocess scenarios.
1527 :return: Collection of `SubprocessDouble` instances.
1531 scenario
['subprocess_double']
1532 for scenario
in scenarios
1533 if scenario
['subprocess_double'] is not None)
1538 def setup_subprocess_double_behaviour(testcase
, doubles
=None):
1539 """ Set up subprocess double instances and behaviour.
1541 :param testcase: The `TestCase` instance to modify.
1542 :param doubles: Collection of `SubprocessDouble` instances.
1545 If `doubles` is ``None``, a default collection will be made
1546 from the return value of `make_fake_subprocess_scenarios`.
1550 scenarios
= make_fake_subprocess_scenarios()
1551 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1554 for double
in doubles
:
1555 double
.register_for_testcase(testcase
)
1558 def setup_fake_subprocess_fixtures(testcase
):
1559 """ Set up fixtures for fake subprocess doubles.
1561 :param testcase: The `TestCase` instance to modify.
1565 scenarios
= make_fake_subprocess_scenarios()
1566 testcase
.fake_subprocess_scenarios
= scenarios
1568 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1570 setup_subprocess_double_behaviour(testcase
, doubles
)
1573 def patch_os_popen(testcase
):
1574 """ Patch `os.popen` behaviour for this test case.
1576 :param testcase: The `TestCase` instance to modify.
1579 When the patched function is called, the registry of
1580 `SubprocessDouble` instances for this test case will be used
1581 to get the instance for the program path specified.
1584 orig_os_popen
= os
.popen
1586 def fake_os_popen(cmd
, mode
='r', buffering
=-1):
1587 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1588 if isinstance(cmd
, basestring
):
1589 command_argv
= shlex
.split(cmd
)
1592 if command_argv
in registry
:
1593 subprocess_double
= registry
[command_argv
]
1594 result
= subprocess_double
.os_popen_scenario
.call_hook(
1595 command_argv
, mode
, buffering
)
1597 result
= orig_os_popen(cmd
, mode
, buffering
)
1600 func_patcher
= mock
.patch
.object(
1601 os
, "popen", side_effect
=fake_os_popen
)
1602 func_patcher
.start()
1603 testcase
.addCleanup(func_patcher
.stop
)
1606 def patch_os_waitpid(testcase
):
1607 """ Patch `os.waitpid` behaviour for this test case.
1609 :param testcase: The `TestCase` instance to modify.
1612 When the patched function is called, the registry of
1613 `SubprocessDouble` instances for this test case will be used
1614 to get the instance for the program path specified.
1617 orig_os_waitpid
= os
.waitpid
1619 def fake_os_waitpid(pid
, options
):
1620 registry
= SubprocessDouble
.double_by_pid
1622 subprocess_double
= registry
[pid
]
1623 result
= subprocess_double
.os_waitpid_scenario
.call_hook(
1626 result
= orig_os_waitpid(pid
, options
)
1629 func_patcher
= mock
.patch
.object(
1630 os
, "waitpid", side_effect
=fake_os_waitpid
)
1631 func_patcher
.start()
1632 testcase
.addCleanup(func_patcher
.stop
)
1635 def patch_os_system(testcase
):
1636 """ Patch `os.system` behaviour for this test case.
1638 :param testcase: The `TestCase` instance to modify.
1641 When the patched function is called, the registry of
1642 `SubprocessDouble` instances for this test case will be used
1643 to get the instance for the program path specified.
1646 orig_os_system
= os
.system
1648 def fake_os_system(command
):
1649 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1650 command_argv
= shlex
.split(command
)
1651 if command_argv
in registry
:
1652 subprocess_double
= registry
[command_argv
]
1653 result
= subprocess_double
.os_system_scenario
.call_hook(
1656 result
= orig_os_system(command
)
1659 func_patcher
= mock
.patch
.object(
1660 os
, "system", side_effect
=fake_os_system
)
1661 func_patcher
.start()
1662 testcase
.addCleanup(func_patcher
.stop
)
1665 def patch_os_spawnv(testcase
):
1666 """ Patch `os.spawnv` behaviour for this test case.
1668 :param testcase: The `TestCase` instance to modify.
1671 When the patched function is called, the registry of
1672 `SubprocessDouble` instances for this test case will be used
1673 to get the instance for the program path specified.
1676 orig_os_spawnv
= os
.spawnv
1678 def fake_os_spawnv(mode
, file, args
):
1679 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1680 registry_key
= tuple(args
)
1681 if registry_key
in registry
:
1682 subprocess_double
= registry
[registry_key
]
1683 result
= subprocess_double
.os_spawnv_scenario
.call_hook(
1686 result
= orig_os_spawnv(mode
, file, args
)
1689 func_patcher
= mock
.patch
.object(
1690 os
, "spawnv", side_effect
=fake_os_spawnv
)
1691 func_patcher
.start()
1692 testcase
.addCleanup(func_patcher
.stop
)
1699 # vim: fileencoding=utf-8 filetype=python :