1 # -*- encoding: binary -*-
3 require 'rainbows/ev_core'
7 # Implements a basic single-threaded event model with
8 # {EventMachine}[http://rubyeventmachine.com/]. It is capable of
9 # handling thousands of simultaneous client connections, but with only
10 # a single-threaded app dispatch. It is suited for slow clients and
11 # fast applications (applications that do not have slow network
12 # dependencies) or applications that use DevFdResponse for deferrable
13 # response bodies. It does not require your Rack application to be
14 # thread-safe, reentrancy is only required for the DevFdResponse body
17 # Compatibility: Whatever \EventMachine and Unicorn both support,
18 # currently Ruby 1.8/1.9.
20 # This model does not implement as streaming "rack.input" which allows
21 # the Rack application to process data as it arrives. This means
22 # "rack.input" will be fully buffered in memory or to a temporary file
23 # before the application is entered.
29 class Client < EM::Connection
30 include Rainbows::EvCore
38 alias receive_data on_read
42 close_connection_after_writing
47 (@env[RACK_INPUT] = @input).rewind
48 alive = @hp.keepalive?
49 @env[REMOTE_ADDR] = @remote_addr
50 response = G.app.call(@env.update(RACK_DEFAULTS))
52 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
54 HttpResponse.write(self, response, out)
59 # keepalive requests are always body-less, so @input is unchanged
60 @hp.headers(@env, @buf) and next
69 if body = @deferred_bodies.first
70 return if DeferredResponse === body
73 write(body.sysread(CHUNK_SIZE))
74 rescue EOFError # expected at file EOF
75 @deferred_bodies.shift
77 close if :close == @state && @deferred_bodies.empty?
83 close if :close == @state
91 def initialize(listener, conns)
93 @limit = Rainbows::G.max + HttpServer::LISTENERS.size
103 return if @em_conns.size >= @limit
105 io = @l.accept_nonblock
106 sig = EM.attach_fd(io.fileno, false, false)
107 @em_conns[sig] = Client.new(sig, io)
108 rescue Errno::EAGAIN, Errno::ECONNABORTED
113 # runs inside each forked worker, this sits around and waits
114 # for connections and doesn't die until the parent dies (or is
115 # given a INT, QUIT, or TERM signal)
116 def worker_loop(worker)
117 init_worker_process(worker)
120 conns = EM.instance_variable_get(:@conns) or
121 raise RuntimeError, "EM @conns instance variable not accessible!"
122 EM.add_periodic_timer(1) do
123 worker.tmp.chmod(m = 0 == m ? 1 : 0)
125 conns.each_value { |client| Client === client and client.quit }
126 EM.stop if conns.empty? && EM.reactor_running?
129 LISTENERS.map! { |s| EM.attach(s, Server, s, conns) }