# Implemented "mark all as read".
# [straw.git] / straw / JobManager.py
# blob 7fe1b434d020377746561c5fc50fd44400ccbfbf
""" JobManager.py

Provides flexible infrastructure for handling various jobs from within the
application.

"""
8 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
9 __license__ = """
10 Straw is free software; you can redistribute it and/or modify it under the
11 terms of the GNU General Public License as published by the Free Software
12 Foundation; either version 2 of the License, or (at your option) any later
13 version.
15 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
16 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License along with
20 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
21 Place - Suite 330, Boston, MA 02111-1307, USA. """
23 from straw.Queue import Queue, Empty # our copy of the python 2.5 version
24 from gobject import GObject, SIGNAL_RUN_FIRST
25 from threading import Thread
26 import error
27 import gobject
28 import time
# Maximum number of characters of a task's string form that is written to the
# debug log (used as a slice bound: str(task)[:LOG_STRING_LIMIT]).
LOG_STRING_LIMIT = 50
class JobManager(GObject):
    """
    Main entry point for the subsystem. JobManager is responsible for managing
    JobHandlers and for starting/stopping jobs.

    Handler classes are registered under their ``job_id`` class attribute.
    Starting a job instantiates the handler class registered for
    ``job.job_id`` and runs it in its own HandlerThread.
    """

    def __init__(self):
        # JobManager derives from GObject, so the base initializer has to be
        # run as well (JobHandler in this module does the same).
        GObject.__init__(self)

        # job_id -> handler class
        self.handlers = {}
        # id returned by start() -> HandlerThread of a job that is running
        self.active = {}

    def _get_id(self):
        """
        Return a timestamp based identifier that is not used by any job
        currently stored in ``self.active``.
        """
        new_id = time.time()

        # On a coarse clock two consecutive calls may see the same timestamp;
        # reusing it would silently overwrite the entry of a running job.
        while new_id in self.active:
            new_id = time.time()

        return new_id

    def register_handler(self, clazz):
        """
        Register handler class ``clazz`` for jobs whose ``job_id`` equals
        ``clazz.job_id``. A later registration for the same ``job_id``
        replaces the earlier one.
        """
        self.handlers[clazz.job_id] = clazz

    def start(self, job, wait = False):
        """
        Start ``job`` in a new HandlerThread and return the identifier under
        which it can be stopped.

        When ``wait`` is true the call blocks until the handler thread has
        finished. Raises KeyError when no handler class is registered for
        ``job.job_id``.
        """
        handler_id = self._get_id()
        handler = self.handlers[job.job_id](handler_id, job)
        handler_thread = HandlerThread(handler)

        # The handler announces its own end; that is when it is forgotten.
        handler.connect("cleanup", self._on_cleanup)
        self.active[handler_id] = handler_thread
        handler_thread.start()

        if wait:
            handler_thread.join()

        return handler_id

    def stop(self, id):
        """
        Ask the job started under ``id`` to stop.

        A job that has already finished (and was therefore removed from
        ``self.active``) is ignored instead of raising KeyError, because the
        caller cannot know whether the job ended in the meantime.
        """
        handler_thread = self.active.get(id)

        if handler_thread is not None:
            handler_thread.stop()

    def stop_jobs(self):
        """Ask every job that is still active to stop."""
        # Iterate over a snapshot: _on_cleanup() removes entries and may run
        # while this loop is in progress.
        for handler_thread in list(self.active.values()):
            handler_thread.stop()

    def _on_cleanup(self, handler, data):
        """
        "cleanup" signal callback: forget the finished ``handler``.
        ``data`` is the signal payload and is not used.
        """
        # pop() rather than del: a missing entry must not raise from inside a
        # signal callback.
        self.active.pop(handler.id, None)
class Job(object):
    """
    Plain description of a job.

    ``job_id`` is the key under which a handler class is looked up when the
    job is started. ``tasks`` and ``observers`` both start out as None.
    """

    def __init__(self, job_id):
        self.observers = None
        self.tasks = None
        self.job_id = job_id
