Handle KeyboardInterrupt exceptions correctly
[zeroinstall.git] / zeroinstall / support / tasks.py
bloba32b13298f03e2236faf93bd6cb2bb316c389415
1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished). We use these for, eg, rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
35 """
37 # Copyright (C) 2009, Thomas Leonard
38 # See the README file for details, or visit http://0install.net.
40 from zeroinstall import _
41 import sys
42 from logging import info, warn
43 import gobject
# The list of Blockers whose event has happened, in the order they were
# triggered. Blocker.trigger() appends to it and _handle_run_queue()
# removes the head entry after resuming the tasks waiting on that Blocker.
_run_queue = []
49 def check(blockers, reporter = None):
50 """See if any of the blockers have pending exceptions.
51 @param reporter: invoke this function on each error
52 If reporter is None, raise the first and log the rest."""
53 ex = None
54 if isinstance(blockers, Blocker):
55 blockers = (blockers,)
56 for b in blockers:
57 if b.exception:
58 b.exception_read = True
59 if reporter:
60 reporter(*b.exception)
61 elif ex is None:
62 ex = b.exception
63 else:
64 warn(_("Multiple exceptions waiting; skipping %s"), b.exception[0])
65 if ex:
66 raise ex[0], None, ex[1]
class Blocker:
    """An event that Tasks can wait for.

    A Blocker starts with 'happened = False'. Tasks can ask to be
    suspended until 'happened = True'; the value is changed by a call
    to trigger().

    Example:

    >>> kettle_boiled = tasks.Blocker("kettle boiled")
    >>> def make_tea():
            print "Get cup"
            print "Add tea leaves"
            yield kettle_boiled
            print "Pour water into cup"
            print "Brew..."
            yield tasks.TimeoutBlocker(120, "brewing")
            print "Add milk"
            print "Ready!"
    >>> tasks.Task(make_tea(), "make tea")

    Then elsewhere, later::

        print "Kettle boiled!"
        kettle_boiled.trigger()

    You can also yield a list of Blockers. Your function will resume
    after any one of them is triggered. Use blocker.happened to
    find out which one(s). Yielding a Blocker that has already
    happened is the same as yielding None (gives any other Tasks a
    chance to run, and then continues).
    """

    # (exception, traceback) passed to trigger(), or None
    exception = None

    def __init__(self, name):
        """@param name: label returned by str() and used in log messages"""
        self.name = name
        self.happened = False             # False until event triggered
        self._zero_lib_tasks = set()      # Tasks waiting on this blocker

    def trigger(self, exception = None):
        """The event has happened. Note that this cannot be undone;
        instead, create a new Blocker to handle the next occurrence
        of the event. Calling trigger() again has no effect.
        @param exception: exception to raise in waiting tasks
        @type exception: (Exception, traceback)"""
        if self.happened:
            return      # Already triggered

        self.exception_read = False
        self.exception = exception
        self.happened = True

        # _schedule() requires an empty queue, so it has to be called
        # before this blocker is appended.
        if not _run_queue:
            _schedule()
        _run_queue.append(self)

        if not exception:
            return
        assert isinstance(exception, tuple), exception
        if not self._zero_lib_tasks:
            info(_("Exception from '%s', but nothing is waiting for it"), self)

    def __del__(self):
        # Warn if an error was stored here but nobody ever looked at it.
        pending = self.exception
        if not pending or self.exception_read:
            return
        warn(_("Blocker %(blocker)s garbage collected without having it's exception read: %(exception)s"), {'blocker': self, 'exception': pending})

    def add_task(self, task):
        """Called by the scheduler when a Task yields this
        Blocker. If you override this method, be sure to still
        call this method with Blocker.add_task(self)!"""
        self._zero_lib_tasks.add(task)

    def remove_task(self, task):
        """Called by the scheduler when a Task that was waiting for
        this blocker is resumed."""
        self._zero_lib_tasks.remove(task)

    def __repr__(self):
        return "<Blocker:%s>" % (self,)

    def __str__(self):
        return self.name
class IdleBlocker(Blocker):
    """A Blocker that fires as soon as a task starts waiting on it.

    An instance of this class is used internally when a Task yields
    None."""

    def add_task(self, task):
        """Register the task as usual, then trigger immediately."""
        Blocker.add_task(self, task)
        self.trigger()
class TimeoutBlocker(Blocker):
    """A Blocker that triggers itself after a set number of seconds."""

    def __init__(self, timeout, name):
        """Trigger after 'timeout' seconds (may be a fraction).
        @param timeout: delay in seconds
        @param name: label for this blocker (see Blocker)"""
        Blocker.__init__(self, name)
        # The glib timer takes whole milliseconds.
        milliseconds = long(timeout * 1000)
        gobject.timeout_add(milliseconds, self._timeout)

    def _timeout(self):
        # Timer callback. Nothing is returned, so the callback result
        # is None.
        self.trigger()
168 def _io_callback(src, cond, blocker):
169 blocker.trigger()
170 return False
class InputBlocker(Blocker):
    """Triggers when os.read(stream) would not block."""

    _tag = None         # id of the active IO watch, or None
    _stream = None      # the stream being watched

    def __init__(self, stream, name):
        """@param stream: the stream to watch for input
        @param name: label for this blocker (see Blocker)"""
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if there isn't
        one already."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return
        conditions = gobject.IO_IN | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, removing the IO watch once no tasks
        are left waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return
        gobject.source_remove(self._tag)
        self._tag = None
class OutputBlocker(Blocker):
    """Triggers when os.write(stream) would not block."""

    _tag = None         # id of the active IO watch, or None
    _stream = None      # the stream being watched

    def __init__(self, stream, name):
        """@param stream: the stream to watch for writability
        @param name: label for this blocker (see Blocker)"""
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if there isn't
        one already."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return
        conditions = gobject.IO_OUT | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, removing the IO watch once no tasks
        are left waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return
        gobject.source_remove(self._tag)
        self._tag = None
# Shared IdleBlocker: new Tasks start out waiting on it, and it stands in
# for a Task that yields None. Once it has been triggered and processed,
# _handle_run_queue() replaces it with a fresh instance.
_idle_blocker = IdleBlocker("(idle)")
214 class Task:
215 """Create a new Task when you have some long running function to
216 run in the background, but which needs to do work in 'chunks'.
217 Example:
219 >>> from zeroinstall import tasks
220 >>> def my_task(start):
221 for x in range(start, start + 5):
222 print "x =", x
223 yield None
225 >>> tasks.Task(my_task(0))
226 >>> tasks.Task(my_task(10))
227 >>> mainloop()
229 Yielding None gives up control of the processor to another Task,
230 causing the sequence printed to be interleaved. You can also yield a
231 Blocker (or a list of Blockers) if you want to wait for some
232 particular event before resuming (see the Blocker class for details).
235 def __init__(self, iterator, name):
236 """Call iterator.next() from a glib idle function. This function
237 can yield Blocker() objects to suspend processing while waiting
238 for events. name is used only for debugging."""
239 assert iterator.next, "Object passed is not an iterator!"
240 self.iterator = iterator
241 self.finished = Blocker(name)
242 # Block new task on the idle handler...
243 _idle_blocker.add_task(self)
244 self._zero_blockers = (_idle_blocker,)
245 info(_("Scheduling new task: %s"), self)
247 def _resume(self):
248 # Remove from our blockers' queues
249 exception = None
250 for blocker in self._zero_blockers:
251 blocker.remove_task(self)
252 # Resume the task
253 try:
254 new_blockers = self.iterator.next()
255 except StopIteration:
256 # Task ended
257 self.finished.trigger()
258 return
259 except SystemExit:
260 raise
261 except (Exception, KeyboardInterrupt), ex:
262 # Task crashed
263 info(_("Exception from '%(name)s': %(exception)s"), {'name': self.finished.name, 'exception': ex})
264 #import traceback
265 #traceback.print_exc()
266 tb = sys.exc_info()[2]
267 self.finished.trigger(exception = (ex, tb))
268 return
269 if new_blockers is None:
270 # Just give up control briefly
271 new_blockers = (_idle_blocker,)
272 else:
273 if isinstance(new_blockers, Blocker):
274 # Wrap a single yielded blocker into a list
275 new_blockers = (new_blockers,)
276 # Are we blocking on something that already happened?
277 for blocker in new_blockers:
278 assert hasattr(blocker, 'happened'), "Not a Blocker: %s from %s" % (blocker, self)
279 if blocker.happened:
280 new_blockers = (_idle_blocker,)
281 info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self, 'blocker': blocker})
282 break
283 else:
284 info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self, 'new_blockers': new_blockers})
285 # Add to new blockers' queues
286 for blocker in new_blockers:
287 blocker.add_task(self)
288 self._zero_blockers = new_blockers
290 def __repr__(self):
291 return "Task(%s)" % self.finished.name
293 def __str__(self):
294 return self.finished.name
# Must append to _run_queue right after calling this!
def _schedule():
    """Ask glib to call _handle_run_queue() when the main loop is idle.

    Only valid while the run queue is empty; Blocker.trigger() calls
    this just before appending the first entry."""
    assert not _run_queue
    gobject.idle_add(_handle_run_queue)
