JobManager code cleanup.
[straw/fork.git] / straw / JobManager.py
blob4cecaca6f20288f5aad79303c485528b39b1118d
1 """ JobManager.py
3 Provides flexible infrastructure for handling various jobs from within the
4 application.
6 Very much WIP. Major TODO include:
8 - provide a way for registering observers
9 - handle emitting notification signals
10 - provide a method for getting JobInfo
11 - implement ThreadPoolJobHandler [in progress]
13 """
15 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
16 __license__ = """
17 Straw is free software; you can redistribute it and/or modify it under the
18 terms of the GNU General Public License as published by the Free Software
19 Foundation; either version 2 of the License, or (at your option) any later
20 version.
22 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
23 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
24 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
26 You should have received a copy of the GNU General Public License along with
27 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
28 Place - Suite 330, Boston, MA 02111-1307, USA. """
30 from Queue import Queue, Empty
31 from gobject import GObject, SIGNAL_RUN_FIRST
32 from threading import Thread, Lock
33 import error
34 import gobject
35 import time
37 class JobManager(object):
38 """
39 Main entry point for the subsystem. JobManager is responsible managing
40 JobHandlers, starting jobs, providing information about current status of
41 a given job and notifying observers about lifecycle of a job.
43 """
45 def __init__(self):
46 self.handlers = {}
48 def register_handler(self, clazz):
49 self.handlers[clazz.job_id] = clazz
51 def start_job(self, job, wait = False):
52 t = HandlerThread(self.handlers[job.job_id](job))
53 t.start()
55 if wait:
56 t.join()
58 class Job(object):
59 def __init__(self, job_id):
60 self.job_id = job_id
61 self.data = None
62 self.observers = None
64 class JobHandler(GObject):
65 """
66 This class represents an abstract JobHandler which provides common functionality
67 used by all handlers. Each job handler is created with given Job to do.
69 """
71 __gsignals__ = {
72 'job-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)),
73 'task-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
76 def __init__(self, job):
77 gobject.GObject.__init__(self)
78 self.job = job
79 self.task_queue = Queue()
80 self.result_queue = Queue()
82 if job.observers != None:
83 for o in job.observers:
84 for p in job.observers[o]:
85 self.connect(o, p)
87 def start(self):
88 self._split()
89 self._run()
91 def _split(self):
92 raise NotImplementedError
94 def _run(self):
95 raise NotImplementedError
97 def _notify(self, event, data):
98 self.emit(event, data)
100 def _post_result(self, task, result):
101 self.result_queue.push(result)
103 def _prepare_result(self):
104 return self.result_queue
106 class HandlerThread(Thread):
107 def __init__(self, handler):
108 self.handler = handler
109 Thread.__init__(self)
111 def run(self):
112 self.handler.start()
114 class ThreadPoolJobHandler(JobHandler):
116 This handler uses a thread pool to efficiently process the data in
117 non-blocking manner. Useful for jobs that need to be done in the
118 background.
122 def __init__(self, job):
123 JobHandler.__init__(self, job)
125 self.pool_size = 3
126 self.task_threads = []
127 self.task_class = None
128 self.queue_lock = Lock()
130 def _run(self):
131 for i in xrange(self.pool_size):
132 t = self.task_class(self, self.task_queue, self.result_queue)
133 t.setName(str(i))
134 t.start()
135 self.task_threads.append(t)
137 error.debug("pool -- created threads, now waiting on the queue")
139 self.task_queue.join()
141 error.debug("pool -- work is finished, wait for workers")
143 for t in self.task_threads:
144 t.job_done()
145 t.join()
147 error.debug("pool -- done, sending job-done")
149 self._notify("job-done", self._prepare_result())
151 class TaskInfo:
152 def __init__(self, id, data):
153 self.id = id
154 self.data = data
156 class TaskResult:
157 def __init__(self, task_info, result):
158 self.task_info = task_info
159 self.result = result
161 def get(self):
162 raise NotImplementedError
164 class TaskThread(Thread):
165 def __init__(self, handler, task_queue, result_queue):
166 self.handler = handler
167 self.task_queue = task_queue
168 self.result_queue = result_queue
169 self.quit = False
171 Thread.__init__(self)
173 def run(self):
174 while 1:
175 print self.getName() + " trying to acquire lock!"
176 self.handler.queue_lock.acquire()
177 print self.getName() + " acquired lock!"
179 if self.quit or self.task_queue.empty():
180 self.handler.queue_lock.release()
181 break
183 task = None
185 try:
186 task = self.task_queue.get_nowait()
187 except Empty:
188 print self.getName() + " missed the party!"
190 self.handler.queue_lock.release()
191 print self.getName() + " released lock!"
193 if task:
194 print self.getName() + " is munching task with id = " + str(task.id)
195 result = self._process(task)
196 self._post_result(task, result)
197 self.task_queue.task_done()
199 def job_done(self):
200 self.quit = True
202 def _post_result(self, task, result):
203 task_result = TaskResult(task, result)
204 self.result_queue.put_nowait(task_result)
205 self.handler._notify("task-done", task_result)
207 def _process(self, task):
208 raise NotImplementedError
210 _job_manager = None
212 def _get_instance():
213 global _job_manager
214 if not _job_manager:
215 _job_manager = JobManager()
216 return _job_manager
218 def register_handler(clazz):
219 _get_instance().register_handler(clazz)
221 def start_job(job, wait = False):
222 _get_instance().start_job(job, wait)