class JobHandler(GObject):
    """
    This class represents an abstract JobHandler which provides common functionality
    used by all handlers. Each job handler is created with given Job to do.

    JobHandler implementation should be aware of job's specifics, i.e. should be able
    to split a job into multiple tasks.

    It is also responsible for providing notification facilities for workers.

    Concrete handlers implement _run() and stop(), may override _prepare(),
    and are expected to provide a ``job_id`` attribute (it is used as the
    prefix of debug messages).
    """

    # Every signal carries a single Python object as its payload.
    __gsignals__ = {
        'job-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-start' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'cleanup' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
    }

    def __init__(self, id, job):
        """
        Create a handler for ``job`` that is known under identifier ``id``.

        ``job.observers``, when not None, is iterated as a sequence of
        dictionaries mapping a signal name to an iterable of callbacks; every
        callback is connected to the corresponding signal of this handler.
        """
        gobject.GObject.__init__(self)
        self.id = id
        self.job = job
        self.task_queue = Queue()
        self.result_queue = Queue()
        # None means "not known yet"
        self.job_size = None

        if job.observers is not None:
            for observer_dict in job.observers:
                for signal, callbacks in observer_dict.items():
                    for callback in callbacks:
                        self.connect(signal, callback)

    def _debug(self, msg):
        """Write ``msg`` to the debug log, prefixed with the handler's job_id."""
        error.debug("%s %s" % (self.job_id, msg))

    def start(self):
        """
        Run the whole job: _prepare(), _run(), _shutdown() and _cleanup().

        "cleanup" is emitted even when one of the earlier stages raises, so
        that listeners waiting for the end of the handler are always notified;
        the exception itself still propagates to the caller. "job-done" is
        only emitted when the job ran to completion.
        """
        try:
            self._prepare()
            self._run()
            self._shutdown()
        finally:
            self._cleanup()

    def stop(self):
        """Interrupt the job. Must be implemented by concrete handlers."""
        raise NotImplementedError

    def _prepare(self):
        """Hook called before _run(); does nothing by default."""
        pass

    def _run(self):
        """Do the actual work. Must be implemented by concrete handlers."""
        raise NotImplementedError

    def _shutdown(self):
        """Announce the end of the job with the "job-done" signal."""
        self._debug("shutting down...")
        self._notify("job-done")

    def _cleanup(self):
        """Emit the "cleanup" signal."""
        self._notify("cleanup")

    def _notify(self, event, data = None):
        """Emit signal ``event`` with ``data`` as its payload."""
        self.emit(event, data)

    def _post_result(self, task, result):
        """Put ``result`` into the result queue; ``task`` is not used here."""
        self.result_queue.put(result)
class HandlerThread(Thread):
    """
    Thread that drives a single handler: the thread body calls the handler's
    start() and stop() is forwarded to the handler's stop().
    """

    def __init__(self, handler):
        super(HandlerThread, self).__init__()

        self.handler = handler

    def run(self):
        """Thread body: run the wrapped handler."""
        wrapped = self.handler
        wrapped.start()

    def stop(self):
        """Forward the stop request to the wrapped handler."""
        wrapped = self.handler
        wrapped.stop()
