App Engine Python SDK version 1.8.8
[gae.git] / python / google / appengine / tools / adaptive_thread_pool.py
blob: fac588520fd4934e07a6249857c59f1b43afc4fe
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
"""Provides thread-pool-like functionality for workers accessing App Engine.

The pool adapts to slow or timing-out requests by reducing the number of
active workers, or by increasing that number when request latency drops.
"""
33 import logging
34 import Queue
35 import sys
36 import threading
37 import time
38 import traceback
40 from google.appengine.tools.requeue import ReQueue
# Logger shared by every class in this module.
logger = logging.getLogger('google.appengine.tools.adaptive_thread_pool')

# Sentinel queued by AdaptiveThreadPool.Shutdown(); a worker that dequeues it
# stops processing instead of treating it as a WorkItem.
_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'

# Seconds of backoff imposed the first time the single remaining active
# thread reports a failure to the ThreadGate.
INITIAL_BACKOFF = 1.0

# Factor by which that backoff grows on each further failure while only one
# thread is active.
BACKOFF_FACTOR = 2.0
class Error(Exception):
  """Root of the exception hierarchy for this module."""


class WorkItemError(Error):
  """Raised when a WorkItem fails fatally while being processed."""


class RetryException(Error):
  """Non-fatal signal, raised while performing a work item, to retry it."""
def InterruptibleSleep(sleep_time):
  """Sleeps for sleep_time seconds, waking early if the thread should exit.

  The sleep is split into naps of at most 0.25 seconds. After each nap the
  current thread's exit_flag attribute is checked, and the function returns
  early if it is set. Threads that have no exit_flag attribute (for example
  the main thread, or any thread that is not a WorkerThread) are treated as
  never being asked to exit, instead of raising AttributeError.

  Args:
    sleep_time: Time to sleep, in seconds.
  """
  slept = 0.0
  # Tolerance so accumulated float error from the 0.25s naps does not cause
  # one extra, near-zero nap at the end.
  epsilon = .0001
  thread = threading.current_thread()
  while slept < sleep_time - epsilon:
    this_sleep_time = min(sleep_time - slept, 0.25)
    time.sleep(this_sleep_time)
    slept += this_sleep_time
    if getattr(thread, 'exit_flag', False):
      return
