1 # -*- encoding: binary -*-
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
4 require 'rainbows/ev_core'
8 # Implements a basic single-threaded event model with
9 # {EventMachine}[http://rubyeventmachine.com/]. It is capable of
10 # handling thousands of simultaneous client connections, but with only
11 # a single-threaded app dispatch. It is suited for slow clients,
12 # and can work with slow applications via asynchronous libraries such as
13 # {async_sinatra}[http://github.com/raggi/async_sinatra],
14 # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
15 # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
17 # It does not require your Rack application to be thread-safe;
18 # reentrancy is only required for the DevFdResponse body
21 # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
22 # support, currently Ruby 1.8/1.9.
24 # This model is compatible with users of "async.callback" in the Rack
26 # {async_sinatra}[http://github.com/raggi/async_sinatra].
28 # For a complete asynchronous framework,
29 # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
30 # supported when using this concurrency model.
32 # This model is fully-compatible with
33 # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
34 # which allows each request to run inside its own \Fiber after
35 # all request processing is complete.
37 # Merb (and other frameworks/apps) supporting +deferred?+ execution as
38 # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
39 # will also get the ability to conditionally defer request processing
40 # to a separate thread.
42 # This model does not implement a streaming "rack.input" which allows
43 # the Rack application to process data as it arrives. This means
44 # "rack.input" will be fully buffered in memory or to a temporary file
45 # before the application is entered.
# Per-client connection handler for the EventMachine concurrency model.
# NOTE(review): this listing is sampled — original lines are missing
# between entries, so the fragments below are not contiguous.
51 class Client < EM::Connection # :nodoc: all
52 include Rainbows::EvCore
53 include Rainbows::Response
# EM delivers raw socket data via receive_data; route it straight into
# the EvCore request-parsing state machine
62 alias receive_data on_read
# presumably part of a response-done/close path — TODO confirm against
# the full source (surrounding lines are missing here)
66 close_connection_after_writing
# disable EM's inactivity timeout while a request is in flight
70 set_comm_inactivity_timeout 0
# populate the Rack environment for this request
72 @env[RACK_INPUT] = @input
73 @env[REMOTE_ADDR] = @remote_addr
# "async.callback" lets async frameworks (async_sinatra, Cramp, …)
# finish the response outside the normal call/return flow by invoking
# em_write_response themselves
74 @env[ASYNC_CALLBACK] = method(:em_write_response)
76 # we're not sure if anybody uses this, but Thin sets it, too
77 @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
# apps may throw :async to defer the response (Thin convention);
# catch turns that throw into a nil return value here
79 response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
81 # too tricky to support pipelining with :async since the
82 # second (pipelined) request could be stuck behind a
83 # long-running async response
# nil (thrown :async) or status -1 means the response will arrive
# later via async.callback; mark the connection for close and bail
84 (response.nil? || -1 == response[0]) and return @state = :close
# write the response now; keep-alive only if both the HTTP parser
# and the global server liveness flag allow it
86 em_write_response(response, alive = @hp.keepalive? && G.alive)
# keep-alive fragment: parse any pipelined request already buffered,
# otherwise arm the keep-alive timeout and wait for more data
91 # keepalive requests are always body-less, so @input is unchanged
92 @hp.headers(@env, @buf) and next
93 set_comm_inactivity_timeout G.kato
99 # used for streaming sockets and pipes
# Streams a response whose body is backed by a socket or pipe by
# handing the IO to the EM reactor instead of reading it inline.
# NOTE(review): sampled listing — lines 101 and 106-108 of the
# original are missing here.
100 def stream_response(status, headers, io)
# chunk only when the app explicitly set Transfer-Encoding: chunked…
102 do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
# …and did not opt out via the private X-Rainbows-Autochunk header
# (the header is consumed so it never reaches the client)
103 do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
104 headers[CONNECTION] = CLOSE # TODO: allow keep-alive
105 write(response_header(status, headers))
# attach the body IO to the reactor; the watcher module drains it
# to the client with or without chunked framing
109 mod = do_chunk ? ResponseChunkPipe : ResponsePipe
110 EM.watch(io, mod, self).notify_readable = true
# Writes a Rack (status, headers, body) response, choosing a delivery
# strategy based on the body's capabilities.  Also invoked directly by
# apps through env["async.callback"].
# NOTE(review): sampled listing — interior lines are missing, so the
# branches below are fragments of the original method.
113 def em_write_response(response, alive = false)
114 status, headers, body = response
# only wrap headers when the parser says this request gets them
# (HH is presumably a HeaderHash-style wrapper — TODO confirm)
115 headers = @hp.headers? ? HH.new(headers) : nil if headers
# deferrable body (comet/streaming apps): let the body tell us when
# it is finished (or failed) and then shut the connection down
118 if body.respond_to?(:errback) && body.respond_to?(:callback)
119 body.callback { quit }
120 body.errback { quit }
121 # async response, this could be a trickle as is in comet-style apps
# no keep-alive for async trickle responses
123 headers[CONNECTION] = CLOSE
124 write(response_header(status, headers))
126 return write_body_each(self, body)
# file-backed body: serve from the underlying IO where possible
127 elsif body.respond_to?(:to_path)
128 io = body_to_io(body)
133 headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
134 write(response_header(status, headers))
# regular file: use EM's built-in file streaming
136 stream = stream_file_data(body.to_path)
137 stream.callback { quit } unless alive
# socket/pipe path: hand off to stream_response (st is presumably a
# File::Stat of io from a missing line — TODO confirm)
139 elsif st.socket? || st.pipe?
140 return stream_response(status, headers, io)
142 # char or block device... WTF? fall through to body.each
# generic fallback: iterate body.each and write inline
146 headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
147 write(response_header(status, headers))
149 write_body_each(self, body)
# teardown fragment (presumably from unbind): fire "async.close"
# waiters and fail any pending deferrable body
154 async_close = @env[ASYNC_CLOSE] and async_close.succeed
155 @body.respond_to?(:fail) and @body.fail
# EM.watch handler that drains a pipe/socket body to the client
# without chunked framing.  NOTE(review): sampled listing — most of
# this module's lines are missing here.
160 module ResponsePipe # :nodoc: all
161 # garbage avoidance, EM always uses this in a single thread,
162 # so a single buffer for all clients will work safely
165 def initialize(client)
# read up to 16K into the shared BUF and forward it verbatim
171 @client.write(@io.read_nonblock(16384, BUF))
# EM.watch handler like ResponsePipe, but wraps each read in HTTP/1.1
# chunked transfer encoding.  NOTE(review): sampled listing — most of
# this module's lines are missing here.
188 module ResponseChunkPipe # :nodoc: all
# terminating zero-length chunk, presumably written on EOF — confirm
192 @client.write("0\r\n\r\n")
# emit one chunk: hex size line, the data (from a missing line), then
# the trailing CRLF
198 data = @io.read_nonblock(16384, BUF)
199 @client.write(sprintf("%x\r\n", data.size))
201 @client.write("\r\n")
# EM.watch handler for listen sockets: accepts new clients into the
# reactor.  NOTE(review): sampled listing — surrounding lines
# (including the notify_readable definition) are missing here.
213 module Server # :nodoc: all
# enforce worker_connections by refusing to accept when at capacity
221 return if CUR.size >= MAX
# non-blocking accept; nothing pending means nothing to do
222 io = Rainbows.accept(@io) or return
# attach the raw fd to EM (false = do not watch yet) and register
# the new Client in the live-connection map keyed by EM signature
223 sig = EM.attach_fd(io.fileno, false)
224 CUR[sig] = CL.new(sig, io)
228 # Middleware that will run the app dispatch in a separate thread.
229 # This middleware is automatically loaded by Rainbows! when using
230 # EventMachine and if the app responds to the +deferred?+ method.
# NOTE(review): sampled listing — the call() definition and other
# lines of this class are missing here.
231 class TryDefer < Struct.new(:app) # :nodoc: all
234 # the entire app becomes multithreaded, even the root (non-deferred)
235 # thread since any thread can share processes with others
236 Const::RACK_DEFAULTS['rack.multithread'] = true
# per-request: ask the app whether this request should be deferred
# to EM's thread pool; the result is delivered via async.callback
241 if app.deferred?(env)
242 EM.defer(proc { catch(:async) { app.call(env) } },
243 env[EvCore::ASYNC_CALLBACK])
244 # all of the async/deferred stuff breaks Rack::Lint :<
# Worker-process setup fragment.  NOTE(review): sampled listing —
# lines are missing between entries below.
252 def init_worker_process(worker) # :nodoc:
# generate optimized response-writing methods for the Client class
253 Rainbows::Response.setup(Rainbows::EventMachine::Client)
257 # runs inside each forked worker, this sits around and waits
258 # for connections and doesn't die until the parent dies (or is
259 # given a INT, QUIT, or TERM signal)
260 def worker_loop(worker) # :nodoc:
261 init_worker_process(worker)
# wrap the app in TryDefer only when it opts into deferred dispatch
262 G.server.app.respond_to?(:deferred?) and
263 G.server.app = TryDefer[G.server.app]
265 # enable them both, should be non-fatal if not supported
268 logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
# wire up per-model constants so Server/Client fragments above can
# reference them without indirection
269 client_class = Rainbows.const_get(@use).const_get(:Client)
270 Server.const_set(:MAX, worker_connections + LISTENERS.size)
271 Server.const_set(:CL, client_class)
272 client_class.const_set(:APP, G.server.app)
# reuse EM's internal connection map as the live-client registry;
# fail loudly if this private API ever disappears
274 conns = EM.instance_variable_get(:@conns) or
275 raise RuntimeError, "EM @conns instance variable not accessible!"
276 Server.const_set(:CUR, conns)
# once per second: presumably during shutdown, ask clients to quit
# and stop the reactor when all connections have drained — confirm
# against the missing surrounding lines
277 EM.add_periodic_timer(1) do
279 conns.each_value { |c| client_class === c and c.quit }
280 EM.stop if conns.empty? && EM.reactor_running?
# watch every listen socket with the Server accept handler
283 LISTENERS.map! do |s|
284 EM.watch(s, Server) { |c| c.notify_readable = true }