switch from IO#sendfile_nonblock to IO#trysendfile
[rainbows.git] / lib / rainbows / coolio / client.rb
blob2de421a6006c57f76d5f4fc298b49c04cf44d597
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   APP = Rainbows.server.app
6   CONN = Rainbows::Coolio::CONN
7   KATO = Rainbows::Coolio::KATO
8   LOOP = Coolio::Loop.default
10   def initialize(io)
11     CONN[self] = false
12     super(io)
13     post_init
14     @deferred = nil
15   end
17   def want_more
18     enable unless enabled?
19   end
21   def quit
22     super
23     close if nil == @deferred && @_write_buffer.empty?
24   end
26   # override the Coolio::IO#write method try to write directly to the
27   # kernel socket buffers to avoid an extra userspace copy if
28   # possible.
29   def write(buf)
30     if @_write_buffer.empty?
31       begin
32         case rv = @_io.kgio_trywrite(buf)
33         when nil
34           return enable_write_watcher
35         when :wait_writable
36           break # fall through to super(buf)
37         when String
38           buf = rv # retry, skb could grow or been drained
39         end
40       rescue => e
41         return handle_error(e)
42       end while true
43     end
44     super(buf)
45   end
47   def on_readable
48     buf = @_io.kgio_tryread(16384, RBUF)
49     case buf
50     when :wait_readable
51     when nil # eof
52       close
53     else
54       on_read buf
55     end
56   rescue Errno::ECONNRESET
57     close
58   end
60   # allows enabling of write watcher even when read watcher is disabled
61   def evloop
62     LOOP
63   end
65   def next!
66     attached? or return
67     @deferred = nil
68     enable_write_watcher # trigger on_write_complete
69   end
71   def timeout?
72     nil == @deferred && @_write_buffer.empty? and close.nil?
73   end
75   # used for streaming sockets and pipes
76   def stream_response_body(body, io, chunk)
77     # we only want to attach to the Coolio::Loop belonging to the
78     # main thread in Ruby 1.9
79     (chunk ? Rainbows::Coolio::ResponseChunkPipe :
80              Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
81     @deferred = true
82   end
84   def write_response_path(status, headers, body, alive)
85     io = body_to_io(body)
86     st = io.stat
88     if st.file?
89       defer_file(status, headers, body, alive, io, st)
90     elsif st.socket? || st.pipe?
91       chunk = stream_response_headers(status, headers, alive)
92       stream_response_body(body, io, chunk)
93     else
94       # char or block device... WTF?
95       write_response(status, headers, body, alive)
96     end
97   end
99   def ev_write_response(status, headers, body, alive)
100     if body.respond_to?(:to_path)
101       write_response_path(status, headers, body, alive)
102     else
103       write_response(status, headers, body, alive)
104     end
105     return quit unless alive && :close != @state
106     @state = :headers
107   end
109   def app_call input
110     KATO.delete(self)
111     disable if enabled?
112     @env[RACK_INPUT] = input
113     @env[REMOTE_ADDR] = @_io.kgio_addr
114     @env[ASYNC_CALLBACK] = method(:write_async_response)
115     status, headers, body = catch(:async) {
116       APP.call(@env.merge!(RACK_DEFAULTS))
117     }
119     (nil == status || -1 == status) ? @deferred = true :
120         ev_write_response(status, headers, body, @hp.next?)
121   end
123   def on_write_complete
124     case @deferred
125     when true then return # #next! will clear this bit
126     when nil # fall through
127     else
128       return if stream_file_chunk(@deferred)
129       close_deferred # EOF, fall through
130     end
132     case @state
133     when :close
134       close if @_write_buffer.empty?
135     when :headers
136       if @buf.empty?
137         buf = @_io.kgio_tryread(16384, RBUF) or return close
138         String === buf and return on_read(buf)
139         # buf == :wait_readable
140         unless enabled?
141           enable
142           KATO[self] = Time.now
143         end
144       else
145         on_read(Z)
146       end
147     end
148     rescue => e
149       handle_error(e)
150   end
152   def handle_error(e)
153     close_deferred
154     if msg = Rainbows::Error.response(e)
155       @_io.kgio_trywrite(msg) rescue nil
156     end
157     @_write_buffer.clear
158     ensure
159       quit
160   end
162   def close_deferred
163     if @deferred
164       begin
165         @deferred.close if @deferred.respond_to?(:close)
166       rescue => e
167         Rainbows.server.logger.error("closing #@deferred: #{e}")
168       end
169       @deferred = nil
170     end
171   end
173   def on_close
174     close_deferred
175     CONN.delete(self)
176     KATO.delete(self)
177   end
179   if IO.method_defined?(:trysendfile)
180     def defer_file(status, headers, body, alive, io, st)
181       if r = sendfile_range(status, headers)
182         status, headers, range = r
183         write_headers(status, headers, alive)
184         range and defer_file_stream(range[0], range[1], io, body)
185       else
186         write_headers(status, headers, alive)
187         defer_file_stream(0, st.size, io, body)
188       end
189     end
191     def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
192       case n = @_io.trysendfile(sf, sf.offset, sf.count)
193       when Integer
194         sf.offset += n
195         return if 0 == (sf.count -= n)
196       when :wait_writable
197         return enable_write_watcher
198       else
199         return
200       end while true
201     end
202   else
203     def defer_file(status, headers, body, alive, io, st)
204       write_headers(status, headers, alive)
205       defer_file_stream(0, st.size, io, body)
206     end
208     def stream_file_chunk(body)
209       buf = body.to_io.read(0x4000) and write(buf)
210     end
211   end
213   def defer_file_stream(offset, count, io, body)
214     @deferred = Rainbows::StreamFile.new(offset, count, io, body)
215     enable_write_watcher
216   end