move sections
[python/dscho.git] / Lib / test / test_support.py
blob6e7853b89e522c92e358c651fdbb18d7097b02a1
1 """Supporting definitions for the Python regression tests."""
# Refuse to run outside the test package: the helpers below assume they were
# imported as 'test.test_support' (regrtest relies on that module identity).
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
6 import contextlib
7 import errno
8 import functools
9 import gc
10 import socket
11 import sys
12 import os
13 import platform
14 import shutil
15 import warnings
16 import unittest
17 import importlib
18 import UserDict
19 import re
20 import time
21 try:
22 import thread
23 except ImportError:
24 thread = None
# Public API of this helper module; regrtest and individual tests rely on
# this list staying in sync with the definitions below.
__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
           "open_urlresource", "check_warnings", "check_py3k_warnings",
           "CleanImport", "EnvironmentVarGuard", "captured_output",
           "captured_stdout", "TransientResource", "transient_internet",
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
           "threading_cleanup", "reap_children", "cpython_only",
           "check_impl_detail", "get_attribute", "py3k_bytes"]
class Error(Exception):
    """Base class for regression test exceptions."""
class TestFailed(Error):
    """Test failed."""
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """
55 @contextlib.contextmanager
56 def _ignore_deprecated_imports(ignore=True):
57 """Context manager to suppress package and module deprecation
58 warnings when importing them.
60 If ignore is False, this context manager has no effect."""
61 if ignore:
62 with warnings.catch_warnings():
63 warnings.filterwarnings("ignore", ".+ (module|package)",
64 DeprecationWarning)
65 yield
66 else:
67 yield
def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        # 'as' form (valid since 2.6) matches the style already used in
        # this file (see transient_internet) and stays valid in 3.x.
        except ImportError as msg:
            raise unittest.SkipTest(str(msg))
83 def _save_and_remove_module(name, orig_modules):
84 """Helper function to save and remove a module from sys.modules
86 Return value is True if the module was in sys.modules and
87 False otherwise."""
88 saved = True
89 try:
90 orig_modules[name] = sys.modules[name]
91 except KeyError:
92 saved = False
93 else:
94 del sys.modules[name]
95 return saved
98 def _save_and_block_module(name, orig_modules):
99 """Helper function to save and block a module in sys.modules
101 Return value is True if the module was in sys.modules and
102 False otherwise."""
103 saved = True
104 try:
105 orig_modules[name] = sys.modules[name]
106 except KeyError:
107 saved = False
108 sys.modules[name] = 0
109 return saved
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Imports and returns a module, deliberately bypassing the sys.modules cache
    and importing a fresh copy of the module. Once the import is complete,
    the sys.modules cache is restored to its original state.

    Modules named in fresh are also imported anew if needed by the import.

    Importing of modules named in blocked is prevented while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq and test_warnings include extra sanity checks to make
    # sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                # Only schedule removal of the blocking 0 entry if we added
                # it ourselves (i.e. nothing was there to save).
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        finally:
            # Restore saved modules first, then drop our own blocking entries.
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module
def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        value = getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
    return value
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0      # Requested limit before clamping (see set_memlimit)

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream in place when regrtest started."""
    global _original_stdout
    _original_stdout = stdout
def get_original_stdout():
    """Return the stream saved by record_original_stdout(), falling back
    to sys.stdout when nothing truthy was recorded."""
    return _original_stdout if _original_stdout else sys.stdout
def unload(name):
    """Drop *name* from sys.modules; a missing entry is not an error."""
    sys.modules.pop(name, None)
def unlink(filename):
    """Remove *filename*, silently ignoring OSError (e.g. missing file)."""
    try:
        # os.remove is documented as identical to os.unlink.
        os.remove(filename)
    except OSError:
        pass
