frontend.shared.rest_client: Fix reference to an undefined variable
[autotest-zwu.git] / cli / threads.py
blobeabefcaeb8f326e73dd947a1f6195348c6429520
2 # Copyright 2008 Google Inc.
3 # Released under the GPLv2
5 import threading, Queue
class ThreadPool:
    """A generic pool of worker threads for use in the CLI.

    The pool is given the function to execute and, optionally, the number
    of worker threads to create.  Every worker repeatedly takes one item
    from the shared work queue and calls ``function(item)`` until it reads
    the shutdown marker, which is the string ``'die'``.

    NOTE: because the shutdown marker is the plain string ``'die'``, a work
    item equal to ``'die'`` stops a worker instead of being processed.

    Attributes:
        function: callable invoked with each work item.
        numthreads: number of worker threads started so far.
        queue: queue of pending work items (and shutdown markers).
        threads: queue of the started Thread objects that wait() joins.
    """

    def __init__(self, function, numthreads=40):
        """Create the pool and immediately start `numthreads` workers.

        Args:
            function: callable taking a single work item.
            numthreads: number of workers to start; must be > 0.

        Raises:
            AssertionError: if `numthreads` is not positive.
        """
        assert numthreads > 0
        self.threads = Queue.Queue(0)
        self.function = function
        # _start_threads() increments this as workers are created.
        self.numthreads = 0
        self.queue = Queue.Queue(0)
        self._start_threads(numthreads)

    def wait(self):
        """Tell every worker to exit and block until all of them have.

        One shutdown marker per currently known worker is appended to the
        work queue, so items queued earlier are processed first.

        Raises:
            AssertionError: if the number of joined threads differs from
                `self.numthreads`.
        """
        for _ in range(self.numthreads):
            self.queue.put('die')
        # As only spawned threads are allowed to add new ones, and
        # _start_threads() enqueues a thread before the spawning worker can
        # finish, every extra thread is already on self.threads by the time
        # its creator has been joined.  A non-blocking get is therefore
        # enough: an empty queue really means that no thread is left.
        dead = 0
        while True:
            try:
                thread = self.threads.get(block=False)
            except Queue.Empty:
                break
            # Threads are started before they are enqueued, so join() is
            # always legal here; it returns at once for a finished thread.
            thread.join()
            dead += 1
        assert dead == self.numthreads

    def queue_work(self, data):
        """Append every item of the iterable `data` to the work queue."""
        for item in data:
            self.queue.put(item)

    def add_one_thread_post_wait(self):
        """Start one extra worker and queue the shutdown marker for it.

        Only a spawned thread (not the main one) should call this; wait()
        relies on that to know when the thread queue is complete.
        """
        self._start_threads(1)
        self.queue.put('die')

    def _start_threads(self, nthreads):
        """Start `nthreads` daemon worker threads and record them."""
        self.numthreads += nthreads
        for _ in range(nthreads):
            thread = threading.Thread(target=self._new_worker)
            thread.daemon = True
            # Start before publishing the thread on self.threads, so that
            # wait() can never dequeue (and try to join, or wrongly count
            # as finished) a thread that has not been started yet.
            thread.start()
            self.threads.put(thread)

    def _new_worker(self):
        """Worker loop: process work items until the shutdown marker."""
        while True:
            # Blocking call
            data = self.queue.get()
            if data == 'die':
                return
            try:
                self.function(data)
            except Exception as full_error:
                # Catch-all so that one failing item does not kill the
                # worker; the failure is reported and the loop continues.
                print('Unexpected failure in the thread calling %s: %s' %
                      (self.function.__name__, full_error))