Use a custom timeout in test_support.open_urlresource.
[python.git] / Lib / test / test_support.py
blobc2dcb57915f21c3aa0df89503474bdc2eae211e5
1 """Supporting definitions for the Python regression tests."""
3 if __name__ != 'test.test_support':
4 raise ImportError('test_support must be imported from the test package')
6 import contextlib
7 import errno
8 import functools
9 import gc
10 import socket
11 import sys
12 import os
13 import platform
14 import shutil
15 import warnings
16 import unittest
17 import importlib
18 import UserDict
20 __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
21 "verbose", "use_resources", "max_memuse", "record_original_stdout",
22 "get_original_stdout", "unload", "unlink", "rmtree", "forget",
23 "is_resource_enabled", "requires", "find_unused_port", "bind_port",
24 "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
25 "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
26 "open_urlresource", "check_warnings", "CleanImport",
27 "EnvironmentVarGuard", "captured_output",
28 "captured_stdout", "TransientResource", "transient_internet",
29 "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
30 "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
31 "threading_cleanup", "reap_children", "cpython_only",
32 "check_impl_detail", "get_attribute", "py3k_bytes"]
# Exception hierarchy used throughout the regression-test helpers.

class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """
48 @contextlib.contextmanager
49 def _ignore_deprecated_imports(ignore=True):
50 """Context manager to suppress package and module deprecation
51 warnings when importing them.
53 If ignore is False, this context manager has no effect."""
54 if ignore:
55 with warnings.catch_warnings():
56 warnings.filterwarnings("ignore", ".+ (module|package)",
57 DeprecationWarning)
58 yield
59 else:
60 yield
def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            # 'except E as v' works on 2.6+ and 3.x, unlike 'except E, v'.
            raise unittest.SkipTest(str(msg))
76 def _save_and_remove_module(name, orig_modules):
77 """Helper function to save and remove a module from sys.modules
79 Return value is True if the module was in sys.modules and
80 False otherwise."""
81 saved = True
82 try:
83 orig_modules[name] = sys.modules[name]
84 except KeyError:
85 saved = False
86 else:
87 del sys.modules[name]
88 return saved
91 def _save_and_block_module(name, orig_modules):
92 """Helper function to save and block a module in sys.modules
94 Return value is True if the module was in sys.modules and
95 False otherwise."""
96 saved = True
97 try:
98 orig_modules[name] = sys.modules[name]
99 except KeyError:
100 saved = False
101 sys.modules[name] = 0
102 return saved
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Imports and returns a module, deliberately bypassing the sys.modules cache
    and importing a fresh copy of the module. Once the import is complete,
    the sys.modules cache is restored to its original state.

    Modules named in fresh are also imported anew if needed by the import.

    Importing of modules named in blocked is prevented while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq and test_warnings include extra sanity checks to make
    # sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                # A blocking entry that wasn't cached before must be deleted
                # afterwards instead of restored.
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        finally:
            # Always restore the original sys.modules state, even when the
            # fresh import itself failed.
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module
def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        return getattr(obj, name)
    except AttributeError:
        # Missing attribute means the feature under test is unavailable.
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
# Flags and limits below are module-level knobs that regrtest.py overrides
# at startup; tests read them through this module.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0
# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None

def record_original_stdout(stdout):
    """Remember *stdout* so get_original_stdout() can return it later."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, falling back to sys.stdout."""
    return _original_stdout or sys.stdout
def unload(name):
    """Drop *name* from sys.modules; a missing entry is not an error."""
    sys.modules.pop(name, None)
def unlink(filename):
    """Delete *filename*; a file that does not exist is silently ignored."""
    try:
        os.remove(filename)  # os.remove is an alias of os.unlink
    except OSError:
        pass
