simplify per-client keepalive state checks
[rainbows.git] / lib / rainbows / coolio / client.rb
blob7b410260c2ebf9e15b664b1c8ab9eccfdb666186
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   G = Rainbows::G
6   SF = Rainbows::StreamFile
7   CONN = Rainbows::Coolio::CONN
8   KATO = Rainbows::Coolio::KATO
9   DeferredResponse = Rainbows::Coolio::DeferredResponse
10   DeferredChunkResponse = Rainbows::Coolio::DeferredChunkResponse
12   def initialize(io)
13     CONN[self] = false
14     super(io)
15     post_init
16     @deferred = nil
17   end
19   def want_more
20     enable unless enabled?
21   end
23   def quit
24     super
25     close if @deferred.nil? && @_write_buffer.empty?
26   end
28   # override the Coolio::IO#write method try to write directly to the
29   # kernel socket buffers to avoid an extra userspace copy if
30   # possible.
31   def write(buf)
32     if @_write_buffer.empty?
33       begin
34         case rv = @_io.kgio_trywrite(buf)
35         when nil
36           return enable_write_watcher
37         when :wait_writable
38           break # fall through to super(buf)
39         when String
40           buf = rv # retry, skb could grow or been drained
41         end
42       rescue => e
43         return handle_error(e)
44       end while true
45     end
46     super(buf)
47   end
49   def on_readable
50     buf = @_io.kgio_tryread(16384)
51     case buf
52     when :wait_readable
53     when nil # eof
54       close
55     else
56       on_read buf
57     end
58   rescue Errno::ECONNRESET
59     close
60   end
62   # queued, optional response bodies, it should only be unpollable "fast"
63   # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
64   # are also part of this.  We'll also stick DeferredResponse bodies in
65   # here to prevent connections from being closed on us.
66   def defer_body(io)
67     @deferred = io
68     enable_write_watcher
69   end
71   # allows enabling of write watcher even when read watcher is disabled
72   def evloop
73     LOOP # this constant is set in when a worker starts
74   end
76   def next!
77     attached? or return
78     @deferred = nil
79     enable_write_watcher
80   end
82   def timeout?
83     @deferred.nil? && @_write_buffer.empty? and close.nil?
84   end
86   # used for streaming sockets and pipes
87   def stream_response(status, headers, io, body)
88     c = stream_response_headers(status, headers) if headers
89     # we only want to attach to the Coolio::Loop belonging to the
90     # main thread in Ruby 1.9
91     io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
92     defer_body(io.attach(LOOP))
93   end
95   def coolio_write_response(response, alive)
96     status, headers, body = response
97     headers = @hp.headers? ? HH.new(headers) : nil
99     headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
100     if body.respond_to?(:to_path)
101       io = body_to_io(body)
102       st = io.stat
104       if st.file?
105         offset, count = 0, st.size
106         if headers
107           if range = make_range!(@env, status, headers)
108             status, offset, count = range
109           end
110           write(response_header(status, headers))
111         end
112         return defer_body(SF.new(offset, count, io, body))
113       elsif st.socket? || st.pipe?
114         return stream_response(status, headers, io, body)
115       end
116       # char or block device... WTF? fall through to body.each
117     end
118     write(response_header(status, headers)) if headers
119     write_body_each(self, body, nil)
120   end
122   def app_call
123     KATO.delete(self)
124     @env[RACK_INPUT] = @input
125     @env[REMOTE_ADDR] = @_io.kgio_addr
126     response = APP.call(@env.update(RACK_DEFAULTS))
128     coolio_write_response(response, alive = @hp.next?)
129     return quit unless alive && :close != @state
130     @state = :headers
131     disable if enabled?
132   end
134   def on_write_complete
135     case @deferred
136     when DeferredResponse then return
137     when NilClass # fall through
138     else
139       begin
140         return rev_sendfile(@deferred)
141       rescue EOFError # expected at file EOF
142         close_deferred
143       end
144     end
146     case @state
147     when :close
148       close if @_write_buffer.empty?
149     when :headers
150       if @buf.empty?
151         unless enabled?
152           enable
153           KATO[self] = Time.now
154         end
155       else
156         on_read("")
157       end
158     end
159     rescue => e
160       handle_error(e)
161   end
163   def handle_error(e)
164     close_deferred
165     if msg = Rainbows::Error.response(e)
166       @_io.kgio_trywrite(msg) rescue nil
167     end
168     @_write_buffer.clear
169     ensure
170       quit
171   end
173   def close_deferred
174     case @deferred
175     when DeferredResponse, NilClass
176     else
177       begin
178         @deferred.close
179       rescue => e
180         G.server.logger.error("closing #@deferred: #{e}")
181       end
182       @deferred = nil
183     end
184   end
186   def on_close
187     close_deferred
188     CONN.delete(self)
189     KATO.delete(self)
190   end