eventmachine: get basic tests working
[rainbows.git] / lib / rainbows / event_machine.rb
blob176bf5176cb8d4ce2fcc58b9af36696181c35f12
1 # -*- encoding: binary -*-
2 require 'eventmachine'
3 require 'rainbows/ev_core'
5 module Rainbows
7   # Implements a basic single-threaded event model with
8   # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
9   # handling thousands of simultaneous client connections, but with only
10   # a single-threaded app dispatch.  It is suited for slow clients and
11   # fast applications (applications that do not have slow network
12   # dependencies) or applications that use DevFdResponse for deferrable
13   # response bodies.  It does not require your Rack application to be
14   # thread-safe, reentrancy is only required for the DevFdResponse body
15   # generator.
16   #
17   # Compatibility: Whatever \EventMachine and Unicorn both  support,
18   # currently Ruby 1.8/1.9.
19   #
20   # This model does not implement as streaming "rack.input" which allows
21   # the Rack application to process data as it arrives.  This means
22   # "rack.input" will be fully buffered in memory or to a temporary file
23   # before the application is entered.
25   module EventMachine
27     include Base
29     class Client < EM::Connection
30       include Rainbows::EvCore
31       G = Rainbows::G
33       def initialize(io)
34         @_io = io
35       end
37       alias write send_data
38       alias receive_data on_read
40       def quit
41         super
42         close_connection_after_writing
43       end
45       def app_call
46         begin
47           (@env[RACK_INPUT] = @input).rewind
48           alive = @hp.keepalive?
49           @env[REMOTE_ADDR] = @remote_addr
50           response = G.app.call(@env.update(RACK_DEFAULTS))
51           alive &&= G.alive
52           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
54           HttpResponse.write(self, response, out)
55           if alive
56             @env.clear
57             @hp.reset
58             @state = :headers
59             # keepalive requests are always body-less, so @input is unchanged
60             @hp.headers(@env, @buf) and next
61           else
62             quit
63           end
64           return
65         end while true
66       end
68       def on_write_complete
69         if body = @deferred_bodies.first
70           return if DeferredResponse === body
71           begin
72             begin
73               write(body.sysread(CHUNK_SIZE))
74             rescue EOFError # expected at file EOF
75               @deferred_bodies.shift
76               body.close
77               close if :close == @state && @deferred_bodies.empty?
78             end
79           rescue Object => e
80             handle_error(e)
81           end
82         else
83           close if :close == @state
84         end
85       end
87     end
89     module Server
91       def initialize(listener, conns)
92         @l = listener
93         @limit = Rainbows::G.max + HttpServer::LISTENERS.size
94         @em_conns = conns
95       end
97       def close
98         detach
99         @l.close
100       end
102       def notify_readable
103         return if @em_conns.size >= @limit
104         begin
105           io = @l.accept_nonblock
106           sig = EM.attach_fd(io.fileno, false, false)
107           @em_conns[sig] = Client.new(sig, io)
108         rescue Errno::EAGAIN, Errno::ECONNABORTED
109         end
110       end
111     end
113     # runs inside each forked worker, this sits around and waits
114     # for connections and doesn't die until the parent dies (or is
115     # given a INT, QUIT, or TERM signal)
116     def worker_loop(worker)
117       init_worker_process(worker)
118       m = 0
119       EM.run {
120         conns = EM.instance_variable_get(:@conns) or
121           raise RuntimeError, "EM @conns instance variable not accessible!"
122         EM.add_periodic_timer(1) do
123           worker.tmp.chmod(m = 0 == m ? 1 : 0)
124           unless G.alive
125             conns.each_value { |client| Client === client and client.quit }
126             EM.stop if conns.empty? && EM.reactor_running?
127           end
128         end
129         LISTENERS.map! { |s| EM.attach(s, Server, s, conns) }
130       }
131     end
133   end