Fixed deprecation warning with Python 2.6
[rox-lib.git] / ROX-Lib2 / python / rox / tasks.py
blob9fab0fc7e2935a95611a43b09fd55def31585a63
1 """The tasks module provides a simple light-weight alternative to threads.
3 When you have a long-running job you will want to run it in the background,
4 while the user does other things. There are four ways to do this:
6 - Use a new thread for each task.
7 - Use callbacks from an idle handler.
8 - Use a recursive mainloop.
9 - Use this module.
11 Using threads causes a number of problems. Some builds of pygtk/python don't
12 support them, they introduce race conditions, often lead to many subtle
13 bugs, and they require lots of resources (you probably wouldn't want 10,000
14 threads running at once). In particular, two threads can run at exactly the
15 same time (perhaps on different processors), so you have to be really careful
16 that they don't both try to update the same variable at the same time. This
17 requires lots of messy locking, which is hard to get right.
19 Callbacks work within a single thread. For example, you open a dialog box and
20 then tell the system to call one function if it's closed, and another if the
21 user clicks OK, etc. The function that opened the box then returns, and the
22 system calls one of the given callback functions later. Callbacks only
23 execute one at a time, so you don't have to worry about race conditions.
24 However, they are often very awkward to program with, because you have to
25 save state somewhere and then pass it to the functions when they're called.
27 A recursive mainloop only works with nested tasks (you can create a
28 sub-task, but the main task can't continue until the sub-task has
29 finished). We use these for, eg, rox.alert() boxes since you don't
30 normally want to do anything else until the box is closed, but it is not
31 appropriate for long-running jobs.
33 Tasks use python's generator API to provide a more pleasant interface to
34 callbacks. See the Task class (below) for more information.
35 """
37 import rox, gobject
38 from rox import g
39 import gobject
41 # The list of Blockers whose event has happened, in the order they were
42 # triggered
43 _run_queue = []
45 class Blocker:
46 """A Blocker object starts life with 'happened = False'. Tasks can
47 ask to be suspended until 'happened = True'. The value is changed
48 by a call to trigger().
50 Example:
52 kettle_boiled = tasks.Blocker()
54 def make_tea():
55 print "Get cup"
56 print "Add tea leaves"
57 yield kettle_boiled
58 print "Pour water into cup"
59 print "Brew..."
60 yield tasks.TimeoutBlocker(120)
61 print "Add milk"
62 print "Ready!"
64 tasks.Task(make_tea())
66 # elsewhere, later...
67 print "Kettle boiled!"
68 kettle_boiled.trigger()
70 You can also yield a list of Blockers. Your function will resume
71 after any one of them is triggered. Use blocker.happened to
72 find out which one(s). Yielding a Blocker that has already
73 happened is the same as yielding None (gives any other Tasks a
74 chance to run, and then continues).
75 """
77 def __init__(self):
78 self.happened = False # False until event triggered
79 self._rox_lib_tasks = set() # Tasks waiting on this blocker
81 def trigger(self):
82 """The event has happened. Note that this cannot be undone;
83 instead, create a new Blocker to handle the next occurance
84 of the event."""
85 if self.happened: return # Already triggered
86 self.happened = True
87 #assert self not in _run_queue # XXX: Slow
88 if not _run_queue:
89 _schedule()
90 _run_queue.append(self)
92 def add_task(self, task):
93 """Called by the schedular when a Task yields this
94 Blocker. If you override this method, be sure to still
95 call this method with Blocker.add_task(self)!"""
96 self._rox_lib_tasks.add(task)
98 def remove_task(self, task):
99 """Called by the schedular when a Task that was waiting for
100 this blocker is resumed."""
101 self._rox_lib_tasks.remove(task)
103 class IdleBlocker(Blocker):
104 """An IdleBlocker blocks until a task starts waiting on it, then
105 immediately triggers. An instance of this class is used internally
106 when a Task yields None."""
107 def add_task(self, task):
108 """Also calls trigger."""
109 Blocker.add_task(self, task)
110 self.trigger()
112 class TimeoutBlocker(Blocker):
113 """Triggers after a set number of seconds. rox.toplevel_ref/unref
114 are called to prevent the app quitting while a TimeoutBlocker is
115 running."""
116 def __init__(self, timeout):
117 """Trigger after 'timeout' seconds (may be a fraction)."""
118 Blocker.__init__(self)
119 rox.toplevel_ref()
120 gobject.timeout_add(long(timeout * 1000), self._timeout)
122 def _timeout(self):
123 rox.toplevel_unref()
124 self.trigger()
126 def _io_callback(src, cond, blocker):
127 blocker.trigger()
128 return False
130 class InputBlocker(Blocker):
131 """Triggers when os.read(stream) would not block."""
132 _tag = None
133 _stream = None
134 def __init__(self, stream):
135 Blocker.__init__(self)
136 self._stream = stream
138 def add_task(self, task):
139 Blocker.add_task(self, task)
140 if self._tag is None:
141 self._tag = gobject.io_add_watch(self._stream, gobject.IO_IN | gobject.IO_HUP,
142 _io_callback, self)
144 def remove_task(self, task):
145 Blocker.remove_task(self, task)
146 if not self._rox_lib_tasks:
147 gobject.source_remove(self._tag)
148 self._tag = None
150 class OutputBlocker(Blocker):
151 """Triggers when os.write(stream) would not block."""
152 _tag = None
153 _stream = None
154 def __init__(self, stream):
155 Blocker.__init__(self)
156 self._stream = stream
158 def add_task(self, task):
159 Blocker.add_task(self, task)
160 if self._tag is None:
161 self._tag = gobject.io_add_watch(self._stream, gobject.IO_OUT | gobject.IO_HUP,
162 _io_callback, self)
164 def remove_task(self, task):
165 Blocker.remove_task(self, task)
166 if not self._rox_lib_tasks:
167 gobject.source_remove(self._tag)
168 self._tag = None
170 _idle_blocker = IdleBlocker()
172 class Task:
173 """Create a new Task when you have some long running function to
174 run in the background, but which needs to do work in 'chunks'.
175 Example:
177 from rox import tasks
178 def my_task(start):
179 for x in range(start, start + 5):
180 print "x =", x
181 yield None
183 tasks.Task(my_task(0))
184 tasks.Task(my_task(10))
186 rox.mainloop()
188 Yielding None gives up control of the processor to another Task,
189 causing the sequence printed to be interleaved. You can also yield a
190 Blocker (or a list of Blockers) if you want to wait for some
191 particular event before resuming (see the Blocker class for details).
194 def __init__(self, iterator, name = None):
195 """Call iterator.next() from a glib idle function. This function
196 can yield Blocker() objects to suspend processing while waiting
197 for events. name is used only for debugging."""
198 assert iterator.next, "Object passed is not an iterator!"
199 self.next = iterator.next
200 self.name = name
201 self.finished = Blocker()
202 # Block new task on the idle handler...
203 _idle_blocker.add_task(self)
204 self._rox_blockers = (_idle_blocker,)
206 def _resume(self):
207 # Remove from our blockers' queues
208 for blocker in self._rox_blockers:
209 blocker.remove_task(self)
210 # Resume the task
211 try:
212 new_blockers = self.next()
213 except StopIteration:
214 # Task ended
215 self.finished.trigger()
216 return
217 except Exception:
218 # Task crashed
219 rox.report_exception()
220 self.finished.trigger()
221 return
222 if new_blockers is None:
223 # Just give up control briefly
224 new_blockers = (_idle_blocker,)
225 else:
226 if isinstance(new_blockers, Blocker):
227 # Wrap a single yielded blocker into a list
228 new_blockers = (new_blockers,)
229 # Are we blocking on something that already happened?
230 for blocker in new_blockers:
231 if blocker.happened:
232 new_blockers = (_idle_blocker,)
233 break
234 # Add to new blockers' queues
235 for blocker in new_blockers:
236 blocker.add_task(self)
237 self._rox_blockers = new_blockers
239 def __repr__(self):
240 if self.name is None:
241 return "[Task]"
242 return "[Task '%s']" % self.name
244 # Must append to _run_queue right after calling this!
245 def _schedule():
246 assert not _run_queue
247 rox.toplevel_ref()
248 gobject.idle_add(_handle_run_queue)
250 def _handle_run_queue():
251 global _idle_blocker
252 assert _run_queue
254 next = _run_queue[0]
255 assert next.happened
257 if next is _idle_blocker:
258 # Since this blocker will never run again, create a
259 # new one for future idling.
260 _idle_blocker = IdleBlocker()
262 tasks = frozenset(next._rox_lib_tasks)
263 #print "Resume", tasks
264 for task in tasks:
265 # Run 'task'.
266 task._resume()
268 del _run_queue[0]
270 if _run_queue:
271 return True
272 rox.toplevel_unref()
273 return False