Improved logging of exceptions in worker threads.
[straw.git] / straw / JobManager.py
blobd8e5e824dc827ac1ef1c0956ef84ba4514d92453
1 """ JobManager.py
3 Provides flexible infrastructure for handling various jobs from within the
4 application.
6 """
8 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
9 __license__ = """
10 Straw is free software; you can redistribute it and/or modify it under the
11 terms of the GNU General Public License as published by the Free Software
12 Foundation; either version 2 of the License, or (at your option) any later
13 version.
15 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
16 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License along with
20 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
21 Place - Suite 330, Boston, MA 02111-1307, USA. """
23 from straw.Queue import Queue, Empty # our copy of the python 2.5 version
24 from gobject import GObject, SIGNAL_RUN_FIRST
25 from threading import Thread, Lock
26 import error
27 import gobject
28 import time
class JobManager(object):
    """
    Entry point of the job subsystem.

    Keeps a registry of JobHandler classes keyed by their ``job_id`` and runs
    every submitted job on its own HandlerThread.
    """

    def __init__(self):
        # job_id -> JobHandler class registered for that kind of job
        self.handlers = {}
        # job_id -> HandlerThread most recently started for that job_id
        self.active = {}

    def register_handler(self, clazz):
        """Register handler class `clazz` under its ``job_id`` attribute."""
        key = clazz.job_id
        self.handlers[key] = clazz

    def start_job(self, job, wait = False):
        """
        Run `job` on a new HandlerThread.

        The handler class registered for ``job.job_id`` is instantiated
        around `job`, and the thread is recorded in ``active``, replacing any
        earlier entry for the same job_id. If `wait` is true, block until the
        thread finishes. Raises KeyError when no handler is registered for
        ``job.job_id``.
        """
        handler_class = self.handlers[job.job_id]
        thread = HandlerThread(handler_class(job))

        self.active[job.job_id] = thread
        thread.start()

        if wait:
            thread.join()

    def stop_jobs(self):
        """Call ``stop()`` on every HandlerThread recorded in ``active``."""
        # Snapshot first so a concurrent start_job() cannot disturb iteration.
        for thread in list(self.active.values()):
            thread.stop()
class Job(object):
    """
    Description of a unit of work submitted to the JobManager.

    ``job_id`` selects the registered handler class. ``data`` and
    ``observers`` start out as None and may be set by the caller before the
    job is started.
    """

    def __init__(self, job_id):
        self.job_id = job_id
        # Job payload; not read in this module (presumably consumed by the
        # concrete handler -- confirm against handler implementations).
        self.data = None
        # Optional iterable of {signal_name: [callbacks]} dicts; JobHandler
        # connects every callback to the named signal.
        self.observers = None
class JobHandler(GObject):
    """
    Abstract base class for job handlers.

    A JobHandler is created for exactly one Job. Concrete subclasses know
    the job's specifics: ``_split`` is expected to break the job into tasks
    (presumably by filling ``task_queue``), and ``_run`` processes them.
    Progress is reported to observers through the GObject signals declared in
    ``__gsignals__``; each signal carries a single Python object.
    """

    __gsignals__ = {
        'job-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-start' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
    }

    def __init__(self, job):
        gobject.GObject.__init__(self)
        self.job = job
        self.task_queue = Queue()
        self.result_queue = Queue()

        # job.observers is an optional iterable of {signal_name: [callbacks]}
        # dicts; hook every callback up to its signal.
        if job.observers is not None:
            for observer_dict in job.observers:
                for signal, callbacks in observer_dict.items():
                    for callback in callbacks:
                        self.connect(signal, callback)

    def start(self):
        """Process the job synchronously: ``_split()`` then ``_run()``."""
        self._split()
        self._run()

    def stop(self):
        """Abstract: subclasses must override."""
        raise NotImplementedError

    def _split(self):
        """Abstract: split the job into tasks."""
        raise NotImplementedError

    def _run(self):
        """Abstract: process the tasks produced by ``_split``."""
        raise NotImplementedError

    def _notify(self, event, data):
        """Emit GObject signal `event` with `data` as its only argument."""
        self.emit(event, data)

    def _post_result(self, task, result):
        """
        Store `result` in ``result_queue``.

        `task` is accepted for signature parity with subclasses but is not
        used here.
        """
        # Queue has no push(); the previous call raised AttributeError.
        self.result_queue.put_nowait(result)

    def _prepare_result(self):
        """Return the 'job-done' payload: by default the result queue itself."""
        return self.result_queue
