Exceptions raised during renaming in rotating file handlers are now passed to handleE...
[python.git] / Lib / test / test_dummy_threading.py
# Very rudimentary test of the dummy_threading module

# Create a bunch of threads, let each do some work, wait until all are done
5 from test.test_support import verbose
6 import random
7 import dummy_threading as _threading
8 import time
11 class TestThread(_threading.Thread):
13 def run(self):
14 global running
15 # Uncomment if testing another module, such as the real 'threading'
16 # module.
17 #delay = random.random() * 2
18 delay = 0
19 if verbose:
20 print 'task', self.getName(), 'will run for', delay, 'sec'
21 sema.acquire()
22 mutex.acquire()
23 running = running + 1
24 if verbose:
25 print running, 'tasks are running'
26 mutex.release()
27 time.sleep(delay)
28 if verbose:
29 print 'task', self.getName(), 'done'
30 mutex.acquire()
31 running = running - 1
32 if verbose:
33 print self.getName(), 'is finished.', running, 'tasks are running'
34 mutex.release()
35 sema.release()
def starttasks():
    """Create, record and start ``numtasks`` TestThread workers in order.

    Each worker is appended to the global ``threads`` list before it is
    started.
    """
    for index in range(numtasks):
        label = "<thread %d>" % index
        worker = TestThread(name=label)
        threads.append(worker)
        worker.start()
44 def test_main():
45 # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
46 # about 1 second per clump).
47 global numtasks
48 numtasks = 10
50 # no more than 3 of the 10 can run at once
51 global sema
52 sema = _threading.BoundedSemaphore(value=3)
53 global mutex
54 mutex = _threading.RLock()
55 global running
56 running = 0
58 global threads
59 threads = []
61 starttasks()
63 if verbose:
64 print 'waiting for all tasks to complete'
65 for t in threads:
66 t.join()
67 if verbose:
68 print 'all tasks done'
# Allow the test to be run directly as a script.
if __name__ == '__main__':
    test_main()