3 Provides flexible infrastructure for handling various jobs from within the
6 Very much WIP. Major TODO include:
8 - provide a way for registering observers
9 - handle emitting notification signals
10 - provide a method for getting JobInfo
11 - implement ThreadPoolJobHandler [in progress]
15 __copyright__
= "Copyright (c) 2002-2005 Free Software Foundation, Inc."
17 Straw is free software; you can redistribute it and/or modify it under the
18 terms of the GNU General Public License as published by the Free Software
19 Foundation; either version 2 of the License, or (at your option) any later
22 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
23 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
24 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
26 You should have received a copy of the GNU General Public License along with
27 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
28 Place - Suite 330, Boston, MA 02111-1307, USA. """
30 from Queue
import Queue
, Empty
31 from gobject
import GObject
, SIGNAL_RUN_FIRST
32 from threading
import Thread
, Lock
37 class JobManager(object):
39 Main entry point for the subsystem. JobManager is responsible managing
40 JobHandlers, starting jobs, providing information about current status of
41 a given job and notifying observers about lifecycle of a job.
48 def register_handler(self
, clazz
):
49 self
.handlers
[clazz
.job_id
] = clazz
51 def start_job(self
, job
, wait
= False):
52 t
= HandlerThread(self
.handlers
[job
.job_id
](job
))
59 def __init__(self
, job_id
):
64 class JobHandler(GObject
):
66 This class represents an abstract JobHandler which provides common functionality
67 used by all handlers. Each job handler is created with given Job to do.
72 'job-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
73 'task-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
74 'task-start' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,))
77 def __init__(self
, job
):
78 gobject
.GObject
.__init
__(self
)
80 self
.task_queue
= Queue()
81 self
.result_queue
= Queue()
83 if job
.observers
!= None:
84 for observer_dict
in job
.observers
:
85 for signal
in observer_dict
.keys():
86 for callable in observer_dict
[signal
]:
87 self
.connect(signal
, callable)
94 raise NotImplementedError
97 raise NotImplementedError
99 def _notify(self
, event
, data
):
100 self
.emit(event
, data
)
102 def _post_result(self
, task
, result
):
103 self
.result_queue
.push(result
)
105 def _prepare_result(self
):
106 return self
.result_queue
108 class HandlerThread(Thread
):
109 def __init__(self
, handler
):
110 self
.handler
= handler
111 Thread
.__init
__(self
)
116 class ThreadPoolJobHandler(JobHandler
):
118 This handler uses a thread pool to efficiently process the data in
119 non-blocking manner. Useful for jobs that need to be done in the
124 def __init__(self
, job
):
125 JobHandler
.__init
__(self
, job
)
128 self
.task_threads
= []
129 self
.task_class
= None
130 self
.queue_lock
= Lock()
133 for i
in xrange(self
.pool_size
):
134 t
= self
.task_class(self
, self
.task_queue
, self
.result_queue
)
137 self
.task_threads
.append(t
)
139 error
.debug("pool -- created threads, now waiting on the queue")
141 self
.task_queue
.join()
143 error
.debug("pool -- work is finished, wait for workers")
145 for t
in self
.task_threads
:
149 error
.debug("pool -- done, sending job-done")
151 self
._notify
("job-done", self
._prepare
_result
())
154 def __init__(self
, id, data
):
159 def __init__(self
, task_info
, result
):
160 self
.task_info
= task_info
164 raise NotImplementedError
166 class TaskThread(Thread
):
167 def __init__(self
, handler
, task_queue
, result_queue
):
168 self
.handler
= handler
169 self
.task_queue
= task_queue
170 self
.result_queue
= result_queue
173 Thread
.__init
__(self
)
177 error
.debug(self
.getName() + " trying to acquire lock!")
178 self
.handler
.queue_lock
.acquire()
179 error
.debug(self
.getName() + " acquired lock!")
181 if self
.quit
or self
.task_queue
.empty():
182 self
.handler
.queue_lock
.release()
188 task
= self
.task_queue
.get_nowait()
190 error
.debug(self
.getName() + " missed the party!")
192 self
.handler
.queue_lock
.release()
193 error
.debug(self
.getName() + " released lock!")
196 error
.debug(self
.getName() + " is munching task with id = " + str(task
.id))
197 self
._task
_started
(task
)
198 result
= self
._process
(task
)
199 self
._post
_result
(task
, result
)
200 error
.debug(self
.getName() + " is posting result to task with id = " + str(task
.id))
201 self
.task_queue
.task_done()
206 def _post_result(self
, task
, result
):
207 task_result
= TaskResult(task
, result
)
208 self
.result_queue
.put_nowait(task_result
)
209 self
.handler
._notify
("task-done", task_result
)
211 def _prepare_task_info(self
, task_info
):
214 def _task_started(self
, task
):
215 self
.handler
._notify
("task-start", self
._prepare
_task
_info
(task
))
217 def _process(self
, task
):
218 raise NotImplementedError
225 _job_manager
= JobManager()
228 def register_handler(clazz
):
229 _get_instance().register_handler(clazz
)
231 def start_job(job
, wait
= False):
232 _get_instance().start_job(job
, wait
)