From 421ee1e2afb289b187d9747cec4343f592baeb85 Mon Sep 17 00:00:00 2001 From: PawelPaprota Date: Mon, 3 Dec 2007 10:15:03 +0100 Subject: [PATCH] JobManager code cleanup. --- straw/JobManager.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/straw/JobManager.py b/straw/JobManager.py index 09b50d7..4cecaca 100644 --- a/straw/JobManager.py +++ b/straw/JobManager.py @@ -27,11 +27,12 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -from gobject import GObject, SIGNAL_RUN_FIRST from Queue import Queue, Empty +from gobject import GObject, SIGNAL_RUN_FIRST from threading import Thread, Lock -import time +import error import gobject +import time class JobManager(object): """ @@ -46,7 +47,7 @@ class JobManager(object): def register_handler(self, clazz): self.handlers[clazz.job_id] = clazz - + def start_job(self, job, wait = False): t = HandlerThread(self.handlers[job.job_id](job)) t.start() @@ -95,14 +96,10 @@ class JobHandler(GObject): def _notify(self, event, data): self.emit(event, data) - #print event - #self.emit - #for observer in self.job.observers: - # observer def _post_result(self, task, result): self.result_queue.push(result) - + def _prepare_result(self): return self.result_queue @@ -125,7 +122,7 @@ class ThreadPoolJobHandler(JobHandler): def __init__(self, job): JobHandler.__init__(self, job) - self.pool_size = 5 + self.pool_size = 3 self.task_threads = [] self.task_class = None self.queue_lock = Lock() @@ -137,18 +134,18 @@ class ThreadPoolJobHandler(JobHandler): t.start() self.task_threads.append(t) - import error - error.debug("pool -- created threads, now waiting for the queue") + error.debug("pool -- created threads, now waiting on the queue") self.task_queue.join() - print "pool -- work is finished, wait for workers" + error.debug("pool -- work is finished, wait for workers") for t in self.task_threads: t.job_done() t.join() - print "pool -- done" + error.debug("pool -- done, sending job-done") + self._notify("job-done", self._prepare_result()) class TaskInfo: @@ -169,6 +166,7 @@ class TaskThread(Thread): self.handler = handler self.task_queue = task_queue self.result_queue = result_queue + self.quit = False Thread.__init__(self) @@ -178,10 +176,10 @@ class TaskThread(Thread): self.handler.queue_lock.acquire() print self.getName() + " acquired lock!" - if self.task_queue.empty(): + if self.quit or self.task_queue.empty(): self.handler.queue_lock.release() break - + task = None try: @@ -199,7 +197,7 @@ class TaskThread(Thread): self.task_queue.task_done() def job_done(self): - print "got job_done()" + self.quit = True def _post_result(self, task, result): task_result = TaskResult(task, result) -- 2.11.4.GIT