3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """A thread-safe queue in which removed objects can be put back at the front."""
# Module-level logger, registered under the tool's dotted package name.
logger = logging.getLogger('google.appengine.tools.requeue')
29 class ReQueue(object):
30 """A special thread-safe queue.
32 A ReQueue allows unfinished work items to be returned with a call to
33 reput(). When an item is reput, task_done() should *not* be called
34 in addition; getting an item that has been reput does not increase
35 the number of outstanding tasks.
37 This class shares an interface with Queue.Queue and provides the
38 additional reput method.
43 requeue_capacity
=None,
44 queue_factory
=Queue
.Queue
,
46 """Initialize a ReQueue instance.
49 queue_capacity: The number of items that can be put in the ReQueue.
50 requeue_capacity: The number of items that can be reput in the ReQueue.
51 queue_factory: Used for dependency injection.
52 get_time: Used for dependency injection.
54 if requeue_capacity
is None:
55 requeue_capacity
= queue_capacity
57 self
.get_time
= get_time
58 self
.queue
= queue_factory(queue_capacity
)
59 self
.requeue
= queue_factory(requeue_capacity
)
60 self
.lock
= threading
.Lock()
61 self
.put_cond
= threading
.Condition(self
.lock
)
62 self
.get_cond
= threading
.Condition(self
.lock
)
64 def _DoWithTimeout(self
,
72 """Performs the given action with a timeout.
74 The action must be non-blocking, and raise an instance of exc on a
75 recoverable failure. If the action fails with an instance of exc,
76 we wait on wait_cond before trying again. Failure after the
77 timeout is reached is propagated as an exception. Success is
78 signalled by notifying on done_cond and returning the result of
79 the action. If action raises any exception besides an instance of
80 exc, it is immediately propagated.
83 action: A callable that performs a non-blocking action.
84 exc: An exception type that is thrown by the action to indicate
86 wait_cond: A condition variable which should be waited on when
88 done_cond: A condition variable to signal if the action returns.
89 lock: The lock used by wait_cond and done_cond.
90 timeout: A non-negative float indicating the maximum time to wait.
91 block: Whether to block if the action cannot complete immediately.
94 The result of the action, if it is successful.
97 ValueError: If the timeout argument is negative.
99 if timeout
is not None and timeout
< 0.0:
100 raise ValueError('\'timeout\' must not be a negative number')
105 start_time
= self
.get_time()
114 if not isinstance(e
, exc
):
116 if timeout
is not None:
117 elapsed_time
= self
.get_time() - start_time
118 timeout
-= elapsed_time
121 wait_cond
.wait(timeout
)
128 def put(self
, item
, block
=True, timeout
=None):
129 """Put an item into the requeue.
132 item: An item to add to the requeue.
133 block: Whether to block if the requeue is full.
134 timeout: Maximum time to wait until the queue is non-full.
137 Queue.Full if the queue is full and the timeout expires.
140 self
.queue
.put(item
, block
=False)
141 self
._DoWithTimeout
(PutAction
,
149 def reput(self
, item
, block
=True, timeout
=None):
150 """Re-put an item back into the requeue.
152 Re-putting an item does not increase the number of outstanding
153 tasks, so the reput item should be uniquely associated with an
154 item that was previously removed from the requeue and for which
155 task_done() has not been called.
158 item: An item to add to the requeue.
159 block: Whether to block if the requeue is full.
160 timeout: Maximum time to wait until the queue is non-full.
163 Queue.Full if the queue is full and the timeout expires.
166 self
.requeue
.put(item
, block
=False)
167 self
._DoWithTimeout
(ReputAction
,
175 def get(self
, block
=True, timeout
=None):
176 """Get an item from the requeue.
179 block: Whether to block if the requeue is empty.
180 timeout: Maximum time to wait until the requeue is non-empty.
183 An item from the requeue.
186 Queue.Empty if the queue is empty and the timeout expires.
191 result
= self
.requeue
.get(block
=False)
194 self
.requeue
.task_done()
197 result
= self
.queue
.get(block
=False)
200 return self
._DoWithTimeout
(GetAction
,
209 """Blocks until all of the items in the requeue have been processed."""
213 """Indicate that a previously enqueued item has been fully processed."""
214 self
.queue
.task_done()
217 """Returns true if the requeue is empty."""
218 return self
.queue
.empty() and self
.requeue
.empty()
def get_nowait(self):
    """Fetch an item without blocking.

    Thin convenience wrapper: forwards to get() with block=False and
    returns whatever that call returns. Any exception raised by get()
    propagates unchanged to the caller.
    """
    return self.get(block=False)
225 return self
.queue
.qsize() + self
.requeue
.qsize()