added 2 missing template files
[limo.git] / server.py
blob1678abfdd2f22a69e82e1f493deb4e0b35c66018
2 import sys, os, re, datetime, compress, cStringIO, traceback, cgitb, Queue
3 from multiprocessing import Pool, Pipe, Lock, Process, cpu_count
4 from session import *
5 # from fcgi import WSGIServer
6 from fcgi import FastCGIServer
7 from fcgi.fcgiutil import flatten, GeneratorException
8 from template import Template, TemplateExecutionError, ellipsis, pad
9 from limoutil import *
10 import settings
11 Settings = settings.Settings
12 __all__ = ["LimoServer",]
class Cookies(dict):
    """A dict mapping cookie names to Cookie objects.

    Built from a ';'-separated list of 'name=value' pairs (the shape of an
    HTTP Cookie header).  Plain string values assigned through item
    assignment are wrapped in Cookie objects automatically.
    """

    def __init__(self, s):
        """Parse the cookie string `s` into Cookie entries.

        Strings of length 0 or 1 are treated as "no cookies".

        Raises:
            DeleteCookie: when the same cookie name appears twice.
            Exception: when a non-empty segment has no '=' separator.
        """
        if len(s) > 1:
            for t in s.split(';'):
                if not t.strip():
                    # tolerate empty segments such as a trailing ';'
                    continue
                # split on the first '=' only, so values that themselves
                # contain '=' (e.g. base64 padding) are kept intact
                p = t.split('=', 1)
                if len(p) != 2:
                    raise Exception("len(t) did not == 2? %d %s from t: %s from s: %s" % (len(p), p, t, s))
                k = p[0].strip()
                v = p[1].strip()
                if self.get(k, None).value is None:
                    self[k] = v
                else:
                    log("Duplicate cookie value: %s %s %s" % (k, v, self[k]))
                    raise DeleteCookie(k)

    def __str__(self):
        """Render as 'name=value;name=value' (values only, no attributes)."""
        # v is a Cookie whose own str() already starts with "name=", so
        # format its bare value here to avoid emitting "name=name=value".
        return ';'.join(['%s=%s' % (k, v.value) for k, v in self.items()])

    # wrapped version of the dict accessors
    def get(self, k, default=None):
        """Return the Cookie stored under `k`.

        Unlike dict.get, a miss returns a fresh Cookie whose value is
        `default`, so callers can always read `.value` on the result.
        """
        try:
            return self[k]
        except KeyError:
            return Cookie(name=k, value=default)

    def __getitem__(self, k):
        return dict.__getitem__(self, k)

    def __setitem__(self, k, v):
        """Store `v` under `k`, wrapping plain strings in a Cookie.

        Raises:
            TypeError: when `v` is neither a string nor a Cookie.
        """
        if isinstance(v, (str, unicode)):
            dict.__setitem__(self, k, Cookie(name=k, value=v))
        elif v.__class__.__name__ == 'Cookie':
            # Compare the class *name* by equality (not identity): this also
            # accepts Cookie instances whose class object was loaded under a
            # different module name.
            dict.__setitem__(self, k, v)
        else:
            raise TypeError("Expected string or Cookie, got %s" % v.__class__.__name__)

    def __delitem__(self, k):
        dict.__delitem__(self, k)
class Cookie:
    """A single cookie: a name/value pair plus optional attributes.

    str() of an instance yields the text used as a Set-Cookie header value.
    """

    def __init__(self, name, value, expires=None, path=None, domain=None):
        self.name = name
        self.value = value
        self.expires = expires  # only rendered when it is a datetime
        self.max_age = -1       # -1 means "do not emit Max-Age"
        self.path = path
        self.domain = domain

    def __str__(self):
        """Return 'name=value' followed by whichever attributes are set."""
        parts = ["%s=%s" % (self.name, self.value)]
        # isinstance (rather than an exact type match) so datetime
        # subclasses are rendered too instead of being silently dropped.
        # NOTE(review): the value is labelled GMT without any conversion --
        # presumably callers pass UTC datetimes; confirm.
        if isinstance(self.expires, datetime.datetime):
            parts.append("; Expires=%s" % self.expires.strftime("%a, %d %b %Y %H:%M:%S GMT"))
        if self.path is not None:
            parts.append("; Path=%s" % self.path)
        if self.domain is not None:
            parts.append("; Domain=%s" % self.domain)
        if self.max_age > -1:
            parts.append("; Max-Age=%d" % self.max_age)
        return ''.join(parts)
