1 # -*- encoding: binary -*-
3 class Rainbows::EventMachine::Client < EM::Connection
5 include Rainbows::EvCore
14 def receive_data(data)
15 # To avoid clobbering the current streaming response
16 # (often a static file), we do not attempt to process another
17 # request on the same connection until the first is complete
21 @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
23 EM.next_tick { receive_data(nil) } unless @buf.empty?
25 on_read(data || "") if (@buf.size > 0) || data
31 close_connection_after_writing
35 set_comm_inactivity_timeout 0
36 @env[RACK_INPUT] = @input
37 @env[REMOTE_ADDR] = @_io.kgio_addr
38 @env[ASYNC_CALLBACK] = method(:em_write_response)
39 @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
41 response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
43 # too tricky to support pipelining with :async since the
44 # second (pipelined) request could be a stuck behind a
45 # long-running async response
46 (response.nil? || -1 == response[0]) and return @state = :close
50 em_write_response(response, true)
52 set_comm_inactivity_timeout(G.kato)
54 EM.next_tick { receive_data(nil) }
57 em_write_response(response, false)
61 def em_write_response(response, alive = false)
62 status, headers, body = response
64 headers = HH.new(headers)
65 headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
70 if body.respond_to?(:errback) && body.respond_to?(:callback)
72 body.callback { quit }
74 # async response, this could be a trickle as is in comet-style apps
75 headers[CONNECTION] = CLOSE if headers
77 elsif body.respond_to?(:to_path)
78 st = File.stat(path = body.to_path)
81 write(response_header(status, headers)) if headers
82 @body = stream_file_data(path)
84 body.close if body.respond_to?(:close)
88 body.close if body.respond_to?(:close)
90 alive ? receive_data(nil) : quit
93 elsif st.socket? || st.pipe?
94 @body = io = body_to_io(body)
95 chunk = stream_response_headers(status, headers) if headers
96 m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
97 Rainbows::EventMachine::ResponsePipe
98 return EM.watch(io, m, self, alive, body).notify_readable = true
100 # char or block device... WTF? fall through to body.each
103 write(response_header(status, headers)) if headers
104 write_body_each(self, body)
109 @hp.keepalive? ? receive_data(@body = nil) : quit
113 async_close = @env[ASYNC_CLOSE] and async_close.succeed
114 @body.respond_to?(:fail) and @body.fail
118 # EventMachine's EventableDescriptor::Close() may close
119 # the underlying file descriptor without invalidating the
120 # associated IO object on errors, so @_io.closed? isn't