1 # -*- encoding: binary -*-
5 # base class for Rainbows concurrency models; it is currently
6 # used by the ThreadSpawn and ThreadPool models
10 include Rainbows::Const
12 # write a response without caring whether it actually went out; within
# this file it is only called with the canned ERROR_400_RESPONSE and
# ERROR_500_RESPONSE strings.  Errors raised by either the write or the
# close are swallowed by the `rescue nil` modifiers.
# client:: the object to write to and then close
# response_str:: the complete response to send
14 # TODO: merge into Unicorn::HttpServer
15 def emergency_response(client, response_str)
# write_nonblock so that a client that is not reading cannot block us
16 client.write_nonblock(response_str) rescue nil
# the close is attempted whether or not the write succeeded
17 client.close rescue nil
20 # TODO: migrate into Unicorn::HttpServer
# log an exception at error level: first a line containing e.inspect,
# then its backtrace joined with newlines.
# e:: the exception to report; e.backtrace is assumed to be non-nil
21 def listen_loop_error(e)
22 logger.error "Unhandled listen loop exception #{e.inspect}."
23 logger.error e.backtrace.join("\n")
# worker process initialisation: close the self-pipe, install handlers
# for USR1, QUIT, TERM and INT, then log the concurrency settings.
# worker:: only worker.nr is read here (passed to reopen_worker_logs)
26 def init_worker_process(worker)
29 # we don't use the self-pipe mechanism in the Rainbows! worker
30 # since we don't defer reopening logs
# close every IO in SELF_PIPE, then empty the array (each returns it)
31 HttpServer::SELF_PIPE.each { |x| x.close }.clear
# USR1: reopen this worker's logs immediately; any error is ignored
32 trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
33 # closing anything we IO.select on will raise EBADF
# QUIT: close every listener; map! replaces each entry with the block's
# value (nil for IO objects, and nil when close raises)
34 trap(:QUIT) { HttpServer::LISTENERS.map! { |s| s.close rescue nil } }
35 [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
# the message interpolates @use and @worker_connections
36 logger.info "Rainbows! #@use worker_connections=#@worker_connections"
39 # once a client is accepted, it is processed in its entirety here
40 # in 3 easy steps: read request, call app, write app response
# client:: the accepted socket; read with readpartial, written to on reply
# NOTE(review): hp, env and the rescue clause that binds e are not defined
# on any line visible here - presumably the HTTP parser, the Rack env hash
# and a catch-all rescue; confirm against the full file.
41 def process_client(client)
# first chunk of the request, at most CHUNK_SIZE bytes
42 buf = client.readpartial(CHUNK_SIZE)
# TCP clients use the last element of peeraddr; anything else is LOCALHOST
46 remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
# keep appending to buf until hp.headers returns a true value
49 while ! hp.headers(env, buf)
50 buf << client.readpartial(CHUNK_SIZE)
# a content_length of 0 gets NULL_IO as input, otherwise a TeeInput
# built from the client, env, parser and what was read so far
53 env[RACK_INPUT] = 0 == hp.content_length ?
54 HttpRequest::NULL_IO :
55 Unicorn::TeeInput.new(client, env, hp, buf)
56 env[REMOTE_ADDR] = remote_addr
# env.update merges RACK_DEFAULTS into env in place before the app call
57 response = app.call(env.update(RACK_DEFAULTS))
# the app answered with status 100: send EXPECT_100_RESPONSE, remove the
# HTTP_EXPECT entry from env and call the app a second time
59 if 100 == response.first.to_i
60 client.write(EXPECT_100_RESPONSE)
61 env.delete(HTTP_EXPECT)
62 response = app.call(env)
# stay connected only if the parser allows keepalive and this thread has
# not been flagged with :quit (join_threads sets that flag)
65 alive = hp.keepalive? && ! Thread.current[:quit]
# out is only assigned when hp.headers? is true; otherwise it stays nil
66 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
67 HttpResponse.write(client, response, out)
# go round again only while alive and hp.reset returns nil; env.clear
# empties env for the next request and returns the (truthy) hash
68 end while alive and hp.reset.nil? and env.clear
70 # if we get any error, try to write something back to the client
71 # assuming we haven't closed the socket, but don't get hung up
72 # if the socket is already closed or broken. We'll always ensure
73 # the socket is closed at the end of this function
74 rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
75 emergency_response(client, ERROR_500_RESPONSE)
76 rescue HttpParserError # try to tell the client they're bad
# nothing is sent back when buf is empty
77 buf.empty? or emergency_response(client, ERROR_400_RESPONSE)
# this path answers with a 500 and also logs the exception and backtrace
79 emergency_response(client, ERROR_500_RESPONSE)
80 logger.error "Read error: #{e.inspect}"
81 logger.error e.backtrace.join("\n")
# flag every thread with :quit and wait for them to finish, within a time
# budget of timeout * 2.0; finally log how many were still alive.
# threads:: collection of threads to flag and wait on
# worker:: its tmp object is chmod-ed while waiting
# NOTE(review): t0 and m are assigned on lines not visible here.
84 def join_threads(threads, worker)
85 logger.info "Joining threads..."
# process_client reads this flag and stops keeping connections alive
86 threads.each { |thr| thr[:quit] = true }
88 timeleft = timeout * 2.0
# nr is the number of threads still alive; the loop stops when none
# remain or the budget is used up
90 while (nr = threads.count { |thr| thr.alive? }) > 0 && timeleft > 0
# flips m between 0 and 1 and applies it as the mode of worker.tmp -
# presumably a heartbeat so the worker is not seen as hung; TODO confirm
92 worker.tmp.chmod(m = 0 == m ? 1 : 0)
# NOTE(review): subtracts the time elapsed since t0 on every pass; if t0
# is never refreshed the budget shrinks faster than wall-clock time -
# confirm against the full file
94 break if (timeleft -= (Time.now - t0)) < 0
97 logger.info "Done joining threads. #{nr} left running"
# presumably the Module#included hook (the enclosing module header is not
# visible here): defines a LISTENERS constant on klass that refers to the
# same object as HttpServer::LISTENERS.
# klass:: the class or module receiving the constant
100 def self.included(klass)
101 klass.const_set :LISTENERS, HttpServer::LISTENERS