1 """Supporting definitions for the Python regression tests."""
# Refuse direct import: this module must live at test.test_support so that
# regrtest and the test package share one copy of its module-level state.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
20 __all__
= ["Error", "TestFailed", "ResourceDenied", "import_module",
21 "verbose", "use_resources", "max_memuse", "record_original_stdout",
22 "get_original_stdout", "unload", "unlink", "rmtree", "forget",
23 "is_resource_enabled", "requires", "find_unused_port", "bind_port",
24 "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
25 "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
26 "open_urlresource", "check_warnings", "CleanImport",
27 "EnvironmentVarGuard", "captured_output",
28 "captured_stdout", "TransientResource", "transient_internet",
29 "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
30 "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
31 "threading_cleanup", "reap_children", "cpython_only",
32 "check_impl_detail", "get_attribute", "py3k_bytes"]
class Error(Exception):
    """Base class for regression test exceptions."""
class TestFailed(Error):
    """Test failed."""
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not be enabled.  It is used to distinguish between expected
    and unexpected skips.
    """
48 @contextlib.contextmanager
49 def _ignore_deprecated_imports(ignore
=True):
50 """Context manager to suppress package and module deprecation
51 warnings when importing them.
53 If ignore is False, this context manager has no effect."""
55 with warnings
.catch_warnings():
56 warnings
.filterwarnings("ignore", ".+ (module|package)",
def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            # Use "except ... as" (valid since 2.6) rather than the legacy
            # comma form, and turn the failure into a skip so the test is
            # reported as skipped on platforms lacking the module.
            raise unittest.SkipTest(str(msg))
76 def _save_and_remove_module(name
, orig_modules
):
77 """Helper function to save and remove a module from sys.modules
79 Return value is True if the module was in sys.modules and
83 orig_modules
[name
] = sys
.modules
[name
]
91 def _save_and_block_module(name
, orig_modules
):
92 """Helper function to save and block a module in sys.modules
94 Return value is True if the module was in sys.modules and
98 orig_modules
[name
] = sys
.modules
[name
]
101 sys
.modules
[name
] = 0
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Imports and returns a module, deliberately bypassing the sys.modules cache
    and importing a fresh copy of the module. Once the import is complete,
    the sys.modules cache is restored to its original state.

    Modules named in fresh are also imported anew if needed by the import.

    Importing of modules named in blocked is prevented while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq and test_warnings include extra sanity checks to make
    # sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        finally:
            # Restore every saved module and drop the blocking entries that
            # had no original module behind them.
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module
def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        value = getattr(obj, name)
    except AttributeError:
        # Missing optional attribute: report the test as skipped, not broken.
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
    return value
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream get_original_stdout() should hand back."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, falling back to sys.stdout."""
    return _original_stdout or sys.stdout
def unload(name):
    """Drop the named module from sys.modules; ignore it if not imported."""
    try:
        del sys.modules[name]
    except KeyError:
        pass
def unlink(filename):
    """Remove a file, ignoring the error if it does not exist."""
    try:
        os.unlink(filename)
    except OSError:
        pass