class Response():
    """Outgoing response metadata: status line, headers and cookies."""

    def __init__(self):
        self.status = '200 OK'
        self.cookies = Cookies('')
        self.headers = {'Content-Type': 'text/html'}

    def getheaders(self):
        """Return a list of (name, value) pairs.

        The Status pair comes first, then every entry of self.headers, then
        one Set-Cookie pair per cookie.
        """
        pairs = [('Status', self.status)]
        pairs.extend(self.headers.items())
        pairs.extend([('Set-Cookie', str(cookie)) for cookie in self.cookies.values()])
        return pairs

    def headerstring(self):
        """Return the headers as CRLF-separated lines ending in a blank line."""
        lines = ["%s: %s" % pair for pair in self.getheaders()]
        return '\r\n'.join(lines) + '\r\n\r\n'

    def __str__(self):
        return self.headerstring()
class Request(dict):
    """The request environment as a dict, trimmed to a known set of keys.

    Built from the environ mapping handed over by the front-end server.
    After construction:
      * self['POST_DATA'] holds the request body, when an input stream was found
      * self['QUERY_STRING'], self['REQUEST_URI'] and self['SCRIPT_NAME'] are set
      * self.cookies is a Cookies object parsed from HTTP_COOKIE
      * self._discarded lists the environ keys that were dropped
    """

    # Non-HTTP_* keys that survive the clean-up pass in __init__.
    _KEPT_KEYS = (
        'SERVER_SOFTWARE', 'SCRIPT_NAME', 'LOCAL_ADDR', 'REQUEST_METHOD', 'POST_DATA',
        'SERVER_PROTOCOL', 'QUERY_STRING', 'CONTENT_LENGTH', 'SERVER_NAME', 'REMOTE_ADDR', 'SERVER_ADDR',
        'PATH_TRANSLATED', 'SERVER_PORT', 'PATH_INFO', 'GATEWAY_INTERFACE', 'REMOTE_HOST', 'REQUEST_URI',
    )

    def __init__(self, environ, stdin=None):
        """Copy `environ` into self, normalize it, and parse the cookies.

        Args:
            environ: mapping of request variables.  A 'wsgi.input' entry, if
                present, is read as the request body; other 'wsgi.*' entries
                are skipped.  REQUEST_URI is written back into `environ` when
                it had to be synthesized.
            stdin: optional file-like object holding the request body; when
                given it takes precedence over 'wsgi.input'.
        """
        for k, v in environ.items():
            if k == "wsgi.input":
                # add POST_DATA if we find an input stream to read
                self["POST_DATA"] = ''.join(v.readlines())
            if not k.startswith('wsgi.'):
                self[k] = v
        if stdin is not None:
            self["POST_DATA"] = ''.join(stdin.readlines())
        # add QUERY_STRING and REQUEST_URI if missing
        if self.get("QUERY_STRING", None) is None:
            self["QUERY_STRING"] = ""
        if self.get('REQUEST_URI', None) is None:
            query = self["QUERY_STRING"]
            suffix = ('?' + query) if len(query) > 0 else ""
            # fall back to an empty path rather than raising KeyError when
            # the server supplied neither REQUEST_URI nor SCRIPT_NAME
            self['REQUEST_URI'] = (self.get('SCRIPT_NAME') or '') + suffix
            # write this back, in case we need it later during an error redirect out of here
            environ['REQUEST_URI'] = self['REQUEST_URI']
        # normalize script_name: REQUEST_URI with any query string removed
        self['SCRIPT_NAME'] = self['REQUEST_URI'].split('?', 1)[0]
        # IIS puts all sorts of crap into the environment, which we should remove
        self._discarded = []
        # iterate over a snapshot of the keys, since entries are deleted as we go
        for k in list(self.keys()):
            if k.startswith('HTTP') or k in self._KEPT_KEYS:
                continue
            del self[k]
            self._discarded.append(k)
        self.cookies = Cookies(self.get('HTTP_COOKIE', ''))
        # log the request headers
        if Settings.logRequestHeaders:
            log("REQUEST: %s" % str(self))

    def __str__(self):
        """Return a one-line dump of the entries, cookies and discarded keys."""
        entries = ', '.join(["'%s': '%s'" % (k, v) for k, v in self.items()
                             if not k.startswith('wsgi')])
        return "self = %s; cookies = %s; discarded = %s;" % (
            '{' + entries + '}', str(self.cookies), str(self._discarded))