class WorkerThread(threading.Thread):
  """A WorkerThread to execute WorkItems.

  Each WorkerThread repeatedly passes through its ThreadGate, takes one item
  from the thread pool's requeue, performs it, and reports an instruction
  back to the gate, until it is told to exit.

  Attributes:
    exit_flag: A boolean indicating whether this thread should stop
      its work and exit.
  """

  def __init__(self, thread_pool, thread_gate, name=None):
    """Initialize a WorkerThread instance.

    Args:
      thread_pool: An AdaptiveThreadPool instance; its requeue attribute is
        used as the work queue.
      thread_gate: A ThreadGate instance.
      name: A name for this WorkerThread. When given it is also used as the
        underlying thread name, so it appears in this thread's log lines.
    """
    # Forward the name so self.name (used in every log line) reflects it;
    # None keeps threading's default name.
    threading.Thread.__init__(self, name=name)
    self.daemon = True
    self.exit_flag = False
    self.__error = None
    self.__traceback = None
    self.__thread_pool = thread_pool
    self.__work_queue = thread_pool.requeue
    self.__thread_gate = thread_gate
    if not name:
      self.__name = 'Anonymous_' + self.__class__.__name__
    else:
      self.__name = name

  def run(self):
    """Perform the work of the thread, recording any error that escapes."""
    logger.debug('[%s] %s: started', self.name, self.__class__.__name__)
    try:
      self.WorkOnItems()
    except:
      # Deliberately broad: this is the top of the thread, and anything that
      # escapes is recorded so CheckError() can report it later.
      self.SetError()
    logger.debug('[%s] %s: exiting', self.name, self.__class__.__name__)

  def SetError(self):
    """Sets the error and traceback information for this thread.

    This must be called from an exception handler. Only the first error is
    recorded (and logged); later calls are ignored.
    """
    if not self.__error:
      exc_info = sys.exc_info()
      self.__error = exc_info[1]
      self.__traceback = exc_info[2]
      logger.exception('[%s] %s:', self.name, self.__class__.__name__)

  def WorkOnItems(self):
    """Perform the work of a WorkerThread.

    Loops until exit_flag is set or the _THREAD_SHOULD_EXIT sentinel is
    dequeued. Every pass through the ThreadGate is matched by exactly one
    FinishWork() call, even when an exception escapes.

    Raises:
      Error: If a work item asking to be retried could not be requeued.
      WorkItemError, or the item's own error: If a work item fails fatally.
      Any exception raised while fetching or performing a work item.
    """
    while not self.exit_flag:
      item = None
      self.__thread_gate.StartWork()
      try:
        # Pessimistic defaults: if anything below raises unexpectedly, the
        # item counts as failed and the gate is asked to back off.
        status, instruction = WorkItem.FAILURE, ThreadGate.DECREASE
        try:
          if self.exit_flag:
            # Told to exit while waiting at the gate; nothing was attempted.
            instruction = ThreadGate.HOLD
            break
          try:
            item = self.__work_queue.get(block=True, timeout=1.0)
          except Queue.Empty:
            # Idle timeout; loop around so exit_flag is re-checked.
            instruction = ThreadGate.HOLD
            continue
          if item == _THREAD_SHOULD_EXIT or self.exit_flag:
            # Shutdown sentinel (or shutdown began during get()):
            # acknowledge the item without performing it and stop.
            status, instruction = WorkItem.SUCCESS, ThreadGate.HOLD
            break
          logger.debug('[%s] Got work item %s', self.name, item)
          status, instruction = item.PerformWork(self.__thread_pool)
        except RetryException:
          status, instruction = WorkItem.RETRY, ThreadGate.HOLD
        except:
          self.SetError()
          raise
      finally:
        try:
          if item is not None:
            self.__SettleItem(item, status)
        finally:
          self.__thread_gate.FinishWork(instruction=instruction)

  def __SettleItem(self, item, status):
    """Acknowledges, requeues, or fails a dequeued item according to status.

    Args:
      item: The object taken from the work queue.
      status: WorkItem.SUCCESS, WorkItem.RETRY or anything else (a failure).

    Raises:
      Error: If a RETRY item cannot be put back on the queue.
      The recorded error: For any status other than SUCCESS or RETRY.
    """
    if status == WorkItem.SUCCESS:
      self.__work_queue.task_done()
    elif status == WorkItem.RETRY:
      try:
        self.__work_queue.reput(item, block=False)
      except Queue.Full:
        logger.error('[%s] Failed to reput work item.', self.name)
        raise Error('Failed to reput work item')
    else:
      if not self.__error:
        # Work items are not guaranteed to define error/traceback, so fall
        # back to None rather than raising AttributeError here.
        item_error = getattr(item, 'error', None)
        if item_error:
          self.__error = item_error
          self.__traceback = getattr(item, 'traceback', None)
        else:
          self.__error = WorkItemError(
              'Fatal error while processing %s' % item)
      raise self.__error

  def CheckError(self):
    """If an error is present, then log it (with traceback at DEBUG level)."""
    if self.__error:
      logger.error('Error in %s: %s', self.name, self.__error)
      if self.__traceback:
        logger.debug('%s', ''.join(traceback.format_exception(
            self.__error.__class__,
            self.__error,
            self.__traceback)))

  def __str__(self):
    return self.__name
