2 Various tests for synchronization primitives.
7 from thread
import start_new_thread
, get_ident
11 from test
import test_support
as support
15 # A crude wait/yield function not relying on synchronization primitives.
22 def __init__(self
, f
, n
, wait_before_exit
=False):
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
32 self
._can
_exit
= not wait_before_exit
35 self
.started
.append(tid
)
39 self
.finished
.append(tid
)
40 while not self
._can
_exit
:
43 start_new_thread(task
, ())
45 def wait_for_started(self
):
46 while len(self
.started
) < self
.n
:
49 def wait_for_finished(self
):
50 while len(self
.finished
) < self
.n
:
57 class BaseTestCase(unittest
.TestCase
):
59 self
._threads
= support
.threading_setup()
62 support
.threading_cleanup(*self
._threads
)
63 support
.reap_children()
66 class BaseLockTests(BaseTestCase
):
68 Tests for both recursive and non-recursive locks.
71 def test_constructor(self
):
72 lock
= self
.locktype()
75 def test_acquire_destroy(self
):
76 lock
= self
.locktype()
80 def test_acquire_release(self
):
81 lock
= self
.locktype()
86 def test_try_acquire(self
):
87 lock
= self
.locktype()
88 self
.assertTrue(lock
.acquire(False))
91 def test_try_acquire_contended(self
):
92 lock
= self
.locktype()
96 result
.append(lock
.acquire(False))
97 Bunch(f
, 1).wait_for_finished()
98 self
.assertFalse(result
[0])
101 def test_acquire_contended(self
):
102 lock
= self
.locktype()
112 self
.assertEqual(len(b
.finished
), 0)
114 b
.wait_for_finished()
115 self
.assertEqual(len(b
.finished
), N
)
118 lock
= self
.locktype()
127 # Check the lock is unacquired
128 Bunch(f
, 1).wait_for_finished()
129 self
.assertRaises(TypeError, _with
, TypeError)
130 # Check the lock is unacquired
131 Bunch(f
, 1).wait_for_finished()
133 def test_thread_leak(self
):
134 # The lock shouldn't leak a Thread instance when used from a foreign
135 # (non-threading) thread.
136 lock
= self
.locktype()
140 n
= len(threading
.enumerate())
141 # We run many threads in the hope that existing thread ids won't
143 Bunch(f
, 15).wait_for_finished()
144 self
.assertEqual(n
, len(threading
.enumerate()))
147 class LockTests(BaseLockTests
):
149 Tests for non-recursive, weak locks
150 (which can be acquired and released from different threads).
152 def test_reacquire(self
):
153 # Lock needs to be released before re-acquiring.
154 lock
= self
.locktype()
161 start_new_thread(f
, ())
162 while len(phase
) == 0:
165 self
.assertEqual(len(phase
), 1)
167 while len(phase
) == 1:
169 self
.assertEqual(len(phase
), 2)
171 def test_different_thread(self
):
172 # Lock can be released from a different thread.
173 lock
= self
.locktype()
178 b
.wait_for_finished()
183 class RLockTests(BaseLockTests
):
185 Tests for recursive locks.
187 def test_reacquire(self
):
188 lock
= self
.locktype()
196 def test_release_unacquired(self
):
197 # Cannot release an unacquired lock
198 lock
= self
.locktype()
199 self
.assertRaises(RuntimeError, lock
.release
)
206 self
.assertRaises(RuntimeError, lock
.release
)
208 def test_different_thread(self
):
209 # Cannot release from a different thread
210 lock
= self
.locktype()
213 b
= Bunch(f
, 1, True)
215 self
.assertRaises(RuntimeError, lock
.release
)
219 def test__is_owned(self
):
220 lock
= self
.locktype()
221 self
.assertFalse(lock
._is
_owned
())
223 self
.assertTrue(lock
._is
_owned
())
225 self
.assertTrue(lock
._is
_owned
())
228 result
.append(lock
._is
_owned
())
229 Bunch(f
, 1).wait_for_finished()
230 self
.assertFalse(result
[0])
232 self
.assertTrue(lock
._is
_owned
())
234 self
.assertFalse(lock
._is
_owned
())
237 class EventTests(BaseTestCase
):
239 Tests for Event objects.
242 def test_is_set(self
):
243 evt
= self
.eventtype()
244 self
.assertFalse(evt
.is_set())
246 self
.assertTrue(evt
.is_set())
248 self
.assertTrue(evt
.is_set())
250 self
.assertFalse(evt
.is_set())
252 self
.assertFalse(evt
.is_set())
254 def _check_notify(self
, evt
):
255 # All threads get notified
260 results1
.append(evt
.wait())
261 results2
.append(evt
.wait())
265 self
.assertEqual(len(results1
), 0)
267 b
.wait_for_finished()
268 self
.assertEqual(results1
, [True] * N
)
269 self
.assertEqual(results2
, [True] * N
)
271 def test_notify(self
):
272 evt
= self
.eventtype()
273 self
._check
_notify
(evt
)
274 # Another time, after an explicit clear()
277 self
._check
_notify
(evt
)
279 def test_timeout(self
):
280 evt
= self
.eventtype()
285 results1
.append(evt
.wait(0.0))
289 results2
.append((r
, t2
- t1
))
290 Bunch(f
, N
).wait_for_finished()
291 self
.assertEqual(results1
, [False] * N
)
292 for r
, dt
in results2
:
294 self
.assertTrue(dt
>= 0.2, dt
)
299 Bunch(f
, N
).wait_for_finished()
300 self
.assertEqual(results1
, [True] * N
)
301 for r
, dt
in results2
:
305 class ConditionTests(BaseTestCase
):
307 Tests for condition variables.
310 def test_acquire(self
):
311 cond
= self
.condtype()
312 # By default we have an RLock: the condition can be acquired multiple
318 lock
= threading
.Lock()
319 cond
= self
.condtype(lock
)
321 self
.assertFalse(lock
.acquire(False))
323 self
.assertTrue(lock
.acquire(False))
324 self
.assertFalse(cond
.acquire(False))
327 self
.assertFalse(lock
.acquire(False))
329 def test_unacquired_wait(self
):
330 cond
= self
.condtype()
331 self
.assertRaises(RuntimeError, cond
.wait
)
333 def test_unacquired_notify(self
):
334 cond
= self
.condtype()
335 self
.assertRaises(RuntimeError, cond
.notify
)
337 def _check_notify(self
, cond
):
346 results1
.append(phase_num
)
350 results2
.append(phase_num
)
354 self
.assertEqual(results1
, [])
355 # Notify 3 threads at first
361 while len(results1
) < 3:
363 self
.assertEqual(results1
, [1] * 3)
364 self
.assertEqual(results2
, [])
365 # Notify 5 threads: they might be in their first or second wait
371 while len(results1
) + len(results2
) < 8:
373 self
.assertEqual(results1
, [1] * 3 + [2] * 2)
374 self
.assertEqual(results2
, [2] * 3)
375 # Notify all threads: they are all in their second wait
381 while len(results2
) < 5:
383 self
.assertEqual(results1
, [1] * 3 + [2] * 2)
384 self
.assertEqual(results2
, [2] * 3 + [3] * 2)
385 b
.wait_for_finished()
387 def test_notify(self
):
388 cond
= self
.condtype()
389 self
._check
_notify
(cond
)
390 # A second time, to check internal state is still ok.
391 self
._check
_notify
(cond
)
393 def test_timeout(self
):
394 cond
= self
.condtype()
403 results
.append(t2
- t1
)
404 Bunch(f
, N
).wait_for_finished()
405 self
.assertEqual(len(results
), 5)
407 self
.assertTrue(dt
>= 0.2, dt
)
410 class BaseSemaphoreTests(BaseTestCase
):
412 Common tests for {bounded, unbounded} semaphore objects.
415 def test_constructor(self
):
416 self
.assertRaises(ValueError, self
.semtype
, value
= -1)
417 self
.assertRaises(ValueError, self
.semtype
, value
= -sys
.maxint
)
419 def test_acquire(self
):
420 sem
= self
.semtype(1)
423 sem
= self
.semtype(2)
429 def test_acquire_destroy(self
):
434 def test_acquire_contended(self
):
435 sem
= self
.semtype(7)
443 results1
.append(phase_num
)
445 results2
.append(phase_num
)
448 while len(results1
) + len(results2
) < 6:
450 self
.assertEqual(results1
+ results2
, [0] * 6)
454 while len(results1
) + len(results2
) < 13:
456 self
.assertEqual(sorted(results1
+ results2
), [0] * 6 + [1] * 7)
460 while len(results1
) + len(results2
) < 19:
462 self
.assertEqual(sorted(results1
+ results2
), [0] * 6 + [1] * 7 + [2] * 6)
463 # The semaphore is still locked
464 self
.assertFalse(sem
.acquire(False))
465 # Final release, to let the last thread finish
467 b
.wait_for_finished()
469 def test_try_acquire(self
):
470 sem
= self
.semtype(2)
471 self
.assertTrue(sem
.acquire(False))
472 self
.assertTrue(sem
.acquire(False))
473 self
.assertFalse(sem
.acquire(False))
475 self
.assertTrue(sem
.acquire(False))
477 def test_try_acquire_contended(self
):
478 sem
= self
.semtype(4)
482 results
.append(sem
.acquire(False))
483 results
.append(sem
.acquire(False))
484 Bunch(f
, 5).wait_for_finished()
485 # There can be a thread switch between acquiring the semaphore and
486 # appending the result, therefore results will not necessarily be
488 self
.assertEqual(sorted(results
), [False] * 7 + [True] * 3 )
490 def test_default_value(self
):
491 # The default initial value is 1.
500 self
.assertFalse(b
.finished
)
502 b
.wait_for_finished()
505 sem
= self
.semtype(2)
508 self
.assertTrue(sem
.acquire(False))
511 self
.assertFalse(sem
.acquire(False))
515 self
.assertTrue(sem
.acquire(False))
517 self
.assertRaises(TypeError, _with
, TypeError)
518 self
.assertTrue(sem
.acquire(False))
521 class SemaphoreTests(BaseSemaphoreTests
):
523 Tests for unbounded semaphores.
526 def test_release_unacquired(self
):
527 # Unbounded releases are allowed and increment the semaphore's value
528 sem
= self
.semtype(1)
535 class BoundedSemaphoreTests(BaseSemaphoreTests
):
537 Tests for bounded semaphores.
540 def test_release_unacquired(self
):
541 # Cannot go past the initial value
543 self
.assertRaises(ValueError, sem
.release
)
546 self
.assertRaises(ValueError, sem
.release
)