def rmtree(path):
    """Remove a directory tree, ignoring 'does not exist' errors.

    Unix reports a missing path as ENOENT, Windows as ESRCH; any other
    OSError is re-raised.
    """
    try:
        shutil.rmtree(path)
    # 'as' form (valid since 2.6) matches the style used elsewhere in
    # this file (see transient_internet) and stays valid in 3.x.
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    compiled = modname + os.extsep + 'pyc'
    optimized = modname + os.extsep + 'pyo'
    for dirname in sys.path:
        # Remove both byte-compiled forms independently: a .pyo may exist
        # even when the .pyc does not.
        unlink(os.path.join(dirname, compiled))
        unlink(os.path.join(dirname, optimized))
def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    if use_resources is None:
        return False
    return resource in use_resources
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ then automatically return True.  The
    possibility of False being returned occurs when regrtest.py is executing."""
    # A test run directly (module __main__) is treated as if the resource
    # had been enabled.
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
HOST = 'localhost'   # Default host used by bind_port()/find_unused_port()
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an ephemeral port number that was just observed to be free.

    A temporary socket with the given family and type is bound with the port
    set to 0, eliciting an unused ephemeral port from the OS; the socket is
    then closed and deleted, and that port number returned.

    Either this function or bind_port() should be used for any test that
    needs a server port; prefer bind_port() whenever the calling code creates
    the Python socket itself, because the port returned here can in principle
    be handed to another process between our close and the caller's bind.
    Hard-coded ports must *never* be used: they break parallel test runs and,
    because of SO_REUSEADDR's different semantics on Windows, can wedge the
    whole process there (see http://bugs.python.org/issue2550 and
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx for the
    SO_REUSEADDR / SO_EXCLUSIVEADDRUSE background).
    """
    probe = socket.socket(family, socktype)
    port = bind_port(probe)
    probe.close()
    del probe
    return port
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.  Relies on
    ephemeral ports in order to ensure we are using an unbound port.  This is
    important as many tests may be running simultaneously, especially in a
    buildbot environment.  This method raises an exception if the sock.family
    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    for TCP/IP sockets.  The only case for setting these options is testing
    multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on the socket.  This will prevent anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the OS for an unused ephemeral port.
    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
FUZZ = 1e-6    # Relative tolerance used by fcmp() for float comparisons
def fcmp(x, y): # fuzzy comparison function
    """cmp()-style comparison: floats compare equal within FUZZ tolerance,
    same-type sequences compare elementwise then by length."""
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x - y) <= fuzz:
                return 0
        except:
            # Non-numeric operands fall through to plain comparison below.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for a, b in zip(x, y):
            outcome = fcmp(a, b)
            if outcome:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
# Detect whether the 'unicode' builtin exists (it is absent when Python 2
# was built without Unicode support).
try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

is_jython = sys.platform.startswith('java')   # running under Jython?
# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2):  # 0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # errors=ignore' mode - hence we get '?' characters rather than
                # the exception. 'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem. ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNICODE_UNENCODEABLE

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
# Save the initial cwd (captured once at import time, before any test
# changes directory).
SAVEDCWD = os.getcwd()
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and set it as CWD.

    The new CWD is created in the current directory and it's named *name*.
    If *quiet* is False (default) and it's not possible to create or change
    the CWD, an error is raised.  If it's True, only a warning is raised
    and the original CWD is used.
    """
    if isinstance(name, unicode):
        try:
            name = name.encode(sys.getfilesystemencoding() or 'ascii')
        except UnicodeEncodeError:
            if not quiet:
                raise unittest.SkipTest('unable to encode the cwd name with '
                                        'the filesystem encoding.')
    saved_dir = os.getcwd()
    is_temporary = False
    try:
        os.mkdir(name)
        os.chdir(name)
        is_temporary = True
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + name,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        # Always restore the original directory; only remove the temporary
        # one if we actually created it above.
        os.chdir(saved_dir)
        if is_temporary:
            rmtree(name)
def findfile(file, here=__file__, subdir=None):
    """Try to find a file on sys.path and the working directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    # Search the test package directory first, then sys.path.
    for directory in [os.path.dirname(here)] + sys.path:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
