2 # Copyright 2008 Google Inc.
3 # Released under the GPLv2
5 import threading
, Queue
8 """ A generic threading class for use in the CLI
ThreadPool class takes the function to be executed as an argument and,
optionally, the number of threads. It then creates multiple threads for
def __init__(self, function, numthreads=40):
    """ Set up the pool's queues and launch the initial worker threads.

    function: callable stored as self.function for the worker threads.
    numthreads: number of worker threads to create up front (must be > 0).
    """
    assert numthreads > 0
    # Holds the thread objects created by _start_threads().
    self.threads = Queue.Queue(0)
    self.function = function
    # _start_threads() increments this counter with "+=", so it has to
    # exist before the first call; otherwise that call raises
    # AttributeError.
    self.numthreads = 0
    # Holds the pending work items added through queue_work().
    self.queue = Queue.Queue(0)
    self._start_threads(numthreads)
23 """ Checks to see if any threads are still working and
24 blocks until worker threads all complete. """
25 for x
in xrange(self
.numthreads
):
27 # As only spawned threads are allowed to add new ones,
28 # we can safely wait for the thread queue to be empty
29 # (if we're at the last thread and it creates a new one,
30 # it will get queued before it finishes).
34 thread
= self
.threads
.get(block
=True, timeout
=1)
39 assert(dead
== self
.numthreads
)
def queue_work(self, data):
    """ Takes an iterable of items and appends them to the work queue.

    data: the work items; each one is put on self.queue in iteration order.
    """
    # A plain loop is used because put() is called only for its side
    # effect; a list comprehension would build a throwaway list of Nones.
    for item in data:
        self.queue.put(item)
def add_one_thread_post_wait(self):
    """ Grow the pool by exactly one worker thread.

    This must only be called from a spawned worker thread, never from
    the main thread; the comment in wait() explains why.
    """
    extra_threads = 1
    self._start_threads(extra_threads)
def _start_threads(self, nthreads):
    """ Create, register and start nthreads daemon worker threads.

    nthreads: how many threads to add. The value is added to
    self.numthreads, and each new thread runs self._new_worker.
    """
    self.numthreads += nthreads
    for _ in range(nthreads):
        worker = threading.Thread(target=self._new_worker)
        worker.setDaemon(True)
        # Register the thread before starting it so it is already tracked
        # in self.threads by the time its worker code runs.
        self.threads.put(worker)
        # Without this call the thread object exists but never executes.
        worker.start()
66 def _new_worker(self
):
67 """ Spawned worker threads. These threads loop until queue is empty."""
70 data
= self
.queue
.get()
75 except Exception, full_error
:
76 # Put a catch all here.
77 print ('Unexpected failure in the thread calling %s: %s' %
78 (self
.function
.__name
__, full_error
))