1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
11 Using threads causes a number of problems: they introduce race conditions,
12 often lead to many subtle bugs, and they require lots of resources (you
13 probably wouldn't want 10,000 threads running at once). In particular, two
14 threads can run at exactly the same time (perhaps on different processors), so
15 you have to be really careful that they don't both try to update the same
16 variables at the same time. This requires lots of messy locking, which is hard
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished), so they are not appropriate for long-running jobs.
31 Tasks use Python's generator API to provide a more pleasant interface to
32 callbacks. See the Task class (below) for more information.
35 # Copyright (C) 2009, Thomas Leonard
36 # See the README file for details, or visit http://0install.net.
38 from zeroinstall
import _
, support
, gobject
, logger
41 # The list of Blockers whose event has happened, in the order they were
45 def check(blockers
, reporter
= None):
46 """See if any of the blockers have pending exceptions.
47 @param reporter: invoke this function on each error
48 If reporter is None, raise the first and log the rest."""
50 if isinstance(blockers
, Blocker
):
51 blockers
= (blockers
,)
54 b
.exception_read
= True
57 reporter(*b
.exception
)
59 logger
.warn("Failure reporting error! Error was: %s", repr(b
.exception
[0]))
64 logger
.warn(_("Multiple exceptions waiting; skipping %s"), b
.exception
[0])
66 support
.raise_with_traceback(ex
[0], ex
[1])
69 """A Blocker object starts life with 'happened = False'. Tasks can
70 ask to be suspended until 'happened = True'. The value is changed
71 by a call to trigger().
75 >>> kettle_boiled = tasks.Blocker()
78 print "Add tea leaves"
80 print "Pour water into cup"
82 yield tasks.TimeoutBlocker(120, "Brewing")
85 >>> tasks.Task(make_tea())
87 Then elsewhere, later::
89 print "Kettle boiled!"
90 kettle_boiled.trigger()
92 You can also yield a list of Blockers. Your function will resume
93 after any one of them is triggered. Use blocker.happened to
94 find out which one(s). Yielding a Blocker that has already
95 happened is the same as yielding None (gives any other Tasks a
96 chance to run, and then continues).
101 def __init__(self
, name
):
102 self
.happened
= False # False until event triggered
103 self
._zero
_lib
_tasks
= set() # Tasks waiting on this blocker
106 def trigger(self
, exception
= None):
107 """The event has happened. Note that this cannot be undone;
108 instead, create a new Blocker to handle the next occurance
110 @param exception: exception to raise in waiting tasks
111 @type exception: (Exception, traceback)"""
112 if self
.happened
: return # Already triggered
114 self
.exception
= exception
115 self
.exception_read
= False
116 #assert self not in _run_queue # Slow
119 _run_queue
.append(self
)
122 assert isinstance(exception
, tuple), exception
123 if not self
._zero
_lib
_tasks
:
124 logger
.info(_("Exception from '%s', but nothing is waiting for it"), self
)
126 logger
.debug(''.join(traceback
.format_exception(type(exception
[0]), exception
[0], exception
[1])))
128 # (causes leaks by preventing blockers from being GC'd if in cycles)
130 # if self.exception and not self.exception_read:
131 # warn(_("Blocker %(blocker)s garbage collected without having it's exception read: %(exception)s"), {'blocker': self, 'exception': self.exception})
133 def add_task(self
, task
):
134 """Called by the schedular when a Task yields this
135 Blocker. If you override this method, be sure to still
136 call this method with Blocker.add_task(self)!"""
137 assert task
not in self
._zero
_lib
_tasks
, "Blocking on a single task twice: %s (%s)" % (task
, self
)
138 self
._zero
_lib
_tasks
.add(task
)
140 def remove_task(self
, task
):
141 """Called by the schedular when a Task that was waiting for
142 this blocker is resumed."""
143 self
._zero
_lib
_tasks
.remove(task
)
146 return "<Blocker:%s>" % self
151 class IdleBlocker(Blocker
):
152 """An IdleBlocker blocks until a task starts waiting on it, then
153 immediately triggers. An instance of this class is used internally
154 when a Task yields None."""
155 def add_task(self
, task
):
156 """Also calls trigger."""
157 Blocker
.add_task(self
, task
)
160 class TimeoutBlocker(Blocker
):
161 """Triggers after a set number of seconds."""
162 def __init__(self
, timeout
, name
):
163 """Trigger after 'timeout' seconds (may be a fraction)."""
164 Blocker
.__init
__(self
, name
)
165 gobject
.timeout_add(int(timeout
* 1000), self
._timeout
)
170 def _io_callback(src
, cond
, blocker
):
174 class InputBlocker(Blocker
):
175 """Triggers when os.read(stream) would not block."""
178 def __init__(self
, stream
, name
):
179 Blocker
.__init
__(self
, name
)
180 self
._stream
= stream
182 def add_task(self
, task
):
183 Blocker
.add_task(self
, task
)
184 if self
._tag
is None:
185 self
._tag
= gobject
.io_add_watch(self
._stream
, gobject
.IO_IN | gobject
.IO_HUP
,
188 def remove_task(self
, task
):
189 Blocker
.remove_task(self
, task
)
190 if not self
._zero
_lib
_tasks
:
191 gobject
.source_remove(self
._tag
)
194 class OutputBlocker(Blocker
):
195 """Triggers when os.write(stream) would not block."""
198 def __init__(self
, stream
, name
):
199 Blocker
.__init
__(self
, name
)
200 self
._stream
= stream
202 def add_task(self
, task
):
203 Blocker
.add_task(self
, task
)
204 if self
._tag
is None:
205 self
._tag
= gobject
.io_add_watch(self
._stream
, gobject
.IO_OUT | gobject
.IO_HUP
,
208 def remove_task(self
, task
):
209 Blocker
.remove_task(self
, task
)
210 if not self
._zero
_lib
_tasks
:
211 gobject
.source_remove(self
._tag
)
214 _idle_blocker
= IdleBlocker("(idle)")
217 """Create a new Task when you have some long running function to
218 run in the background, but which needs to do work in 'chunks'.
221 >>> from zeroinstall import tasks
222 >>> def my_task(start):
223 for x in range(start, start + 5):
227 >>> tasks.Task(my_task(0))
228 >>> tasks.Task(my_task(10))
231 Yielding None gives up control of the processor to another Task,
232 causing the sequence printed to be interleaved. You can also yield a
233 Blocker (or a list of Blockers) if you want to wait for some
234 particular event before resuming (see the Blocker class for details).
237 def __init__(self
, iterator
, name
):
238 """Call next(iterator) from a glib idle function. This function
239 can yield Blocker() objects to suspend processing while waiting
240 for events. name is used only for debugging."""
241 self
.iterator
= iterator
242 self
.finished
= Blocker(name
)
243 # Block new task on the idle handler...
244 _idle_blocker
.add_task(self
)
245 self
._zero
_blockers
= (_idle_blocker
,)
246 logger
.info(_("Scheduling new task: %s"), self
)
249 # Remove from our blockers' queues
250 for blocker
in self
._zero
_blockers
:
251 blocker
.remove_task(self
)
254 new_blockers
= next(self
.iterator
)
255 except StopIteration:
257 self
.finished
.trigger()
261 except (Exception, KeyboardInterrupt) as ex
:
263 logger
.info(_("Exception from '%(name)s': %(exception)s"), {'name': self
.finished
.name
, 'exception': ex
})
265 #debug(''.join(traceback.format_exception(*sys.exc_info())))
266 tb
= sys
.exc_info()[2]
267 self
.finished
.trigger(exception
= (ex
, tb
))
269 if new_blockers
is None:
270 # Just give up control briefly
271 new_blockers
= (_idle_blocker
,)
273 if isinstance(new_blockers
, Blocker
):
274 # Wrap a single yielded blocker into a list
275 new_blockers
= (new_blockers
,)
276 # Are we blocking on something that already happened?
277 for blocker
in new_blockers
:
278 assert hasattr(blocker
, 'happened'), "Not a Blocker: %s from %s" % (blocker
, self
)
280 new_blockers
= (_idle_blocker
,)
281 logger
.info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self
, 'blocker': blocker
})
284 logger
.info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self
, 'new_blockers': new_blockers
})
285 # Add to new blockers' queues
286 for blocker
in new_blockers
:
287 blocker
.add_task(self
)
288 self
._zero
_blockers
= new_blockers
291 return "Task(%s)" % self
.finished
.name
294 return self
.finished
.name
296 # Must append to _run_queue right after calling this!
298 assert not _run_queue
299 gobject
.idle_add(_handle_run_queue
)
301 def _handle_run_queue():
308 if next
is _idle_blocker
:
309 # Since this blocker will never run again, create a
310 # new one for future idling.
311 _idle_blocker
= IdleBlocker("(idle)")
312 elif next
._zero
_lib
_tasks
:
313 logger
.info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': next
._zero
_lib
_tasks
, 'next': next
})
315 logger
.info(_("Running %s"), next
)
317 tasks
= frozenset(next
._zero
_lib
_tasks
)
331 def named_async(name
):
332 """Decorator that turns a generator function into a function that runs the
333 generator as a Task and returns the Task's finished blocker.
334 @param name: the name for the Task"""
336 def run(*args
, **kwargs
):
337 return Task(fn(*args
, **kwargs
), name
).finished
338 run
.__name
__ = fn
.__name
__
343 """Decorator that turns a generator function into a function that runs the
344 generator as a Task and returns the Task's finished blocker."""
345 def run(*args
, **kwargs
):
346 return Task(fn(*args
, **kwargs
), fn
.__name
__).finished
347 run
.__name
__ = fn
.__name
__
350 def wait_for_blocker(blocker
):
351 """Run a recursive mainloop until blocker is triggered.
352 @param blocker: event to wait on
353 @type blocker: L{Blocker}
356 assert wait_for_blocker
.loop
is None # Avoid recursion
358 if not blocker
.happened
:
361 wait_for_blocker
.loop
.quit()
362 Task(quitter(), "quitter")
364 wait_for_blocker
.loop
= gobject
.MainLoop()
366 logger
.debug(_("Entering mainloop, waiting for %s"), blocker
)
367 wait_for_blocker
.loop
.run()
369 wait_for_blocker
.loop
= None
371 assert blocker
.happened
, "Someone quit the main loop!"
374 wait_for_blocker
.loop
= None