1 # -*- encoding: binary -*-
# Per-connection handler class: subclasses EM::Connection and mixes in
# Rainbows::EvCore.
3 class Rainbows::EventMachine::Client < EM::Connection
4 include Rainbows::EvCore
# Passes this class and :keepalive_timeout to Rainbows.config!.
# NOTE(review): presumably this is what defines the KEEPALIVE_TIMEOUT
# constant referenced later in this class -- confirm against Rainbows.config!.
5 Rainbows.config!(self, :keepalive_timeout)
# EventMachine callback for bytes arriving on this connection. It is also
# re-invoked by this class with +nil+ (see the EM.next_tick call below) to
# resume work on input that is already buffered in @buf.
#
# data - the newly received data, or nil when re-invoked internally.
#
# NOTE(review): the conditional structure that selects between the
# statements below is not fully visible here -- confirm before editing.
14 def receive_data(data)
15 # To avoid clobbering the current streaming response
16 # (often a static file), we do not attempt to process another
17 # request on the same connection until the first is complete
# Once @buf.size exceeds 0x1c000 (114688), shut down only the read side of
# the socket so no further input is accepted from the peer.
21 @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
# If buffered input remains, try again on the next reactor tick.
23 EM.next_tick { receive_data(nil) } unless @buf.empty?
# Hand off to on_read when there is buffered input or fresh data; a nil
# +data+ is replaced by the constant Z (defined elsewhere; presumably an
# empty string -- TODO confirm).
25 on_read(data || Z) if (@buf.size > 0) || data
# Close the connection once queued outbound data has been written, but only
# when @deferred is exactly nil (false or any other value skips the close).
31 close_connection_after_writing if nil == @deferred
# Turn off the connection inactivity timeout (0 disables it in EventMachine).
35 set_comm_inactivity_timeout 0
# Populate the Rack environment: the request input, the address reported by
# @_io.kgio_addr, a callback bound to write_async_response, and a fresh
# EM::DefaultDeferrable stored under ASYNC_CLOSE.
36 @env[RACK_INPUT] = input
37 @env[REMOTE_ADDR] = @_io.kgio_addr
38 @env[ASYNC_CALLBACK] = method(:write_async_response)
39 @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
# Run the app inside catch(:async): an app may `throw :async` to bail out,
# in which case catch returns the thrown value (nil when none is given).
40 status, headers, body = catch(:async) {
41 APP.call(@env.merge!(RACK_DEFAULTS))
# A nil or -1 status marks the response as deferred (@deferred = true);
# otherwise the response is written now, with @hp.next? passed as the
# +alive+ argument of ev_write_response.
44 (nil == status || -1 == status) ? @deferred = true :
45 ev_write_response(status, headers, body, @hp.next?)
# Error-path cleanup for a deferred response.
#
# orig_body - the response body passed in by ev_write_response; it is closed
#             here when it responds to #close.
#
# NOTE(review): how this cleanup is registered (presumably as an errback on
# @deferred) is not visible here -- confirm.
48 def deferred_errback(orig_body)
50 orig_body.close if orig_body.respond_to?(:close)
# Completion-path handling for a deferred response.
#
# orig_body - the response body passed in by ev_write_response; it is closed
#             here when it responds to #close.
# alive     - truthy to resume input processing via receive_data(nil),
#             falsy to call quit.
#
# NOTE(review): how this handler is registered (presumably as a callback on
# @deferred) is not visible here -- confirm.
56 def deferred_callback(orig_body, alive)
58 orig_body.close if orig_body.respond_to?(:close)
60 alive ? receive_data(nil) : quit
# Writes a response, choosing a strategy based on what +body+ responds to.
#
# status  - response status, passed through to the header/response writers.
# headers - response headers, passed through likewise.
# body    - response body; handled specially when it is a deferrable
#           (responds to #errback and #callback) or path-backed (responds
#           to #to_path), otherwise handed to write_response.
# alive   - truthy when the connection is to be kept alive afterwards.
64 def ev_write_response(status, headers, body, alive)
# Reset @state to :headers when the connection stays alive.
65 @state = :headers if alive
# Deferrable body: write the headers now, then call deferred_errback and
# deferred_callback with the body.
66 if body.respond_to?(:errback) && body.respond_to?(:callback)
68 write_headers(status, headers, alive)
70 deferred_errback(body)
71 deferred_callback(body, alive)
# Path-backed body: stat the path to decide how to send it.
73 elsif body.respond_to?(:to_path)
74 st = File.stat(path = body.to_path)
# Write the headers, then stream the file with stream_file_data and keep
# the object it returns in @deferred.
# NOTE(review): the condition guarding this branch is not shown here;
# presumably st.file? -- confirm.
77 write_headers(status, headers, alive)
78 @deferred = stream_file_data(path)
79 deferred_errback(body)
80 deferred_callback(body, alive)
# Socket or pipe: remember the body in @deferred, convert it to an IO, and
# have EventMachine watch that IO for readability. The handler is
# ResponseChunkPipe when stream_response_headers returns a truthy value and
# ResponsePipe otherwise. This branch returns early.
82 elsif st.socket? || st.pipe?
83 io = body_to_io(@deferred = body)
84 chunk = stream_response_headers(status, headers, alive)
85 m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
86 Rainbows::EventMachine::ResponsePipe
87 return EM.watch(io, m, self).notify_readable = true
89 # char or block device... WTF? fall through to body.each
# Default path: hand everything to write_response.
91 write_response(status, headers, body, alive)
# Re-arm the connection inactivity timeout with KEEPALIVE_TIMEOUT.
95 set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
# Resume input processing on the next reactor tick.
97 EM.next_tick { receive_data(nil) }
# Quit unless @deferred is set (nil/false means nothing is pending).
101 quit unless @deferred
# Close @deferred when it is closable.
106 @deferred.close if @deferred.respond_to?(:close)
# Continue with receive_data(nil) when @hp (presumably the HTTP parser --
# TODO confirm) reports keepalive; otherwise quit.
108 @hp.keepalive? ? receive_data(nil) : quit
# `and` binds looser than `=`, so async_close is assigned first and
# #succeed is only called when the env entry is non-nil.
112 async_close = @env[ASYNC_CLOSE] and async_close.succeed
# Fail @deferred when it supports #fail.
113 @deferred.respond_to?(:fail) and @deferred.fail
117 # EventMachine's EventableDescriptor::Close() may close
118 # the underlying file descriptor without invalidating the
119 # associated IO object on errors, so @_io.closed? isn't