SIGINT/SIGTERM shuts down instantly in workers
[rainbows.git] / lib / rainbows / base.rb
blobf24de315da5a7a1b724614135b69bcaf8eb4fc46
1 # -*- encoding: binary -*-
3 module Rainbows
5   # base class for Rainbows concurrency models, this is currently
6   # used by ThreadSpawn and ThreadPool models
7   module Base
9     include Unicorn
10     include Rainbows::Const
12     # write a response without caring if it went out or not for error
13     # messages.
14     # TODO: merge into Unicorn::HttpServer
15     def emergency_response(client, response_str)
16       client.write_nonblock(response_str) rescue nil
17       client.close rescue nil
18     end
20     # TODO: migrate into Unicorn::HttpServer
21     def listen_loop_error(e)
22       logger.error "Unhandled listen loop exception #{e.inspect}."
23       logger.error e.backtrace.join("\n")
24     end
26     def init_worker_process(worker)
27       super(worker)
29       # we're don't use the self-pipe mechanism in the Rainbows! worker
30       # since we don't defer reopening logs
31       HttpServer::SELF_PIPE.each { |x| x.close }.clear
32       trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
33       # closing anything we IO.select on will raise EBADF
34       trap(:QUIT) { HttpServer::LISTENERS.map! { |s| s.close rescue nil } }
35       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
36       logger.info "Rainbows! #@use worker_connections=#@worker_connections"
37     end
39     # once a client is accepted, it is processed in its entirety here
40     # in 3 easy steps: read request, call app, write app response
41     def process_client(client)
42       buf = client.readpartial(CHUNK_SIZE)
43       hp = HttpParser.new
44       env = {}
45       alive = true
46       remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
48       begin # loop
49         while ! hp.headers(env, buf)
50           buf << client.readpartial(CHUNK_SIZE)
51         end
53         env[RACK_INPUT] = 0 == hp.content_length ?
54                  HttpRequest::NULL_IO :
55                  Unicorn::TeeInput.new(client, env, hp, buf)
56         env[REMOTE_ADDR] = remote_addr
57         response = app.call(env.update(RACK_DEFAULTS))
59         if 100 == response.first.to_i
60           client.write(EXPECT_100_RESPONSE)
61           env.delete(HTTP_EXPECT)
62           response = app.call(env)
63         end
65         alive = hp.keepalive? && ! Thread.current[:quit]
66         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
67         HttpResponse.write(client, response, out)
68       end while alive and hp.reset.nil? and env.clear
69       client.close
70     # if we get any error, try to write something back to the client
71     # assuming we haven't closed the socket, but don't get hung up
72     # if the socket is already closed or broken.  We'll always ensure
73     # the socket is closed at the end of this function
74     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
75       emergency_response(client, ERROR_500_RESPONSE)
76     rescue HttpParserError # try to tell the client they're bad
77       buf.empty? or emergency_response(client, ERROR_400_RESPONSE)
78     rescue Object => e
79       emergency_response(client, ERROR_500_RESPONSE)
80       logger.error "Read error: #{e.inspect}"
81       logger.error e.backtrace.join("\n")
82     end
84     def join_threads(threads, worker)
85       logger.info "Joining threads..."
86       threads.each { |thr| thr[:quit] = true }
87       t0 = Time.now
88       timeleft = timeout * 2.0
89       m = 0
90       while (nr = threads.count { |thr| thr.alive? }) > 0 && timeleft > 0
91         threads.each { |thr|
92           worker.tmp.chmod(m = 0 == m ? 1 : 0)
93           thr.join(1)
94           break if (timeleft -= (Time.now - t0)) < 0
95         }
96       end
97       logger.info "Done joining threads. #{nr} left running"
98     end
100     def self.included(klass)
101       klass.const_set :LISTENERS, HttpServer::LISTENERS
102     end
104   end