1 # -*- encoding: binary -*-
2 require 'rainbows/ev_core'
6 class Client < ::Rev::IO
7 include Rainbows::ByteSlice
8 include Rainbows::EvCore
9 include Rainbows::HttpResponse
11 HH = Rack::Utils::HeaderHash
17 @deferred_bodies = [] # for (fast) regular files only
22 close if @deferred_bodies.empty? && @_write_buffer.empty?
25 # override the ::Rev::IO#write method try to write directly to the
26 # kernel socket buffers to avoid an extra userspace copy if
29 if @_write_buffer.empty?
31 w = @_io.write_nonblock(buf)
32 if w == Rack::Utils.bytesize(buf)
33 return on_write_complete
35 # we never care for the return value, but yes, we may return
36 # a "fake" short write from super(buf) if anybody cares.
37 buf = byte_slice(buf, w..-1)
39 break # fall through to super(buf)
47 # queued, optional response bodies, it should only be unpollable "fast"
48 # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk
49 # are also part of this. We'll also stick DeferredResponse bodies in
50 # here to prevent connections from being closed on us.
51 def defer_body(io, out_headers)
52 @deferred_bodies << io
53 schedule_write unless out_headers # triggers a write
57 @_write_buffer.empty? && @deferred_bodies.empty? and close.nil?
60 def rev_write_response(response, out)
61 status, headers, body = response
63 body.respond_to?(:to_path) or
64 return write_response(self, response, out)
66 headers = HH.new(headers)
70 if st.socket? || st.pipe?
71 do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
72 do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
73 # too tricky to support keepalive/pipelining when a response can
74 # take an indeterminate amount of time here.
81 # we only want to attach to the Rev::Loop belonging to the
82 # main thread in Ruby 1.9
83 io = DeferredResponse.new(io, self, do_chunk, body).
86 headers.delete('Transfer-Encoding')
87 headers['Content-Length'] ||= st.size.to_s
89 else # char/block device, directory, whatever... nobody cares
90 return write_response(self, response, out)
93 write_header(self, response, out)
99 @env[RACK_INPUT] = @input
100 @env[REMOTE_ADDR] = @remote_addr
101 response = APP.call(@env.update(RACK_DEFAULTS))
102 alive = @hp.keepalive? && G.alive
103 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
105 rev_write_response(response, out)
110 # keepalive requests are always body-less, so @input is unchanged
111 @hp.headers(@env, @buf) and next
112 KATO[self] = Time.now
120 def on_write_complete
121 if body = @deferred_bodies[0]
122 # no socket or pipes, body must be a regular file to continue here
123 return if DeferredResponse === body
127 rescue EOFError # expected at file EOF
128 @deferred_bodies.shift
130 close if :close == @state && @deferred_bodies.empty?
135 close if :close == @state
140 while f = @deferred_bodies.shift
141 DeferredResponse === f or f.close
148 end # module Rainbows