1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use tasks, as provided by this module.
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished). We use these for, eg, rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
37 # Copyright (C) 2009, Thomas Leonard
38 # See the README file for details, or visit http://0install.net.
40 from zeroinstall
import _
42 from logging
import info
, warn
, debug
45 # The list of Blockers whose event has happened, in the order they were triggered.
49 def check(blockers
, reporter
= None):
50 """See if any of the blockers have pending exceptions.
51 @param reporter: invoke this function on each error
52 If reporter is None, raise the first and log the rest."""
54 if isinstance(blockers
, Blocker
):
55 blockers
= (blockers
,)
58 b
.exception_read
= True
61 reporter(*b
.exception
)
63 warn("Failure reporting error! Error was: %s", repr(b
.exception
[0]))
68 warn(_("Multiple exceptions waiting; skipping %s"), b
.exception
[0])
70 raise ex
[0], None, ex
[1]
73 """A Blocker object starts life with 'happened = False'. Tasks can
74 ask to be suspended until 'happened = True'. The value is changed
75 by a call to trigger().
79 >>> kettle_boiled = tasks.Blocker()
82 print "Add tea leaves"
84 print "Pour water into cup"
86 yield tasks.TimeoutBlocker(120)
89 >>> tasks.Task(make_tea())
91 Then elsewhere, later::
93 print "Kettle boiled!"
94 kettle_boiled.trigger()
96 You can also yield a list of Blockers. Your function will resume
97 after any one of them is triggered. Use blocker.happened to
98 find out which one(s). Yielding a Blocker that has already
99 happened is the same as yielding None (gives any other Tasks a
100 chance to run, and then continues).
105 def __init__(self
, name
):
106 self
.happened
= False # False until event triggered
107 self
._zero
_lib
_tasks
= set() # Tasks waiting on this blocker
110 def trigger(self
, exception
= None):
111 """The event has happened. Note that this cannot be undone;
112 instead, create a new Blocker to handle the next occurrence
114 @param exception: exception to raise in waiting tasks
115 @type exception: (Exception, traceback)"""
116 if self
.happened
: return # Already triggered
118 self
.exception
= exception
119 self
.exception_read
= False
120 #assert self not in _run_queue # Slow
123 _run_queue
.append(self
)
126 assert isinstance(exception
, tuple), exception
127 if not self
._zero
_lib
_tasks
:
128 info(_("Exception from '%s', but nothing is waiting for it"), self
)
130 #traceback.print_exception(exception[0], None, exception[1])
133 if self
.exception
and not self
.exception_read
:
134 warn(_("Blocker %(blocker)s garbage collected without having it's exception read: %(exception)s"), {'blocker': self
, 'exception': self
.exception
})
def add_task(self, task):
    """Register a Task as waiting on this Blocker.

    Called by the scheduler when a Task yields this Blocker. If you
    override this method, be sure to still call the base implementation
    with Blocker.add_task(self, task).

    @param task: the Task to add to the set of waiters"""
    waiting = self._zero_lib_tasks
    # A given task may only wait on the same blocker once.
    assert task not in waiting, "Blocking on a single task twice: %s (%s)" % (task, self)
    waiting.add(task)
def remove_task(self, task):
    """Stop tracking a Task that was waiting on this Blocker.

    Called by the scheduler when a Task that was waiting for this
    blocker is resumed.

    @param task: the Task to drop from the set of waiters"""
    waiting = self._zero_lib_tasks
    waiting.remove(task)
149 return "<Blocker:%s>" % self
154 class IdleBlocker(Blocker
):
155 """An IdleBlocker blocks until a task starts waiting on it, then
156 immediately triggers. An instance of this class is used internally
157 when a Task yields None."""
158 def add_task(self
, task
):
159 """Also calls trigger."""
160 Blocker
.add_task(self
, task
)
163 class TimeoutBlocker(Blocker
):
164 """Triggers after a set number of seconds."""
def __init__(self, timeout, name):
    """Trigger after 'timeout' seconds (may be a fraction).

    @param timeout: delay in seconds before self._timeout is invoked
    @param name: passed through to Blocker.__init__"""
    Blocker.__init__(self, name)
    # gobject.timeout_add() expects a whole number of milliseconds.
    delay_ms = int(timeout * 1000)
    gobject.timeout_add(delay_ms, self._timeout)
173 def _io_callback(src
, cond
, blocker
):
177 class InputBlocker(Blocker
):
178 """Triggers when os.read(stream) would not block."""
def __init__(self, stream, name):
    """Remember the stream that this blocker watches.

    @param stream: the stream later handed to gobject.io_add_watch() by add_task()
    @param name: passed through to Blocker.__init__"""
    Blocker.__init__(self, name)
    # Only stored here; the watch itself is installed when a task starts waiting.
    self._stream = stream
185 def add_task(self
, task
):
186 Blocker
.add_task(self
, task
)
187 if self
._tag
is None:
188 self
._tag
= gobject
.io_add_watch(self
._stream
, gobject
.IO_IN | gobject
.IO_HUP
,
191 def remove_task(self
, task
):
192 Blocker
.remove_task(self
, task
)
193 if not self
._zero
_lib
_tasks
:
194 gobject
.source_remove(self
._tag
)
197 class OutputBlocker(Blocker
):
198 """Triggers when os.write(stream) would not block."""
def __init__(self, stream, name):
    """Remember the stream that this blocker watches.

    @param stream: the stream later handed to gobject.io_add_watch() by add_task()
    @param name: passed through to Blocker.__init__"""
    Blocker.__init__(self, name)
    # Only stored here; the watch itself is installed when a task starts waiting.
    self._stream = stream
205 def add_task(self
, task
):
206 Blocker
.add_task(self
, task
)
207 if self
._tag
is None:
208 self
._tag
= gobject
.io_add_watch(self
._stream
, gobject
.IO_OUT | gobject
.IO_HUP
,
211 def remove_task(self
, task
):
212 Blocker
.remove_task(self
, task
)
213 if not self
._zero
_lib
_tasks
:
214 gobject
.source_remove(self
._tag
)
217 _idle_blocker
= IdleBlocker("(idle)")
220 """Create a new Task when you have some long running function to
221 run in the background, but which needs to do work in 'chunks'.
224 >>> from zeroinstall import tasks
225 >>> def my_task(start):
226 for x in range(start, start + 5):
230 >>> tasks.Task(my_task(0))
231 >>> tasks.Task(my_task(10))
234 Yielding None gives up control of the processor to another Task,
235 causing the sequence printed to be interleaved. You can also yield a
236 Blocker (or a list of Blockers) if you want to wait for some
237 particular event before resuming (see the Blocker class for details).
def __init__(self, iterator, name):
    """Call iterator.next() from a glib idle function.

    The iterator can yield Blocker() objects to suspend processing while
    waiting for events.

    @param iterator: the iterator (normally a generator) to run
    @param name: used only for debugging; also passed to the 'finished' Blocker
    @raise AssertionError: if 'iterator' has no 'next' attribute"""
    # The previous 'assert iterator.next' could never show its message:
    # a bound method is always true, and a missing attribute raised
    # AttributeError before the assertion was evaluated.
    assert hasattr(iterator, 'next'), "Object passed is not an iterator!"
    self.iterator = iterator
    # Blocker that other code can wait on; named after this task.
    self.finished = Blocker(name)
    # Block new task on the idle handler...
    _idle_blocker.add_task(self)
    self._zero_blockers = (_idle_blocker,)
    info(_("Scheduling new task: %s"), self)
