coolio: enable async.callback for one-shot body responses
[rainbows.git] / lib / rainbows / coolio / client.rb
blob6264df7e3d9c147178556ba3833f5245fef45659
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   CONN = Rainbows::Coolio::CONN
6   KATO = Rainbows::Coolio::KATO
7   LOOP = Coolio::Loop.default
9   def initialize(io)
10     CONN[self] = false
11     super(io)
12     post_init
13     @deferred = nil
14   end
16   def want_more
17     enable unless enabled?
18   end
20   def quit
21     super
22     close if nil == @deferred && @_write_buffer.empty?
23   end
25   # override the Coolio::IO#write method try to write directly to the
26   # kernel socket buffers to avoid an extra userspace copy if
27   # possible.
28   def write(buf)
29     if @_write_buffer.empty?
30       begin
31         case rv = @_io.kgio_trywrite(buf)
32         when nil
33           return enable_write_watcher
34         when :wait_writable
35           break # fall through to super(buf)
36         when String
37           buf = rv # retry, skb could grow or been drained
38         end
39       rescue => e
40         return handle_error(e)
41       end while true
42     end
43     super(buf)
44   end
46   def on_readable
47     buf = @_io.kgio_tryread(16384)
48     case buf
49     when :wait_readable
50     when nil # eof
51       close
52     else
53       on_read buf
54     end
55   rescue Errno::ECONNRESET
56     close
57   end
59   # allows enabling of write watcher even when read watcher is disabled
60   def evloop
61     LOOP
62   end
64   def next!
65     attached? or return
66     @deferred = nil
67     enable_write_watcher # trigger on_write_complete
68   end
70   def timeout?
71     nil == @deferred && @_write_buffer.empty? and close.nil?
72   end
74   # used for streaming sockets and pipes
75   def stream_response_body(body, io, chunk)
76     # we only want to attach to the Coolio::Loop belonging to the
77     # main thread in Ruby 1.9
78     (chunk ? Rainbows::Coolio::ResponseChunkPipe :
79              Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
80     @deferred = true
81   end
83   def write_response_path(status, headers, body, alive)
84     io = body_to_io(body)
85     st = io.stat
87     if st.file?
88       defer_file(status, headers, body, alive, io, st)
89     elsif st.socket? || st.pipe?
90       chunk = stream_response_headers(status, headers, alive)
91       stream_response_body(body, io, chunk)
92     else
93       # char or block device... WTF?
94       write_response(status, headers, body, alive)
95     end
96   end
98   def ev_write_response(status, headers, body, alive)
99     if body.respond_to?(:to_path)
100       write_response_path(status, headers, body, alive)
101     else
102       write_response(status, headers, body, alive)
103     end
104     return quit unless alive && :close != @state
105     @state = :headers
106   end
108   def coolio_write_async_response(response)
109     write_async_response(response)
110     @deferred = nil
111   end
113   def app_call
114     KATO.delete(self)
115     disable if enabled?
116     @env[RACK_INPUT] = @input
117     @env[REMOTE_ADDR] = @_io.kgio_addr
118     @env[ASYNC_CALLBACK] = method(:coolio_write_async_response)
119     status, headers, body = catch(:async) {
120       APP.call(@env.merge!(RACK_DEFAULTS))
121     }
123     (nil == status || -1 == status) ? @deferred = true :
124         ev_write_response(status, headers, body, @hp.next?)
125   end
127   def on_write_complete
128     case @deferred
129     when true then return # #next! will clear this bit
130     when nil # fall through
131     else
132       begin
133         return stream_file_chunk(@deferred)
134       rescue EOFError # expected at file EOF
135         close_deferred # fall through
136       end
137     end
139     case @state
140     when :close
141       close if @_write_buffer.empty?
142     when :headers
143       if @buf.empty?
144         unless enabled?
145           enable
146           KATO[self] = Time.now
147         end
148       else
149         on_read("")
150       end
151     end
152     rescue => e
153       handle_error(e)
154   end
156   def handle_error(e)
157     close_deferred
158     if msg = Rainbows::Error.response(e)
159       @_io.kgio_trywrite(msg) rescue nil
160     end
161     @_write_buffer.clear
162     ensure
163       quit
164   end
166   def close_deferred
167     if @deferred
168       begin
169         @deferred.close if @deferred.respond_to?(:close)
170       rescue => e
171         Rainbows.server.logger.error("closing #@deferred: #{e}")
172       end
173       @deferred = nil
174     end
175   end
177   def on_close
178     close_deferred
179     CONN.delete(self)
180     KATO.delete(self)
181   end
183   if IO.method_defined?(:sendfile_nonblock)
184     def defer_file(status, headers, body, alive, io, st)
185       if r = sendfile_range(status, headers)
186         status, headers, range = r
187         write_headers(status, headers, alive)
188         range and defer_file_stream(range[0], range[1], io, body)
189       else
190         write_headers(status, headers, alive)
191         defer_file_stream(0, st.size, io, body)
192       end
193     end
195     def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
196       sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
197       0 == (sf.count -= n) and raise EOFError
198       enable_write_watcher
199       rescue Errno::EAGAIN
200         enable_write_watcher
201     end
202   else
203     def defer_file(status, headers, body, alive, io, st)
204       write_headers(status, headers, alive)
205       defer_file_stream(0, st.size, io, body)
206     end
208     def stream_file_chunk(body)
209       write(body.to_io.sysread(0x4000))
210     end
211   end
213   def defer_file_stream(offset, count, io, body)
214     @deferred = Rainbows::StreamFile.new(offset, count, io, body)
215     enable_write_watcher
216   end