coolio/client: on_write_complete triggers read
[rainbows.git] / lib / rainbows / coolio / client.rb
blob98533216d7a779eb0f22e7b323141ec77c3ca5e8
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   APP = Rainbows.server.app
6   CONN = Rainbows::Coolio::CONN
7   KATO = Rainbows::Coolio::KATO
8   LOOP = Coolio::Loop.default
10   def initialize(io)
11     CONN[self] = false
12     super(io)
13     post_init
14     @deferred = nil
15   end
17   def want_more
18     enable unless enabled?
19   end
21   def quit
22     super
23     close if nil == @deferred && @_write_buffer.empty?
24   end
26   # override the Coolio::IO#write method try to write directly to the
27   # kernel socket buffers to avoid an extra userspace copy if
28   # possible.
29   def write(buf)
30     if @_write_buffer.empty?
31       begin
32         case rv = @_io.kgio_trywrite(buf)
33         when nil
34           return enable_write_watcher
35         when :wait_writable
36           break # fall through to super(buf)
37         when String
38           buf = rv # retry, skb could grow or been drained
39         end
40       rescue => e
41         return handle_error(e)
42       end while true
43     end
44     super(buf)
45   end
47   def on_readable
48     buf = @_io.kgio_tryread(16384, RBUF)
49     case buf
50     when :wait_readable
51     when nil # eof
52       close
53     else
54       on_read buf
55     end
56   rescue Errno::ECONNRESET
57     close
58   end
60   # allows enabling of write watcher even when read watcher is disabled
61   def evloop
62     LOOP
63   end
65   def next!
66     attached? or return
67     @deferred = nil
68     enable_write_watcher # trigger on_write_complete
69   end
71   def timeout?
72     nil == @deferred && @_write_buffer.empty? and close.nil?
73   end
75   # used for streaming sockets and pipes
76   def stream_response_body(body, io, chunk)
77     # we only want to attach to the Coolio::Loop belonging to the
78     # main thread in Ruby 1.9
79     (chunk ? Rainbows::Coolio::ResponseChunkPipe :
80              Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
81     @deferred = true
82   end
84   def write_response_path(status, headers, body, alive)
85     io = body_to_io(body)
86     st = io.stat
88     if st.file?
89       defer_file(status, headers, body, alive, io, st)
90     elsif st.socket? || st.pipe?
91       chunk = stream_response_headers(status, headers, alive)
92       stream_response_body(body, io, chunk)
93     else
94       # char or block device... WTF?
95       write_response(status, headers, body, alive)
96     end
97   end
99   def ev_write_response(status, headers, body, alive)
100     if body.respond_to?(:to_path)
101       write_response_path(status, headers, body, alive)
102     else
103       write_response(status, headers, body, alive)
104     end
105     return quit unless alive && :close != @state
106     @state = :headers
107   end
109   def app_call input
110     KATO.delete(self)
111     disable if enabled?
112     @env[RACK_INPUT] = input
113     @env[REMOTE_ADDR] = @_io.kgio_addr
114     @env[ASYNC_CALLBACK] = method(:write_async_response)
115     status, headers, body = catch(:async) {
116       APP.call(@env.merge!(RACK_DEFAULTS))
117     }
119     (nil == status || -1 == status) ? @deferred = true :
120         ev_write_response(status, headers, body, @hp.next?)
121   end
123   def on_write_complete
124     case @deferred
125     when true then return # #next! will clear this bit
126     when nil # fall through
127     else
128       begin
129         return stream_file_chunk(@deferred)
130       rescue EOFError # expected at file EOF
131         close_deferred # fall through
132       end
133     end
135     case @state
136     when :close
137       close if @_write_buffer.empty?
138     when :headers
139       if @buf.empty?
140         buf = @_io.kgio_tryread(16384, RBUF) or return close
141         String === buf and return on_read(buf)
142         # buf == :wait_readable
143         unless enabled?
144           enable
145           KATO[self] = Time.now
146         end
147       else
148         on_read(Z)
149       end
150     end
151     rescue => e
152       handle_error(e)
153   end
155   def handle_error(e)
156     close_deferred
157     if msg = Rainbows::Error.response(e)
158       @_io.kgio_trywrite(msg) rescue nil
159     end
160     @_write_buffer.clear
161     ensure
162       quit
163   end
165   def close_deferred
166     if @deferred
167       begin
168         @deferred.close if @deferred.respond_to?(:close)
169       rescue => e
170         Rainbows.server.logger.error("closing #@deferred: #{e}")
171       end
172       @deferred = nil
173     end
174   end
176   def on_close
177     close_deferred
178     CONN.delete(self)
179     KATO.delete(self)
180   end
182   if IO.method_defined?(:sendfile_nonblock)
183     def defer_file(status, headers, body, alive, io, st)
184       if r = sendfile_range(status, headers)
185         status, headers, range = r
186         write_headers(status, headers, alive)
187         range and defer_file_stream(range[0], range[1], io, body)
188       else
189         write_headers(status, headers, alive)
190         defer_file_stream(0, st.size, io, body)
191       end
192     end
194     def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
195       sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
196       0 == (sf.count -= n) and raise EOFError
197       enable_write_watcher
198       rescue Errno::EAGAIN
199         enable_write_watcher
200     end
201   else
202     def defer_file(status, headers, body, alive, io, st)
203       write_headers(status, headers, alive)
204       defer_file_stream(0, st.size, io, body)
205     end
207     def stream_file_chunk(body)
208       write(body.to_io.sysread(0x4000))
209     end
210   end
212   def defer_file_stream(offset, count, io, body)
213     @deferred = Rainbows::StreamFile.new(offset, count, io, body)
214     enable_write_watcher
215   end