event_machine: ResponseChunkPipe style cleanup
[rainbows.git] / lib / rainbows / event_machine.rb
blob625357eacfe12ed9921d102d4bfab9c967b4f980
1 # -*- encoding: binary -*-
2 require 'eventmachine'
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
4 require 'rainbows/ev_core'
6 module Rainbows
8   # Implements a basic single-threaded event model with
9   # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
10   # handling thousands of simultaneous client connections, but with only
11   # a single-threaded app dispatch.  It is suited for slow clients,
12   # and can work with slow applications via asynchronous libraries such as
13   # {async_sinatra}[http://github.com/raggi/async_sinatra],
14   # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
15   # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
16   #
17   # It does not require your Rack application to be thread-safe,
18   # reentrancy is only required for the DevFdResponse body
19   # generator.
20   #
21   # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
22   # support, currently Ruby 1.8/1.9.
23   #
24   # This model is compatible with users of "async.callback" in the Rack
25   # environment such as
26   # {async_sinatra}[http://github.com/raggi/async_sinatra].
27   #
28   # For a complete asynchronous framework,
29   # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
30   # supported when using this concurrency model.
31   #
32   # This model is fully-compatible with
33   # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
34   # which allows each request to run inside its own \Fiber after
35   # all request processing is complete.
36   #
37   # Merb (and other frameworks/apps) supporting +deferred?+ execution as
38   # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
39   # will also get the ability to conditionally defer request processing
40   # to a separate thread.
41   #
42   # This model does not implement as streaming "rack.input" which allows
43   # the Rack application to process data as it arrives.  This means
44   # "rack.input" will be fully buffered in memory or to a temporary file
45   # before the application is entered.
47   module EventMachine
49     include Base
51     class Client < EM::Connection # :nodoc: all
52       include Rainbows::EvCore
53       include Rainbows::Response
54       G = Rainbows::G
56       def initialize(io)
57         @_io = io
58         @body = nil
59       end
61       alias write send_data
62       alias receive_data on_read
64       def quit
65         super
66         close_connection_after_writing
67       end
69       def app_call
70         set_comm_inactivity_timeout 0
71         begin
72           @env[RACK_INPUT] = @input
73           @env[REMOTE_ADDR] = @remote_addr
74           @env[ASYNC_CALLBACK] = method(:em_write_response)
76           # we're not sure if anybody uses this, but Thin sets it, too
77           @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
79           response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
81           # too tricky to support pipelining with :async since the
82           # second (pipelined) request could be a stuck behind a
83           # long-running async response
84           (response.nil? || -1 == response[0]) and return @state = :close
86           em_write_response(response, alive = @hp.keepalive? && G.alive)
87           if alive
88             @env.clear
89             @hp.reset
90             @state = :headers
91             # keepalive requests are always body-less, so @input is unchanged
92             @hp.headers(@env, @buf) and next
93             set_comm_inactivity_timeout G.kato
94           end
95           return
96         end while true
97       end
99       # used for streaming sockets and pipes
100       def stream_response(status, headers, io)
101         if headers
102           do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
103           do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
104           headers[CONNECTION] = CLOSE # TODO: allow keep-alive
105           write(response_header(status, headers))
106         else
107           do_chunk = false
108         end
109         mod = do_chunk ? ResponseChunkPipe : ResponsePipe
110         EM.watch(io, mod, self).notify_readable = true
111       end
113       def em_write_response(response, alive = false)
114         status, headers, body = response
115         headers = @hp.headers? ? HH.new(headers) : nil if headers
116         @body = body
118         if body.respond_to?(:errback) && body.respond_to?(:callback)
119           body.callback { quit }
120           body.errback { quit }
121           # async response, this could be a trickle as is in comet-style apps
122           if headers
123             headers[CONNECTION] = CLOSE
124             write(response_header(status, headers))
125           end
126           return write_body_each(self, body)
127         elsif body.respond_to?(:to_path)
128           io = body_to_io(body)
129           st = io.stat
131           if st.file?
132             if headers
133               headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
134               write(response_header(status, headers))
135             end
136             stream = stream_file_data(body.to_path)
137             stream.callback { quit } unless alive
138             return
139           elsif st.socket? || st.pipe?
140             return stream_response(status, headers, io)
141           end
142           # char or block device... WTF? fall through to body.each
143         end
145         if headers
146           headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
147           write(response_header(status, headers))
148         end
149         write_body_each(self, body)
150         quit unless alive
151       end
153       def unbind
154         async_close = @env[ASYNC_CLOSE] and async_close.succeed
155         @body.respond_to?(:fail) and @body.fail
156         @_io.close
157       end
158     end
160     module ResponsePipe # :nodoc: all
161       # garbage avoidance, EM always uses this in a single thread,
162       # so a single buffer for all clients will work safely
163       BUF = ''
165       def initialize(client)
166         @client = client
167       end
169       def notify_readable
170         begin
171           @client.write(@io.read_nonblock(16384, BUF))
172         rescue Errno::EINTR
173           retry
174         rescue Errno::EAGAIN
175           return
176         rescue EOFError
177           detach
178           return
179         end while true
180       end
182       def unbind
183         @io.close
184         @client.quit
185       end
186     end
188     module ResponseChunkPipe # :nodoc: all
189       include ResponsePipe
191       def unbind
192         @client.write("0\r\n\r\n")
193         super
194       end
196       def notify_readable
197         begin
198           data = @io.read_nonblock(16384, BUF)
199           @client.write(sprintf("%x\r\n", data.size))
200           @client.write(data)
201           @client.write("\r\n")
202         rescue Errno::EINTR
203           retry
204         rescue Errno::EAGAIN
205           return
206         rescue EOFError
207           detach
208           return
209         end while true
210       end
211     end
213     module Server # :nodoc: all
215       def close
216         detach
217         @io.close
218       end
220       def notify_readable
221         return if CUR.size >= MAX
222         io = Rainbows.accept(@io) or return
223         sig = EM.attach_fd(io.fileno, false)
224         CUR[sig] = CL.new(sig, io)
225       end
226     end
228     # Middleware that will run the app dispatch in a separate thread.
229     # This middleware is automatically loaded by Rainbows! when using
230     # EventMachine and if the app responds to the +deferred?+ method.
231     class TryDefer < Struct.new(:app) # :nodoc: all
233       def initialize(app)
234         # the entire app becomes multithreaded, even the root (non-deferred)
235         # thread since any thread can share processes with others
236         Const::RACK_DEFAULTS['rack.multithread'] = true
237         super
238       end
240       def call(env)
241         if app.deferred?(env)
242           EM.defer(proc { catch(:async) { app.call(env) } },
243                    env[EvCore::ASYNC_CALLBACK])
244           # all of the async/deferred stuff breaks Rack::Lint :<
245           nil
246         else
247           app.call(env)
248         end
249       end
250     end
252     def init_worker_process(worker) # :nodoc:
253       Rainbows::Response.setup(Rainbows::EventMachine::Client)
254       super
255     end
257     # runs inside each forked worker, this sits around and waits
258     # for connections and doesn't die until the parent dies (or is
259     # given a INT, QUIT, or TERM signal)
260     def worker_loop(worker) # :nodoc:
261       init_worker_process(worker)
262       G.server.app.respond_to?(:deferred?) and
263         G.server.app = TryDefer[G.server.app]
265       # enable them both, should be non-fatal if not supported
266       EM.epoll
267       EM.kqueue
268       logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
269       client_class = Rainbows.const_get(@use).const_get(:Client)
270       Server.const_set(:MAX, worker_connections + LISTENERS.size)
271       Server.const_set(:CL, client_class)
272       client_class.const_set(:APP, G.server.app)
273       EM.run {
274         conns = EM.instance_variable_get(:@conns) or
275           raise RuntimeError, "EM @conns instance variable not accessible!"
276         Server.const_set(:CUR, conns)
277         EM.add_periodic_timer(1) do
278           unless G.tick
279             conns.each_value { |c| client_class === c and c.quit }
280             EM.stop if conns.empty? && EM.reactor_running?
281           end
282         end
283         LISTENERS.map! do |s|
284           EM.watch(s, Server) { |c| c.notify_readable = true }
285         end
286       }
287     end
289   end