quiet mismatched indentation warnings
[rainbows.git] / lib / rainbows / coolio / client.rb
blob12c1434182e87d3a6e650ea6369f27bdb90cb271
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   APP = Rainbows.server.app
6   CONN = Rainbows::Coolio::CONN
7   KATO = Rainbows::Coolio::KATO
8   LOOP = Coolio::Loop.default
10   def initialize(io)
11     CONN[self] = false
12     super(io)
13     post_init
14     @deferred = nil
15   end
17   def want_more
18     enable unless enabled?
19   end
21   def quit
22     super
23     close if nil == @deferred && @_write_buffer.empty?
24   end
26   # override the Coolio::IO#write method try to write directly to the
27   # kernel socket buffers to avoid an extra userspace copy if
28   # possible.
29   def write(buf)
30     if @_write_buffer.empty?
31       begin
32         case rv = @_io.kgio_trywrite(buf)
33         when nil
34           return enable_write_watcher
35         when :wait_writable
36           break # fall through to super(buf)
37         when String
38           buf = rv # retry, skb could grow or been drained
39         end
40       rescue => e
41         return handle_error(e)
42       end while true
43     end
44     super(buf)
45   end
47   def on_readable
48     buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
49     case buf
50     when :wait_readable
51     when nil # eof
52       close
53     else
54       on_read buf
55     end
56   rescue Errno::ECONNRESET
57     close
58   end
60   # allows enabling of write watcher even when read watcher is disabled
61   def evloop
62     LOOP
63   end
65   def next!
66     attached? or return
67     @deferred = nil
68     enable_write_watcher # trigger on_write_complete
69   end
71   def timeout?
72     if nil == @deferred && @_write_buffer.empty?
73       @_io.shutdown
74       true
75     else
76       false
77     end
78   end
80   # used for streaming sockets and pipes
81   def stream_response_body(body, io, chunk)
82     # we only want to attach to the Coolio::Loop belonging to the
83     # main thread in Ruby 1.9
84     (chunk ? Rainbows::Coolio::ResponseChunkPipe :
85              Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
86     @deferred = true
87   end
89   def hijacked
90     CONN.delete(self)
91     detach
92     nil
93   end
95   def write_response_path(status, headers, body, alive)
96     io = body_to_io(body)
97     st = io.stat
99     if st.file?
100       defer_file(status, headers, body, alive, io, st)
101     elsif st.socket? || st.pipe?
102       chunk = stream_response_headers(status, headers, alive, body)
103       return hijacked if nil == chunk
104       stream_response_body(body, io, chunk)
105     else
106       # char or block device... WTF?
107       write_response(status, headers, body, alive)
108     end
109   end
111   def ev_write_response(status, headers, body, alive)
112     if body.respond_to?(:to_path)
113       body = write_response_path(status, headers, body, alive)
114     else
115       body = write_response(status, headers, body, alive)
116     end
117     return hijacked unless body
118     return quit unless alive && :close != @state
119     @state = :headers
120   end
122   def app_call input
123     KATO.delete(self)
124     disable if enabled?
125     @env['rack.input'] = input
126     @env['REMOTE_ADDR'] = @_io.kgio_addr
127     @env['async.callback'] = method(:write_async_response)
128     @hp.hijack_setup(@_io)
129     status, headers, body = catch(:async) {
130       APP.call(@env.merge!(RACK_DEFAULTS))
131     }
132     return hijacked if @hp.hijacked?
134     (nil == status || -1 == status) ? @deferred = true :
135         ev_write_response(status, headers, body, @hp.next?)
136   end
138   def on_write_complete
139     case @deferred
140     when true then return # #next! will clear this bit
141     when nil # fall through
142     else
143       return if stream_file_chunk(@deferred)
144       close_deferred # EOF, fall through
145     end
147     case @state
148     when :close
149       close if @_write_buffer.empty?
150     when :headers
151       if @buf.empty?
152         buf = @_io.kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF) or return close
153         String === buf and return on_read(buf)
154         # buf == :wait_readable
155         unless enabled?
156           enable
157           KATO[self] = Rainbows.now
158         end
159       else
160         on_read(''.freeze)
161       end
162     end
163   rescue => e
164     handle_error(e)
165   end
167   def handle_error(e)
168     close_deferred
169     if msg = Rainbows::Error.response(e)
170       @_io.kgio_trywrite(msg) rescue nil
171     end
172     @_write_buffer.clear
173   ensure
174     quit
175   end
177   def close_deferred
178     if @deferred
179       begin
180         @deferred.close if @deferred.respond_to?(:close)
181       rescue => e
182         Unicorn.log_error(Rainbows.server.logger,
183                           "closing deferred=#{@deferred.inspect}", e)
184       end
185       @deferred = nil
186     end
187   end
189   def on_close
190     close_deferred
191     CONN.delete(self)
192     KATO.delete(self)
193   end
195   if IO.method_defined?(:trysendfile)
196     def defer_file(status, headers, body, alive, io, st)
197       if r = sendfile_range(status, headers)
198         status, headers, range = r
199         body = write_headers(status, headers, alive, body) or return hijacked
200         range and defer_file_stream(range[0], range[1], io, body)
201       else
202         write_headers(status, headers, alive, body) or return hijacked
203         defer_file_stream(0, st.size, io, body)
204       end
205       body
206     end
208     def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
209       case n = @_io.trysendfile(sf, sf.offset, sf.count)
210       when Integer
211         sf.offset += n
212         return if 0 == (sf.count -= n)
213       when :wait_writable
214         return enable_write_watcher
215       else
216         return
217       end while true
218     end
219   else
220     def defer_file(status, headers, body, alive, io, st)
221       write_headers(status, headers, alive, body) or return hijacked
222       defer_file_stream(0, st.size, io, body)
223       body
224     end
226     def stream_file_chunk(body)
227       buf = body.to_io.read(0x4000) and write(buf)
228     end
229   end
231   def defer_file_stream(offset, count, io, body)
232     @deferred = Rainbows::StreamFile.new(offset, count, io, body)
233     enable_write_watcher
234   end