epoll/*: remove user-space array as active queue
[rainbows.git] / lib / rainbows / epoll / client.rb
blob65fcb3e99f601b3e807acaedb7b828c78bdabbf4
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ONESHOT
10   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ONESHOT
11   EPINOUT = IN | OUT
12   KATO = {}
13   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
14   Rainbows.at_quit { KATO.each_key { |k| k.timeout! }.clear }
15   Rainbows.config!(self, :keepalive_timeout)
16   EP = Rainbows::EP
17   @@last_expire = Time.now
19   def self.expire
20     return if ((now = Time.now) - @@last_expire) < 1.0
21     if (ot = KEEPALIVE_TIMEOUT) >= 0
22       ot = now - ot
23       KATO.delete_if { |client, time| time < ot and client.timeout! }
24     end
25     @@last_expire = now
26   end
28   def self.loop
29     begin
30       EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
31       expire
32     rescue Errno::EINTR
33     rescue => e
34       Rainbows::Error.listen_loop(e)
35     end while Rainbows.tick || Server.nr > 0
36   end
38   # only call this once
39   def epoll_once
40     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
41     post_init
42     on_readable
43     rescue => e
44       handle_error(e)
45   end
47   def on_readable
48     case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
49     when String
50       on_read(rv)
51       return if @wr_queue[0] || closed?
52       return hijacked if @hp.hijacked?
53     when :wait_readable
54       KATO[self] = @@last_expire if :headers == @state
55       return EP.set(self, IN)
56     else
57       break
58     end until :close == @state
59     close unless closed?
60     rescue Errno::ECONNRESET
61       close
62     rescue IOError
63   end
65   def app_call input # called by on_read()
66     @env[RACK_INPUT] = input
67     @env[REMOTE_ADDR] = kgio_addr
68     @hp.hijack_setup(@env, self)
69     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
70     return hijacked if @hp.hijacked?
71     ev_write_response(status, headers, body, @hp.next?)
72   end
74   def write_response_path(status, headers, body, alive)
75     io = body_to_io(body)
76     st = io.stat
78     if st.file?
79       defer_file(status, headers, body, alive, io, st)
80     elsif st.socket? || st.pipe?
81       chunk = stream_response_headers(status, headers, alive, body)
82       return hijacked if nil == chunk
83       stream_response_body(body, io, chunk)
84     else
85       # char or block device... WTF?
86       write_response(status, headers, body, alive)
87     end
88   end
90   # used for streaming sockets and pipes
91   def stream_response_body(body, io, chunk)
92     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
93                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
94     return @wr_queue << pipe if @wr_queue[0]
95     stream_pipe(pipe) or return
96     @wr_queue[0] or @wr_queue << Z
97   end
99   def ev_write_response(status, headers, body, alive)
100     @state = alive ? :headers : :close
101     if body.respond_to?(:to_path)
102       write_response_path(status, headers, body, alive)
103     else
104       write_response(status, headers, body, alive)
105     end
106     return hijacked if @hp.hijacked?
107     # try to read more if we didn't have to buffer writes
108     next_request if alive && 0 == @wr_queue.size
109   end
111   def hijacked
112     KATO.delete(self)
113     Server.decr # no other place to do this
114     EP.delete(self)
115     nil
116   end
118   def next_request
119     if 0 == @buf.size
120       want_more
121     else
122       # pipelined request (already in buffer)
123       on_read(Z)
124       return if @wr_queue[0] || closed?
125       return hijacked if @hp.hijacked?
126       close if :close == @state
127     end
128   end
130   def epoll_run
131     if @wr_queue[0]
132       on_writable
133     else
134       KATO.delete self
135       on_readable
136     end
137   end
139   def want_more
140     EP.set(self, EPINOUT)
141   end
143   def on_deferred_write_complete
144     :close == @state and return close
145     next_request
146   end
148   def handle_error(e)
149     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
150     ensure
151       close
152   end
154   def write_deferred(obj)
155     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
156   end
158   # writes until our write buffer is empty or we block
159   # returns true if we're done writing everything
160   def on_writable
161     obj = @wr_queue.shift
163     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
164     when nil
165       obj = @wr_queue.shift or return on_deferred_write_complete
166     when String
167       obj = rv # retry
168     when :wait_writable # Strings and StreamFiles only
169       @wr_queue.unshift(obj)
170       EP.set(self, OUT)
171       return
172     when :deferred
173       return
174     end while true
175     rescue => e
176       handle_error(e)
177   end
179   def write(buf)
180     unless @wr_queue[0]
181       case rv = kgio_trywrite(buf)
182       when nil
183         return # all written
184       when String
185         buf = rv # retry
186       when :wait_writable
187         @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
188         return EP.set(self, OUT)
189       end while true
190     end
191     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
192   end
194   def close
195     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
196     super
197     on_close
198   end
200   def on_close
201     KATO.delete(self)
202     Server.decr
203   end
205   def timeout!
206     shutdown
207     true
208   end
210   # Rack apps should not hijack here, but they may...
211   def defer_file(status, headers, body, alive, io, st)
212     if r = sendfile_range(status, headers)
213       status, headers, range = r
214       write_headers(status, headers, alive, body) or return hijacked
215       range and defer_file_stream(range[0], range[1], io, body)
216     else
217       write_headers(status, headers, alive, body) or return hijacked
218       defer_file_stream(0, st.size, io, body)
219     end
220   end
222   # returns +nil+ on EOF, :wait_writable if the client blocks
223   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
224     case n = trysendfile(sf, sf.offset, sf.count)
225     when Integer
226       sf.offset += n
227       0 == (sf.count -= n) and return sf.close
228     else
229       return n # :wait_writable or nil
230     end while true
231     rescue
232       sf.close
233       raise
234   end
236   def defer_file_stream(offset, count, io, body)
237     sf = Rainbows::StreamFile.new(offset, count, io, body)
238     unless @wr_queue[0]
239       stream_file(sf) or return
240     end
241     @wr_queue << sf
242     EP.set(self, OUT)
243   end
245   # this alternates between a push and pull model from the pipe -> client
246   # to avoid having too much data in userspace on either end.
247   def stream_pipe(pipe)
248     case buf = pipe.tryread
249     when String
250       write(buf)
251       if @wr_queue[0]
252         # client is blocked on write, client will pull from pipe later
253         EP.delete pipe
254         @wr_queue << pipe
255         EP.set(self, OUT)
256         return :deferred
257       end
258       # continue looping...
259     when :wait_readable
260       # pipe blocked on read, let the pipe push to the client in the future
261       EP.delete self
262       EP.set(pipe, IN)
263       return :deferred
264     else # nil => EOF
265       return pipe.close # nil
266     end while true
267     rescue
268       pipe.close
269       raise
270   end