3 Provides flexible infrastructure for handling various jobs from within the
Very much WIP. Major TODOs include:
8 - provide a way for registering observers
9 - handle emitting notification signals
10 - provide a method for getting JobInfo
11 - implement ThreadPoolJobHandler [in progress]
15 __copyright__
= "Copyright (c) 2002-2005 Free Software Foundation, Inc."
17 Straw is free software; you can redistribute it and/or modify it under the
18 terms of the GNU General Public License as published by the Free Software
19 Foundation; either version 2 of the License, or (at your option) any later
22 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
23 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
24 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
26 You should have received a copy of the GNU General Public License along with
27 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
28 Place - Suite 330, Boston, MA 02111-1307, USA. """
30 from Queue
import Queue
, Empty
31 from gobject
import GObject
, SIGNAL_RUN_FIRST
32 from threading
import Thread
, Lock
37 class JobManager(object):
Main entry point for the subsystem. JobManager is responsible for managing
JobHandlers, starting jobs, providing information about the current status of
a given job and notifying observers about the lifecycle of a job.
def register_handler(self, clazz):
    """
    Register a handler class under the job id it declares.

    clazz must expose a ``job_id`` attribute; that value is used as the key
    under which clazz is stored in ``self.handlers``.
    """
    job_id = clazz.job_id
    self.handlers[job_id] = clazz
51 def start_job(self
, job
, wait
= False):
52 t
= HandlerThread(self
.handlers
[job
.job_id
](job
))
59 def __init__(self
, job_id
):
64 class JobHandler(GObject
):
This class represents an abstract JobHandler which provides common functionality
used by all handlers. Each job handler is created with a given Job to do.
72 'job-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,)),
73 'task-done' : (gobject
.SIGNAL_RUN_LAST
, gobject
.TYPE_NONE
, (gobject
.TYPE_PYOBJECT
,))
76 def __init__(self
, job
):
77 gobject
.GObject
.__init
__(self
)
79 self
.task_queue
= Queue()
80 self
.result_queue
= Queue()
82 if job
.observers
!= None:
83 for o
in job
.observers
:
84 for p
in job
.observers
[o
]:
92 raise NotImplementedError
95 raise NotImplementedError
97 def _notify(self
, event
, data
):
98 self
.emit(event
, data
)
100 def _post_result(self
, task
, result
):
101 self
.result_queue
.push(result
)
103 def _prepare_result(self
):
104 return self
.result_queue
106 class HandlerThread(Thread
):
107 def __init__(self
, handler
):
108 self
.handler
= handler
109 Thread
.__init
__(self
)
114 class ThreadPoolJobHandler(JobHandler
):
This handler uses a thread pool to efficiently process the data in a
non-blocking manner. Useful for jobs that need to be done in the
122 def __init__(self
, job
):
123 JobHandler
.__init
__(self
, job
)
126 self
.task_threads
= []
127 self
.task_class
= None
128 self
.queue_lock
= Lock()
131 for i
in xrange(self
.pool_size
):
132 t
= self
.task_class(self
, self
.task_queue
, self
.result_queue
)
135 self
.task_threads
.append(t
)
137 error
.debug("pool -- created threads, now waiting on the queue")
139 self
.task_queue
.join()
141 error
.debug("pool -- work is finished, wait for workers")
143 for t
in self
.task_threads
:
147 error
.debug("pool -- done, sending job-done")
149 self
._notify
("job-done", self
._prepare
_result
())
152 def __init__(self
, id, data
):
157 def __init__(self
, task_info
, result
):
158 self
.task_info
= task_info
162 raise NotImplementedError
164 class TaskThread(Thread
):
165 def __init__(self
, handler
, task_queue
, result_queue
):
166 self
.handler
= handler
167 self
.task_queue
= task_queue
168 self
.result_queue
= result_queue
171 Thread
.__init
__(self
)
175 print self
.getName() + " trying to acquire lock!"
176 self
.handler
.queue_lock
.acquire()
177 print self
.getName() + " acquired lock!"
179 if self
.quit
or self
.task_queue
.empty():
180 self
.handler
.queue_lock
.release()
186 task
= self
.task_queue
.get_nowait()
188 print self
.getName() + " missed the party!"
190 self
.handler
.queue_lock
.release()
191 print self
.getName() + " released lock!"
194 print self
.getName() + " is munching task with id = " + str(task
.id)
195 result
= self
._process
(task
)
196 self
._post
_result
(task
, result
)
197 self
.task_queue
.task_done()
def _post_result(self, task, result):
    """
    Wrap a finished task's outcome and publish it.

    The (task, result) pair is packed into a TaskResult, placed on the
    result queue without blocking, and then announced through the owning
    handler's "task-done" notification.
    """
    outcome = TaskResult(task, result)
    self.result_queue.put_nowait(outcome)
    self.handler._notify("task-done", outcome)
207 def _process(self
, task
):
208 raise NotImplementedError
# Module-wide JobManager instance, created once when this module is imported.
# NOTE(review): presumably this is what _get_instance() returns to the
# module-level wrapper functions -- its definition is not visible here, confirm.
_job_manager = JobManager()
def register_handler(clazz):
    """
    Module-level shortcut: register clazz with the manager instance
    obtained from _get_instance().
    """
    manager = _get_instance()
    manager.register_handler(clazz)
def start_job(job, wait = False):
    """
    Module-level shortcut: pass job and wait on to start_job() of the
    manager instance obtained from _get_instance().

    The manager's return value is not propagated; this returns None.
    """
    manager = _get_instance()
    manager.start_job(job, wait)