def rmtree(path):
    """Remove a directory tree; a missing tree is not an error.

    Unix reports a missing path as ENOENT while Windows uses ESRCH; both
    are swallowed.  Any other OSError propagates.
    """
    try:
        shutil.rmtree(path)
    except OSError as e:
        # 'except E as e' works on 2.6+ and 3.x, unlike 'except E, e'.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        # The two bytecode flavours can exist independently, so remove each
        # on its own rather than guarding one behind the other.
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))
def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    if use_resources is None:
        return False
    return resource in use_resources
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ then automatically return True. The
    possibility of False being returned occurs when regrtest.py is executing."""
    # A test run directly ("python test_foo.py") gets every resource; only
    # regrtest-driven runs enforce the resource list.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
# Loopback host used by bind_port()/find_unused_port(), so test sockets are
# never externally visible.
HOST = 'localhost'
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an ephemeral port number that was unused at the moment of the
    call.

    A temporary socket of the given family/type is bound to (HOST, port 0),
    which makes the OS hand out an unused ephemeral port; the socket is then
    closed and the port number returned.

    Always prefer bind_port() where the calling code creates the Python
    socket itself: the port returned here can in principle be handed to
    another process between our close() and the caller's bind().  Use this
    helper only when the port must go into a constructor or be passed to an
    external program (e.g. openssl s_server -accept).  Never hard-code
    ports: parallel test runs on one host would collide, and on Windows the
    SO_REUSEADDR semantics can wedge both processes beyond recovery (see
    http://bugs.python.org/issue2550).
    """
    probe = socket.socket(family, socktype)
    port = bind_port(probe)
    probe.close()
    del probe
    return port
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.

    Relies on ephemeral ports to ensure an unused port, which matters when
    many tests run simultaneously (e.g. in a buildbot environment).  Raises
    TestFailed if *sock* is an AF_INET/SOCK_STREAM socket with SO_REUSEADDR
    or SO_REUSEPORT already set: tests must never set those options on
    TCP/IP sockets (on Windows another process could then steal the port
    and wedge both processes); the only legitimate use is multicast UDP.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available
    (i.e. on Windows), it is set on the socket.  This prevents anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
# Relative tolerance used by fcmp() for float comparisons.
FUZZ = 1e-6

def fcmp(x, y):
    """Fuzzy three-way comparison: -1, 0 or 1 like cmp(), treating floats
    that differ by less than a relative FUZZ as equal; sequences of the
    same type compare elementwise."""
    if isinstance(x, float) or isinstance(y, float):
        try:
            if abs(x - y) <= (abs(x) + abs(y)) * FUZZ:
                return 0
        except:
            # Non-numeric operand: fall through to the exact comparison.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for a, b in zip(x, y):
            outcome = fcmp(a, b)
            if outcome != 0:
                return outcome
        # Common prefix equal: shorter sequence sorts first.
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
# Probe for the py2-only builtin instead of sniffing version numbers.
try:
    unicode
except NameError:
    have_unicode = False
else:
    have_unicode = True
is_jython = sys.platform.startswith('java')

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2):  # 0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # errors=ignore' mode - hence we get '?' characters rather than
                # the exception.  'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNICODE_UNENCODEABLE

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{0}_{1}_tmp".format(TESTFN, os.getpid())

# Make sure we can write to TESTFN, try in /tmp if we can't
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print ('WARNING: tests will fail, unable to write to: %s or %s' %
               (TESTFN, TMP_TESTFN))
# Clean up the probe file; the import must leave no droppings behind.
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp
def findfile(file, here=__file__):
    """Try to find a file on sys.path and the working directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    # Search the test directory first, then the import path.
    for dn in [os.path.dirname(here)] + sys.path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn):
            return fn
    return file
def verify(condition, reason='test failed'):
    """Verify that condition is true. If not, raise TestFailed.

    The optional argument reason can be given to provide
    a better error text.
    """
    if not condition:
        raise TestFailed(reason)
def vereq(a, b):
    """Raise TestFailed if a == b is false.

    This is better than verify(a == b) because, in case of failure, the
    error message incorporates repr(a) and repr(b) so you can see the
    inputs.

    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
    former is tested.
    """
    if not (a == b):
        raise TestFailed("%r == %r" % (a, b))
def sortdict(dict):
    "Like repr(dict), but in sorted order."
    # sorted() copies into a new list, so this also works with py3k dict
    # views (dict.items().sort() does not) while leaving py2 output intact.
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas
def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        # Closing makes the returned descriptor stale, which is the point.
        file.close()
        unlink(TESTFN)
def check_syntax_error(testcase, statement):
    """Assert, via *testcase*, that compiling *statement* raises SyntaxError."""
    testcase.assertRaises(SyntaxError, compile,
                          statement, '<test string>', 'exec')
def open_urlresource(url):
    """Return an open file for the test resource named by *url*, downloading
    it into Lib/test/data/ first if it is not already cached locally.
    Requires the 'urlfetch' resource to be enabled for the download case."""
    import urlparse, urllib2

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)
    if os.path.exists(fn):
        return open(fn)

    print >> get_original_stdout(), '\tfetching %s ...' % url
    # Custom timeout so a dead server cannot hang the whole test run.
    f = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()
    return open(fn)
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
       entry to the warnings.catch_warnings() context manager.
    """
    def __init__(self, warnings_list):
        self.warnings = warnings_list

    def __getattr__(self, attr):
        # Delegate to the most recent warning; standard warning fields read
        # as None while nothing has been recorded yet.
        if self.warnings:
            return getattr(self.warnings[-1], attr)
        elif attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    def reset(self):
        del self.warnings[:]
