1.9.30 sync.
[gae.git] / python / google / appengine / tools / requeue.py
blob02e90f5900c8ca479e1891bac17127fe095483bc
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """A thread-safe queue in which removed objects can be put back at the front."""
21 import logging
22 import Queue
23 import threading
24 import time
# Module-level logger, named after this module's dotted path.
logger = logging.getLogger('google.appengine.tools.requeue')
class ReQueue(object):
    """A special thread-safe queue.

    A ReQueue allows unfinished work items to be returned with a call to
    reput(). When an item is reput, task_done() should *not* be called
    in addition; getting an item that has been reput does not increase
    the number of outstanding tasks.

    This class shares an interface with Queue.Queue and provides the
    additional reput method.
    """

    def __init__(self,
                 queue_capacity,
                 requeue_capacity=None,
                 queue_factory=Queue.Queue,
                 get_time=time.time):
        """Initialize a ReQueue instance.

        Args:
          queue_capacity: The number of items that can be put in the ReQueue.
          requeue_capacity: The number of items that can be reput in the
            ReQueue. Defaults to queue_capacity.
          queue_factory: Used for dependency injection. Called with a
            capacity to build each of the two underlying queues.
          get_time: Used for dependency injection. A zero-argument callable
            returning the current time in seconds.
        """
        if requeue_capacity is None:
            requeue_capacity = queue_capacity

        self.get_time = get_time
        # Fresh items go to `queue`; items handed back via reput() go to
        # `requeue`, which get() drains first.
        self.queue = queue_factory(queue_capacity)
        self.requeue = queue_factory(requeue_capacity)
        # Both conditions share one lock so that every put/reput/get is
        # serialized against the others.
        self.lock = threading.Lock()
        # put_cond is notified after a successful put()/reput();
        # get_cond is notified after a successful get().
        # NOTE(review): put() and reput() callers both wait on get_cond and
        # only a single notify() is issued per get(), so the waiter that is
        # woken may be one whose queue is still full. Consider notifying all
        # waiters if that matters for callers.
        self.put_cond = threading.Condition(self.lock)
        self.get_cond = threading.Condition(self.lock)

    def _DoWithTimeout(self,
                       action,
                       exc,
                       wait_cond,
                       done_cond,
                       lock,
                       timeout=None,
                       block=True):
        """Performs the given action with a timeout.

        The action must be non-blocking, and raise an instance of exc on a
        recoverable failure. If the action fails with an instance of exc,
        we wait on wait_cond before trying again. Failure after the
        timeout is reached is propagated as an exception. Success is
        signalled by notifying on done_cond and returning the result of
        the action. If action raises any exception besides an instance of
        exc, it is immediately propagated.

        Args:
          action: A callable that performs a non-blocking action.
          exc: An exception type that is thrown by the action to indicate
            a recoverable error.
          wait_cond: A condition variable which should be waited on when
            action throws exc.
          done_cond: A condition variable to signal if the action returns.
          lock: The lock used by wait_cond and done_cond.
          timeout: A non-negative float indicating the maximum time to wait.
            None means wait indefinitely.
          block: Whether to block if the action cannot complete immediately.
            When false, the timeout is treated as zero.

        Returns:
          The result of the action, if it is successful.

        Raises:
          ValueError: If the timeout argument is negative.
        """
        if timeout is not None and timeout < 0.0:
            raise ValueError('\'timeout\' must not be a negative number')
        if not block:
            timeout = 0.0

        # Work against one absolute deadline. Subtracting the cumulative
        # elapsed time from an already-reduced timeout on every retry would
        # count earlier waits more than once and give up too early.
        start_time = self.get_time()
        deadline = None
        if timeout is not None:
            deadline = start_time + timeout

        with lock:
            while True:
                try:
                    result = action()
                except exc:
                    remaining = None
                    if deadline is not None:
                        remaining = deadline - self.get_time()
                        if remaining <= 0.0:
                            # Out of time: surface the action's own failure
                            # with its original traceback.
                            raise
                    wait_cond.wait(remaining)
                else:
                    # Still holding the lock here, as notify() requires.
                    done_cond.notify()
                    return result

    def put(self, item, block=True, timeout=None):
        """Put an item into the requeue.

        Args:
          item: An item to add to the requeue.
          block: Whether to block if the requeue is full.
          timeout: Maximum on how long to wait until the queue is non-full.

        Raises:
          Queue.Full if the queue is full and the timeout expires.
        """
        def PutAction():
            self.queue.put(item, block=False)

        self._DoWithTimeout(PutAction,
                            Queue.Full,
                            self.get_cond,
                            self.put_cond,
                            self.lock,
                            timeout=timeout,
                            block=block)

    def reput(self, item, block=True, timeout=None):
        """Re-put an item back into the requeue.

        Re-putting an item does not increase the number of outstanding
        tasks, so the reput item should be uniquely associated with an
        item that was previously removed from the requeue and for which
        task_done has not been called.

        Args:
          item: An item to add to the requeue.
          block: Whether to block if the requeue is full.
          timeout: Maximum on how long to wait until the queue is non-full.

        Raises:
          Queue.Full if the queue is full and the timeout expires.
        """
        def ReputAction():
            self.requeue.put(item, block=False)

        self._DoWithTimeout(ReputAction,
                            Queue.Full,
                            self.get_cond,
                            self.put_cond,
                            self.lock,
                            timeout=timeout,
                            block=block)

    def get(self, block=True, timeout=None):
        """Get an item from the requeue.

        Items that were reput are returned before items that were put.

        Args:
          block: Whether to block if the requeue is empty.
          timeout: Maximum on how long to wait until the requeue is non-empty.

        Returns:
          An item from the requeue.

        Raises:
          Queue.Empty if the queue is empty and the timeout expires.
        """
        def GetAction():
            try:
                result = self.requeue.get(block=False)
            except Queue.Empty:
                # Nothing was handed back; fall through to fresh items.
                return self.queue.get(block=False)
            # Outstanding work is tracked on self.queue only (see join() and
            # task_done()), so settle the inner requeue's counter right away.
            self.requeue.task_done()
            return result

        return self._DoWithTimeout(GetAction,
                                   Queue.Empty,
                                   self.put_cond,
                                   self.get_cond,
                                   self.lock,
                                   timeout=timeout,
                                   block=block)

    def join(self):
        """Blocks until all of the items in the requeue have been processed."""
        self.queue.join()

    def task_done(self):
        """Indicate that a previously enqueued item has been fully processed."""
        self.queue.task_done()

    def empty(self):
        """Returns true if the requeue is empty."""
        return self.queue.empty() and self.requeue.empty()

    def get_nowait(self):
        """Try to get an item from the queue without blocking."""
        return self.get(block=False)

    def qsize(self):
        """Returns the combined size of the put and reput queues."""
        return self.queue.qsize() + self.requeue.qsize()