class LimoProcess():
    """One back-end handler process plus the parent's end of its duplex pipe.

    Class-level state shared by all instances:
      _freeList -- queue of instances currently available to take a request
      _all      -- list of instances that have been started and not stopped
    """
    _freeList = Queue.Queue()
    _all = []

    @staticmethod
    def get():
        """Block until a free handler is available and return it.

        Returns None (after logging) if taking from the queue fails.
        """
        try:
            return LimoProcess._freeList.get()
        except Exception as e:
            log("Exception in LimoProcess.get: %s" % str(e))
            return None

    @staticmethod
    def load():
        """Return '[free/total]': the free-list size and the number of live processes."""
        return "[%d/%d]" % (LimoProcess._freeList.qsize(), len(LimoProcess._all))

    def __init__(self, target, index=-1, skipModels=False):
        """Create the handler and start it immediately.

        'target' is a function like: def request(pipe, index, skip=False),
        that reads requests from the pipe and sends responses back over it.
        To work on win32, the target function cannot be a member of anything
        (static or otherwise).
        If skipModels is True, it is passed on to the target so the Model
        objects won't be initialized for that instance (presumably they were
        already initialized by a previous launch).
        If index is -1, the current number of live processes is used.
        """
        self.target = target
        self.index = index
        self.is_started = False
        self.skip_models = skipModels
        if index == -1:
            self.index = len(LimoProcess._all)
        self.start()

    def free(self):
        """Put this handler back on the free list (no-op unless started)."""
        if self.is_started:
            LimoProcess._freeList.put(self)

    def start(self):
        """Launch the child process and wait for its INIT/READY handshake.

        On any failure the process is force-stopped and the error re-raised.
        """
        # NOTE(review): stop() does not remove an idle instance from
        # _freeList, so a stop()/start() cycle on an idle handler appears to
        # queue it twice -- confirm whether callers rely on this.
        LimoProcess._all.append(self)
        self.parent_conn, child_conn = Pipe(True)  # duplex pipes to talk to the children
        self.p = Process(target=self.target, args=(child_conn, self.index, self.skip_models))
        self.p.daemon = True
        self.p.start()
        try:
            self.send("INIT")  # this will block until the process is awake enough to receive it
            ack = self.recv()  # and then again until it is awake enough to respond
            assert ack == "READY", "Expected READY, got: %s" % str(ack)  # and lets just make sure nothing got garbled
            self.is_started = True
            self.free()
        except:
            # deliberately catch everything: clean up, then re-raise unchanged
            self.stop(nicely=False)
            raise

    def stop(self, nicely=True):
        """Stop the child process; safe to call more than once.

        With nicely=True the child is first sent "STOP" and expected to
        answer "CLOSE".  The process is terminated afterwards in every case,
        including when that handshake fails.

        Raises:
            AssertionError: when the child answers with something other
                than "CLOSE".
        """
        # right away make sure people dont try to use us for something while we are going down
        if self in LimoProcess._all:
            LimoProcess._all.remove(self)
        self.is_started = False
        proc = getattr(self, 'p', None)
        if proc is None:
            # already stopped (or never launched): nothing left to tear down
            return
        try:
            # ask the process nicely to die
            if nicely:
                log("Sending STOP")
                self.send("STOP")
                log("Waiting for response")
                try:
                    ack = self.recv()
                except EOFError:
                    # the child closed its end of the pipe instead of answering
                    log("Handler closed its pipe without acknowledging stop request.")
                else:
                    assert ack == "CLOSE", "Expected CLOSE, got: %s" % str(ack)
                    log("Handler acknowledged stop request.")
        finally:
            # then just pull the plug
            proc.terminate()
            del self.p
            if hasattr(self, 'parent_conn'):
                del self.parent_conn

    # masquerade as a Pipe
    def send(self, data):
        return self.parent_conn.send(data)

    def recv(self):
        return self.parent_conn.recv()

    def close(self):
        self.stop()
