1 # -*- encoding: binary -*-
5 # base class for Rainbows! concurrency models; this is currently
6 # used by the ThreadSpawn and ThreadPool models
10 include Rainbows::Const
13 def listen_loop_error(e)
15 logger.error "Unhandled listen loop exception #{e.inspect}."
16 logger.error e.backtrace.join("\n")
19 def init_worker_process(worker)
23 # we don't use the self-pipe mechanism in the Rainbows! worker
24 # since we don't defer reopening logs
25 HttpServer::SELF_PIPE.each { |x| x.close }.clear
26 trap(:USR1) { reopen_worker_logs(worker.nr) }
27 trap(:QUIT) { G.quit! }
28 [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
29 logger.info "Rainbows! #@use worker_connections=#@worker_connections"
32 # once a client is accepted, it is processed in its entirety here
33 # in 3 easy steps: read request, call app, write app response
34 def process_client(client)
35 buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
39 remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
42 while ! hp.headers(env, buf)
43 IO.select([client], nil, nil, 5) or return client.close
44 buf << client.readpartial(CHUNK_SIZE)
47 env[RACK_INPUT] = 0 == hp.content_length ?
48 HttpRequest::NULL_IO :
49 Unicorn::TeeInput.new(client, env, hp, buf)
50 env[REMOTE_ADDR] = remote_addr
51 response = app.call(env.update(RACK_DEFAULTS))
53 if 100 == response.first.to_i
54 client.write(EXPECT_100_RESPONSE)
55 env.delete(HTTP_EXPECT)
56 response = app.call(env)
59 alive = hp.keepalive? && G.alive
60 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
61 HttpResponse.write(client, response, out)
62 end while alive and hp.reset.nil? and env.clear
64 # if we get any error, try to write something back to the client
65 # assuming we haven't closed the socket, but don't get hung up
66 # if the socket is already closed or broken. We'll always ensure
67 # the socket is closed at the end of this function
69 handle_error(client, e)
72 def join_threads(threads)
74 expire = Time.now + (timeout * 2.0)
75 until (threads.delete_if { |thr| ! thr.alive? }).empty?
79 break if Time.now >= expire
84 def self.included(klass)
85 klass.const_set :LISTENERS, HttpServer::LISTENERS
86 klass.const_set :G, Rainbows::G