cleanup: consolidate write_nonblock error handling
[rainbows.git] / lib / rainbows / base.rb
blob9bbe04958710ea694e9d6ddf6025b9bfa7c7eed8
1 # -*- encoding: binary -*-
3 module Rainbows
5   # base class for Rainbows concurrency models, this is currently
6   # used by ThreadSpawn and ThreadPool models
7   module Base
9     include Unicorn
10     include Rainbows::Const
11     G = Rainbows::G
13     def init_worker_process(worker)
14       super(worker)
15       G.tmp = worker.tmp
17       # we're don't use the self-pipe mechanism in the Rainbows! worker
18       # since we don't defer reopening logs
19       HttpServer::SELF_PIPE.each { |x| x.close }.clear
20       trap(:USR1) { reopen_worker_logs(worker.nr) }
21       trap(:QUIT) { G.quit! }
22       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
23       logger.info "Rainbows! #@use worker_connections=#@worker_connections"
24     end
26     # once a client is accepted, it is processed in its entirety here
27     # in 3 easy steps: read request, call app, write app response
28     def process_client(client)
29       buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
30       hp = HttpParser.new
31       env = {}
32       alive = true
33       remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
35       begin # loop
36         while ! hp.headers(env, buf)
37           IO.select([client], nil, nil, G.kato) or return
38           buf << client.readpartial(CHUNK_SIZE)
39         end
41         env[CLIENT_IO] = client
42         env[RACK_INPUT] = 0 == hp.content_length ?
43                  HttpRequest::NULL_IO :
44                  Unicorn::TeeInput.new(client, env, hp, buf)
45         env[REMOTE_ADDR] = remote_addr
46         response = app.call(env.update(RACK_DEFAULTS))
48         if 100 == response.first.to_i
49           client.write(EXPECT_100_RESPONSE)
50           env.delete(HTTP_EXPECT)
51           response = app.call(env)
52         end
54         alive = hp.keepalive? && G.alive
55         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
56         HttpResponse.write(client, response, out)
57       end while alive and hp.reset.nil? and env.clear
58     # if we get any error, try to write something back to the client
59     # assuming we haven't closed the socket, but don't get hung up
60     # if the socket is already closed or broken.  We'll always ensure
61     # the socket is closed at the end of this function
62     rescue => e
63       Error.write(client, e)
64     ensure
65       client.close
66     end
68     def join_threads(threads)
69       G.quit!
70       threads.delete_if do |thr|
71         G.tick
72         thr.alive? ? thr.join(0.01) : true
73       end until threads.empty?
74     end
76     def self.included(klass)
77       klass.const_set :LISTENERS, HttpServer::LISTENERS
78       klass.const_set :G, Rainbows::G
79     end
81   end
82 end