1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
11 Using threads causes a number of problems: they introduce race conditions,
12 often lead to many subtle bugs, and they require lots of resources (you
13 probably wouldn't want 10,000 threads running at once). In particular, two
14 threads can run at exactly the same time (perhaps on different processors), so
15 you have to be really careful that they don't both try to update the same
16 variables at the same time. This requires lots of messy locking, which is hard
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished), so it is not appropriate for long-running jobs.
31 Tasks use Python's generator API to provide a more pleasant interface to
32 callbacks. See the Task class (below) for more information.
35 # Copyright (C) 2009, Thomas Leonard
36 # See the README file for details, or visit http://0install.net.
38 from zeroinstall
import _
40 from logging
import info
, warn
, debug
43 # The list of Blockers whose event has happened, in the order they were
47 def check(blockers
, reporter
= None):
48 """See if any of the blockers have pending exceptions.
49 @param reporter: invoke this function on each error
50 If reporter is None, raise the first and log the rest."""
52 if isinstance(blockers
, Blocker
):
53 blockers
= (blockers
,)
56 b
.exception_read
= True
59 reporter(*b
.exception
)
61 warn("Failure reporting error! Error was: %s", repr(b
.exception
[0]))
66 warn(_("Multiple exceptions waiting; skipping %s"), b
.exception
[0])
68 raise ex
[0], None, ex
[1]
71 """A Blocker object starts life with 'happened = False'. Tasks can
72 ask to be suspended until 'happened = True'. The value is changed
73 by a call to trigger().
77 >>> kettle_boiled = tasks.Blocker()
80 print "Add tea leaves"
82 print "Pour water into cup"
84 yield tasks.TimeoutBlocker(120, "Brewing")
87 >>> tasks.Task(make_tea())
89 Then elsewhere, later::
91 print "Kettle boiled!"
92 kettle_boiled.trigger()
94 You can also yield a list of Blockers. Your function will resume
95 after any one of them is triggered. Use blocker.happened to
96 find out which one(s). Yielding a Blocker that has already
97 happened is the same as yielding None (gives any other Tasks a
98 chance to run, and then continues).
103 def __init__(self
, name
):
104 self
.happened
= False # False until event triggered
105 self
._zero
_lib
_tasks
= set() # Tasks waiting on this blocker
108 def trigger(self
, exception
= None):
109 """The event has happened. Note that this cannot be undone;
110 instead, create a new Blocker to handle the next occurrence
112 @param exception: exception to raise in waiting tasks
113 @type exception: (Exception, traceback)"""
114 if self
.happened
: return # Already triggered
116 self
.exception
= exception
117 self
.exception_read
= False
118 #assert self not in _run_queue # Slow
121 _run_queue
.append(self
)
124 assert isinstance(exception
, tuple), exception
125 if not self
._zero
_lib
_tasks
:
126 info(_("Exception from '%s', but nothing is waiting for it"), self
)
128 debug(''.join(traceback
.format_exception(*sys
.exc_info())))
131 if self
.exception
and not self
.exception_read
:
132 warn(_("Blocker %(blocker)s garbage collected without having it's exception read: %(exception)s"), {'blocker': self
, 'exception': self
.exception
})
def add_task(self, task):
    """Called by the scheduler when a Task yields this Blocker.

    Records the task in the set of tasks waiting on this blocker.
    If you override this method, be sure to still call
    Blocker.add_task(self, task)!

    @param task: the Task that is now waiting on this blocker"""
    # A given task may only be registered once on the same blocker.
    assert task not in self._zero_lib_tasks, "Blocking on a single task twice: %s (%s)" % (task, self)
    self._zero_lib_tasks.add(task)
def remove_task(self, task):
    """Called by the scheduler when a Task that was waiting for
    this blocker is resumed.

    @param task: the Task to drop from this blocker's waiting set.
    set.remove() raises KeyError if the task was not waiting here."""
    self._zero_lib_tasks.remove(task)
147 return "<Blocker:%s>" % self
152 class IdleBlocker(Blocker
):
153 """An IdleBlocker blocks until a task starts waiting on it, then
154 immediately triggers. An instance of this class is used internally
155 when a Task yields None."""
156 def add_task(self
, task
):
157 """Also calls trigger."""
158 Blocker
.add_task(self
, task
)
161 class TimeoutBlocker(Blocker
):
162 """Triggers after a set number of seconds."""
def __init__(self, timeout, name):
    """Trigger after 'timeout' seconds (may be a fraction).

    @param timeout: delay in seconds before self._timeout is called
    @param name: passed on to Blocker.__init__"""
    Blocker.__init__(self, name)
    # The timer is registered in whole milliseconds, so the fractional
    # seconds value is scaled and truncated with int().
    gobject.timeout_add(int(timeout * 1000), self._timeout)
171 def _io_callback(src
, cond
, blocker
):
175 class InputBlocker(Blocker
):
176 """Triggers when os.read(stream) would not block."""
def __init__(self, stream, name):
    """Create a blocker for the given input stream.

    @param stream: stored as self._stream for later use by this blocker
    @param name: passed on to Blocker.__init__"""
    Blocker.__init__(self, name)
    self._stream = stream
183 def add_task(self
, task
):
184 Blocker
.add_task(self
, task
)
185 if self
._tag
is None:
186 self
._tag
= gobject
.io_add_watch(self
._stream
, gobject
.IO_IN | gobject
.IO_HUP
,
189 def remove_task(self
, task
):
190 Blocker
.remove_task(self
, task
)
191 if not self
._zero
_lib
_tasks
:
192 gobject
.source_remove(self
._tag
)
195 class OutputBlocker(Blocker
):
196 """Triggers when os.write(stream) would not block."""
def __init__(self, stream, name):
    """Create a blocker for the given output stream.

    @param stream: stored as self._stream for later use by this blocker
    @param name: passed on to Blocker.__init__"""
    Blocker.__init__(self, name)
    self._stream = stream
203 def add_task(self
, task
):
204 Blocker
.add_task(self
, task
)
205 if self
._tag
is None:
206 self
._tag
= gobject
.io_add_watch(self
._stream
, gobject
.IO_OUT | gobject
.IO_HUP
,
209 def remove_task(self
, task
):
210 Blocker
.remove_task(self
, task
)
211 if not self
._zero
_lib
_tasks
:
212 gobject
.source_remove(self
._tag
)
# Shared IdleBlocker instance; new Tasks are initially blocked on it.
_idle_blocker = IdleBlocker("(idle)")
218 """Create a new Task when you have some long running function to
219 run in the background, but which needs to do work in 'chunks'.
222 >>> from zeroinstall import tasks
223 >>> def my_task(start):
224 for x in range(start, start + 5):
228 >>> tasks.Task(my_task(0))
229 >>> tasks.Task(my_task(10))
232 Yielding None gives up control of the processor to another Task,
233 causing the sequence printed to be interleaved. You can also yield a
234 Blocker (or a list of Blockers) if you want to wait for some
235 particular event before resuming (see the Blocker class for details).
def __init__(self, iterator, name):
    """Call iterator.next() from a glib idle function. This function
    can yield Blocker() objects to suspend processing while waiting
    for events. name is used only for debugging.

    @param iterator: the iterator (typically a generator) to run
    @param name: name given to this task's 'finished' Blocker"""
    # Use hasattr() so that a non-iterator produces the intended
    # assertion message; a bare 'iterator.next' lookup would raise
    # AttributeError before the assert could report anything.
    assert hasattr(iterator, 'next'), "Object passed is not an iterator!"
    self.iterator = iterator
    self.finished = Blocker(name)
    # Block new task on the idle handler...
    _idle_blocker.add_task(self)
    self._zero_blockers = (_idle_blocker,)
    info(_("Scheduling new task: %s"), self)