253 # Remove from our blockers' queues
254 for blocker
in self
._zero
_blockers
:
255 blocker
.remove_task(self
)
258 new_blockers
= self
.iterator
.next()
259 except StopIteration:
261 self
.finished
.trigger()
265 except (Exception, KeyboardInterrupt), ex
:
267 info(_("Exception from '%(name)s': %(exception)s"), {'name': self
.finished
.name
, 'exception': ex
})
269 #traceback.print_exc()
270 tb
= sys
.exc_info()[2]
271 self
.finished
.trigger(exception
= (ex
, tb
))
273 if new_blockers
is None:
274 # Just give up control briefly
275 new_blockers
= (_idle_blocker
,)
277 if isinstance(new_blockers
, Blocker
):
278 # Wrap a single yielded blocker into a list
279 new_blockers
= (new_blockers
,)
280 # Are we blocking on something that already happened?
281 for blocker
in new_blockers
:
282 assert hasattr(blocker
, 'happened'), "Not a Blocker: %s from %s" % (blocker
, self
)
284 new_blockers
= (_idle_blocker
,)
285 info(_("Task '%(task)s' waiting on ready blocker %(blocker)s!"), {'task': self
, 'blocker': blocker
})
288 info(_("Task '%(task)s' stopping and waiting for '%(new_blockers)s'"), {'task': self
, 'new_blockers': new_blockers
})
289 # Add to new blockers' queues
290 for blocker
in new_blockers
:
291 blocker
.add_task(self
)
292 self
._zero
_blockers
= new_blockers
295 return "Task(%s)" % self
.finished
.name
298 return self
.finished
.name
300 # Must append to _run_queue right after calling this!
302 assert not _run_queue
303 gobject
.idle_add(_handle_run_queue
)
305 def _handle_run_queue():
312 if next
is _idle_blocker
:
313 # Since this blocker will never run again, create a
314 # new one for future idling.
315 _idle_blocker
= IdleBlocker("(idle)")
316 elif next
._zero
_lib
_tasks
:
317 info(_("Running %(task)s due to triggering of '%(next)s'"), {'task': next
._zero
_lib
_tasks
, 'next': next
})
319 info(_("Running %s"), next
)
321 tasks
= frozenset(next
._zero
_lib
_tasks
)
335 def named_async(name
):
336 """Decorator that turns a generator function into a function that runs the
337 generator as a Task and returns the Task's finished blocker.
338 @param name: the name for the Task"""
340 def run(*args
, **kwargs
):
341 return Task(fn(*args
, **kwargs
), name
).finished
342 run
.__name
__ = fn
.__name
__
347 """Decorator that turns a generator function into a function that runs the
348 generator as a Task and returns the Task's finished blocker."""
349 def run(*args
, **kwargs
):
350 return Task(fn(*args
, **kwargs
), fn
.__name
__).finished
351 run
.__name
__ = fn
.__name
__
354 def wait_for_blocker(blocker
):
355 """Run a recursive mainloop until blocker is triggered.
356 @param blocker: event to wait on
357 @type blocker: L{Blocker}
360 assert wait_for_blocker
.loop
is None # Avoid recursion
362 if not blocker
.happened
:
365 wait_for_blocker
.loop
.quit()
366 Task(quitter(), "quitter")
368 wait_for_blocker
.loop
= gobject
.MainLoop(gobject
.main_context_default())
370 debug(_("Entering mainloop, waiting for %s"), blocker
)
371 wait_for_blocker
.loop
.run()
373 wait_for_blocker
.loop
= None
375 assert blocker
.happened
, "Someone quit the main loop!"
378 wait_for_blocker
.loop
= None