Changes to the config system API to make it more flexible and lightweight, code clean up.
[straw.git] / straw / JobManager.py
blob9756e1166431ab3c8464759b0e3be29a077f1e63
1 """ JobManager.py
3 Provides flexible infrastructure for handling various jobs from within the
4 application.
6 """
8 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
9 __license__ = """
10 Straw is free software; you can redistribute it and/or modify it under the
11 terms of the GNU General Public License as published by the Free Software
12 Foundation; either version 2 of the License, or (at your option) any later
13 version.
15 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
16 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License along with
20 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
21 Place - Suite 330, Boston, MA 02111-1307, USA. """
23 from straw.Queue import Queue, Empty # our copy of the python 2.5 version
24 from gobject import GObject, SIGNAL_RUN_FIRST
25 from threading import Thread, Lock
26 import error
27 import gobject
28 import time
class JobManager(object):
    """
    Front door of the job subsystem.

    Keeps a registry of JobHandler classes keyed by their ``job_id`` and a
    table holding the HandlerThread most recently started for each
    ``job_id``.

    """

    def __init__(self):
        # job_id -> handler class, filled in by register_handler()
        self.handlers = dict()
        # job_id -> HandlerThread created by start_job(); a later job with
        # the same job_id replaces the earlier entry
        self.active = dict()

    def register_handler(self, clazz):
        """ Register ``clazz`` as the handler for jobs whose id is ``clazz.job_id``. """
        job_id = clazz.job_id
        self.handlers[job_id] = clazz

    def start_job(self, job, wait = False):
        """
        Build the handler registered for ``job.job_id``, wrap it in a
        HandlerThread, record that thread in ``active`` and start it.

        When ``wait`` is true the call returns only after the thread has been
        joined. Raises KeyError if no handler is registered for the job's id.
        """
        handler_class = self.handlers[job.job_id]
        runner = HandlerThread(handler_class(job))

        self.active[job.job_id] = runner
        runner.start()

        if not wait:
            return
        runner.join()

    def stop_jobs(self):
        """ Call stop() on every HandlerThread recorded in ``active``. """
        for runner in self.active.values():
            runner.stop()
class Job(object):
    """
    Plain description of a piece of work.

    ``job_id`` selects the handler class in JobManager. ``observers``, when
    set, is read by JobHandler.__init__ as an iterable of dicts mapping a
    signal name to an iterable of callbacks. ``data`` is a free-form payload;
    nothing in this module reads it (presumably concrete handlers do).
    """

    def __init__(self, job_id):
        self.job_id = job_id
        # both start unset; the creator of the job fills them in as needed
        self.data = self.observers = None
class JobHandler(GObject):
    """
    This class represents an abstract JobHandler which provides common
    functionality used by all handlers. Each job handler is created with the
    Job it has to do.

    A JobHandler implementation should be aware of the job's specifics, i.e.
    it should be able to split a job into multiple tasks (``_split``) and to
    execute them (``_run``).

    It is also responsible for providing notification facilities for workers:
    the three signals below each carry a single Python object as argument.

    """

    __gsignals__ = {
        'job-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-start' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
    }

    def __init__(self, job):
        """
        Store ``job``, create empty task/result queues and connect the job's
        observers.

        ``job.observers`` may be None; otherwise it is iterated as a sequence
        of dicts mapping a signal name to an iterable of callbacks, each of
        which is connected to that signal on this handler.
        """
        gobject.GObject.__init__(self)
        self.job = job
        self.task_queue = Queue()
        self.result_queue = Queue()

        if job.observers is not None:
            for observer_dict in job.observers:
                for signal, callbacks in observer_dict.items():
                    for callback in callbacks:
                        self.connect(signal, callback)

    def start(self):
        """ Split the job into tasks, then run them. """
        self._split()
        self._run()

    def stop(self):
        """ Abstract: must be provided by subclasses. """
        raise NotImplementedError

    def _split(self):
        """ Abstract: populate the task queue from ``self.job``. """
        raise NotImplementedError

    def _run(self):
        """ Abstract: process the queued tasks. """
        raise NotImplementedError

    def _notify(self, event, data):
        """ Emit signal ``event`` with ``data`` as its single argument. """
        self.emit(event, data)

    def _post_result(self, task, result):
        """
        Append ``result`` to the result queue. ``task`` is accepted for
        interface symmetry but not used here.
        """
        # The Queue API has no push(); put_nowait() is the call the task
        # threads in this module use for the very same result queue.
        self.result_queue.put_nowait(result)

    def _prepare_result(self):
        """ Return the object sent with 'job-done': the result queue itself. """
        return self.result_queue
class HandlerThread(Thread):
    """
    Control thread for one JobHandler.

    run() launches a HandlerWorkerThread that executes the handler, then
    blocks on ``control_queue`` until stop() is called -- either by the worker
    once the handler has finished, or from outside (JobManager.stop_jobs).
    After waking up it asks the handler's task threads to quit, joins them and
    discards whatever is left in the handler's task queue.
    """

    def __init__(self, handler):
        Thread.__init__(self)

        self.handler = handler
        self.control_queue = Queue()
        self.stopped = False

    def run(self):
        # The token must be queued *before* the worker exists: the worker
        # calls stop() as soon as the handler returns, and task_done() on a
        # queue with no outstanding item raises ValueError, which would leave
        # the join() below waiting forever.
        self.control_queue.put_nowait("STARTED")

        worker = HandlerWorkerThread(self)
        worker.start()

        error.debug("CONTROL -- waiting on control queue")
        self.control_queue.join()
        error.debug("CONTROL -- joined control queue")

        # Only pool-based handlers define task_threads; treat any other
        # handler as having none instead of dying with AttributeError.
        task_threads = getattr(self.handler, "task_threads", [])

        for t in task_threads:
            t.job_done()

        for t in task_threads:
            t.join()

        # Mark every task that was never picked up as done so that anything
        # blocked in task_queue.join() is released.
        while not self.handler.task_queue.empty():
            self.handler.task_queue.get_nowait()
            self.handler.task_queue.task_done()

    def stop(self):
        """ Wake up run(); only the first call has an effect. """
        error.debug("CONTROL -- STOP")

        if not self.stopped:
            self.control_queue.task_done()

        self.stopped = True
