1 # -*- encoding: binary -*-
3 require 'rainbows/ev_core'
4 class Rainbows::Rev::Client < ::Rev::IO
5 include Rainbows::EvCore
7 SF = Rainbows::StreamFile
8 CONN = Rainbows::Rev::CONN
9 KATO = Rainbows::Rev::KATO
10 DeferredResponse = Rainbows::Rev::DeferredResponse
11 DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse
21 enable unless enabled?
26 close if @deferred.nil? && @_write_buffer.empty?
29 # override the ::Rev::IO#write method try to write directly to the
30 # kernel socket buffers to avoid an extra userspace copy if
33 if @_write_buffer.empty?
35 case rv = @_io.kgio_trywrite(buf)
37 return enable_write_watcher
39 break # fall through to super(buf)
41 buf = rv # retry, skb could grow or been drained
44 return handle_error(e)
51 buf = @_io.kgio_tryread(16384)
59 rescue Errno::ECONNRESET
63 # queued, optional response bodies, it should only be unpollable "fast"
64 # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk
65 # are also part of this. We'll also stick DeferredResponse bodies in
66 # here to prevent connections from being closed on us.
72 # allows enabling of write watcher even when read watcher is disabled
74 LOOP # this constant is set in when a worker starts
83 @deferred.nil? && @_write_buffer.empty? and close.nil?
86 # used for streaming sockets and pipes
87 def stream_response(status, headers, io, body)
88 c = stream_response_headers(status, headers) if headers
89 # we only want to attach to the Rev::Loop belonging to the
90 # main thread in Ruby 1.9
91 io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
92 defer_body(io.attach(LOOP))
95 def rev_write_response(response, alive)
96 status, headers, body = response
97 headers = @hp.headers? ? HH.new(headers) : nil
99 headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
100 if body.respond_to?(:to_path)
101 io = body_to_io(body)
105 offset, count = 0, st.size
107 if range = make_range!(@env, status, headers)
108 status, offset, count = range
110 write(response_header(status, headers))
112 return defer_body(SF.new(offset, count, io, body))
113 elsif st.socket? || st.pipe?
114 return stream_response(status, headers, io, body)
116 # char or block device... WTF? fall through to body.each
118 write(response_header(status, headers)) if headers
119 write_body_each(self, body, nil)
124 @env[RACK_INPUT] = @input
125 @env[REMOTE_ADDR] = @_io.kgio_addr
126 response = APP.call(@env.update(RACK_DEFAULTS))
128 rev_write_response(response, alive = @hp.next? && G.alive)
129 return quit unless alive && :close != @state
134 def on_write_complete
136 when DeferredResponse then return
137 when NilClass # fall through
140 return rev_sendfile(@deferred)
141 rescue EOFError # expected at file EOF
148 close if @_write_buffer.empty?
153 KATO[self] = Time.now
165 if msg = Rainbows::Error.response(e)
166 @_io.kgio_trywrite(msg) rescue nil
175 when DeferredResponse, NilClass
180 G.server.logger.error("closing #@deferred: #{e}")