@contextlib.contextmanager
def check_warnings():
    """Catch warnings (record=True) and expose them via a WarningsRecorder."""
    with warnings.catch_warnings(record=True) as caught:
        yield WarningsRecorder(caught)
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            __import__("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need delete the real module to clear
                # the import cache.
                if module.__name__ != module_name:
                    del sys.modules[module.__name__]
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
class EnvironmentVarGuard(UserDict.DictMixin):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        self._changed = {}

    def _remember(self, envvar):
        # Record the pre-modification value the first time a key is touched,
        # so __exit__ can restore it (or remove a key that was absent).
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for key, previous in self._changed.items():
            if previous is None:
                # The key did not exist before we touched it.
                if key in self._environ:
                    del self._environ[key]
            else:
                self._environ[key] = previous
        os.environ = self._environ
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        self.original_value = sys.path[:]
        self.original_object = sys.path
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore both the original list object and its original contents.
        sys.path = self.original_object
        sys.path[:] = self.original_value
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        if type_ is None or not issubclass(self.exc, type_):
            return
        # Only a full attribute match converts the exception into a skip.
        if all(hasattr(value, attr) and getattr(value, attr) == expected
               for attr, expected in self.attrs.iteritems()):
            raise ResourceDenied("an optional resource is not available")
@contextlib.contextmanager
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    # Timeouts and peer resets are environment problems, not test failures.
    timed_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    sock_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    io_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    with timed_out, sock_reset, io_reset:
        yield
@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello"
    """
    import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always restore the real stream, even if the body raised.
        setattr(sys, stream_name, orig_stdout)
def captured_stdout():
    # Shorthand for captured_output("stdout"); yields the replacement StringIO.
    return captured_output("stdout")
def captured_stdin():
    # Shorthand for captured_output("stdin"); yields the replacement StringIO.
    return captured_output("stdin")
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    # Three passes so objects freed by one collection (e.g. via __del__)
    # can themselves be collected.
    gc.collect()
    gc.collect()
    gc.collect()
691 #=======================================================================
692 # Decorator for running a function in a different locale, correctly resetting
693 # it afterwards.
def run_with_locale(catstr, *locales):
    """Decorator: run the wrapped test under the first of *locales* that can
    be set for the locale category named by *catstr* (e.g. 'LC_ALL'),
    restoring the original locale afterwards."""
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                # Try each requested locale in order; first success wins.
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                # 'locale' doubles as the "setup succeeded" flag (see above).
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.func_name = func.func_name
        inner.__doc__ = func.__doc__
        return inner
    return decorator
727 #=======================================================================
728 # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
730 # Some handy shorthands. Note that these are used for byte-limits as well
731 # as size-limits, in the various bigmem tests
# Handy shorthands, used both as byte-limits and size-limits in the
# various bigmem tests.
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse a human-readable memory limit such as '2g' or '512M' and set
    the module-wide max_memuse/real_max_memuse globals.

    Raises ValueError for unparseable strings and for limits below 2Gb,
    which are too small to be useful for bigmem tests.
    """
    import re
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        # Cap at the largest object size this interpreter can address.
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the sizing parameters so regrtest -M can inspect them.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Decorator for bigmem tests that need an exact 'size' rather than one
    maximized to fit the memory limit."""
    def decorator(f):
        def wrapper(self):
            # Without a configured limit, fall back to a tiny smoke-test size.
            maxsize = size if real_max_memuse else 5147

            if real_max_memuse and real_max_memuse < maxsize * memuse:
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return

            return f(self, maxsize)
        # Expose the sizing parameters so regrtest -M can inspect them.
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        # Only run when the configured limit allows exhausting the
        # address space; otherwise skip (noisily when verbose).
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper
832 #=======================================================================
833 # unittest integration.
class BasicTestRunner:
    """Minimal unittest runner: collects results without printing anything."""

    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result
841 def _id(obj):
842 return obj
def requires_resource(resource):
    """Decorator: skip the decorated test unless *resource* is enabled."""
    # Bug fix: this previously called the nonexistent name
    # 'resource_is_enabled'; the helper defined in this module is
    # 'is_resource_enabled', so the decorator always raised NameError.
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)
def impl_detail(msg=None, **guards):
    """Return the identity decorator when the running implementation matches
    *guards*, otherwise a unittest.skip decorator with message *msg* (a
    default message is synthesized from the guard names when msg is None)."""
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        names = ' or '.join(sorted(guardnames.keys()))
        if default:
            msg = "implementation detail not available on {0}".format(names)
        else:
            msg = "implementation detail specific to {0}".format(names)
    return unittest.skip(msg)
869 def _parse_guards(guards):
870 # Returns a tuple ({platform_name: run_me}, default_value)
871 if not guards:
872 return ({'cpython': True}, False)
873 is_true = guards.values()[0]
874 assert guards.values() == [is_true] * len(guards) # all True or all False
875 return (guards, not is_true)
877 # Use the following check to guard CPython's implementation-specific tests --
878 # or to run them only on the implementation(s) guarded by the arguments.
# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(platform.python_implementation().lower(), default)
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    runner = (unittest.TextTestRunner(sys.stdout, verbosity=2)
              if verbose else BasicTestRunner())

    result = runner.run(suite)
    if result.wasSuccessful():
        return
    # Distil a single error string out of the result object.
    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            # A string argument must name an already-imported test module.
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(sys.modules[cls]))
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)
928 #=======================================================================
929 # doctest driver.
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        # Explicit verbosity: hand None to doctest so it falls back to
        # its own sys.argv scan.
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
    return f, t
960 #=======================================================================
961 # Threading support to prevent reporting refleaks when running regrtest.py -R
963 # NOTE: we use thread._count() rather than threading.enumerate() (or the
964 # moral equivalent thereof) because a threading.Thread object is still alive
965 # until its __bootstrap() method has returned, even after it has been
966 # unregistered from the threading module.
967 # thread._count(), on the other hand, only gets decremented *after* the
968 # __bootstrap() method has returned, which gives us reliable reference counts
969 # at the end of a test run.
def threading_setup():
    """Return a 1-tuple holding the current low-level thread count.

    thread._count() is used rather than threading.enumerate() because it is
    only decremented after a thread has fully finished bootstrapping, which
    gives reliable counts for refleak checking."""
    import thread
    return (thread._count(),)
def threading_cleanup(nb_threads):
    """Poll (up to roughly one second) until the low-level thread count
    drops back to *nb_threads*, then return regardless."""
    import thread
    import time

    _MAX_COUNT = 10
    for _ in range(_MAX_COUNT):
        if thread._count() == nb_threads:
            break
        time.sleep(0.1)
    # XXX print a warning in case of failure?
def reap_threads(func):
    """Decorator: snapshot the thread count before *func* runs and wait for
    stray threads to finish afterwards, so refleak runs stay clean."""
    @functools.wraps(func)
    def wrapper(*args):
        snapshot = threading_setup()
        try:
            return func(*args)
        finally:
            threading_cleanup(*snapshot)
    return wrapper
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except:
                # Deliberate bare except: this is best-effort cleanup and
                # waitpid failures vary by platform (e.g. no children left).
                break
def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        return b.tobytes()
    except AttributeError:
        try:
            # iterable of ints?
            return b"".join(chr(x) for x in b)
        except TypeError:
            return bytes(b)