Make relative symlinks with minimal traversal.
[dput.git] / test / helper.py
blob23d69d6678bd990c7d12e3e42ebd2f131aa97ae3
1 # -*- coding: utf-8; -*-
3 # test/helper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Helper functionality for Dput test suite. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
19 if sys.version_info >= (3, 3):
20 import builtins
21 import unittest
22 import unittest.mock as mock
23 from io import StringIO as StringIO
24 import configparser
25 import collections.abc as collections_abc
26 elif sys.version_info >= (3, 0):
27 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
28 elif sys.version_info >= (2, 7):
29 # Python 2 standard library.
30 import __builtin__ as builtins
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2 as unittest
33 # Third-party mock library.
34 import mock
35 # Python 2 standard library.
36 from StringIO import StringIO as BaseStringIO
37 import ConfigParser as configparser
38 import collections as collections_abc
39 else:
40 raise RuntimeError("Python earlier than 2.7 is not supported.")
42 import os
43 import os.path
44 import tempfile
45 import pwd
46 import errno
47 import time
48 import signal
49 import subprocess
50 import functools
51 import itertools
52 import base64
53 import collections
54 import weakref
55 import shlex
# Name the containing package explicitly and import it, so the package
# itself is loaded before anything below runs.
# `str(...)` yields the native string type even though this module uses
# `unicode_literals`.
# NOTE(review): presumably this supports loading the module outside the
# normal package import machinery — confirm.
__package__ = str("test")
__import__(__package__)

# On Python 2, classes defined below without explicit bases become
# new-style classes.
__metaclass__ = type
def make_unique_slug(testcase):
    """ Make a unique slug for the test case.

        :param testcase: The `TestCase` instance; its `getUniqueString`
            method supplies the unique text.
        :return: The final 30 characters of the Base64 encoding of the
            unique text.

        """
    unique_bytes = testcase.getUniqueString().encode('utf-8')
    encoded_text = base64.b64encode(unique_bytes).decode('utf-8')
    return encoded_text[-30:]
if 'StringIO' not in globals():
    # No `StringIO` with context manager support was imported above.
    # Build one on top of the base implementation.

    class StringIO(BaseStringIO, object):
        """ A `StringIO` usable in a ``with`` statement. """

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            # Close the stream; never suppress an exception.
            self.close()
            return False
def patch_stdout(testcase):
    """ Replace `sys.stdout` with a fake stream for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a new `StringIO`. The patch
        is undone by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stream_patcher = mock.patch.object(sys, 'stdout', wraps=fake_stream)
    stream_patcher.start()
    testcase.addCleanup(stream_patcher.stop)
def patch_stderr(testcase):
    """ Replace `sys.stderr` with a fake stream for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a new `StringIO`. The patch
        is undone by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stream_patcher = mock.patch.object(sys, 'stderr', wraps=fake_stream)
    stream_patcher.start()
    testcase.addCleanup(stream_patcher.stop)