class ThreadPoolJobHandler(JobHandler):
    """
    This handler uses a thread pool to efficiently process the data in
    non-blocking manner. Useful for jobs that need to be done in the
    background.

    ``task_class`` is the worker class; it is None by default and is called
    with the handler as its only argument, so it has to be set before _run()
    is entered.
    """

    def __init__(self, id, job):
        JobHandler.__init__(self, id, job)

        self.task_class = None
        self.task_threads = []
        # workers put a token here while they are alive (see _run)
        self.running_queue = Queue()
        self.pool_size = 3

    def _run(self):
        """
        Start ``pool_size`` workers, collect ``job_size`` results from the
        result queue and finally tell the workers to finish.

        Every true result is re-emitted as "task-done"; a false result (the
        workers post None when they quit) ends the collecting early.
        """
        # job size defaults to the number of tasks queued so far
        if not self.job_size:
            self.job_size = self.task_queue.qsize()

        for index in xrange(self.pool_size):
            worker = self.task_class(self)
            worker.setName(str(index))
            worker.start()
            self.task_threads.append(worker)

        self._debug("created %d thread(s), waits for results" % len(self.task_threads))

        for _unused in xrange(self.job_size):
            self._debug("waiting...")
            outcome = self.result_queue.get()
            self._debug("got result [%s]" % str(outcome))

            if not outcome:
                self._debug("stopping...")
                break

            self._notify("task-done", outcome)

        self._debug("got the job done, waits for %d worker(s)" % len(self.task_threads))

        # one "job-done" marker per worker
        for _worker in self.task_threads:
            self.task_queue.put("job-done")
            self._debug("put job-done")

        # returns once every token put into running_queue was marked done
        self.running_queue.join()

    def stop(self):
        """Tell every worker started so far that the job is done."""
        for worker in self.task_threads:
            worker.job_done()
class TaskResult:
    """Pairs a task with the result that was obtained for it."""

    def __init__(self, task, result):
        self.task, self.result = task, result
211 class TaskThread(Thread):
212 def __init__(self, handler):
213 self.handler = handler
214 self.task_queue = handler.task_queue
215 self.result_queue = handler.result_queue
216 self.running_queue = handler.running_queue
217 self.quit = False
219 Thread.__init__(self)
221 def _debug(self, msg):
222 error.debug("%s-%s %s" % (self.handler.job_id, self.getName(), msg))
224 def run(self):
225 self.running_queue.put(True)
227 while 1:
228 self._debug("waiting to get a task")
230 task = self.task_queue.get()
232 self._debug("got a task [%s]" % str(task)[:LOG_STRING_LIMIT])
234 if self.quit or task == "job-done":
235 self._debug("breaking")
236 self._post_result("job-done", None)
237 break
239 if task:
240 self._debug("munching task [%s]" % str(task)[:LOG_STRING_LIMIT])
241 self._task_started(task)
242 result = None
244 try:
245 result = self._process(task)
246 except Exception, e:
247 self._debug("has thrown an exception while processing task [%s] [e = %s]" % (str(task)[:LOG_STRING_LIMIT], str(e)))
248 #error.log_exc("Details:")
250 self._post_result(task, result)
252 self.running_queue.get()
253 self.running_queue.task_done()
254 self._debug("terminating, bye")
256 def job_done(self):
257 self._debug("got job_done")
258 self.quit = True
260 def _post_result(self, task, result):
261 if task == "job-done":
262 self.result_queue.put(None)
263 self._debug("posted None")
264 else:
265 self._debug("posting result to task [%s]" % str(task)[:LOG_STRING_LIMIT])
267 task_result = TaskResult(task, result)
268 self.result_queue.put(task_result)
269 self.task_queue.task_done()
271 def _prepare_task(self, task):
272 return task
274 def _task_started(self, task):
275 self.handler._notify("task-start", self._prepare_task(task))
277 def _process(self, task):
278 raise NotImplementedError
# The JobManager shared by the module-level functions; created on first use.
_job_manager = None

def _get_instance():
    """Return the shared JobManager, creating it when it does not exist yet."""
    global _job_manager

    if _job_manager:
        return _job_manager

    _job_manager = JobManager()
    return _job_manager
def register_handler(clazz):
    """Register handler class ``clazz`` with the shared JobManager."""
    manager = _get_instance()
    manager.register_handler(clazz)
def start(job, wait = False):
    """
    Start ``job`` through the shared JobManager and return the value its
    start() returns; ``wait`` is passed on unchanged.
    """
    manager = _get_instance()
    started_id = manager.start(job, wait)
    return started_id
def stop(id):
    """Ask the shared JobManager to stop the job known under ``id``."""
    manager = _get_instance()
    manager.stop(id)
def stop_jobs():
    """Ask the shared JobManager to stop all of its jobs."""
    manager = _get_instance()
    manager.stop_jobs()