Updated epydoc comments.
[zeroinstall/zeroinstall-mseaborn.git] / zeroinstall / support / tasks.py
blob73356f0164362ea8f2cef499d4aecaab86c5cf45
1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished). We use these for, eg, rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
35 """
37 import sys
38 from logging import info, warn
39 import gobject
# The list of Blockers whose event has happened, in the order they were
# triggered. Blocker.trigger() appends to the end of this list and
# _handle_run_queue() removes entries from the front.
_run_queue = []
45 def check(blockers, reporter = None):
46 """See if any of the blockers have pending exceptions.
47 @param reporter: invoke this function on each error
48 If reporter is None, raise the first and log the rest."""
49 ex = None
50 if isinstance(blockers, Blocker):
51 blockers = (blockers,)
52 for b in blockers:
53 if b.exception:
54 b.exception_read = True
55 if reporter:
56 reporter(*b.exception)
57 elif ex is None:
58 ex = b.exception
59 else:
60 warn("Multiple exceptions waiting; skipping %s", b.exception[0])
61 if ex:
62 raise ex[0], None, ex[1]
class Blocker:
    """A Blocker object starts life with 'happened = False'. Tasks can
    ask to be suspended until 'happened = True'. The value is changed
    by a call to trigger().

    Example:

    >>> kettle_boiled = tasks.Blocker("kettle boiled")
    >>> def make_tea():
            print "Get cup"
            print "Add tea leaves"
            yield kettle_boiled
            print "Pour water into cup"
            print "Brew..."
            yield tasks.TimeoutBlocker(120, "brewing")
            print "Add milk"
            print "Ready!"
    >>> tasks.Task(make_tea(), "make tea")

    Then elsewhere, later::

        print "Kettle boiled!"
        kettle_boiled.trigger()

    You can also yield a list of Blockers. Your function will resume
    after any one of them is triggered. Use blocker.happened to
    find out which one(s). Yielding a Blocker that has already
    happened is the same as yielding None (gives any other Tasks a
    chance to run, and then continues).
    """

    # (exception, traceback) passed to trigger(), or None.
    exception = None
    # Whether the stored exception has been read (check() sets this).
    # Defined here so it can be read before trigger() is called.
    exception_read = False

    def __init__(self, name):
        """@param name: label returned by str(); used in log messages"""
        self.happened = False           # False until event triggered
        self._zero_lib_tasks = set()    # Tasks waiting on this blocker
        self.name = name

    def trigger(self, exception = None):
        """The event has happened. Note that this cannot be undone;
        instead, create a new Blocker to handle the next occurrence
        of the event. Calling this on a Blocker that has already
        happened does nothing.
        @param exception: exception to raise in waiting tasks
        @type exception: (Exception, traceback)"""
        if self.happened: return        # Already triggered
        if exception:
            # Validate before changing any state, so that a bad argument
            # does not leave the blocker triggered and queued.
            assert isinstance(exception, tuple), exception
        self.happened = True
        self.exception = exception
        self.exception_read = False
        #assert self not in _run_queue  # Slow
        if not _run_queue:
            # _schedule() requires an empty queue, so call it before
            # appending.
            _schedule()
        _run_queue.append(self)

        if exception and not self._zero_lib_tasks:
            info("Exception from '%s', but nothing is waiting for it", self)

    def __del__(self):
        if self.exception and not self.exception_read:
            warn("Blocker %s garbage collected without having it's exception read: %s", self, self.exception)

    def add_task(self, task):
        """Called by the scheduler when a Task yields this
        Blocker. If you override this method, be sure to still
        call this method with Blocker.add_task(self)!"""
        self._zero_lib_tasks.add(task)

    def remove_task(self, task):
        """Called by the scheduler when a Task that was waiting for
        this blocker is resumed. Raises KeyError if the task was not
        waiting on this blocker."""
        self._zero_lib_tasks.remove(task)

    def __repr__(self):
        return "<Blocker:%s>" % self

    def __str__(self):
        return self.name
class IdleBlocker(Blocker):
    """A Blocker that stays untriggered until the first task starts
    waiting on it, and then triggers at once. The scheduler uses an
    instance of this class internally when a Task yields None."""

    def add_task(self, task):
        """Register the task as usual, then trigger straight away."""
        Blocker.add_task(self, task)
        self.trigger()
class TimeoutBlocker(Blocker):
    """A Blocker that triggers itself after a set number of seconds."""

    def __init__(self, timeout, name):
        """Trigger after 'timeout' seconds (may be a fraction).
        @param name: passed on to Blocker.__init__"""
        Blocker.__init__(self, name)
        # gobject timeouts are given in whole milliseconds.
        milliseconds = long(timeout * 1000)
        gobject.timeout_add(milliseconds, self._timeout)

    def _timeout(self):
        # Returns None; presumably glib treats that like False and does
        # not call the timeout again -- TODO confirm.
        self.trigger()
164 def _io_callback(src, cond, blocker):
165 blocker.trigger()
166 return False
class InputBlocker(Blocker):
    """Triggers when os.read(stream) would not block."""
    _tag = None         # id of the active gobject IO watch, if any
    _stream = None      # the stream being watched

    def __init__(self, stream, name):
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if there is not
        one already."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return
        conditions = gobject.IO_IN | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                                         _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, removing the IO watch once no task is
        waiting any more."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return
        gobject.source_remove(self._tag)
        self._tag = None
class OutputBlocker(Blocker):
    """Triggers when os.write(stream) would not block."""
    _tag = None         # id of the active gobject IO watch, if any
    _stream = None      # the stream being watched

    def __init__(self, stream, name):
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if there is not
        one already."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return
        conditions = gobject.IO_OUT | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                                         _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, removing the IO watch once no task is
        waiting any more."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return
        gobject.source_remove(self._tag)
        self._tag = None
# The IdleBlocker that newly created Tasks, and Tasks that yield None,
# wait on. _handle_run_queue() replaces it with a fresh instance when it
# reaches the front of the run queue.
_idle_blocker = IdleBlocker("(idle)")
class Task:
    """Create a new Task when you have some long running function to
    run in the background, but which needs to do work in 'chunks'.
    Example:

    >>> from zeroinstall.support import tasks
    >>> def my_task(start):
            for x in range(start, start + 5):
                print "x =", x
                yield None

    >>> tasks.Task(my_task(0), "count from 0")
    >>> tasks.Task(my_task(10), "count from 10")
    >>> mainloop()

    Yielding None gives up control of the processor to another Task,
    causing the sequence printed to be interleaved. You can also yield a
    Blocker (or a list of Blockers) if you want to wait for some
    particular event before resuming (see the Blocker class for details).

    The 'finished' attribute is a Blocker that is triggered when the
    iterator ends, or triggered with an (exception, traceback) pair if
    the iterator raises an Exception.
    """

    def __init__(self, iterator, name):
        """Call iterator.next() from a glib idle function. This function
        can yield Blocker() objects to suspend processing while waiting
        for events. name is used only for debugging."""
        # hasattr() rather than a bare attribute access, so that a
        # non-iterator produces this message instead of an AttributeError.
        assert hasattr(iterator, 'next'), "Object passed is not an iterator!"
        self.iterator = iterator
        self.finished = Blocker(name)
        # Block new task on the idle handler...
        _idle_blocker.add_task(self)
        self._zero_blockers = (_idle_blocker,)
        info("Scheduling new task: %s", self)

    def _resume(self):
        """Run the task up to its next yield, then register it with
        whatever it yielded. Called by the scheduler."""
        # Remove from our blockers' queues
        for blocker in self._zero_blockers:
            blocker.remove_task(self)
        # Resume the task
        try:
            new_blockers = self.iterator.next()
        except StopIteration:
            # Task ended
            self.finished.trigger()
            return
        except Exception:
            # Task crashed
            ex, tb = sys.exc_info()[1:]
            info("Exception from '%s': %s", self.finished.name, ex)
            self.finished.trigger(exception = (ex, tb))
            return
        if new_blockers is None:
            # Just give up control briefly
            new_blockers = (_idle_blocker,)
        else:
            if isinstance(new_blockers, Blocker):
                # Wrap a single yielded blocker into a tuple
                new_blockers = (new_blockers,)
            else:
                # Take a private snapshot: the collection is walked
                # twice below (and again on the next resume), which
                # would not work for a one-shot iterable or for a list
                # the task modifies later.
                new_blockers = tuple(new_blockers)
            # Are we blocking on something that already happened?
            for blocker in new_blockers:
                assert hasattr(blocker, 'happened'), "Not a Blocker: %s from %s" % (blocker, self)
                if blocker.happened:
                    new_blockers = (_idle_blocker,)
                    info("Task '%s' waiting on ready blocker %s!", self, blocker)
                    break
            else:
                info("Task '%s' stopping and waiting for '%s'", self, new_blockers)
        # Add to new blockers' queues
        for blocker in new_blockers:
            blocker.add_task(self)
        self._zero_blockers = new_blockers

    def __repr__(self):
        return "Task(%s)" % self.finished.name

    def __str__(self):
        return self.finished.name
def _schedule():
    """Register _handle_run_queue as a gobject idle handler.

    May only be called while _run_queue is empty.
    Must append to _run_queue right after calling this!"""
    assert not _run_queue
    gobject.idle_add(_handle_run_queue)
def _handle_run_queue():
    """Idle handler: resume every task waiting on the blocker at the
    front of _run_queue, then drop that blocker from the queue.

    @return: True if more triggered blockers are still queued, False
    once the queue is empty (presumably glib keeps or removes the idle
    handler accordingly -- TODO confirm)"""
    global _idle_blocker
    assert _run_queue

    blocker = _run_queue[0]
    assert blocker.happened

    waiting = blocker._zero_lib_tasks
    if blocker is _idle_blocker:
        # Since this blocker will never run again, create a
        # new one for future idling.
        _idle_blocker = IdleBlocker("(idle)")
    elif waiting:
        info("Running %s due to triggering of '%s'", waiting, blocker)
    else:
        info("Running %s", blocker)

    # Snapshot the waiters: resuming a task removes it from the set.
    tasks = frozenset(waiting)
    if tasks:
        blocker.noticed = True

    # NOTE(review): if a task's _resume() raises, the 'del' below is
    # skipped and this blocker stays at the front of the queue --
    # confirm whether that can stall scheduling.
    for task in tasks:
        task._resume()

    del _run_queue[0]

    return bool(_run_queue)
def named_async(name):
    """Decorator that turns a generator function into a function that runs the
    generator as a Task and returns the Task's finished blocker.
    The returned function keeps the generator function's __name__ and
    docstring.
    @param name: the name for the Task"""
    def deco(fn):
        def run(*args, **kwargs):
            return Task(fn(*args, **kwargs), name).finished
        run.__name__ = fn.__name__
        # Keep the original docstring so documentation tools still see it.
        run.__doc__ = fn.__doc__
        return run
    return deco
336 def async(fn):
337 """Decorator that turns a generator function into a function that runs the
338 generator as a Task and returns the Task's finished blocker."""
339 def run(*args, **kwargs):
340 return Task(fn(*args, **kwargs), fn.__name__).finished
341 run.__name__ = fn.__name__
342 return run