Python 3: the GUI now works well enough to run programs
[zeroinstall/solver.git] / zeroinstall / support / tasks.py
blob5af5ffbb0dc4749e6f09d21e391bb6bb84cdabac
1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
11 Using threads causes a number of problems: they introduce race conditions,
12 often lead to many subtle bugs, and they require lots of resources (you
13 probably wouldn't want 10,000 threads running at once). In particular, two
14 threads can run at exactly the same time (perhaps on different processors), so
15 you have to be really careful that they don't both try to update the same
16 variables at the same time. This requires lots of messy locking, which is hard
17 to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished), so they are not appropriate for long-running jobs.
31 Tasks use Python's generator API to provide a more pleasant interface to
32 callbacks. See the Task class (below) for more information.
33 """
35 # Copyright (C) 2009, Thomas Leonard
36 # See the README file for details, or visit http://0install.net.
38 from zeroinstall import _, support, gobject
39 import sys
40 from logging import info, warn, debug
# The list of Blockers whose event has happened, in the order they were
# triggered. Blocker.trigger() appends to it and _handle_run_queue()
# processes (and then removes) the entry at the front.
_run_queue = []
def check(blockers, reporter = None):
    """Look for pending exceptions on the given blockers.
    Every blocker found with an exception is marked as 'exception_read'.
    @param blockers: a single Blocker, or an iterable of Blockers
    @param reporter: if given, called as reporter(*exception) for each
    pending exception; an error raised by the reporter itself is logged
    and then propagated
    If reporter is None, the first pending exception is re-raised (after
    all blockers have been examined) and any further ones are only logged."""
    if isinstance(blockers, Blocker):
        # Allow a lone blocker to be passed instead of a collection
        blockers = (blockers,)

    first = None
    for blocker in blockers:
        pending = blocker.exception
        if not pending:
            continue
        blocker.exception_read = True

        if reporter:
            try:
                reporter(*pending)
            except:
                warn("Failure reporting error! Error was: %s", repr(pending[0]))
                raise
        elif first is None:
            # Remember it; it is raised once the loop has finished
            first = pending
        else:
            warn(_("Multiple exceptions waiting; skipping %s"), pending[0])

    if first:
        support.raise_with_traceback(first[0], first[1])
class Blocker:
    """A Blocker object starts life with 'happened = False'. Tasks can
    ask to be suspended until 'happened = True'. The value is changed
    by a call to trigger().

    Example:

    >>> kettle_boiled = tasks.Blocker("kettle boiled")
    >>> def make_tea():
    ...     print("Get cup")
    ...     print("Add tea leaves")
    ...     yield kettle_boiled
    ...     print("Pour water into cup")
    ...     print("Brew...")
    ...     yield tasks.TimeoutBlocker(120, "Brewing")
    ...     print("Add milk")
    ...     print("Ready!")
    >>> tasks.Task(make_tea(), "make tea")

    Then elsewhere, later::

        print("Kettle boiled!")
        kettle_boiled.trigger()

    You can also yield a list of Blockers. Your function will resume
    after any one of them is triggered. Use blocker.happened to
    find out which one(s). Yielding a Blocker that has already
    happened is the same as yielding None (gives any other Tasks a
    chance to run, and then continues).
    """

    # (exception, traceback) tuple passed to trigger(), or None.
    exception = None

    def __init__(self, name):
        """@param name: label for this blocker; returned by str()"""
        self.name = name
        self.happened = False           # False until event triggered
        self._zero_lib_tasks = set()    # Tasks waiting on this blocker

    def trigger(self, exception = None):
        """The event has happened. Note that this cannot be undone;
        instead, create a new Blocker to handle the next occurrence
        of the event. Calling trigger() again on a Blocker that has
        already happened does nothing.
        @param exception: exception to raise in waiting tasks
        @type exception: (Exception, traceback)"""
        if self.happened:
            return      # Already triggered

        self.happened = True
        self.exception = exception
        self.exception_read = False

        #assert self not in _run_queue  # Slow
        # _schedule() must be called while the queue is still empty,
        # immediately before we add ourselves to it.
        if not _run_queue:
            _schedule()
        _run_queue.append(self)

        if not exception:
            return
        assert isinstance(exception, tuple), exception
        if self._zero_lib_tasks:
            return
        # Nobody is waiting, so log the error in case it is never read
        info(_("Exception from '%s', but nothing is waiting for it"), self)
        import traceback
        debug(''.join(traceback.format_exception(type(exception[0]), exception[0], exception[1])))

    # Note: a __del__ method warning about unread exceptions was tried
    # here, but it causes leaks by preventing blockers from being GC'd
    # if they are in cycles.

    def add_task(self, task):
        """Called by the scheduler when a Task yields this
        Blocker. If you override this method, be sure to still
        call this method with Blocker.add_task(self)!"""
        assert task not in self._zero_lib_tasks, "Blocking on a single task twice: %s (%s)" % (task, self)
        self._zero_lib_tasks.add(task)

    def remove_task(self, task):
        """Called by the scheduler when a Task that was waiting for
        this blocker is resumed."""
        self._zero_lib_tasks.remove(task)

    def __repr__(self):
        return "<Blocker:%s>" % (self,)

    def __str__(self):
        return self.name
class IdleBlocker(Blocker):
    """A Blocker that fires as soon as something waits on it: it stays
    untriggered until the first task is added, then triggers at once.
    An instance of this class is used internally when a Task yields
    None."""

    def add_task(self, task):
        """Register the task as usual, then trigger this blocker."""
        Blocker.add_task(self, task)
        # trigger() does nothing if we have already happened
        self.trigger()
class TimeoutBlocker(Blocker):
    """A Blocker that triggers itself after a fixed delay."""

    def __init__(self, timeout, name):
        """Arrange for this blocker to trigger after 'timeout' seconds.
        @param timeout: delay in seconds (may be a fraction)
        @param name: label for this blocker"""
        Blocker.__init__(self, name)
        # The timer is registered in whole milliseconds
        delay_ms = int(timeout * 1000)
        gobject.timeout_add(delay_ms, self._timeout)

    def _timeout(self):
        # Timer callback. Nothing is returned (i.e. None).
        self.trigger()
def _io_callback(src, cond, blocker):
    # I/O watch callback shared by InputBlocker and OutputBlocker:
    # 'blocker' is the extra argument given to io_add_watch. 'src' and
    # 'cond' are ignored. Always returns False.
    blocker.trigger()
    keep_watching = False
    return keep_watching
class InputBlocker(Blocker):
    """Triggers when os.read(stream) would not block."""

    _stream = None  # The stream being watched
    _tag = None     # ID of the active I/O watch, or None if not watching

    def __init__(self, stream, name):
        """@param stream: the stream to watch for input
        @param name: label for this blocker"""
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, starting the I/O watch if it is not
        already running."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return      # Already watching
        conditions = gobject.IO_IN | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, cancelling the I/O watch once no task
        is left waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return      # Others are still waiting
        gobject.source_remove(self._tag)
        self._tag = None
class OutputBlocker(Blocker):
    """Triggers when os.write(stream) would not block."""

    _stream = None  # The stream being watched
    _tag = None     # ID of the active I/O watch, or None if not watching

    def __init__(self, stream, name):
        """@param stream: the stream to watch for writability
        @param name: label for this blocker"""
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, starting the I/O watch if it is not
        already running."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            return      # Already watching
        conditions = gobject.IO_OUT | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, cancelling the I/O watch once no task
        is left waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            return      # Others are still waiting
        gobject.source_remove(self._tag)
        self._tag = None
# The blocker that tasks wait on when they only want to give up control
# briefly (new tasks, and tasks that yield None). A triggered blocker can't
# be reused, so _handle_run_queue() replaces this with a fresh IdleBlocker
# each time it processes the current one.
_idle_blocker = IdleBlocker("(idle)")
class Task:
    """Create a new Task when you have some long running function to
    run in the background, but which needs to do work in 'chunks'.
    Example:

    >>> from zeroinstall.support import tasks
    >>> def my_task(start):
    ...     for x in range(start, start + 5):
    ...         print("x =", x)
    ...         yield None

    >>> tasks.Task(my_task(0), "first")
    >>> tasks.Task(my_task(10), "second")
    >>> mainloop()

    Yielding None gives up control of the processor to another Task,
    causing the sequence printed to be interleaved. You can also yield a
    Blocker (or a list of Blockers) if you want to wait for some
    particular event before resuming (see the Blocker class for details).

    The 'finished' attribute is a Blocker which is triggered when the
    iterator ends, or (with the exception) when it raises an error.
    """

    def __init__(self, iterator, name):
        """Call next(iterator) from a glib idle function. This function
        can yield Blocker() objects to suspend processing while waiting
        for events. name is used only for debugging."""
        self.iterator = iterator
        self.finished = Blocker(name)
        # Block new task on the idle handler...
        idle = _idle_blocker
        idle.add_task(self)
        self._zero_blockers = (idle,)
        info(_("Scheduling new task: %s"), self)

    def _resume(self):
        # Run the iterator up to its next yield, then start waiting on
        # whatever it yielded.

        # We're no longer waiting on any of our previous blockers
        for old in self._zero_blockers:
            old.remove_task(self)

        # Resume the task
        try:
            yielded = next(self.iterator)
        except StopIteration:
            # Task ended
            self.finished.trigger()
            return
        except SystemExit:
            raise
        except (Exception, KeyboardInterrupt) as ex:
            # Task crashed: pass the error on to whoever checks 'finished'
            info(_("Exception from '%(name)s': %(exception)s"), {'name': self.finished.name, 'exception': ex})
            tb = sys.exc_info()[2]
            self.finished.trigger(exception = (ex, tb))
            return

        if yielded is None:
            # Just give up control briefly
            waiting_on = (_idle_blocker,)
        else:
            # A single yielded blocker is wrapped into a tuple
            waiting_on = (yielded,) if isinstance(yielded, Blocker) else yielded
            # Are we blocking on something that already happened?
            for candidate in waiting_on:
                assert hasattr(candidate, 'happened'), "Not a Blocker: %s from %s" % (candidate, self)
                if candidate.happened:
                    waiting_on = (_idle_blocker,)
                    info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self, 'blocker': candidate})
                    break
            else:
                info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self, 'new_blockers': waiting_on})

        # Add to new blockers' queues
        for target in waiting_on:
            target.add_task(self)
        self._zero_blockers = waiting_on

    def __repr__(self):
        return "Task(%s)" % (self.finished.name,)

    def __str__(self):
        return self.finished.name
def _schedule():
    # Ask glib to call _handle_run_queue() when idle. May only be called
    # while the run queue is empty, and the caller must append to
    # _run_queue right after calling this!
    assert not _run_queue
    gobject.idle_add(_handle_run_queue)
def _handle_run_queue():
    # Idle callback: process the blocker at the head of the run queue by
    # resuming every task that is waiting on it. Returns True while more
    # blockers remain in the queue, False once it is empty.
    global _idle_blocker
    assert _run_queue

    head = _run_queue[0]
    assert head.happened

    if head is _idle_blocker:
        # Since this blocker will never run again, create a
        # new one for future idling.
        _idle_blocker = IdleBlocker("(idle)")
    elif head._zero_lib_tasks:
        info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': head._zero_lib_tasks, 'next': head})
    else:
        info(_("Running %s"), head)

    # Take a snapshot: resuming a task removes it from the blocker's set
    waiting = frozenset(head._zero_lib_tasks)
    if waiting:
        head.noticed = True

    for task in waiting:
        task._resume()

    del _run_queue[0]

    return bool(_run_queue)
def named_async(name):
    """Decorator factory: the returned decorator wraps a generator function
    so that calling it starts the generator as a Task and returns that
    Task's 'finished' blocker.
    @param name: the name for the Task"""
    def deco(fn):
        def run(*args, **kwargs):
            task = Task(fn(*args, **kwargs), name)
            return task.finished
        # Keep the wrapped function's name
        run.__name__ = fn.__name__
        return run
    return deco
def aasync(fn):
    """Decorator that turns a generator function into a function that runs the
    generator as a Task and returns the Task's finished blocker. The Task is
    named after the decorated function.

    This was originally called 'async', but that became a reserved keyword in
    Python 3.7, where 'def async(...)' is a syntax error that stops the whole
    module from being imported.
    @param fn: the generator function to wrap
    @return: a function with the same name and arguments as fn, which returns
    a L{Blocker}"""
    def run(*args, **kwargs):
        return Task(fn(*args, **kwargs), fn.__name__).finished
    # Keep the wrapped function's name
    run.__name__ = fn.__name__
    return run

# Backwards compatibility: keep the decorator reachable under its old name
# (e.g. via getattr(tasks, 'async')), which can no longer be written as an
# identifier on Python 3.7 and later.
globals()['async'] = aasync
def wait_for_blocker(blocker):
    """Run a mainloop until blocker is triggered, then raise any exception
    the blocker is carrying (see L{check}). If the blocker has already
    happened, no mainloop is run. Calls to this function must not be nested.
    @param blocker: event to wait on
    @type blocker: L{Blocker}
    @since: 0.53
    """
    assert wait_for_blocker.loop is None    # Avoid recursion

    if blocker.happened:
        # Nothing to wait for; just report any pending exception
        check(blocker)
        return

    def quitter():
        # Helper task: stops the mainloop once the blocker has triggered
        yield blocker
        wait_for_blocker.loop.quit()
    Task(quitter(), "quitter")

    wait_for_blocker.loop = gobject.MainLoop()
    try:
        debug(_("Entering mainloop, waiting for %s"), blocker)
        wait_for_blocker.loop.run()
    finally:
        wait_for_blocker.loop = None

    assert blocker.happened, "Someone quit the main loop!"

    check(blocker)

# The mainloop currently being run by wait_for_blocker(), or None
wait_for_blocker.loop = None