3 Provides flexible infrastructure for handling various jobs from within the
8 __copyright__
= "Copyright (c) 2002-2005 Free Software Foundation, Inc."
10 Straw is free software; you can redistribute it and/or modify it under the
11 terms of the GNU General Public License as published by the Free Software
12 Foundation; either version 2 of the License, or (at your option) any later
15 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
16 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License along with
20 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
21 Place - Suite 330, Boston, MA 02111-1307, USA. """
23 from straw
.Queue
import Queue
, Empty
# our copy of the python 2.5 version
24 from gobject
import GObject
, SIGNAL_RUN_FIRST
25 from threading
import Thread
32 class JobManager(GObject
):
34 Main entry point for the subsystem. JobManager is responsible managing
35 JobHandlers, starting/stopping jobs.
46 def register_handler(self
, clazz
):
47 self
.handlers
[clazz
.job_id
] = clazz
49 def start(self
, job
, wait
= False):
51 handler
= self
.handlers
[job
.job_id
](id, job
)
52 handler_thread
= HandlerThread(handler
)
54 handler
.connect("cleanup", self
._on
_cleanup
)
55 self
.active
[id] = handler_thread
56 handler_thread
.start()
64 self
.active
[id].stop()
67 for handler_thread
in self
.active
.values():
70 def _on_cleanup(self
, handler
, data
):
71 del self
.active
[handler
.id]
74 def __init__(self
, job_id
):
79 class JobHandler(GObject
):
81 This class represents an abstract JobHandler which provides common functionality
82 used by all handlers. Each job handler is created with given Job to do.
84 JobHandler implementation should be aware of job's specifics, ie. should be able
85 to split a job into multiple tasks.
87 It is also responsible for providing notification facilities for workers.
92 'job-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
93 'task-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
94 'task-start' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
95 'cleanup' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,))
98 def __init__(self
, id, job
):
99 gobject
.GObject
.__init
__(self
)
102 self
.task_queue
= Queue()
103 self
.result_queue
= Queue()
106 if job
.observers
!= None:
107 for observer_dict
in job
.observers
:
108 for signal
in observer_dict
.keys():
109 for callable in observer_dict
[signal
]:
110 self
.connect(signal
, callable)
112 def _debug(self
, msg
):
113 error
.debug("%s %s" % (self
.job_id
, msg
))
122 raise NotImplementedError
128 raise NotImplementedError
131 self
._debug
("shutting down...")
132 self
._notify
("job-done")
135 self
._notify
("cleanup")
137 def _notify(self
, event
, data
= None):
138 self
.emit(event
, data
)
140 def _post_result(self
, task
, result
):
141 self
.result_queue
.put(result
)
143 class HandlerThread(Thread
):
144 def __init__(self
, handler
):
145 Thread
.__init
__(self
)
147 self
.handler
= handler
155 class ThreadPoolJobHandler(JobHandler
):
157 This handler uses a thread pool to efficiently process the data in
158 non-blocking manner. Useful for jobs that need to be done in the
163 def __init__(self
, id, job
):
164 JobHandler
.__init
__(self
, id, job
)
167 self
.running_queue
= Queue()
168 self
.task_threads
= []
169 self
.task_class
= None
172 if not self
.job_size
:
173 self
.job_size
= self
.task_queue
.qsize()
175 for i
in xrange(self
.pool_size
):
176 t
= self
.task_class(self
)
179 self
.task_threads
.append(t
)
181 self
._debug
("created %d thread(s), waits for results" % len(self
.task_threads
))
183 for t
in xrange(self
.job_size
):
184 self
._debug
("waiting...")
185 task_result
= self
.result_queue
.get()
186 self
._debug
("got result [%s]" % str(task_result
))
189 self
._notify
("task-done", task_result
)
191 self
._debug
("stopping...")
194 self
._debug
("got the job done, waits for %d worker(s)" % len(self
.task_threads
))
196 for t
in self
.task_threads
:
197 self
.task_queue
.put("job-done")
198 self
._debug
("put job-done")
200 self
.running_queue
.join()
203 for t
in self
.task_threads
:
207 def __init__(self
, task
, result
):
211 class TaskThread(Thread
):
212 def __init__(self
, handler
):
213 self
.handler
= handler
214 self
.task_queue
= handler
.task_queue
215 self
.result_queue
= handler
.result_queue
216 self
.running_queue
= handler
.running_queue
219 Thread
.__init
__(self
)
221 def _debug(self
, msg
):
222 error
.debug("%s-%s %s" % (self
.handler
.job_id
, self
.getName(), msg
))
225 self
.running_queue
.put(True)
228 self
._debug
("waiting to get a task")
230 task
= self
.task_queue
.get()
232 self
._debug
("got a task [%s]" % str(task
)[:LOG_STRING_LIMIT
])
234 if self
.quit
or task
== "job-done":
235 self
._debug
("breaking")
236 self
._post
_result
("job-done", None)
240 self
._debug
("munching task [%s]" % str(task
)[:LOG_STRING_LIMIT
])
241 self
._task
_started
(task
)
245 result
= self
._process
(task
)
247 self
._debug
("has thrown an exception while processing task [%s] [e = %s]" % (str(task
)[:LOG_STRING_LIMIT
], str(e
)))
248 #error.log_exc("Details:")
250 self
._post
_result
(task
, result
)
252 self
.running_queue
.get()
253 self
.running_queue
.task_done()
254 self
._debug
("terminating, bye")
257 self
._debug
("got job_done")
260 def _post_result(self
, task
, result
):
261 if task
== "job-done":
262 self
.result_queue
.put(None)
263 self
._debug
("posted None")
265 self
._debug
("posting result to task [%s]" % str(task
)[:LOG_STRING_LIMIT
])
267 task_result
= TaskResult(task
, result
)
268 self
.result_queue
.put(task_result
)
269 self
.task_queue
.task_done()
271 def _prepare_task(self
, task
):
274 def _task_started(self
, task
):
275 self
.handler
._notify
("task-start", self
._prepare
_task
(task
))
277 def _process(self
, task
):
278 raise NotImplementedError
285 _job_manager
= JobManager()
288 def register_handler(clazz
):
289 _get_instance().register_handler(clazz
)
291 def start(job
, wait
= False):
292 return _get_instance().start(job
, wait
)
295 _get_instance().stop(id)
298 _get_instance().stop_jobs()