Exceptions raised during renaming in rotating file handlers are now passed to handleE...
[python.git] / Lib / test / test_threading.py
blob7eb9758ecf261fa5deb5a4ca51566115c3687563
1 # Very rudimentary test of threading module
3 import test.test_support
4 from test.test_support import verbose
5 import random
6 import threading
7 import thread
8 import time
9 import unittest
11 # A trivial mutable counter.
12 class Counter(object):
13 def __init__(self):
14 self.value = 0
15 def inc(self):
16 self.value += 1
17 def dec(self):
18 self.value -= 1
19 def get(self):
20 return self.value
22 class TestThread(threading.Thread):
23 def __init__(self, name, testcase, sema, mutex, nrunning):
24 threading.Thread.__init__(self, name=name)
25 self.testcase = testcase
26 self.sema = sema
27 self.mutex = mutex
28 self.nrunning = nrunning
30 def run(self):
31 delay = random.random() * 2
32 if verbose:
33 print 'task', self.getName(), 'will run for', delay, 'sec'
35 self.sema.acquire()
37 self.mutex.acquire()
38 self.nrunning.inc()
39 if verbose:
40 print self.nrunning.get(), 'tasks are running'
41 self.testcase.assert_(self.nrunning.get() <= 3)
42 self.mutex.release()
44 time.sleep(delay)
45 if verbose:
46 print 'task', self.getName(), 'done'
48 self.mutex.acquire()
49 self.nrunning.dec()
50 self.testcase.assert_(self.nrunning.get() >= 0)
51 if verbose:
52 print self.getName(), 'is finished.', self.nrunning.get(), \
53 'tasks are running'
54 self.mutex.release()
56 self.sema.release()
58 class ThreadTests(unittest.TestCase):
60 # Create a bunch of threads, let each do some work, wait until all are
61 # done.
62 def test_various_ops(self):
63 # This takes about n/3 seconds to run (about n/3 clumps of tasks,
64 # times about 1 second per clump).
65 NUMTASKS = 10
67 # no more than 3 of the 10 can run at once
68 sema = threading.BoundedSemaphore(value=3)
69 mutex = threading.RLock()
70 numrunning = Counter()
72 threads = []
74 for i in range(NUMTASKS):
75 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
76 threads.append(t)
77 t.start()
79 if verbose:
80 print 'waiting for all tasks to complete'
81 for t in threads:
82 t.join(NUMTASKS)
83 self.assert_(not t.isAlive())
84 if verbose:
85 print 'all tasks done'
86 self.assertEqual(numrunning.get(), 0)
88 def test_foreign_thread(self):
89 # Check that a "foreign" thread can use the threading module.
90 def f(mutex):
91 # Acquiring an RLock forces an entry for the foreign
92 # thread to get made in the threading._active map.
93 r = threading.RLock()
94 r.acquire()
95 r.release()
96 mutex.release()
98 mutex = threading.Lock()
99 mutex.acquire()
100 tid = thread.start_new_thread(f, (mutex,))
101 # Wait for the thread to finish.
102 mutex.acquire()
103 self.assert_(tid in threading._active)
104 self.assert_(isinstance(threading._active[tid],
105 threading._DummyThread))
106 del threading._active[tid]
108 def test_main():
109 test.test_support.run_unittest(ThreadTests)
111 if __name__ == "__main__":
112 test_main()