4 from test
import test_support
9 from test
import lock_tests
15 _print_mutex
= thread
.allocate_lock()
17 def verbose_print(arg
):
18 """Helper function for printing out debugging output."""
19 if test_support
.verbose
:
24 class BasicThreadTest(unittest
.TestCase
):
27 self
.done_mutex
= thread
.allocate_lock()
28 self
.done_mutex
.acquire()
29 self
.running_mutex
= thread
.allocate_lock()
30 self
.random_mutex
= thread
.allocate_lock()
36 class ThreadRunningTests(BasicThreadTest
):
39 with self
.running_mutex
:
41 verbose_print("creating task %s" % self
.next_ident
)
42 thread
.start_new_thread(self
.task
, (self
.next_ident
,))
46 def task(self
, ident
):
47 with self
.random_mutex
:
48 delay
= random
.random() / 10000.0
49 verbose_print("task %s will run for %sus" % (ident
, round(delay
*1e6
)))
51 verbose_print("task %s done" % ident
)
52 with self
.running_mutex
:
54 if self
.created
== NUMTASKS
and self
.running
== 0:
55 self
.done_mutex
.release()
57 def test_starting_threads(self
):
58 # Basic test for thread creation.
59 for i
in range(NUMTASKS
):
61 verbose_print("waiting for tasks to complete...")
62 self
.done_mutex
.acquire()
63 verbose_print("all tasks done")
65 def test_stack_size(self
):
66 # Various stack size tests.
67 self
.assertEquals(thread
.stack_size(), 0, "intial stack size is not 0")
70 self
.assertEquals(thread
.stack_size(), 0, "stack_size not reset to default")
72 if os
.name
not in ("nt", "os2", "posix"):
77 thread
.stack_size(4096)
79 verbose_print("caught expected ValueError setting "
83 verbose_print("platform does not support changing thread stack "
87 fail_msg
= "stack_size(%d) failed - should succeed"
88 for tss
in (262144, 0x100000, 0):
89 thread
.stack_size(tss
)
90 self
.assertEquals(thread
.stack_size(), tss
, fail_msg
% tss
)
91 verbose_print("successfully set stack_size(%d)" % tss
)
93 for tss
in (262144, 0x100000):
94 verbose_print("trying stack_size = (%d)" % tss
)
97 for i
in range(NUMTASKS
):
100 verbose_print("waiting for all tasks to complete")
101 self
.done_mutex
.acquire()
102 verbose_print("all tasks done")
106 def test__count(self
):
107 # Test the _count() function.
108 orig
= thread
._count
()
109 mut
= thread
.allocate_lock()
116 thread
.start_new_thread(task
, ())
119 self
.assertEquals(thread
._count
(), orig
+ 1)
120 # Allow the task to finish.
122 # The only reliable way to be sure that the thread ended from the
123 # interpreter's point of view is to wait for the function object to be
126 wr
= weakref
.ref(task
, lambda _
: done
.append(None))
130 self
.assertEquals(thread
._count
(), orig
)
134 def __init__(self
, num_threads
):
135 self
.num_threads
= num_threads
137 self
.checkin_mutex
= thread
.allocate_lock()
138 self
.checkout_mutex
= thread
.allocate_lock()
139 self
.checkout_mutex
.acquire()
142 self
.checkin_mutex
.acquire()
143 self
.waiting
= self
.waiting
+ 1
144 if self
.waiting
== self
.num_threads
:
145 self
.waiting
= self
.num_threads
- 1
146 self
.checkout_mutex
.release()
148 self
.checkin_mutex
.release()
150 self
.checkout_mutex
.acquire()
151 self
.waiting
= self
.waiting
- 1
152 if self
.waiting
== 0:
153 self
.checkin_mutex
.release()
155 self
.checkout_mutex
.release()
158 class BarrierTest(BasicThreadTest
):
160 def test_barrier(self
):
161 self
.bar
= Barrier(NUMTASKS
)
162 self
.running
= NUMTASKS
163 for i
in range(NUMTASKS
):
164 thread
.start_new_thread(self
.task2
, (i
,))
165 verbose_print("waiting for tasks to end")
166 self
.done_mutex
.acquire()
167 verbose_print("tasks done")
169 def task2(self
, ident
):
170 for i
in range(NUMTRIPS
):
172 # give it a good chance to enter the next
173 # barrier before the others are all out
177 with self
.random_mutex
:
178 delay
= random
.random() / 10000.0
179 verbose_print("task %s will run for %sus" %
180 (ident
, round(delay
* 1e6
)))
182 verbose_print("task %s entering %s" % (ident
, i
))
184 verbose_print("task %s leaving barrier" % ident
)
185 with self
.running_mutex
:
187 # Must release mutex before releasing done, else the main thread can
188 # exit and set mutex to None as part of global teardown; then
189 # mutex.release() raises AttributeError.
190 finished
= self
.running
== 0
192 self
.done_mutex
.release()
195 class LockTests(lock_tests
.LockTests
):
196 locktype
= thread
.allocate_lock
200 test_support
.run_unittest(ThreadRunningTests
, BarrierTest
, LockTests
)
202 if __name__
== "__main__":