class LimoServer(Loggable):
    """Front-end dispatcher.

    Accepts FastCGI requests and relays each one to a LimoProcess handler,
    streaming the handler's output back to the web server.
    """

    def __init__(self):
        self.socket = None
        self.procs = []  # so restart() is safe even when no handlers were started
        self.last_restart = datetime.datetime.now()

    def stop(self, message=None):
        """Log `message` and exit the process with status 1."""
        self.log("SERVER GOT DEATH REQUEST: %s" % message)
        sys.exit(1)

    def log(self, msg):
        """Log `msg` prefixed with the bound socket address."""
        log("[%s] %s" % (self.socket, msg))

    def start(self, count):
        """Start `count` handler processes (count must be > 0)."""
        assert count > 0, "Must have at least one handler starting"
        self.procs = []
        # start a ghetto process pool
        # these will be our back end request handlers
        for i in range(count):
            # only the first handler is started with skipModels=False
            self.procs.append(LimoProcess(fcgi_handler, i, i > 0))
            self.log("Process %d started." % i)
        # TODO: Start On Demand, up to a maximum

    def restart(self):
        """Stop and start every handler.

        Returns False, without restarting anything, when the previous
        restart (or server construction) was under 10000 ms ago according to
        ms_elapsed; this is treated as a request to exit.  Returns True
        otherwise.
        """
        now = datetime.datetime.now()
        diff = ms_elapsed(self.last_restart)
        if diff < 10000:
            log("Detected two quick restarts (%.0fms), interpreting as an exit request." % diff)
            return False
        # remember this restart, so the *next* one is measured against it
        # rather than against the time the server object was created
        self.last_restart = now
        for s in self.procs:
            s.stop()
            s.start()
        return True

    def run(self, socket=Settings.tempDir+"/limo.sock", count=None):
        """Start the handlers, then serve FastCGI requests on `socket`.

        Args:
            socket: bind address passed to FastCGIServer.
            count: number of handler processes; None means cpu_count(),
                0 means "start none" (used when re-entering after a restart).
        """
        try:
            self.socket = str(socket)
            if count is None:
                count = cpu_count()
            if count != 0:
                self.log("Starting %d handler processes" % count)
                self.start(count)
                self.log("All handlers started.")
            # then start the front facing fastcgi server
            self.request_count = 0
            # ws = WSGIServer(self.wsgi_request, bindAddress = socket, multithreaded=True)
            self.log("Binding address: %s" % (str(socket)))
            ws = FastCGIServer(bindAddress=socket)
            restart = ws.run(self.fcgi_request, log_traffic=Settings.logAllRequestTraffic)
            if restart:
                self.log("Server restarting...")
                del ws
                if self.restart():
                    # the handlers were already restarted above, so start none here
                    self.run(socket=socket, count=0)
        except Exception as e:
            self.stop(str(e))
        finally:
            self.log("Server exiting...")

    # the new fastcgi server eschews wsgi for more stream-ability
    def fcgi_request(self, request):
        """Generator: relay one request to a free handler and yield its output.

        `request` must provide `.params` and `.stdin`.  Every string chunk
        received from the handler is yielded as-is, until the handler sends
        an EOFToken or a fatal ErrorToken.  One summary line is logged per
        request unless Settings.disableAllLogging is set.

        Raises:
            RuntimeError: when no handler process could be obtained.
            ValueError: when the handler sends an object of an unknown kind.
        """
        start_wait = datetime.datetime.now()
        run_pipe = LimoProcess.get()
        wait_time = ms_elapsed(start_wait)
        if run_pipe is None:
            raise RuntimeError("No handler process available to serve the request")
        saw_header_end = False
        headers = ""
        parsed = None    # the Request, once it has been built successfully
        byte_count = 0
        status = None
        start = datetime.datetime.now()
        try:
            parsed = Request(request.params, request.stdin)
            run_pipe.send(parsed)
            while True:
                data = run_pipe.recv()
                # tokens are recognized by class name, compared with ==
                kind = data.__class__.__name__
                if isinstance(data, (str, unicode)):
                    byte_count += len(data)
                    if status is None and data.startswith('Status: '):
                        status = data[8:data.find('\r', 8)]
                    if not saw_header_end:
                        n = data.find('\r\n\r\n')
                        if n != -1:
                            headers += data[:n+2]  # grab the last chunk of headers plus one crlf
                            saw_header_end = True
                        else:
                            headers += data
                    yield data
                elif kind == 'EOFToken':
                    break
                elif kind == 'ErrorToken':
                    self.log("Handler returned ErrorToken: %s" % (data.msg))
                    if data.fatal:
                        # the handler is gone: tear it down and stop reading,
                        # because its pipe no longer exists after stop()
                        run_pipe.stop()
                        break
                else:
                    raise ValueError("Don't know what to do with response: %s %s" % (str(type(data)), str(data)))
        finally:
            # computed here so the summary below works on the error path too
            render_time = ms_elapsed(start)
            load = LimoProcess.load() + "+%d" % run_pipe.index
            # NOTE(review): if this generator is closed before the handler's
            # EOFToken is read, the unread data stays in the pipe when the
            # handler is freed -- confirm whether that can happen.
            run_pipe.free()
            if parsed is not None and not Settings.disableAllLogging:
                if wait_time > 1:
                    wait_text = "+%.0f" % (wait_time)
                else:
                    wait_text = ""
                if byte_count > 10000:
                    size_text = "%.2fKb" % (byte_count / 1024.0)
                else:
                    size_text = "%sb" % byte_count
                log("%s %s %s %s %s %s %s %s %s" % (
                    (str(start)[5:-3] if Settings.logRequestStartTime else ""),
                    (pad(str(load), 9) if Settings.logRequestLoad else ""),
                    pad(parsed['REQUEST_METHOD'], 4),
                    pad(ellipsis(parsed['REQUEST_URI'], 40, 17), 40),
                    pad(str(status), 9),  # status is None when no Status header was seen
                    pad("(%s)" % size_text, 10),
                    pad("(%.0f%sms)" % (render_time, wait_text), 11),
                    ("[sha:%s]" % ellipsis(parsed.cookies.get('sha', 'None').value, 7, 0)[:-3] if Settings.logSessionToken else ""),
                    ("\n[headers:\n%s]" % headers) if Settings.logResponseHeaders else "",
                ))
