Fixes (workarounds) in OPML parsing, more work on GUI...
[straw/fork.git] / straw / JobManager.py
blobf318d9283f027adf5f525cf3bb6bf71deeba74c7
1 """ JobManager.py
3 Provides flexible infrastructure for handling various jobs from within the
4 application.
6 Very much WIP. Major TODOs include:
8 - provide a way for registering observers
9 - handle emitting notification signals
10 - provide a method for getting JobInfo
11 - implement ThreadPoolJobHandler [in progress]
13 """
15 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
16 __license__ = """
17 Straw is free software; you can redistribute it and/or modify it under the
18 terms of the GNU General Public License as published by the Free Software
19 Foundation; either version 2 of the License, or (at your option) any later
20 version.
22 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
23 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
24 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
26 You should have received a copy of the GNU General Public License along with
27 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
28 Place - Suite 330, Boston, MA 02111-1307, USA. """
30 from Queue import Queue, Empty
31 from gobject import GObject, SIGNAL_RUN_FIRST
32 from threading import Thread, Lock
33 import error
34 import gobject
35 import time
class JobManager(object):
    """
    Main entry point for the subsystem.

    JobManager is responsible for managing JobHandlers and starting jobs. It
    keeps a registry that maps a job_id to the handler class able to process
    jobs carrying that id.
    """

    def __init__(self):
        # job_id -> handler class (not instance); see register_handler().
        self.handlers = {}

    def register_handler(self, clazz):
        """Register handler class `clazz` under its own `job_id` attribute.

        A later registration with the same job_id replaces the earlier one.
        """
        self.handlers[clazz.job_id] = clazz

    def start_job(self, job, wait = False):
        """Run `job` on a new HandlerThread.

        The handler class registered for `job.job_id` is instantiated with the
        job and its start() method is executed on the new thread.

        job  -- object exposing a `job_id` attribute
        wait -- when true, block until the handler thread has finished

        Raises KeyError when no handler is registered for `job.job_id`.
        """
        handler_class = self.handlers[job.job_id]
        handler = handler_class(job)
        worker = HandlerThread(handler)
        worker.start()

        if not wait:
            return
        worker.join()
class Job(object):
    """
    Description of a unit of work handed to the JobManager.

    job_id    -- key used to look up the handler class for this job
    data      -- payload for the handler; None until the caller sets it
    observers -- None, or an iterable of dicts mapping a signal name to an
                 iterable of callables to connect to that signal
    """

    def __init__(self, job_id):
        self.observers = None
        self.data = None
        self.job_id = job_id
class JobHandler(GObject):
    """
    This class represents an abstract JobHandler which provides common
    functionality used by all handlers. Each job handler is created with the
    given Job to do.

    A handler owns a task queue and a result queue, and declares three
    signals ('task-start', 'task-done', 'job-done'), each carrying a single
    Python object. Subclasses must implement _split() and _run().
    """

    __gsignals__ = {
        'job-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
        'task-start' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
    }

    def __init__(self, job):
        """Create the queues and connect the observers listed on `job`.

        job -- the Job to process. When `job.observers` is not None it must be
               an iterable of dicts mapping a signal name to an iterable of
               callables; every callable is connected to its signal.
        """
        gobject.GObject.__init__(self)
        self.job = job
        self.task_queue = Queue()
        self.result_queue = Queue()

        if job.observers is not None:
            for observer_dict in job.observers:
                for signal, callbacks in observer_dict.items():
                    for callback in callbacks:
                        self.connect(signal, callback)

    def start(self):
        """Run the job: first _split(), then _run()."""
        self._split()
        self._run()

    def _split(self):
        """Abstract hook called before _run().

        NOTE(review): presumably fills `task_queue` with the job's tasks --
        confirm against the concrete handlers.
        """
        raise NotImplementedError

    def _run(self):
        """Abstract hook that performs the actual work of the job."""
        raise NotImplementedError

    def _notify(self, event, data):
        """Emit the signal named `event` with `data` as its only argument."""
        self.emit(event, data)

    def _post_result(self, task, result):
        """Store `result` on the result queue; `task` is currently unused."""
        # Queue objects expose put(), not push(); the previous call raised
        # AttributeError as soon as a result was posted.
        self.result_queue.put(result)

    def _prepare_result(self):
        """Return the object delivered with 'job-done': the result queue."""
        return self.result_queue
class HandlerThread(Thread):
    """Thread whose body runs the start() method of the wrapped handler."""

    def __init__(self, handler):
        Thread.__init__(self)
        # Object exposing start(); invoked from run() on this thread.
        self.handler = handler

    def run(self):
        """Thread body: delegate to the handler."""
        handler = self.handler
        handler.start()
