1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
5 APP = Rainbows.server.app
6 CONN = Rainbows::Coolio::CONN
7 KATO = Rainbows::Coolio::KATO
8 LOOP = Coolio::Loop.default
18 enable unless enabled?
23 close if nil == @deferred && @_write_buffer.empty?
26 # override the Coolio::IO#write method try to write directly to the
27 # kernel socket buffers to avoid an extra userspace copy if
30 if @_write_buffer.empty?
32 case rv = @_io.kgio_trywrite(buf)
34 return enable_write_watcher
36 break # fall through to super(buf)
38 buf = rv # retry, skb could grow or been drained
41 return handle_error(e)
48 buf = @_io.kgio_tryread(16384, RBUF)
56 rescue Errno::ECONNRESET
60 # allows enabling of write watcher even when read watcher is disabled
68 enable_write_watcher # trigger on_write_complete
72 nil == @deferred && @_write_buffer.empty? and close.nil?
75 # used for streaming sockets and pipes
76 def stream_response_body(body, io, chunk)
77 # we only want to attach to the Coolio::Loop belonging to the
78 # main thread in Ruby 1.9
79 (chunk ? Rainbows::Coolio::ResponseChunkPipe :
80 Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
84 def write_response_path(status, headers, body, alive)
89 defer_file(status, headers, body, alive, io, st)
90 elsif st.socket? || st.pipe?
91 chunk = stream_response_headers(status, headers, alive)
92 stream_response_body(body, io, chunk)
94 # char or block device... WTF?
95 write_response(status, headers, body, alive)
99 def ev_write_response(status, headers, body, alive)
100 if body.respond_to?(:to_path)
101 write_response_path(status, headers, body, alive)
103 write_response(status, headers, body, alive)
105 return quit unless alive && :close != @state
112 @env[RACK_INPUT] = input
113 @env[REMOTE_ADDR] = @_io.kgio_addr
114 @env[ASYNC_CALLBACK] = method(:write_async_response)
115 status, headers, body = catch(:async) {
116 APP.call(@env.merge!(RACK_DEFAULTS))
119 (nil == status || -1 == status) ? @deferred = true :
120 ev_write_response(status, headers, body, @hp.next?)
123 def on_write_complete
125 when true then return # #next! will clear this bit
126 when nil # fall through
129 return stream_file_chunk(@deferred)
130 rescue EOFError # expected at file EOF
131 close_deferred # fall through
137 close if @_write_buffer.empty?
140 buf = @_io.kgio_tryread(16384, RBUF) or return close
141 String === buf and return on_read(buf)
142 # buf == :wait_readable
145 KATO[self] = Time.now
157 if msg = Rainbows::Error.response(e)
158 @_io.kgio_trywrite(msg) rescue nil
168 @deferred.close if @deferred.respond_to?(:close)
170 Rainbows.server.logger.error("closing #@deferred: #{e}")
182 if IO.method_defined?(:sendfile_nonblock)
183 def defer_file(status, headers, body, alive, io, st)
184 if r = sendfile_range(status, headers)
185 status, headers, range = r
186 write_headers(status, headers, alive)
187 range and defer_file_stream(range[0], range[1], io, body)
189 write_headers(status, headers, alive)
190 defer_file_stream(0, st.size, io, body)
194 def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
195 sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
196 0 == (sf.count -= n) and raise EOFError
202 def defer_file(status, headers, body, alive, io, st)
203 write_headers(status, headers, alive)
204 defer_file_stream(0, st.size, io, body)
207 def stream_file_chunk(body)
208 write(body.to_io.sysread(0x4000))
212 def defer_file_stream(offset, count, io, body)
213 @deferred = Rainbows::StreamFile.new(offset, count, io, body)