avoid HttpParser#keepalive? and HttpParser#reset
[rainbows.git] / lib / rainbows / rev / client.rb
blob8c397924910aa643b1230cba87e8ff536bcf02a2
1 # -*- encoding: binary -*-
2 # :enddoc:
3 require 'rainbows/ev_core'
4 class Rainbows::Rev::Client < ::Rev::IO
5   include Rainbows::EvCore
6   G = Rainbows::G
7   SF = Rainbows::StreamFile
8   CONN = Rainbows::Rev::CONN
9   KATO = Rainbows::Rev::KATO
10   DeferredResponse = Rainbows::Rev::DeferredResponse
11   DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse
13   def initialize(io)
14     CONN[self] = false
15     super(io)
16     post_init
17     @deferred = nil
18   end
20   def want_more
21     enable unless enabled?
22   end
24   def quit
25     super
26     close if @deferred.nil? && @_write_buffer.empty?
27   end
29   # override the ::Rev::IO#write method try to write directly to the
30   # kernel socket buffers to avoid an extra userspace copy if
31   # possible.
32   def write(buf)
33     if @_write_buffer.empty?
34       begin
35         case rv = @_io.kgio_trywrite(buf)
36         when nil
37           return enable_write_watcher
38         when :wait_writable
39           break # fall through to super(buf)
40         when String
41           buf = rv # retry, skb could grow or been drained
42         end
43       rescue => e
44         return handle_error(e)
45       end while true
46     end
47     super(buf)
48   end
50   def on_readable
51     buf = @_io.kgio_tryread(16384)
52     case buf
53     when :wait_readable
54     when nil # eof
55       close
56     else
57       on_read buf
58     end
59   rescue Errno::ECONNRESET
60     close
61   end
63   # queued, optional response bodies, it should only be unpollable "fast"
64   # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
65   # are also part of this.  We'll also stick DeferredResponse bodies in
66   # here to prevent connections from being closed on us.
67   def defer_body(io)
68     @deferred = io
69     enable_write_watcher
70   end
72   # allows enabling of write watcher even when read watcher is disabled
73   def evloop
74     LOOP # this constant is set in when a worker starts
75   end
77   def next!
78     @deferred = nil
79     enable_write_watcher
80   end
82   def timeout?
83     @deferred.nil? && @_write_buffer.empty? and close.nil?
84   end
86   # used for streaming sockets and pipes
87   def stream_response(status, headers, io, body)
88     c = stream_response_headers(status, headers) if headers
89     # we only want to attach to the Rev::Loop belonging to the
90     # main thread in Ruby 1.9
91     io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
92     defer_body(io.attach(LOOP))
93   end
95   def rev_write_response(response, alive)
96     status, headers, body = response
97     headers = @hp.headers? ? HH.new(headers) : nil
99     headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
100     if body.respond_to?(:to_path)
101       io = body_to_io(body)
102       st = io.stat
104       if st.file?
105         offset, count = 0, st.size
106         if headers
107           if range = make_range!(@env, status, headers)
108             status, offset, count = range
109           end
110           write(response_header(status, headers))
111         end
112         return defer_body(SF.new(offset, count, io, body))
113       elsif st.socket? || st.pipe?
114         return stream_response(status, headers, io, body)
115       end
116       # char or block device... WTF? fall through to body.each
117     end
118     write(response_header(status, headers)) if headers
119     write_body_each(self, body, nil)
120   end
122   def app_call
123     KATO.delete(self)
124     @env[RACK_INPUT] = @input
125     @env[REMOTE_ADDR] = @_io.kgio_addr
126     response = APP.call(@env.update(RACK_DEFAULTS))
128     rev_write_response(response, alive = @hp.next? && G.alive)
129     return quit unless alive && :close != @state
130     @state = :headers
131     disable if enabled?
132   end
134   def on_write_complete
135     case @deferred
136     when DeferredResponse then return
137     when NilClass # fall through
138     else
139       begin
140         return rev_sendfile(@deferred)
141       rescue EOFError # expected at file EOF
142         close_deferred
143       end
144     end
146     case @state
147     when :close
148       close if @_write_buffer.empty?
149     when :headers
150       if @buf.empty?
151         unless enabled?
152           enable
153           KATO[self] = Time.now
154         end
155       else
156         on_read("")
157       end
158     end
159     rescue => e
160       handle_error(e)
161   end
163   def handle_error(e)
164     close_deferred
165     if msg = Rainbows::Error.response(e)
166       @_io.kgio_trywrite(msg) rescue nil
167     end
168     @_write_buffer.clear
169     ensure
170       quit
171   end
173   def close_deferred
174     case @deferred
175     when DeferredResponse, NilClass
176     else
177       begin
178         @deferred.close
179       rescue => e
180         G.server.logger.error("closing #@deferred: #{e}")
181       end
182       @deferred = nil
183     end
184   end
186   def on_close
187     close_deferred
188     CONN.delete(self)
189     KATO.delete(self)
190   end