Removed spurious static_path.
[smonitor.git] / monitor / cherrypy / process / wspbus.py
blob46cd75a2beb7c9ac8b6ceee5c7d4d593ee0dd2a9
1 """An implementation of the Web Site Process Bus.
3 This module is completely standalone, depending only on the stdlib.
5 Web Site Process Bus
6 --------------------
8 A Bus object is used to contain and manage site-wide behavior:
9 daemonization, HTTP server start/stop, process reload, signal handling,
10 drop privileges, PID file management, logging for all of these,
11 and many more.
13 In addition, a Bus object provides a place for each web framework
14 to register code that runs in response to site-wide events (like
15 process start and stop), or which controls or otherwise interacts with
16 the site-wide components mentioned above. For example, a framework which
17 uses file-based templates would add known template filenames to an
18 autoreload component.
20 Ideally, a Bus object will be flexible enough to be useful in a variety
21 of invocation scenarios:
23 1. The deployer starts a site from the command line via a
24 framework-neutral deployment script; applications from multiple frameworks
25 are mixed in a single site. Command-line arguments and configuration
26 files are used to define site-wide components such as the HTTP server,
27 WSGI component graph, autoreload behavior, signal handling, etc.
28 2. The deployer starts a site via some other process, such as Apache;
29 applications from multiple frameworks are mixed in a single site.
30 Autoreload and signal handling (from Python at least) are disabled.
31 3. The deployer starts a site via a framework-specific mechanism;
32 for example, when running tests, exploring tutorials, or deploying
33 single applications from a single framework. The framework controls
34 which site-wide components are enabled as it sees fit.
36 The Bus object in this package uses topic-based publish-subscribe
37 messaging to accomplish all this. A few topic channels are built in
38 ('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
39 site containers are free to define their own. If a message is sent to a
40 channel that has not been defined or has no listeners, there is no effect.
42 In general, there should only ever be a single Bus object per process.
43 Frameworks and site containers share a single Bus object by publishing
44 messages and subscribing listeners.
46 The Bus object works as a finite state machine which models the current
47 state of the process. Bus methods move it from one state to another;
48 those methods then publish to subscribed listeners on the channel for
49 the new state.::
                        O
                        |
                        V
       STOPPING --> STOPPED --> EXITING -> X
          A   A         |
          |    \___     |
          |        \    |
          |         V   V
        STARTED <-- STARTING
61 """
63 import atexit
64 import os
65 import sys
66 import threading
67 import time
68 import traceback as _traceback
69 import warnings
71 from cherrypy._cpcompat import set
# Capture the working directory at import time.  Provided this module is
# imported early enough, that is the directory the startup script was
# launched from.  _do_execv() changes back to it before execv()ing a new
# process, as a defense against the application having changed the current
# working directory in the meantime (which could make sys.executable
# "not found" if it is a relative path, and/or cause other problems).
_startup_cwd = os.getcwd()
class ChannelFailures(Exception):
    """Exception raised when errors occur in a listener during Bus.publish().

    An instance accumulates the ``sys.exc_info()`` triple of every failure
    recorded through :meth:`handle_exception`. It is falsy while nothing has
    been recorded, so callers can write ``if failures: raise failures``.
    """

    # Separator placed between the repr() of each collected exception
    # when the instance is rendered as a string.
    delimiter = '\n'

    def __init__(self, *args, **kwargs):
        # Don't use 'super' here; Exceptions are old-style in Py2.4
        # See http://www.cherrypy.org/ticket/959
        Exception.__init__(self, *args, **kwargs)
        # One (type, value, traceback) tuple per recorded failure.
        self._exceptions = list()

    def handle_exception(self):
        """Append the current exception to self.

        Must be called while an exception is being handled (i.e. from
        inside an ``except`` clause).
        """
        self._exceptions.append(sys.exc_info())

    def get_instances(self):
        """Return a list of seen exception instances."""
        return [instance for cls, instance, traceback in self._exceptions]

    def __str__(self):
        exception_strings = map(repr, self.get_instances())
        return self.delimiter.join(exception_strings)

    __repr__ = __str__

    def __bool__(self):
        """Return True only if at least one exception has been recorded."""
        return bool(self._exceptions)

    # Python 2 consults __nonzero__ for truth testing, Python 3 __bool__.
    # Without both, an empty instance would be truthy on one of them.
    __nonzero__ = __bool__
