unindent most files
[rainbows.git] / lib / rainbows / event_machine.rb
blob2f363a1db0ad2dc3c898f0821cb7c948a8e394a9
1 # -*- encoding: binary -*-
2 require 'eventmachine'
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
4 require 'rainbows/ev_core'
6 # Implements a basic single-threaded event model with
7 # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
8 # handling thousands of simultaneous client connections, but with only
9 # a single-threaded app dispatch.  It is suited for slow clients,
10 # and can work with slow applications via asynchronous libraries such as
11 # {async_sinatra}[http://github.com/raggi/async_sinatra],
12 # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
13 # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
15 # It does not require your Rack application to be thread-safe,
16 # reentrancy is only required for the DevFdResponse body
17 # generator.
19 # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
20 # support, currently Ruby 1.8/1.9.
22 # This model is compatible with users of "async.callback" in the Rack
23 # environment such as
24 # {async_sinatra}[http://github.com/raggi/async_sinatra].
26 # For a complete asynchronous framework,
27 # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
28 # supported when using this concurrency model.
30 # This model is fully-compatible with
31 # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
32 # which allows each request to run inside its own \Fiber after
33 # all request processing is complete.
35 # Merb (and other frameworks/apps) supporting +deferred?+ execution as
36 # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
37 # will also get the ability to conditionally defer request processing
38 # to a separate thread.
40 # This model does not implement as streaming "rack.input" which allows
41 # the Rack application to process data as it arrives.  This means
42 # "rack.input" will be fully buffered in memory or to a temporary file
43 # before the application is entered.
44 module Rainbows::EventMachine
46   include Rainbows::Base
47   autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
48   autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
49   autoload :TryDefer, 'rainbows/event_machine/try_defer'
51   class Client < EM::Connection # :nodoc: all
52     attr_writer :body
53     include Rainbows::EvCore
55     def initialize(io)
56       @_io = io
57       @body = nil
58     end
60     alias write send_data
62     def receive_data(data)
63       # To avoid clobbering the current streaming response
64       # (often a static file), we do not attempt to process another
65       # request on the same connection until the first is complete
66       if @body
67         @buf << data
68         @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
69         EM.next_tick { receive_data('') }
70       else
71         on_read(data)
72       end
73     end
75     def quit
76       super
77       close_connection_after_writing
78     end
80     def app_call
81       set_comm_inactivity_timeout 0
82       @env[RACK_INPUT] = @input
83       @env[REMOTE_ADDR] = @_io.kgio_addr
84       @env[ASYNC_CALLBACK] = method(:em_write_response)
85       @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
87       response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
89       # too tricky to support pipelining with :async since the
90       # second (pipelined) request could be a stuck behind a
91       # long-running async response
92       (response.nil? || -1 == response[0]) and return @state = :close
94       alive = @hp.keepalive? && G.alive && G.kato > 0
95       em_write_response(response, alive)
96       if alive
97         @env.clear
98         @hp.reset
99         @state = :headers
100         if @buf.empty?
101           set_comm_inactivity_timeout(G.kato)
102         else
103           EM.next_tick { receive_data('') }
104         end
105       end
106     end
108     def em_write_response(response, alive = false)
109       status, headers, body = response
110       if @hp.headers?
111         headers = HH.new(headers)
112         headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
113       else
114         headers = nil
115       end
117       if body.respond_to?(:errback) && body.respond_to?(:callback)
118         @body = body
119         body.callback { quit }
120         body.errback { quit }
121         # async response, this could be a trickle as is in comet-style apps
122         headers[CONNECTION] = CLOSE if headers
123         alive = true
124       elsif body.respond_to?(:to_path)
125         st = File.stat(path = body.to_path)
127         if st.file?
128           write(response_header(status, headers)) if headers
129           @body = stream_file_data(path)
130           @body.errback do
131             body.close if body.respond_to?(:close)
132             quit
133           end
134           @body.callback do
135             body.close if body.respond_to?(:close)
136             @body = nil
137             alive ? receive_data('') : quit
138           end
139           return
140         elsif st.socket? || st.pipe?
141           @body = io = body_to_io(body)
142           chunk = stream_response_headers(status, headers) if headers
143           m = chunk ? ResponseChunkPipe : ResponsePipe
144           return EM.watch(io, m, self, alive, body).notify_readable = true
145         end
146         # char or block device... WTF? fall through to body.each
147       end
149       write(response_header(status, headers)) if headers
150       write_body_each(self, body)
151       quit unless alive
152     end
154     def unbind
155       async_close = @env[ASYNC_CLOSE] and async_close.succeed
156       @body.respond_to?(:fail) and @body.fail
157       begin
158         @_io.close
159       rescue Errno::EBADF
160         # EventMachine's EventableDescriptor::Close() may close
161         # the underlying file descriptor without invalidating the
162         # associated IO object on errors, so @_io.closed? isn't
163         # sufficient.
164       end
165     end
166   end
168   module Server # :nodoc: all
169     def close
170       detach
171       @io.close
172     end
174     def notify_readable
175       return if CUR.size >= MAX
176       io = @io.kgio_tryaccept or return
177       sig = EM.attach_fd(io.fileno, false)
178       CUR[sig] = CL.new(sig, io)
179     end
180   end
182   def init_worker_process(worker) # :nodoc:
183     Rainbows::Response.setup(Rainbows::EventMachine::Client)
184     super
185   end
187   # runs inside each forked worker, this sits around and waits
188   # for connections and doesn't die until the parent dies (or is
189   # given a INT, QUIT, or TERM signal)
190   def worker_loop(worker) # :nodoc:
191     init_worker_process(worker)
192     G.server.app.respond_to?(:deferred?) and
193       G.server.app = TryDefer[G.server.app]
195     # enable them both, should be non-fatal if not supported
196     EM.epoll
197     EM.kqueue
198     logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
199     client_class = Rainbows.const_get(@use).const_get(:Client)
200     Server.const_set(:MAX, worker_connections + LISTENERS.size)
201     Server.const_set(:CL, client_class)
202     client_class.const_set(:APP, G.server.app)
203     EM.run {
204       conns = EM.instance_variable_get(:@conns) or
205         raise RuntimeError, "EM @conns instance variable not accessible!"
206       Server.const_set(:CUR, conns)
207       EM.add_periodic_timer(1) do
208         unless G.tick
209           conns.each_value { |c| client_class === c and c.quit }
210           EM.stop if conns.empty? && EM.reactor_running?
211         end
212       end
213       LISTENERS.map! do |s|
214         EM.watch(s, Server) { |c| c.notify_readable = true }
215       end
216     }
217   end