class HandlerWorkerThread(Thread):
    """
    Runs the handler owned by a HandlerThread and tells that HandlerThread to
    stop once the handler has returned.
    """

    def __init__(self, handler_thread):
        Thread.__init__(self)
        self.handler_thread = handler_thread

    def run(self):
        # stop() must be reached even when the handler raises; otherwise the
        # owning HandlerThread stays blocked on its control queue forever.
        try:
            self.handler_thread.handler.start()
        finally:
            self.handler_thread.stop()
class ThreadPoolJobHandler(JobHandler):
    """
    This handler uses a thread pool to efficiently process the data in
    non-blocking manner. Useful for jobs that need to be done in the
    background.

    ``task_class`` starts out as None and has to be set to a thread class
    before _run() is invoked; it is instantiated with
    ``(handler, task_queue, result_queue)``.
    """

    def __init__(self, job):
        JobHandler.__init__(self, job)

        self.pool_size = 3          # number of task threads _run() creates
        self.task_threads = []      # threads created by _run()
        self.task_class = None
        self.queue_lock = Lock()    # taken by task threads around queue access

    def _run(self):
        """
        Start ``pool_size`` task threads, wait until the task queue has been
        fully processed, shut the threads down and emit 'job-done'.
        """
        for index in xrange(self.pool_size):
            worker = self.task_class(self, self.task_queue, self.result_queue)
            worker.setName(str(index))
            worker.start()
            self.task_threads.append(worker)

        error.debug("pool -- created threads, now waiting on the queue")

        # returns once task_done() has been called for every queued task
        self.task_queue.join()

        error.debug("pool -- work is finished, wait for workers")

        for worker in self.task_threads:
            worker.job_done()
            worker.join()

        error.debug("pool -- done, sending job-done")

        outcome = self._prepare_result()
        self._notify("job-done", outcome)
class TaskInfo:
    """
    Describes a single task: an ``id`` plus an opaque ``data`` payload.

    TaskThread formats ``id`` with ``%d`` in its debug messages, so it is
    expected to be an integer.
    """

    def __init__(self, id, data):
        self.id, self.data = id, data
class TaskResult:
    """
    Pairs the task that was processed (``task_info``) with what processing
    produced (``result``).
    """

    def __init__(self, task_info, result):
        self.task_info, self.result = task_info, result

    def get(self):
        """ Not implemented in this class. """
        raise NotImplementedError()
210 class TaskThread(Thread):
211 def __init__(self, handler, task_queue, result_queue):
212 self.handler = handler
213 self.task_queue = task_queue
214 self.result_queue = result_queue
215 self.quit = False
217 Thread.__init__(self)
219 def run(self):
220 while 1:
221 error.debug(self.getName() + " trying to acquire lock!")
222 self.handler.queue_lock.acquire()
223 error.debug(self.getName() + " acquired lock!")
225 if self.quit or self.task_queue.empty():
226 self.handler.queue_lock.release()
227 error.debug(self.getName() + " is breaking!")
228 break
230 task = None
232 try:
233 task = self.task_queue.get_nowait()
234 except Empty:
235 error.debug(self.getName() + " missed the party!")
237 self.handler.queue_lock.release()
238 error.debug(self.getName() + " released lock!")
240 if task:
241 error.debug(self.getName() + " is munching task with id = %d" % task.id)
242 self._task_started(task)
243 result = None
245 try:
246 result = self._process(task)
247 except Exception, e:
248 error.debug(self.getName() + " threw exception while processing task with id = %d [%s]" % (task.id, str(e)))
249 error.log_exc("Details:")
251 self._post_result(task, result)
253 def job_done(self):
254 error.debug(self.getName() + " got job_done")
255 self.quit = True
257 def _post_result(self, task, result):
258 error.debug(self.getName() + " is posting result to task with id = %d" % task.id)
260 task_result = TaskResult(task, result)
261 self.result_queue.put_nowait(task_result)
262 self.task_queue.task_done()
263 self.handler._notify("task-done", task_result)
265 def _prepare_task_info(self, task_info):
266 return task_info
268 def _task_started(self, task):
269 self.handler._notify("task-start", self._prepare_task_info(task))
271 def _process(self, task):
272 raise NotImplementedError
# Module-wide JobManager, created on first use by _get_instance().
_job_manager = None

def _get_instance():
    """ Return the shared JobManager, creating it on the first call. """
    global _job_manager

    if _job_manager:
        return _job_manager

    _job_manager = JobManager()
    return _job_manager
def register_handler(clazz):
    """ Register handler class ``clazz`` with the shared JobManager. """
    manager = _get_instance()
    manager.register_handler(clazz)
def start_job(job, wait = False):
    """
    Start ``job`` through the shared JobManager; ``wait`` is passed on
    unchanged.
    """
    manager = _get_instance()
    manager.start_job(job, wait)
def stop_jobs():
    """ Stop every job tracked by the shared JobManager. """
    manager = _get_instance()
    manager.stop_jobs()