class ThreadPoolJobHandler(JobHandler):
    """
    This handler uses a thread pool to efficiently process the data in a
    non-blocking manner. Useful for jobs that need to be done in the
    background.

    `task_class` must be set to a callable that accepts
    (handler, task_queue, result_queue) and returns an object offering
    setName(), start(), job_done() and join() before _run() is invoked.
    """

    def __init__(self, job):
        JobHandler.__init__(self, job)

        # Lock the task threads take while pulling work off the task queue.
        self.queue_lock = Lock()
        # Worker factory; None here, so it has to be supplied before _run().
        self.task_class = None
        # Workers started by _run(), in start order.
        self.task_threads = []
        # Number of workers _run() starts.
        self.pool_size = 3

    def _run(self):
        """Start the workers, wait for the queue to drain, then emit 'job-done'."""
        for index in xrange(self.pool_size):
            worker = self.task_class(self, self.task_queue, self.result_queue)
            worker.setName(str(index))
            worker.start()
            self.task_threads.append(worker)

        error.debug("pool -- created threads, now waiting on the queue")

        # Blocks until task_done() has been called once per queued task.
        self.task_queue.join()

        error.debug("pool -- work is finished, wait for workers")

        for worker in self.task_threads:
            worker.job_done()
            worker.join()

        error.debug("pool -- done, sending job-done")

        outcome = self._prepare_result()
        self._notify("job-done", outcome)
class TaskInfo:
    """Plain record pairing a task identifier with the task's data."""

    def __init__(self, id, data):
        self.id, self.data = id, data
class TaskResult:
    """Couples a finished task's info object with the value it produced."""

    def __init__(self, task_info, result):
        self.result = result
        self.task_info = task_info

    def get(self):
        """Not implemented in this class; always raises NotImplementedError."""
        raise NotImplementedError
class TaskThread(Thread):
    """
    Worker thread that pulls tasks off a shared queue and processes them.

    handler      -- owning handler; must provide `queue_lock` and `_notify()`
    task_queue   -- queue the tasks are taken from
    result_queue -- queue the TaskResult objects are posted to

    Subclasses implement _process(task). The thread leaves its loop when it
    finds the task queue empty or after job_done() has been called.
    """

    def __init__(self, handler, task_queue, result_queue):
        self.handler = handler
        self.task_queue = task_queue
        self.result_queue = result_queue
        # Set by job_done(); checked at the top of every loop iteration.
        self.quit = False

        Thread.__init__(self)

    def run(self):
        """Process tasks until asked to quit or the task queue is empty."""
        while True:
            error.debug(self.getName() + " trying to acquire lock!")
            self.handler.queue_lock.acquire()
            try:
                error.debug(self.getName() + " acquired lock!")

                if self.quit or self.task_queue.empty():
                    break

                task = None

                try:
                    task = self.task_queue.get_nowait()
                except Empty:
                    error.debug(self.getName() + " missed the party!")
            finally:
                # Always give the lock back, even if something above raised,
                # so the remaining workers cannot be blocked by this thread.
                self.handler.queue_lock.release()

            error.debug(self.getName() + " released lock!")

            # Compare against None explicitly: a task object that happens to
            # be falsy was still dequeued and must be processed and marked
            # done.
            if task is not None:
                error.debug(self.getName() + " is munching task with id = " + str(task.id))
                try:
                    self._task_started(task)
                    result = self._process(task)
                    self._post_result(task, result)
                    error.debug(self.getName() + " is posting result to task with id = " + str(task.id))
                finally:
                    # Every successful get must be matched by task_done(),
                    # otherwise the handler's task_queue.join() never returns
                    # when processing fails.
                    self.task_queue.task_done()

    def job_done(self):
        """Ask the thread to leave its loop at the next iteration."""
        self.quit = True

    def _post_result(self, task, result):
        """Wrap `result` in a TaskResult, queue it and emit 'task-done'."""
        task_result = TaskResult(task, result)
        self.result_queue.put_nowait(task_result)
        self.handler._notify("task-done", task_result)

    def _prepare_task_info(self, task_info):
        """Return the object sent with 'task-start'; identity by default."""
        return task_info

    def _task_started(self, task):
        """Emit 'task-start' for `task` through the owning handler."""
        self.handler._notify("task-start", self._prepare_task_info(task))

    def _process(self, task):
        """Do the work for `task` and return its result; abstract here."""
        raise NotImplementedError
# Lazily created module-wide JobManager; access it through _get_instance().
_job_manager = None

def _get_instance():
    """Return the shared JobManager, creating it on first use."""
    global _job_manager
    if _job_manager:
        return _job_manager
    _job_manager = JobManager()
    return _job_manager
def register_handler(clazz):
    """Register handler class `clazz` with the shared JobManager."""
    manager = _get_instance()
    manager.register_handler(clazz)
def start_job(job, wait = False):
    """Start `job` through the shared JobManager.

    wait -- forwarded unchanged to JobManager.start_job()
    """
    manager = _get_instance()
    manager.start_job(job, wait)