1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
5 SF = Rainbows::StreamFile
6 CONN = Rainbows::Coolio::CONN
7 KATO = Rainbows::Coolio::KATO
8 ResponsePipe = Rainbows::Coolio::ResponsePipe
9 ResponseChunkPipe = Rainbows::Coolio::ResponseChunkPipe
19 enable unless enabled?
24 close if @deferred.nil? && @_write_buffer.empty?
27 # override the Coolio::IO#write method to try to write directly to the
28 # kernel socket buffers to avoid an extra userspace copy if
31 if @_write_buffer.empty?
33 case rv = @_io.kgio_trywrite(buf)
35 return enable_write_watcher
37 break # fall through to super(buf)
39 buf = rv # retry, skb could have grown or been drained
42 return handle_error(e)
49 buf = @_io.kgio_tryread(16384)
57 rescue Errno::ECONNRESET
61 # queued, optional response bodies, it should only be unpollable "fast"
62 # devices where read(2) is uninterruptible. Unfortunately, NFS and ilk
63 # are also part of this. We'll also stick ResponsePipe bodies in
64 # here to prevent connections from being closed on us.
70 # allows enabling of write watcher even when read watcher is disabled
72 LOOP # this constant is set when a worker starts
82 @deferred.nil? && @_write_buffer.empty? and close.nil?
85 # used for streaming sockets and pipes
86 def stream_response_body(body, io, chunk)
87 # we only want to attach to the Coolio::Loop belonging to the
88 # main thread in Ruby 1.9
89 io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
90 defer_body(io.attach(LOOP))
93 def coolio_write_response(response, alive)
94 status, headers, body = response
96 if body.respond_to?(:to_path)
101 if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
102 status, headers, range = r
103 write_headers(status, headers, alive)
104 defer_body(SF.new(range[0], range[1], io, body)) if range
106 write_headers(status, headers, alive)
107 defer_body(SF.new(0, st.size, io, body))
110 elsif st.socket? || st.pipe?
111 chunk = stream_response_headers(status, headers, alive)
112 return stream_response_body(body, io, chunk)
114 # char or block device... WTF? fall through to body.each
116 write_response(status, headers, body, alive)
121 @env[RACK_INPUT] = @input
122 @env[REMOTE_ADDR] = @_io.kgio_addr
123 response = APP.call(@env.merge!(RACK_DEFAULTS))
125 coolio_write_response(response, alive = @hp.next?)
126 return quit unless alive && :close != @state
131 def on_write_complete
133 when ResponsePipe then return
134 when NilClass # fall through
137 return rev_sendfile(@deferred)
138 rescue EOFError # expected at file EOF
145 close if @_write_buffer.empty?
150 KATO[self] = Time.now
162 if msg = Rainbows::Error.response(e)
163 @_io.kgio_trywrite(msg) rescue nil
172 when ResponsePipe, NilClass
177 Rainbows.server.logger.error("closing #@deferred: #{e}")