Thread*: start implementing keepalive timeout
[rainbows.git] / lib / rainbows / base.rb
blob62a7701d3040519299cb59ffbd5963501e24c001
1 # -*- encoding: binary -*-
3 module Rainbows
5   # base class for Rainbows concurrency models, this is currently
6   # used by ThreadSpawn and ThreadPool models
7   module Base
9     include Unicorn
10     include Rainbows::Const
11     G = Rainbows::G
13     def listen_loop_error(e)
14       G.alive or return
15       logger.error "Unhandled listen loop exception #{e.inspect}."
16       logger.error e.backtrace.join("\n")
17     end
19     def init_worker_process(worker)
20       super(worker)
21       G.tmp = worker.tmp
23       # we're don't use the self-pipe mechanism in the Rainbows! worker
24       # since we don't defer reopening logs
25       HttpServer::SELF_PIPE.each { |x| x.close }.clear
26       trap(:USR1) { reopen_worker_logs(worker.nr) }
27       trap(:QUIT) { G.quit! }
28       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
29       logger.info "Rainbows! #@use worker_connections=#@worker_connections"
30     end
32     # once a client is accepted, it is processed in its entirety here
33     # in 3 easy steps: read request, call app, write app response
34     def process_client(client)
35       buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
36       hp = HttpParser.new
37       env = {}
38       alive = true
39       remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
41       begin # loop
42         while ! hp.headers(env, buf)
43           IO.select([client], nil, nil, 5) or return client.close
44           buf << client.readpartial(CHUNK_SIZE)
45         end
47         env[RACK_INPUT] = 0 == hp.content_length ?
48                  HttpRequest::NULL_IO :
49                  Unicorn::TeeInput.new(client, env, hp, buf)
50         env[REMOTE_ADDR] = remote_addr
51         response = app.call(env.update(RACK_DEFAULTS))
53         if 100 == response.first.to_i
54           client.write(EXPECT_100_RESPONSE)
55           env.delete(HTTP_EXPECT)
56           response = app.call(env)
57         end
59         alive = hp.keepalive? && G.alive
60         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
61         HttpResponse.write(client, response, out)
62       end while alive and hp.reset.nil? and env.clear
63       client.close
64     # if we get any error, try to write something back to the client
65     # assuming we haven't closed the socket, but don't get hung up
66     # if the socket is already closed or broken.  We'll always ensure
67     # the socket is closed at the end of this function
68     rescue => e
69       handle_error(client, e)
70     end
72     def join_threads(threads)
73       G.quit!
74       expire = Time.now + (timeout * 2.0)
75       until (threads.delete_if { |thr| ! thr.alive? }).empty?
76         threads.each { |thr|
77           G.tick
78           thr.join(1)
79           break if Time.now >= expire
80         }
81       end
82     end
84     def self.included(klass)
85       klass.const_set :LISTENERS, HttpServer::LISTENERS
86       klass.const_set :G, Rainbows::G
87     end
89   end
90 end