1 # -*- encoding: binary -*-
# Client-side module for Rainbows' Epoll support. Only part of the module
# body is visible in this excerpt; its closing `end` is outside the view.
4 module Rainbows::Epoll::Client
# Read-only accessors for @wr_queue, @state and @epoll_active.
5 attr_reader :wr_queue, :state, :epoll_active
7 include Rainbows::Epoll::State
8 include Rainbows::EvCore
# Application object taken from Rainbows.server when this constant is assigned.
9 APP = Rainbows.server.app
# Short local alias for the Epoll server constant.
10 Server = Rainbows::Epoll::Server
# Event masks: IN and OUT carry the ET flag, INLT is IN without ET
# (presumably "level-triggered" -- confirm against sleepy_penguin docs).
11 IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
12 INLT = SleepyPenguin::Epoll::IN
13 OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
# KATO is defined outside this excerpt; it is used below as a client => time
# map. Switch it to identity-based key comparison when the method exists.
15 KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
# Keepalive timeout captured from the Rainbows configuration at assignment time.
16 KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout
# Keepalive expiry fragment. The enclosing definition and the line between
# these two statements are not visible in this excerpt.
# Expiry is only attempted when KEEPALIVE_TIMEOUT is non-negative.
19 if (ot = KEEPALIVE_TIMEOUT) >= 0
# NOTE(review): `ot` is compared against stored times here, so it is
# presumably rebound to a cutoff time on the elided line -- confirm.
# An entry is removed only when its time is older than `ot` AND
# client.timeout! returns a truthy value; timeout! is not called otherwise.
21 KATO.delete_if { |client, time| time < ot and client.timeout! }
# Start with an empty write queue (enclosing definition not visible here).
27 @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
# Read-loop fragment: the enclosing `def` and the `when` arms of this case
# are not visible in this excerpt, so the branch conditions below are unknown.
# Attempt a read via kgio_tryread, passing 16384 and RBUF as arguments.
36 case rv = kgio_tryread(16384, RBUF)
# Stop reading if something is queued for writing or the client is closed.
39 return if @wr_queue[0] || closed?
# While in the :headers state, record the current time for this client in
# KATO (the map scanned by the keepalive expiry code).
41 KATO[self] = Time.now if :headers == @state
# Enable IN events for this client and return that call's result.
42 return epoll_enable(IN)
# The case expression is evaluated repeatedly for as long as @state is not :close.
45 end until :close == @state
# Invokes the Rack application for the current request and writes the response.
# Stores @input and the peer address (kgio_addr) into @env, merges
# RACK_DEFAULTS into @env in place, calls APP, and forwards the resulting
# status/headers/body to ev_write_response. The result of @hp.next? is
# passed as the `alive` argument.
50 def app_call # called by on_read()
51 @env[RACK_INPUT] = @input
52 @env[REMOTE_ADDR] = kgio_addr
53 status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
54 ev_write_response(status, headers, body, @hp.next?)
# Writes a response for a body that responds to :to_path (see the caller,
# ev_write_response), picking a strategy from the stat-like object `st`.
# NOTE(review): the lines assigning `io` and `st` and the first branch
# condition are not visible in this excerpt.
57 def write_response_path(status, headers, body, alive)
# First branch (condition elided -- presumably a regular file; confirm):
# hand the IO to defer_file.
62 defer_file(status, headers, body, alive, io, st)
# Sockets and pipes: write streaming headers, then stream the body from io.
63 elsif st.socket? || st.pipe?
64 chunk = stream_response_headers(status, headers, alive)
65 stream_response_body(body, io, chunk)
# Anything else falls back to the generic write_response.
67 # char or block device... WTF?
68 write_response(status, headers, body, alive)
72 # used for streaming sockets and pipes
# Wraps +io+ in a pipe object and starts streaming it to the client.
# +chunk+ truthy selects ResponseChunkPipe, otherwise ResponsePipe.
73 def stream_response_body(body, io, chunk)
74 pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
75 Rainbows::Epoll::ResponsePipe).new(io, self, body)
# Writes are already pending: just enqueue the pipe behind them.
76 return @wr_queue << pipe if @wr_queue[0]
# Return early when stream_pipe yields a falsy value.
77 stream_pipe(pipe) or return
# If the queue is still empty, push an empty String onto it.
# NOTE(review): presumably a placeholder so the queue reads as non-empty
# while the pipe is still pending -- confirm.
78 @wr_queue[0] or @wr_queue << ""
# Writes a Rack response and updates the client state afterwards.
# Bodies responding to :to_path go through write_response_path; the
# write_response call below is the alternative branch (its `else` line is
# not visible in this excerpt).
81 def ev_write_response(status, headers, body, alive)
82 if body.respond_to?(:to_path)
83 write_response_path(status, headers, body, alive)
85 write_response(status, headers, body, alive)
# Keep the connection in :headers state when alive, otherwise mark it :close.
87 @state = alive ? :headers : :close
# When alive, nothing is queued for writing, and @buf is non-empty, call
# on_read with an empty string (presumably to process already-buffered
# request data -- confirm).
88 on_read("") if alive && 0 == @wr_queue.size && 0 != @buf.size
# Appends this client to Server::ReRun (enclosing definition not visible here).
101 Server::ReRun << self
# Called once all deferred writes are done (see the write-queue flush code).
# Closes the client when @state is :close; otherwise resumes reading.
104 def on_deferred_write_complete
105 :close == @state and return close
# With an empty @buf call on_readable; otherwise call on_read with an empty
# string (presumably to process what is already buffered -- confirm).
106 0 == @buf.size ? on_readable : on_read("")
# Error-handling fragment (enclosing definition not visible here).
# Attempts to write an error response only when Rainbows::Error.response(e)
# yields one; the trailing `rescue nil` makes the attempt best-effort.
110 msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
# Dispatches a non-String queue entry to the matching streaming routine:
# Rainbows::StreamFile objects go to stream_file, everything else to
# stream_pipe. Returns whatever the chosen routine returns.
115 def write_deferred(obj)
116 Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
119 # writes until our write buffer is empty or we block
120 # returns true if we're done writing everything
# NOTE(review): the `def` line and some `when` arms of this routine are not
# visible in this excerpt.
# Take the entry at the head of the write queue.
122 obj = @wr_queue.shift
# Strings are written directly with kgio_trywrite; any other entry is
# handled by write_deferred.
124 case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
# Advance to the next queued entry, or finish through
# on_deferred_write_complete when the queue is empty (arm condition elided).
126 obj = @wr_queue.shift or return on_deferred_write_complete
129 when :wait_writable # Strings and StreamFiles only
# Put the entry back at the head of the queue.
130 @wr_queue.unshift(obj)
140 # this returns an +Array+ write buffer if blocked
# NOTE(review): the `def` line and the `when` arms are not visible in this
# excerpt; the name `write` is inferred from the call in stream_pipe.
# Attempt to write buf via kgio_trywrite.
143 case rv = kgio_trywrite(buf)
# Queue a copy of buf on @wr_queue (branch condition elided).
153 @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
# Cleanup fragment (enclosing definition not visible here): close every
# queued entry that responds to :close; the inline `rescue nil` keeps the
# cleanup best-effort.
157 @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
# Writes response headers and then schedules the file behind +io+ for streaming.
# +st+ is the stat-like object for +io+; only st.size is read here.
168 def defer_file(status, headers, body, alive, io, st)
# sendfile_range may return a replacement [status, headers, range] triple.
169 if r = sendfile_range(status, headers)
170 status, headers, range = r
171 write_headers(status, headers, alive)
# Stream only when a range is present, passing its first two elements
# as the offset and count arguments.
172 range and defer_file_stream(range[0], range[1], io, body)
# Alternative branch (its `else` line is not visible in this excerpt):
# write the headers unchanged and stream from offset 0 with count st.size.
174 write_headers(status, headers, alive)
175 defer_file_stream(0, st.size, io, body)
179 # returns +nil+ on EOF, :wait_writable if the client blocks
# NOTE(review): surrounding control-flow lines of this method are not
# visible in this excerpt.
180 def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
# Advance sf.offset by n, the value returned by sendfile_nonblock.
182 sf.offset += (n = sendfile_nonblock(sf, sf.offset, sf.count))
# Decrease the remaining count by n; at zero, close sf and return that result.
183 0 == (sf.count -= n) and return sf.close
185 return :wait_writable
# Builds a Rainbows::StreamFile from offset/count/io/body and starts
# streaming it. The remainder of the method is not visible in this excerpt.
192 def defer_file_stream(offset, count, io, body)
193 sf = Rainbows::StreamFile.new(offset, count, io, body)
# Return early when stream_file yields a falsy value.
195 stream_file(sf) or return
201 # this alternates between a push and pull model from the pipe -> client
202 # to avoid having too much data in userspace on either end.
# NOTE(review): the `when` arms and several statements of this method are
# not visible in this excerpt.
203 def stream_pipe(pipe)
# Attempt to read from the pipe via pipe.tryread.
204 case buf = pipe.tryread
# An Array result from write means the client could not take all of buf.
206 if Array === write(buf)
207 # client is blocked on write, client will pull from pipe later
213 # continue looping...
215 # pipe blocked on read, let the pipe push to the client in the future
# Enable IN events on the pipe itself.
217 pipe.epoll_enable(IN)
# Close the pipe and return the result of close (branch condition elided).
220 return pipe.close # nil