Updated to newer Python syntax where possible
[zeroinstall/zeroinstall-afb.git] / zeroinstall / support / tasks.py
blob302b0ee82c3e3dfa7c896e7dc875498b0612069d
1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished). We use these for, eg, rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
35 """
37 # Copyright (C) 2009, Thomas Leonard
38 # See the README file for details, or visit http://0install.net.
40 from zeroinstall import _
41 import sys
42 from logging import info, warn, debug
43 import gobject
# The list of Blockers whose event has happened, in the order they were
# triggered. Blocker.trigger() appends to it and _handle_run_queue()
# removes entries from the front, one per call.
_run_queue = []
49 def check(blockers, reporter = None):
50 """See if any of the blockers have pending exceptions.
51 @param reporter: invoke this function on each error
52 If reporter is None, raise the first and log the rest."""
53 ex = None
54 if isinstance(blockers, Blocker):
55 blockers = (blockers,)
56 for b in blockers:
57 if b.exception:
58 b.exception_read = True
59 if reporter:
60 try:
61 reporter(*b.exception)
62 except:
63 warn("Failure reporting error! Error was: %s", repr(b.exception[0]))
64 raise
65 elif ex is None:
66 ex = b.exception
67 else:
68 warn(_("Multiple exceptions waiting; skipping %s"), b.exception[0])
69 if ex:
70 raise ex[0], None, ex[1]
class Blocker:
    """A Blocker object starts life with 'happened = False'. Tasks can
    ask to be suspended until 'happened = True'. The value is changed
    by a call to trigger().

    Example:

    >>> kettle_boiled = tasks.Blocker()
    >>> def make_tea():
            print "Get cup"
            print "Add tea leaves"
            yield kettle_boiled
            print "Pour water into cup"
            print "Brew..."
            yield tasks.TimeoutBlocker(120)
            print "Add milk"
            print "Ready!"
    >>> tasks.Task(make_tea())

    Then elsewhere, later::

        print "Kettle boiled!"
        kettle_boiled.trigger()

    You can also yield a list of Blockers. Your function will resume
    after any one of them is triggered. Use blocker.happened to
    find out which one(s). Yielding a Blocker that has already
    happened is the same as yielding None (gives any other Tasks a
    chance to run, and then continues).
    """

    # Class-level default, so that 'exception' can be read on a Blocker
    # that has never been triggered. trigger() sets it per instance.
    exception = None

    def __init__(self, name):
        """@param name: label returned by str() and used in log messages"""
        self.happened = False               # False until event triggered
        self._zero_lib_tasks = set()        # Tasks waiting on this blocker
        self.name = name

    def trigger(self, exception = None):
        """The event has happened. Note that this cannot be undone;
        instead, create a new Blocker to handle the next occurrence
        of the event. Calling trigger() again has no effect.
        @param exception: exception to raise in waiting tasks
        @type exception: (Exception, traceback)"""
        if self.happened:
            return      # Already triggered
        self.happened = True
        self.exception = exception
        self.exception_read = False

        # _schedule() asserts that the queue is empty, so it has to be
        # called before this blocker is appended.
        if not _run_queue:
            _schedule()
        _run_queue.append(self)

        if exception:
            assert isinstance(exception, tuple), exception
            if not self._zero_lib_tasks:
                info(_("Exception from '%s', but nothing is waiting for it"), self)

    def __del__(self):
        # 'exception_read' only exists once trigger() has run; the
        # short-circuit on 'exception' keeps this safe for untriggered
        # blockers.
        if self.exception and not self.exception_read:
            warn(_("Blocker %(blocker)s garbage collected without having it's exception read: %(exception)s"), {'blocker': self, 'exception': self.exception})

    def add_task(self, task):
        """Called by the scheduler when a Task yields this
        Blocker. If you override this method, be sure to still
        call this method with Blocker.add_task(self)!"""
        self._zero_lib_tasks.add(task)

    def remove_task(self, task):
        """Called by the scheduler when a Task that was waiting for
        this blocker is resumed. Raises KeyError if the task was not
        waiting on this blocker."""
        self._zero_lib_tasks.remove(task)

    def __repr__(self):
        return "<Blocker:%s>" % self

    def __str__(self):
        return self.name
class IdleBlocker(Blocker):
    """A Blocker that fires as soon as anything waits on it: the first
    call to add_task() triggers it. An instance of this class is used
    internally when a Task yields None."""

    def add_task(self, task):
        """Register the task as usual, then trigger immediately."""
        Blocker.add_task(self, task)
        self.trigger()
class TimeoutBlocker(Blocker):
    """A Blocker that triggers itself after a set number of seconds."""

    def __init__(self, timeout, name):
        """Start the timer straight away.
        @param timeout: delay in seconds (may be a fraction)
        @param name: label for this blocker"""
        Blocker.__init__(self, name)
        # glib timeouts are given in whole milliseconds
        delay_ms = int(timeout * 1000)
        gobject.timeout_add(delay_ms, self._timeout)

    def _timeout(self):
        # Timer callback. It implicitly returns None.
        self.trigger()
172 def _io_callback(src, cond, blocker):
173 blocker.trigger()
174 return False
class InputBlocker(Blocker):
    """Triggers when os.read(stream) would not block."""

    _stream = None
    _tag = None     # glib source ID of the active watch, or None

    def __init__(self, stream, name):
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if there isn't one yet."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return      # Already watching the stream
        conditions = gobject.IO_IN | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                                         _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, dropping the IO watch once nobody is waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return      # Other tasks still need the watch
        gobject.source_remove(self._tag)
        self._tag = None
class OutputBlocker(Blocker):
    """Triggers when os.write(stream) would not block."""

    _stream = None
    _tag = None     # glib source ID of the active watch, or None

    def __init__(self, stream, name):
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if there isn't one yet."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return      # Already watching the stream
        conditions = gobject.IO_OUT | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                                         _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, dropping the IO watch once nobody is waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return      # Other tasks still need the watch
        gobject.source_remove(self._tag)
        self._tag = None
# The IdleBlocker that new Tasks, and Tasks that yield None, wait on.
# _handle_run_queue() replaces it with a fresh instance once it has been
# processed, because a triggered Blocker cannot be reused.
_idle_blocker = IdleBlocker("(idle)")
218 class Task:
219 """Create a new Task when you have some long running function to
220 run in the background, but which needs to do work in 'chunks'.
221 Example:
223 >>> from zeroinstall import tasks
224 >>> def my_task(start):
225 for x in range(start, start + 5):
226 print "x =", x
227 yield None
229 >>> tasks.Task(my_task(0))
230 >>> tasks.Task(my_task(10))
231 >>> mainloop()
233 Yielding None gives up control of the processor to another Task,
234 causing the sequence printed to be interleaved. You can also yield a
235 Blocker (or a list of Blockers) if you want to wait for some
236 particular event before resuming (see the Blocker class for details).
239 def __init__(self, iterator, name):
240 """Call iterator.next() from a glib idle function. This function
241 can yield Blocker() objects to suspend processing while waiting
242 for events. name is used only for debugging."""
243 assert iterator.next, "Object passed is not an iterator!"
244 self.iterator = iterator
245 self.finished = Blocker(name)
246 # Block new task on the idle handler...
247 _idle_blocker.add_task(self)
248 self._zero_blockers = (_idle_blocker,)
249 info(_("Scheduling new task: %s"), self)
251 def _resume(self):
252 # Remove from our blockers' queues
253 for blocker in self._zero_blockers:
254 blocker.remove_task(self)
255 # Resume the task
256 try:
257 new_blockers = self.iterator.next()
258 except StopIteration:
259 # Task ended
260 self.finished.trigger()
261 return
262 except SystemExit:
263 raise
264 except (Exception, KeyboardInterrupt), ex:
265 # Task crashed
266 info(_("Exception from '%(name)s': %(exception)s"), {'name': self.finished.name, 'exception': ex})
267 #import traceback
268 #traceback.print_exc()
269 tb = sys.exc_info()[2]
270 self.finished.trigger(exception = (ex, tb))
271 return
272 if new_blockers is None:
273 # Just give up control briefly
274 new_blockers = (_idle_blocker,)
275 else:
276 if isinstance(new_blockers, Blocker):
277 # Wrap a single yielded blocker into a list
278 new_blockers = (new_blockers,)
279 # Are we blocking on something that already happened?
280 for blocker in new_blockers:
281 assert hasattr(blocker, 'happened'), "Not a Blocker: %s from %s" % (blocker, self)
282 if blocker.happened:
283 new_blockers = (_idle_blocker,)
284 info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self, 'blocker': blocker})
285 break
286 else:
287 info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self, 'new_blockers': new_blockers})
288 # Add to new blockers' queues
289 for blocker in new_blockers:
290 blocker.add_task(self)
291 self._zero_blockers = new_blockers
293 def __repr__(self):
294 return "Task(%s)" % self.finished.name
296 def __str__(self):
297 return self.finished.name
# Must append to _run_queue right after calling this!
def _schedule():
    """Ask glib to call _handle_run_queue() when idle. Only valid while
    the run queue is empty."""
    assert len(_run_queue) == 0
    gobject.idle_add(_handle_run_queue)