class HandlerThread(Thread):
    """
    Control thread for one running job.

    It starts a HandlerWorkerThread that runs the handler, then blocks on
    ``control_queue`` until ``stop()`` is called. That happens either from
    the worker when the handler finishes, or externally (JobManager.stop_jobs)
    to abort. Afterwards it tells the handler's task threads to quit, waits
    for them, and drains unprocessed tasks so that pending
    ``task_queue.join()`` calls can return.
    """

    def __init__(self, handler):
        Thread.__init__(self)

        self.handler = handler
        self.control_queue = Queue()
        self.stopped = False
        # stop() is called from the worker thread and from whichever thread
        # calls JobManager.stop_jobs(); serialize it so task_done() runs once.
        self._stop_lock = Lock()
        # Enqueue the single control token up front. It used to be enqueued
        # in run() *after* starting the worker, so a fast worker (or an early
        # stop_jobs()) could call task_done() on a queue with no unfinished
        # task. That raises ValueError and left run() blocked in join() forever.
        self.control_queue.put_nowait("STARTED")

    def run(self):
        worker = HandlerWorkerThread(self)
        worker.start()

        error.debug("CONTROL -- waiting on control queue")
        self.control_queue.join()
        error.debug("CONTROL -- joined control queue")

        # task_threads is defined by ThreadPoolJobHandler; other handlers may
        # not have it.
        task_threads = getattr(self.handler, "task_threads", [])

        for t in task_threads:
            t.job_done()

        for t in task_threads:
            t.join()

        # Discard tasks nobody will process, marking each one done so that a
        # task_queue.join() still waiting in the worker can return.
        while not self.handler.task_queue.empty():
            self.handler.task_queue.get_nowait()
            self.handler.task_queue.task_done()

    def stop(self):
        """Release ``run()`` from its wait; safe to call repeatedly and concurrently."""
        error.debug("CONTROL -- STOP")

        self._stop_lock.acquire()
        try:
            if not self.stopped:
                self.stopped = True
                self.control_queue.task_done()
        finally:
            self._stop_lock.release()
class HandlerWorkerThread(Thread):
    """
    Runs a HandlerThread's handler (``handler.start()``) in its own thread,
    then signals completion through ``HandlerThread.stop()``.
    """

    def __init__(self, handler_thread):
        Thread.__init__(self)
        self.handler_thread = handler_thread

    def run(self):
        # Nested try blocks rather than try/except/finally keep this usable on
        # Python < 2.5, which the bundled copy of the 2.5 Queue suggests is
        # supported.
        try:
            try:
                self.handler_thread.handler.start()
            except Exception:
                # Log the failure instead of letting the thread die silently.
                error.debug("worker -- handler threw exception while running job")
                error.log_exc("Details:")
        finally:
            # Always release the control thread blocked in
            # control_queue.join(); a failing handler used to hang it forever.
            self.handler_thread.stop()