class EOFToken(object):
    """Marker object a handler sends to the dispatcher after its last chunk of output.

    It carries no data; receivers recognize it by its class name.
    """
class ErrorToken(object):
    """Token a handler sends to the dispatcher to report an error message.

    The original author describes the message as destined for the web
    server's stderr.  If the fatal flag is True, the handler is considered
    dead, and is never .free()'d again.
    """

    def __init__(self, msg, fatal=False):
        # msg must be exactly a str or unicode object
        msg_type = type(msg)
        assert msg_type is str or msg_type is unicode
        self.fatal = fatal
        self.msg = msg
def get_redirect_response(e, defaultLocation="/"):
    """Build a Response from a redirect exception `e`.

    `e.status` becomes both the status and the `data` attribute of the
    response, and every (name, value) entry of `e.headers` is copied over.
    When `e.headers` carries no "Location", `defaultLocation` is used.
    """
    redirect = Response()
    status = e.status
    redirect.data = status
    redirect.status = status
    for header in e.headers:
        redirect.headers[header[0]] = header[1]
    # a fresh Response has no Location header of its own, so it is only
    # present at this point if e.headers supplied one
    redirect.headers.setdefault("Location", defaultLocation)
    # log("REDIRECT: %s Location: %s" % (e.status, redirect.headers["Location"]))
    return redirect
