move sections
[python/dscho.git] / Lib / test / lock_tests.py
blob966f9bd7ca0b710c7ddf4ff2ac2da793708d3f4e
1 """
2 Various tests for synchronization primitives.
3 """
5 import sys
6 import time
7 from thread import start_new_thread, get_ident
8 import threading
9 import unittest
11 from test import test_support as support
14 def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
18 class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
34 tid = get_ident()
35 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
42 for i in range(n):
43 start_new_thread(task, ())
45 def wait_for_started(self):
46 while len(self.started) < self.n:
47 _wait()
49 def wait_for_finished(self):
50 while len(self.finished) < self.n:
51 _wait()
53 def do_finish(self):
54 self._can_exit = True
57 class BaseTestCase(unittest.TestCase):
58 def setUp(self):
59 self._threads = support.threading_setup()
61 def tearDown(self):
62 support.threading_cleanup(*self._threads)
63 support.reap_children()
66 class BaseLockTests(BaseTestCase):
67 """
68 Tests for both recursive and non-recursive locks.
69 """
71 def test_constructor(self):
72 lock = self.locktype()
73 del lock
75 def test_acquire_destroy(self):
76 lock = self.locktype()
77 lock.acquire()
78 del lock
80 def test_acquire_release(self):
81 lock = self.locktype()
82 lock.acquire()
83 lock.release()
84 del lock
86 def test_try_acquire(self):
87 lock = self.locktype()
88 self.assertTrue(lock.acquire(False))
89 lock.release()
91 def test_try_acquire_contended(self):
92 lock = self.locktype()
93 lock.acquire()
94 result = []
95 def f():
96 result.append(lock.acquire(False))
97 Bunch(f, 1).wait_for_finished()
98 self.assertFalse(result[0])
99 lock.release()
101 def test_acquire_contended(self):
102 lock = self.locktype()
103 lock.acquire()
104 N = 5
105 def f():
106 lock.acquire()
107 lock.release()
109 b = Bunch(f, N)
110 b.wait_for_started()
111 _wait()
112 self.assertEqual(len(b.finished), 0)
113 lock.release()
114 b.wait_for_finished()
115 self.assertEqual(len(b.finished), N)
117 def test_with(self):
118 lock = self.locktype()
119 def f():
120 lock.acquire()
121 lock.release()
122 def _with(err=None):
123 with lock:
124 if err is not None:
125 raise err
126 _with()
127 # Check the lock is unacquired
128 Bunch(f, 1).wait_for_finished()
129 self.assertRaises(TypeError, _with, TypeError)
130 # Check the lock is unacquired
131 Bunch(f, 1).wait_for_finished()
133 def test_thread_leak(self):
134 # The lock shouldn't leak a Thread instance when used from a foreign
135 # (non-threading) thread.
136 lock = self.locktype()
137 def f():
138 lock.acquire()
139 lock.release()
140 n = len(threading.enumerate())
141 # We run many threads in the hope that existing threads ids won't
142 # be recycled.
143 Bunch(f, 15).wait_for_finished()
144 self.assertEqual(n, len(threading.enumerate()))
147 class LockTests(BaseLockTests):
149 Tests for non-recursive, weak locks
150 (which can be acquired and released from different threads).
152 def test_reacquire(self):
153 # Lock needs to be released before re-acquiring.
154 lock = self.locktype()
155 phase = []
156 def f():
157 lock.acquire()
158 phase.append(None)
159 lock.acquire()
160 phase.append(None)
161 start_new_thread(f, ())
162 while len(phase) == 0:
163 _wait()
164 _wait()
165 self.assertEqual(len(phase), 1)
166 lock.release()
167 while len(phase) == 1:
168 _wait()
169 self.assertEqual(len(phase), 2)
171 def test_different_thread(self):
172 # Lock can be released from a different thread.
173 lock = self.locktype()
174 lock.acquire()
175 def f():
176 lock.release()
177 b = Bunch(f, 1)
178 b.wait_for_finished()
179 lock.acquire()
180 lock.release()
183 class RLockTests(BaseLockTests):
185 Tests for recursive locks.
187 def test_reacquire(self):
188 lock = self.locktype()
189 lock.acquire()
190 lock.acquire()
191 lock.release()
192 lock.acquire()
193 lock.release()
194 lock.release()
196 def test_release_unacquired(self):
197 # Cannot release an unacquired lock
198 lock = self.locktype()
199 self.assertRaises(RuntimeError, lock.release)
200 lock.acquire()
201 lock.acquire()
202 lock.release()
203 lock.acquire()
204 lock.release()
205 lock.release()
206 self.assertRaises(RuntimeError, lock.release)
208 def test_different_thread(self):
209 # Cannot release from a different thread
210 lock = self.locktype()
211 def f():
212 lock.acquire()
213 b = Bunch(f, 1, True)
214 try:
215 self.assertRaises(RuntimeError, lock.release)
216 finally:
217 b.do_finish()
219 def test__is_owned(self):
220 lock = self.locktype()
221 self.assertFalse(lock._is_owned())
222 lock.acquire()
223 self.assertTrue(lock._is_owned())
224 lock.acquire()
225 self.assertTrue(lock._is_owned())
226 result = []
227 def f():
228 result.append(lock._is_owned())
229 Bunch(f, 1).wait_for_finished()
230 self.assertFalse(result[0])
231 lock.release()
232 self.assertTrue(lock._is_owned())
233 lock.release()
234 self.assertFalse(lock._is_owned())
237 class EventTests(BaseTestCase):
239 Tests for Event objects.
242 def test_is_set(self):
243 evt = self.eventtype()
244 self.assertFalse(evt.is_set())
245 evt.set()
246 self.assertTrue(evt.is_set())
247 evt.set()
248 self.assertTrue(evt.is_set())
249 evt.clear()
250 self.assertFalse(evt.is_set())
251 evt.clear()
252 self.assertFalse(evt.is_set())
254 def _check_notify(self, evt):
255 # All threads get notified
256 N = 5
257 results1 = []
258 results2 = []
259 def f():
260 results1.append(evt.wait())
261 results2.append(evt.wait())
262 b = Bunch(f, N)
263 b.wait_for_started()
264 _wait()
265 self.assertEqual(len(results1), 0)
266 evt.set()
267 b.wait_for_finished()
268 self.assertEqual(results1, [True] * N)
269 self.assertEqual(results2, [True] * N)
271 def test_notify(self):
272 evt = self.eventtype()
273 self._check_notify(evt)
274 # Another time, after an explicit clear()
275 evt.set()
276 evt.clear()
277 self._check_notify(evt)
279 def test_timeout(self):
280 evt = self.eventtype()
281 results1 = []
282 results2 = []
283 N = 5
284 def f():
285 results1.append(evt.wait(0.0))
286 t1 = time.time()
287 r = evt.wait(0.2)
288 t2 = time.time()
289 results2.append((r, t2 - t1))
290 Bunch(f, N).wait_for_finished()
291 self.assertEqual(results1, [False] * N)
292 for r, dt in results2:
293 self.assertFalse(r)
294 self.assertTrue(dt >= 0.2, dt)
295 # The event is set
296 results1 = []
297 results2 = []
298 evt.set()
299 Bunch(f, N).wait_for_finished()
300 self.assertEqual(results1, [True] * N)
301 for r, dt in results2:
302 self.assertTrue(r)
305 class ConditionTests(BaseTestCase):
307 Tests for condition variables.
310 def test_acquire(self):
311 cond = self.condtype()
312 # Be default we have an RLock: the condition can be acquired multiple
313 # times.
314 cond.acquire()
315 cond.acquire()
316 cond.release()
317 cond.release()
318 lock = threading.Lock()
319 cond = self.condtype(lock)
320 cond.acquire()
321 self.assertFalse(lock.acquire(False))
322 cond.release()
323 self.assertTrue(lock.acquire(False))
324 self.assertFalse(cond.acquire(False))
325 lock.release()
326 with cond:
327 self.assertFalse(lock.acquire(False))
329 def test_unacquired_wait(self):
330 cond = self.condtype()
331 self.assertRaises(RuntimeError, cond.wait)
333 def test_unacquired_notify(self):
334 cond = self.condtype()
335 self.assertRaises(RuntimeError, cond.notify)
337 def _check_notify(self, cond):
338 N = 5
339 results1 = []
340 results2 = []
341 phase_num = 0
342 def f():
343 cond.acquire()
344 cond.wait()
345 cond.release()
346 results1.append(phase_num)
347 cond.acquire()
348 cond.wait()
349 cond.release()
350 results2.append(phase_num)
351 b = Bunch(f, N)
352 b.wait_for_started()
353 _wait()
354 self.assertEqual(results1, [])
355 # Notify 3 threads at first
356 cond.acquire()
357 cond.notify(3)
358 _wait()
359 phase_num = 1
360 cond.release()
361 while len(results1) < 3:
362 _wait()
363 self.assertEqual(results1, [1] * 3)
364 self.assertEqual(results2, [])
365 # Notify 5 threads: they might be in their first or second wait
366 cond.acquire()
367 cond.notify(5)
368 _wait()
369 phase_num = 2
370 cond.release()
371 while len(results1) + len(results2) < 8:
372 _wait()
373 self.assertEqual(results1, [1] * 3 + [2] * 2)
374 self.assertEqual(results2, [2] * 3)
375 # Notify all threads: they are all in their second wait
376 cond.acquire()
377 cond.notify_all()
378 _wait()
379 phase_num = 3
380 cond.release()
381 while len(results2) < 5:
382 _wait()
383 self.assertEqual(results1, [1] * 3 + [2] * 2)
384 self.assertEqual(results2, [2] * 3 + [3] * 2)
385 b.wait_for_finished()
387 def test_notify(self):
388 cond = self.condtype()
389 self._check_notify(cond)
390 # A second time, to check internal state is still ok.
391 self._check_notify(cond)
393 def test_timeout(self):
394 cond = self.condtype()
395 results = []
396 N = 5
397 def f():
398 cond.acquire()
399 t1 = time.time()
400 cond.wait(0.2)
401 t2 = time.time()
402 cond.release()
403 results.append(t2 - t1)
404 Bunch(f, N).wait_for_finished()
405 self.assertEqual(len(results), 5)
406 for dt in results:
407 self.assertTrue(dt >= 0.2, dt)
410 class BaseSemaphoreTests(BaseTestCase):
412 Common tests for {bounded, unbounded} semaphore objects.
415 def test_constructor(self):
416 self.assertRaises(ValueError, self.semtype, value = -1)
417 self.assertRaises(ValueError, self.semtype, value = -sys.maxint)
419 def test_acquire(self):
420 sem = self.semtype(1)
421 sem.acquire()
422 sem.release()
423 sem = self.semtype(2)
424 sem.acquire()
425 sem.acquire()
426 sem.release()
427 sem.release()
429 def test_acquire_destroy(self):
430 sem = self.semtype()
431 sem.acquire()
432 del sem
434 def test_acquire_contended(self):
435 sem = self.semtype(7)
436 sem.acquire()
437 N = 10
438 results1 = []
439 results2 = []
440 phase_num = 0
441 def f():
442 sem.acquire()
443 results1.append(phase_num)
444 sem.acquire()
445 results2.append(phase_num)
446 b = Bunch(f, 10)
447 b.wait_for_started()
448 while len(results1) + len(results2) < 6:
449 _wait()
450 self.assertEqual(results1 + results2, [0] * 6)
451 phase_num = 1
452 for i in range(7):
453 sem.release()
454 while len(results1) + len(results2) < 13:
455 _wait()
456 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
457 phase_num = 2
458 for i in range(6):
459 sem.release()
460 while len(results1) + len(results2) < 19:
461 _wait()
462 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
463 # The semaphore is still locked
464 self.assertFalse(sem.acquire(False))
465 # Final release, to let the last thread finish
466 sem.release()
467 b.wait_for_finished()
469 def test_try_acquire(self):
470 sem = self.semtype(2)
471 self.assertTrue(sem.acquire(False))
472 self.assertTrue(sem.acquire(False))
473 self.assertFalse(sem.acquire(False))
474 sem.release()
475 self.assertTrue(sem.acquire(False))
477 def test_try_acquire_contended(self):
478 sem = self.semtype(4)
479 sem.acquire()
480 results = []
481 def f():
482 results.append(sem.acquire(False))
483 results.append(sem.acquire(False))
484 Bunch(f, 5).wait_for_finished()
485 # There can be a thread switch between acquiring the semaphore and
486 # appending the result, therefore results will not necessarily be
487 # ordered.
488 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
490 def test_default_value(self):
491 # The default initial value is 1.
492 sem = self.semtype()
493 sem.acquire()
494 def f():
495 sem.acquire()
496 sem.release()
497 b = Bunch(f, 1)
498 b.wait_for_started()
499 _wait()
500 self.assertFalse(b.finished)
501 sem.release()
502 b.wait_for_finished()
504 def test_with(self):
505 sem = self.semtype(2)
506 def _with(err=None):
507 with sem:
508 self.assertTrue(sem.acquire(False))
509 sem.release()
510 with sem:
511 self.assertFalse(sem.acquire(False))
512 if err:
513 raise err
514 _with()
515 self.assertTrue(sem.acquire(False))
516 sem.release()
517 self.assertRaises(TypeError, _with, TypeError)
518 self.assertTrue(sem.acquire(False))
519 sem.release()
521 class SemaphoreTests(BaseSemaphoreTests):
523 Tests for unbounded semaphores.
526 def test_release_unacquired(self):
527 # Unbounded releases are allowed and increment the semaphore's value
528 sem = self.semtype(1)
529 sem.release()
530 sem.acquire()
531 sem.acquire()
532 sem.release()
535 class BoundedSemaphoreTests(BaseSemaphoreTests):
537 Tests for bounded semaphores.
540 def test_release_unacquired(self):
541 # Cannot go past the initial value
542 sem = self.semtype()
543 self.assertRaises(ValueError, sem.release)
544 sem.acquire()
545 sem.release()
546 self.assertRaises(ValueError, sem.release)