1 # -*- encoding: binary -*-
3 class Rainbows::EventMachine::Client < EM::Connection
5 include Rainbows::EvCore
14 def receive_data(data)
15 # To avoid clobbering the current streaming response
16 # (often a static file), we do not attempt to process another
17 # request on the same connection until the first is complete
21 @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
23 EM.next_tick { receive_data(nil) } unless @buf.empty?
25 on_read(data || "") if (@buf.size > 0) || data
31 close_connection_after_writing
35 set_comm_inactivity_timeout 0
36 @env[RACK_INPUT] = @input
37 @env[REMOTE_ADDR] = @_io.kgio_addr
38 @env[ASYNC_CALLBACK] = method(:em_write_response)
39 @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
41 response = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) }
43 # too tricky to support pipelining with :async since the
44 # second (pipelined) request could be a stuck behind a
45 # long-running async response
46 (response.nil? || -1 == response[0]) and return @state = :close
50 em_write_response(response, true)
52 set_comm_inactivity_timeout(G.kato)
54 EM.next_tick { receive_data(nil) }
57 em_write_response(response, false)
61 # don't change this method signature, "async.callback" relies on it
62 def em_write_response(response, alive = false)
63 status, headers, body = response
65 if body.respond_to?(:errback) && body.respond_to?(:callback)
67 body.callback { quit }
70 elsif body.respond_to?(:to_path)
71 st = File.stat(path = body.to_path)
74 write_headers(status, headers, alive)
75 @body = stream_file_data(path)
77 body.close if body.respond_to?(:close)
81 body.close if body.respond_to?(:close)
83 alive ? receive_data(nil) : quit
86 elsif st.socket? || st.pipe?
87 io = body_to_io(@body = body)
88 chunk = stream_response_headers(status, headers, alive)
89 m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
90 Rainbows::EventMachine::ResponsePipe
91 return EM.watch(io, m, self).notify_readable = true
93 # char or block device... WTF? fall through to body.each
95 write_response(status, headers, body, alive)
100 @body.close if @body.respond_to?(:close)
101 @hp.keepalive? ? receive_data(@body = nil) : quit
105 async_close = @env[ASYNC_CLOSE] and async_close.succeed
106 @body.respond_to?(:fail) and @body.fail
110 # EventMachine's EventableDescriptor::Close() may close
111 # the underlying file descriptor without invalidating the
112 # associated IO object on errors, so @_io.closed? isn't