d72696bfe6f18d634e49862f792d89d9fc64cd3b
[rainbows.git] / lib / rainbows / epoll / client.rb
blobd72696bfe6f18d634e49862f792d89d9fc64cd3b
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
10   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
11   KATO = {}
12   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
13   Rainbows.at_quit { KATO.each_key { |k| k.timeout! }.clear }
14   Rainbows.config!(self, :keepalive_timeout)
15   EP = Rainbows::EP
16   ReRun = []
17   @@last_expire = Time.now
19   def self.expire
20     return if ((now = Time.now) - @@last_expire) < 1.0
21     if (ot = KEEPALIVE_TIMEOUT) >= 0
22       ot = now - ot
23       KATO.delete_if { |client, time| time < ot and client.timeout! }
24     end
25     @@last_expire = now
26   end
28   def self.loop
29     begin
30       EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
31       while obj = ReRun.shift
32         obj.epoll_run
33       end
34       expire
35     rescue Errno::EINTR
36     rescue => e
37       Rainbows::Error.listen_loop(e)
38     end while Rainbows.tick || Server.nr > 0
39   end
41   # only call this once
42   def epoll_once
43     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
44     post_init
45     on_readable
46     rescue => e
47       handle_error(e)
48   end
50   def on_readable
51     case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
52     when String
53       on_read(rv)
54       return if @wr_queue[0] || closed?
55     when :wait_readable
56       KATO[self] = @@last_expire if :headers == @state
57       return EP.set(self, IN)
58     else
59       break
60     end until :close == @state
61     close unless closed?
62     rescue Errno::ECONNRESET
63       close
64     rescue IOError
65   end
67   def app_call input # called by on_read()
68     @env[RACK_INPUT] = input
69     @env[REMOTE_ADDR] = kgio_addr
70     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
71     ev_write_response(status, headers, body, @hp.next?)
72   end
74   def write_response_path(status, headers, body, alive)
75     io = body_to_io(body)
76     st = io.stat
78     if st.file?
79       defer_file(status, headers, body, alive, io, st)
80     elsif st.socket? || st.pipe?
81       chunk = stream_response_headers(status, headers, alive)
82       stream_response_body(body, io, chunk)
83     else
84       # char or block device... WTF?
85       write_response(status, headers, body, alive)
86     end
87   end
89   # used for streaming sockets and pipes
90   def stream_response_body(body, io, chunk)
91     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
92                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
93     return @wr_queue << pipe if @wr_queue[0]
94     stream_pipe(pipe) or return
95     @wr_queue[0] or @wr_queue << Z
96   end
98   def ev_write_response(status, headers, body, alive)
99     @state = alive ? :headers : :close
100     if body.respond_to?(:to_path)
101       write_response_path(status, headers, body, alive)
102     else
103       write_response(status, headers, body, alive)
104     end
105     # try to read more if we didn't have to buffer writes
106     next_request if alive && 0 == @wr_queue.size
107   end
109   def next_request
110     if 0 == @buf.size
111       want_more
112     else
113       # pipelined request (already in buffer)
114       on_read(Z)
115       return if @wr_queue[0] || closed?
116       close if :close == @state
117     end
118   end
120   def epoll_run
121     if @wr_queue[0]
122       on_writable
123     else
124       KATO.delete self
125       on_readable
126     end
127   end
129   def want_more
130     ReRun << self
131   end
133   def on_deferred_write_complete
134     :close == @state and return close
135     next_request
136   end
138   def handle_error(e)
139     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
140     ensure
141       close
142   end
144   def write_deferred(obj)
145     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
146   end
148   # writes until our write buffer is empty or we block
149   # returns true if we're done writing everything
150   def on_writable
151     obj = @wr_queue.shift
153     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
154     when nil
155       obj = @wr_queue.shift or return on_deferred_write_complete
156     when String
157       obj = rv # retry
158     when :wait_writable # Strings and StreamFiles only
159       @wr_queue.unshift(obj)
160       EP.set(self, OUT)
161       return
162     when :deferred
163       return
164     end while true
165     rescue => e
166       handle_error(e)
167   end
169   def write(buf)
170     unless @wr_queue[0]
171       case rv = kgio_trywrite(buf)
172       when nil
173         return # all written
174       when String
175         buf = rv # retry
176       when :wait_writable
177         @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
178         return EP.set(self, OUT)
179       end while true
180     end
181     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
182   end
184   def close
185     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
186     super
187     on_close
188   end
190   def on_close
191     KATO.delete(self)
192     Server.decr
193   end
195   def timeout!
196     shutdown
197     true
198   end
200   def defer_file(status, headers, body, alive, io, st)
201     if r = sendfile_range(status, headers)
202       status, headers, range = r
203       write_headers(status, headers, alive)
204       range and defer_file_stream(range[0], range[1], io, body)
205     else
206       write_headers(status, headers, alive)
207       defer_file_stream(0, st.size, io, body)
208     end
209   end
211   # returns +nil+ on EOF, :wait_writable if the client blocks
212   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
213     case n = trysendfile(sf, sf.offset, sf.count)
214     when Integer
215       sf.offset += n
216       0 == (sf.count -= n) and return sf.close
217     else
218       return n # :wait_writable or nil
219     end while true
220     rescue
221       sf.close
222       raise
223   end
225   def defer_file_stream(offset, count, io, body)
226     sf = Rainbows::StreamFile.new(offset, count, io, body)
227     unless @wr_queue[0]
228       stream_file(sf) or return
229     end
230     @wr_queue << sf
231     EP.set(self, OUT)
232   end
234   # this alternates between a push and pull model from the pipe -> client
235   # to avoid having too much data in userspace on either end.
236   def stream_pipe(pipe)
237     case buf = pipe.tryread
238     when String
239       write(buf)
240       if @wr_queue[0]
241         # client is blocked on write, client will pull from pipe later
242         EP.delete pipe
243         @wr_queue << pipe
244         EP.set(self, OUT)
245         return :deferred
246       end
247       # continue looping...
248     when :wait_readable
249       # pipe blocked on read, let the pipe push to the client in the future
250       EP.delete self
251       EP.set(pipe, IN)
252       return :deferred
253     else # nil => EOF
254       return pipe.close # nil
255     end while true
256     rescue
257       pipe.close
258       raise
259   end