3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Provides thread-pool-like functionality for workers accessing App Engine.
23 The pool adapts to slow or timing out requests by reducing the number of
24 active workers, or increasing the number when requests latency reduces.
40 from google
.appengine
.tools
.requeue
import ReQueue
42 logger
= logging
.getLogger('google.appengine.tools.adaptive_thread_pool')
45 _THREAD_SHOULD_EXIT
= '_THREAD_SHOULD_EXIT'
56 class Error(Exception):
57 """Base-class for exceptions in this module."""
60 class WorkItemError(Error
):
61 """Error while processing a WorkItem."""
64 class RetryException(Error
):
65 """A non-fatal exception that indicates that a work item should be retried."""
68 def InterruptibleSleep(sleep_time
):
69 """Puts thread to sleep, checking this threads exit_flag four times a second.
72 sleep_time: Time to sleep.
76 thread
= threading
.currentThread()
77 while slept
< sleep_time
- epsilon
:
78 remaining
= sleep_time
- slept
79 this_sleep_time
= min(remaining
, 0.25)
80 time
.sleep(this_sleep_time
)
81 slept
+= this_sleep_time
86 class WorkerThread(threading
.Thread
):
87 """A WorkerThread to execute WorkItems.
90 exit_flag: A boolean indicating whether this thread should stop
94 def __init__(self
, thread_pool
, thread_gate
, name
=None):
95 """Initialize a WorkerThread instance.
98 thread_pool: An AdaptiveThreadPool instance.
99 thread_gate: A ThreadGate instance.
100 name: A name for this WorkerThread.
102 threading
.Thread
.__init
__(self
)
109 self
.exit_flag
= False
111 self
.__traceback
= None
112 self
.__thread
_pool
= thread_pool
113 self
.__work
_queue
= thread_pool
.requeue
114 self
.__thread
_gate
= thread_gate
116 self
.__name
= 'Anonymous_' + self
.__class
__.__name
__
121 """Perform the work of the thread."""
122 logger
.debug('[%s] %s: started', self
.getName(), self
.__class
__.__name
__)
130 logger
.debug('[%s] %s: exiting', self
.getName(), self
.__class
__.__name
__)
133 """Sets the error and traceback information for this thread.
135 This must be called from an exception handler.
138 exc_info
= sys
.exc_info()
139 self
.__error
= exc_info
[1]
140 self
.__traceback
= exc_info
[2]
141 logger
.exception('[%s] %s:', self
.getName(), self
.__class
__.__name
__)
143 def WorkOnItems(self
):
144 """Perform the work of a WorkerThread."""
145 while not self
.exit_flag
:
147 self
.__thread
_gate
.StartWork()
151 status
, instruction
= WorkItem
.FAILURE
, ThreadGate
.DECREASE
157 instruction
= ThreadGate
.HOLD
162 item
= self
.__work
_queue
.get(block
=True, timeout
=1.0)
165 instruction
= ThreadGate
.HOLD
167 if item
== _THREAD_SHOULD_EXIT
or self
.exit_flag
:
170 status
, instruction
= WorkItem
.SUCCESS
, ThreadGate
.HOLD
173 logger
.debug('[%s] Got work item %s', self
.getName(), item
)
175 status
, instruction
= item
.PerformWork(self
.__thread
_pool
)
176 except RetryException
:
177 status
, instruction
= WorkItem
.RETRY
, ThreadGate
.HOLD
187 if status
== WorkItem
.SUCCESS
:
188 self
.__work
_queue
.task_done()
189 elif status
== WorkItem
.RETRY
:
193 self
.__work
_queue
.reput(item
, block
=False)
195 logger
.error('[%s] Failed to reput work item.', self
.getName())
196 raise Error('Failed to reput work item')
200 self
.__error
= item
.error
201 self
.__traceback
= item
.traceback
204 self
.__error
= WorkItemError(
205 'Fatal error while processing %s' % item
)
210 self
.__thread
_gate
.FinishWork(instruction
=instruction
)
212 def CheckError(self
):
213 """If an error is present, then log it."""
215 logger
.error('Error in %s: %s', self
.getName(), self
.__error
)
217 logger
.debug('%s', ''.join(traceback
.format_exception(
218 self
.__error
.__class
__,
226 class AdaptiveThreadPool(object):
227 """A thread pool which processes WorkItems from a queue.
230 requeue: The requeue instance which holds work items for this
237 base_thread_name
=None,
238 worker_thread_factory
=WorkerThread
,
239 queue_factory
=Queue
.Queue
):
240 """Initialize an AdaptiveThreadPool.
242 An adaptive thread pool executes WorkItems using a number of
243 WorkerThreads. WorkItems represent items of work that may
244 succeed, soft fail, or hard fail. In addition, a completed work
245 item can signal this AdaptiveThreadPool to enable more or fewer
246 threads. Initially one thread is active. Soft failures are
247 reqeueud to be retried. Hard failures cause this
248 AdaptiveThreadPool to shut down entirely. See the WorkItem class
252 num_threads: The number of threads to use.
253 queue_size: The size of the work item queue to use.
254 base_thread_name: A string from which worker thread names are derived.
255 worker_thread_factory: A factory which procudes WorkerThreads.
256 queue_factory: Used for dependency injection.
258 if queue_size
is None:
259 queue_size
= num_threads
260 self
.requeue
= ReQueue(queue_size
, queue_factory
=queue_factory
)
261 self
.__thread
_gate
= ThreadGate(num_threads
)
262 self
.__num
_threads
= num_threads
264 for i
in xrange(num_threads
):
265 thread
= worker_thread_factory(self
, self
.__thread
_gate
)
267 base
= base_thread_name
269 base
= thread
.__class
__.__name
__
270 thread
.name
= '%s-%d' % (base
, i
)
271 self
.__threads
.append(thread
)
274 def num_threads(self
):
275 """Return the number of threads in this thread pool."""
276 return self
.__num
_threads
279 """Yields the registered threads."""
280 for thread
in self
.__threads
:
283 def SubmitItem(self
, item
, block
=True, timeout
=0.0):
284 """Submit a WorkItem to the AdaptiveThreadPool.
287 item: A WorkItem instance.
288 block: Whether to block on submitting if the submit queue is full.
289 timeout: Time wait for room in the queue if block is True, 0.0 to
293 Queue.Full if the submit queue is full.
295 self
.requeue
.put(item
, block
=block
, timeout
=timeout
)
297 def QueuedItemCount(self
):
298 """Returns the number of items currently in the queue."""
299 return self
.requeue
.qsize()
302 """Shutdown the thread pool.
304 Tasks may remain unexecuted in the submit queue.
307 while not self
.requeue
.empty():
309 unused_item
= self
.requeue
.get_nowait()
310 self
.requeue
.task_done()
314 for thread
in self
.__threads
:
315 thread
.exit_flag
= True
316 self
.requeue
.put(_THREAD_SHOULD_EXIT
)
317 self
.__thread
_gate
.EnableAllThreads()
320 """Wait until all work items have been completed."""
323 def JoinThreads(self
):
324 """Wait for all threads to exit."""
325 for thread
in self
.__threads
:
326 logger
.debug('Waiting for %s to exit' % str(thread
))
329 def CheckErrors(self
):
330 """Output logs for any errors that occurred in the worker threads."""
331 for thread
in self
.__threads
:
335 class ThreadGate(object):
336 """Manage the number of active worker threads.
338 The ThreadGate limits the number of threads that are simultaneously
339 active in order to implement adaptive rate control.
341 Initially the ThreadGate allows only one thread to be active. For
342 each successful work item, another thread is activated and for each
343 failed item, the number of active threads is reduced by one. When only
344 one thread is active, failures will cause exponential backoff.
346 For example, a ThreadGate instance, thread_gate can be used in a number
349 # Block until this thread is enabled for work.
350 thread_gate.StartWork()
352 status = DoSomeWorkInvolvingLimitedSharedResources()
353 succeeded = IsStatusGood(status)
354 badly_failed = IsStatusVeryBad(status)
357 # Succeeded, add more simultaneously enabled threads to the task.
358 thread_gate.FinishWork(instruction=ThreadGate.INCREASE)
360 # Failed, or succeeded but with high resource load, reduce number of
362 thread_gate.FinishWork(instruction=ThreadGate.DECREASE)
364 # We succeeded, but don't want to add more workers to the task.
365 thread_gate.FinishWork(instruction=ThreadGate.HOLD)
367 the thread_gate will enable and disable/backoff threads in response to
368 resource load conditions.
370 StartWork can block indefinitely. FinishWork, while not
371 lock-free, should never block absent a demonic scheduler.
375 INCREASE
= 'increase'
377 DECREASE
= 'decrease'
381 sleep
=InterruptibleSleep
):
382 """Constructor for ThreadGate instances.
385 num_threads: The total number of threads using this gate.
386 sleep: Used for dependency injection.
388 self
.__enabled
_count
= 1
390 self
.__lock
= threading
.Lock()
392 self
.__thread
_semaphore
= threading
.Semaphore(self
.__enabled
_count
)
393 self
.__num
_threads
= num_threads
394 self
.__backoff
_time
= 0
397 def num_threads(self
):
398 return self
.__num
_threads
400 def EnableThread(self
):
401 """Enable one more worker thread."""
402 self
.__lock
.acquire()
404 self
.__enabled
_count
+= 1
406 self
.__lock
.release()
407 self
.__thread
_semaphore
.release()
409 def EnableAllThreads(self
):
410 """Enable all worker threads."""
411 for unused_idx
in xrange(self
.__num
_threads
- self
.__enabled
_count
):
415 """Starts a critical section in which the number of workers is limited.
417 Starts a critical section which allows self.__enabled_count
418 simultaneously operating threads. The critical section is ended by
419 calling self.FinishWork().
422 self
.__thread
_semaphore
.acquire()
424 if self
.__backoff
_time
> 0.0:
425 if not threading
.currentThread().exit_flag
:
426 logger
.info('[%s] Backing off due to errors: %.1f seconds',
427 threading
.currentThread().getName(),
429 self
.__sleep
(self
.__backoff
_time
)
431 def FinishWork(self
, instruction
=None):
432 """Ends a critical section started with self.StartWork()."""
433 if not instruction
or instruction
== ThreadGate
.HOLD
:
435 self
.__thread
_semaphore
.release()
437 elif instruction
== ThreadGate
.INCREASE
:
439 if self
.__backoff
_time
> 0.0:
440 logger
.info('Resetting backoff to 0.0')
441 self
.__backoff
_time
= 0.0
443 self
.__lock
.acquire()
446 if self
.__num
_threads
> self
.__enabled
_count
:
448 self
.__enabled
_count
+= 1
450 self
.__lock
.release()
453 logger
.debug('Increasing active thread count to %d',
454 self
.__enabled
_count
)
455 self
.__thread
_semaphore
.release()
457 self
.__thread
_semaphore
.release()
459 elif instruction
== ThreadGate
.DECREASE
:
461 self
.__lock
.acquire()
464 if self
.__enabled
_count
> 1:
466 self
.__enabled
_count
-= 1
468 if self
.__backoff
_time
== 0.0:
469 self
.__backoff
_time
= INITIAL_BACKOFF
471 self
.__backoff
_time
*= BACKOFF_FACTOR
473 self
.__lock
.release()
476 logger
.debug('Decreasing the number of active threads to %d',
477 self
.__enabled
_count
)
480 self
.__thread
_semaphore
.release()
483 class WorkItem(object):
484 """Holds a unit of work."""
491 def __init__(self
, name
):
494 def PerformWork(self
, thread_pool
):
495 """Perform the work of this work item and report the results.
498 thread_pool: The AdaptiveThreadPool instance associated with this
502 A tuple (status, instruction) of the work status and an instruction
505 raise NotImplementedError