1 # -*- encoding: binary -*-
3 class Rainbows::Coolio::Client < Coolio::IO
4 include Rainbows::EvCore
6 SF = Rainbows::StreamFile
7 CONN = Rainbows::Coolio::CONN
8 KATO = Rainbows::Coolio::KATO
9 ResponsePipe = Rainbows::Coolio::ResponsePipe
10 ResponseChunkPipe = Rainbows::Coolio::ResponseChunkPipe
20 enable unless enabled?
25 close if @deferred.nil? && @_write_buffer.empty?
28 # override the Coolio::IO#write method to try writing directly to the
29 # kernel socket buffers to avoid an extra userspace copy if
32 if @_write_buffer.empty?
34 case rv = @_io.kgio_trywrite(buf)
36 return enable_write_watcher
38 break # fall through to super(buf)
40 buf = rv # retry, skb could have grown or been drained
43 return handle_error(e)
50 buf = @_io.kgio_tryread(16384)
58 rescue Errno::ECONNRESET
62 # queued, optional response bodies, it should only be unpollable "fast"
63 # devices where read(2) is uninterruptible. Unfortunately, NFS and ilk
64 # are also part of this. We'll also stick ResponsePipe bodies in
65 # here to prevent connections from being closed on us.
71 # allows enabling of write watcher even when read watcher is disabled
73 LOOP # this constant is set when a worker starts
83 @deferred.nil? && @_write_buffer.empty? and close.nil?
86 # used for streaming sockets and pipes
87 def stream_response_body(body, io, chunk)
88 # we only want to attach to the Coolio::Loop belonging to the
89 # main thread in Ruby 1.9
90 io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
91 defer_body(io.attach(LOOP))
94 def coolio_write_response(response, alive)
95 status, headers, body = response
97 if body.respond_to?(:to_path)
102 if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
103 status, headers, range = r
104 write_headers(status, headers, alive)
105 defer_body(SF.new(range[0], range[1], io, body)) if range
107 write_headers(status, headers, alive)
108 defer_body(SF.new(0, st.size, io, body))
111 elsif st.socket? || st.pipe?
112 chunk = stream_response_headers(status, headers, alive)
113 return stream_response_body(body, io, chunk)
115 # char or block device... WTF? fall through to body.each
117 write_response(status, headers, body, alive)
122 @env[RACK_INPUT] = @input
123 @env[REMOTE_ADDR] = @_io.kgio_addr
124 response = APP.call(@env.update(RACK_DEFAULTS))
126 coolio_write_response(response, alive = @hp.next?)
127 return quit unless alive && :close != @state
132 def on_write_complete
134 when ResponsePipe then return
135 when NilClass # fall through
138 return rev_sendfile(@deferred)
139 rescue EOFError # expected at file EOF
146 close if @_write_buffer.empty?
151 KATO[self] = Time.now
163 if msg = Rainbows::Error.response(e)
164 @_io.kgio_trywrite(msg) rescue nil
173 when ResponsePipe, NilClass
178 G.server.logger.error("closing #@deferred: #{e}")