coolio+xepoll_thread*: use shutdown() for keepalive timeout
[rainbows.git] / lib / rainbows / coolio / client.rb
blob8d48bbf0933e56881152bd78bafbfc4d19f7372e
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   APP = Rainbows.server.app
6   CONN = Rainbows::Coolio::CONN
7   KATO = Rainbows::Coolio::KATO
8   LOOP = Coolio::Loop.default
10   def initialize(io)
11     CONN[self] = false
12     super(io)
13     post_init
14     @deferred = nil
15   end
17   def want_more
18     enable unless enabled?
19   end
21   def quit
22     super
23     close if nil == @deferred && @_write_buffer.empty?
24   end
26   # override the Coolio::IO#write method try to write directly to the
27   # kernel socket buffers to avoid an extra userspace copy if
28   # possible.
29   def write(buf)
30     if @_write_buffer.empty?
31       begin
32         case rv = @_io.kgio_trywrite(buf)
33         when nil
34           return enable_write_watcher
35         when :wait_writable
36           break # fall through to super(buf)
37         when String
38           buf = rv # retry, skb could grow or been drained
39         end
40       rescue => e
41         return handle_error(e)
42       end while true
43     end
44     super(buf)
45   end
47   def on_readable
48     buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
49     case buf
50     when :wait_readable
51     when nil # eof
52       close
53     else
54       on_read buf
55     end
56   rescue Errno::ECONNRESET
57     close
58   end
60   # allows enabling of write watcher even when read watcher is disabled
61   def evloop
62     LOOP
63   end
65   def next!
66     attached? or return
67     @deferred = nil
68     enable_write_watcher # trigger on_write_complete
69   end
71   def timeout?
72     if nil == @deferred && @_write_buffer.empty?
73       @_io.shutdown
74       true
75     else
76       false
77     end
78   end
80   # used for streaming sockets and pipes
81   def stream_response_body(body, io, chunk)
82     # we only want to attach to the Coolio::Loop belonging to the
83     # main thread in Ruby 1.9
84     (chunk ? Rainbows::Coolio::ResponseChunkPipe :
85              Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
86     @deferred = true
87   end
89   def write_response_path(status, headers, body, alive)
90     io = body_to_io(body)
91     st = io.stat
93     if st.file?
94       defer_file(status, headers, body, alive, io, st)
95     elsif st.socket? || st.pipe?
96       chunk = stream_response_headers(status, headers, alive)
97       stream_response_body(body, io, chunk)
98     else
99       # char or block device... WTF?
100       write_response(status, headers, body, alive)
101     end
102   end
104   def ev_write_response(status, headers, body, alive)
105     if body.respond_to?(:to_path)
106       write_response_path(status, headers, body, alive)
107     else
108       write_response(status, headers, body, alive)
109     end
110     return quit unless alive && :close != @state
111     @state = :headers
112   end
114   def app_call input
115     KATO.delete(self)
116     disable if enabled?
117     @env[RACK_INPUT] = input
118     @env[REMOTE_ADDR] = @_io.kgio_addr
119     @env[ASYNC_CALLBACK] = method(:write_async_response)
120     status, headers, body = catch(:async) {
121       APP.call(@env.merge!(RACK_DEFAULTS))
122     }
124     (nil == status || -1 == status) ? @deferred = true :
125         ev_write_response(status, headers, body, @hp.next?)
126   end
128   def on_write_complete
129     case @deferred
130     when true then return # #next! will clear this bit
131     when nil # fall through
132     else
133       return if stream_file_chunk(@deferred)
134       close_deferred # EOF, fall through
135     end
137     case @state
138     when :close
139       close if @_write_buffer.empty?
140     when :headers
141       if @buf.empty?
142         buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF) or return close
143         String === buf and return on_read(buf)
144         # buf == :wait_readable
145         unless enabled?
146           enable
147           KATO[self] = Time.now
148         end
149       else
150         on_read(Z)
151       end
152     end
153     rescue => e
154       handle_error(e)
155   end
157   def handle_error(e)
158     close_deferred
159     if msg = Rainbows::Error.response(e)
160       @_io.kgio_trywrite(msg) rescue nil
161     end
162     @_write_buffer.clear
163     ensure
164       quit
165   end
167   def close_deferred
168     if @deferred
169       begin
170         @deferred.close if @deferred.respond_to?(:close)
171       rescue => e
172         Unicorn.log_error(Rainbows.server.logger,
173                           "closing deferred=#{@deferred.inspect}", e)
174       end
175       @deferred = nil
176     end
177   end
179   def on_close
180     close_deferred
181     CONN.delete(self)
182     KATO.delete(self)
183   end
185   if IO.method_defined?(:trysendfile)
186     def defer_file(status, headers, body, alive, io, st)
187       if r = sendfile_range(status, headers)
188         status, headers, range = r
189         write_headers(status, headers, alive)
190         range and defer_file_stream(range[0], range[1], io, body)
191       else
192         write_headers(status, headers, alive)
193         defer_file_stream(0, st.size, io, body)
194       end
195     end
197     def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
198       case n = @_io.trysendfile(sf, sf.offset, sf.count)
199       when Integer
200         sf.offset += n
201         return if 0 == (sf.count -= n)
202       when :wait_writable
203         return enable_write_watcher
204       else
205         return
206       end while true
207     end
208   else
209     def defer_file(status, headers, body, alive, io, st)
210       write_headers(status, headers, alive)
211       defer_file_stream(0, st.size, io, body)
212     end
214     def stream_file_chunk(body)
215       buf = body.to_io.read(0x4000) and write(buf)
216     end
217   end
219   def defer_file_stream(offset, count, io, body)
220     @deferred = Rainbows::StreamFile.new(offset, count, io, body)
221     enable_write_watcher
222   end