3 Provides flexible infrastructure for handling various jobs from within the
8 __copyright__
= "Copyright (c) 2002-2005 Free Software Foundation, Inc."
10 Straw is free software; you can redistribute it and/or modify it under the
11 terms of the GNU General Public License as published by the Free Software
12 Foundation; either version 2 of the License, or (at your option) any later
15 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
16 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License along with
20 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
21 Place - Suite 330, Boston, MA 02111-1307, USA. """
23 from straw
.Queue
import Queue
, Empty
# our copy of the python 2.5 version
24 from gobject
import GObject
, SIGNAL_RUN_FIRST
25 from threading
import Thread
, Lock
30 class JobManager(object):
32 Main entry point for the subsystem. JobManager is responsible for managing
33 JobHandlers and for starting/stopping jobs.
def register_handler(self, clazz):
    """
    Register a handler class under the job id it declares.

    clazz -- handler class; the value of its ``job_id`` attribute becomes
             the key in ``self.handlers``. An entry already stored under
             the same id is replaced.
    """
    job_id = clazz.job_id
    self.handlers[job_id] = clazz
44 def start_job(self
, job
, wait
= False):
45 t
= HandlerThread(self
.handlers
[job
.job_id
](job
))
47 self
.active
[job
.job_id
] = t
55 for handler_thread
in self
.active
.values():
59 def __init__(self
, job_id
):
64 class JobHandler(GObject
):
66 This class represents an abstract JobHandler which provides common functionality
67 used by all handlers. Each job handler is created with given Job to do.
69 JobHandler implementation should be aware of job's specifics, i.e. should be able
70 to split a job into multiple tasks.
72 It is also responsible for providing notification facilities for workers.
77 'job-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
78 'task-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
79 'task-start' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,))
82 def __init__(self
, job
):
83 gobject
.GObject
.__init
__(self
)
85 self
.task_queue
= Queue()
86 self
.result_queue
= Queue()
88 if job
.observers
!= None:
89 for observer_dict
in job
.observers
:
90 for signal
in observer_dict
.keys():
91 for callable in observer_dict
[signal
]:
92 self
.connect(signal
, callable)
99 raise NotImplementedError
102 raise NotImplementedError
105 raise NotImplementedError
107 def _notify(self
, event
, data
):
108 self
.emit(event
, data
)
110 def _post_result(self
, task
, result
):
111 self
.result_queue
.push(result
)
113 def _prepare_result(self
):
114 return self
.result_queue
116 class HandlerThread(Thread
):
117 def __init__(self
, handler
):
118 Thread
.__init
__(self
)
120 self
.handler
= handler
121 self
.control_queue
= Queue()
125 worker
= HandlerWorkerThread(self
)
128 self
.control_queue
.put_nowait("STARTED")
129 error
.debug("CONTROL -- waiting on control queue")
130 self
.control_queue
.join()
131 error
.debug("CONTROL -- joined control queue")
133 for t
in self
.handler
.task_threads
:
136 for t
in self
.handler
.task_threads
:
139 while not self
.handler
.task_queue
.empty():
140 self
.handler
.task_queue
.get_nowait()
141 self
.handler
.task_queue
.task_done()
144 error
.debug("CONTROL -- STOP")
147 self
.control_queue
.task_done()
151 class HandlerWorkerThread(Thread
):
152 def __init__(self
, handler_thread
):
153 Thread
.__init
__(self
)
154 self
.handler_thread
= handler_thread
157 self
.handler_thread
.handler
.start()
158 self
.handler_thread
.stop()
160 class ThreadPoolJobHandler(JobHandler
):
162 This handler uses a thread pool to efficiently process the data in
163 a non-blocking manner. Useful for jobs that need to be done in the
168 def __init__(self
, job
):
169 JobHandler
.__init
__(self
, job
)
172 self
.task_threads
= []
173 self
.task_class
= None
174 self
.queue_lock
= Lock()
177 for i
in xrange(self
.pool_size
):
178 t
= self
.task_class(self
, self
.task_queue
, self
.result_queue
)
181 self
.task_threads
.append(t
)
183 error
.debug("pool -- created threads, now waiting on the queue")
185 self
.task_queue
.join()
187 error
.debug("pool -- work is finished, wait for workers")
189 for t
in self
.task_threads
:
193 error
.debug("pool -- done, sending job-done")
195 self
._notify
("job-done", self
._prepare
_result
())
198 def __init__(self
, id, data
):
203 def __init__(self
, task_info
, result
):
204 self
.task_info
= task_info
208 raise NotImplementedError
210 class TaskThread(Thread
):
211 def __init__(self
, handler
, task_queue
, result_queue
):
212 self
.handler
= handler
213 self
.task_queue
= task_queue
214 self
.result_queue
= result_queue
217 Thread
.__init
__(self
)
221 error
.debug(self
.getName() + " trying to acquire lock!")
222 self
.handler
.queue_lock
.acquire()
223 error
.debug(self
.getName() + " acquired lock!")
225 if self
.quit
or self
.task_queue
.empty():
226 self
.handler
.queue_lock
.release()
227 error
.debug(self
.getName() + " is breaking!")
233 task
= self
.task_queue
.get_nowait()
235 error
.debug(self
.getName() + " missed the party!")
237 self
.handler
.queue_lock
.release()
238 error
.debug(self
.getName() + " released lock!")
241 error
.debug(self
.getName() + " is munching task with id = %d" % task
.id)
242 self
._task
_started
(task
)
246 result
= self
._process
(task
)
248 error
.debug(self
.getName() + " threw exception while processing task with id = %d [%s]" % (task
.id, str(e
)))
249 error
.log_exc("Details:")
251 self
._post
_result
(task
, result
)
254 error
.debug(self
.getName() + " got job_done")
257 def _post_result(self
, task
, result
):
258 error
.debug(self
.getName() + " is posting result to task with id = %d" % task
.id)
260 task_result
= TaskResult(task
, result
)
261 self
.result_queue
.put_nowait(task_result
)
262 self
.task_queue
.task_done()
263 self
.handler
._notify
("task-done", task_result
)
265 def _prepare_task_info(self
, task_info
):
268 def _task_started(self
, task
):
269 self
.handler
._notify
("task-start", self
._prepare
_task
_info
(task
))
271 def _process(self
, task
):
272 raise NotImplementedError
279 _job_manager
= JobManager()
def register_handler(clazz):
    """
    Register `clazz` with the manager returned by ``_get_instance()``.
    """
    manager = _get_instance()
    manager.register_handler(clazz)
def start_job(job, wait=False):
    """
    Start `job` through the manager returned by ``_get_instance()``.

    job  -- job object forwarded to the manager's ``start_job``.
    wait -- forwarded unchanged as the manager's second argument
            (defaults to False).
    """
    manager = _get_instance()
    manager.start_job(job, wait)
289 _get_instance().stop_jobs()