4 from test
import test_support
5 thread
= test_support
.import_module('thread')
10 from test
import lock_tests
16 _print_mutex
= thread
.allocate_lock()
18 def verbose_print(arg
):
19 """Helper function for printing out debugging output."""
20 if test_support
.verbose
:
25 class BasicThreadTest(unittest
.TestCase
):
28 self
.done_mutex
= thread
.allocate_lock()
29 self
.done_mutex
.acquire()
30 self
.running_mutex
= thread
.allocate_lock()
31 self
.random_mutex
= thread
.allocate_lock()
37 class ThreadRunningTests(BasicThreadTest
):
40 with self
.running_mutex
:
42 verbose_print("creating task %s" % self
.next_ident
)
43 thread
.start_new_thread(self
.task
, (self
.next_ident
,))
47 def task(self
, ident
):
48 with self
.random_mutex
:
49 delay
= random
.random() / 10000.0
50 verbose_print("task %s will run for %sus" % (ident
, round(delay
*1e6
)))
52 verbose_print("task %s done" % ident
)
53 with self
.running_mutex
:
55 if self
.created
== NUMTASKS
and self
.running
== 0:
56 self
.done_mutex
.release()
58 def test_starting_threads(self
):
59 # Basic test for thread creation.
60 for i
in range(NUMTASKS
):
62 verbose_print("waiting for tasks to complete...")
63 self
.done_mutex
.acquire()
64 verbose_print("all tasks done")
66 def test_stack_size(self
):
67 # Various stack size tests.
68 self
.assertEqual(thread
.stack_size(), 0, "initial stack size is not 0")
71 self
.assertEqual(thread
.stack_size(), 0, "stack_size not reset to default")
73 if os
.name
not in ("nt", "os2", "posix"):
78 thread
.stack_size(4096)
80 verbose_print("caught expected ValueError setting "
84 verbose_print("platform does not support changing thread stack "
88 fail_msg
= "stack_size(%d) failed - should succeed"
89 for tss
in (262144, 0x100000, 0):
90 thread
.stack_size(tss
)
91 self
.assertEqual(thread
.stack_size(), tss
, fail_msg
% tss
)
92 verbose_print("successfully set stack_size(%d)" % tss
)
94 for tss
in (262144, 0x100000):
95 verbose_print("trying stack_size = (%d)" % tss
)
98 for i
in range(NUMTASKS
):
101 verbose_print("waiting for all tasks to complete")
102 self
.done_mutex
.acquire()
103 verbose_print("all tasks done")
107 def test__count(self
):
108 # Test the _count() function.
109 orig
= thread
._count
()
110 mut
= thread
.allocate_lock()
117 thread
.start_new_thread(task
, ())
120 self
.assertEqual(thread
._count
(), orig
+ 1)
121 # Allow the task to finish.
123 # The only reliable way to be sure that the thread ended from the
124 # interpreter's point of view is to wait for the function object to be
127 wr
= weakref
.ref(task
, lambda _
: done
.append(None))
131 self
.assertEqual(thread
._count
(), orig
)
135 def __init__(self
, num_threads
):
136 self
.num_threads
= num_threads
138 self
.checkin_mutex
= thread
.allocate_lock()
139 self
.checkout_mutex
= thread
.allocate_lock()
140 self
.checkout_mutex
.acquire()
143 self
.checkin_mutex
.acquire()
144 self
.waiting
= self
.waiting
+ 1
145 if self
.waiting
== self
.num_threads
:
146 self
.waiting
= self
.num_threads
- 1
147 self
.checkout_mutex
.release()
149 self
.checkin_mutex
.release()
151 self
.checkout_mutex
.acquire()
152 self
.waiting
= self
.waiting
- 1
153 if self
.waiting
== 0:
154 self
.checkin_mutex
.release()
156 self
.checkout_mutex
.release()
159 class BarrierTest(BasicThreadTest
):
161 def test_barrier(self
):
162 self
.bar
= Barrier(NUMTASKS
)
163 self
.running
= NUMTASKS
164 for i
in range(NUMTASKS
):
165 thread
.start_new_thread(self
.task2
, (i
,))
166 verbose_print("waiting for tasks to end")
167 self
.done_mutex
.acquire()
168 verbose_print("tasks done")
170 def task2(self
, ident
):
171 for i
in range(NUMTRIPS
):
173 # give it a good chance to enter the next
174 # barrier before the others are all out
178 with self
.random_mutex
:
179 delay
= random
.random() / 10000.0
180 verbose_print("task %s will run for %sus" %
181 (ident
, round(delay
* 1e6
)))
183 verbose_print("task %s entering %s" % (ident
, i
))
185 verbose_print("task %s leaving barrier" % ident
)
186 with self
.running_mutex
:
188 # Must release mutex before releasing done, else the main thread can
189 # exit and set mutex to None as part of global teardown; then
190 # mutex.release() raises AttributeError.
191 finished
= self
.running
== 0
193 self
.done_mutex
.release()
196 class LockTests(lock_tests
.LockTests
):
197 locktype
= thread
.allocate_lock
200 class TestForkInThread(unittest
.TestCase
):
202 self
.read_fd
, self
.write_fd
= os
.pipe()
204 @unittest.skipIf(sys
.platform
.startswith('win'),
205 "This test is only appropriate for POSIX-like systems.")
206 @test_support.reap_threads
207 def test_forkinthread(self
):
210 pid
= os
.fork() # fork in a thread
212 sys
.exit(0) # exit the child
215 os
.close(self
.read_fd
)
216 os
.write(self
.write_fd
, "OK")
219 os
.close(self
.write_fd
)
221 thread
.start_new_thread(thread1
, ())
222 self
.assertEqual(os
.read(self
.read_fd
, 2), "OK",
223 "Unable to fork() in thread")
227 os
.close(self
.read_fd
)
232 os
.close(self
.write_fd
)
238 test_support
.run_unittest(ThreadRunningTests
, BarrierTest
, LockTests
,
241 if __name__
== "__main__":