switch from IO#sendfile_nonblock to IO#trysendfile
[rainbows.git] / lib / rainbows / epoll / client.rb
blobb7b0c9eaff128aa4231c1125930d021c289627ad
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
10   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
11   KATO = {}
12   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
13   KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout
14   EP = Rainbows::Epoll::EP
15   @@last_expire = Time.now
17   def self.expire
18     return if ((now = Time.now) - @@last_expire) < 1.0
19     if (ot = KEEPALIVE_TIMEOUT) >= 0
20       ot = now - ot
21       KATO.delete_if { |client, time| time < ot and client.timeout! }
22     end
23     @@last_expire = now
24   end
26   # only call this once
27   def epoll_once
28     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
29     post_init
30     on_readable
31     rescue => e
32       handle_error(e)
33   end
35   def on_readable
36     case rv = kgio_tryread(16384, RBUF)
37     when String
38       on_read(rv)
39       return if @wr_queue[0] || closed?
40     when :wait_readable
41       KATO[self] = @@last_expire if :headers == @state
42       return EP.set(self, IN)
43     else
44       break
45     end until :close == @state
46     close unless closed?
47     rescue Errno::ECONNRESET
48       close
49     rescue IOError
50   end
52   def app_call input # called by on_read()
53     @env[RACK_INPUT] = input
54     @env[REMOTE_ADDR] = kgio_addr
55     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
56     ev_write_response(status, headers, body, @hp.next?)
57   end
59   def write_response_path(status, headers, body, alive)
60     io = body_to_io(body)
61     st = io.stat
63     if st.file?
64       defer_file(status, headers, body, alive, io, st)
65     elsif st.socket? || st.pipe?
66       chunk = stream_response_headers(status, headers, alive)
67       stream_response_body(body, io, chunk)
68     else
69       # char or block device... WTF?
70       write_response(status, headers, body, alive)
71     end
72   end
74   # used for streaming sockets and pipes
75   def stream_response_body(body, io, chunk)
76     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
77                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
78     return @wr_queue << pipe if @wr_queue[0]
79     stream_pipe(pipe) or return
80     @wr_queue[0] or @wr_queue << Z
81   end
83   def ev_write_response(status, headers, body, alive)
84     if body.respond_to?(:to_path)
85       write_response_path(status, headers, body, alive)
86     else
87       write_response(status, headers, body, alive)
88     end
89     @state = alive ? :headers : :close
90     on_read(Z) if alive && 0 == @wr_queue.size && 0 != @buf.size
91   end
93   def epoll_run
94     if @wr_queue[0]
95       on_writable
96     else
97       KATO.delete self
98       on_readable
99     end
100   end
102   def want_more
103     Rainbows::Epoll::ReRun << self
104   end
106   def on_deferred_write_complete
107     :close == @state and return close
108     0 == @buf.size ? on_readable : on_read(Z)
109   end
111   def handle_error(e)
112     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
113     ensure
114       close
115   end
117   def write_deferred(obj)
118     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
119   end
121   # writes until our write buffer is empty or we block
122   # returns true if we're done writing everything
123   def on_writable
124     obj = @wr_queue.shift
126     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
127     when nil
128       obj = @wr_queue.shift or return on_deferred_write_complete
129     when String
130       obj = rv # retry
131     when :wait_writable # Strings and StreamFiles only
132       @wr_queue.unshift(obj)
133       EP.set(self, OUT)
134       return
135     when :deferred
136       return
137     end while true
138     rescue => e
139       handle_error(e)
140   end
142   def write(buf)
143     unless @wr_queue[0]
144       case rv = kgio_trywrite(buf)
145       when nil
146         return # all written
147       when String
148         buf = rv # retry
149       when :wait_writable
150         @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
151         return EP.set(self, OUT)
152       end while true
153     end
154     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
155   end
157   def close
158     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
159     super
160     on_close
161   end
163   def on_close
164     KATO.delete(self)
165     Server.decr
166   end
168   def timeout!
169     close
170     true
171   end
173   def defer_file(status, headers, body, alive, io, st)
174     if r = sendfile_range(status, headers)
175       status, headers, range = r
176       write_headers(status, headers, alive)
177       range and defer_file_stream(range[0], range[1], io, body)
178     else
179       write_headers(status, headers, alive)
180       defer_file_stream(0, st.size, io, body)
181     end
182   end
184   # returns +nil+ on EOF, :wait_writable if the client blocks
185   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
186     case n = trysendfile(sf, sf.offset, sf.count)
187     when Integer
188       sf.offset += n
189       0 == (sf.count -= n) and return sf.close
190     else
191       return n # :wait_writable or nil
192     end while true
193     rescue
194       sf.close
195       raise
196   end
198   def defer_file_stream(offset, count, io, body)
199     sf = Rainbows::StreamFile.new(offset, count, io, body)
200     unless @wr_queue[0]
201       stream_file(sf) or return
202     end
203     @wr_queue << sf
204     EP.set(self, OUT)
205   end
207   # this alternates between a push and pull model from the pipe -> client
208   # to avoid having too much data in userspace on either end.
209   def stream_pipe(pipe)
210     case buf = pipe.tryread
211     when String
212       write(buf)
213       if @wr_queue[0]
214         # client is blocked on write, client will pull from pipe later
215         EP.delete pipe
216         @wr_queue << pipe
217         EP.set(self, OUT)
218         return :deferred
219       end
220       # continue looping...
221     when :wait_readable
222       # pipe blocked on read, let the pipe push to the client in the future
223       EP.delete self
224       EP.set(pipe, IN)
225       return :deferred
226     else # nil => EOF
227       return pipe.close # nil
228     end while true
229     rescue => e
230       pipe.close
231       raise
232   end