1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
- Use a recursive mainloop.
- Use this module (generator-based Tasks, described below).
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
finished). We use these for, e.g., rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
37 from __future__
import generators
# The list of Blockers whose event has happened, in the order they were
# triggered.
48 """A Blocker object starts life with 'happened = False'. Tasks can
49 ask to be suspended until 'happened = True'. The value is changed
50 by a call to trigger().
54 kettle_boiled = tasks.Blocker()
58 print "Add tea leaves"
60 print "Pour water into cup"
62 yield tasks.TimeoutBlocker(120)
66 tasks.Task(make_tea())
69 print "Kettle boiled!"
70 kettle_boiled.trigger()
72 You can also yield a list of Blockers. Your function will resume
73 after any one of them is triggered. Use blocker.happened to
74 find out which one(s). Yielding a Blocker that has already
75 happened is the same as yielding None (gives any other Tasks a
76 chance to run, and then continues).
80 self
.happened
= False # False until event triggered
81 self
._rox
_lib
_tasks
= {} # Tasks waiting on this blocker
84 """The event has happened. Note that this cannot be undone;
instead, create a new Blocker to handle the next occurrence
87 if self
.happened
: return # Already triggered
89 #assert self not in _run_queue # XXX: Slow
92 _run_queue
.append(self
)
def add_task(self, task):
    """Record that 'task' is now waiting on this Blocker.

    The scheduler calls this when a Task yields this Blocker.
    Subclasses that override it must still chain up with
    Blocker.add_task(self, task)."""
    # The dict is used as a set: only the keys matter.
    waiting = self._rox_lib_tasks
    waiting[task] = True
def remove_task(self, task):
    """Forget that 'task' was waiting on this Blocker.

    The scheduler calls this when a Task that was blocked here is
    resumed. Raises KeyError if 'task' was not waiting."""
    waiting = self._rox_lib_tasks
    waiting.pop(task)
class IdleBlocker(Blocker):
    """An IdleBlocker blocks until a task starts waiting on it, then
    immediately triggers. An instance of this class is used internally
    when a Task yields None."""
    def add_task(self, task):
        """Register 'task' as waiting on this blocker, then trigger.

        Also calls trigger."""
        Blocker.add_task(self, task)
        # Fire straight away: without this the waiting task would never
        # be resumed, since nothing else triggers an IdleBlocker.
        self.trigger()
class TimeoutBlocker(Blocker):
    """Triggers after a set number of seconds. rox.toplevel_ref/unref
    are called to prevent the app quitting while a TimeoutBlocker is
    running."""
    def __init__(self, timeout):
        """Trigger after 'timeout' seconds (may be a fraction)."""
        Blocker.__init__(self)
        # Keep the application alive until the timeout has fired.
        rox.toplevel_ref()
        # gobject timeouts are given in whole milliseconds.
        gobject.timeout_add(long(timeout * 1000), self._timeout)

    def _timeout(self):
        """Timeout callback: release the toplevel reference taken in
        __init__ and trigger this blocker.

        Returns False so that the timeout is not rescheduled."""
        rox.toplevel_unref()
        self.trigger()
        return False
class InputBlocker(Blocker):
    """Triggers when os.read(stream) would not block."""
    # Id of the installed gobject IO watch, or None when there is none.
    # Defined at class level so add_task() can test it before any watch
    # has been created.
    _tag = None

    def __init__(self, stream):
        """Create a blocker watching 'stream' for readability."""
        Blocker.__init__(self)
        self._stream = stream

    def add_task(self, task):
        """Register 'task' and install the IO watch if it is not
        already in place (one watch is shared by all waiting tasks)."""
        Blocker.add_task(self, task)
        if self._tag is None:
            self._tag = gobject.io_add_watch(self._stream, gobject.IO_IN,
                lambda src, cond: self.trigger())

    def remove_task(self, task):
        """Unregister 'task'; once no tasks are waiting, remove the IO
        watch."""
        Blocker.remove_task(self, task)
        if not self._rox_lib_tasks:
            gobject.source_remove(self._tag)
            # Forget the stale id so a later add_task() installs a
            # fresh watch instead of assuming one still exists.
            self._tag = None
