Caught signals that would otherwise be fatal and saveless.
[halbot.git] / threadsafe_sched.py
blobe1600fb54d562481ce06d20efff78586a9914d05
1 # This class is a modification of module "sched", from the Python
2 # standard library. The modifications are Copyright (C) 2008
3 # FunnyMan3595 (Charlie Nolan); All Rights Reserved.
5 # The intent is to add this to the Python standard library, which
6 # may require a transfer of copyright. Until such addition occurs,
7 # special permission is granted to distribute, run, and otherwise use
8 # this module as though it were part of that library, provided that
9 # this notice is maintained.
11 """A generally useful, thread-safe event scheduler class.
13 This is *NOT* a drop-in replacement for the sched module. See
14 convert_instructions for more information.
16 Each instance of this class manages its own queue.
17 The methods of this class are thread-safe and can be called from
18 other threads while the scheduler is running. Only one scheduled
19 event will run at a time, but be careful of race conditions if other
20 threads work with the same resources.
22 Each instance is parametrized with three functions, one that is
23 supposed to return the current time, one that is supposed to
24 implement a delay, and one that should cause the delay to return
25 prematurely. If none are specified, time.time, .condition.wait, and
26 .condition.notify are used for real-time scheduling. You can also
27 implement simulated time by writing your own functions. This can
28 also be used to integrate scheduling with STDWIN events; the delay
29 function is allowed to modify the queue. Time can be expressed as
30 integers or floating point numbers, as long as it is consistent.
32 Note: the delay function should call .condition.release before
33 any real-time delay occurs, to minimize delays to other threads.
35 Events are specified by tuples (time, priority, action, argument).
36 As in UNIX, lower priority numbers mean higher priority; in this
37 way the queue can be maintained fully sorted. Execution of the
38 event means calling the action function, passing it the argument.
39 The argument must be a tuple; pack a single argument as (argument,).
40 The action function may be an instance method so it has another way
41 to reference private data (besides global variables). Parameterless
42 functions or methods can be used by omitting the argument when calling
43 enter or enterabs, which defaults it to ().
44 """
46 # TODO: Fix the description of argument passing. It's just
47 # func(*args), so I'm guessing the terminology is left over from when
48 # that had another meaning. I've patched it to be more accurate, but
49 # it's clumsy.
# Migration notes for users of the standard-library "sched" module.
# Kept as a module-level string so it can be printed or inspected at
# run time (the module docstring refers to it by name).
convert_instructions = """If you have been using the sched module for
real-time scheduling, conversion is easy. Simply change
sched.scheduler(time.time, time.sleep) to threadsafe_sched.scheduler()
and .run() to .run_once().

If you have been using sched for custom scheduling, your task is
harder. You need to make four changes:

1. The delay function should call .condition.release before any
real-time delay occurs, to minimize delays to other threads.
If you can convert to real-time, you should probably just call
.condition.wait(real_time) instead.

2. You must implement a wake function that causes the delay function
to return prematurely, as soon as is practical. If you use
.condition.wait in the delay function, you can leave this as the
default (.condition.notify). Otherwise, you may wish to consider
using a Condition of your own, from module "threading".

3. The delay function must be callable with no arguments, which
should cause it to wait a large amount of "time", preferably until
the wake function is called. If using .condition.wait, simply call
it with no arguments.

4. Change .run() to .run_once().
"""
78 # XXX The timefunc and delayfunc should have been defined as methods
79 # XXX so you can define new kinds of schedulers using subclassing
80 # XXX instead of having to define a module or class just to hold
81 # XXX the global state of your particular time and delay functions.
83 import bisect
84 from threading import Condition
86 __all__ = ["scheduler"]
class scheduler:
    """Thread-safe event scheduler.

    Events are (time, priority, action, argument) tuples stored in
    self.queue, ordered by time and then by priority; events that tie
    on both run in the order they were entered.  Every access to the
    queue made by this class happens while self.condition is held.

    Attributes:
        queue      -- list of pending events, soonest first.
        condition  -- threading.Condition guarding the queue.
        timefunc   -- callable returning the current "time".
        delayfunc  -- callable(delay) that waits; callable with no
                      argument to wait until woken.
        wakefunc   -- callable that makes delayfunc return early.
        running    -- True while run_once/run_forever is active.
    """

    def __init__(self, timefunc=None, delayfunc=None, wakefunc=None):
        """Initialize a new instance, passing the time, delay, and
        wake functions.

        If time, delay, or wake are not specified, time.time,
        .condition.wait, and .condition.notify are used,
        respectively.

        """
        self.queue = []
        self.condition = Condition()
        if not timefunc:
            # Imported here (not at module level) so the module does
            # not need "time" at all when a custom clock is supplied.
            import time
            timefunc = time.time
        self.timefunc = timefunc
        self.delayfunc = delayfunc or self.condition.wait
        self.wakefunc = wakefunc or self.condition.notify
        self.running = False

    def enterabs(self, time, priority, action, argument=()):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.

        """
        event = (time, priority, action, argument)
        self.condition.acquire()
        try:
            # Order on (time, priority) only.  Comparing whole tuples
            # would fall through to comparing the action callables on
            # a tie, which is arbitrary at best and a TypeError on
            # interpreters that refuse to order functions.
            keys = [queued[:2] for queued in self.queue]
            index = bisect.bisect_right(keys, (time, priority))
            self.queue.insert(index, event)
            if index == 0:
                # The soonest event changed, so a sleeping runner must
                # wake up and recompute how long to wait.
                self.wakefunc()
        finally:
            self.condition.release()
        return event  # The ID

    def enter(self, delay, priority, action, argument=()):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.

        """
        return self.enterabs(self.timefunc() + delay, priority,
                             action, argument)

    def cancel(self, event):
        """Remove an event from the queue.

        This must be presented the ID as returned by enter().
        If the event is not in the queue, this raises ValueError.

        """
        self.condition.acquire()
        try:
            self.queue.remove(event)
        finally:
            # Release even when remove() raises, so a failed cancel
            # cannot leave the scheduler locked.
            self.condition.release()

    def empty(self):
        """Check whether the queue is empty.

        Note: .empty() and anything that depends on it should be
        surrounded by .condition.acquire() and .condition.release()
        if multiple threads are running.

        """
        return not self.queue

    def _maybe_release(self):
        """Internal function."""
        # Tries to release the condition, but suppresses any error.
        # The broad catch is deliberate: the condition may legitimately
        # not be held here (an action raised after we released it, or a
        # custom delay function released it and did not take it back).
        try:
            self.condition.release()
        except Exception:
            pass

    def _delay(self, *amount):
        """Internal function."""
        # Calls the delay function, then puts the condition back into
        # a known state (held by this thread) whether or not the delay
        # function gave it back.  The release/acquire pair also gives
        # other threads a chance to take the condition.
        self.delayfunc(*amount)
        self._maybe_release()
        self.condition.acquire()

    def _begin_run(self):
        """Internal function."""
        # Takes the condition and marks the scheduler as running.
        # Raises ValueError, with the condition released again, if
        # another run is already in progress.
        self.condition.acquire()
        if self.running:
            self.condition.release()
            raise ValueError("Scheduler is already running")
        self.running = True

    def _run_pending(self):
        """Internal function."""
        # Runs events until the queue is empty.  Must be entered with
        # the condition held; returns with it held.  If an action
        # raises, the exception propagates with the condition released.
        while self.queue:
            time, priority, action, argument = self.queue[0]
            now = self.timefunc()
            if now < time:
                # Not due yet.  The event stays queued; after the wait
                # (which may end early) the head is looked at afresh.
                self._delay(time - now)
            else:
                del self.queue[0]
                # Run the action without the condition so that it, and
                # other threads, can use the scheduler meanwhile.
                self.condition.release()
                action(*argument)
                self.condition.acquire()
                self._delay(0)  # Let other threads run

    def run_once(self):
        """Execute events until the queue is empty.

        If the scheduler is already running, raises ValueError.

        When there is a positive delay until the first event, the
        delay function is called and the event is left in the queue;
        otherwise, the event is removed from the queue and executed
        (its action function is called, passing it the argument).  If
        the delay function returns prematurely, it is simply
        restarted.

        It is legal for both the delay function and the action
        function to modify the queue or to raise an exception;
        exceptions are not caught but the scheduler's state remains
        well-defined so run_once() may be called again.

        A questionable hack is added to allow other threads to run:
        just after an event is executed, a delay of 0 is executed, to
        avoid monopolizing the CPU when other threads are also
        runnable.

        """
        self._begin_run()
        try:
            self._run_pending()
        finally:
            self.running = False
            self._maybe_release()

    def run_forever(self):
        """Execute events until the program terminates or an
        exception is raised.

        If the scheduler is already running, raises ValueError.

        When the queue is empty, calls delayfunc with no parameter,
        this should delay until wakefunc is called.

        See .run_once for more information.

        """
        self._begin_run()
        try:
            while True:
                self._run_pending()
                # The queue is empty and the condition is still held,
                # so an event entered from now on cannot be missed:
                # its wake-up arrives while delayfunc is waiting.
                self._delay()
        finally:
            self.running = False
            self._maybe_release()