def get_status_response(e):
    """Build a Response carrying the status and headers of exception `e`."""
    result = Response()
    result.status = e.status
    # each entry of e.headers is indexable as (name, value)
    result.headers.update([(header[0], header[1]) for header in e.headers])
    return result
def get_error_response():
    """Build an HTML error page Response for the exception being handled.

    Must be called while an exception is active, since it reads
    sys.exc_info().  A GeneratorException is unwrapped first, so the page
    describes the error it bundles (its .error and .tb attributes).  The
    exception is logged, and the cgitb-rendered page is stored in the
    response's `data` attribute with a "200 OK" status.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    if exc_type == GeneratorException:
        # report the wrapped error rather than the wrapper itself
        exc_type, exc_value, exc_tb = type(exc_value.error), exc_value.error, exc_value.tb
    trace_text = '\n'.join(traceback.format_tb(exc_tb))
    log("Handler saw an uncaught exception: %s: %s\n%s" % (exc_type, exc_value, trace_text))
    page = Response()
    page.status = "200 OK"
    page.headers["Content-Type"] = "text/html"
    page.data = cgitb.html((exc_type, exc_value, exc_tb))
    # drop the local references to the exception and traceback
    del exc_type, exc_value, exc_tb
    return page
376 # copies of fcgi_handler are the actual handler 'threads'
377 # each one waits for a request to land in its pipe, then processes it
378 # and writes the result back out the pipe
def fcgi_handler(pipe, index=0, skipModels=False):
    """Handler loop run inside each back-end process.

    Reads messages from `pipe` until told to stop:
      "INIT" -- (re)load settings, initialize modules, build the sitemap,
                and answer "READY"
      "STOP" -- answer "CLOSE" and leave the loop
      anything else is treated as a Request: the page is rendered and sent
      back as string chunks (the first chunk prefixed with the response
      headers), followed by an EOFToken.

    Args:
        pipe: the child's end of the duplex pipe to the dispatcher.
        index: handler index supplied by the dispatcher (not used here).
        skipModels: passed to modules.init() during "INIT".

    The function always ends by calling sys.exit(1).
    """
    from limoutil import log, profile
    from cache import Cache
    import modules
    import settings

    def send_headers_only(resp):
        # a response that consists of headers alone (redirects, status errors)
        pipe.send(resp.headerstring())
        pipe.send(EOFToken())

    def send_error_page():
        # render the exception currently being handled as an HTML page
        page = get_error_response()
        pipe.send(page.headerstring())
        pipe.send(page.data)
        pipe.send(EOFToken())

    try:
        while True:
            # block until we get a new request in the pipe
            request = pipe.recv()
            if request == "INIT":  # initialize the handler
                Settings = reload(settings).Settings
                log("handler: Initializing modules...")
                modules.init(skipModels=skipModels)
                log("handler: Building sitemap...")
                sitemap = LimoModel().getSiteMap()
                log("handler: Done initializing and ready to process requests.")
                pipe.send("READY")
            elif request == "STOP":  # stop the handler gracefully
                log("Stopping on request")
                # LimoProcess.stop() waits for this acknowledgement
                pipe.send("CLOSE")
                break
            else:  # begin a normal request phase
                # reset per request, so the clean-up below can never save a
                # session left over from an earlier request
                session = None
                try:
                    Settings = reload(settings).Settings
                    response = Response()
                    session = Session(request, response)
                    messages = session.getMessages()  # TODO: pass session messages along so they get on the page somehow
                    if len(messages):
                        session.clearMessages()
                        log("Got messages from previous page: %s" % messages)
                    regions = {'entire': '', 'left': '', 'right': '', 'main': '', 'head': '', 'header': '', 'footer': ''}
                    url = request["REQUEST_URI"].split("?")[0].split("/")
                    # optionally, redirect to standardize URLs
                    if Settings.strictScriptNameRedirects:
                        s = request["SCRIPT_NAME"]
                        # s[-4:-2] covers the characters at positions -4 and -3
                        # (where the '.' of a 2-3 letter extension would sit)
                        # without raising IndexError on short names
                        if len(s) > 0 and s[-1] != "/" and "." not in s[-4:-2]:
                            fixed = s + "/"
                            q = request["QUERY_STRING"]
                            if len(q) == 0:
                                raise HTTPRedirect(fixed, status="301 Moved Permanently")
                    # compute regions: walk the URL one path step at a time,
                    # letting deeper sitemap entries override shallower ones
                    for i in range(len(url)):
                        step = '/'.join(url[:i+1]) if i > 0 else '/'
                        try:
                            regions.update(sitemap[step])
                        except KeyError:
                            break
                    if Settings.logRequestRegions:
                        log("REGIONS: %s" % str(regions))

                    # define some one-off helper functions
                    # get a list of real blocks, given a comma-sep list of block names
                    def text_to_blocks(text, request, response, session):
                        ret = []
                        for name in reversed(text.split(",")):
                            if name == '':
                                continue
                            try:
                                factory = modules.blocks[name]
                            except KeyError:
                                raise KeyError("%s not in %s" % (name, modules.blocks.keys()))
                            ret.append(factory(request, response, session))
                        return ret

                    # set up the blocks prior to rendering, call init() and action()
                    # redirect if an action added a message to the session
                    def init_blocks(b, request, response, session):
                        ret = text_to_blocks(regions.get(b, ''), request, response, session)
                        for block in ret:
                            block.init()
                            try:
                                # log("Calling %s.action()..." % str(type(block).__name__))
                                block.action()
                                if len(session.getMessages()) > 0:
                                    log("Got messages: %s" % session.getMessages())
                                    raise HTTPRedirect(request["SCRIPT_NAME"])
                            except HTTPStatusError as e:
                                log("Passing along status error: %s %s" % (str(type(e)), str(e)))
                                raise
                            except Exception:
                                # a failing action is logged and does not abort the page
                                # session.addMessage("Action Failure: %s" % str(e))
                                e = sys.exc_info()
                                log("Action Failure: %s %s %s" % (str(e[0]), str(e[1]), traceback.format_tb(e[2])))
                                del e
                        return ret

                    # given a region name b, call render() on each of its blocks and return the result
                    # (each block receives the previous block's output)
                    def render_blocks(b):
                        ret = None
                        for block in regions[b]:
                            ret = block.render(ret)
                        return ret

                    # this generator, on each call, will return one piece of the final result
                    def result_generator(request, response, session):
                        if Settings.developmentMode:
                            modules.init(skipModels=False)  # will only re-init changed modules
                        if len(regions.get('entire', '')) > 0:
                            regions['entire'] = init_blocks('entire', request, response, session)
                            yield render_blocks('entire')
                        else:
                            for region in regions.keys():
                                if region == 'entire':
                                    continue
                                regions[region] = init_blocks(region, request, response, session)
                            page = {}
                            for region in regions.keys():
                                if region == 'entire':
                                    continue
                                elif region == 'head':
                                    page[region] = render_blocks(region)
                                else:
                                    page[region] = Template.render("region", {'name': region, 'output': render_blocks(region)})
                            page['page_class'] = 'logged-in' if session.user is not None else 'not-logged-in'
                            yield Template.render("page", page)

                    # the final output will come out at the end of a series of generator-filters
                    # define the initial input generator as our result generator from above
                    # this is the basic request generator that will pump LIMO's regions and return raw text or html
                    gen = result_generator(request, response, session)
                    # for now, compression is configured based on user agent
                    # so, first get the configured compression type, like 'gzip','deflate'
                    compression = Settings.getCompression(request.get('HTTP_USER_AGENT', ''))
                    # then if the user-agent really says it supports it, lets add a compressor filter
                    # (a None or empty setting means "no compression")
                    if compression and compression in request.get('HTTP_ACCEPT_ENCODING', ''):
                        if Settings.logCompression:
                            log("server: Using compression: %s" % compression)
                        if compression == "deflate":
                            gen = compress.generate_deflate(gen, cache_file=request.get('LIMO_CACHE_FILE', None))
                        else:
                            gen = compress.generate_gzip(gen, cache_file=request.get('LIMO_CACHE_FILE', None))
                        response.headers['Vary'] = 'Accept-Encoding'
                        response.headers['Content-Encoding'] = compression
                    # if we dont want to immediately write pieces of the page to the network as they are available
                    # then use a quick buffering filter
                    if Settings.bufferOutput:
                        def buffer_output(g):
                            # collapse all the pieces into a single write
                            buf = cStringIO.StringIO()
                            for piece in flatten(g):
                                if piece is not None:
                                    buf.write(piece)
                            yield buf.getvalue()
                            del buf
                        # then wrap the result generator in a buffering filter
                        gen = buffer_output(gen)
                    # now that all the output layers are in place
                    sent_headers = False
                    # execute the whole generator tree and write the results out
                    for text in flatten(gen):
                        if text is not None and len(text) > 0:
                            if not sent_headers:
                                # headers go out together with the first piece of body
                                pipe.send(response.headerstring() + text)
                                sent_headers = True
                            else:
                                pipe.send(text)
                    if not sent_headers:
                        # the page produced no output at all: still send the headers
                        pipe.send(response.headerstring())
                    # write an EOF to mark the end
                    pipe.send(EOFToken())
                except GeneratorException as e:
                    # generator exceptions bundle the real errors, so lets unpack and catch the real thing
                    kind = e.error.__class__.__name__
                    if kind == 'HTTPRedirect':
                        send_headers_only(get_redirect_response(e.error, request['REQUEST_URI']))
                    elif kind == 'HTTPStatusError':
                        send_headers_only(get_status_response(e.error))
                    else:
                        send_error_page()
                except HTTPRedirect as e:
                    send_headers_only(get_redirect_response(e, request['REQUEST_URI']))
                except HTTPStatusError as e:
                    send_headers_only(get_status_response(e))
                except KeyboardInterrupt:
                    # the main process catches the sigint and tells us what to do about it
                    pass
                except Exception:
                    send_error_page()
                finally:  # do our best to clean up and finalize all the request objects
                    try:
                        # make sure all the .css and .js files required in this request are written out to the disk
                        # (as the html we just sent out referenced them, so the browser is likely to request them soon)
                        Cache.CSS.saveContent()
                        Cache.JS.saveContent()
                    except Exception as e:
                        log("Error saving cached css/js content: %s" % str(e))
                    try:
                        # and that the requirements are cleared before processing the next request
                        Cache.CSS.clear()
                        Cache.JS.clear()
                    except Exception as e:
                        log("Error clearing cached css/js requirements: %s" % str(e))
                    if session is not None:
                        try:
                            # write the session back to the database
                            session.save()
                        except Exception as e:
                            log("Error saving session: %s" % str(e))
                    if Settings.profileServer and Settings.profileLogQueries:
                        for (sql, ms) in Query.getQueryLog():
                            log("QUERY: [%.2f] %s" % (float(ms), sql))
                        Query.clearQueryLog()
    except EOFError:  # the pipe died (maybe the parent process suddenly died)
        pass
    except Exception:
        a, b, c = sys.exc_info()
        try:
            send_error_page()
            pipe.send(ErrorToken("Handler exiting due to error: %s %s" % (a, b), fatal=True))
        finally:
            # drop the local references to the exception and traceback
            del a, b, c
        return
    finally:
        modules.clear()
        log("Handler exiting")
        pipe.close()
        log("Handler closed")
        sys.exit(1)