ev_core: force input to be given to app_call
[rainbows.git] / lib / rainbows / event_machine / client.rb
blob8c2549da04122727b7ff79f060211dcb4e387645
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::EventMachine::Client < EM::Connection
4   include Rainbows::EvCore
# Sets up per-connection state.
#
# io - the socket object backing this connection; it is kept in @_io
#      for later use by the other methods of this class.
#
# A new connection starts with no deferred (in-flight) response.
def initialize(io)
  @deferred = nil
  @_io = io
end
# Make +send_data+ also available under the name +write+.
alias_method :write, :send_data
# Accepts a chunk of incoming bytes, or +nil+ when re-invoked internally
# to re-examine data that is already buffered.
#
# data - a String of newly received bytes, or nil.
def receive_data(data)
  unless @deferred
    # Nothing in flight: parse right away, but only if there is
    # something new or something left over in the buffer.
    return unless data || !@buf.empty?
    return on_read(data || Z)
  end

  # A response is still being streamed (often a static file).  To avoid
  # clobbering it we do not start another request on this connection
  # until the first is complete; incoming bytes are only buffered.
  if data
    @buf << data
    # Stop reading from the peer once more than 0x1c000 (114688) bytes
    # are waiting in the buffer.
    @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
  end

  # Look again on the next reactor tick for as long as data is pending.
  EM.next_tick { receive_data(nil) } unless @buf.empty?
end
# Runs the next +quit+ in the ancestor chain and then asks for the
# connection to be closed once pending output has been written.
def quit
  super()
  close_connection_after_writing
end
# Hands the current request to the Rack application.
#
# input - the object exposed to the application as the request body
#         (stored under RACK_INPUT in the environment).
#
# When the application throws :async, or answers with a status of nil
# or -1, the response is treated as asynchronous and @deferred is set to
# true.  Otherwise the response is written out via ev_write_response.
def app_call(input)
  # Inactivity timeout is set to 0 for the duration of the call.
  set_comm_inactivity_timeout 0

  @env[RACK_INPUT] = input
  @env[REMOTE_ADDR] = @_io.kgio_addr
  @env[ASYNC_CALLBACK] = method(:write_async_response)
  @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new

  response = catch(:async) do
    APP.call(@env.merge!(RACK_DEFAULTS))
  end
  status, headers, body = response

  case status
  when nil, -1
    @deferred = true
  else
    ev_write_response(status, headers, body, @hp.next?)
  end
end
# Registers a failure handler on the current @deferred object.
#
# orig_body - the response body originally returned by the application;
#             it is closed (when it supports +close+) before quitting.
def deferred_errback(orig_body)
  @deferred.errback {
    orig_body.respond_to?(:close) && orig_body.close
    quit
  }
end
# Registers a success handler on the current @deferred object.
#
# orig_body - the response body originally returned by the application;
#             closed on completion when it supports +close+.
# alive     - truthy to keep the connection and look for another
#             request, falsy to quit.
def deferred_callback(orig_body, alive)
  @deferred.callback {
    orig_body.respond_to?(:close) && orig_body.close
    @deferred = nil
    if alive
      receive_data(nil)
    else
      quit
    end
  }
end
# Writes a response, picking a strategy from what +body+ can do.
#
# * +body+ responds to both +errback+ and +callback+: it becomes
#   @deferred, the cleanup handlers are registered, and it is then
#   written through write_response like any other body.
# * +body+ responds to +to_path+ and the path is a regular file:
#   headers are written and the file is handed to stream_file_data,
#   whose return value becomes @deferred.
# * +body+ responds to +to_path+ and the path is a socket or pipe:
#   +body+ becomes @deferred and its IO is watched for readability.
# * anything else (including char/block devices): write_response.
#
# status, headers, body - the response triplet.
# alive - truthy when the connection should stay open afterwards.
def ev_write_response(status, headers, body, alive)
  @state = :headers if alive

  if body.respond_to?(:errback) && body.respond_to?(:callback)
    @deferred = body
    deferred_errback(body)
    deferred_callback(body, alive)
  elsif body.respond_to?(:to_path)
    path = body.to_path
    st = File.stat(path)

    if st.file?
      write_headers(status, headers, alive)
      @deferred = stream_file_data(path)
      deferred_errback(body)
      deferred_callback(body, alive)
      return
    elsif st.socket? || st.pipe?
      @deferred = body
      io = body_to_io(body)
      chunk = stream_response_headers(status, headers, alive)
      handler = if chunk
        Rainbows::EventMachine::ResponseChunkPipe
      else
        Rainbows::EventMachine::ResponsePipe
      end
      watcher = EM.watch(io, handler, self)
      watcher.notify_readable = true
      return true
    end
    # char or block device: fall through to the generic path below
  end

  write_response(status, headers, body, alive)

  if !alive
    # Closing connection: quit now unless something is still in flight.
    quit unless @deferred
  elsif @deferred.nil?
    if @buf.empty?
      # Idle keepalive connection: arm the keepalive timeout.
      set_comm_inactivity_timeout(Rainbows.keepalive_timeout)
    else
      # More request data is already buffered; handle it next tick.
      EM.next_tick { receive_data(nil) }
    end
  end
end
# Finishes with the current deferred object and moves on.
#
# @deferred is closed when it supports +close+.  If the parser reports
# keepalive, @deferred is cleared and receive_data(nil) is invoked;
# otherwise the connection quits (leaving @deferred as it was).
def next!
  @deferred.close if @deferred.respond_to?(:close)

  if @hp.keepalive?
    @deferred = nil
    receive_data(nil)
  else
    quit
  end
end
# Connection teardown: completes the ASYNC_CLOSE object stored in the
# environment (if any), fails a still-pending deferred that supports
# +fail+, and closes the underlying IO.
def unbind
  async_close = @env[ASYNC_CLOSE]
  async_close.succeed if async_close

  @deferred.fail if @deferred.respond_to?(:fail)

  begin
    @_io.close
  rescue Errno::EBADF
    # EventMachine's EventableDescriptor::Close() may close
    # the underlying file descriptor without invalidating the
    # associated IO object on errors, so @_io.closed? isn't
    # sufficient.
  end
end