start using kgio library
[rainbows.git] / lib / rainbows / base.rb
blob59747c7106ecdaa2764752df2c3adf2e160c014e
1 # -*- encoding: binary -*-
3 # base class for \Rainbows! concurrency models, this is currently used by
4 # ThreadSpawn and ThreadPool models.  Base is also its own
5 # (non-)concurrency model which is basically Unicorn-with-keepalive, and
6 # not intended for production use, as keepalive with a pure prefork
7 # concurrency model is extremely expensive.
8 module Rainbows::Base
10   # :stopdoc:
11   include Rainbows::Const
12   include Rainbows::Response
14   # shortcuts...
15   G = Rainbows::G
16   NULL_IO = Unicorn::HttpRequest::NULL_IO
17   TeeInput = Rainbows::TeeInput
18   HttpParser = Unicorn::HttpParser
20   # this method is called by all current concurrency models
21   def init_worker_process(worker) # :nodoc:
22     super(worker)
23     Rainbows::Response.setup(self.class)
24     Rainbows::MaxBody.setup
25     G.tmp = worker.tmp
27     listeners = Rainbows::HttpServer::LISTENERS
28     Rainbows::HttpServer::IO_PURGATORY.concat(listeners)
30     # no need for this when Unicorn uses Kgio
31     listeners.map! do |io|
32       case io
33       when TCPServer
34         Kgio::TCPServer.for_fd(io.fileno)
35       when UNIXServer
36         Kgio::UNIXServer.for_fd(io.fileno)
37       else
38         io
39       end
40     end
42     # we're don't use the self-pipe mechanism in the Rainbows! worker
43     # since we don't defer reopening logs
44     Rainbows::HttpServer::SELF_PIPE.each { |x| x.close }.clear
45     trap(:USR1) { reopen_worker_logs(worker.nr) }
46     trap(:QUIT) { G.quit! }
47     [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
48     logger.info "Rainbows! #@use worker_connections=#@worker_connections"
49   end
51   def wait_headers_readable(client)  # :nodoc:
52     IO.select([client], nil, nil, G.kato)
53   end
55   # once a client is accepted, it is processed in its entirety here
56   # in 3 easy steps: read request, call app, write app response
57   # this is used by synchronous concurrency models
58   #   Base, ThreadSpawn, ThreadPool
59   def process_client(client) # :nodoc:
60     buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
61     hp = HttpParser.new
62     env = {}
63     remote_addr = Rainbows.addr(client)
65     begin # loop
66       until hp.headers(env, buf)
67         wait_headers_readable(client) or return
68         buf << client.readpartial(CHUNK_SIZE)
69       end
71       env[CLIENT_IO] = client
72       env[RACK_INPUT] = 0 == hp.content_length ?
73                         NULL_IO : TeeInput.new(client, env, hp, buf)
74       env[REMOTE_ADDR] = remote_addr
75       status, headers, body = app.call(env.update(RACK_DEFAULTS))
77       if 100 == status.to_i
78         client.write(EXPECT_100_RESPONSE)
79         env.delete(HTTP_EXPECT)
80         status, headers, body = app.call(env)
81       end
83       if hp.headers?
84         headers = HH.new(headers)
85         range = make_range!(env, status, headers) and status = range.shift
86         env = false unless hp.keepalive? && G.alive
87         headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE
88         client.write(response_header(status, headers))
89       end
90       write_body(client, body, range)
91     end while env && env.clear && hp.reset.nil?
92   # if we get any error, try to write something back to the client
93   # assuming we haven't closed the socket, but don't get hung up
94   # if the socket is already closed or broken.  We'll always ensure
95   # the socket is closed at the end of this function
96   rescue => e
97     Rainbows::Error.write(client, e)
98   ensure
99     client.close unless client.closed?
100   end
102   def self.included(klass) # :nodoc:
103     klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS
104     klass.const_set :G, Rainbows::G
105   end
107   # :startdoc: