1 # -*- encoding: binary -*-
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
4 require 'rainbows/ev_core'
8 # Implements a basic single-threaded event model with
9 # {EventMachine}[http://rubyeventmachine.com/]. It is capable of
10 # handling thousands of simultaneous client connections, but with only
11 # a single-threaded app dispatch. It is suited for slow clients and
12 # fast applications (applications that do not have slow network
13 # dependencies) or applications that use DevFdResponse for deferrable
14 # response bodies. It does not require your Rack application to be
15 # thread-safe; reentrancy is only required for the DevFdResponse body
18 # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
19 # support, currently Ruby 1.8/1.9.
21 # This model is compatible with users of "async.callback" in the Rack
23 # {async_sinatra}[http://github.com/raggi/async_sinatra].
25 # This model does not implement a streaming "rack.input" which allows
26 # the Rack application to process data as it arrives. This means
27 # "rack.input" will be fully buffered in memory or to a temporary file
28 # before the application is entered.
34 class Client < EM::Connection
35 include Rainbows::EvCore
38 # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ]
39 ASYNC_CALLBACK = 'async.callback'.freeze
46 alias receive_data on_read
50 close_connection_after_writing
54 set_comm_inactivity_timeout 0
56 @env[RACK_INPUT] = @input
57 @env[REMOTE_ADDR] = @remote_addr
58 @env[ASYNC_CALLBACK] = method(:response_write)
60 response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
62 # too tricky to support pipelining with :async since the
63 # second (pipelined) request could be stuck behind a
64 # long-running async response
65 (response.nil? || -1 == response.first) and return @state = :close
67 alive = @hp.keepalive? && G.alive
68 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
69 response_write(response, out, alive)
75 # keepalive requests are always body-less, so @input is unchanged
76 @hp.headers(@env, @buf) and next
77 set_comm_inactivity_timeout G.kato
83 def response_write(response, out = [], alive = false)
85 unless body.respond_to?(:to_path)
86 HttpResponse.write(self, response, out)
91 headers = Rack::Utils::HeaderHash.new(response[1])
93 io = body.to_io if body.respond_to?(:to_io)
94 io ||= IO.new($1.to_i) if path =~ %r{\A/dev/fd/(\d+)\z}
95 io ||= File.open(path, 'rb') # could be a named pipe
99 headers.delete('Transfer-Encoding')
100 headers['Content-Length'] ||= st.size.to_s
101 response = [ response.first, headers.to_hash, [] ]
102 HttpResponse.write(self, response, out)
103 stream = stream_file_data(path)
104 stream.callback { quit } unless alive
105 elsif st.socket? || st.pipe?
106 do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
107 do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
113 response = [ response.first, headers.to_hash, [] ]
114 HttpResponse.write(self, response, out)
116 EM.watch(io, ResponseChunkPipe, self).notify_readable = true
118 EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384)
121 HttpResponse.write(self, response, out)
131 def initialize(client)
141 module ResponseChunkPipe
145 @client.write("0\r\n\r\n")
152 @io.read_nonblock(16384)
161 @client.send_data(sprintf("%x\r\n", data.size))
162 @client.send_data(data)
163 @client.send_data("\r\n")
176 return if CUR.size >= MAX
177 io = Rainbows.accept(@io) or return
178 sig = EM.attach_fd(io.fileno, false)
179 CUR[sig] = Client.new(sig, io)
183 # runs inside each forked worker, this sits around and waits
184 # for connections and doesn't die until the parent dies (or is
185 # given an INT, QUIT, or TERM signal)
186 def worker_loop(worker)
187 init_worker_process(worker)
189 # enable them both, should be non-fatal if not supported
192 logger.info "EventMachine: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
193 Server.const_set(:MAX, G.server.worker_connections +
194 HttpServer::LISTENERS.size)
197 conns = EM.instance_variable_get(:@conns) or
198 raise RuntimeError, "EM @conns instance variable not accessible!"
199 Server.const_set(:CUR, conns)
200 EM.add_periodic_timer(1) do
202 conns.each_value { |client| Client === client and client.quit }
203 EM.stop if conns.empty? && EM.reactor_running?
206 LISTENERS.map! do |s|
207 EM.watch(s, Server) { |c| c.notify_readable = true }