class AdaptiveThreadPool(object):
  """A thread pool which processes WorkItems from a queue.

  Attributes:
    requeue: The requeue instance which holds work items for this
      thread pool.
  """

  def __init__(self,
               num_threads,
               queue_size=None,
               base_thread_name=None,
               worker_thread_factory=WorkerThread,
               queue_factory=Queue.Queue):
    """Initialize an AdaptiveThreadPool and start its worker threads.

    An adaptive thread pool executes WorkItems using a number of
    WorkerThreads. WorkItems represent items of work that may
    succeed, soft fail, or hard fail. In addition, a completed work
    item can signal this AdaptiveThreadPool to enable more or fewer
    threads. Initially one thread is active. Soft failures are
    requeued to be retried. A hard failure stops the worker thread that
    hit it; the error is recorded and can be logged with CheckErrors().
    See the WorkItem class for more details.

    Args:
      num_threads: The number of threads to use.
      queue_size: The size of the work item queue to use; defaults to
        num_threads.
      base_thread_name: A string from which worker thread names are derived.
      worker_thread_factory: A factory which produces WorkerThreads.
      queue_factory: Used for dependency injection.
    """
    if queue_size is None:
      queue_size = num_threads
    self.requeue = ReQueue(queue_size, queue_factory=queue_factory)
    self.__thread_gate = ThreadGate(num_threads)
    self.__num_threads = num_threads
    self.__threads = []
    for i in xrange(num_threads):
      thread = worker_thread_factory(self, self.__thread_gate)
      base = base_thread_name or thread.__class__.__name__
      thread.name = '%s-%d' % (base, i)
      self.__threads.append(thread)
      thread.start()

  def num_threads(self):
    """Return the number of threads in this thread pool."""
    return self.__num_threads

  def Threads(self):
    """Yields the registered threads."""
    for thread in self.__threads:
      yield thread

  def SubmitItem(self, item, block=True, timeout=0.0):
    """Submit a WorkItem to the AdaptiveThreadPool.

    Args:
      item: A WorkItem instance.
      block: Whether to block on submitting if the submit queue is full.
      timeout: Time to wait for room in the queue if block is True, 0.0 to
        block indefinitely.

    Raises:
      Queue.Full if the submit queue is full.
    """
    self.requeue.put(item, block=block, timeout=timeout)

  def QueuedItemCount(self):
    """Returns the number of items currently in the queue."""
    return self.requeue.qsize()

  def Shutdown(self):
    """Shutdown the thread pool.

    Queued items are discarded, every worker thread is flagged to exit, and
    threads held back by the ThreadGate are released so they notice the flag.
    Tasks may remain unexecuted in the submit queue.
    """
    # Discard whatever is still queued, acknowledging each with task_done().
    while not self.requeue.empty():
      try:
        self.requeue.get_nowait()
        self.requeue.task_done()
      except Queue.Empty:
        # Another thread took the last item between empty() and get_nowait().
        pass
    # Flag every thread before queuing any sentinel. Workers re-check
    # exit_flag after passing the gate and after each 1-second get()
    # timeout, so the sentinels below only wake idle workers sooner.
    for thread in self.__threads:
      thread.exit_flag = True
    # Never block here: a flagged worker that passes the gate exits without
    # consuming a sentinel, so with queue_size < num_threads a blocking put()
    # could wait forever on a queue nobody drains.
    for unused_thread in self.__threads:
      try:
        self.requeue.put(_THREAD_SHOULD_EXIT, block=False)
      except Queue.Full:
        break
    self.__thread_gate.EnableAllThreads()

  def Wait(self):
    """Wait until all work items have been completed."""
    self.requeue.join()

  def JoinThreads(self):
    """Wait for all threads to exit."""
    for thread in self.__threads:
      logger.debug('Waiting for %s to exit', thread)
      thread.join()

  def CheckErrors(self):
    """Output logs for any errors that occurred in the worker threads."""
    for thread in self.__threads:
      thread.CheckError()