def sortdict(dict):
    """Like repr(dict), but with the items in sorted key order.

    NOTE: the parameter name shadows the builtin 'dict'; it is kept for
    backward compatibility with existing keyword callers.
    """
    # sorted() instead of list.sort(): same result, but also works on
    # mappings whose items() is not a plain list.
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas
def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        # Closing invalidates the descriptor we just returned.
        file.close()
        unlink(TESTFN)
def check_syntax_error(testcase, statement):
    """Assert (via *testcase*) that compiling *statement* as 'exec' code
    raises SyntaxError."""
    testcase.assertRaises(SyntaxError, compile, statement,
                          '<test string>', 'exec')
def open_urlresource(url, check=None):
    """Return an open file for the resource at *url*, downloading it into
    the test data directory if a valid local copy is not already present.

    *check*, if given, is called with the open file and must return a true
    value for the file to be considered valid.  Requires the 'urlfetch'
    resource when a download is needed.  Raises TestFailed if the fetched
    file does not validate.
    """
    import urlparse, urllib2

    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Returns the open file on success, None (implicitly) on failure.
        f = open(fn)
        if check is None:
            return f
        elif check(f):
            f.seek(0)
            return f
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    f = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource "%s"' % fn)
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
    entry to the warnings.catch_warnings() context manager.
    """

    def __init__(self, warnings_list):
        self._warnings = warnings_list
        self._last = 0

    def __getattr__(self, attr):
        # Delegate to the most recent warning if any arrived since the
        # last reset(); otherwise known detail names read as None.
        if self._last < len(self._warnings):
            return getattr(self._warnings[-1], attr)
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        return self._warnings[self._last:]

    def reset(self):
        self._last = len(self._warnings)
def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = [warning.message for warning in w]
    missing = []
    for msg, cat in filters:
        seen = False
        for exc in reraise[:]:
            message = str(exc)
            # Filter out the matching messages
            if (re.match(msg, message, re.I) and
                issubclass(exc.__class__, cat)):
                seen = True
                reraise.remove(exc)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if reraise:
        raise AssertionError("unhandled warning %r" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default True without argument,
         default False if some filters are defined)

    Without argument, it defaults to:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if not filters:
        filters = (("", Warning),)
        # Preserve backward compatibility
        if quiet is None:
            quiet = True
    return _filterwarnings(filters, quiet)
@contextlib.contextmanager
def check_py3k_warnings(*filters, **kwargs):
    """Context manager to silence py3k warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default False)

    Without argument, it defaults to:
        check_py3k_warnings(("", DeprecationWarning), quiet=False)
    """
    if sys.py3kwarning:
        if not filters:
            filters = (("", DeprecationWarning),)
    else:
        # It should not raise any py3k warning
        filters = ()
    return _filterwarnings(filters, kwargs.get('quiet'))
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for name in module_names:
            if name not in sys.modules:
                continue
            module = sys.modules[name]
            # The entry may be an alias for a module registered under its
            # real name (e.g. stubs for modules renamed in 3.x); the real
            # entry must be dropped too or the import cache still hits.
            if module.__name__ != name:
                del sys.modules[module.__name__]
            del sys.modules[name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
class EnvironmentVarGuard(UserDict.DictMixin):

    """Dict-like front end for os.environ that records every change made
    through it and rolls all of them back on exit, so tests cannot leak
    environment variables.  Can be used as a context manager."""

    def __init__(self):
        self._environ = os.environ
        self._changed = {}

    def _remember(self, envvar):
        # Record the pre-existing value only on the first touch of a key
        # (None means "was not set at all").
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for envvar, saved in self._changed.items():
            if saved is None:
                if envvar in self._environ:
                    del self._environ[envvar]
            else:
                self._environ[envvar] = saved
        os.environ = self._environ
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        # Keep both the list object and a snapshot of its contents so a
        # wholesale replacement of sys.path is undone as well.
        self.original_value = sys.path[:]
        self.original_object = sys.path
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.path = self.original_object
        sys.path[:] = self.original_value
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # NOTE(review): the subclass test is issubclass(self.exc, type_),
        # i.e. the configured exception must be a subclass of the raised
        # type — preserved as-is from the original.
        if type_ is not None and issubclass(self.exc, type_):
            # .items() instead of py2-only .iteritems(): identical behavior
            # and consistent with the rest of this file.
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                raise ResourceDenied("an optional resource is not available")
753 _transients = {
754 IOError: (errno.ECONNRESET, errno.ETIMEDOUT),
755 socket.error: (errno.ECONNRESET,),
756 socket.gaierror: [getattr(socket, t)
757 for t in ('EAI_NODATA', 'EAI_NONAME')
758 if hasattr(socket, t)],
@contextlib.contextmanager
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    Errors caught:
        timeout          IOError         errno = ETIMEDOUT
        socket reset     socket.error, IOError errno = ECONNRESET
        dns no data      socket.gaierror errno = EAI_NODATA
        dns no name      socket.gaierror errno = EAI_NONAME
    """
    try:
        yield
    except tuple(_transients) as err:
        for errtype, transient_errnos in _transients.items():
            if isinstance(err, errtype) and err.errno in transient_errnos:
                raise ResourceDenied("could not establish network "
                                     "connection ({})".format(err))
        # Not one of the known transient errno values: re-raise unchanged.
        raise