def rmtree(path):
    """Recursively remove a directory tree, tolerating a missing path."""
    try:
        shutil.rmtree(path)
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
        # the chance exists that there is no .pyc (and thus the 'try' statement
        # is exited) but there is a .pyo file.
        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    # use_resources stays None until regrtest configures it.
    return use_resources is not None and resource in use_resources
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ then automatically return True.  The
    possibility of False being returned occurs when regrtest.py is executing."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.

    A temporary socket with the given family and type is bound to port 0,
    which makes the OS pick an unused ephemeral port; the socket is then
    closed and that port number is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test; always prefer bind_port() where possible.  Hard coded ports
    should *NEVER* be used: as soon as a server socket is bound to a hard
    coded port, running multiple instances of the test simultaneously on the
    same host breaks.  On Windows the SO_REUSEADDR semantics additionally
    allow a second process to steal such a port and wedge both processes
    (use SO_EXCLUSIVEADDRUSE there instead; see
    http://bugs.python.org/issue2550 and
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx).

    XXX: this rests on the assumption that the ephemeral port won't be
    handed to another process between closing our temporary socket and the
    caller binding it; we can deal with that issue if/when we come across
    it."""
    tempsock = socket.socket(family, socktype)
    port = bind_port(tempsock)
    tempsock.close()
    del tempsock
    return port
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.

    Relies on ephemeral ports to ensure we are using an unbound port, which
    matters when many tests run simultaneously (e.g. on a buildbot).  For an
    AF_INET/SOCK_STREAM socket this raises TestFailed if SO_REUSEADDR or
    SO_REUSEPORT is already set — tests must never set those on TCP/IP
    sockets (the only legitimate use is multicast over multiple UDP
    sockets).  Where available (Windows), SO_EXCLUSIVEADDRUSE is set so
    nobody else can bind to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
def fcmp(x, y): # fuzzy comparison function
    """Three-way compare with float fuzz: -1, 0 or 1 like cmp()."""
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x - y) <= fuzz:
                return 0
        except:
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        # Compare element-wise, then fall back to comparing lengths.
        for a, b in zip(x, y):
            outcome = fcmp(a, b)
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
# True when running under Jython (whose sys.platform starts with "java").
is_jython = sys.platform.startswith('java')
336 # Filename used for testing
337 if os
.name
== 'java':
338 # Jython disallows @ in module names
340 elif os
.name
== 'riscos':
344 # Unicode name only used if TEST_FN_ENCODING exists for the platform.
346 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
347 # TESTFN_UNICODE is a filename that can be encoded using the
348 # file system encoding, but *not* with the default (ascii) encoding
349 if isinstance('', unicode):
351 # XXX perhaps unicode() should accept Unicode strings?
352 TESTFN_UNICODE
= "@test-\xe0\xf2"
354 # 2 latin characters.
355 TESTFN_UNICODE
= unicode("@test-\xe0\xf2", "latin-1")
356 TESTFN_ENCODING
= sys
.getfilesystemencoding()
357 # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
358 # able to be encoded by *either* the default or filesystem encoding.
359 # This test really only makes sense on Windows NT platforms
360 # which have special Unicode support in posixmodule.
361 if (not hasattr(sys
, "getwindowsversion") or
362 sys
.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
363 TESTFN_UNICODE_UNENCODEABLE
= None
365 # Japanese characters (I think - from bug 846133)
366 TESTFN_UNICODE_UNENCODEABLE
= eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
368 # XXX - Note - should be using TESTFN_ENCODING here - but for
369 # Windows, "mbcs" currently always operates as if in
370 # errors=ignore' mode - hence we get '?' characters rather than
371 # the exception. 'Latin1' operates as we expect - ie, fails.
372 # See [ 850997 ] mbcs encoding ignores errors
373 TESTFN_UNICODE_UNENCODEABLE
.encode("Latin1")
374 except UnicodeEncodeError:
378 'WARNING: The filename %r CAN be encoded by the filesystem. ' \
379 'Unicode filename tests may not be effective' \
380 % TESTFN_UNICODE_UNENCODEABLE
382 # Disambiguate TESTFN for parallel testing, while letting it remain a valid
384 TESTFN
= "{0}_{1}_tmp".format(TESTFN
, os
.getpid())
386 # Make sure we can write to TESTFN, try in /tmp if we can't
389 fp
= open(TESTFN
, 'w+')
391 TMP_TESTFN
= os
.path
.join('/tmp', TESTFN
)
393 fp
= open(TMP_TESTFN
, 'w+')
397 print ('WARNING: tests will fail, unable to write to: %s or %s' %
398 (TESTFN
, TMP_TESTFN
))
def findfile(file, here=__file__):
    """Try to find a file on sys.path and the working directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    # Search the directory of this module first, then sys.path.
    path = [os.path.dirname(here)] + sys.path
    for dn in path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn):
            return fn
    return file
