1 # -*- encoding: binary -*-
5 # base class for Rainbows concurrency models, this is currently
6 # used by ThreadSpawn and ThreadPool models
10 include Rainbows::Const
# Per-worker setup: switches the listeners to non-blocking mode on pre-1.9
# Ruby, closes the self-pipe, and installs this worker's signal handlers.
# +worker+ must respond to +nr+ (passed to reopen_worker_logs on USR1).
# NOTE(review): the embedded line numbers skip (13 -> 18, 21 -> 24), so some
# lines of this method, including its closing `end`, are not visible in this
# listing -- confirm against the full file before editing.
13 def init_worker_process(worker)
18 # avoid spurious wakeups and blocking-accept() with 1.8 green threads
19 if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
21 HttpServer::LISTENERS.each { |l| l.nonblock = true }
24 # we don't use the self-pipe mechanism in the Rainbows! worker
25 # since we don't defer reopening logs
26 HttpServer::SELF_PIPE.each { |x| x.close }.clear
# USR1 reopens this worker's logs; QUIT calls G.quit!
27 trap(:USR1) { reopen_worker_logs(worker.nr) }
28 trap(:QUIT) { G.quit! }
29 [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
30 logger.info "Rainbows! #@use worker_connections=#@worker_connections"
# write_body(client, body) writes a response body to +client+ and then closes
# the body if it responds to :close. Which of the two definitions below is
# used is decided once, at load time, by whether IO responds to :copy_stream.
# NOTE(review): the `else`/`end` lines (and possibly an `ensure`) are not
# visible in this listing -- confirm the exact branch structure in the file.
33 if IO.respond_to?(:copy_stream)
# variant used when IO.copy_stream exists
34 def write_body(client, body)
# a body that responds to :to_path is converted with Rainbows.body_to_io
# and handed to IO.copy_stream
35 if body.respond_to?(:to_path)
36 IO.copy_stream(Rainbows.body_to_io(body), client)
# presumably the fallback branch: write each chunk yielded by body.each
38 body.each { |chunk| client.write(chunk) }
# close the body only if it supports :close
41 body.respond_to?(:close) and body.close
# variant used when IO.copy_stream does not exist: always writes chunk by chunk
44 def write_body(client, body)
45 body.each { |chunk| client.write(chunk) }
47 body.respond_to?(:close) and body.close
# also expose write_body as a module-level function
51 module_function :write_body
53 # once a client is accepted, it is processed in its entirety here
54 # in 3 easy steps: read request, call app, write app response
55 # this is used by synchronous concurrency models
56 # Base, ThreadSpawn, ThreadPool
# NOTE(review): the embedded line numbers skip in several places, so the
# setup of +hp+ and +env+ and the begin/rescue/ensure/end lines are not
# visible in this listing -- confirm against the full file before editing.
57 def process_client(client)
58 buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
62 remote_addr = Rainbows.addr(client)
# keep reading until hp.headers(env, buf) returns a truthy value; give up
# (return) if the client does not become readable within G.kato
65 while ! hp.headers(env, buf)
66 IO.select([client], nil, nil, G.kato) or return
67 buf << client.readpartial(CHUNK_SIZE)
70 env[CLIENT_IO] = client
# a request with content length 0 gets the shared NULL_IO as its input,
# anything else gets a TeeInput built from the client and the buffered data
71 env[RACK_INPUT] = 0 == hp.content_length ?
72 HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
73 env[REMOTE_ADDR] = remote_addr
74 status, headers, body = app.call(env.update(RACK_DEFAULTS))
# writes EXPECT_100_RESPONSE, drops HTTP_EXPECT from env and calls the app
# again; the condition guarding these three lines is not visible here
77 client.write(EXPECT_100_RESPONSE)
78 env.delete(HTTP_EXPECT)
79 status, headers, body = app.call(env)
# keep the connection open only if the parser reports keepalive and
# G.alive is truthy
82 alive = hp.keepalive? && G.alive
84 out = [ alive ? CONN_ALIVE : CONN_CLOSE ]
85 client.write(HttpResponse.header_string(status, headers, out))
87 write_body(client, body)
# loop again for the next request on this connection while +alive+ holds,
# hp.reset returns nil, and env.clear (which empties env) is truthy
88 end while alive and hp.reset.nil? and env.clear
89 # if we get any error, try to write something back to the client
90 # assuming we haven't closed the socket, but don't get hung up
91 # if the socket is already closed or broken. We'll always ensure
92 # the socket is closed at the end of this function
# +e+ is presumably bound by a rescue clause that is not visible here
94 Error.write(client, e)
96 client.close unless client.closed?
# Hook invoked when this module is included into +klass+: defines the
# constants LISTENERS (HttpServer::LISTENERS) and G (Rainbows::G) on +klass+.
# NOTE(review): the closing `end` lines are not visible in this listing.
99 def self.included(klass)
100 klass.const_set :LISTENERS, HttpServer::LISTENERS
101 klass.const_set :G, Rainbows::G