1 # Very rudimentary test of threading module
3 import test
.test_support
4 from test
.test_support
import verbose
11 # A trivial mutable counter.
12 class Counter(object):
22 class TestThread(threading
.Thread
):
23 def __init__(self
, name
, testcase
, sema
, mutex
, nrunning
):
24 threading
.Thread
.__init
__(self
, name
=name
)
25 self
.testcase
= testcase
28 self
.nrunning
= nrunning
31 delay
= random
.random() * 2
33 print 'task', self
.getName(), 'will run for', delay
, 'sec'
40 print self
.nrunning
.get(), 'tasks are running'
41 self
.testcase
.assert_(self
.nrunning
.get() <= 3)
46 print 'task', self
.getName(), 'done'
50 self
.testcase
.assert_(self
.nrunning
.get() >= 0)
52 print self
.getName(), 'is finished.', self
.nrunning
.get(), \
58 class ThreadTests(unittest
.TestCase
):
60 # Create a bunch of threads, let each do some work, wait until all are
62 def test_various_ops(self
):
63 # This takes about n/3 seconds to run (about n/3 clumps of tasks,
64 # times about 1 second per clump).
67 # no more than 3 of the 10 can run at once
68 sema
= threading
.BoundedSemaphore(value
=3)
69 mutex
= threading
.RLock()
70 numrunning
= Counter()
74 for i
in range(NUMTASKS
):
75 t
= TestThread("<thread %d>"%i, self
, sema
, mutex
, numrunning
)
80 print 'waiting for all tasks to complete'
83 self
.assert_(not t
.isAlive())
85 print 'all tasks done'
86 self
.assertEqual(numrunning
.get(), 0)
88 def test_foreign_thread(self
):
89 # Check that a "foreign" thread can use the threading module.
91 # Acquiring an RLock forces an entry for the foreign
92 # thread to get made in the threading._active map.
98 mutex
= threading
.Lock()
100 tid
= thread
.start_new_thread(f
, (mutex
,))
101 # Wait for the thread to finish.
103 self
.assert_(tid
in threading
._active
)
104 self
.assert_(isinstance(threading
._active
[tid
],
105 threading
._DummyThread
))
106 del threading
._active
[tid
]
109 test
.test_support
.run_unittest(ThreadTests
)
111 if __name__
== "__main__":