1 # Some simple queue module tests, plus some failure conditions
2 # to ensure the Queue locks remain stable.
6 from test
import test_support
7 threading
= test_support
.import_module('threading')
11 # A thread to run a function that unclogs a blocked Queue.
12 class _TriggerThread(threading
.Thread
):
13 def __init__(self
, fn
, args
):
16 self
.startedEvent
= threading
.Event()
17 threading
.Thread
.__init
__(self
)
20 # The sleep isn't necessary, but is intended to give the blocking
21 # function in the main thread a chance at actually blocking before
22 # we unclog it. But if the sleep is longer than the timeout-based
23 # tests wait in their blocking functions, those tests will fail.
24 # So we give them much longer timeout values compared to the
25 # sleep here (I aimed at 10 seconds for blocking functions --
26 # they should never actually wait that long - they should make
27 # progress as soon as we call self.fn()).
29 self
.startedEvent
.set()
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function. Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return. In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those went rarely seen but
# undiagnosed for years. Now block_func must be unexceptional. If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.
44 class BlockingTestMixin
:
46 def do_blocking_test(self
, block_func
, block_args
, trigger_func
, trigger_args
):
47 self
.t
= _TriggerThread(trigger_func
, trigger_args
)
49 self
.result
= block_func(*block_args
)
50 # If block_func returned before our thread made the call, we failed!
51 if not self
.t
.startedEvent
.is_set():
52 self
.fail("blocking function '%r' appeared not to block" %
54 self
.t
.join(10) # make sure the thread terminates
56 self
.fail("trigger function '%r' appeared to not return" %
60 # Call this instead if block_func is supposed to raise an exception.
61 def do_exceptional_blocking_test(self
,block_func
, block_args
, trigger_func
,
62 trigger_args
, expected_exception_class
):
63 self
.t
= _TriggerThread(trigger_func
, trigger_args
)
67 block_func(*block_args
)
68 except expected_exception_class
:
71 self
.fail("expected exception of kind %r" %
72 expected_exception_class
)
74 self
.t
.join(10) # make sure the thread terminates
76 self
.fail("trigger function '%r' appeared to not return" %
78 if not self
.t
.startedEvent
.is_set():
79 self
.fail("trigger thread ended but event never set")
82 class BaseQueueTest(unittest
.TestCase
, BlockingTestMixin
):
85 self
.cumlock
= threading
.Lock()
87 def simple_queue_test(self
, q
):
89 raise RuntimeError, "Call this function with an empty queue"
90 # I guess we better check things actually queue correctly a little :)
94 target_order
= dict(Queue
= [111, 333, 222],
95 LifoQueue
= [222, 333, 111],
96 PriorityQueue
= [111, 222, 333])
97 actual_order
= [q
.get(), q
.get(), q
.get()]
98 self
.assertEquals(actual_order
, target_order
[q
.__class
__.__name
__],
99 "Didn't seem to queue the correct data!")
100 for i
in range(QUEUE_SIZE
-1):
102 self
.assertTrue(not q
.empty(), "Queue should not be empty")
103 self
.assertTrue(not q
.full(), "Queue should not be full")
104 last
= 2 * QUEUE_SIZE
105 full
= 3 * 2 * QUEUE_SIZE
107 self
.assertTrue(q
.full(), "Queue should be full")
110 self
.fail("Didn't appear to block with a full queue")
114 q
.put(full
, timeout
=0.01)
115 self
.fail("Didn't appear to time-out with a full queue")
118 # Test a blocking put
119 self
.do_blocking_test(q
.put
, (full
,), q
.get
, ())
120 self
.do_blocking_test(q
.put
, (full
, True, 10), q
.get
, ())
122 for i
in range(QUEUE_SIZE
):
124 self
.assertTrue(q
.empty(), "Queue should be empty")
127 self
.fail("Didn't appear to block with an empty queue")
132 self
.fail("Didn't appear to time-out with an empty queue")
135 # Test a blocking get
136 self
.do_blocking_test(q
.get
, (), q
.put
, ('empty',))
137 self
.do_blocking_test(q
.get
, (True, 10), q
.put
, ('empty',))
150 def queue_join_test(self
, q
):
153 threading
.Thread(target
=self
.worker
, args
=(q
,)).start()
154 for i
in xrange(100):
157 self
.assertEquals(self
.cum
, sum(range(100)),
158 "q.join() did not block until all tasks were done")
160 q
.put(None) # instruct the threads to close
161 q
.join() # verify that you can join twice
163 def test_queue_task_done(self
):
164 # Test to make sure a queue task completed successfully.
171 self
.fail("Did not detect task count going negative")
173 def test_queue_join(self
):
174 # Test that a queue join()s successfully, and before anything else
175 # (done twice for insurance).
177 self
.queue_join_test(q
)
178 self
.queue_join_test(q
)
184 self
.fail("Did not detect task count going negative")
def test_simple_queue(self):
    # Drive one queue instance through the whole simple scenario two times
    # in a row, so the second pass checks that the instance is still usable
    # after a complete first run.
    queue_under_test = self.type2test(QUEUE_SIZE)
    for _ in range(2):
        self.simple_queue_test(queue_under_test)
# Binds type2test to Queue.Queue; the inherited test_simple_queue builds the
# queue it exercises by calling self.type2test(QUEUE_SIZE).
class QueueTest(BaseQueueTest):
    type2test = Queue.Queue
# Binds type2test to Queue.LifoQueue; the inherited test_simple_queue builds
# the queue it exercises by calling self.type2test(QUEUE_SIZE).
class LifoQueueTest(BaseQueueTest):
    type2test = Queue.LifoQueue
# Binds type2test to Queue.PriorityQueue; the inherited test_simple_queue
# builds the queue it exercises by calling self.type2test(QUEUE_SIZE).
class PriorityQueueTest(BaseQueueTest):
    type2test = Queue.PriorityQueue
205 # A Queue subclass that can provoke failure at a moment's notice :)
# Raised by FailingQueue._put / FailingQueue._get when the matching
# fail_next_* flag was set before the operation.
class FailingQueueException(Exception):
    pass


# Queue.Queue subclass with two one-shot failure switches.  Setting
# fail_next_put (or fail_next_get) to True makes the next _put (or _get)
# raise FailingQueueException; the flag is cleared before raising, so only
# that single operation fails.
class FailingQueue(Queue.Queue):
    def __init__(self, *args):
        # Both switches start disarmed; all arguments go to Queue.Queue.
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        # Fail once if armed, otherwise defer to the normal storage hook.
        if self.fail_next_put:
            self.fail_next_put = False
            # Call form of raise: valid on Python 2 and Python 3 alike,
            # unlike the old "raise Class, value" statement.
            raise FailingQueueException("You Lose")
        return Queue.Queue._put(self, item)

    def _get(self):
        # Fail once if armed, otherwise defer to the normal retrieval hook.
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._get(self)
225 class FailingQueueTest(unittest
.TestCase
, BlockingTestMixin
):
227 def failing_queue_test(self
, q
):
229 raise RuntimeError, "Call this function with an empty queue"
230 for i
in range(QUEUE_SIZE
-1):
232 # Test a failing non-blocking put.
233 q
.fail_next_put
= True
235 q
.put("oops", block
=0)
236 self
.fail("The queue didn't fail when it should have")
237 except FailingQueueException
:
239 q
.fail_next_put
= True
241 q
.put("oops", timeout
=0.1)
242 self
.fail("The queue didn't fail when it should have")
243 except FailingQueueException
:
246 self
.assertTrue(q
.full(), "Queue should be full")
247 # Test a failing blocking put
248 q
.fail_next_put
= True
250 self
.do_blocking_test(q
.put
, ("full",), q
.get
, ())
251 self
.fail("The queue didn't fail when it should have")
252 except FailingQueueException
:
254 # Check the Queue isn't damaged.
255 # put failed, but get succeeded - re-add
257 # Test a failing timeout put
258 q
.fail_next_put
= True
260 self
.do_exceptional_blocking_test(q
.put
, ("full", True, 10), q
.get
, (),
261 FailingQueueException
)
262 self
.fail("The queue didn't fail when it should have")
263 except FailingQueueException
:
265 # Check the Queue isn't damaged.
266 # put failed, but get succeeded - re-add
268 self
.assertTrue(q
.full(), "Queue should be full")
270 self
.assertTrue(not q
.full(), "Queue should not be full")
272 self
.assertTrue(q
.full(), "Queue should be full")
273 # Test a blocking put
274 self
.do_blocking_test(q
.put
, ("full",), q
.get
, ())
276 for i
in range(QUEUE_SIZE
):
278 self
.assertTrue(q
.empty(), "Queue should be empty")
280 q
.fail_next_get
= True
283 self
.fail("The queue didn't fail when it should have")
284 except FailingQueueException
:
286 self
.assertTrue(not q
.empty(), "Queue should not be empty")
287 q
.fail_next_get
= True
290 self
.fail("The queue didn't fail when it should have")
291 except FailingQueueException
:
293 self
.assertTrue(not q
.empty(), "Queue should not be empty")
295 self
.assertTrue(q
.empty(), "Queue should be empty")
296 q
.fail_next_get
= True
298 self
.do_exceptional_blocking_test(q
.get
, (), q
.put
, ('empty',),
299 FailingQueueException
)
300 self
.fail("The queue didn't fail when it should have")
301 except FailingQueueException
:
303 # put succeeded, but get failed.
304 self
.assertTrue(not q
.empty(), "Queue should not be empty")
306 self
.assertTrue(q
.empty(), "Queue should be empty")
def test_failing_queue(self):
    # Run the failure-injection scenario two times against one FailingQueue
    # instance, so the second pass checks the queue after a complete first
    # run.
    failing_q = FailingQueue(QUEUE_SIZE)
    for _ in range(2):
        self.failing_queue_test(failing_q)
317 test_support
.run_unittest(QueueTest
, LifoQueueTest
, PriorityQueueTest
,
321 if __name__
== "__main__":