class ThreadPoolJobHandler(JobHandler):
    """
    This handler uses a thread pool to efficiently process the data in
    non-blocking manner. Useful for jobs that need to be done in the
    background.

    ``task_class`` must be set to a TaskThread-compatible class before
    ``_run`` is called; ``pool_size`` controls how many are started.
    """

    def __init__(self, job):
        JobHandler.__init__(self, job)

        self.task_class = None      # thread class instantiated by _run()
        self.task_threads = []      # pool threads started by _run()
        self.queue_lock = Lock()    # shared by the pool around task_queue access
        self.pool_size = 3          # number of pool threads to start

    def _run(self):
        # Spin up the pool; each thread is named after its index.
        for index in xrange(self.pool_size):
            worker = self.task_class(self, self.task_queue, self.result_queue)
            worker.setName(str(index))
            worker.start()
            self.task_threads.append(worker)

        error.debug("pool -- created threads, now waiting on the queue")

        # Returns once every queued task has been marked done.
        self.task_queue.join()

        error.debug("pool -- work is finished, wait for workers")

        for worker in self.task_threads:
            worker.job_done()
            worker.join()

        error.debug("pool -- done, sending job-done")

        payload = self._prepare_result()
        self._notify("job-done", payload)
class TaskInfo:
    """Identifier plus payload of one task (presumably queued by a handler's _split)."""

    def __init__(self, id, data):
        self.id, self.data = id, data
class TaskResult:
    """Pairs a task's info object with the result produced for it."""

    def __init__(self, task_info, result):
        self.task_info, self.result = task_info, result

    def get(self):
        """Abstract accessor; subclasses must override."""
        raise NotImplementedError
210 class TaskThread(Thread):
211 def __init__(self, handler, task_queue, result_queue):
212 self.handler = handler
213 self.task_queue = task_queue
214 self.result_queue = result_queue
215 self.quit = False
217 Thread.__init__(self)
219 def run(self):
220 while 1:
221 error.debug(self.getName() + " trying to acquire lock!")
222 self.handler.queue_lock.acquire()
223 error.debug(self.getName() + " acquired lock!")
225 if self.quit or self.task_queue.empty():
226 self.handler.queue_lock.release()
227 error.debug(self.getName() + " is breaking!")
228 break
230 task = None
232 try:
233 task = self.task_queue.get_nowait()
234 except Empty:
235 error.debug(self.getName() + " missed the party!")
237 self.handler.queue_lock.release()
238 error.debug(self.getName() + " released lock!")
240 if task:
241 error.debug(self.getName() + " is munching task with id = %d" % task.id)
242 self._task_started(task)
244 try:
245 result = self._process(task)
246 self._post_result(task, result)
247 error.debug(self.getName() + " is posting result to task with id = %d" % task.id)
248 except Exception, e:
249 error.debug(self.getName() + " threw exception while processing task with id = %d [%s]" % (task.id, str(e)))
250 error.log_exc("Details:")
252 self.task_queue.task_done()
254 def job_done(self):
255 error.debug(self.getName() + " got job_done")
256 self.quit = True
258 def _post_result(self, task, result):
259 task_result = TaskResult(task, result)
260 self.result_queue.put_nowait(task_result)
261 self.handler._notify("task-done", task_result)
263 def _prepare_task_info(self, task_info):
264 return task_info
266 def _task_started(self, task):
267 self.handler._notify("task-start", self._prepare_task_info(task))
269 def _process(self, task):
270 raise NotImplementedError
# Lazily created, module-wide JobManager shared by the functions below.
_job_manager = None

def _get_instance():
    """Return the shared JobManager, creating it on first call."""
    global _job_manager
    # NOTE(review): creation is not locked; assumes first use happens on a
    # single thread -- confirm.
    if _job_manager is None:
        _job_manager = JobManager()
    return _job_manager
def register_handler(clazz):
    """Register handler class `clazz` with the shared JobManager."""
    manager = _get_instance()
    manager.register_handler(clazz)
def start_job(job, wait = False):
    """Start `job` on the shared JobManager; block until it finishes if `wait`."""
    manager = _get_instance()
    manager.start_job(job, wait=wait)
def stop_jobs():
    """Stop every handler thread recorded by the shared JobManager."""
    manager = _get_instance()
    manager.stop_jobs()