common Rainbows.accept method
[rainbows.git] / lib / rainbows / event_machine.rb
blobe28b2320cca59da86482980685b886bb15c1cccf
1 # -*- encoding: binary -*-
2 require 'eventmachine'
3 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
4 require 'rainbows/ev_core'
6 module Rainbows
8   # Implements a basic single-threaded event model with
9   # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
10   # handling thousands of simultaneous client connections, but with only
11   # a single-threaded app dispatch.  It is suited for slow clients and
12   # fast applications (applications that do not have slow network
13   # dependencies) or applications that use DevFdResponse for deferrable
14   # response bodies.  It does not require your Rack application to be
15   # thread-safe, reentrancy is only required for the DevFdResponse body
16   # generator.
17   #
18   # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
19   # support, currently Ruby 1.8/1.9.
20   #
21   # This model is compatible with users of "async.callback" in the Rack
22   # environment such as
23   # {async_sinatra}[http://github.com/raggi/async_sinatra].
24   #
25   # This model does not implement as streaming "rack.input" which allows
26   # the Rack application to process data as it arrives.  This means
27   # "rack.input" will be fully buffered in memory or to a temporary file
28   # before the application is entered.
30   module EventMachine
32     include Base
34     class Client < EM::Connection
35       include Rainbows::EvCore
36       G = Rainbows::G
38       # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ]
39       ASYNC_CALLBACK = 'async.callback'.freeze
41       def initialize(io)
42         @_io = io
43       end
45       alias write send_data
46       alias receive_data on_read
48       def quit
49         super
50         close_connection_after_writing
51       end
53       def app_call
54         set_comm_inactivity_timeout 0
55         begin
56           @env[RACK_INPUT] = @input
57           @env[REMOTE_ADDR] = @remote_addr
58           @env[ASYNC_CALLBACK] = method(:response_write)
60           response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
62           # too tricky to support pipelining with :async since the
63           # second (pipelined) request could be a stuck behind a
64           # long-running async response
65           (response.nil? || -1 == response.first) and return @state = :close
67           alive = @hp.keepalive? && G.alive
68           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
69           response_write(response, out, alive)
71           if alive
72             @env.clear
73             @hp.reset
74             @state = :headers
75             # keepalive requests are always body-less, so @input is unchanged
76             @hp.headers(@env, @buf) and next
77             set_comm_inactivity_timeout G.kato
78           end
79           return
80         end while true
81       end
83       def response_write(response, out = [], alive = false)
84         body = response.last
85         unless body.respond_to?(:to_path)
86           HttpResponse.write(self, response, out)
87           quit unless alive
88           return
89         end
91         headers = Rack::Utils::HeaderHash.new(response[1])
92         path = body.to_path
93         io = body.to_io if body.respond_to?(:to_io)
94         io ||= IO.new($1.to_i) if path =~ %r{\A/dev/fd/(\d+)\z}
95         io ||= File.open(path, 'rb') # could be a named pipe
97         st = io.stat
98         if st.file?
99           headers.delete('Transfer-Encoding')
100           headers['Content-Length'] ||= st.size.to_s
101           response = [ response.first, headers.to_hash, [] ]
102           HttpResponse.write(self, response, out)
103           stream = stream_file_data(path)
104           stream.callback { quit } unless alive
105         elsif st.socket? || st.pipe?
106           do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
107           do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
108           if out.nil?
109             do_chunk = false
110           else
111             out[0] = CONN_CLOSE
112           end
113           response = [ response.first, headers.to_hash, [] ]
114           HttpResponse.write(self, response, out)
115           if do_chunk
116             EM.watch(io, ResponseChunkPipe, self).notify_readable = true
117           else
118             EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384)
119           end
120         else
121           HttpResponse.write(self, response, out)
122         end
123       end
125       def unbind
126         @_io.close
127       end
128     end
130     module ResponsePipe
131       def initialize(client)
132         @client = client
133       end
135       def unbind
136         @io.close
137         @client.quit
138       end
139     end
141     module ResponseChunkPipe
142       include ResponsePipe
144       def unbind
145         @client.write("0\r\n\r\n")
146         super
147       end
149       def notify_readable
150         begin
151           data = begin
152             @io.read_nonblock(16384)
153           rescue Errno::EINTR
154             retry
155           rescue Errno::EAGAIN
156             return
157           rescue EOFError
158             detach
159             return
160           end
161           @client.send_data(sprintf("%x\r\n", data.size))
162           @client.send_data(data)
163           @client.send_data("\r\n")
164         end while true
165       end
166     end
168     module Server
170       def close
171         detach
172         @io.close
173       end
175       def notify_readable
176         return if CUR.size >= MAX
177         io = Rainbows.accept(@io) or return
178         sig = EM.attach_fd(io.fileno, false)
179         CUR[sig] = Client.new(sig, io)
180       end
181     end
183     # runs inside each forked worker, this sits around and waits
184     # for connections and doesn't die until the parent dies (or is
185     # given a INT, QUIT, or TERM signal)
186     def worker_loop(worker)
187       init_worker_process(worker)
189       # enable them both, should be non-fatal if not supported
190       EM.epoll
191       EM.kqueue
192       logger.info "EventMachine: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
193       Server.const_set(:MAX, G.server.worker_connections +
194                              HttpServer::LISTENERS.size)
195       EvCore.setup(Client)
196       EM.run {
197         conns = EM.instance_variable_get(:@conns) or
198           raise RuntimeError, "EM @conns instance variable not accessible!"
199         Server.const_set(:CUR, conns)
200         EM.add_periodic_timer(1) do
201           unless G.tick
202             conns.each_value { |client| Client === client and client.quit }
203             EM.stop if conns.empty? && EM.reactor_running?
204           end
205         end
206         LISTENERS.map! do |s|
207           EM.watch(s, Server) { |c| c.notify_readable = true }
208         end
209       }
210     end
212   end