def patch_signal_signal(testcase):
    """ Replace `signal.signal` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    signal_patcher = mock.patch.object(signal, 'signal')
    signal_patcher.start()
    testcase.addCleanup(signal_patcher.stop)
class FakeSystemExit(Exception):
    """ Fake double for `SystemExit` exception.

        Raised by the patched `sys.exit` (see `patch_sys_exit`), so a
        test can catch the exit attempt as an ordinary `Exception`
        subclass.

        """
# Exit status values returned by the fake subprocess scenarios in this module.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
# NOTE(review): 127 is presumably the shell convention for "command not
# found" — confirm.
EXIT_STATUS_COMMAND_NOT_FOUND = 127
def patch_sys_exit(testcase):
    """ Replace `sys.exit` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Calling the patched `sys.exit` raises a `FakeSystemExit`
        instance instead of exiting.

        """
    fake_exit_error = FakeSystemExit()
    exit_patcher = mock.patch.object(
            sys, 'exit', side_effect=fake_exit_error)
    exit_patcher.start()
    testcase.addCleanup(exit_patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: None.

        Patches, in order: `sys.stdout`, `sys.stderr`, `sys.exit`.

        """
    for patch_func in (patch_stdout, patch_stderr, patch_sys_exit):
        patch_func(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. Any
            iterable is accepted, not only an iterator. If ``None``,
            successive integers counting from 0 are used.
        :return: None.

        Each call to the patched `time.time` returns the next item
        from `values`. When `values` is exhausted, the call raises
        `StopIteration`.

        """
    if values is None:
        values = itertools.count()

    func_patcher = mock.patch.object(time, "time")
    mock_time = func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    # Hand the mock an explicit iterator. Calling `next` directly on a
    # plain iterable (such as a list) would fail with `TypeError`.
    mock_time.side_effect = iter(values)
def patch_os_environ(testcase):
    """ Replace the `os.environ` mapping for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is `testcase.os_environ`. If the test case has
        no such attribute, it is first set to an empty dictionary.

        """
    try:
        fake_environ = testcase.os_environ
    except AttributeError:
        fake_environ = testcase.os_environ = {}
    environ_patcher = mock.patch.object(os, "environ", new=fake_environ)
    environ_patcher.start()
    testcase.addCleanup(environ_patcher.stop)
def patch_os_getpid(testcase):
    """ Replace `os.getpid` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    getpid_patcher = mock.patch.object(os, 'getpid')
    getpid_patcher.start()
    testcase.addCleanup(getpid_patcher.stop)
def patch_os_getuid(testcase):
    """ Replace `os.getuid` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The mock returns `testcase.os_getuid_return_value`. If the
        test case has no such attribute, it is first set from
        `testcase.getUniqueInteger()`.

        """
    try:
        fake_uid = testcase.os_getuid_return_value
    except AttributeError:
        fake_uid = testcase.os_getuid_return_value = (
                testcase.getUniqueInteger())
    getuid_patcher = mock.patch.object(os, "getuid", return_value=fake_uid)
    getuid_patcher.start()
    testcase.addCleanup(getuid_patcher.stop)
# Record with the same field names, in the same order, as the fields
# given for a password database entry.
PasswdEntry = collections.namedtuple(
        "PasswdEntry", [
            'pw_name', 'pw_passwd',
            'pw_uid', 'pw_gid',
            'pw_gecos', 'pw_dir', 'pw_shell',
            ])
def patch_pwd_getpwuid(testcase):
    """ Replace `pwd.getpwuid` with a mock for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The mock returns `testcase.pwd_getpwuid_return_value`,
        converted to `pwd.struct_passwd` if it is not one already. If
        the test case has no such attribute, it is first set to a
        `PasswdEntry` of generated unique values.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())

    pwent = testcase.pwd_getpwuid_return_value
    if not isinstance(pwent, pwd.struct_passwd):
        pwent = pwd.struct_passwd(pwent)

    getpwuid_patcher = mock.patch.object(
            pwd, "getpwuid", return_value=pwent)
    getpwuid_patcher.start()
    testcase.addCleanup(getpwuid_patcher.stop)
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A call for a path registered as a `FileDouble` for this test
        case is answered by that double's `os_path_exists_scenario`.
        Any other path is passed to the real `os.path.exists`.

        """
    real_os_path_exists = os.path.exists

    def fake_os_path_exists(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_path_exists(path)
        return doubles[path].os_path_exists_scenario.call_hook()

    exists_patcher = mock.patch.object(
            os.path, 'exists', side_effect=fake_os_path_exists)
    exists_patcher.start()
    testcase.addCleanup(exists_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A call for a path registered as a `FileDouble` for this test
        case is answered by that double's `os_access_scenario`. Any
        other path is passed to the real `os.access`.

        """
    real_os_access = os.access

    def fake_os_access(path, mode):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_access(path, mode)
        return doubles[path].os_access_scenario.call_hook(mode)

    access_patcher = mock.patch.object(
            os, 'access', side_effect=fake_os_access)
    access_patcher.start()
    testcase.addCleanup(access_patcher.stop)
# Record for a fake `os.stat` result; see `FileDouble._set_stat_result`.
StatResult = collections.namedtuple(
        'StatResult',
        "st_mode"
        " st_ino st_dev st_nlink"
        " st_uid st_gid"
        " st_size"
        " st_atime st_mtime st_ctime")
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A call for a path registered as a `FileDouble` for this test
        case is answered by that double's `os_stat_scenario`. Any
        other path is passed to the real `os.stat`.

        """
    real_os_stat = os.stat

    def fake_os_stat(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_stat(path)
        return doubles[path].os_stat_scenario.call_hook()

    stat_patcher = mock.patch.object(
            os, 'stat', side_effect=fake_os_stat)
    stat_patcher.start()
    testcase.addCleanup(stat_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A call for a path registered as a `FileDouble` for this test
        case is answered by that double's `os_lstat_scenario`. Any
        other path is passed to the real `os.lstat`.

        """
    real_os_lstat = os.lstat

    def fake_os_lstat(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_lstat(path)
        return doubles[path].os_lstat_scenario.call_hook()

    lstat_patcher = mock.patch.object(
            os, 'lstat', side_effect=fake_os_lstat)
    lstat_patcher.start()
    testcase.addCleanup(lstat_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A call for a path registered as a `FileDouble` for this test
        case is answered by that double's `os_unlink_scenario`. Any
        other path is passed to the real `os.unlink`.

        """
    real_os_unlink = os.unlink

    def fake_os_unlink(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_unlink(path)
        return doubles[path].os_unlink_scenario.call_hook()

    unlink_patcher = mock.patch.object(
            os, 'unlink', side_effect=fake_os_unlink)
    unlink_patcher.start()
    testcase.addCleanup(unlink_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A call for a path registered as a `FileDouble` for this test
        case is answered by that double's `os_rmdir_scenario`. Any
        other path is passed to the real `os.rmdir`.

        """
    real_os_rmdir = os.rmdir

    def fake_os_rmdir(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_rmdir(path)
        return doubles[path].os_rmdir_scenario.call_hook()

    rmdir_patcher = mock.patch.object(
            os, 'rmdir', side_effect=fake_os_rmdir)
    rmdir_patcher.start()
    testcase.addCleanup(rmdir_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Replace `tempfile.mkdtemp` with a mock for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The mock returns the path of
        `testcase.tempfile_mkdtemp_file_double`. If the test case has
        no such attribute, it is first set to a `FileDouble` with a
        generated path. The double gets 'okay' scenarios for
        `os.unlink` and `os.rmdir`, and is registered for the test
        case.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    dir_double = testcase.tempfile_mkdtemp_file_double
    for set_scenario in (
            dir_double.set_os_unlink_scenario,
            dir_double.set_os_rmdir_scenario):
        set_scenario('okay')
    dir_double.register_for_testcase(testcase)

    mkdtemp_patcher = mock.patch.object(tempfile, "mkdtemp")
    mock_mkdtemp = mkdtemp_patcher.start()
    testcase.addCleanup(mkdtemp_patcher.stop)

    mock_mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
try:
    FileNotFoundError
    FileExistsError
    PermissionError
except NameError:
    # These exception classes are not built in (Python 2).
    # Define them as `IOError` subclasses.
    def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
        """ Normalise arguments for an `IOError` subclass initialiser.

            :param init_args: Positional arguments given by the caller.
            :param init_kwargs: Keyword arguments given by the caller.
                The 'errno', 'filename', 'strerror' items are removed
                from this mapping.
            :param errno_value: The `errno` value to use by default.
            :return: Tuple (`args`, `kwargs`), where `args` is the
                tuple (errno, strerror, filename).

            """
        remaining_kwargs = init_kwargs
        (result_errno, result_strerror, result_filename) = (
                errno_value, os.strerror(errno_value), None)

        # Positional `errno` and `filename` are used only when three or
        # more arguments are given; keyword values override them.
        if len(init_args) >= 3:
            (result_errno, result_filename) = (init_args[0], init_args[2])
        if 'errno' in remaining_kwargs:
            result_errno = remaining_kwargs.pop('errno')
        if 'filename' in remaining_kwargs:
            result_filename = remaining_kwargs.pop('filename')

        # The second positional argument, when present, is the message.
        if len(init_args) >= 2:
            result_strerror = init_args[1]
        if 'strerror' in remaining_kwargs:
            result_strerror = remaining_kwargs.pop('strerror')

        normalised_args = (result_errno, result_strerror, result_filename)
        return (normalised_args, remaining_kwargs)

    class FileNotFoundError(IOError):
        """ Fallback `FileNotFoundError`; `errno` defaults to ENOENT. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.ENOENT)
            super(FileNotFoundError, self).__init__(*args, **kwargs)

    class FileExistsError(IOError):
        """ Fallback `FileExistsError`; `errno` defaults to EEXIST. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EEXIST)
            super(FileExistsError, self).__init__(*args, **kwargs)

    class PermissionError(IOError):
        """ Fallback `PermissionError`; `errno` defaults to EPERM. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EPERM)
            super(PermissionError, self).__init__(*args, **kwargs)
def make_fake_file_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :param path: The filesystem path of the fake file. If not
            specified, a path is generated with `tempfile.mktemp`.
        :return: A collection of scenarios for tests involving input files.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Every scenario has these items:

        * 'open_scenario_name': Name of the `open` scenario set on the
          file double.
        * 'file_double_params': Keyword arguments used to make the
          file double.
        * 'file_double': The `FileDouble` instance.
        * 'fake_file_scenario_name': The scenario's own name.

        """
    file_path = (tempfile.mktemp() if path is None else path)

    fake_file_empty = StringIO()
    fake_file_minimal = StringIO("Lorem ipsum.")
    fake_file_large = StringIO("\n".join(
            "ABCDEFGH" * 100
            for __ in range(1000)))

    # Items used by any scenario that does not specify its own.
    default_scenario_params = {
            'open_scenario_name': 'okay',
            'file_double_params': dict(
                path=file_path, fake_file=fake_file_minimal),
            }

    scenarios = {
            'default': {},
            'error-not-exist': {
                'open_scenario_name': 'nonexist',
                },
            'error-exist': {
                'open_scenario_name': 'exist_error',
                },
            'error-read-denied': {
                'open_scenario_name': 'read_denied',
                },
            'not-found': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_empty),
                },
            'exist-empty': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_empty),
                },
            'exist-minimal': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_minimal),
                },
            'exist-large': {
                'file_double_params': dict(
                    path=file_path, fake_file=fake_file_large),
                },
            }

    for (name, scenario) in scenarios.items():
        # Fill in each default the scenario does not override.
        for (param_name, param_value) in default_scenario_params.items():
            scenario.setdefault(param_name, param_value)
        file_double = FileDouble(**scenario['file_double_params'])
        scenario['file_double'] = file_double
        file_double.set_open_scenario(scenario['open_scenario_name'])
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_file_doubles_from_fake_file_scenarios(scenarios):
    """ Get the `FileDouble` instances from fake file scenarios.

        :param scenarios: Collection of fake file scenarios; each is a
            mapping with a 'file_double' item.
        :return: Set of the 'file_double' values that are not ``None``.

        """
    doubles = set()
    for scenario in scenarios:
        file_double = scenario['file_double']
        if file_double is not None:
            doubles.add(file_double)

    return doubles
def setup_file_double_behaviour(testcase, doubles=None):
    """ Set up file double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `FileDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the result of `make_fake_file_scenarios` result.

        Each double is registered for the test case, and the builtin
        `open` is patched. Opening a registered path is answered by
        that double's `builtins_open_scenario`; any other path is
        passed to the real `open`, together with every extra argument
        the caller gave (for example `encoding`).

        """
    if doubles is None:
        scenarios = make_fake_file_scenarios()
        doubles = get_file_doubles_from_fake_file_scenarios(
                scenarios.values())

    for file_double in doubles:
        file_double.register_for_testcase(testcase)

    orig_open = builtins.open

    def fake_open(path, mode='rt', buffering=-1, *args, **kwargs):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            # The scenario hooks take only `mode` and `buffering`.
            result = file_double.builtins_open_scenario.call_hook(
                    mode, buffering)
        else:
            # Forward everything, so a real file can still be opened
            # with arguments such as `encoding` or `newline`.
            result = orig_open(path, mode, buffering, *args, **kwargs)
        return result

    mock_open = mock.mock_open()
    mock_open.side_effect = fake_open

    func_patcher = mock.patch.object(
            builtins, "open",
            new=mock_open)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def setup_fake_file_fixtures(testcase):
    """ Set up fixtures for fake file doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios from `make_fake_file_scenarios` are stored as
        `testcase.fake_file_scenarios`, and their file doubles are
        set up with `setup_file_double_behaviour`.

        """
    testcase.fake_file_scenarios = make_fake_file_scenarios()

    scenario_collection = testcase.fake_file_scenarios.values()
    setup_file_double_behaviour(
            testcase,
            get_file_doubles_from_fake_file_scenarios(scenario_collection))
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify.
        :param name: Key of the scenario in `testcase.fake_file_scenarios`.
        :return: None.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        then registers the file double for the test case.

        """
    chosen_scenario = testcase.fake_file_scenarios[name]
    testcase.fake_file_scenario = chosen_scenario
    testcase.file_double = chosen_scenario['file_double']
    testcase.file_double.register_for_testcase(testcase)
class FileFunctionScenario:
    """ Scenario for fake behaviour of a specific file-related function.

        A subclass defines methods named ``_hook_<scenario name>``.
        The instance's `call_hook` attribute is the hook method
        selected by the scenario name.

        """

    def __init__(self, scenario_name, double):
        """ Set up the scenario.

            :param scenario_name: Name selecting the hook method.
            :param double: The test double this scenario belongs to.
            :raises AttributeError: If no hook method exists for
                `scenario_name`.

            """
        self.scenario_name = scenario_name
        self.double = double

        self.call_hook = getattr(
                self, "_hook_{name}".format(name=self.scenario_name))

    def __repr__(self):
        text = (
                "<FileFunctionScenario instance: {id}"
                " name: {name!r},"
                " call_hook name: {hook_name!r}"
                " double: {double!r}"
                ">").format(
                    id=id(self),
                    name=self.scenario_name, double=self.double,
                    hook_name=self.call_hook.__name__)
        return text

    def __eq__(self, other):
        """ Compare by scenario name, double, and hook method name.

            Returns ``NotImplemented`` when `other` lacks any of the
            compared attributes, so that comparison with an unrelated
            object is simply unequal instead of raising
            `AttributeError`.

            """
        try:
            other_name = other.scenario_name
            other_double = other.double
            other_hook_name = other.call_hook.__name__
        except AttributeError:
            return NotImplemented
        result = bool(
                self.scenario_name == other_name
                and self.double == other_double
                and self.call_hook.__name__ == other_hook_name)
        return result

    def __ne__(self, other):
        equal = self.__eq__(other)
        if equal is NotImplemented:
            # Must not be negated: let Python fall back to its default.
            return NotImplemented
        result = not equal
        return result
class os_path_exists_scenario(FileFunctionScenario):
    """ Scenario for `os.path.exists` behaviour. """

    def _hook_exist(self):
        """ The path exists. """
        result = True
        return result

    def _hook_not_exist(self):
        """ The path does not exist. """
        result = False
        return result
class os_access_scenario(FileFunctionScenario):
    """ Scenario for `os.access` behaviour.

        Each hook takes the requested access `mode` and returns
        whether access is granted.

        """

    def _hook_okay(self, mode):
        """ Grant every request. """
        return True

    def _hook_not_exist(self, mode):
        """ Refuse every request. """
        return False

    def _hook_read_only(self, mode):
        """ Refuse any request that includes write or execute access. """
        refused_modes = os.W_OK | os.X_OK
        return not (mode & refused_modes)

    def _hook_denied(self, mode):
        """ Refuse any request for read, write, or execute access. """
        refused_modes = os.R_OK | os.W_OK | os.X_OK
        return not (mode & refused_modes)
class os_stat_scenario(FileFunctionScenario):
    """ Scenario for `os.stat` behaviour.

        The error hooks build the exception as
        (`errno`, message, filename), so that `errno` and `filename`
        on the raised exception are meaningful with the builtin
        exception classes as well as with this module's Python 2
        fallback classes.

        """

    def _hook_okay(self):
        """ Return the double's `stat_result`. """
        return self.double.stat_result

    def _hook_notfound_error(self):
        """ Raise `FileNotFoundError` for the double's path. """
        raise FileNotFoundError(
                errno.ENOENT,
                "No such file or directory: {path!r}".format(
                    path=self.double.path),
                self.double.path)

    def _hook_denied_error(self):
        """ Raise `PermissionError` for the double's path. """
        # EPERM is the default used by this module's fallback
        # `PermissionError` class.
        raise PermissionError(
                errno.EPERM,
                "Permission denied",
                self.double.path)
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Defines no hooks of its own; every hook is inherited from
        `os_stat_scenario`.

        """
class os_unlink_scenario(FileFunctionScenario):
    """ Scenario for `os.unlink` behaviour.

        The error hooks build the exception as
        (`errno`, message, filename), so that `errno` and `filename`
        on the raised exception are meaningful with the builtin
        exception classes as well as with this module's Python 2
        fallback classes.

        """

    def _hook_okay(self):
        """ Succeed without doing anything. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        error = FileNotFoundError(
                errno.ENOENT,
                "No such file or directory: {path!r}".format(
                    path=self.double.path),
                self.double.path)
        raise error

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        # EPERM is the default used by this module's fallback
        # `PermissionError` class.
        error = PermissionError(
                errno.EPERM,
                "Permission denied",
                self.double.path)
        raise error
class os_rmdir_scenario(FileFunctionScenario):
    """ Scenario for `os.rmdir` behaviour.

        The error hooks build the exception as
        (`errno`, message, filename), so that `errno` and `filename`
        on the raised exception are meaningful with the builtin
        exception classes as well as with this module's Python 2
        fallback classes.

        """

    def _hook_okay(self):
        """ Succeed without doing anything. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        error = FileNotFoundError(
                errno.ENOENT,
                "No such file or directory: {path!r}".format(
                    path=self.double.path),
                self.double.path)
        raise error

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        # EPERM is the default used by this module's fallback
        # `PermissionError` class.
        error = PermissionError(
                errno.EPERM,
                "Permission denied",
                self.double.path)
        raise error
class builtins_open_scenario(FileFunctionScenario):
    """ Scenario for `builtins.open` behaviour.

        Each hook takes the `mode` and `buffering` arguments of the
        `open` call. A hook either returns the double's `fake_file`,
        or raises an error built as (`errno`, message, filename), so
        that `errno` and `filename` on the raised exception are
        meaningful with the builtin exception classes as well as with
        this module's Python 2 fallback classes.

        """

    def _hook_okay(self, mode, buffering):
        """ Open succeeds in any mode. """
        result = self.double.fake_file
        return result

    def _hook_nonexist(self, mode, buffering):
        """ Raise `FileNotFoundError` when opening for read. """
        if mode.startswith('r'):
            error = FileNotFoundError(
                    errno.ENOENT,
                    "No such file or directory: {path!r}".format(
                        path=self.double.path),
                    self.double.path)
            raise error
        result = self.double.fake_file
        return result

    def _hook_exist_error(self, mode, buffering):
        """ Raise `FileExistsError` when opening for write or append. """
        if mode.startswith(('w', 'a')):
            error = FileExistsError(
                    errno.EEXIST,
                    "File already exists: {path!r}".format(
                        path=self.double.path),
                    self.double.path)
            raise error
        result = self.double.fake_file
        return result

    def _hook_read_denied(self, mode, buffering):
        """ Raise `PermissionError` when opening for read. """
        if mode.startswith('r'):
            # EPERM is the default used by this module's fallback
            # `PermissionError` class.
            error = PermissionError(
                    errno.EPERM,
                    "Read denied on {path!r}".format(
                        path=self.double.path),
                    self.double.path)
            raise error
        result = self.double.fake_file
        return result

    def _hook_write_denied(self, mode, buffering):
        """ Raise `PermissionError` when opening for write or append. """
        if mode.startswith(('w', 'a')):
            # EPERM is the default used by this module's fallback
            # `PermissionError` class.
            error = PermissionError(
                    errno.EPERM,
                    "Write denied on {path!r}".format(
                        path=self.double.path),
                    self.double.path)
            raise error
        result = self.double.fake_file
        return result
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        A subclass must set these class attributes:

        * `registry_class`: Callable making a new, empty registry.
        * `registries`: Mapping of all registries, keyed by test case.
        * `function_scenario_params_by_class`: Mapping from scenario
          class to a dictionary with the items
          'default_scenario_name' and 'set_scenario_method_name'.

        A subclass must also implement `get_registry_key`.

        """

    registry_class = NotImplemented
    registries = NotImplemented

    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        parent_new = super(TestDoubleWithRegistry, cls).__new__
        if parent_new is object.__new__:
            # `object.__new__` rejects extra arguments; give it none.
            instance = parent_new(cls)
        else:
            instance = parent_new(cls, *args, **kwargs)
        instance.make_set_scenario_methods()

        return instance

    def __init__(self, *args, **kwargs):
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the method that sets a `scenario_class` scenario.

            :param scenario_class: The scenario class to instantiate.
            :param params: The parameters for that scenario class;
                'set_scenario_method_name' names the new method.
            :return: The new function, to be used as a method.

            """
        def method(self, name):
            # The scenario is stored on the double, in an attribute
            # named for the scenario class.
            setattr(
                    self, scenario_class.__name__,
                    scenario_class(name, double=self))
        doc_template = """ Set the scenario for `{name}` behaviour. """
        method.__doc__ = doc_template.format(name=scenario_class.__name__)
        method.__name__ = str(params['set_scenario_method_name'])
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        params_by_class = self.function_scenario_params_by_class
        for (scenario_class, scenario_params) in params_by_class.items():
            set_method = self._make_set_scenario_method(
                    scenario_class, scenario_params)
            setattr(self.__class__, set_method.__name__, set_method)
            scenario_params['set_scenario_method'] = set_method

    def _set_method_per_scenario(self):
        """ Set the default scenario for each scenario class. """
        params_by_class = self.function_scenario_params_by_class
        for scenario_params in params_by_class.values():
            set_method = scenario_params['set_scenario_method']
            set_method(self, scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get the registry of doubles for the specified test case.

            A new registry is stored for the test case if none exists.

            """
        # The key must be hashable; use the test case's class and identity.
        registry_key = (testcase.__class__, id(testcase))
        return cls.registries.setdefault(registry_key, cls.registry_class())

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase.

            A cleanup that removes the instance again is registered on
            `testcase`.

            """
        registry = self.get_registry_for_testcase(testcase)
        registry[self.get_registry_key()] = self
        testcase.addCleanup(
                functools.partial(self.unregister_for_testcase, testcase))

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase.

            Does nothing if the instance is not registered.

            """
        registry = self.get_registry_for_testcase(testcase)
        registry_key = self.get_registry_key()
        if registry_key not in registry:
            return
        registry.pop(registry_key)
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The fake file to copy, or ``None``.
        :return: A new instance of the same type as `fake_file`, made
            from its content. If `fake_file` is ``None``, an empty
            `StringIO`.

        """
    if fake_file is None:
        (fake_file_type, content) = (StringIO, "")
    else:
        (fake_file_type, content) = (type(fake_file), fake_file.getvalue())
    assert issubclass(fake_file_type, object)
    return fake_file_type(content)
class FileDouble(TestDoubleWithRegistry):
    """ A testing double for a file.

        The double is registered by its `path`. Its `fake_file` is a
        copy of the fake file given to the initialiser.

        """

    registry_class = dict
    registries = {}

    # Built from rows of:
    # (scenario class, default scenario name, name of the set method).
    function_scenario_params_by_class = {
            scenario_class: {
                'default_scenario_name': default_scenario_name,
                'set_scenario_method_name': set_scenario_method_name,
                }
            for (
                scenario_class,
                default_scenario_name,
                set_scenario_method_name) in [
                    (os_path_exists_scenario,
                        'not_exist', 'set_os_path_exists_scenario'),
                    (os_access_scenario,
                        'okay', 'set_os_access_scenario'),
                    (os_stat_scenario,
                        'okay', 'set_os_stat_scenario'),
                    (os_lstat_scenario,
                        'okay', 'set_os_lstat_scenario'),
                    (builtins_open_scenario,
                        'okay', 'set_open_scenario'),
                    (os_unlink_scenario,
                        'okay', 'set_os_unlink_scenario'),
                    (os_rmdir_scenario,
                        'okay', 'set_os_rmdir_scenario'),
                    ]
            }

    def __init__(self, path=None, fake_file=None, *args, **kwargs):
        """ Set up the file double.

            :param path: The filesystem path this double stands for.
            :param fake_file: The fake file to copy as this double's
                content; if ``None``, the content is empty.

            """
        self.path = path
        fake_file_copy = copy_fake_file(fake_file)
        self.fake_file = fake_file_copy
        fake_file_copy.name = path

        self._set_stat_result()

        super(FileDouble, self).__init__(*args, **kwargs)

    def _set_stat_result(self):
        """ Set the `os.stat` result for this file. """
        # Only the size is derived from the fake file's content.
        self.stat_result = StatResult(
                st_mode=0,
                st_ino=None, st_dev=None, st_nlink=None,
                st_uid=0, st_gid=0,
                st_size=len(self.fake_file.getvalue()),
                st_atime=None, st_mtime=None, st_ctime=None,
                )

    def __repr__(self):
        template = "FileDouble(path={path!r}, fake_file={fake_file!r})"
        return template.format(path=self.path, fake_file=self.fake_file)

    def get_registry_key(self):
        """ Get the registry key for this double: its path. """
        return self.path
class SubprocessFunctionScenario:
    """ Scenario for fake behaviour of a specific subprocess function.

        A subclass defines methods named ``_hook_<scenario name>``.
        The instance's `call_hook` attribute is the hook method
        selected by the scenario name.

        """

    def __init__(self, scenario_name, double):
        """ Set up the scenario.

            :param scenario_name: Name selecting the hook method.
            :param double: The subprocess double this scenario belongs
                to; stored as `subprocess_double`.
            :raises AttributeError: If no hook method exists for
                `scenario_name`.

            """
        self.scenario_name = scenario_name
        self.subprocess_double = double

        self.call_hook = getattr(
                self, "_hook_{name}".format(name=self.scenario_name))

    def __repr__(self):
        text = (
                "<SubprocessFunctionScenario instance: {id}"
                " name: {name!r},"
                " call_hook name: {hook_name!r}"
                " subprocess_double: {double!r}"
                ">").format(
                    id=id(self),
                    name=self.scenario_name, double=self.subprocess_double,
                    hook_name=self.call_hook.__name__)
        return text

    def __eq__(self, other):
        """ Compare by scenario name, double, and hook method name.

            Returns ``NotImplemented`` when `other` lacks any of the
            compared attributes, so that comparison with an unrelated
            object is simply unequal instead of raising
            `AttributeError`.

            """
        try:
            other_name = other.scenario_name
            other_double = other.subprocess_double
            other_hook_name = other.call_hook.__name__
        except AttributeError:
            return NotImplemented
        result = bool(
                self.scenario_name == other_name
                and self.subprocess_double == other_double
                and self.call_hook.__name__ == other_hook_name)
        return result

    def __ne__(self, other):
        equal = self.__eq__(other)
        if equal is NotImplemented:
            # Must not be negated: let Python fall back to its default.
            return NotImplemented
        result = not equal
        return result
class os_popen_scenario(SubprocessFunctionScenario):
    """ Scenario for `os.popen` behaviour.

        Each hook takes the `cmd`, `mode`, `buffering` arguments of
        the `os.popen` call and returns a file-like object.

        """

    # Which stream of the subprocess each `os.popen` mode connects to.
    stream_name_by_mode = {'w': 'stdin', 'r': 'stdout'}

    def _hook_success(self, cmd, mode, buffering):
        """ Return the fake file of the stream double for `mode`. """
        double_attr_name = self.stream_name_by_mode[mode] + '_double'
        stream_double = getattr(self.subprocess_double, double_attr_name)
        return stream_double.fake_file

    def _hook_failure(self, cmd, mode, buffering):
        """ Return a new, empty `StringIO`. """
        return StringIO()

    def _hook_not_found(self, cmd, mode, buffering):
        """ Return a new, empty `StringIO`. """
        return StringIO()
class os_waitpid_scenario(SubprocessFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        Each hook takes the `pid` and `options` arguments of the
        `os.waitpid` call.

        """

    def _hook_success(self, pid, options):
        """ Return (`pid`, `EXIT_STATUS_SUCCESS`). """
        result = (pid, EXIT_STATUS_SUCCESS)
        return result

    def _hook_failure(self, pid, options):
        """ Return (`pid`, `EXIT_STATUS_FAILURE`). """
        result = (pid, EXIT_STATUS_FAILURE)
        return result

    def _hook_not_found(self, pid, options):
        """ Raise `OSError` with `errno` set to ECHILD. """
        # `OSError` sets its `errno` attribute only when given at
        # least two arguments, so the message is passed as well.
        error = OSError(errno.ECHILD, os.strerror(errno.ECHILD))
        raise error
class os_system_scenario(SubprocessFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Each hook takes the `command` argument of the `os.system`
        call and returns an exit status.

        """

    def _hook_success(self, command):
        """ Return `EXIT_STATUS_SUCCESS`. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Return `EXIT_STATUS_FAILURE`. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
class os_spawnv_scenario(SubprocessFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Each hook takes the `mode`, `file`, `args` arguments of the
        `os.spawnv` call and returns an exit status.

        """

    def _hook_success(self, mode, file, args):
        """ Return `EXIT_STATUS_SUCCESS`. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, mode, file, args):
        """ Return `EXIT_STATUS_FAILURE`. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, mode, file, args):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
# Sentinels for use as words in `SubprocessDoubleRegistry` keys.
# They are recognised by identity.
# `ARG_ANY` matches any word at its position in the command line.
ARG_ANY = object()
# `ARG_MORE` matches all remaining words of the command line.
ARG_MORE = object()
class PopenDouble:
    """ A testing double for `subprocess.Popen`.

        All the attributes `stdin`, `stdout`, `stderr`, `pid`,
        `returncode` start as ``None``. The initialiser arguments are
        accepted and ignored.

        """

    def __init__(self, args, *posargs, **kwargs):
        self.stdin = self.stdout = self.stderr = None
        self.pid = None
        self.returncode = None

    def set_streams(self, subprocess_double, popen_kwargs):
        """ Set the streams on the `PopenDouble`.

            :param subprocess_double: The `SubprocessDouble` from
                which to get existing stream doubles.
            :param popen_kwargs: The keyword arguments to the
                `subprocess.Popen` call.
            :return: ``None``.

            Only streams named in `popen_kwargs` are set:

            * `subprocess.PIPE` selects the fake file of the
              matching stream double.
            * `subprocess.STDOUT` selects the fake file of the
              `stdout` stream double.
            * Any other value is used as the stream itself.

            """
        for stream_name in ['stdin', 'stdout', 'stderr']:
            if stream_name not in popen_kwargs:
                continue
            stream_spec = popen_kwargs[stream_name]
            if stream_spec is subprocess.PIPE:
                stream_double = getattr(
                        subprocess_double, stream_name + "_double")
                stream_file = stream_double.fake_file
            elif stream_spec is subprocess.STDOUT:
                stream_file = subprocess_double.stdout_double.fake_file
            else:
                stream_file = stream_spec
            setattr(self, stream_name, stream_file)
class subprocess_popen_scenario(SubprocessFunctionScenario):
    """ Scenario for `subprocess.Popen` behaviour. """

    def _hook_success(self, testcase, args, *posargs, **kwargs):
        """ Return the subprocess double's `popen_double`.

            Its streams are first set according to the keyword
            arguments of the `subprocess.Popen` call.

            """
        owner = self.subprocess_double
        popen_double = owner.popen_double
        popen_double.set_streams(owner, kwargs)
        return popen_double
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case is searched
        for the command line. With a true `shell` keyword argument,
        the command line is first split into words with
        `shlex.split`. If a double is found, its
        `subprocess_popen_scenario` answers the call; otherwise the
        call is passed to the real `subprocess.Popen`.

        """
    real_subprocess_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        argv = (shlex.split(args) if kwargs.get('shell', False) else args)
        doubles = SubprocessDouble.get_registry_for_testcase(testcase)
        if argv not in doubles:
            return real_subprocess_popen(args, *posargs, **kwargs)
        scenario = doubles[argv].subprocess_popen_scenario
        return scenario.call_hook(testcase, args, *posargs, **kwargs)

    popen_patcher = mock.patch.object(
            subprocess, "Popen", side_effect=fake_subprocess_popen)
    popen_patcher.start()
    testcase.addCleanup(popen_patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        A lookup key is a command line (a sequence of words). It
        matches a registered key that is equal to it, or that matches
        word for word:

        * `ARG_ANY` in the registered key matches any one word.
        * `ARG_MORE` in the registered key matches all remaining words.
        * A registered key which ends before the command line does
          also matches, provided all its words match.

        """

    def __init__(self, *args, **kwargs):
        """ Set up the registry.

            The first positional argument, if any, gives the initial
            items: either a mapping, or an iterable of (key, value)
            pairs. Keyword arguments are ignored.

            """
        items = []
        if args:
            if isinstance(args[0], collections_abc.Mapping):
                items = args[0].items()
            elif isinstance(args[0], collections_abc.Iterable):
                items = args[0]
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
                class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Test whether the registered key matches `argv` by words. """
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words. We have a match.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if not (candidate_word == argv[word_index]):
                # This candidate does not match.
                return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The command line to look for.
            :return: The first registered key that matches, or
                ``None`` if no key matches or `argv` is not a sequence.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if candidate == argv:
                # An exact match.
                return candidate
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        result = self._mapping[match]
        return result

    def __setitem__(self, key, value):
        # Replace whichever registered key already matches this one.
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        # A key that matches nothing is ignored, without an error.
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return self._mapping.__iter__()

    def __len__(self):
        return self._mapping.__len__()
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess. """

    registry_class = SubprocessDoubleRegistry
    registries = {}

    # Every live instance, keyed by its fake PID. Weak references, so
    # an entry does not keep its double alive.
    double_by_pid = weakref.WeakValueDictionary()

    # Shared source of fake PIDs, so each instance gets a distinct
    # integer.
    _pid_counter = itertools.count(1)

    function_scenario_params_by_class = {
            subprocess_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_popen_scenario',
                },
            os_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_popen_scenario',
                },
            os_waitpid_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_waitpid_scenario',
                },
            os_system_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_system_scenario',
                },
            os_spawnv_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_spawnv_scenario',
                },
            }

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the double for a fake program.

            :param path: Filesystem path of the fake program. If
                ``None``, a temporary path name is generated.
            :param argv: Command-line words for the fake program. If
                ``None``, a single word (the base name of `path`) is
                used.

            Remaining arguments are passed to the superclass.

            """
        if path is None:
            # Only a path name is needed; no file is created here.
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        self.stdin_double = FileDouble()
        self.stdout_double = FileDouble()
        self.stderr_double = FileDouble()

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        text = (
                "<SubprocessDouble instance: {id}"
                " path: {path!r},"
                " argv: {argv!r}"
                " stdin_double: {stdin_double!r}"
                " stdout_double: {stdout_double!r}"
                " stderr_double: {stderr_double!r}"
                ">").format(
                    id=id(self),
                    path=self.path, argv=self.argv,
                    stdin_double=self.stdin_double,
                    stdout_double=self.stdout_double,
                    stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: The next integer from the shared PID counter.

            """
        # Return the integer itself. A generator function here would
        # hand back a new generator object on every call, never a PID.
        return next(cls._pid_counter)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double. """
        result = tuple(self.argv)
        return result

    def set_stdin_content(self, text):
        """ Set the content of the `stdin` stream for this double. """
        self.stdin_double.fake_file = StringIO(text)

    def set_stdout_content(self, text):
        """ Set the content of the `stdout` stream for this double. """
        self.stdout_double.fake_file = StringIO(text)

    def set_stderr_content(self, text):
        """ Set the content of the `stderr` stream for this double. """
        self.stderr_double.fake_file = StringIO(text)
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake subprocesses.

        :param path: The filesystem path of the fake program. If not
            specified, a temporary path name will be generated.
        :return: A collection of scenarios for tests involving
            subprocesses.

        The collection is a mapping from scenario name to a dictionary
        of scenario attributes. Each scenario gets the default
        attributes (unless it overrides them), a `SubprocessDouble`
        under the key 'subprocess_double', and its own name under the
        key 'fake_file_scenario_name'.

        """
    file_path = tempfile.mktemp() if path is None else path

    defaults = {
            'return_value': EXIT_STATUS_SUCCESS,
            'program_path': file_path,
            'argv_after_command_name': [],
            }

    scenarios = {
            'default': {},
            'not-found': {
                'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
                },
            }

    for (name, scenario) in scenarios.items():
        # Scenario-specific values take precedence over the defaults.
        merged = defaults.copy()
        merged.update(scenario)
        scenario.update(merged)

        program_path = merged['program_path']
        argv = [os.path.basename(program_path)]
        argv.extend(merged['argv_after_command_name'])

        scenario['subprocess_double'] = SubprocessDouble(
                path=program_path, argv=argv)
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios; each
            is a mapping with a 'subprocess_double' item.
        :return: Set of the 'subprocess_double' values that are not
            ``None``.

        """
    doubles = set()
    for scenario in scenarios:
        subprocess_double = scenario['subprocess_double']
        if subprocess_double is not None:
            doubles.add(subprocess_double)

    return doubles
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        Each double is registered for `testcase`. If `doubles` is
        ``None``, a default collection will be made from the return
        value of `make_fake_subprocess_scenarios`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios from `make_fake_subprocess_scenarios` are stored
        on the test case as `fake_subprocess_scenarios`, and their
        doubles are set up for the test case.

        """
    fake_scenarios = make_fake_subprocess_scenarios()
    testcase.fake_subprocess_scenarios = fake_scenarios

    setup_subprocess_double_behaviour(
            testcase,
            get_subprocess_doubles_from_fake_subprocess_scenarios(
                fake_scenarios.values()))
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the command is split into
        words and looked up in the registry of `SubprocessDouble`
        instances for this test case. A registered command is handled
        by the double's `os_popen_scenario`; any other command is
        passed to the real `os.popen`.

        """
    real_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        command_argv = shlex.split(cmd)
        if command_argv not in registry:
            return real_os_popen(cmd, mode, buffering)
        scenario = registry[command_argv].os_popen_scenario
        return scenario.call_hook(cmd, mode, buffering)

    popen_patcher = mock.patch.object(
            os, "popen", side_effect=fake_os_popen)
    popen_patcher.start()
    testcase.addCleanup(popen_patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the PID is looked up in
        `SubprocessDouble.double_by_pid`. A PID belonging to a double
        is handled by that double's `os_waitpid_scenario`; any other
        PID is passed to the real `os.waitpid`.

        """
    real_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            return real_os_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    waitpid_patcher = mock.patch.object(
            os, "waitpid", side_effect=fake_os_waitpid)
    waitpid_patcher.start()
    testcase.addCleanup(waitpid_patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the command is split into
        words and looked up in the registry of `SubprocessDouble`
        instances for this test case. A registered command is handled
        by the double's `os_system_scenario`; any other command is
        passed to the real `os.system`.

        """
    real_os_system = os.system

    def fake_os_system(command):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        command_argv = shlex.split(command)
        if command_argv not in registry:
            return real_os_system(command)
        scenario = registry[command_argv].os_system_scenario
        return scenario.call_hook(command)

    system_patcher = mock.patch.object(
            os, "system", side_effect=fake_os_system)
    system_patcher.start()
    testcase.addCleanup(system_patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the `args` sequence (as a
        tuple) is looked up in the registry of `SubprocessDouble`
        instances for this test case. A registered command is handled
        by the double's `os_spawnv_scenario`; any other command is
        passed to the real `os.spawnv`.

        """
    real_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        registry_key = tuple(args)
        if registry_key not in registry:
            return real_os_spawnv(mode, file, args)
        scenario = registry[registry_key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    spawnv_patcher = mock.patch.object(
            os, "spawnv", side_effect=fake_os_spawnv)
    spawnv_patcher.start()
    testcase.addCleanup(spawnv_patcher.stop)
1515 # Local variables:
1516 # coding: utf-8
1517 # mode: python
1518 # End:
1519 # vim: fileencoding=utf-8 filetype=python :