def verify(condition, reason='test failed'):
    """Verify that condition is true. If not, raise TestFailed.

       The optional argument reason can be given to provide
       a better error text.
    """
    if not condition:
        raise TestFailed(reason)
def vereq(a, b):
    """Raise TestFailed if a == b is false.

    This is better than verify(a == b) because, in case of failure, the
    error message incorporates repr(a) and repr(b) so you can see the
    inputs.

    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
    former is tested.
    """
    if not (a == b):
        raise TestFailed("%r == %r" % (a, b))
def sortdict(dict):
    "Like repr(dict), but in sorted order."
    # sorted() copies the items before ordering them, so this works for any
    # mapping and — unlike calling .sort() on the .items() list — is also
    # forward-compatible with Python 3's dict views.
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas
def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)
def check_syntax_error(testcase, statement):
    """Assert (via *testcase*) that compiling *statement* raises SyntaxError."""
    testcase.assertRaises(SyntaxError, compile, statement,
                          '<test string>', 'exec')
def open_urlresource(url):
    """Return an open file for the named test resource, downloading it into
    the package's data directory first if it is not already cached."""
    import urllib, urlparse

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)
    if os.path.exists(fn):
        return open(fn)

    print >> get_original_stdout(), '\tfetching %s ...' % url
    fn, _ = urllib.urlretrieve(url, fn)
    return open(fn)
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
       entry to the warnings.catch_warnings() context manager.
    """
    def __init__(self, warnings_list):
        self.warnings = warnings_list

    def __getattr__(self, attr):
        # Delegate to the most recent warning; for known warning-detail
        # names return None when nothing has been recorded yet.
        if self.warnings:
            return getattr(self.warnings[-1], attr)
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    def reset(self):
        """Forget every warning recorded so far."""
        del self.warnings[:]
@contextlib.contextmanager
def check_warnings():
    """Catch warnings for the duration of the block and expose them through
    a WarningsRecorder."""
    with warnings.catch_warnings(record=True) as w:
        yield WarningsRecorder(w)
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            __import__("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name not in sys.modules:
                continue
            module = sys.modules[module_name]
            # It is possible that module_name is just an alias for
            # another module (e.g. stub for modules renamed in 3.x).
            # In that case, we also need delete the real module to clear
            # the import cache.
            if module.__name__ != module_name:
                del sys.modules[module.__name__]
            del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore the snapshot taken in __init__.
        sys.modules.update(self.original_modules)
class EnvironmentVarGuard(UserDict.DictMixin):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its value before the first change
        # (None when it did not exist), so __exit__ can restore it.
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put back every variable we touched.
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        os.environ = self._environ
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        self.original_value = sys.path[:]
        self.original_object = sys.path
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore both the object identity and its contents.
        sys.path = self.original_object
        sys.path[:] = self.original_value
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        if type_ is not None and issubclass(self.exc, type_):
            for attr, attr_value in self.attrs.iteritems():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Every requested attribute matched: treat as transient.
                raise ResourceDenied("an optional resource is not available")
@contextlib.contextmanager
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    with time_out, socket_peer_reset, ioerror_peer_reset:
        yield
@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
       specific attribute on the sys module.
       Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello"
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        # Always put the real stream back, even if the body raised.
        setattr(sys, stream_name, orig_stdout)
def captured_stdout():
    """Capture sys.stdout for the duration of a with-block."""
    return captured_output("stdout")

def captured_stdin():
    """Capture sys.stdin for the duration of a with-block."""
    return captured_output("stdin")
def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    import gc
    # Several passes so objects freed by earlier passes can themselves
    # release further garbage (e.g. chains of reference cycles).
    gc.collect()
    gc.collect()
    gc.collect()
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Decorator: run the test under the first settable locale in *locales*
    for category *catstr*, restoring the original locale afterwards."""
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                # Try the requested locales in order; first one wins.
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.func_name = func.func_name
        inner.__doc__ = func.__doc__
        return inner
    return decorator
