event_machine: better handling of staggered pipelines
[rainbows.git] / lib / rainbows / event_machine.rb
blob757817d6c5c2d3de537c7e86273a3b8dba256d57
1 # -*- encoding: binary -*-
2 require 'eventmachine'
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
4 require 'rainbows/ev_core'
6 module Rainbows
8   # Implements a basic single-threaded event model with
9   # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
10   # handling thousands of simultaneous client connections, but with only
11   # a single-threaded app dispatch.  It is suited for slow clients,
12   # and can work with slow applications via asynchronous libraries such as
13   # {async_sinatra}[http://github.com/raggi/async_sinatra],
14   # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
15   # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
16   #
17   # It does not require your Rack application to be thread-safe,
18   # reentrancy is only required for the DevFdResponse body
19   # generator.
20   #
21   # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
22   # support, currently Ruby 1.8/1.9.
23   #
24   # This model is compatible with users of "async.callback" in the Rack
25   # environment such as
26   # {async_sinatra}[http://github.com/raggi/async_sinatra].
27   #
28   # For a complete asynchronous framework,
29   # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
30   # supported when using this concurrency model.
31   #
32   # This model is fully-compatible with
33   # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
34   # which allows each request to run inside its own \Fiber after
35   # all request processing is complete.
36   #
37   # Merb (and other frameworks/apps) supporting +deferred?+ execution as
38   # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
39   # will also get the ability to conditionally defer request processing
40   # to a separate thread.
41   #
42   # This model does not implement as streaming "rack.input" which allows
43   # the Rack application to process data as it arrives.  This means
44   # "rack.input" will be fully buffered in memory or to a temporary file
45   # before the application is entered.
47   module EventMachine
49     include Base
50     autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
51     autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
52     autoload :TryDefer, 'rainbows/event_machine/try_defer'
54     class Client < EM::Connection # :nodoc: all
55       attr_writer :body
56       include Rainbows::EvCore
57       G = Rainbows::G
59       def initialize(io)
60         @_io = io
61         @body = nil
62       end
64       alias write send_data
66       def receive_data(data)
67         # To avoid clobbering the current streaming response
68         # (often a static file), we do not attempt to process another
69         # request on the same connection until the first is complete
70         if @body
71           @buf << data
72           @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
73           return EM.next_tick { receive_data('') }
74         else
75           on_read(data)
76         end
77       end
79       def quit
80         super
81         close_connection_after_writing
82       end
84       def app_call
85         set_comm_inactivity_timeout 0
86         @env[RACK_INPUT] = @input
87         @env[REMOTE_ADDR] = @remote_addr
88         @env[ASYNC_CALLBACK] = method(:em_write_response)
89         @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
91         response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
93         # too tricky to support pipelining with :async since the
94         # second (pipelined) request could be a stuck behind a
95         # long-running async response
96         (response.nil? || -1 == response[0]) and return @state = :close
98         em_write_response(response, alive = @hp.keepalive? && G.alive)
99         if alive
100           @env.clear
101           @hp.reset
102           @state = :headers
103           if @buf.empty?
104             set_comm_inactivity_timeout(G.kato)
105           else
106             EM.next_tick { receive_data('') }
107           end
108         end
109       end
111       def em_write_response(response, alive = false)
112         status, headers, body = response
113         if @hp.headers?
114           headers = HH.new(headers)
115           headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
116         else
117           headers = nil
118         end
120         if body.respond_to?(:errback) && body.respond_to?(:callback)
121           @body = body
122           body.callback { quit }
123           body.errback { quit }
124           # async response, this could be a trickle as is in comet-style apps
125           headers[CONNECTION] = CLOSE if headers
126           alive = true
127         elsif body.respond_to?(:to_path)
128           st = File.stat(path = body.to_path)
130           if st.file?
131             write(response_header(status, headers)) if headers
132             @body = stream_file_data(path)
133             @body.errback do
134               body.close if body.respond_to?(:close)
135               quit
136             end
137             @body.callback do
138               body.close if body.respond_to?(:close)
139               @body = nil
140               alive ? receive_data('') : quit
141             end
142             return
143           elsif st.socket? || st.pipe?
144             @body = io = body_to_io(body)
145             chunk = stream_response_headers(status, headers) if headers
146             m = chunk ? ResponseChunkPipe : ResponsePipe
147             return EM.watch(io, m, self, alive, body).notify_readable = true
148           end
149           # char or block device... WTF? fall through to body.each
150         end
152         write(response_header(status, headers)) if headers
153         write_body_each(self, body)
154         quit unless alive
155       end
157       def unbind
158         async_close = @env[ASYNC_CLOSE] and async_close.succeed
159         @body.respond_to?(:fail) and @body.fail
160         @_io.close unless @_io.closed?
161       end
162     end
164     module Server # :nodoc: all
166       def close
167         detach
168         @io.close
169       end
171       def notify_readable
172         return if CUR.size >= MAX
173         io = Rainbows.accept(@io) or return
174         sig = EM.attach_fd(io.fileno, false)
175         CUR[sig] = CL.new(sig, io)
176       end
177     end
179     def init_worker_process(worker) # :nodoc:
180       Rainbows::Response.setup(Rainbows::EventMachine::Client)
181       super
182     end
184     # runs inside each forked worker, this sits around and waits
185     # for connections and doesn't die until the parent dies (or is
186     # given a INT, QUIT, or TERM signal)
187     def worker_loop(worker) # :nodoc:
188       init_worker_process(worker)
189       G.server.app.respond_to?(:deferred?) and
190         G.server.app = TryDefer[G.server.app]
192       # enable them both, should be non-fatal if not supported
193       EM.epoll
194       EM.kqueue
195       logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
196       client_class = Rainbows.const_get(@use).const_get(:Client)
197       Server.const_set(:MAX, worker_connections + LISTENERS.size)
198       Server.const_set(:CL, client_class)
199       client_class.const_set(:APP, G.server.app)
200       EM.run {
201         conns = EM.instance_variable_get(:@conns) or
202           raise RuntimeError, "EM @conns instance variable not accessible!"
203         Server.const_set(:CUR, conns)
204         EM.add_periodic_timer(1) do
205           unless G.tick
206             conns.each_value { |c| client_class === c and c.quit }
207             EM.stop if conns.empty? && EM.reactor_running?
208           end
209         end
210         LISTENERS.map! do |s|
211           EM.watch(s, Server) { |c| c.notify_readable = true }
212         end
213       }
214     end
216   end