def _handle_run_queue():
    """Idle callback: process the Blocker at the head of the run queue.

    Resumes every task that was waiting on that Blocker, then removes it
    from the queue.
    @return: True if more triggered Blockers are still queued, else False"""
    global _idle_blocker
    assert _run_queue

    head = _run_queue[0]
    assert head.happened

    if head is _idle_blocker:
        # Since this blocker will never run again, create a
        # new one for future idling.
        _idle_blocker = IdleBlocker("(idle)")
    elif head._zero_lib_tasks:
        info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': head._zero_lib_tasks, 'next': head})
    else:
        info(_("Running %s"), head)

    # Take a snapshot: resuming a task removes it from the blocker's set.
    waiting = frozenset(head._zero_lib_tasks)
    if waiting:
        head.noticed = True

    for task in waiting:
        # Run 'task'.
        task._resume()

    del _run_queue[0]

    return bool(_run_queue)
def named_async(name):
    """Decorator that turns a generator function into a function that runs the
    generator as a Task and returns the Task's finished blocker.

    The returned function keeps the generator function's __name__,
    __doc__ and __module__.
    @param name: the name for the Task
    @return: a decorator to apply to the generator function"""
    from functools import wraps

    def deco(fn):
        @wraps(fn)
        def run(*args, **kwargs):
            return Task(fn(*args, **kwargs), name).finished
        return run
    return deco
342 def async(fn):
343 """Decorator that turns a generator function into a function that runs the
344 generator as a Task and returns the Task's finished blocker."""
345 def run(*args, **kwargs):
346 return Task(fn(*args, **kwargs), fn.__name__).finished
347 run.__name__ = fn.__name__
348 return run