rainbows/rev/* require/autoload cleanup
[rainbows.git] / lib / rainbows / event_machine.rb
bloba307f4c83a506d7f27ba68723ef0b1f5fffebdfe
1 # -*- encoding: binary -*-
2 require 'eventmachine'
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
5 # Implements a basic single-threaded event model with
6 # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
7 # handling thousands of simultaneous client connections, but with only
8 # a single-threaded app dispatch.  It is suited for slow clients,
9 # and can work with slow applications via asynchronous libraries such as
10 # {async_sinatra}[http://github.com/raggi/async_sinatra],
11 # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
12 # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
14 # It does not require your Rack application to be thread-safe,
15 # reentrancy is only required for the DevFdResponse body
16 # generator.
18 # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
19 # support, currently Ruby 1.8/1.9.
21 # This model is compatible with users of "async.callback" in the Rack
22 # environment such as
23 # {async_sinatra}[http://github.com/raggi/async_sinatra].
25 # For a complete asynchronous framework,
26 # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
27 # supported when using this concurrency model.
29 # This model is fully-compatible with
30 # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
31 # which allows each request to run inside its own \Fiber after
32 # all request processing is complete.
34 # Merb (and other frameworks/apps) supporting +deferred?+ execution as
35 # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
36 # will also get the ability to conditionally defer request processing
37 # to a separate thread.
39 # This model does not implement as streaming "rack.input" which allows
40 # the Rack application to process data as it arrives.  This means
41 # "rack.input" will be fully buffered in memory or to a temporary file
42 # before the application is entered.
43 module Rainbows::EventMachine
45   include Rainbows::Base
46   autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
47   autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
48   autoload :TryDefer, 'rainbows/event_machine/try_defer'
50   class Client < EM::Connection # :nodoc: all
51     attr_writer :body
52     include Rainbows::EvCore
54     def initialize(io)
55       @_io = io
56       @body = nil
57     end
59     alias write send_data
61     def receive_data(data)
62       # To avoid clobbering the current streaming response
63       # (often a static file), we do not attempt to process another
64       # request on the same connection until the first is complete
65       if @body
66         @buf << data
67         @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
68         EM.next_tick { receive_data('') }
69       else
70         on_read(data)
71       end
72     end
74     def quit
75       super
76       close_connection_after_writing
77     end
79     def app_call
80       set_comm_inactivity_timeout 0
81       @env[RACK_INPUT] = @input
82       @env[REMOTE_ADDR] = @_io.kgio_addr
83       @env[ASYNC_CALLBACK] = method(:em_write_response)
84       @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
86       response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
88       # too tricky to support pipelining with :async since the
89       # second (pipelined) request could be a stuck behind a
90       # long-running async response
91       (response.nil? || -1 == response[0]) and return @state = :close
93       alive = @hp.next? && G.alive && G.kato > 0
94       em_write_response(response, alive)
95       if alive
96         @state = :headers
97         if @buf.empty?
98           set_comm_inactivity_timeout(G.kato)
99         else
100           EM.next_tick { receive_data('') }
101         end
102       end
103     end
105     def em_write_response(response, alive = false)
106       status, headers, body = response
107       if @hp.headers?
108         headers = HH.new(headers)
109         headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
110       else
111         headers = nil
112       end
114       if body.respond_to?(:errback) && body.respond_to?(:callback)
115         @body = body
116         body.callback { quit }
117         body.errback { quit }
118         # async response, this could be a trickle as is in comet-style apps
119         headers[CONNECTION] = CLOSE if headers
120         alive = true
121       elsif body.respond_to?(:to_path)
122         st = File.stat(path = body.to_path)
124         if st.file?
125           write(response_header(status, headers)) if headers
126           @body = stream_file_data(path)
127           @body.errback do
128             body.close if body.respond_to?(:close)
129             quit
130           end
131           @body.callback do
132             body.close if body.respond_to?(:close)
133             @body = nil
134             alive ? receive_data('') : quit
135           end
136           return
137         elsif st.socket? || st.pipe?
138           @body = io = body_to_io(body)
139           chunk = stream_response_headers(status, headers) if headers
140           m = chunk ? ResponseChunkPipe : ResponsePipe
141           return EM.watch(io, m, self, alive, body).notify_readable = true
142         end
143         # char or block device... WTF? fall through to body.each
144       end
146       write(response_header(status, headers)) if headers
147       write_body_each(self, body)
148       quit unless alive
149     end
151     def unbind
152       async_close = @env[ASYNC_CLOSE] and async_close.succeed
153       @body.respond_to?(:fail) and @body.fail
154       begin
155         @_io.close
156       rescue Errno::EBADF
157         # EventMachine's EventableDescriptor::Close() may close
158         # the underlying file descriptor without invalidating the
159         # associated IO object on errors, so @_io.closed? isn't
160         # sufficient.
161       end
162     end
163   end
165   module Server # :nodoc: all
166     def close
167       detach
168       @io.close
169     end
171     def notify_readable
172       return if CUR.size >= MAX
173       io = @io.kgio_tryaccept or return
174       sig = EM.attach_fd(io.fileno, false)
175       CUR[sig] = CL.new(sig, io)
176     end
177   end
179   def init_worker_process(worker) # :nodoc:
180     Rainbows::Response.setup(Rainbows::EventMachine::Client)
181     super
182   end
184   # runs inside each forked worker, this sits around and waits
185   # for connections and doesn't die until the parent dies (or is
186   # given a INT, QUIT, or TERM signal)
187   def worker_loop(worker) # :nodoc:
188     init_worker_process(worker)
189     G.server.app.respond_to?(:deferred?) and
190       G.server.app = TryDefer[G.server.app]
192     # enable them both, should be non-fatal if not supported
193     EM.epoll
194     EM.kqueue
195     logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
196     client_class = Rainbows.const_get(@use).const_get(:Client)
197     Server.const_set(:MAX, worker_connections + LISTENERS.size)
198     Server.const_set(:CL, client_class)
199     client_class.const_set(:APP, G.server.app)
200     Rainbows::EvCore.setup
201     EM.run {
202       conns = EM.instance_variable_get(:@conns) or
203         raise RuntimeError, "EM @conns instance variable not accessible!"
204       Server.const_set(:CUR, conns)
205       EM.add_periodic_timer(1) do
206         unless G.tick
207           conns.each_value { |c| client_class === c and c.quit }
208           EM.stop if conns.empty? && EM.reactor_running?
209         end
210       end
211       LISTENERS.map! do |s|
212         EM.watch(s, Server) { |c| c.notify_readable = true }
213       end
214     }
215   end