1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
# Local aliases for constants defined on Rainbows::Coolio, plus the
# default Coolio event loop.
5 CONN = Rainbows::Coolio::CONN
# KATO is indexed by client and stores a Time (KATO[self] = Time.now);
# presumably a keepalive-timeout table -- TODO confirm
6 KATO = Rainbows::Coolio::KATO
# the loop that response pipes get attached to in stream_response_body
7 LOOP = Coolio::Loop.default
17 enable unless enabled?
22 close if nil == @deferred && @_write_buffer.empty?
25 # override the Coolio::IO#write method to try writing directly to the
26 # kernel socket buffers, avoiding an extra userspace copy if possible
29 if @_write_buffer.empty?
31 case rv = @_io.kgio_trywrite(buf)
33 return enable_write_watcher
35 break # fall through to super(buf)
37 buf = rv # retry, skb could grow or been drained
40 return handle_error(e)
47 buf = @_io.kgio_tryread(16384)
55 rescue Errno::ECONNRESET
59 # allows enabling of write watcher even when read watcher is disabled
67 enable_write_watcher # trigger on_write_complete
71 nil == @deferred && @_write_buffer.empty? and close.nil?
74 # used for streaming sockets and pipes
# Wraps +io+ in a response pipe and attaches it to LOOP.
# +chunk+ selects the wrapper class: ResponseChunkPipe when truthy,
# ResponsePipe otherwise (presumably chunked vs. plain transfer
# encoding -- TODO confirm).  The wrapper is constructed with
# (io, self, body).
75 def stream_response_body(body, io, chunk)
76 # we only want to attach to the Coolio::Loop belonging to the
77 # main thread in Ruby 1.9
78 (chunk ? Rainbows::Coolio::ResponseChunkPipe :
79 Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
# Writes a response for a +body+ that responds to :to_path (that is the
# condition under which ev_write_response calls this method).
# Dispatches on +st+ (responds to socket?/pipe?/size; presumably a
# File::Stat for +io+ -- TODO confirm):
#   * first branch: hand the file to defer_file
#   * socket or pipe: send headers with stream_response_headers, then
#     stream the body through stream_response_body
#   * anything else: plain write_response
83 def write_response_path(status, headers, body, alive)
88 defer_file(status, headers, body, alive, io, st)
89 elsif st.socket? || st.pipe?
# the return value of stream_response_headers picks the pipe class
# used by stream_response_body
90 chunk = stream_response_headers(status, headers, alive)
91 stream_response_body(body, io, chunk)
93 # neither a socket nor a pipe (e.g. a char or block device):
# fall back to the generic writer
94 write_response(status, headers, body, alive)
# Writes a complete response.  A +body+ that responds to :to_path goes
# through write_response_path; write_response is the other path.
# Afterwards calls quit (and returns its value) unless +alive+ is
# truthy and @state is not :close.
98 def ev_write_response(status, headers, body, alive)
99 if body.respond_to?(:to_path)
100 write_response_path(status, headers, body, alive)
102 write_response(status, headers, body, alive)
104 return quit unless alive && :close != @state
# Thin wrapper that forwards +response+ to write_async_response.
# A Method object for this is stored in @env[ASYNC_CALLBACK] before the
# Rack application is called.
108 def coolio_write_async_response(response)
109 write_async_response(response)
116 @env[RACK_INPUT] = @input
117 @env[REMOTE_ADDR] = @_io.kgio_addr
118 @env[ASYNC_CALLBACK] = method(:coolio_write_async_response)
119 status, headers, body = catch(:async) {
120 APP.call(@env.merge!(RACK_DEFAULTS))
123 (nil == status || -1 == status) ? @deferred = true :
124 ev_write_response(status, headers, body, @hp.next?)
# Continues a deferred response: the next chunk of @deferred is sent
# with stream_file_chunk, and an EOFError from it is treated as the
# normal end of the file, after which close_deferred runs.
# The connection is closed once @_write_buffer is empty.
# NOTE(review): presumably invoked by Coolio when buffered writes have
# drained (enable_write_watcher is commented as triggering it) -- confirm.
127 def on_write_complete
129 when true then return # #next! will clear this bit
130 when nil # fall through
133 return stream_file_chunk(@deferred)
134 rescue EOFError # expected at file EOF
135 close_deferred # fall through
141 close if @_write_buffer.empty?
146 KATO[self] = Time.now
158 if msg = Rainbows::Error.response(e)
159 @_io.kgio_trywrite(msg) rescue nil
169 @deferred.close if @deferred.respond_to?(:close)
171 Rainbows.server.logger.error("closing #@deferred: #{e}")
183 if IO.method_defined?(:sendfile_nonblock)
# Variant defined when IO has a sendfile_nonblock instance method.
# Writes the response headers and schedules the file body for deferred
# streaming.  When sendfile_range(status, headers) returns a value, it
# supplies the replacement status and headers plus +range+; a non-nil
# +range+ is passed to defer_file_stream as (offset, count).  The other
# path streams from offset 0 for st.size bytes.
184 def defer_file(status, headers, body, alive, io, st)
185 if r = sendfile_range(status, headers)
186 status, headers, range = r
187 write_headers(status, headers, alive)
# a nil range means headers only, no body is deferred
188 range and defer_file_stream(range[0], range[1], io, body)
190 write_headers(status, headers, alive)
191 defer_file_stream(0, st.size, io, body)
# Sends the next piece of +sf+ to the client socket with
# sendfile_nonblock, starting at sf.offset for at most sf.count bytes.
# +n+ (the value returned by sendfile_nonblock) is added to sf.offset
# and subtracted from sf.count.
# Raises EOFError once sf.count reaches zero.
195 def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
196 sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
197 0 == (sf.count -= n) and raise EOFError
# Variant without range handling: writes the headers, then defers
# streaming from offset 0 for st.size bytes.
203 def defer_file(status, headers, body, alive, io, st)
204 write_headers(status, headers, alive)
205 defer_file_stream(0, st.size, io, body)
# Reads up to 0x4000 (16384) bytes from body.to_io with sysread and
# passes them to write.  IO#sysread raises EOFError at end of file.
208 def stream_file_chunk(body)
209 write(body.to_io.sysread(0x4000))
# Records a pending file transfer by storing a new Rainbows::StreamFile
# built from (offset, count, io, body) in @deferred.
213 def defer_file_stream(offset, count, io, body)
214 @deferred = Rainbows::StreamFile.new(offset, count, io, body)