From 26e7c927f672e4a589e50387ec846c4d89a6b81e Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 14 Mar 2008 20:19:38 +0100 Subject: [PATCH] Use rb_thread_select instead of rb_thread_schedule This cleans up the event loop processing greatly. I use rb_thread_select to watch for changes on every file descriptor before calling ev_loop(loop, EVLOOP_NONBLOCK); There is a 0.5 second timeout, to return to ruby for checking @running and signals. I still need to run benchmarks on this to make sure that it is still fast :) --- Rakefile | 8 +++- ruby_lib/ebb.rb | 131 +++++++++++++++++++++++++++++++------------------- src/ebb_ruby.c | 60 ++++++++++------------- test/basic_test.rb | 138 +++++++++++------------------------------------------ test/helper.rb | 63 ++++++++++++++++++++++++ 5 files changed, 205 insertions(+), 195 deletions(-) rewrite test/basic_test.rb (76%) create mode 100644 test/helper.rb diff --git a/Rakefile b/Rakefile index bfa61d4..fde9533 100644 --- a/Rakefile +++ b/Rakefile @@ -4,7 +4,7 @@ require 'rake/gempackagetask' require 'rake/clean' COMMON_DISTFILES = FileList.new('src/ebb.{c,h}', 'src/parser.{c,h}', - 'libev/*', 'VERSION', 'README') + 'libev/*', 'README') RUBY_DISTFILES = COMMON_DISTFILES + FileList.new('src/ebb_ruby.c', 'src/extconf.rb', 'ruby_lib/**/*', 'benchmark/*.rb', 'bin/ebb_rails', @@ -18,6 +18,10 @@ CLEAN.add ["**/*.{o,bundle,so,obj,pdb,lib,def,exp}", "benchmark/*.dump", CLOBBER.add ['src/Makefile', 'src/parser.c', 'src/mkmf.log', 'build'] +Rake::TestTask.new do |t| + t.test_files = FileList.new("test/*.rb") + t.verbose = true +end def dir(path) File.expand_path File.join(File.dirname(__FILE__), path) @@ -41,7 +45,7 @@ end task(:wc) { sh "wc -l ruby_lib/*.rb src/ebb*.{c,h}" } -task(:test => :compile) +task(:test => RUBY_DISTFILES) Rake::TestTask.new do |t| t.test_files = 'test/basic_test.rb' t.verbose = true diff --git a/ruby_lib/ebb.rb b/ruby_lib/ebb.rb index d6ad6d4..36578fc 100644 --- a/ruby_lib/ebb.rb +++ b/ruby_lib/ebb.rb @@ -1,6 +1,7 @@ # Ruby Binding to the Ebb Web Server # Copyright (c) 2008 Ry Dahl. This software is released under the MIT License. # See README file for details. +require 'stringio' module Ebb LIBDIR = File.dirname(__FILE__) require Ebb::LIBDIR + '/../src/ebb_ext' @@ -17,26 +18,68 @@ module Ebb Client::BASE_ENV['rack.multithread'] = threaded_processing FFI::server_listen_on_port(port) - - puts "Ebb listening at http://0.0.0.0:#{port}/ (#{threaded_processing ? 'threaded' : 'sequential'} processing)" - trap('INT') { @running = false } @running = true + #trap('INT') { stop_server } + + puts "Ebb listening at http://0.0.0.0:#{port}/ (#{threaded_processing ? 'threaded' : 'sequential'} processing, PID #{Process.pid})" while @running FFI::server_process_connections() while client = FFI::waiting_clients.shift if threaded_processing - Thread.new(client) { |c| c.process(app) } + Thread.new(client) { |c| process(app, c) } else - client.process(app) + process(app, client) end end end - - puts "Ebb unlistening" FFI::server_unlisten() end + def self.running? + FFI::server_open? + end + + def self.stop_server() + @running = false + end + + def self.process(app, client) + begin + status, headers, body = app.call(client.env) + rescue + raise if $DEBUG + status = 500 + headers = {'Content-Type' => 'text/plain'} + body = "Internal Server Error\n" + end + + client.write_status(status) + + if headers.respond_to?(:[]=) and body.respond_to?(:length) and status != 304 + headers['Connection'] = 'close' + headers['Content-Length'] = body.length.to_s + end + + headers.each { |field, value| client.write_header(field, value) } + client.write("\r\n") + + if body.kind_of?(String) + client.write(body) + client.body_written() + client.begin_transmission() + else + client.begin_transmission() + client.body.each { |p| write(p) } + client.body_written() + end + rescue => e + puts "Ebb Error! #{e.class} #{e.message}" + puts e.backtrace.join("\n") + ensure + client.release + end + # This array is created and manipulated in the C extension. def FFI.waiting_clients @waiting_clients @@ -55,50 +98,17 @@ module Ebb 'rack.run_once' => false } - def process(app) - begin - status, headers, body = app.call(env) - rescue - raise if $DEBUG - status = 500 - headers = {'Content-Type' => 'text/plain'} - body = "Internal Server Error\n" - end - - status = status.to_i - FFI::client_write_status(self, status, HTTP_STATUS_CODES[status]) - - if headers.respond_to?(:[]=) and body.respond_to?(:length) and status != 304 - headers['Connection'] = 'close' - headers['Content-Length'] = body.length.to_s - end - - headers.each { |field, value| write_header(field, value) } - write("\r\n") - - if body.kind_of?(String) - write(body) - body_written() - begin_transmission() - else - begin_transmission() - body.each { |p| write(p) } - body_written() - end - rescue => e - puts "Error! #{e.class} #{e.message}" - ensure - FFI::client_release(self) - end - - private - def env env = FFI::client_env(self).update(BASE_ENV) env['rack.input'] = RequestBody.new(self) env end + def write_status(status) + s = status.to_i + FFI::client_write_status(self, s, HTTP_STATUS_CODES[s]) + end + def write(data) FFI::client_write(self, data) end @@ -116,6 +126,10 @@ module Ebb def begin_transmission FFI::client_begin_transmission(self) end + + def release + FFI::client_release(self) + end end class RequestBody @@ -123,19 +137,36 @@ module Ebb @client = client end - def read(len) - FFI::client_read_input(@client, len) + def read(len = nil) + if @io + @io.read(len) + else + if len.nil? + s = '' + while(chunk = read(10*1024)) do + s << chunk + end + s + else + FFI::client_read_input(@client, len) + end + end end def gets - raise NotImplementedError + io.gets + end + + def each(&block) + io.each(&block) end - def each - raise NotImplementedError + def io + @io ||= StringIO.new(read) end end + HTTP_STATUS_CODES = { 100 => 'Continue', 101 => 'Switching Protocols', diff --git a/src/ebb_ruby.c b/src/ebb_ruby.c index 4913cd8..551f2f8 100644 --- a/src/ebb_ruby.c +++ b/src/ebb_ruby.c @@ -28,6 +28,7 @@ static VALUE global_http_host; */ static ebb_server *server; struct ev_loop *loop; +static unsigned int client_count = 0; /* Variables with a leading underscore are C-level variables */ @@ -42,6 +43,7 @@ void request_cb(ebb_client *client, void *data) VALUE waiting_clients = (VALUE)data; VALUE rb_client = Data_Wrap_Struct(cClient, 0, 0, client); rb_ary_push(waiting_clients, rb_client); + client_count++; } VALUE server_listen_on_port(VALUE _, VALUE port) @@ -51,46 +53,35 @@ VALUE server_listen_on_port(VALUE _, VALUE port) return Qnil; } -static void -oneshot_timeout (struct ev_loop *loop, struct ev_timer *w, int revents) {;} +VALUE server_open(VALUE _) +{ + return server->open ? Qtrue : Qfalse; +} + VALUE server_process_connections(VALUE _) { - /* This function is super hacky. The libev loop is called for one iteration - * this means that any pending events are handled. If no events exist then - * the function blocks. We want blocking so that the while loop in ruby - * doesn't race away - however there is a need to continue to process other - * ruby threads which are running. While this function is being called - * other ruby threads cannot execute. - * So we set this timeout event which breaks the block after 0.1 seconds. - * Additionally we make sure that other threads get enough processing time - * by calling rb_thread_schedule() many times. - * - * Instead we should probably use rb_thread_select on server->fd when no - * clients are in_use? Whatever happens here, one should make sure the - * 'wait' benchmark is running as quickly with Ebb as it does with mongrel. - */ - ev_timer timeout; - ev_timer_init (&timeout, oneshot_timeout, 0.1, 0.); - ev_timer_start (loop, &timeout); - ev_loop(loop, EVLOOP_ONESHOT); + int fd_count = 0, max_fd = 0; + struct timeval tv = { tv_sec: 0, tv_usec: 500000 }; + int i; - /* remove the timeout event so that it isn't called immediately the next - * time around (since 0.1 seconds will have passed) - */ - ev_timer_stop(loop, &timeout); + fd_set fds; FD_ZERO(&fds); - /* Call rb_thread_schedule() proportional to the number of rb threads running */ - /* SO HACKY! Anyone have a better way to do this? */ - int i; - for(i = 0; i < EBB_MAX_CLIENTS; i++) - if(server->clients[i].in_use) - rb_thread_schedule(); + FD_SET(server->fd, &fds); fd_count++; + for(i = 0; i < EBB_MAX_CLIENTS; i++) { + ebb_client *client = &server->clients[i]; + if(client->open) { + FD_SET(client->fd, &fds); fd_count++; + if(client->fd > max_fd) max_fd = client->fd; + } + } - if(server->open) - return Qtrue; - else - return Qfalse; + unsigned int last_client_count = client_count; + ev_loop(loop, EVLOOP_NONBLOCK); + if(last_client_count == client_count) { + rb_thread_select(max_fd+1, &fds, &fds, &fds, &tv); + ev_loop(loop, EVLOOP_NONBLOCK); + } } @@ -259,6 +250,7 @@ void Init_ebb_ext() rb_define_singleton_method(mFFI, "server_process_connections", server_process_connections, 0); rb_define_singleton_method(mFFI, "server_listen_on_port", server_listen_on_port, 1); rb_define_singleton_method(mFFI, "server_unlisten", server_unlisten, 0); + rb_define_singleton_method(mFFI, "server_open?", server_open, 0); cClient = rb_define_class_under(mEbb, "Client", rb_cObject); rb_define_singleton_method(mFFI, "client_read_input", client_read_input, 2); diff --git a/test/basic_test.rb b/test/basic_test.rb dissimilarity index 76% index cc7951e..cea9dc9 100644 --- a/test/basic_test.rb +++ b/test/basic_test.rb @@ -1,109 +1,29 @@ -require File.dirname(__FILE__) + '/../ruby_lib/ebb' -require 'test/unit' -require 'net/http' -require 'socket' -require 'rubygems' -require 'json' - -PORT = 4044 - -class EbbTest < Test::Unit::TestCase - def setup - @pid = fork do - STDOUT.reopen "/dev/null", "a" - server = Ebb::start_server(self, :port => PORT) - end - sleep 0.5 - end - - def teardown - Process.kill('KILL', @pid) - sleep 0.5 - end - - def get(path) - Net::HTTP.get_response(URI.parse("http://0.0.0.0:#{PORT}#{path}")) - end - - def post(path, data) - Net::HTTP.post_form(URI.parse("http://0.0.0.0:#{PORT}#{path}"), data) - end - - @@responses = {} - def call(env) - commands = env['PATH_INFO'].split('/') - - if commands.include?('bytes') - n = commands.last.to_i - raise "bytes called with n <= 0" if n <= 0 - body = @@responses[n] || "C"*n - status = 200 - - elsif commands.include?('test_post_length') - input_body = "" - while chunk = env['rack.input'].read(512) - input_body << chunk - end - - content_length_header = env['HTTP_CONTENT_LENGTH'].to_i - - if content_length_header == input_body.length - body = "Content-Length matches input length" - status = 200 - else - body = "Content-Length header is #{content_length_header} but body length is #{input_body.length}" - # content_length = #{env['HTTP_CONTENT_LENGTH'].to_i} - # input_body.length = #{input_body.length}" - status = 500 - end - - else - status = 404 - body = "Undefined url" - end - - [status, {'Content-Type' => 'text/plain'}, body] - end - - def test_get_bytes - [1,10,1000].each do |i| - response = get("/bytes/#{i}") - assert_equal "#{'C'*i.to_i}", response.body - end - end - - def test_get_unknown - response = get('/blah') - assert_equal "Undefined url", response.body - end - - def test_small_posts - [1,10,321,123,1000].each do |i| - response = post("/test_post_length", 'C'*i) - assert_equal 200, response.code.to_i, response.body - end - end - - # this is rough but does detect major problems - def test_ab - r = %x{ab -n 1000 -c 50 -q http://0.0.0.0:#{PORT}/bytes/123} - assert r =~ /Requests per second:\s*(\d+)/, r - assert $1.to_i > 100, r - end - - def test_large_post - [50,60,100].each do |i| - response = post("/test_post_length", 'C'*1024*i) - assert_equal 200, response.code.to_i, response.body - end - end -end - - -class EbbRailsTest < Test::Unit::TestCase - # just to make sure there isn't some load error - def test_ebb_rails_version - out = %x{ruby #{Ebb::LIBDIR}/../bin/ebb_rails -v} - assert_match %r{Ebb #{Ebb::VERSION}}, out - end -end \ No newline at end of file +require File.dirname(__FILE__) + '/helper' + +class BasicTest < ServerTest + def test_get_bytes + [1,10,1000].each do |i| + response = get("/bytes/#{i}") + assert_equal "#{'C'*i.to_i}", response.body + end + end + + def test_get_unknown + response = get('/blah') + assert_equal "Undefined url", response.body + end + + def test_small_posts + [1,10,321,123,1000].each do |i| + response = post("/test_post_length", 'C'*i) + assert_equal 200, response.code.to_i, response.body + end + end + + def test_large_post + [50,60,100].each do |i| + response = post("/test_post_length", 'C'*1024*i) + assert_equal 200, response.code.to_i, response.body + end + end +end diff --git a/test/helper.rb b/test/helper.rb new file mode 100644 index 0000000..eecb37a --- /dev/null +++ b/test/helper.rb @@ -0,0 +1,63 @@ +require 'rubygems' +require File.dirname(__FILE__) + '/../ruby_lib/ebb' +require 'test/unit' +require 'net/http' +require 'socket' +require 'rubygems' +require 'json' + +include Ebb + +TEST_PORT = 4044 + +def get(path) + Net::HTTP.get_response(URI.parse("http://0.0.0.0:#{TEST_PORT}#{path}")) +end + +def post(path, data) + Net::HTTP.post_form(URI.parse("http://0.0.0.0:#{TEST_PORT}#{path}"), data) +end + +class HelperApp + def call(env) + commands = env['PATH_INFO'].split('/') + + if commands.include?('bytes') + n = commands.last.to_i + raise "bytes called with n <= 0" if n <= 0 + body = "C"*n + status = 200 + + elsif commands.include?('test_post_length') + input_body = env['rack.input'].read + + content_length_header = env['HTTP_CONTENT_LENGTH'].to_i + + if content_length_header == input_body.length + body = "Content-Length matches input length" + status = 200 + else + body = "Content-Length header is #{content_length_header} but body length is #{input_body.length}" + status = 500 + end + + else + status = 404 + body = "Undefined url" + end + + [status, {'Content-Type' => 'text/plain'}, body] + end +end + +class ServerTest < Test::Unit::TestCase + def setup + Thread.new { Ebb.start_server(HelperApp.new, :port => TEST_PORT) } + sleep 0.1 until Ebb.running? + end + + def teardown + Ebb.stop_server + sleep 0.1 while Ebb.running? + end +end -- 2.11.4.GIT