epoll/xepoll: more consistent client implementations
[rainbows.git] / lib / rainbows / epoll / client.rb
blobe23d4e704defef852184f89f6d7bc0c228845bcf
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
10   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
11   KATO = {}
12   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
13   Rainbows.config!(self, :keepalive_timeout)
14   EP = Rainbows::EP
15   ReRun = []
16   @@last_expire = Time.now
18   def self.expire
19     return if ((now = Time.now) - @@last_expire) < 1.0
20     if (ot = KEEPALIVE_TIMEOUT) >= 0
21       ot = now - ot
22       KATO.delete_if { |client, time| time < ot and client.timeout! }
23     end
24     @@last_expire = now
25   end
27   def self.loop
28     begin
29       EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
30       while obj = ReRun.shift
31         obj.epoll_run
32       end
33       expire
34     rescue Errno::EINTR
35     rescue => e
36       Rainbows::Error.listen_loop(e)
37     end while Rainbows.tick || Server.nr > 0
38   end
40   # only call this once
41   def epoll_once
42     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
43     post_init
44     on_readable
45     rescue => e
46       handle_error(e)
47   end
49   def on_readable
50     case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
51     when String
52       on_read(rv)
53       return if @wr_queue[0] || closed?
54     when :wait_readable
55       KATO[self] = @@last_expire if :headers == @state
56       return EP.set(self, IN)
57     else
58       break
59     end until :close == @state
60     close unless closed?
61     rescue Errno::ECONNRESET
62       close
63     rescue IOError
64   end
66   def app_call input # called by on_read()
67     @env[RACK_INPUT] = input
68     @env[REMOTE_ADDR] = kgio_addr
69     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
70     ev_write_response(status, headers, body, @hp.next?)
71   end
73   def write_response_path(status, headers, body, alive)
74     io = body_to_io(body)
75     st = io.stat
77     if st.file?
78       defer_file(status, headers, body, alive, io, st)
79     elsif st.socket? || st.pipe?
80       chunk = stream_response_headers(status, headers, alive)
81       stream_response_body(body, io, chunk)
82     else
83       # char or block device... WTF?
84       write_response(status, headers, body, alive)
85     end
86   end
88   # used for streaming sockets and pipes
89   def stream_response_body(body, io, chunk)
90     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
91                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
92     return @wr_queue << pipe if @wr_queue[0]
93     stream_pipe(pipe) or return
94     @wr_queue[0] or @wr_queue << Z
95   end
97   def ev_write_response(status, headers, body, alive)
98     if body.respond_to?(:to_path)
99       write_response_path(status, headers, body, alive)
100     else
101       write_response(status, headers, body, alive)
102     end
103     @state = alive ? :headers : :close
104     on_read(Z) if alive && 0 == @wr_queue.size && 0 != @buf.size
105   end
107   def epoll_run
108     if @wr_queue[0]
109       on_writable
110     else
111       KATO.delete self
112       on_readable
113     end
114   end
116   def want_more
117     ReRun << self
118   end
120   def on_deferred_write_complete
121     :close == @state and return close
122     0 == @buf.size ? on_readable : on_read(Z)
123   end
125   def handle_error(e)
126     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
127     ensure
128       close
129   end
131   def write_deferred(obj)
132     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
133   end
135   # writes until our write buffer is empty or we block
136   # returns true if we're done writing everything
137   def on_writable
138     obj = @wr_queue.shift
140     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
141     when nil
142       obj = @wr_queue.shift or return on_deferred_write_complete
143     when String
144       obj = rv # retry
145     when :wait_writable # Strings and StreamFiles only
146       @wr_queue.unshift(obj)
147       EP.set(self, OUT)
148       return
149     when :deferred
150       return
151     end while true
152     rescue => e
153       handle_error(e)
154   end
156   def write(buf)
157     unless @wr_queue[0]
158       case rv = kgio_trywrite(buf)
159       when nil
160         return # all written
161       when String
162         buf = rv # retry
163       when :wait_writable
164         @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
165         return EP.set(self, OUT)
166       end while true
167     end
168     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
169   end
171   def close
172     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
173     super
174     on_close
175   end
177   def on_close
178     KATO.delete(self)
179     Server.decr
180   end
182   def timeout!
183     close
184     true
185   end
187   def defer_file(status, headers, body, alive, io, st)
188     if r = sendfile_range(status, headers)
189       status, headers, range = r
190       write_headers(status, headers, alive)
191       range and defer_file_stream(range[0], range[1], io, body)
192     else
193       write_headers(status, headers, alive)
194       defer_file_stream(0, st.size, io, body)
195     end
196   end
198   # returns +nil+ on EOF, :wait_writable if the client blocks
199   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
200     case n = trysendfile(sf, sf.offset, sf.count)
201     when Integer
202       sf.offset += n
203       0 == (sf.count -= n) and return sf.close
204     else
205       return n # :wait_writable or nil
206     end while true
207     rescue
208       sf.close
209       raise
210   end
212   def defer_file_stream(offset, count, io, body)
213     sf = Rainbows::StreamFile.new(offset, count, io, body)
214     unless @wr_queue[0]
215       stream_file(sf) or return
216     end
217     @wr_queue << sf
218     EP.set(self, OUT)
219   end
221   # this alternates between a push and pull model from the pipe -> client
222   # to avoid having too much data in userspace on either end.
223   def stream_pipe(pipe)
224     case buf = pipe.tryread
225     when String
226       write(buf)
227       if @wr_queue[0]
228         # client is blocked on write, client will pull from pipe later
229         EP.delete pipe
230         @wr_queue << pipe
231         EP.set(self, OUT)
232         return :deferred
233       end
234       # continue looping...
235     when :wait_readable
236       # pipe blocked on read, let the pipe push to the client in the future
237       EP.delete self
238       EP.set(pipe, IN)
239       return :deferred
240     else # nil => EOF
241       return pipe.close # nil
242     end while true
243     rescue
244       pipe.close
245       raise
246   end