From 9424b13255a238dfa44952ebeb07bea3acee999c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 19 Jan 2011 15:06:10 -0800 Subject: [PATCH] initial edge-triggered epoll model Coolio and EventMachine only use level-triggered epoll, but being Rainbows!, we live on the EDGE! --- lib/rainbows.rb | 1 + lib/rainbows/epoll.rb | 22 +++ lib/rainbows/epoll/client.rb | 226 ++++++++++++++++++++++++++++++ lib/rainbows/epoll/response_chunk_pipe.rb | 18 +++ lib/rainbows/epoll/response_pipe.rb | 38 +++++ lib/rainbows/epoll/server.rb | 43 ++++++ lib/rainbows/epoll/state.rb | 22 +++ lib/rainbows/http_server.rb | 2 +- t/GNUmakefile | 1 + t/kgio-pipe-response.ru | 10 +- t/simple-http_Epoll.ru | 9 ++ t/t0034-pipelined-pipe-response.sh | 3 +- t/t0035-kgio-pipe-response.sh | 2 +- t/t0113-rewindable-input-false.sh | 1 + t/t0114-rewindable-input-true.sh | 1 + t/test_isolate.rb | 2 + 16 files changed, 397 insertions(+), 4 deletions(-) create mode 100644 lib/rainbows/epoll.rb create mode 100644 lib/rainbows/epoll/client.rb create mode 100644 lib/rainbows/epoll/response_chunk_pipe.rb create mode 100644 lib/rainbows/epoll/response_pipe.rb create mode 100644 lib/rainbows/epoll/server.rb create mode 100644 lib/rainbows/epoll/state.rb create mode 100644 t/simple-http_Epoll.ru diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 76cb728..5de8a80 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -121,6 +121,7 @@ module Rainbows :CoolioThreadSpawn => 50, :CoolioThreadPool => 50, :CoolioFiberSpawn => 50, + :Epoll => 50, :EventMachine => 50, :FiberSpawn => 50, :FiberPool => 50, diff --git a/lib/rainbows/epoll.rb b/lib/rainbows/epoll.rb new file mode 100644 index 0000000..8698f78 --- /dev/null +++ b/lib/rainbows/epoll.rb @@ -0,0 +1,22 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'sleepy_penguin' +require 'sendfile' + +# Edge-triggered epoll concurrency model. This is extremely unfair +# and optimized for throughput at the expense of fairness +module Rainbows::Epoll + include Rainbows::Base + autoload :State, 'rainbows/epoll/state' + autoload :Server, 'rainbows/epoll/server' + autoload :Client, 'rainbows/epoll/client' + autoload :ResponsePipe, 'rainbows/epoll/response_pipe' + autoload :ResponseChunkPipe, 'rainbows/epoll/response_chunk_pipe' + + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + Rainbows::EvCore.setup + Rainbows::Client.__send__ :include, Client + Server.run + end +end diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb new file mode 100644 index 0000000..a3ae6db --- /dev/null +++ b/lib/rainbows/epoll/client.rb @@ -0,0 +1,226 @@ +# -*- encoding: binary -*- +# :enddoc: + +module Rainbows::Epoll::Client + attr_reader :wr_queue, :state, :epoll_active + + include Rainbows::Epoll::State + include Rainbows::EvCore + APP = Rainbows.server.app + Server = Rainbows::Epoll::Server + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + INLT = SleepyPenguin::Epoll::IN + OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET + KATO = {} + KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) + KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout + + def self.expire + if (ot = KEEPALIVE_TIMEOUT) >= 0 + ot = Time.now - ot + KATO.delete_if { |client, time| time < ot and client.timeout! } + end + end + + # only call this once + def epoll_once + @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects + @epoll_active = false + post_init + epoll_run + rescue => e + handle_error(e) + end + + def on_readable + case rv = kgio_tryread(16384, RBUF) + when String + on_read(rv) + return if @wr_queue[0] || closed? + when :wait_readable + KATO[self] = Time.now if :headers == @state + return epoll_enable(IN) + else + break + end until :close == @state + close unless closed? + rescue IOError + end + + def app_call # called by on_read() + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = kgio_addr + status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS)) + ev_write_response(status, headers, body, @hp.next?) + end + + def write_response_path(status, headers, body, alive) + io = body_to_io(body) + st = io.stat + + if st.file? + defer_file(status, headers, body, alive, io, st) + elsif st.socket? || st.pipe? + chunk = stream_response_headers(status, headers, alive) + stream_response_body(body, io, chunk) + else + # char or block device... WTF? + write_response(status, headers, body, alive) + end + end + + # used for streaming sockets and pipes + def stream_response_body(body, io, chunk) + pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe : + Rainbows::Epoll::ResponsePipe).new(io, self, body) + return @wr_queue << pipe if @wr_queue[0] + stream_pipe(pipe) or return + @wr_queue[0] or @wr_queue << "" + end + + def ev_write_response(status, headers, body, alive) + if body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + write_response(status, headers, body, alive) + end + @state = alive ? :headers : :close + on_read("") if alive && 0 == @wr_queue.size && 0 != @buf.size + end + + def epoll_run + if @wr_queue[0] + on_writable + else + KATO.delete self + on_readable + end + end + + def want_more + Server::ReRun << self + end + + def on_deferred_write_complete + :close == @state and return close + 0 == @buf.size ? on_readable : on_read("") + end + + def handle_error(e) + msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil + ensure + close + end + + def write_deferred(obj) + Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj) + end + + # writes until our write buffer is empty or we block + # returns true if we're done writing everything + def on_writable + obj = @wr_queue.shift + + case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj) + when nil + obj = @wr_queue.shift or return on_deferred_write_complete + when String + obj = rv # retry + when :wait_writable # Strings and StreamFiles only + @wr_queue.unshift(obj) + epoll_enable(OUT) + return + when :deferred + return + end while true + rescue => e + handle_error(e) + end + + # this returns an +Array+ write buffer if blocked + def write(buf) + unless @wr_queue[0] + case rv = kgio_trywrite(buf) + when nil + return # all written + when String + buf = rv # retry + when :wait_writable + epoll_enable(OUT) + break # queue + end while true + end + @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write + end + + def close + @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil } + super + KATO.delete(self) + Server.decr + end + + def timeout! + close + true + end + + def defer_file(status, headers, body, alive, io, st) + if r = sendfile_range(status, headers) + status, headers, range = r + write_headers(status, headers, alive) + range and defer_file_stream(range[0], range[1], io, body) + else + write_headers(status, headers, alive) + defer_file_stream(0, st.size, io, body) + end + end + + # returns +nil+ on EOF, :wait_writable if the client blocks + def stream_file(sf) # +sf+ is a Rainbows::StreamFile object + begin + sf.offset += (n = sendfile_nonblock(sf, sf.offset, sf.count)) + 0 == (sf.count -= n) and return sf.close + rescue Errno::EAGAIN + return :wait_writable + rescue + sf.close + raise + end while true + end + + def defer_file_stream(offset, count, io, body) + sf = Rainbows::StreamFile.new(offset, count, io, body) + unless @wr_queue[0] + stream_file(sf) or return + end + @wr_queue << sf + epoll_enable(OUT) + end + + # this alternates between a push and pull model from the pipe -> client + # to avoid having too much data in userspace on either end. + def stream_pipe(pipe) + case buf = pipe.tryread + when String + if Array === write(buf) + # client is blocked on write, client will pull from pipe later + pipe.epoll_disable + @wr_queue << pipe + epoll_enable(OUT) + return :deferred + end + # continue looping... + when :wait_readable + # pipe blocked on read, let the pipe push to the client in the future + epoll_disable + pipe.epoll_enable(IN) + return :deferred + else # nil => EOF + return pipe.close # nil + end while true + rescue => e + pipe.close + raise + end +end diff --git a/lib/rainbows/epoll/response_chunk_pipe.rb b/lib/rainbows/epoll/response_chunk_pipe.rb new file mode 100644 index 0000000..3ad57a8 --- /dev/null +++ b/lib/rainbows/epoll/response_chunk_pipe.rb @@ -0,0 +1,18 @@ +# -*- encoding: binary -*- +# :enddoc: +# +class Rainbows::Epoll::ResponseChunkPipe < Rainbows::Epoll::ResponsePipe + def tryread + @io or return + + case rv = super + when String + "#{rv.size.to_s(16)}\r\n#{rv}\r\n" + when nil + close + "0\r\n\r\n" + else + rv + end + end +end diff --git a/lib/rainbows/epoll/response_pipe.rb b/lib/rainbows/epoll/response_pipe.rb new file mode 100644 index 0000000..ce240f5 --- /dev/null +++ b/lib/rainbows/epoll/response_pipe.rb @@ -0,0 +1,38 @@ +# -*- encoding: binary -*- +# :enddoc: +# +class Rainbows::Epoll::ResponsePipe + include Rainbows::Epoll::State + attr_reader :io + alias to_io io + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + RBUF = Rainbows::EvCore::RBUF + + def initialize(io, client, body) + @io, @client, @body = io, client, body + @epoll_active = false + end + + def epoll_run + return close if @client.closed? + @client.stream_pipe(self) or @client.on_deferred_write_complete + rescue => e + close + @client.handle_error(e) + end + + def close + epoll_disable + @body.respond_to?(:close) and @body.close + @io = @body = nil + end + + def tryread + io = @io + io.respond_to?(:kgio_tryread) and return io.kgio_tryread(16384, RBUF) + io.read_nonblock(16384, RBUF) + rescue Errno::EAGAIN + :wait_readable + rescue EOFError + end +end diff --git a/lib/rainbows/epoll/server.rb b/lib/rainbows/epoll/server.rb new file mode 100644 index 0000000..4586c95 --- /dev/null +++ b/lib/rainbows/epoll/server.rb @@ -0,0 +1,43 @@ +# -*- encoding: binary -*- +# :nodoc: +module Rainbows::Epoll::Server + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + @@nr = 0 + MAX = Rainbows.server.worker_connections + THRESH = MAX - 1 + include Rainbows::Epoll::State + LISTENERS = Rainbows::HttpServer::LISTENERS + ReRun = [] + + def self.extended(obj) + obj.instance_variable_set(:@epoll_active, false) + end + + def self.run + LISTENERS.each { |sock| sock.extend(self).epoll_enable(IN) } + begin + EP.wait(100, 1000) { |_, obj| obj.epoll_run } + while obj = ReRun.shift + obj.epoll_run + end + Rainbows::Epoll::Client.expire + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || @@nr > 0 + end + + # rearms all listeners when there's a free slot + def self.decr + THRESH == (@@nr -= 1) and LISTENERS.each { |sock| sock.epoll_enable(IN) } + end + + def epoll_run + return epoll_disable if @@nr >= MAX + while io = kgio_tryaccept + @@nr += 1 + # there's a chance the client never even sees epoll for simple apps + io.epoll_once + return epoll_disable if @@nr >= MAX + end + end +end diff --git a/lib/rainbows/epoll/state.rb b/lib/rainbows/epoll/state.rb new file mode 100644 index 0000000..6e554be --- /dev/null +++ b/lib/rainbows/epoll/state.rb @@ -0,0 +1,22 @@ +# -*- encoding: binary -*- +# :enddoc: +# used to keep track of state for each descriptor and avoid +# unneeded syscall or ENONENT overhead +module Rainbows::Epoll::State + EP = SleepyPenguin::Epoll.new + + def epoll_disable + @epoll_active or return + @epoll_active = false + EP.del(self) + end + + def epoll_enable(flags) + if @epoll_active + flags == @epoll_active or + EP.mod(self, @epoll_active = flags) + else + EP.add(self, @epoll_active = flags) + end + end +end diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 84d5a32..a9767bf 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -72,7 +72,7 @@ class Rainbows::HttpServer < Unicorn::HttpServer new_defaults = { 'rainbows.model' => (@use = model.to_sym), 'rack.multithread' => !!(model.to_s =~ /Thread/), - 'rainbows.autochunk' => [:Coolio,:Rev, + 'rainbows.autochunk' => [:Coolio,:Rev,:Epoll, :EventMachine,:NeverBlock].include?(@use), } Rainbows::Const::RACK_DEFAULTS.update(new_defaults) diff --git a/t/GNUmakefile b/t/GNUmakefile index 91e05f5..7b50944 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -19,6 +19,7 @@ endif RUBY_ENGINE := $(shell $(RUBY) -e 'puts((RUBY_ENGINE rescue "ruby"))') export RUBY_VERSION RUBY_ENGINE +models += Epoll models += WriterThreadPool models += WriterThreadSpawn models += ThreadPool diff --git a/t/kgio-pipe-response.ru b/t/kgio-pipe-response.ru index edd2aac..9c70d47 100644 --- a/t/kgio-pipe-response.ru +++ b/t/kgio-pipe-response.ru @@ -1,10 +1,18 @@ # must be run without Rack::Lint since that clobbers to_path use Rainbows::DevFdResponse run(lambda { |env| + io = case env["rainbows.model"].to_s + when /Fiber/ + Rainbows::Fiber::IO::Pipe + else + Kgio::Pipe + end.popen('cat random_blob', 'rb') + [ 200, { 'Content-Length' => ::File.stat('random_blob').size.to_s, 'Content-Type' => 'application/octet-stream', }, - Rainbows::Fiber::IO::Pipe.popen('cat random_blob', 'rb') ] + io + ] }) diff --git a/t/simple-http_Epoll.ru b/t/simple-http_Epoll.ru new file mode 100644 index 0000000..6513343 --- /dev/null +++ b/t/simple-http_Epoll.ru @@ -0,0 +1,9 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| + if env['rack.multithread'] == false && env['rainbows.model'] == :Epoll + [ 200, {}, [ Thread.current.inspect << "\n" ] ] + else + raise env.inspect + end +} diff --git a/t/t0034-pipelined-pipe-response.sh b/t/t0034-pipelined-pipe-response.sh index 8346af9..6dff9ad 100755 --- a/t/t0034-pipelined-pipe-response.sh +++ b/t/t0034-pipelined-pipe-response.sh @@ -22,12 +22,13 @@ require "kcar" $stdin.binmode expect = ENV["random_blob_sha1"] kcar = Kcar::Response.new($stdin, {}) -3.times do +3.times do |i| nr = 0 status, headers, body = kcar.rack dig = Digest::SHA1.new body.each { |buf| dig << buf ; nr += buf.size } sha1 = dig.hexdigest + warn "[#{i}] nr: #{nr}" sha1 == expect or abort "mismatch: sha1=#{sha1} != expect=#{expect}" body.close end diff --git a/t/t0035-kgio-pipe-response.sh b/t/t0035-kgio-pipe-response.sh index 97c3f2a..c4b1096 100755 --- a/t/t0035-kgio-pipe-response.sh +++ b/t/t0035-kgio-pipe-response.sh @@ -2,7 +2,7 @@ . ./test-lib.sh test -r random_blob || die "random_blob required, run with 'make $0'" case $model in -*Fiber* ) ;; +*Fiber*|Epoll) ;; *) t_info "skipping $T since it's not compatible with $model" exit 0 diff --git a/t/t0113-rewindable-input-false.sh b/t/t0113-rewindable-input-false.sh index 1ab79bf..82b0fb7 100755 --- a/t/t0113-rewindable-input-false.sh +++ b/t/t0113-rewindable-input-false.sh @@ -3,6 +3,7 @@ skip_models EventMachine NeverBlock skip_models Rev RevThreadSpawn RevThreadPool skip_models Coolio CoolioThreadSpawn CoolioThreadPool +skip_models Epoll t_plan 4 "rewindable_input toggled to false" diff --git a/t/t0114-rewindable-input-true.sh b/t/t0114-rewindable-input-true.sh index 7e337ea..fd8561c 100755 --- a/t/t0114-rewindable-input-true.sh +++ b/t/t0114-rewindable-input-true.sh @@ -3,6 +3,7 @@ skip_models EventMachine NeverBlock skip_models Rev RevThreadSpawn RevThreadPool skip_models Coolio CoolioThreadSpawn CoolioThreadPool +skip_models Epoll t_plan 4 "rewindable_input toggled to true" diff --git a/t/test_isolate.rb b/t/test_isolate.rb index 9b0c026..f0f16f1 100644 --- a/t/test_isolate.rb +++ b/t/test_isolate.rb @@ -33,6 +33,8 @@ Isolate.now!(opts) do gem 'revactor', '0.1.5' gem 'rack-fiber_pool', '0.9.1' end + + gem 'sleepy_penguin', '1.2.0' end $stdout.reopen(old_out) -- 2.11.4.GIT