globally refactor Range handling for responses
[rainbows.git] / lib / rainbows / coolio / client.rb
blobd0b17a9ac535501f051ba5d008aca652d16969ab
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   G = Rainbows::G
6   SF = Rainbows::StreamFile
7   CONN = Rainbows::Coolio::CONN
8   KATO = Rainbows::Coolio::KATO
9   ResponsePipe = Rainbows::Coolio::ResponsePipe
10   ResponseChunkPipe = Rainbows::Coolio::ResponseChunkPipe
12   def initialize(io)
13     CONN[self] = false
14     super(io)
15     post_init
16     @deferred = nil
17   end
19   def want_more
20     enable unless enabled?
21   end
23   def quit
24     super
25     close if @deferred.nil? && @_write_buffer.empty?
26   end
28   # override the Coolio::IO#write method try to write directly to the
29   # kernel socket buffers to avoid an extra userspace copy if
30   # possible.
31   def write(buf)
32     if @_write_buffer.empty?
33       begin
34         case rv = @_io.kgio_trywrite(buf)
35         when nil
36           return enable_write_watcher
37         when :wait_writable
38           break # fall through to super(buf)
39         when String
40           buf = rv # retry, skb could grow or been drained
41         end
42       rescue => e
43         return handle_error(e)
44       end while true
45     end
46     super(buf)
47   end
49   def on_readable
50     buf = @_io.kgio_tryread(16384)
51     case buf
52     when :wait_readable
53     when nil # eof
54       close
55     else
56       on_read buf
57     end
58   rescue Errno::ECONNRESET
59     close
60   end
62   # queued, optional response bodies, it should only be unpollable "fast"
63   # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
64   # are also part of this.  We'll also stick ResponsePipe bodies in
65   # here to prevent connections from being closed on us.
66   def defer_body(io)
67     @deferred = io
68     enable_write_watcher
69   end
71   # allows enabling of write watcher even when read watcher is disabled
72   def evloop
73     LOOP # this constant is set in when a worker starts
74   end
76   def next!
77     attached? or return
78     @deferred = nil
79     enable_write_watcher
80   end
82   def timeout?
83     @deferred.nil? && @_write_buffer.empty? and close.nil?
84   end
86   # used for streaming sockets and pipes
87   def stream_response_body(body, io, chunk)
88     # we only want to attach to the Coolio::Loop belonging to the
89     # main thread in Ruby 1.9
90     io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
91     defer_body(io.attach(LOOP))
92   end
94   def coolio_write_response(response, alive)
95     status, headers, body = response
97     if body.respond_to?(:to_path)
98       io = body_to_io(body)
99       st = io.stat
101       if st.file?
102         if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
103           status, headers, range = r
104           write_headers(status, headers, alive)
105           defer_body(SF.new(range[0], range[1], io, body)) if range
106         else
107           write_headers(status, headers, alive)
108           defer_body(SF.new(0, st.size, io, body))
109         end
110         return
111       elsif st.socket? || st.pipe?
112         chunk = stream_response_headers(status, headers, alive)
113         return stream_response_body(body, io, chunk)
114       end
115       # char or block device... WTF? fall through to body.each
116     end
117     write_response(status, headers, body, alive)
118   end
120   def app_call
121     KATO.delete(self)
122     @env[RACK_INPUT] = @input
123     @env[REMOTE_ADDR] = @_io.kgio_addr
124     response = APP.call(@env.update(RACK_DEFAULTS))
126     coolio_write_response(response, alive = @hp.next?)
127     return quit unless alive && :close != @state
128     @state = :headers
129     disable if enabled?
130   end
132   def on_write_complete
133     case @deferred
134     when ResponsePipe then return
135     when NilClass # fall through
136     else
137       begin
138         return rev_sendfile(@deferred)
139       rescue EOFError # expected at file EOF
140         close_deferred
141       end
142     end
144     case @state
145     when :close
146       close if @_write_buffer.empty?
147     when :headers
148       if @buf.empty?
149         unless enabled?
150           enable
151           KATO[self] = Time.now
152         end
153       else
154         on_read("")
155       end
156     end
157     rescue => e
158       handle_error(e)
159   end
161   def handle_error(e)
162     close_deferred
163     if msg = Rainbows::Error.response(e)
164       @_io.kgio_trywrite(msg) rescue nil
165     end
166     @_write_buffer.clear
167     ensure
168       quit
169   end
171   def close_deferred
172     case @deferred
173     when ResponsePipe, NilClass
174     else
175       begin
176         @deferred.close
177       rescue => e
178         G.server.logger.error("closing #@deferred: #{e}")
179       end
180       @deferred = nil
181     end
182   end
184   def on_close
185     close_deferred
186     CONN.delete(self)
187     KATO.delete(self)
188   end