eliminate G constant and just use the Rainbows! module
[rainbows.git] / lib / rainbows / coolio / client.rb
blobb5430f64f2198580df1e239bbbfa0328827adc26
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::Coolio::Client < Coolio::IO
4   include Rainbows::EvCore
5   SF = Rainbows::StreamFile
6   CONN = Rainbows::Coolio::CONN
7   KATO = Rainbows::Coolio::KATO
8   ResponsePipe = Rainbows::Coolio::ResponsePipe
9   ResponseChunkPipe = Rainbows::Coolio::ResponseChunkPipe
11   def initialize(io)
12     CONN[self] = false
13     super(io)
14     post_init
15     @deferred = nil
16   end
18   def want_more
19     enable unless enabled?
20   end
22   def quit
23     super
24     close if @deferred.nil? && @_write_buffer.empty?
25   end
27   # override the Coolio::IO#write method try to write directly to the
28   # kernel socket buffers to avoid an extra userspace copy if
29   # possible.
30   def write(buf)
31     if @_write_buffer.empty?
32       begin
33         case rv = @_io.kgio_trywrite(buf)
34         when nil
35           return enable_write_watcher
36         when :wait_writable
37           break # fall through to super(buf)
38         when String
39           buf = rv # retry, skb could grow or been drained
40         end
41       rescue => e
42         return handle_error(e)
43       end while true
44     end
45     super(buf)
46   end
48   def on_readable
49     buf = @_io.kgio_tryread(16384)
50     case buf
51     when :wait_readable
52     when nil # eof
53       close
54     else
55       on_read buf
56     end
57   rescue Errno::ECONNRESET
58     close
59   end
61   # queued, optional response bodies, it should only be unpollable "fast"
62   # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
63   # are also part of this.  We'll also stick ResponsePipe bodies in
64   # here to prevent connections from being closed on us.
65   def defer_body(io)
66     @deferred = io
67     enable_write_watcher
68   end
70   # allows enabling of write watcher even when read watcher is disabled
71   def evloop
72     LOOP # this constant is set in when a worker starts
73   end
75   def next!
76     attached? or return
77     @deferred = nil
78     enable_write_watcher
79   end
81   def timeout?
82     @deferred.nil? && @_write_buffer.empty? and close.nil?
83   end
85   # used for streaming sockets and pipes
86   def stream_response_body(body, io, chunk)
87     # we only want to attach to the Coolio::Loop belonging to the
88     # main thread in Ruby 1.9
89     io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
90     defer_body(io.attach(LOOP))
91   end
93   def coolio_write_response(response, alive)
94     status, headers, body = response
96     if body.respond_to?(:to_path)
97       io = body_to_io(body)
98       st = io.stat
100       if st.file?
101         if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
102           status, headers, range = r
103           write_headers(status, headers, alive)
104           defer_body(SF.new(range[0], range[1], io, body)) if range
105         else
106           write_headers(status, headers, alive)
107           defer_body(SF.new(0, st.size, io, body))
108         end
109         return
110       elsif st.socket? || st.pipe?
111         chunk = stream_response_headers(status, headers, alive)
112         return stream_response_body(body, io, chunk)
113       end
114       # char or block device... WTF? fall through to body.each
115     end
116     write_response(status, headers, body, alive)
117   end
119   def app_call
120     KATO.delete(self)
121     @env[RACK_INPUT] = @input
122     @env[REMOTE_ADDR] = @_io.kgio_addr
123     response = APP.call(@env.merge!(RACK_DEFAULTS))
125     coolio_write_response(response, alive = @hp.next?)
126     return quit unless alive && :close != @state
127     @state = :headers
128     disable if enabled?
129   end
131   def on_write_complete
132     case @deferred
133     when ResponsePipe then return
134     when NilClass # fall through
135     else
136       begin
137         return rev_sendfile(@deferred)
138       rescue EOFError # expected at file EOF
139         close_deferred
140       end
141     end
143     case @state
144     when :close
145       close if @_write_buffer.empty?
146     when :headers
147       if @buf.empty?
148         unless enabled?
149           enable
150           KATO[self] = Time.now
151         end
152       else
153         on_read("")
154       end
155     end
156     rescue => e
157       handle_error(e)
158   end
160   def handle_error(e)
161     close_deferred
162     if msg = Rainbows::Error.response(e)
163       @_io.kgio_trywrite(msg) rescue nil
164     end
165     @_write_buffer.clear
166     ensure
167       quit
168   end
170   def close_deferred
171     case @deferred
172     when ResponsePipe, NilClass
173     else
174       begin
175         @deferred.close
176       rescue => e
177         Rainbows.server.logger.error("closing #@deferred: #{e}")
178       end
179       @deferred = nil
180     end
181   end
183   def on_close
184     close_deferred
185     CONN.delete(self)
186     KATO.delete(self)
187   end