@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello"
    """
    import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always restore the real stream, even if the body raised.
        setattr(sys, stream_name, orig_stdout)
def captured_stdout():
    """Shortcut for captured_output("stdout"); see captured_output."""
    return captured_output("stdout")
def captured_stdin():
    """Shortcut for captured_output("stdin"); see captured_output."""
    return captured_output("stdin")
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    gc.collect()
    # Jython's collector works asynchronously; give it a moment before the
    # follow-up passes.
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Decorator: run the test with the first of *locales* that can be set
    for the locale category named by *catstr* (e.g. 'LC_ALL'), restoring the
    original locale afterwards.  If the original locale cannot be retrieved,
    the function runs with the locale left untouched."""
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                # Try each candidate locale until one can be installed.
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        # func_name is the Python 2 spelling of __name__ for functions.
        inner.func_name = func.func_name
        inner.__doc__ = func.__doc__
        return inner
    return decorator
#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

# Largest positive Py_ssize_t on this build; memory limits are clamped to it.
MAX_Py_ssize_t = sys.maxsize
def set_memlimit(limit):
    """Parse a limit string such as '2g' or '0.5T' and set the max_memuse
    and real_max_memuse globals used by the bigmem decorators.

    Raises ValueError for an unparseable string or for a limit below ~2GB
    (too small to be useful for bigmem tests).
    """
    global max_memuse
    global real_max_memuse
    # (Closing brace of this dict was lost in extraction; restored.)
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        # Clamp to what a Py_ssize_t can address on this build.
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the parameters for introspection by the test machinery.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Decorator: run the test with exactly *size* when a real memory limit
    is configured, skipping if size * memuse exceeds that limit; with no
    limit set, fall back to a small smoke-test size."""
    def decorator(f):
        def wrapper(self):
            maxsize = size if real_max_memuse else 5147
            if real_max_memuse and real_max_memuse < maxsize * memuse:
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return
            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        # Only run when the configured limit covers the whole address space.
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper
#=======================================================================
# unittest integration.
class BasicTestRunner:
    """Minimal test runner: collect results silently, report nothing."""

    def run(self, test):
        """Run `test`, returning the populated unittest.TestResult."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
971 def _id(obj):
972 return obj
def requires_resource(resource):
    """Decorator skipping a test when `resource` is not enabled.

    Returns the identity decorator when the resource is enabled,
    otherwise a unittest.skip decorator naming the missing resource.
    """
    # Bug fix: this module defines is_resource_enabled() (it is in
    # __all__); resource_is_enabled() does not exist, so the original
    # code raised NameError whenever this decorator was used.
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))
def cpython_only(test):
    """Decorator for tests only applicable on CPython."""
    return impl_detail(cpython=True)(test)
