1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
5 APP = Rainbows.server.app
6 CONN = Rainbows::Coolio::CONN
7 KATO = Rainbows::Coolio::KATO
8 LOOP = Coolio::Loop.default
18 enable unless enabled?
23 close if nil == @deferred && @_write_buffer.empty?
26 # override the Coolio::IO#write method to try writing directly to the
27 # kernel socket buffers to avoid an extra userspace copy if
30 if @_write_buffer.empty?
32 case rv = @_io.kgio_trywrite(buf)
34 return enable_write_watcher
36 break # fall through to super(buf)
38 buf = rv # retry, skb could grow or been drained
41 return handle_error(e)
48 buf = @_io.kgio_tryread(16384, RBUF)
56 rescue Errno::ECONNRESET
60 # allows enabling of write watcher even when read watcher is disabled
68 enable_write_watcher # trigger on_write_complete
72 nil == @deferred && @_write_buffer.empty? and close.nil?
75 # used for streaming sockets and pipes
76 def stream_response_body(body, io, chunk)
77 # we only want to attach to the Coolio::Loop belonging to the
78 # main thread in Ruby 1.9
79 (chunk ? Rainbows::Coolio::ResponseChunkPipe :
80 Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
84 def write_response_path(status, headers, body, alive)
89 defer_file(status, headers, body, alive, io, st)
90 elsif st.socket? || st.pipe?
91 chunk = stream_response_headers(status, headers, alive)
92 stream_response_body(body, io, chunk)
94 # char or block device (unexpected here): fall back to a plain write_response
95 write_response(status, headers, body, alive)
99 def ev_write_response(status, headers, body, alive)
100 if body.respond_to?(:to_path)
101 write_response_path(status, headers, body, alive)
103 write_response(status, headers, body, alive)
105 return quit unless alive && :close != @state
112 @env[RACK_INPUT] = input
113 @env[REMOTE_ADDR] = @_io.kgio_addr
114 @env[ASYNC_CALLBACK] = method(:write_async_response)
115 status, headers, body = catch(:async) {
116 APP.call(@env.merge!(RACK_DEFAULTS))
119 (nil == status || -1 == status) ? @deferred = true :
120 ev_write_response(status, headers, body, @hp.next?)
123 def on_write_complete
125 when true then return # #next! will clear this bit
126 when nil # fall through
128 return if stream_file_chunk(@deferred)
129 close_deferred # EOF, fall through
134 close if @_write_buffer.empty?
137 buf = @_io.kgio_tryread(16384, RBUF) or return close
138 String === buf and return on_read(buf)
139 # buf == :wait_readable
142 KATO[self] = Time.now
154 if msg = Rainbows::Error.response(e)
155 @_io.kgio_trywrite(msg) rescue nil
165 @deferred.close if @deferred.respond_to?(:close)
167 Rainbows.server.logger.error("closing #@deferred: #{e}")
179 if IO.method_defined?(:trysendfile)
180 def defer_file(status, headers, body, alive, io, st)
181 if r = sendfile_range(status, headers)
182 status, headers, range = r
183 write_headers(status, headers, alive)
184 range and defer_file_stream(range[0], range[1], io, body)
186 write_headers(status, headers, alive)
187 defer_file_stream(0, st.size, io, body)
191 def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
192 case n = @_io.trysendfile(sf, sf.offset, sf.count)
195 return if 0 == (sf.count -= n)
197 return enable_write_watcher
203 def defer_file(status, headers, body, alive, io, st)
204 write_headers(status, headers, alive)
205 defer_file_stream(0, st.size, io, body)
208 def stream_file_chunk(body)
209 buf = body.to_io.read(0x4000) and write(buf)
213 def defer_file_stream(offset, count, io, body)
214 @deferred = Rainbows::StreamFile.new(offset, count, io, body)