251 # Remove from our blockers' queues
252 for blocker
in self
._zero
_blockers
:
253 blocker
.remove_task(self
)
256 new_blockers
= self
.iterator
.next()
257 except StopIteration:
259 self
.finished
.trigger()
263 except (Exception, KeyboardInterrupt) as ex
:
265 info(_("Exception from '%(name)s': %(exception)s"), {'name': self
.finished
.name
, 'exception': ex
})
267 #debug(''.join(traceback.format_exception(*sys.exc_info())))
268 tb
= sys
.exc_info()[2]
269 self
.finished
.trigger(exception
= (ex
, tb
))
271 if new_blockers
is None:
272 # Just give up control briefly
273 new_blockers
= (_idle_blocker
,)
275 if isinstance(new_blockers
, Blocker
):
276 # Wrap a single yielded blocker into a list
277 new_blockers
= (new_blockers
,)
278 # Are we blocking on something that already happened?
279 for blocker
in new_blockers
:
280 assert hasattr(blocker
, 'happened'), "Not a Blocker: %s from %s" % (blocker
, self
)
282 new_blockers
= (_idle_blocker
,)
283 info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self
, 'blocker': blocker
})
286 info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self
, 'new_blockers': new_blockers
})
287 # Add to new blockers' queues
288 for blocker
in new_blockers
:
289 blocker
.add_task(self
)
290 self
._zero
_blockers
= new_blockers
293 return "Task(%s)" % self
.finished
.name
296 return self
.finished
.name
298 # Must append to _run_queue right after calling this!
300 assert not _run_queue
301 gobject
.idle_add(_handle_run_queue
)
303 def _handle_run_queue():
310 if next
is _idle_blocker
:
311 # Since this blocker will never run again, create a
312 # new one for future idling.
313 _idle_blocker
= IdleBlocker("(idle)")
314 elif next
._zero
_lib
_tasks
:
315 info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': next
._zero
_lib
_tasks
, 'next': next
})
317 info(_("Running %s"), next
)
319 tasks
= frozenset(next
._zero
_lib
_tasks
)
333 def named_async(name
):
334 """Decorator that turns a generator function into a function that runs the
335 generator as a Task and returns the Task's finished blocker.
336 @param name: the name for the Task"""
338 def run(*args
, **kwargs
):
339 return Task(fn(*args
, **kwargs
), name
).finished
340 run
.__name
__ = fn
.__name
__
345 """Decorator that turns a generator function into a function that runs the
346 generator as a Task and returns the Task's finished blocker."""
347 def run(*args
, **kwargs
):
348 return Task(fn(*args
, **kwargs
), fn
.__name
__).finished
349 run
.__name
__ = fn
.__name
__
352 def wait_for_blocker(blocker
):
353 """Run a recursive mainloop until blocker is triggered.
354 @param blocker: event to wait on
355 @type blocker: L{Blocker}
358 assert wait_for_blocker
.loop
is None # Avoid recursion
360 if not blocker
.happened
:
363 wait_for_blocker
.loop
.quit()
364 Task(quitter(), "quitter")
366 wait_for_blocker
.loop
= gobject
.MainLoop(gobject
.main_context_default())
368 debug(_("Entering mainloop, waiting for %s"), blocker
)
369 wait_for_blocker
.loop
.run()
371 wait_for_blocker
.loop
= None
373 assert blocker
.happened
, "Someone quit the main loop!"
376 wait_for_blocker
.loop
= None