#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
# Largest positive Py_ssize_t the interpreter supports.
MAX_Py_ssize_t = sys.maxsize
def set_memlimit(limit):
    """Parse a human-readable size string like '2.5Gb' and configure the
    module-level bigmem limits from it."""
    import re
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    # Clamp to what a Py_ssize_t can express on this build.
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Like bigmemtest, but pass exactly *size* (no upward adjustment),
    skipping when real_max_memuse cannot accommodate size * memuse."""
    def decorator(f):
        def wrapper(self):
            if not real_max_memuse:
                # No limit configured: run with a token size as a smoke test.
                maxsize = 5147
            else:
                maxsize = size
            if real_max_memuse and real_max_memuse < maxsize * memuse:
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return
            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            # Not enough memory allowed to fill the address space: skip.
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper
824 #=======================================================================
825 # unittest integration.
827 class BasicTestRunner
:
829 result
= unittest
.TestResult()
def requires_resource(resource):
    """Decorator: skip the decorated test unless *resource* is enabled."""
    # Fixed: the predicate defined by this module is is_resource_enabled();
    # the original called the undefined name resource_is_enabled, raising
    # NameError whenever the decorator ran.
    if is_resource_enabled(resource):
        # Resource available: leave the test unchanged.
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)
def impl_detail(msg=None, **guards):
    """Decorator: skip the test unless check_impl_detail(**guards) holds,
    building a descriptive skip message when none is supplied."""
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)
861 def _parse_guards(guards
):
862 # Returns a tuple ({platform_name: run_me}, default_value)
864 return ({'cpython': True}, False)
865 is_true
= guards
.values()[0]
866 assert guards
.values() == [is_true
] * len(guards
) # all True or all False
867 return (guards
, not is_true
)
# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(platform.python_implementation().lower(), default)
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if not result.wasSuccessful():
        # Single failure/error: report its traceback; otherwise summarize.
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose:
                err += "; run in verbose mode for details"
        raise TestFailed(err)
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            # A string names an already-imported module to scan for tests.
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)
920 #=======================================================================
923 def run_doctest(module
, verbosity
=None):
924 """Run doctest on the given module. Return (#failures, #tests).
926 If optional argument verbosity is not specified (or is None), pass
927 test_support's belief about verbosity on to doctest. Else doctest's
928 usual behavior is used (it searches sys.argv for -v).
933 if verbosity
is None:
938 # Direct doctest output (normally just errors) to real stdout; doctest
939 # output shouldn't be compared by regrtest.
940 save_stdout
= sys
.stdout
941 sys
.stdout
= get_original_stdout()
943 f
, t
= doctest
.testmod(module
, verbose
=verbosity
)
945 raise TestFailed("%d of %d doctests failed" % (f
, t
))
947 sys
.stdout
= save_stdout
949 print 'doctest (%s) ... %d tests with zero failures' % (module
.__name
__, t
)
952 #=======================================================================
953 # Threading support to prevent reporting refleaks when running regrtest.py -R
def threading_setup():
    """Snapshot thread bookkeeping: (#active threads, #threads in limbo)."""
    import threading
    return len(threading._active), len(threading._limbo)
def threading_cleanup(num_active, num_limbo):
    """Wait (briefly) for thread counts to return to the recorded baseline,
    so stray test threads are not reported as reference leaks."""
    import threading
    import time

    _MAX_COUNT = 10
    count = 0
    while len(threading._active) != num_active and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

    count = 0
    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)
def reap_threads(func):
    """Decorator: record thread counts before the test and wait for them to
    settle back afterwards, even on failure."""
    @functools.wraps(func)
    def decorator(*args):
        key = threading_setup()
        try:
            return func(*args)
        finally:
            threading_cleanup(*key)
    return decorator
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """
    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except:
                break
def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        return b.tobytes()
    except AttributeError:
        try:
            # iterable of ints?
            return b"".join(chr(x) for x in b)
        except TypeError:
            return bytes(b)