App Engine Python SDK version 1.9.9
[gae.git] / python / google / appengine / tools / requeue.py
blobdb571fc215f433436c896c730058ce8ea3bdfb58
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """A thread-safe queue in which removed objects put back to the front."""
20 import logging
21 import Queue
22 import threading
23 import time
25 logger = logging.getLogger('google.appengine.tools.requeue')
28 class ReQueue(object):
29 """A special thread-safe queue.
31 A ReQueue allows unfinished work items to be returned with a call to
32 reput(). When an item is reput, task_done() should *not* be called
33 in addition, getting an item that has been reput does not increase
34 the number of outstanding tasks.
36 This class shares an interface with Queue.Queue and provides the
37 additional reput method.
38 """
40 def __init__(self,
41 queue_capacity,
42 requeue_capacity=None,
43 queue_factory=Queue.Queue,
44 get_time=time.time):
45 """Initialize a ReQueue instance.
47 Args:
48 queue_capacity: The number of items that can be put in the ReQueue.
49 requeue_capacity: The numer of items that can be reput in the ReQueue.
50 queue_factory: Used for dependency injection.
51 get_time: Used for dependency injection.
52 """
53 if requeue_capacity is None:
54 requeue_capacity = queue_capacity
56 self.get_time = get_time
57 self.queue = queue_factory(queue_capacity)
58 self.requeue = queue_factory(requeue_capacity)
59 self.lock = threading.Lock()
60 self.put_cond = threading.Condition(self.lock)
61 self.get_cond = threading.Condition(self.lock)
63 def _DoWithTimeout(self,
64 action,
65 exc,
66 wait_cond,
67 done_cond,
68 lock,
69 timeout=None,
70 block=True):
71 """Performs the given action with a timeout.
73 The action must be non-blocking, and raise an instance of exc on a
74 recoverable failure. If the action fails with an instance of exc,
75 we wait on wait_cond before trying again. Failure after the
76 timeout is reached is propagated as an exception. Success is
77 signalled by notifying on done_cond and returning the result of
78 the action. If action raises any exception besides an instance of
79 exc, it is immediately propagated.
81 Args:
82 action: A callable that performs a non-blocking action.
83 exc: An exception type that is thrown by the action to indicate
84 a recoverable error.
85 wait_cond: A condition variable which should be waited on when
86 action throws exc.
87 done_cond: A condition variable to signal if the action returns.
88 lock: The lock used by wait_cond and done_cond.
89 timeout: A non-negative float indicating the maximum time to wait.
90 block: Whether to block if the action cannot complete immediately.
92 Returns:
93 The result of the action, if it is successful.
95 Raises:
96 ValueError: If the timeout argument is negative.
97 """
98 if timeout is not None and timeout < 0.0:
99 raise ValueError('\'timeout\' must not be a negative number')
100 if not block:
101 timeout = 0.0
102 result = None
103 success = False
104 start_time = self.get_time()
105 lock.acquire()
106 try:
107 while not success:
108 try:
109 result = action()
110 success = True
111 except Exception, e:
113 if not isinstance(e, exc):
114 raise e
115 if timeout is not None:
116 elapsed_time = self.get_time() - start_time
117 timeout -= elapsed_time
118 if timeout <= 0.0:
119 raise e
120 wait_cond.wait(timeout)
121 finally:
122 if success:
123 done_cond.notify()
124 lock.release()
125 return result
127 def put(self, item, block=True, timeout=None):
128 """Put an item into the requeue.
130 Args:
131 item: An item to add to the requeue.
132 block: Whether to block if the requeue is full.
133 timeout: Maximum on how long to wait until the queue is non-full.
135 Raises:
136 Queue.Full if the queue is full and the timeout expires.
138 def PutAction():
139 self.queue.put(item, block=False)
140 self._DoWithTimeout(PutAction,
141 Queue.Full,
142 self.get_cond,
143 self.put_cond,
144 self.lock,
145 timeout=timeout,
146 block=block)
148 def reput(self, item, block=True, timeout=None):
149 """Re-put an item back into the requeue.
151 Re-putting an item does not increase the number of outstanding
152 tasks, so the reput item should be uniquely associated with an
153 item that was previously removed from the requeue and for which
154 TaskDone has not been called.
156 Args:
157 item: An item to add to the requeue.
158 block: Whether to block if the requeue is full.
159 timeout: Maximum on how long to wait until the queue is non-full.
161 Raises:
162 Queue.Full is the queue is full and the timeout expires.
164 def ReputAction():
165 self.requeue.put(item, block=False)
166 self._DoWithTimeout(ReputAction,
167 Queue.Full,
168 self.get_cond,
169 self.put_cond,
170 self.lock,
171 timeout=timeout,
172 block=block)
174 def get(self, block=True, timeout=None):
175 """Get an item from the requeue.
177 Args:
178 block: Whether to block if the requeue is empty.
179 timeout: Maximum on how long to wait until the requeue is non-empty.
181 Returns:
182 An item from the requeue.
184 Raises:
185 Queue.Empty if the queue is empty and the timeout expires.
187 def GetAction():
189 try:
190 result = self.requeue.get(block=False)
193 self.requeue.task_done()
194 except Queue.Empty:
196 result = self.queue.get(block=False)
198 return result
199 return self._DoWithTimeout(GetAction,
200 Queue.Empty,
201 self.put_cond,
202 self.get_cond,
203 self.lock,
204 timeout=timeout,
205 block=block)
207 def join(self):
208 """Blocks until all of the items in the requeue have been processed."""
209 self.queue.join()
211 def task_done(self):
212 """Indicate that a previously enqueued item has been fully processed."""
213 self.queue.task_done()
215 def empty(self):
216 """Returns true if the requeue is empty."""
217 return self.queue.empty() and self.requeue.empty()
219 def get_nowait(self):
220 """Try to get an item from the queue without blocking."""
221 return self.get(block=False)
223 def qsize(self):
224 return self.queue.qsize() + self.requeue.qsize()