class ThreadGate(object):
  """Manage the number of active worker threads.

  The ThreadGate limits the number of threads that are simultaneously
  active in order to implement adaptive rate control.

  Initially the ThreadGate allows only one thread to be active. For
  each successful work item, another thread is activated and for each
  failed item, the number of active threads is reduced by one. When only
  one thread is active, failures will cause exponential backoff.

  For example, a ThreadGate instance, thread_gate, can be used in a number
  of threads as follows:

    # Block until this thread is enabled for work.
    thread_gate.StartWork()
    succeeded = badly_failed = False
    try:
      status = DoSomeWorkInvolvingLimitedSharedResources()
      succeeded = IsStatusGood(status)
      badly_failed = IsStatusVeryBad(status)
    finally:
      if succeeded:
        # Succeeded, add more simultaneously enabled threads to the task.
        thread_gate.FinishWork(instruction=ThreadGate.INCREASE)
      elif badly_failed:
        # Failed, or succeeded but with high resource load, reduce number of
        # workers.
        thread_gate.FinishWork(instruction=ThreadGate.DECREASE)
      else:
        # We succeeded, but don't want to add more workers to the task.
        thread_gate.FinishWork(instruction=ThreadGate.HOLD)

  The thread_gate will enable and disable/backoff threads in response to
  resource load conditions.

  StartWork can block indefinitely. FinishWork, while not
  lock-free, should never block absent a demonic scheduler.
  """

  INCREASE = 'increase'
  HOLD = 'hold'
  DECREASE = 'decrease'

  def __init__(self,
               num_threads,
               sleep=InterruptibleSleep):
    """Constructor for ThreadGate instances.

    Args:
      num_threads: The total number of threads using this gate.
      sleep: Callable taking a number of seconds, used to wait out the
        backoff in StartWork(); injectable for testing.
    """
    # Start with a single active thread; INCREASE instructions open the gate.
    self.__enabled_count = 1
    # Guards __enabled_count (and the backoff update on the DECREASE path).
    self.__lock = threading.Lock()
    # One permit per enabled thread that is not currently inside the gate.
    self.__thread_semaphore = threading.Semaphore(self.__enabled_count)
    self.__num_threads = num_threads
    self.__backoff_time = 0
    self.__sleep = sleep

  def num_threads(self):
    """Return the total number of threads using this gate."""
    return self.__num_threads

  def EnableThread(self):
    """Enable one more worker thread by adding a permit."""
    self.__lock.acquire()
    try:
      self.__enabled_count += 1
    finally:
      self.__lock.release()
    self.__thread_semaphore.release()

  def EnableAllThreads(self):
    """Enable all worker threads."""
    for unused_idx in xrange(self.__num_threads - self.__enabled_count):
      self.EnableThread()

  def StartWork(self):
    """Starts a critical section in which the number of workers is limited.

    Starts a critical section which allows self.__enabled_count
    simultaneously operating threads. The critical section is ended by
    calling self.FinishWork(). While a backoff is in effect, the caller
    sleeps for the backoff time before returning, unless its exit_flag is
    set.
    """
    self.__thread_semaphore.acquire()
    if self.__backoff_time > 0.0:
      current = threading.current_thread()
      # Threads without an exit_flag attribute (i.e. not WorkerThreads) are
      # treated as never exiting instead of raising AttributeError.
      if not getattr(current, 'exit_flag', False):
        logger.info('[%s] Backing off due to errors: %.1f seconds',
                    current.name,
                    self.__backoff_time)
        self.__sleep(self.__backoff_time)

  def FinishWork(self, instruction=None):
    """Ends a critical section started with self.StartWork().

    Args:
      instruction: ThreadGate.INCREASE, HOLD or DECREASE. A false value means
        HOLD. Unrecognized values are logged and also treated as HOLD, so the
        caller's permit is always accounted for.
    """
    if instruction == ThreadGate.INCREASE:
      if self.__backoff_time > 0.0:
        logger.info('Resetting backoff to 0.0')
        self.__backoff_time = 0.0
      new_count = None
      self.__lock.acquire()
      try:
        if self.__num_threads > self.__enabled_count:
          self.__enabled_count += 1
          new_count = self.__enabled_count
      finally:
        self.__lock.release()
      if new_count is not None:
        logger.debug('Increasing active thread count to %d', new_count)
        # Extra permit for the newly enabled thread.
        self.__thread_semaphore.release()
      # Return this thread's own permit.
      self.__thread_semaphore.release()
    elif instruction == ThreadGate.DECREASE:
      new_count = None
      self.__lock.acquire()
      try:
        if self.__enabled_count > 1:
          self.__enabled_count -= 1
          new_count = self.__enabled_count
        elif self.__backoff_time == 0.0:
          self.__backoff_time = INITIAL_BACKOFF
        else:
          self.__backoff_time *= BACKOFF_FACTOR
      finally:
        self.__lock.release()
      if new_count is not None:
        # Keeping this thread's permit (not releasing it) is what reduces
        # the number of threads allowed through the gate.
        logger.debug('Decreasing the number of active threads to %d',
                     new_count)
      else:
        self.__thread_semaphore.release()
    else:
      if instruction and instruction != ThreadGate.HOLD:
        # Without this branch an unknown instruction would leak a permit
        # and permanently shrink the pool.
        logger.warning('Unknown ThreadGate instruction %r; treating as %s',
                       instruction, ThreadGate.HOLD)
      self.__thread_semaphore.release()
class WorkItem(object):
  """Holds a unit of work.

  Subclasses implement PerformWork(). When a worker thread gets a FAILURE
  status for an item, it reads the item's error and traceback attributes to
  decide what to report. Both default to None at class level, so
  subclasses that never set them still behave sensibly.
  """

  # Status values returned as the first element of PerformWork()'s tuple.
  SUCCESS = 'success'
  RETRY = 'retry'
  FAILURE = 'failure'

  # Failure details read by worker threads; subclasses assign these to
  # report a specific error. Class-level so no __init__ call is required.
  error = None
  traceback = None

  def __init__(self, name):
    """Initialize a WorkItem.

    Args:
      name: Descriptive name returned by str(item) (and so shown in logs).
    """
    self.__name = name

  def PerformWork(self, thread_pool):
    """Perform the work of this work item and report the results.

    Args:
      thread_pool: The AdaptiveThreadPool instance associated with this
        thread.

    Returns:
      A tuple (status, instruction) of the work status and an instruction
      for the ThreadGate.

    Raises:
      RetryException: May be raised to request that the item be retried.
      NotImplementedError: Always, in this base class.
    """
    raise NotImplementedError

  def __str__(self):
    return self.__name