1 # -*- encoding: binary -*-
3 # base class for Rainbows concurrency models, this is currently used by
4 # ThreadSpawn and ThreadPool models. Base is also its own
5 # (non-)concurrency model which is basically Unicorn-with-keepalive, and
6 # not intended for production use, as keepalive with a pure prefork
7 # concurrency model is extremely expensive.
include Rainbows::Const
include Rainbows::Response

# shortcuts to commonly-used collaborators so method bodies stay terse
NULL_IO = Unicorn::HttpRequest::NULL_IO # shared, empty rack.input for bodiless requests
TeeInput = Rainbows::TeeInput # lazily-buffered rack.input for requests with bodies
HttpParser = Unicorn::HttpParser
# this method is called by all current concurrency models
def init_worker_process(worker) # :nodoc:
  super(worker) # normal Unicorn worker setup first
  Rainbows::Response.setup(self.class)
  Rainbows::MaxBody.setup

  # avoid spurious wakeups and blocking-accept() with 1.8 green threads
  if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
    require "io/nonblock"
    Rainbows::HttpServer::LISTENERS.each { |l| l.nonblock = true }
  end

  # we don't use the self-pipe mechanism in the Rainbows! worker
  # since we don't defer reopening logs
  Rainbows::HttpServer::SELF_PIPE.each { |x| x.close }.clear
  trap(:USR1) { reopen_worker_logs(worker.nr) }
  trap(:QUIT) { G.quit! }
  [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
  logger.info "Rainbows! #@use worker_connections=#@worker_connections"
end
# blocks until +client+ has data readable for request headers, or the
# global keepalive timeout (G.kato) expires.  Returns a truthy value
# (the IO.select result) when readable, nil on timeout so callers can
# drop idle keepalive connections.
def wait_headers_readable(client) # :nodoc:
  IO.select([client], nil, nil, G.kato)
end
# once a client is accepted, it is processed in its entirety here
# in 3 easy steps: read request, call app, write app response
# this is used by synchronous concurrency models
# Base, ThreadSpawn, ThreadPool
def process_client(client) # :nodoc:
  buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
  hp = HttpParser.new
  env = {}
  remote_addr = Rainbows.addr(client)

  begin # loop once per request on this (possibly keepalive) connection
    # read until headers are fully parsed; give up if the client is not
    # readable again within the keepalive timeout
    until hp.headers(env, buf)
      wait_headers_readable(client) or return
      buf << client.readpartial(CHUNK_SIZE)
    end

    env[CLIENT_IO] = client
    # skip TeeInput entirely for bodiless requests
    env[RACK_INPUT] = 0 == hp.content_length ?
                      NULL_IO : TeeInput.new(client, env, hp, buf)
    env[REMOTE_ADDR] = remote_addr
    status, headers, body = app.call(env.update(RACK_DEFAULTS))

    # HTTP/1.1 100-continue: ack the client, then run the app again
    if 100 == status.to_i
      client.write(EXPECT_100_RESPONSE)
      env.delete(HTTP_EXPECT)
      status, headers, body = app.call(env)
    end

    if hp.headers?
      headers = HH.new(headers)
      range = parse_range(env, status, headers) and status = range.shift
      # env doubles as the keepalive flag for the loop condition below
      env = false unless hp.keepalive? && G.alive
      headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE
      client.write(response_header(status, headers))
    end
    write_body(client, body, range)
  end while env && env.clear && hp.reset.nil?
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
# if the socket is already closed or broken.  We'll always ensure
# the socket is closed at the end of this function
rescue => e
  Rainbows::Error.write(client, e)
ensure
  client.close unless client.closed?
end
# copies shared server constants into each concurrency-model class that
# mixes in this module, so model code can reference LISTENERS and G
# without the Rainbows:: namespace prefix
def self.included(klass) # :nodoc:
  klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS
  klass.const_set :G, Rainbows::G
end