initial edge-triggered epoll model
[rainbows.git] / lib / rainbows / epoll / client.rb
bloba3ae6db5989378c85f45747e15c934f9e09f1431
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
5   attr_reader :wr_queue, :state, :epoll_active
7   include Rainbows::Epoll::State
8   include Rainbows::EvCore
9   APP = Rainbows.server.app
10   Server = Rainbows::Epoll::Server
11   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
12   INLT = SleepyPenguin::Epoll::IN
13   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
14   KATO = {}
15   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
16   KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout
18   def self.expire
19     if (ot = KEEPALIVE_TIMEOUT) >= 0
20       ot = Time.now - ot
21       KATO.delete_if { |client, time| time < ot and client.timeout! }
22     end
23   end
25   # only call this once
26   def epoll_once
27     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
28     @epoll_active = false
29     post_init
30     epoll_run
31     rescue => e
32       handle_error(e)
33   end
35   def on_readable
36     case rv = kgio_tryread(16384, RBUF)
37     when String
38       on_read(rv)
39       return if @wr_queue[0] || closed?
40     when :wait_readable
41       KATO[self] = Time.now if :headers == @state
42       return epoll_enable(IN)
43     else
44       break
45     end until :close == @state
46     close unless closed?
47     rescue IOError
48   end
50   def app_call # called by on_read()
51     @env[RACK_INPUT] = @input
52     @env[REMOTE_ADDR] = kgio_addr
53     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
54     ev_write_response(status, headers, body, @hp.next?)
55   end
57   def write_response_path(status, headers, body, alive)
58     io = body_to_io(body)
59     st = io.stat
61     if st.file?
62       defer_file(status, headers, body, alive, io, st)
63     elsif st.socket? || st.pipe?
64       chunk = stream_response_headers(status, headers, alive)
65       stream_response_body(body, io, chunk)
66     else
67       # char or block device... WTF?
68       write_response(status, headers, body, alive)
69     end
70   end
72   # used for streaming sockets and pipes
73   def stream_response_body(body, io, chunk)
74     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
75                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
76     return @wr_queue << pipe if @wr_queue[0]
77     stream_pipe(pipe) or return
78     @wr_queue[0] or @wr_queue << ""
79   end
81   def ev_write_response(status, headers, body, alive)
82     if body.respond_to?(:to_path)
83       write_response_path(status, headers, body, alive)
84     else
85       write_response(status, headers, body, alive)
86     end
87     @state = alive ? :headers : :close
88     on_read("") if alive && 0 == @wr_queue.size && 0 != @buf.size
89   end
91   def epoll_run
92     if @wr_queue[0]
93       on_writable
94     else
95       KATO.delete self
96       on_readable
97     end
98   end
100   def want_more
101     Server::ReRun << self
102   end
104   def on_deferred_write_complete
105     :close == @state and return close
106     0 == @buf.size ? on_readable : on_read("")
107   end
109   def handle_error(e)
110     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
111     ensure
112       close
113   end
115   def write_deferred(obj)
116     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
117   end
119   # writes until our write buffer is empty or we block
120   # returns true if we're done writing everything
121   def on_writable
122     obj = @wr_queue.shift
124     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
125     when nil
126       obj = @wr_queue.shift or return on_deferred_write_complete
127     when String
128       obj = rv # retry
129     when :wait_writable # Strings and StreamFiles only
130       @wr_queue.unshift(obj)
131       epoll_enable(OUT)
132       return
133     when :deferred
134       return
135     end while true
136     rescue => e
137       handle_error(e)
138   end
140   # this returns an +Array+ write buffer if blocked
141   def write(buf)
142     unless @wr_queue[0]
143       case rv = kgio_trywrite(buf)
144       when nil
145         return # all written
146       when String
147         buf = rv # retry
148       when :wait_writable
149         epoll_enable(OUT)
150         break # queue
151       end while true
152     end
153     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
154   end
156   def close
157     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
158     super
159     KATO.delete(self)
160     Server.decr
161   end
163   def timeout!
164     close
165     true
166   end
168   def defer_file(status, headers, body, alive, io, st)
169     if r = sendfile_range(status, headers)
170       status, headers, range = r
171       write_headers(status, headers, alive)
172       range and defer_file_stream(range[0], range[1], io, body)
173     else
174       write_headers(status, headers, alive)
175       defer_file_stream(0, st.size, io, body)
176     end
177   end
179   # returns +nil+ on EOF, :wait_writable if the client blocks
180   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
181     begin
182       sf.offset += (n = sendfile_nonblock(sf, sf.offset, sf.count))
183       0 == (sf.count -= n) and return sf.close
184     rescue Errno::EAGAIN
185       return :wait_writable
186     rescue
187       sf.close
188       raise
189     end while true
190   end
192   def defer_file_stream(offset, count, io, body)
193     sf = Rainbows::StreamFile.new(offset, count, io, body)
194     unless @wr_queue[0]
195       stream_file(sf) or return
196     end
197     @wr_queue << sf
198     epoll_enable(OUT)
199   end
201   # this alternates between a push and pull model from the pipe -> client
202   # to avoid having too much data in userspace on either end.
203   def stream_pipe(pipe)
204     case buf = pipe.tryread
205     when String
206       if Array === write(buf)
207         # client is blocked on write, client will pull from pipe later
208         pipe.epoll_disable
209         @wr_queue << pipe
210         epoll_enable(OUT)
211         return :deferred
212       end
213       # continue looping...
214     when :wait_readable
215       # pipe blocked on read, let the pipe push to the client in the future
216       epoll_disable
217       pipe.epoll_enable(IN)
218       return :deferred
219     else # nil => EOF
220       return pipe.close # nil
221     end while true
222     rescue => e
223       pipe.close
224       raise
225   end