1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
5 APP = Rainbows.server.app
6 CONN = Rainbows::Coolio::CONN
7 KATO = Rainbows::Coolio::KATO
8 LOOP = Coolio::Loop.default
18 enable unless enabled?
23 close if nil == @deferred && @_write_buffer.empty?
26 # override the Coolio::IO#write method try to write directly to the
27 # kernel socket buffers to avoid an extra userspace copy if
30 if @_write_buffer.empty?
32 case rv = @_io.kgio_trywrite(buf)
34 return enable_write_watcher
36 break # fall through to super(buf)
38 buf = rv # retry, skb could grow or been drained
41 return handle_error(e)
48 buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
56 rescue Errno::ECONNRESET
60 # allows enabling of write watcher even when read watcher is disabled
68 enable_write_watcher # trigger on_write_complete
72 if nil == @deferred && @_write_buffer.empty?
80 # used for streaming sockets and pipes
81 def stream_response_body(body, io, chunk)
82 # we only want to attach to the Coolio::Loop belonging to the
83 # main thread in Ruby 1.9
84 (chunk ? Rainbows::Coolio::ResponseChunkPipe :
85 Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
89 def write_response_path(status, headers, body, alive)
94 defer_file(status, headers, body, alive, io, st)
95 elsif st.socket? || st.pipe?
96 chunk = stream_response_headers(status, headers, alive)
97 stream_response_body(body, io, chunk)
99 # char or block device... WTF?
100 write_response(status, headers, body, alive)
104 def ev_write_response(status, headers, body, alive)
105 if body.respond_to?(:to_path)
106 write_response_path(status, headers, body, alive)
108 write_response(status, headers, body, alive)
110 return quit unless alive && :close != @state
117 @env[RACK_INPUT] = input
118 @env[REMOTE_ADDR] = @_io.kgio_addr
119 @env[ASYNC_CALLBACK] = method(:write_async_response)
120 status, headers, body = catch(:async) {
121 APP.call(@env.merge!(RACK_DEFAULTS))
124 (nil == status || -1 == status) ? @deferred = true :
125 ev_write_response(status, headers, body, @hp.next?)
128 def on_write_complete
130 when true then return # #next! will clear this bit
131 when nil # fall through
133 return if stream_file_chunk(@deferred)
134 close_deferred # EOF, fall through
139 close if @_write_buffer.empty?
142 buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF) or return close
143 String === buf and return on_read(buf)
144 # buf == :wait_readable
147 KATO[self] = Time.now
159 if msg = Rainbows::Error.response(e)
160 @_io.kgio_trywrite(msg) rescue nil
170 @deferred.close if @deferred.respond_to?(:close)
172 Unicorn.log_error(Rainbows.server.logger,
173 "closing deferred=#{@deferred.inspect}", e)
185 if IO.method_defined?(:trysendfile)
186 def defer_file(status, headers, body, alive, io, st)
187 if r = sendfile_range(status, headers)
188 status, headers, range = r
189 write_headers(status, headers, alive)
190 range and defer_file_stream(range[0], range[1], io, body)
192 write_headers(status, headers, alive)
193 defer_file_stream(0, st.size, io, body)
197 def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
198 case n = @_io.trysendfile(sf, sf.offset, sf.count)
201 return if 0 == (sf.count -= n)
203 return enable_write_watcher
209 def defer_file(status, headers, body, alive, io, st)
210 write_headers(status, headers, alive)
211 defer_file_stream(0, st.size, io, body)
214 def stream_file_chunk(body)
215 buf = body.to_io.read(0x4000) and write(buf)
219 def defer_file_stream(offset, count, io, body)
220 @deferred = Rainbows::StreamFile.new(offset, count, io, body)