remove unnecessary "::" constant prefixing
[rainbows.git] / lib / rainbows / rev / client.rb
blob00df4d3c58b53423c8481fc2b1e1d3ded5347931
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Rev::Client < Rev::IO
4   include Rainbows::EvCore
5   G = Rainbows::G
6   SF = Rainbows::StreamFile
7   CONN = Rainbows::Rev::CONN
8   KATO = Rainbows::Rev::KATO
9   DeferredResponse = Rainbows::Rev::DeferredResponse
10   DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse
12   def initialize(io)
13     CONN[self] = false
14     super(io)
15     post_init
16     @deferred = nil
17   end
19   def want_more
20     enable unless enabled?
21   end
23   def quit
24     super
25     close if @deferred.nil? && @_write_buffer.empty?
26   end
28   # override the Rev::IO#write method try to write directly to the
29   # kernel socket buffers to avoid an extra userspace copy if
30   # possible.
31   def write(buf)
32     if @_write_buffer.empty?
33       begin
34         case rv = @_io.kgio_trywrite(buf)
35         when nil
36           return enable_write_watcher
37         when :wait_writable
38           break # fall through to super(buf)
39         when String
40           buf = rv # retry, skb could grow or been drained
41         end
42       rescue => e
43         return handle_error(e)
44       end while true
45     end
46     super(buf)
47   end
49   def on_readable
50     buf = @_io.kgio_tryread(16384)
51     case buf
52     when :wait_readable
53     when nil # eof
54       close
55     else
56       on_read buf
57     end
58   rescue Errno::ECONNRESET
59     close
60   end
62   # queued, optional response bodies, it should only be unpollable "fast"
63   # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
64   # are also part of this.  We'll also stick DeferredResponse bodies in
65   # here to prevent connections from being closed on us.
66   def defer_body(io)
67     @deferred = io
68     enable_write_watcher
69   end
71   # allows enabling of write watcher even when read watcher is disabled
72   def evloop
73     LOOP # this constant is set in when a worker starts
74   end
76   def next!
77     @deferred = nil
78     enable_write_watcher
79   end
81   def timeout?
82     @deferred.nil? && @_write_buffer.empty? and close.nil?
83   end
85   # used for streaming sockets and pipes
86   def stream_response(status, headers, io, body)
87     c = stream_response_headers(status, headers) if headers
88     # we only want to attach to the Rev::Loop belonging to the
89     # main thread in Ruby 1.9
90     io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
91     defer_body(io.attach(LOOP))
92   end
94   def rev_write_response(response, alive)
95     status, headers, body = response
96     headers = @hp.headers? ? HH.new(headers) : nil
98     headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
99     if body.respond_to?(:to_path)
100       io = body_to_io(body)
101       st = io.stat
103       if st.file?
104         offset, count = 0, st.size
105         if headers
106           if range = make_range!(@env, status, headers)
107             status, offset, count = range
108           end
109           write(response_header(status, headers))
110         end
111         return defer_body(SF.new(offset, count, io, body))
112       elsif st.socket? || st.pipe?
113         return stream_response(status, headers, io, body)
114       end
115       # char or block device... WTF? fall through to body.each
116     end
117     write(response_header(status, headers)) if headers
118     write_body_each(self, body, nil)
119   end
121   def app_call
122     KATO.delete(self)
123     @env[RACK_INPUT] = @input
124     @env[REMOTE_ADDR] = @_io.kgio_addr
125     response = APP.call(@env.update(RACK_DEFAULTS))
127     rev_write_response(response, alive = @hp.next? && G.alive)
128     return quit unless alive && :close != @state
129     @state = :headers
130     disable if enabled?
131   end
133   def on_write_complete
134     case @deferred
135     when DeferredResponse then return
136     when NilClass # fall through
137     else
138       begin
139         return rev_sendfile(@deferred)
140       rescue EOFError # expected at file EOF
141         close_deferred
142       end
143     end
145     case @state
146     when :close
147       close if @_write_buffer.empty?
148     when :headers
149       if @buf.empty?
150         unless enabled?
151           enable
152           KATO[self] = Time.now
153         end
154       else
155         on_read("")
156       end
157     end
158     rescue => e
159       handle_error(e)
160   end
162   def handle_error(e)
163     close_deferred
164     if msg = Rainbows::Error.response(e)
165       @_io.kgio_trywrite(msg) rescue nil
166     end
167     @_write_buffer.clear
168     ensure
169       quit
170   end
172   def close_deferred
173     case @deferred
174     when DeferredResponse, NilClass
175     else
176       begin
177         @deferred.close
178       rescue => e
179         G.server.logger.error("closing #@deferred: #{e}")
180       end
181       @deferred = nil
182     end
183   end
185   def on_close
186     close_deferred
187     CONN.delete(self)
188     KATO.delete(self)
189   end