def _handle_run_queue():
    """Idle callback: process the Blocker at the front of the run queue
    by resuming every Task that was waiting on it.
    @return: True if more Blockers remain queued, otherwise False"""
    global _idle_blocker
    assert _run_queue

    head = _run_queue[0]
    assert head.happened

    waiting = head._zero_lib_tasks
    if head is _idle_blocker:
        # Since this blocker will never run again, create a
        # new one for future idling.
        _idle_blocker = IdleBlocker("(idle)")
    elif waiting:
        info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': waiting, 'next': head})
    else:
        info(_("Running %s"), head)

    # Snapshot the waiters: resuming a task removes it from the live set
    to_run = frozenset(waiting)
    if to_run:
        head.noticed = True

    for task in to_run:
        task._resume()

    del _run_queue[0]

    return bool(_run_queue)
def named_async(name):
    """Build a decorator for generator functions. Calling the decorated
    function starts the generator as a Task and returns that Task's
    'finished' blocker.
    @param name: the name for the Task"""
    def deco(fn):
        def run(*args, **kwargs):
            generator = fn(*args, **kwargs)
            task = Task(generator, name)
            return task.finished
        # Keep the wrapped function's name on the wrapper
        run.__name__ = fn.__name__
        return run
    return deco
345 def async(fn):
346 """Decorator that turns a generator function into a function that runs the
347 generator as a Task and returns the Task's finished blocker."""
348 def run(*args, **kwargs):
349 return Task(fn(*args, **kwargs), fn.__name__).finished
350 run.__name__ = fn.__name__
351 return run
def wait_for_blocker(blocker):
    """Run a recursive mainloop until blocker is triggered.
    Afterwards check() is called on the blocker, so any exception it
    carries is raised to the caller. If the blocker has already
    happened, no mainloop is run.
    @param blocker: event to wait on
    @type blocker: L{Blocker}
    @since: 0.53
    """
    assert wait_for_blocker.loop is None    # Avoid recursion

    if not blocker.happened:
        def quitter():
            # Runs as a Task: once 'blocker' triggers, stop the mainloop
            yield blocker
            wait_for_blocker.loop.quit()
        Task(quitter(), "quitter")

        wait_for_blocker.loop = gobject.MainLoop(gobject.main_context_default())
        try:
            debug(_("Entering mainloop, waiting for %s"), blocker)
            wait_for_blocker.loop.run()
        finally:
            # Always clear the marker so that later calls are not rejected
            wait_for_blocker.loop = None

        assert blocker.happened, "Someone quit the main loop!"

    check(blocker)

# The mainloop currently being run by wait_for_blocker(), or None
wait_for_blocker.loop = None