3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """A thread-safe queue in which removed objects put back to the front."""
25 logger
= logging
.getLogger('google.appengine.tools.requeue')
28 class ReQueue(object):
29 """A special thread-safe queue.
31 A ReQueue allows unfinished work items to be returned with a call to
32 reput(). When an item is reput, task_done() should *not* be called
33 in addition, getting an item that has been reput does not increase
34 the number of outstanding tasks.
36 This class shares an interface with Queue.Queue and provides the
37 additional reput method.
42 requeue_capacity
=None,
43 queue_factory
=Queue
.Queue
,
45 """Initialize a ReQueue instance.
48 queue_capacity: The number of items that can be put in the ReQueue.
49 requeue_capacity: The numer of items that can be reput in the ReQueue.
50 queue_factory: Used for dependency injection.
51 get_time: Used for dependency injection.
53 if requeue_capacity
is None:
54 requeue_capacity
= queue_capacity
56 self
.get_time
= get_time
57 self
.queue
= queue_factory(queue_capacity
)
58 self
.requeue
= queue_factory(requeue_capacity
)
59 self
.lock
= threading
.Lock()
60 self
.put_cond
= threading
.Condition(self
.lock
)
61 self
.get_cond
= threading
.Condition(self
.lock
)
63 def _DoWithTimeout(self
,
71 """Performs the given action with a timeout.
73 The action must be non-blocking, and raise an instance of exc on a
74 recoverable failure. If the action fails with an instance of exc,
75 we wait on wait_cond before trying again. Failure after the
76 timeout is reached is propagated as an exception. Success is
77 signalled by notifying on done_cond and returning the result of
78 the action. If action raises any exception besides an instance of
79 exc, it is immediately propagated.
82 action: A callable that performs a non-blocking action.
83 exc: An exception type that is thrown by the action to indicate
85 wait_cond: A condition variable which should be waited on when
87 done_cond: A condition variable to signal if the action returns.
88 lock: The lock used by wait_cond and done_cond.
89 timeout: A non-negative float indicating the maximum time to wait.
90 block: Whether to block if the action cannot complete immediately.
93 The result of the action, if it is successful.
96 ValueError: If the timeout argument is negative.
98 if timeout
is not None and timeout
< 0.0:
99 raise ValueError('\'timeout\' must not be a negative number')
104 start_time
= self
.get_time()
113 if not isinstance(e
, exc
):
115 if timeout
is not None:
116 elapsed_time
= self
.get_time() - start_time
117 timeout
-= elapsed_time
120 wait_cond
.wait(timeout
)
127 def put(self
, item
, block
=True, timeout
=None):
128 """Put an item into the requeue.
131 item: An item to add to the requeue.
132 block: Whether to block if the requeue is full.
133 timeout: Maximum on how long to wait until the queue is non-full.
136 Queue.Full if the queue is full and the timeout expires.
139 self
.queue
.put(item
, block
=False)
140 self
._DoWithTimeout
(PutAction
,
148 def reput(self
, item
, block
=True, timeout
=None):
149 """Re-put an item back into the requeue.
151 Re-putting an item does not increase the number of outstanding
152 tasks, so the reput item should be uniquely associated with an
153 item that was previously removed from the requeue and for which
154 TaskDone has not been called.
157 item: An item to add to the requeue.
158 block: Whether to block if the requeue is full.
159 timeout: Maximum on how long to wait until the queue is non-full.
162 Queue.Full is the queue is full and the timeout expires.
165 self
.requeue
.put(item
, block
=False)
166 self
._DoWithTimeout
(ReputAction
,
174 def get(self
, block
=True, timeout
=None):
175 """Get an item from the requeue.
178 block: Whether to block if the requeue is empty.
179 timeout: Maximum on how long to wait until the requeue is non-empty.
182 An item from the requeue.
185 Queue.Empty if the queue is empty and the timeout expires.
190 result
= self
.requeue
.get(block
=False)
193 self
.requeue
.task_done()
196 result
= self
.queue
.get(block
=False)
199 return self
._DoWithTimeout
(GetAction
,
208 """Blocks until all of the items in the requeue have been processed."""
212 """Indicate that a previously enqueued item has been fully processed."""
213 self
.queue
.task_done()
216 """Returns true if the requeue is empty."""
217 return self
.queue
.empty() and self
.requeue
.empty()
219 def get_nowait(self
):
220 """Try to get an item from the queue without blocking."""
221 return self
.get(block
=False)
224 return self
.queue
.qsize() + self
.requeue
.qsize()