Massive refactoring for tasks system.
[zeroinstall/zeroinstall-rsl.git] / zeroinstall / support / tasks.py
blob4603ac56ca89e2e06c650b0012fd7deb0167e898
1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished). We use these for, eg, rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
35 """
37 import sys
38 from logging import info, warn
39 import gobject
# FIFO of Blockers that have been triggered but not yet processed, oldest
# first: Blocker.trigger() appends to the end and _handle_run_queue() removes
# the head once every task waiting on it has been resumed.
_run_queue = list()
45 def check(blockers):
46 """See if any of the blockers have pending exceptions.
47 Raise the first and log the rest."""
48 ex = None
49 if isinstance(blockers, Blocker):
50 blockers = (blockers,)
51 for b in blockers:
52 if b.exception:
53 b.exception_read = True
54 if ex is None:
55 ex = b.exception
56 else:
57 warn("Multiple exceptions waiting; skipping %s", b.exception[0])
58 if ex:
59 raise ex[0], None, ex[1]
class Blocker:
    """A Blocker object starts life with 'happened = False'. Tasks can
    ask to be suspended until 'happened = True'. The value is changed
    by a call to trigger().

    Example:

    kettle_boiled = tasks.Blocker("kettle boiled")

    def make_tea():
        print "Get cup"
        print "Add tea leaves"
        yield kettle_boiled
        print "Pour water into cup"
        print "Brew..."
        yield tasks.TimeoutBlocker(120)
        print "Add milk"
        print "Ready!"

    tasks.Task(make_tea(), "make tea")

    # elsewhere, later...
    print "Kettle boiled!"
    kettle_boiled.trigger()

    You can also yield a list of Blockers. Your function will resume
    after any one of them is triggered. Use blocker.happened to
    find out which one(s). Yielding a Blocker that has already
    happened is the same as yielding None (gives any other Tasks a
    chance to run, and then continues).
    """

    exception = None        # (exception, traceback) given to trigger(), if any
    exception_read = False  # set to True once the exception has been checked

    def __init__(self, name):
        self.happened = False           # False until event triggered
        self._zero_lib_tasks = set()    # Tasks waiting on this blocker
        self.name = name                # used by __str__/__repr__ for debugging

    def trigger(self, exception = None):
        """The event has happened. Note that this cannot be undone;
        instead, create a new Blocker to handle the next occurrence
        of the event.
        @param exception: exception to raise in waiting tasks
        @type exception: (Exception, traceback)"""
        if self.happened: return # Already triggered
        self.happened = True
        self.exception = exception
        self.exception_read = False
        # No need to check 'self not in _run_queue' (slow): only trigger()
        # enqueues, and it returned above if we had already been triggered.
        if not _run_queue:
            # Queue empty => no idle handler registered yet; must be
            # scheduled before appending (_schedule asserts an empty queue).
            _schedule()
        _run_queue.append(self)

        if exception:
            assert isinstance(exception, tuple), exception
            if not self._zero_lib_tasks:
                info("Exception from '%s', but nothing is waiting for it", self)

    def check_exception(self):
        """If this blocker was triggered with an exception, mark it as read
        and re-raise it with its original traceback (via check())."""
        check(self)

    def __del__(self):
        # Warn about failures that no task ever looked at.
        if self.exception and not self.exception_read:
            warn("Blocker %s garbage collected without having it's exception read: %s", self, self.exception)

    def add_task(self, task):
        """Called by the scheduler when a Task yields this
        Blocker. If you override this method, be sure to still
        call this method with Blocker.add_task(self)!"""
        self._zero_lib_tasks.add(task)

    def remove_task(self, task):
        """Called by the scheduler when a Task that was waiting for
        this blocker is resumed."""
        self._zero_lib_tasks.remove(task)

    def __repr__(self):
        return "<Blocker:%s>" % self

    def __str__(self):
        return self.name
class IdleBlocker(Blocker):
    """Blocker that fires as soon as any task starts waiting on it.

    Task uses an instance of this class internally when a task yields None:
    the task is simply resumed once the run queue reaches this blocker."""

    def add_task(self, task):
        """Register 'task' as Blocker.add_task does, then trigger at once."""
        register = Blocker.add_task
        register(self, task)
        self.trigger()
class TimeoutBlocker(Blocker):
    """Triggers after a set number of seconds."""
    def __init__(self, timeout, name = None):
        """Trigger after 'timeout' seconds (may be a fraction).
        @param name: label for debugging; defaults to "timeout(<timeout>)"."""
        # Blocker.__init__ requires a name; calling it without one raised
        # TypeError, so every TimeoutBlocker construction used to fail.
        if name is None:
            name = "timeout(%s)" % timeout
        Blocker.__init__(self, name)
        gobject.timeout_add(long(timeout * 1000), self._timeout)

    def _timeout(self):
        self.trigger()
        return False    # one-shot: tell gobject to remove the timeout source
def _io_callback(src, cond, blocker):
    """gobject IO-watch callback: trigger 'blocker' and drop the watch.

    Returning False makes gobject destroy the watch, so the blocker's stored
    tag is cleared first; otherwise a later remove_task() would call
    gobject.source_remove() on a source that no longer exists."""
    blocker._tag = None
    blocker.trigger()
    return False

class _IOWatchBlocker(Blocker):
    """Common base for InputBlocker and OutputBlocker.

    A gobject IO watch on the stream is installed when the first task starts
    waiting, and removed when the last waiting task is resumed. Subclasses
    set _condition to the gobject IO condition to wait for (IO_HUP is always
    added)."""
    _tag = None         # gobject source ID of the active watch, or None
    _stream = None
    _condition = None   # overridden by subclasses

    def __init__(self, stream, name = None):
        # Blocker.__init__ requires a name; the old code omitted it (TypeError).
        if name is None:
            name = "%s(%s)" % (self.__class__.__name__, stream)
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        Blocker.add_task(self, task)
        if self._tag is None:
            self._tag = gobject.io_add_watch(self._stream,
                    self._condition | gobject.IO_HUP, _io_callback, self)

    def remove_task(self, task):
        Blocker.remove_task(self, task)
        # _tag is None if the watch already fired and gobject removed it.
        if not self._zero_lib_tasks and self._tag is not None:
            gobject.source_remove(self._tag)
            self._tag = None

class InputBlocker(_IOWatchBlocker):
    """Triggers when os.read(stream) would not block."""
    _condition = gobject.IO_IN

class OutputBlocker(_IOWatchBlocker):
    """Triggers when os.write(stream) would not block."""
    _condition = gobject.IO_OUT
# Current IdleBlocker: new Tasks start waiting on it, and so does any task
# that yields None. _handle_run_queue() swaps in a fresh one after processing
# it, because a Blocker can only be triggered once.
_idle_blocker = IdleBlocker(name = "(idle)")
class Task:
    """Create a new Task when you have some long running function to
    run in the background, but which needs to do work in 'chunks'.
    Example:

    from zeroinstall.support import tasks
    def my_task(start):
        for x in range(start, start + 5):
            print "x =", x
            yield None

    tasks.Task(my_task(0), "count from 0")
    tasks.Task(my_task(10), "count from 10")

    mainloop()

    Yielding None gives up control of the processor to another Task,
    causing the sequence printed to be interleaved. You can also yield a
    Blocker (or a list of Blockers) if you want to wait for some
    particular event before resuming (see the Blocker class for details).

    @ivar finished: a Blocker triggered when the iterator is exhausted, or
    triggered with (exception, traceback) if the iterator raises."""

    def __init__(self, iterator, name):
        """Call iterator.next() from a glib idle function. This function
        can yield Blocker() objects to suspend processing while waiting
        for events. name is used only for debugging."""
        # 'assert iterator.next' raised AttributeError (not this message)
        # for non-iterators; hasattr makes the assertion meaningful.
        assert hasattr(iterator, 'next'), "Object passed is not an iterator!"
        self.iterator = iterator
        self.finished = Blocker(name)
        # Block new task on the idle handler...
        _idle_blocker.add_task(self)
        self._zero_blockers = (_idle_blocker,)
        info("Scheduling new task: %s", self)

    def _resume(self):
        """Run the task up to its next yield, then register it with the
        blocker(s) it yielded. Called by _handle_run_queue."""
        # Stop waiting on everything from the previous yield; any one of
        # them triggering is enough to resume us.
        for blocker in self._zero_blockers:
            blocker.remove_task(self)
        # Resume the task
        try:
            new_blockers = self.iterator.next()
        except StopIteration:
            # Task ended
            self.finished.trigger()
            return
        except Exception:
            # Task crashed: hand (exception, traceback) to whoever waits on
            # 'finished'. sys.exc_info() works on all Python 2 versions.
            ex, tb = sys.exc_info()[1:]
            info("Exception from '%s': %s", self.finished.name, ex)
            self.finished.trigger(exception = (ex, tb))
            return
        if new_blockers is None:
            # Just give up control briefly
            new_blockers = (_idle_blocker,)
        else:
            if isinstance(new_blockers, Blocker):
                # Wrap a single yielded blocker into a list
                new_blockers = (new_blockers,)
            # Are we blocking on something that already happened?
            # If so, just yield to other tasks and continue (see Blocker docs).
            for blocker in new_blockers:
                assert hasattr(blocker, 'happened'), "Not a Blocker: " + repr(blocker)
                if blocker.happened:
                    new_blockers = (_idle_blocker,)
                    info("Task '%s' waiting on ready blocker %s!", self, blocker)
                    break
            else:
                info("Task '%s' stopping and waiting for '%s'", self, new_blockers)
        # Add to new blockers' queues
        for blocker in new_blockers:
            blocker.add_task(self)
        self._zero_blockers = new_blockers

    def __repr__(self):
        return "Task(%s)" % self.finished.name

    def __str__(self):
        return self.finished.name
def _schedule():
    """Register _handle_run_queue as a gobject idle callback.

    Only valid while _run_queue is empty (asserted), so at most one idle
    handler is registered at a time. The caller must append the triggered
    Blocker to _run_queue right after calling this: the idle handler
    asserts that the queue is non-empty when it runs."""
    assert not _run_queue
    handler = _handle_run_queue
    gobject.idle_add(handler)
def _handle_run_queue():
    """gobject idle callback: process the Blocker at the head of _run_queue.

    Resumes every task waiting on that blocker, then removes it from the
    queue. Returns True (keep this idle source) while more triggered
    blockers remain queued, False otherwise."""
    global _idle_blocker
    assert _run_queue

    blocker = _run_queue[0]     # (was 'next', which shadowed the builtin)
    assert blocker.happened

    if blocker is _idle_blocker:
        # Since this blocker will never run again, create a
        # new one for future idling.
        _idle_blocker = IdleBlocker("(idle)")
    elif blocker._zero_lib_tasks:
        info("Running %s due to triggering of '%s'", blocker._zero_lib_tasks, blocker)
    else:
        info("Running %s", blocker)

    # Snapshot the waiters: resuming a task removes it from the live set.
    waiting = frozenset(blocker._zero_lib_tasks)
    if waiting:
        blocker.noticed = True

    completed = False
    try:
        for task in waiting:
            # Run 'task'.
            task._resume()
        completed = True
    finally:
        # The head stays queued while tasks run, so that trigger() calls made
        # meanwhile just append instead of registering a second idle handler.
        # It is removed even if a task raised; otherwise the queue would never
        # drain, and trigger() (which only schedules on an empty queue) would
        # never schedule again.
        del _run_queue[0]
        if not completed and _run_queue:
            # An exception is escaping this callback, and pygobject treats
            # that as returning False (source removed). Re-register so the
            # remaining queued blockers still get processed.
            gobject.idle_add(_handle_run_queue)

    return bool(_run_queue)