class OutputBlocker(Blocker):
    """Triggers when os.write(stream) would not block."""
    # Id of the installed gobject IO watch, or None when there is none.
    # Defined at class level so add_task() can test it before any watch
    # has been created.
    _tag = None

    def __init__(self, stream):
        """Create a blocker watching 'stream' for writability."""
        Blocker.__init__(self)
        self._stream = stream

    def add_task(self, task):
        """Register 'task' and install the IO watch if it is not
        already in place (one watch is shared by all waiting tasks)."""
        Blocker.add_task(self, task)
        if self._tag is None:
            self._tag = gobject.io_add_watch(self._stream, gobject.IO_OUT,
                lambda src, cond: self.trigger())

    def remove_task(self, task):
        """Unregister 'task'; once no tasks are waiting, remove the IO
        watch."""
        Blocker.remove_task(self, task)
        if not self._rox_lib_tasks:
            gobject.source_remove(self._tag)
            # Forget the stale id so a later add_task() installs a
            # fresh watch instead of assuming one still exists.
            self._tag = None
# Shared IdleBlocker that a Task is queued on when it is first created and
# whenever it yields None. _handle_run_queue swaps in a fresh instance once
# this one has been taken off the run queue.
_idle_blocker = IdleBlocker()
171 """Create a new Task when you have some long running function to
172 run in the background, but which needs to do work in 'chunks'.
173 Example (the first line is needed to enable the 'yield' keyword in
176 from __future__ import generators
177 from rox import tasks
179 for x in range(start, start + 5):
183 tasks.Task(my_task(0))
184 tasks.Task(my_task(10))
188 Yielding None gives up control of the processor to another Task,
189 causing the sequence printed to be interleaved. You can also yield a
190 Blocker (or a list of Blockers) if you want to wait for some
191 particular event before resuming (see the Blocker class for details).
def __init__(self, iterator, name = None):
    """Call iterator.next() from a glib idle function. This function
    can yield Blocker() objects to suspend processing while waiting
    for events. name is used only for debugging."""
    assert iterator.next, "Object passed is not an iterator!"
    self.next = iterator.next
    # Keep the debugging name; it is read back when the Task is
    # formatted as a string.
    self.name = name
    # Block new task on the idle handler...
    _idle_blocker.add_task(self)
    # Blockers this task is currently queued on; it is removed from
    # each of them when it is next resumed.
    self._rox_blockers = (_idle_blocker,)
206 # Remove from our blockers' queues
207 for blocker
in self
._rox
_blockers
:
208 blocker
.remove_task(self
)
211 new_blockers
= self
.next()
212 except StopIteration:
217 rox
.report_exception()
219 if new_blockers
is None:
220 # Just give up control briefly
221 new_blockers
= (_idle_blocker
,)
223 if isinstance(new_blockers
, Blocker
):
224 # Wrap a single yielded blocker into a list
225 new_blockers
= (new_blockers
,)
226 # Are we blocking on something that already happened?
227 for blocker
in new_blockers
:
229 new_blockers
= (_idle_blocker
,)
231 # Add to new blockers' queues
232 for blocker
in new_blockers
:
233 blocker
.add_task(self
)
234 self
._rox
_blockers
= new_blockers
237 if self
.name
is None:
239 return "[Task '%s']" % self
.name
241 # Must append to _run_queue right after calling this!
243 assert not _run_queue
245 gobject
.idle_add(_handle_run_queue
)
247 def _handle_run_queue():
254 if next
is _idle_blocker
:
255 # Since this blocker will never run again, create a
256 # new one for future idling.
257 _idle_blocker
= IdleBlocker()
259 tasks
= next
._rox
_lib
_tasks
.keys()
260 #print "Resume", tasks