# The state of the bus is tracked with named sentinel objects.
class _StateEnum(object):
    """Attribute container that names each State assigned onto it."""

    class State(object):
        """One distinct bus state; 'name' is filled in on assignment."""
        name = None

        def __repr__(self):
            return "states.%s" % self.name

    def __setattr__(self, key, value):
        # A State learns its name from the attribute it gets bound to,
        # which is what lets repr() show e.g. "states.STARTED".
        is_state = isinstance(value, self.State)
        if is_state:
            value.name = key
        object.__setattr__(self, key, value)


states = _StateEnum()
for _state_name in ('STOPPED', 'STARTING', 'STARTED', 'STOPPING', 'EXITING'):
    setattr(states, _state_name, states.State())
del _state_name
class Bus(object):
    """Process state-machine and messenger for HTTP site deployment.

    All listeners for a given channel are guaranteed to be called even
    if others at the same channel fail. Each failure is logged, but
    execution proceeds on to the next listener. The only way to stop all
    processing from inside a listener is to raise SystemExit and stop the
    whole server.
    """

    # The module-level state enum, exposed on the class for convenience.
    states = states
    # Current state of the bus; one of the attributes of 'states'.
    state = states.STOPPED
    # When True, block() re-executes the process once the bus has exited.
    execv = False

    def __init__(self):
        self.execv = False
        self.state = states.STOPPED
        # Maps channel name -> set of subscribed callbacks.
        self.listeners = dict(
            [(channel, set()) for channel
             in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
        # Maps (channel, callback) -> priority used to order publish().
        self._priorities = {}

    def subscribe(self, channel, callback, priority=None):
        """Add the given callback at the given channel (if not present).

        The channel is created if it does not exist yet. If 'priority' is
        None, the callback's own 'priority' attribute is used, or 50 when
        it has none. Listeners with lower priority values are called first.
        """
        if channel not in self.listeners:
            self.listeners[channel] = set()
        self.listeners[channel].add(callback)

        if priority is None:
            priority = getattr(callback, 'priority', 50)
        self._priorities[(channel, callback)] = priority

    def unsubscribe(self, channel, callback):
        """Discard the given callback (if present)."""
        listeners = self.listeners.get(channel)
        if listeners and callback in listeners:
            listeners.discard(callback)
            del self._priorities[(channel, callback)]

    def publish(self, channel, *args, **kwargs):
        """Return output of all subscribers for the given channel.

        Every listener is called with the given args and kwargs, in
        ascending priority order, and the return values are collected into
        a list. An unknown channel yields an empty list.

        KeyboardInterrupt and SystemExit raised by a listener propagate
        immediately; if earlier listeners had already failed, a SystemExit
        with code 0 has its code changed to 1 first. Any other exception is
        recorded (and logged, except on the 'log' channel itself) and the
        remaining listeners still run; afterwards a ChannelFailures holding
        all recorded exceptions is raised.
        """
        if channel not in self.listeners:
            return []

        exc = ChannelFailures()
        output = []

        items = [(self._priorities[(channel, listener)], listener)
                 for listener in self.listeners[channel]]
        # Order by priority alone. Sorting the bare tuples would fall back
        # to comparing the callables themselves whenever two priorities tie.
        items.sort(key=lambda item: item[0])
        for priority, listener in items:
            try:
                output.append(listener(*args, **kwargs))
            except KeyboardInterrupt:
                raise
            except SystemExit:
                # Fetched via exc_info() rather than "except X, e" / "as e"
                # so that the syntax is valid on every Python version.
                e = sys.exc_info()[1]
                # If we have previous errors ensure the exit code is non-zero
                if exc and e.code == 0:
                    e.code = 1
                raise
            except:
                exc.handle_exception()
                if channel == 'log':
                    # Assume any further messages to 'log' will fail.
                    pass
                else:
                    self.log("Error in %r listener %r" % (channel, listener),
                             level=40, traceback=True)
        if exc:
            raise exc
        return output

    def _clean_exit(self):
        """An atexit handler which asserts the Bus is not running."""
        if self.state != states.EXITING:
            warnings.warn(
                "The main thread is exiting, but the Bus is in the %r state; "
                "shutting it down automatically now. You must either call "
                "bus.block() after start(), or call bus.exit() before the "
                "main thread exits." % self.state, RuntimeWarning)
            self.exit()

    def start(self):
        """Start all services.

        Registers _clean_exit() as an atexit handler, then publishes to the
        'start' channel. If that fails with anything other than
        KeyboardInterrupt or SystemExit, the failure is logged, the bus is
        shut down via exit(), and the original exception is re-raised.
        """
        atexit.register(self._clean_exit)

        self.state = states.STARTING
        self.log('Bus STARTING')
        try:
            self.publish('start')
            self.state = states.STARTED
            self.log('Bus STARTED')
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.log("Shutting down due to error in start listener:",
                     level=40, traceback=True)
            # The shutdown attempt swallows its own errors inside a separate
            # method, so the bare raise below still refers to the start
            # failure and keeps its original traceback.
            self._exit_quietly()
            raise

    def _exit_quietly(self):
        """Call exit(), discarding any exception it raises."""
        try:
            self.exit()
        except:
            # Any stop/exit errors will be logged inside publish().
            pass

    def exit(self):
        """Stop all services and prepare to exit the process.

        Any exception raised while stopping or while publishing to 'exit'
        terminates the process immediately with status 70. The same happens
        when exit() is called while the bus is still STARTING.
        """
        exitstate = self.state
        try:
            self.stop()

            self.state = states.EXITING
            self.log('Bus EXITING')
            self.publish('exit')
            # This isn't strictly necessary, but it's better than seeing
            # "Waiting for child threads to terminate..." and then nothing.
            self.log('Bus EXITED')
        except:
            # This method is often called asynchronously (whether thread,
            # signal handler, console handler, or atexit handler), so we
            # can't just let exceptions propagate out unhandled.
            # Assume it's been logged and just die.
            os._exit(70)  # EX_SOFTWARE

        if exitstate == states.STARTING:
            # exit() was called before start() finished, possibly due to
            # Ctrl-C because a start listener got stuck. In this case,
            # we could get stuck in a loop where Ctrl-C never exits the
            # process, so we just call os._exit here.
            os._exit(70)  # EX_SOFTWARE

    def restart(self):
        """Restart the process (may close connections).

        This method does not restart the process from the calling thread;
        instead, it stops the bus and asks the main thread to call execv.
        """
        self.execv = True
        self.exit()

    def graceful(self):
        """Advise all services to reload."""
        self.log('Bus graceful')
        self.publish('graceful')

    def block(self, interval=0.1):
        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

        This function is intended to be called only by the main thread.
        After waiting for the EXITING state, it also waits for all threads
        to terminate, and then calls os.execv if self.execv is True. This
        design allows another thread to call bus.restart, yet have the main
        thread perform the actual execv call (required on some platforms).

        'interval' is the polling period in seconds handed to wait(); the
        'main' channel is published to after each sleep.
        """
        try:
            self.wait(states.EXITING, interval=interval, channel='main')
        except (KeyboardInterrupt, IOError):
            # The time.sleep call might raise
            # "IOError: [Errno 4] Interrupted function call" on KBInt.
            self.log('Keyboard Interrupt: shutting down bus')
            self.exit()
        except SystemExit:
            self.log('SystemExit raised: shutting down bus')
            self.exit()
            raise

        # Waiting for ALL child threads to finish is necessary on OS X.
        # See http://www.cherrypy.org/ticket/581.
        # It's also good to let them all shut down before allowing
        # the main thread to call atexit handlers.
        # See http://www.cherrypy.org/ticket/751.
        self.log("Waiting for child threads to terminate...")
        for t in threading.enumerate():
            if t == threading.currentThread():
                continue
            # Thread.isAlive() was removed in Python 3.9; is_alive() has
            # been available since Python 2.6.
            if hasattr(t, "is_alive"):
                alive = t.is_alive()
            else:
                alive = t.isAlive()
            if alive:
                # Note that any dummy (external) threads are always daemonic.
                if hasattr(threading.Thread, "daemon"):
                    # Python 2.6+
                    d = t.daemon
                else:
                    d = t.isDaemon()
                if not d:
                    self.log("Waiting for thread %s." % t.getName())
                    t.join()

        if self.execv:
            self._do_execv()

    def wait(self, state, interval=0.1, channel=None):
        """Poll for the given state(s) at intervals; publish to channel.

        'state' may be a single state or a tuple/list of states. The call
        returns once self.state is one of them. Between checks it sleeps
        for 'interval' seconds and then publishes to 'channel'; with the
        default channel of None that publish has no listeners to call.
        """
        if isinstance(state, (tuple, list)):
            wanted = state
        else:
            wanted = [state]

        def _wait():
            while self.state not in wanted:
                time.sleep(interval)
                self.publish(channel)

        # From http://psyco.sourceforge.net/psycoguide/bugs.html:
        # "The compiled machine code does not include the regular polling
        # done by Python, meaning that a KeyboardInterrupt will not be
        # detected before execution comes back to the regular Python
        # interpreter. Your program cannot be interrupted if caught
        # into an infinite Psyco-compiled loop."
        try:
            sys.modules['psyco'].cannotcompile(_wait)
        except (KeyError, AttributeError):
            pass

        _wait()

    def _do_execv(self):
        """Re-execute the current process.

        This must be called from the main thread, because certain platforms
        (OS X) don't allow execv to be called in a child thread very well.
        """
        args = sys.argv[:]
        self.log('Re-spawning %s' % ' '.join(args))

        if sys.platform[:4] == 'java':
            from _systemrestart import SystemRestart
            raise SystemRestart
        else:
            args.insert(0, sys.executable)
            if sys.platform == 'win32':
                args = ['"%s"' % arg for arg in args]

            # Return to the directory the process was started from before
            # re-executing, in case the application changed it.
            os.chdir(_startup_cwd)
            os.execv(sys.executable, args)

    def stop(self):
        """Stop all services."""
        self.state = states.STOPPING
        self.log('Bus STOPPING')
        self.publish('stop')
        self.state = states.STOPPED
        self.log('Bus STOPPED')

    def start_with_callback(self, func, args=None, kwargs=None):
        """Start 'func' in a new thread T, then start self (and return T).

        The new thread waits until the bus reaches the STARTED state and
        then calls func(*args, **kwargs). 'args' may be any sequence of
        positional arguments and 'kwargs' a dict of keyword arguments; both
        default to empty.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        # tuple() so that a list of positional arguments is accepted too.
        args = (func,) + tuple(args)

        def _callback(func, *a, **kw):
            self.wait(states.STARTED)
            func(*a, **kw)
        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
        t.setName('Bus Callback ' + t.getName())
        t.start()

        self.start()

        return t

    def log(self, msg="", level=20, traceback=False):
        """Log the given message. Append the last traceback if requested.

        The message and the numeric 'level' are published to the 'log'
        channel; how a level is interpreted is up to the listeners.
        """
        if traceback:
            exc = sys.exc_info()
            msg += "\n" + "".join(_traceback.format_exception(*exc))
        self.publish('log', msg, level)
# Module-level Bus instance, created when this module is imported.
bus = Bus()