def impl_detail(msg=None, **guards):
    """Skip-decorator factory for implementation-detail tests.

    Returns the identity decorator when the running interpreter matches
    `guards` (see check_impl_detail); otherwise returns unittest.skip
    with `msg`, synthesizing a default message from the guard names when
    none was supplied.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}"
                    if default else
                    "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames.keys())))
    return unittest.skip(msg)
999 def _parse_guards(guards):
1000 # Returns a tuple ({platform_name: run_me}, default_value)
1001 if not guards:
1002 return ({'cpython': True}, False)
1003 is_true = guards.values()[0]
1004 assert guards.values() == [is_true] * len(guards) # all True or all False
1005 return (guards, not is_true)
# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """Return True or False depending on the host platform.

    Examples:
        if check_impl_detail():               # only on CPython (default)
        if check_impl_detail(jython=True):    # only on Jython
        if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    parsed, default = _parse_guards(guards)
    impl_name = platform.python_implementation().lower()
    return parsed.get(impl_name, default)
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    runner = (unittest.TextTestRunner(sys.stdout, verbosity=2)
              if verbose else BasicTestRunner())

    result = runner.run(suite)
    if result.wasSuccessful():
        return
    # A single error or single failure: surface its traceback directly.
    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    suite = unittest.TestSuite()
    for item in classes:
        if isinstance(item, str):
            # A string names an already-imported module to scan for tests.
            try:
                module = sys.modules[item]
            except KeyError:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(module))
        elif isinstance(item, (unittest.TestSuite, unittest.TestCase)):
            suite.addTest(item)
        else:
            suite.addTest(unittest.makeSuite(item))
    _run_suite(suite)
#=======================================================================
# doctest driver.
1061 def run_doctest(module, verbosity=None):
1062 """Run doctest on the given module. Return (#failures, #tests).
1064 If optional argument verbosity is not specified (or is None), pass
1065 test_support's belief about verbosity on to doctest. Else doctest's
1066 usual behavior is used (it searches sys.argv for -v).
1069 import doctest
1071 if verbosity is None:
1072 verbosity = verbose
1073 else:
1074 verbosity = None
1076 # Direct doctest output (normally just errors) to real stdout; doctest
1077 # output shouldn't be compared by regrtest.
1078 save_stdout = sys.stdout
1079 sys.stdout = get_original_stdout()
1080 try:
1081 f, t = doctest.testmod(module, verbose=verbosity)
1082 if f:
1083 raise TestFailed("%d of %d doctests failed" % (f, t))
1084 finally:
1085 sys.stdout = save_stdout
1086 if verbose:
1087 print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1088 return f, t
#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
def threading_setup():
    """Return a 1-tuple snapshot of the live thread count.

    Falls back to a constant when the thread module is unavailable.
    """
    if not thread:
        return (1,)
    return (thread._count(),)
def threading_cleanup(nb_threads):
    """Wait briefly for the live thread count to drop back to nb_threads.

    Polls thread._count() up to ten times, 0.1s apart, then gives up
    silently.  No-op when the thread module is unavailable.
    """
    if not thread:
        return

    for _ in range(10):
        if thread._count() == nb_threads:
            break
        time.sleep(0.1)
    # XXX print a warning in case of failure?
def reap_threads(func):
    """Decorator ensuring spawned threads are cleaned up even on failure.

    Use this when threads are being used by a test.  If threading is
    unavailable, `func` is returned unchanged.
    """
    if not thread:
        return func

    @functools.wraps(func)
    def wrapper(*args):
        snapshot = threading_setup()
        try:
            return func(*args)
        finally:
            threading_cleanup(*snapshot)
    return wrapper
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """
    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # Raises OSError with ECHILD once no children are left,
                # and on platforms (e.g. Windows) where waitpid(-1, ...)
                # is unsupported.  That's ok -- we're done either way.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except OSError:
                # Was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit; only OSError is expected.
                break
def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    # memoryview-like objects expose tobytes(); try that first.
    try:
        return b.tobytes()
    except AttributeError:
        pass
    # Next treat b as an iterable of ints; otherwise fall back to bytes().
    try:
        return b"".join(chr(x) for x in b)
    except TypeError:
        return bytes(b)