epoll: reduce expiration calls and Time objects
[rainbows.git] / lib / rainbows / epoll / client.rb
blobf4e49dc51c4495603d9b14f65e6bb1a0e5cd0ed3
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
10   INLT = SleepyPenguin::Epoll::IN
11   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
12   KATO = {}
13   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
14   KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout
15   EP = Rainbows::Epoll::EP
16   @@last_expire = Time.now
18   def self.expire
19     return if ((now = Time.now) - @@last_expire) < 1.0
20     if (ot = KEEPALIVE_TIMEOUT) >= 0
21       ot = now - ot
22       KATO.delete_if { |client, time| time < ot and client.timeout! }
23     end
24     @@last_expire = now
25   end
27   # only call this once
28   def epoll_once
29     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
30     post_init
31     epoll_run
32     rescue => e
33       handle_error(e)
34   end
36   def on_readable
37     case rv = kgio_tryread(16384, RBUF)
38     when String
39       on_read(rv)
40       return if @wr_queue[0] || closed?
41     when :wait_readable
42       KATO[self] = @@last_expire if :headers == @state
43       return EP.set(self, IN)
44     else
45       break
46     end until :close == @state
47     close unless closed?
48     rescue Errno::ECONNRESET
49       close
50     rescue IOError
51   end
53   def app_call # called by on_read()
54     @env[RACK_INPUT] = @input
55     @env[REMOTE_ADDR] = kgio_addr
56     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
57     ev_write_response(status, headers, body, @hp.next?)
58   end
60   def write_response_path(status, headers, body, alive)
61     io = body_to_io(body)
62     st = io.stat
64     if st.file?
65       defer_file(status, headers, body, alive, io, st)
66     elsif st.socket? || st.pipe?
67       chunk = stream_response_headers(status, headers, alive)
68       stream_response_body(body, io, chunk)
69     else
70       # char or block device... WTF?
71       write_response(status, headers, body, alive)
72     end
73   end
75   # used for streaming sockets and pipes
76   def stream_response_body(body, io, chunk)
77     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
78                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
79     return @wr_queue << pipe if @wr_queue[0]
80     stream_pipe(pipe) or return
81     @wr_queue[0] or @wr_queue << Z
82   end
84   def ev_write_response(status, headers, body, alive)
85     if body.respond_to?(:to_path)
86       write_response_path(status, headers, body, alive)
87     else
88       write_response(status, headers, body, alive)
89     end
90     @state = alive ? :headers : :close
91     on_read(Z) if alive && 0 == @wr_queue.size && 0 != @buf.size
92   end
94   def epoll_run
95     if @wr_queue[0]
96       on_writable
97     else
98       KATO.delete self
99       on_readable
100     end
101   end
103   def want_more
104     Server::ReRun << self
105   end
107   def on_deferred_write_complete
108     :close == @state and return close
109     0 == @buf.size ? on_readable : on_read(Z)
110   end
112   def handle_error(e)
113     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
114     ensure
115       close
116   end
118   def write_deferred(obj)
119     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
120   end
122   # writes until our write buffer is empty or we block
123   # returns true if we're done writing everything
124   def on_writable
125     obj = @wr_queue.shift
127     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
128     when nil
129       obj = @wr_queue.shift or return on_deferred_write_complete
130     when String
131       obj = rv # retry
132     when :wait_writable # Strings and StreamFiles only
133       @wr_queue.unshift(obj)
134       EP.set(self, OUT)
135       return
136     when :deferred
137       return
138     end while true
139     rescue => e
140       handle_error(e)
141   end
143   # this returns an +Array+ write buffer if blocked
144   def write(buf)
145     unless @wr_queue[0]
146       case rv = kgio_trywrite(buf)
147       when nil
148         return # all written
149       when String
150         buf = rv # retry
151       when :wait_writable
152         EP.set(self, OUT)
153         break # queue
154       end while true
155     end
156     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
157   end
159   def close
160     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
161     super
162     KATO.delete(self)
163     Server.decr
164   end
166   def timeout!
167     close
168     true
169   end
171   def defer_file(status, headers, body, alive, io, st)
172     if r = sendfile_range(status, headers)
173       status, headers, range = r
174       write_headers(status, headers, alive)
175       range and defer_file_stream(range[0], range[1], io, body)
176     else
177       write_headers(status, headers, alive)
178       defer_file_stream(0, st.size, io, body)
179     end
180   end
182   # returns +nil+ on EOF, :wait_writable if the client blocks
183   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
184     begin
185       sf.offset += (n = sendfile_nonblock(sf, sf.offset, sf.count))
186       0 == (sf.count -= n) and return sf.close
187     rescue Errno::EAGAIN
188       return :wait_writable
189     rescue
190       sf.close
191       raise
192     end while true
193   end
195   def defer_file_stream(offset, count, io, body)
196     sf = Rainbows::StreamFile.new(offset, count, io, body)
197     unless @wr_queue[0]
198       stream_file(sf) or return
199     end
200     @wr_queue << sf
201     EP.set(self, OUT)
202   end
204   # this alternates between a push and pull model from the pipe -> client
205   # to avoid having too much data in userspace on either end.
206   def stream_pipe(pipe)
207     case buf = pipe.tryread
208     when String
209       if Array === write(buf)
210         # client is blocked on write, client will pull from pipe later
211         EP.delete pipe
212         @wr_queue << pipe
213         EP.set(self, OUT)
214         return :deferred
215       end
216       # continue looping...
217     when :wait_readable
218       # pipe blocked on read, let the pipe push to the client in the future
219       EP.delete self
220       EP.set(pipe, IN)
221       return :deferred
222     else # nil => EOF
223       return pipe.close # nil
224     end while true
225     rescue => e
226       pipe.close
227       raise
228   end