1 # Very rudimentary test of threading module
3 import test
.test_support
4 from test
.test_support
import verbose
8 thread
= test
.test_support
.import_module('thread')
9 threading
= test
.test_support
.import_module('threading')
14 from test
import lock_tests
16 # A trivial mutable counter.
17 class Counter(object):
27 class TestThread(threading
.Thread
):
28 def __init__(self
, name
, testcase
, sema
, mutex
, nrunning
):
29 threading
.Thread
.__init
__(self
, name
=name
)
30 self
.testcase
= testcase
33 self
.nrunning
= nrunning
36 delay
= random
.random() / 10000.0
38 print 'task %s will run for %.1f usec' % (
39 self
.name
, delay
* 1e6
)
45 print self
.nrunning
.get(), 'tasks are running'
46 self
.testcase
.assertTrue(self
.nrunning
.get() <= 3)
50 print 'task', self
.name
, 'done'
54 self
.testcase
.assertTrue(self
.nrunning
.get() >= 0)
56 print '%s is finished. %d tasks are running' % (
57 self
.name
, self
.nrunning
.get())
59 class BaseTestCase(unittest
.TestCase
):
61 self
._threads
= test
.test_support
.threading_setup()
64 test
.test_support
.threading_cleanup(*self
._threads
)
65 test
.test_support
.reap_children()
68 class ThreadTests(BaseTestCase
):
70 # Create a bunch of threads, let each do some work, wait until all are
72 def test_various_ops(self
):
73 # This takes about n/3 seconds to run (about n/3 clumps of tasks,
74 # times about 1 second per clump).
77 # no more than 3 of the 10 can run at once
78 sema
= threading
.BoundedSemaphore(value
=3)
79 mutex
= threading
.RLock()
80 numrunning
= Counter()
84 for i
in range(NUMTASKS
):
85 t
= TestThread("<thread %d>"%i, self
, sema
, mutex
, numrunning
)
87 self
.assertEqual(t
.ident
, None)
88 self
.assertTrue(re
.match('<TestThread\(.*, initial\)>', repr(t
)))
92 print 'waiting for all tasks to complete'
95 self
.assertTrue(not t
.is_alive())
96 self
.assertNotEqual(t
.ident
, 0)
97 self
.assertFalse(t
.ident
is None)
98 self
.assertTrue(re
.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t
)))
100 print 'all tasks done'
101 self
.assertEqual(numrunning
.get(), 0)
103 def test_ident_of_no_threading_threads(self
):
104 # The ident still must work for the main thread and dummy threads.
105 self
.assertFalse(threading
.currentThread().ident
is None)
107 ident
.append(threading
.currentThread().ident
)
109 done
= threading
.Event()
111 thread
.start_new_thread(f
, ())
113 self
.assertFalse(ident
[0] is None)
114 # Kill the "immortal" _DummyThread
115 del threading
._active
[ident
[0]]
117 # run with a small(ish) thread stack size (256kB)
118 def test_various_ops_small_stack(self
):
120 print 'with 256kB thread stack size...'
122 threading
.stack_size(262144)
125 print 'platform does not support changing thread stack size'
127 self
.test_various_ops()
128 threading
.stack_size(0)
130 # run with a large thread stack size (1MB)
131 def test_various_ops_large_stack(self
):
133 print 'with 1MB thread stack size...'
135 threading
.stack_size(0x100000)
138 print 'platform does not support changing thread stack size'
140 self
.test_various_ops()
141 threading
.stack_size(0)
143 def test_foreign_thread(self
):
144 # Check that a "foreign" thread can use the threading module.
146 # Calling current_thread() forces an entry for the foreign
147 # thread to get made in the threading._active map.
148 threading
.current_thread()
151 mutex
= threading
.Lock()
153 tid
= thread
.start_new_thread(f
, (mutex
,))
154 # Wait for the thread to finish.
156 self
.assertIn(tid
, threading
._active
)
157 self
.assertIsInstance(threading
._active
[tid
], threading
._DummyThread
)
158 del threading
._active
[tid
]
160 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
161 # exposed at the Python level. This test relies on ctypes to get at it.
162 def test_PyThreadState_SetAsyncExc(self
):
167 print "test_PyThreadState_SetAsyncExc can't import ctypes"
168 return # can't do anything
170 set_async_exc
= ctypes
.pythonapi
.PyThreadState_SetAsyncExc
172 class AsyncExc(Exception):
175 exception
= ctypes
.py_object(AsyncExc
)
177 # First check it works when setting the exception from the same thread.
178 tid
= thread
.get_ident()
181 result
= set_async_exc(ctypes
.c_long(tid
), exception
)
182 # The exception is async, so we might have to keep the VM busy until
189 # This code is unreachable but it reflects the intent. If we wanted
190 # to be smarter the above loop wouldn't be infinite.
191 self
.fail("AsyncExc not raised")
193 self
.assertEqual(result
, 1) # one thread state modified
194 except UnboundLocalError:
195 # The exception was raised too quickly for us to get the result.
198 # `worker_started` is set by the thread when it's inside a try/except
199 # block waiting to catch the asynchronously set AsyncExc exception.
200 # `worker_saw_exception` is set by the thread upon catching that
202 worker_started
= threading
.Event()
203 worker_saw_exception
= threading
.Event()
205 class Worker(threading
.Thread
):
207 self
.id = thread
.get_ident()
208 self
.finished
= False
216 worker_saw_exception
.set()
219 t
.daemon
= True # so if this fails, we don't hang Python at shutdown
222 print " started worker thread"
224 # Try a thread id that doesn't make sense.
226 print " trying nonsensical thread id"
227 result
= set_async_exc(ctypes
.c_long(-1), exception
)
228 self
.assertEqual(result
, 0) # no thread states modified
230 # Now raise an exception in the worker thread.
232 print " waiting for worker thread to get started"
233 ret
= worker_started
.wait()
236 print " verifying worker hasn't exited"
237 self
.assertTrue(not t
.finished
)
239 print " attempting to raise asynch exception in worker"
240 result
= set_async_exc(ctypes
.c_long(t
.id), exception
)
241 self
.assertEqual(result
, 1) # one thread state modified
243 print " waiting for worker to say it caught the exception"
244 worker_saw_exception
.wait(timeout
=10)
245 self
.assertTrue(t
.finished
)
247 print " all OK -- joining worker"
250 # else the thread is still running, and we have no way to kill it
252 def test_limbo_cleanup(self
):
253 # Issue 7481: Failure to start a thread should clean up the limbo map.
254 def fail_new_thread(*args
):
256 _start_new_thread
= threading
._start
_new
_thread
257 threading
._start
_new
_thread
= fail_new_thread
259 t
= threading
.Thread(target
=lambda: None)
260 self
.assertRaises(thread
.error
, t
.start
)
262 t
in threading
._limbo
,
263 "Failed to cleanup _limbo map on failure of Thread.start().")
265 threading
._start
_new
_thread
= _start_new_thread
267 def test_finalize_runnning_thread(self
):
268 # Issue 1402: the PyGILState_Ensure / _Release functions may be called
269 # very late on python exit: on deallocation of a running thread for
275 print("test_finalize_with_runnning_thread can't import ctypes")
276 return # can't do anything
279 rc
= subprocess
.call([sys
.executable
, "-c", """if 1:
280 import ctypes, sys, time, thread
282 # This lock is used as a simple event variable.
283 ready = thread.allocate_lock()
286 # Module globals are cleared before __del__ is run
287 # So we save the functions in class dict
289 ensure = ctypes.pythonapi.PyGILState_Ensure
290 release = ctypes.pythonapi.PyGILState_Release
292 state = self.ensure()
300 thread.start_new_thread(waitingThread, ())
301 ready.acquire() # Be sure the other thread is waiting.
304 self
.assertEqual(rc
, 42)
306 def test_finalize_with_trace(self
):
308 # Avoid a deadlock when sys.settrace steps into threading._shutdown
310 rc
= subprocess
.call([sys
.executable
, "-c", """if 1:
311 import sys, threading
313 # A deadlock-killer, to prevent the
314 # testsuite to hang forever
318 print 'program blocked; aborting'
320 t = threading.Thread(target=killer)
324 # This is the trace function
325 def func(frame, event, arg):
326 threading.current_thread()
331 self
.assertFalse(rc
== 2, "interpreted was blocked")
332 self
.assertTrue(rc
== 0, "Unexpected error")
334 def test_join_nondaemon_on_shutdown(self
):
336 # Raising SystemExit skipped threading._shutdown
338 p
= subprocess
.Popen([sys
.executable
, "-c", """if 1:
340 from time import sleep
344 # As a non-daemon thread we SHOULD wake up and nothing
345 # should be torn down yet
346 print "Woke up, sleep function is:", sleep
348 threading.Thread(target=child).start()
351 stdout
=subprocess
.PIPE
,
352 stderr
=subprocess
.PIPE
)
353 stdout
, stderr
= p
.communicate()
354 self
.assertEqual(stdout
.strip(),
355 "Woke up, sleep function is: <built-in function sleep>")
356 stderr
= re
.sub(r
"^\[\d+ refs\]", "", stderr
, re
.MULTILINE
).strip()
357 self
.assertEqual(stderr
, "")
359 def test_enumerate_after_join(self
):
360 # Try hard to trigger #1703448: a thread is still returned in
361 # threading.enumerate() after it has been join()ed.
362 enum
= threading
.enumerate
363 old_interval
= sys
.getcheckinterval()
365 for i
in xrange(1, 100):
366 # Try a couple of times at each thread-switching interval
367 # to get more interleavings.
368 sys
.setcheckinterval(i
// 5)
369 t
= threading
.Thread(target
=lambda: None)
373 self
.assertNotIn(t
, l
,
374 "#1703448 triggered after %d trials: %s" % (i
, l
))
376 sys
.setcheckinterval(old_interval
)
378 def test_no_refcycle_through_target(self
):
379 class RunSelfFunction(object):
380 def __init__(self
, should_raise
):
381 # The links in this refcycle from Thread back to self
382 # should be cleaned up when the thread completes.
383 self
.should_raise
= should_raise
384 self
.thread
= threading
.Thread(target
=self
._run
,
386 kwargs
={'yet_another':self
})
389 def _run(self
, other_ref
, yet_another
):
390 if self
.should_raise
:
393 cyclic_object
= RunSelfFunction(should_raise
=False)
394 weak_cyclic_object
= weakref
.ref(cyclic_object
)
395 cyclic_object
.thread
.join()
397 self
.assertEquals(None, weak_cyclic_object(),
398 msg
=('%d references still around' %
399 sys
.getrefcount(weak_cyclic_object())))
401 raising_cyclic_object
= RunSelfFunction(should_raise
=True)
402 weak_raising_cyclic_object
= weakref
.ref(raising_cyclic_object
)
403 raising_cyclic_object
.thread
.join()
404 del raising_cyclic_object
405 self
.assertEquals(None, weak_raising_cyclic_object(),
406 msg
=('%d references still around' %
407 sys
.getrefcount(weak_raising_cyclic_object())))
410 class ThreadJoinOnShutdown(BaseTestCase
):
412 def _run_and_join(self
, script
):
414 import sys, os, time, threading
416 # a thread, which waits for the main program to terminate
417 def joiningfunc(mainthread):
419 print 'end of thread'
423 p
= subprocess
.Popen([sys
.executable
, "-c", script
], stdout
=subprocess
.PIPE
)
425 data
= p
.stdout
.read().replace('\r', '')
426 self
.assertEqual(data
, "end of main\nend of thread\n")
427 self
.assertFalse(rc
== 2, "interpreter was blocked")
428 self
.assertTrue(rc
== 0, "Unexpected error")
430 def test_1_join_on_shutdown(self
):
431 # The usual case: on exit, wait for a non-daemon thread
434 t = threading.Thread(target=joiningfunc,
435 args=(threading.current_thread(),))
440 self
._run
_and
_join
(script
)
443 def test_2_join_in_forked_process(self
):
444 # Like the test above, but from a forked interpreter
446 if not hasattr(os
, 'fork'):
451 os.waitpid(childpid, 0)
454 t = threading.Thread(target=joiningfunc,
455 args=(threading.current_thread(),))
459 self
._run
_and
_join
(script
)
461 def test_3_join_in_forked_from_thread(self
):
462 # Like the test above, but fork() was called from a worker thread
463 # In the forked process, the main Thread object must be marked as stopped.
465 if not hasattr(os
, 'fork'):
467 # Skip platforms with known problems forking from a worker thread.
468 # See http://bugs.python.org/issue3863.
469 if sys
.platform
in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
470 print >>sys
.stderr
, ('Skipping test_3_join_in_forked_from_thread'
471 ' due to known OS bugs on'), sys
.platform
474 main_thread = threading.current_thread()
478 os.waitpid(childpid, 0)
481 t = threading.Thread(target=joiningfunc,
485 t.join() # Should not block: main_thread is already stopped
487 w = threading.Thread(target=worker)
490 self
._run
_and
_join
(script
)
493 class ThreadingExceptionTests(BaseTestCase
):
494 # A RuntimeError should be raised if Thread.start() is called
496 def test_start_thread_again(self
):
497 thread
= threading
.Thread()
499 self
.assertRaises(RuntimeError, thread
.start
)
501 def test_joining_current_thread(self
):
502 current_thread
= threading
.current_thread()
503 self
.assertRaises(RuntimeError, current_thread
.join
);
505 def test_joining_inactive_thread(self
):
506 thread
= threading
.Thread()
507 self
.assertRaises(RuntimeError, thread
.join
)
509 def test_daemonize_active_thread(self
):
510 thread
= threading
.Thread()
512 self
.assertRaises(RuntimeError, setattr, thread
, "daemon", True)
class LockTests(lock_tests.LockTests):
    # Reuses the test cases inherited from lock_tests.LockTests, with
    # threading.Lock supplied as the factory under test.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    # NOTE(review): presumably the inherited tests call self.locktype() to
    # build the object under test -- confirm in lock_tests.
    locktype = staticmethod(threading.Lock)
class RLockTests(lock_tests.RLockTests):
    # Reuses the test cases inherited from lock_tests.RLockTests, with
    # threading.RLock supplied as the factory under test.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    locktype = staticmethod(threading.RLock)
class EventTests(lock_tests.EventTests):
    # Reuses the test cases inherited from lock_tests.EventTests, with
    # threading.Event supplied as the factory under test.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    eventtype = staticmethod(threading.Event)
class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API, so the
    # RLock test cases are run again with threading.Condition as the
    # "lock" factory.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    locktype = staticmethod(threading.Condition)
class ConditionTests(lock_tests.ConditionTests):
    # Reuses the test cases inherited from lock_tests.ConditionTests, with
    # threading.Condition supplied as the factory under test.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    condtype = staticmethod(threading.Condition)
class SemaphoreTests(lock_tests.SemaphoreTests):
    # Reuses the test cases inherited from lock_tests.SemaphoreTests, with
    # threading.Semaphore supplied as the factory under test.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    semtype = staticmethod(threading.Semaphore)
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Reuses the test cases inherited from lock_tests.BoundedSemaphoreTests,
    # with threading.BoundedSemaphore supplied as the factory under test.
    # staticmethod() keeps the callable from being bound as a method when
    # it is looked up through an instance.
    semtype = staticmethod(threading.BoundedSemaphore)
539 test
.test_support
.run_unittest(LockTests
, RLockTests
, EventTests
,
540 ConditionAsRLockTests
, ConditionTests
,
541 SemaphoreTests
, BoundedSemaphoreTests
,
543 ThreadJoinOnShutdown
,
544 ThreadingExceptionTests
,
547 if __name__
== "__main__":