Modified hal to use the threadsafe scheduler.
[halbot.git] / threadsafe_sched.py
blob 60850306c87231d46bc59f90ed2a45876acb8bd0
# This class is a modification of module "sched", from the Python
# standard library.  The modifications are Copyright (C) 2008
# FunnyMan3595 (Charlie Nolan); All Rights Reserved.
#
# The intent is to add this to the Python standard library, which
# may require a transfer of copyright.  Until such addition occurs,
# special permission is granted to distribute, run, and otherwise use
# this module as though it were part of that library, provided that
# this notice is maintained.

"""A generally useful, thread-safe event scheduler class.

This is *NOT* a drop-in replacement for the sched module.  See
convert_instructions for more information.

Each instance of this class manages its own queue.
The methods of this class are thread-safe and can be called from
other threads while the scheduler is running.  Only one scheduled
event will run at a time, but be careful of race conditions if other
threads work with the same resources.

Each instance is parametrized with three functions: one that is
supposed to return the current time, one that is supposed to
implement a delay, and one that should cause the delay to return
prematurely.  If none are specified, time.time, .condition.wait, and
.condition.notify are used for real-time scheduling.  You can also
implement simulated time by writing your own functions.  This can
also be used to integrate scheduling with STDWIN events; the delay
function is allowed to modify the queue.  Time can be expressed as
integers or floating point numbers, as long as it is consistent.

Note: the delay function should call .condition.release before
any real-time delay occurs, to minimize delays to other threads.

Events are specified by tuples (time, priority, action, argument).
As in UNIX, lower priority numbers mean higher priority; in this
way the queue can be maintained fully sorted.  Execution of the
event means calling the action function, passing it the argument.
The argument must be a tuple; pack single arguments as (argument,).
The action function may be an instance method so it has another way
to reference private data (besides global variables).  Parameterless
functions or methods can be used by omitting the argument when calling
enter or enterabs, which defaults it to ().
"""

# TODO: Fix the description of argument passing.  It's just
# func(*args), so I'm guessing the terminology is left over from when
# that had another meaning.  I've patched it to be more accurate, but
# it's clumsy.

convert_instructions = """If you have been using the sched module for
real-time scheduling, conversion is easy.  Simply change
sched.scheduler(time.time, time.sleep) to threadsafe_sched.scheduler()
and .run() to .run_once().

If you have been using sched for custom scheduling, your task is
harder.  You need to make four changes:

1. The delay function should call .condition.release before any
   real-time delay occurs, to minimize delays to other threads.
   If you can convert to real-time, you should probably just call
   .condition.wait(real_time) instead.

2. You must implement a wake function that causes the delay function
   to return prematurely, as soon as is practical.  If you use
   .condition.wait in the delay function, you can leave this as the
   default (.condition.notify).  Otherwise, you may wish to consider
   using a Condition of your own, from module "threading".

3. The delay function must be callable with no arguments, which
   should cause it to wait a large amount of "time", preferably until
   the wake function is called.  If using .condition.wait, simply call
   it with no arguments.

4. Change .run() to .run_once().
"""

# XXX The timefunc and delayfunc should have been defined as methods
# XXX so you can define new kinds of schedulers using subclassing
# XXX instead of having to define a module or class just to hold
# XXX the global state of your particular time and delay functions.

import bisect
from threading import Condition

__all__ = ["scheduler"]

class scheduler:
    def __init__(self, timefunc = None, delayfunc = None, wakefunc = None):
        """Initialize a new instance, passing the time, delay, and
        wake functions.

        If time, delay, or wake are not specified, time.time,
        .condition.wait, and .condition.notify are used,
        respectively.

        """
        self.queue = []
        if timefunc:
            self.timefunc = timefunc
        else:
            import time
            self.timefunc = time.time
        self.condition = Condition()
        if delayfunc:
            self.delayfunc = delayfunc
        else:
            self.delayfunc = self.condition.wait
        if wakefunc:
            self.wakefunc = wakefunc
        else:
            self.wakefunc = self.condition.notify
        self.running = False

    def enterabs(self, time, priority, action, argument = ()):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary.

        """
        event = time, priority, action, argument
        self.condition.acquire()
        try:
            bisect.insort(self.queue, event)
            # Wake the scheduler only if the new event is now the next due.
            if self.queue[0] == event:
                self.wakefunc()
        finally:
            self.condition.release()
        return event # The ID

    def enter(self, delay, priority, action, argument = ()):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface.

        """
        time = self.timefunc() + delay
        return self.enterabs(time, priority, action, argument)

    def cancel(self, event):
        """Remove an event from the queue.

        This must be given the ID returned by enter() or enterabs().
        If the event is not in the queue, this raises ValueError.

        """
        self.condition.acquire()
        try:
            self.queue.remove(event)
        finally:
            # Release the lock even when the event was not found.
            self.condition.release()

    def empty(self):
        """Check whether the queue is empty.

        Note: .empty() and anything that depends on it should be
        surrounded by .condition.acquire() and .condition.release()
        if multiple threads are running.

        """
        return len(self.queue) == 0

    def run_once(self):
        """Execute events until the queue is empty.

        If the scheduler is already running, raises ValueError.

        When there is a positive delay until the first event, the
        delay function is called and the event is left in the queue;
        otherwise, the event is removed from the queue and executed
        (its action function is called, passing it the argument).  If
        the delay function returns prematurely, it is simply
        restarted.

        It is legal for both the delay function and the action
        function to modify the queue or to raise an exception;
        exceptions are not caught but the scheduler's state remains
        well-defined so run_once() may be called again.

        A questionable hack is added to allow other threads to run:
        just after an event is executed, a delay of 0 is executed, to
        avoid monopolizing the CPU when other threads are also
        runnable.

        """
        self.condition.acquire()
        if self.running:
            raise ValueError("Scheduler is already running")
        self.running = True
        try:
            while self.queue:
                time, priority, action, argument = self.queue[0]
                now = self.timefunc()
                if now < time:
                    self.delayfunc(time - now)
                else:
                    del self.queue[0]
                    self.condition.release()
                    void = action(*argument)
                    self.condition.acquire()
                    self.delayfunc(0)   # Let other threads run
                self.condition.acquire()
        finally:
            self.running = False

    def run_forever(self):
        """Execute events until the program terminates or an
        exception is raised.

        If the scheduler is already running, raises ValueError.

        When the queue is empty, calls delayfunc with no parameter;
        this should delay until wakefunc is called.

        See .run_once for more information.

        """
        try:
            while True:
                self.run_once()
                self.delayfunc()
        finally:
            self.condition.acquire()
            self.condition.release()
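
# Illustration only, not part of the module: a minimal, standalone sketch
# of the wait/notify pairing that the default delay and wake functions
# (.condition.wait and .condition.notify) rely on.  The names delay, wake,
# state, and worker are hypothetical and exist only for this example.
# Unlike the scheduler, which calls its delay function with the lock
# already held, this sketch acquires the lock inside delay() so that it
# can run by itself.

```python
import threading
import time

cond = threading.Condition()
state = {"waiting": False, "elapsed": None}

def delay(timeout=None):
    """Hypothetical delay function: block on the condition.

    With no argument it waits until woken, as item 3 of
    convert_instructions asks of a delay function.
    """
    with cond:
        state["waiting"] = True
        start = time.time()
        cond.wait(timeout)  # releases the lock while blocked
        state["elapsed"] = time.time() - start

def wake():
    """Hypothetical wake function: make a pending delay() return early."""
    with cond:
        cond.notify()

# Start a long delay in another thread, then cut it short.
worker = threading.Thread(target=delay, args=(30.0,))
worker.start()
while not state["waiting"]:
    time.sleep(0.01)
# wake() cannot acquire the lock until wait() has released it, so the
# notification is not lost.
wake()
worker.join()

woke_early = state["elapsed"] < 30.0
print("delay returned early:", woke_early)
```

# A custom delay/wake pair for simulated time would presumably follow the
# same shape: the delay blocks on some condition, and the wake function
# signals that condition so the scheduler can re-check the head of its
# queue.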