From 6c818b0b6f76ef733679bcea1024142b4ef3ce00 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 27 Sep 2010 01:13:30 +0000 Subject: [PATCH] add kgio_tryaccept, kgio_accept _really_ blocks We'll stick with the "try" prefix if we're going to be non-blocking. kgio_accept will favor a blocking accept() call where it's possible to release the GVL, allowing it to avoid thundering herd problems. Otherwise it'll use thread-safe blocking under Ruby 1.8. --- ext/kgio/extconf.rb | 1 + ext/kgio/kgio_ext.c | 150 +++++++++++++++++---- test/lib_server_accept.rb | 70 ++++++++++ test/test_tcp_client_read_server_write.rb | 2 +- test/test_tcp_server.rb | 47 +++---- test/test_tcp_server_read_client_write.rb | 2 +- test/test_unix_client_read_server_write.rb | 2 +- ...nt_read_server_write.rb => test_unix_server.rb} | 15 ++- test/test_unix_server_read_client_write.rb | 2 +- 9 files changed, 222 insertions(+), 69 deletions(-) create mode 100644 test/lib_server_accept.rb rewrite test/test_tcp_server.rb (69%) copy test/{test_unix_client_read_server_write.rb => test_unix_server.rb} (50%) diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb index 2eb35f7..09d710b 100644 --- a/ext/kgio/extconf.rb +++ b/ext/kgio/extconf.rb @@ -14,6 +14,7 @@ else have_func('rb_fdopen') end have_func('rb_io_ascii8bit_binmode') +have_func('rb_thread_blocking_region') dir_config('kgio') create_makefile('kgio_ext') diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index 7818425..faa25ff 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -45,6 +45,12 @@ struct io_args { int fd; }; +struct accept_args { + int fd; + struct sockaddr *addr; + socklen_t *addrlen; +}; + static void wait_readable(VALUE io) { if (io_wait_rd) { @@ -337,23 +343,94 @@ static VALUE wait_rd(VALUE mod) return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil; } +static VALUE xaccept(void *ptr) +{ + struct accept_args *a = ptr; + + return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags); +} + +#ifdef HAVE_RB_THREAD_BLOCKING_REGION +# include +static int thread_accept(struct accept_args *a, int force_nonblock) +{ + if (force_nonblock) + set_nonblocking(a->fd); + return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0); +} + +/* + * Try to use a (real) blocking accept() since that can prevent + * thundering herds under Linux: + * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html + * + * So we periodically disable non-blocking, but not too frequently + * because other processes may set non-blocking (especially during + * a process upgrade) with Rainbows! concurrency model changes. + */ +static void set_blocking_or_block(int fd) +{ + static time_t last_set_blocking; + time_t now = time(NULL); + + if (last_set_blocking == 0) { + last_set_blocking = now; + (void)rb_io_wait_readable(fd); + } else if ((now - last_set_blocking) <= 5) { + (void)rb_io_wait_readable(fd); + } else { + int flags = fcntl(fd, F_GETFL); + if (flags == -1) + rb_sys_fail("fcntl(F_GETFL)"); + if (flags & O_NONBLOCK) { + flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + if (flags == -1) + rb_sys_fail("fcntl(F_SETFL)"); + } + last_set_blocking = now; + } +} +#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */ +# include +static int thread_accept(struct accept_args *a, int force_nonblock) +{ + int rv; + + /* always use non-blocking accept() under 1.8 for green threads */ + set_nonblocking(a->fd); + TRAP_BEG; + rv = (int)xaccept(a); + TRAP_END; + return rv; +} +#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd) +#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */ + static VALUE -my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) +my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock) { int client; + struct accept_args a; + a.fd = my_fileno(io); + a.addr = addr; + a.addrlen = addrlen; retry: - client = accept4(sockfd, addr, addrlen, accept4_flags); + client = thread_accept(&a, nonblock); if (client == -1) { switch (errno) { case EAGAIN: + if (nonblock) + return Qnil; + set_blocking_or_block(a.fd); #ifdef ECONNABORTED case ECONNABORTED: #endif /* ECONNABORTED */ #ifdef EPROTO case EPROTO: #endif /* EPROTO */ - return Qnil; + case EINTR: + goto retry; case ENOMEM: case EMFILE: case ENFILE: @@ -362,50 +439,65 @@ retry: #endif /* ENOBUFS */ errno = 0; rb_gc(); - client = accept4(sockfd, addr, addrlen, accept4_flags); - break; - case EINTR: - goto retry; + client = thread_accept(&a, nonblock); } - if (client == -1) + if (client == -1) { + if (errno == EINTR) + goto retry; rb_sys_fail("accept"); + } } return sock_for_fd(cSocket, client); } -/* non-blocking flag should be set on this socket before accept() is called */ -static VALUE unix_accept(VALUE io) +static void in_addr_set(VALUE io, struct sockaddr_in *addr) { - int fd = my_fileno(io); - VALUE rv = my_accept(fd, NULL, NULL); + VALUE host = rb_str_new(0, INET_ADDRSTRLEN); + socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN; + const char *name; - if (! NIL_P(rv)) - rb_ivar_set(rv, iv_kgio_addr, localhost); + name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen); + if (name == NULL) + rb_sys_fail("inet_ntop"); + rb_str_set_len(host, strlen(name)); + rb_ivar_set(io, iv_kgio_addr, host); +} +static VALUE tcp_tryaccept(VALUE io) +{ + struct sockaddr_in addr; + socklen_t addrlen = sizeof(struct sockaddr_in); + VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1); + + if (!NIL_P(rv)) + in_addr_set(rv, &addr); return rv; } -/* non-blocking flag should be set on this socket before accept() is called */ static VALUE tcp_accept(VALUE io) { - int fd = my_fileno(io); struct sockaddr_in addr; socklen_t addrlen = sizeof(struct sockaddr_in); - VALUE host; - const char *name; - VALUE rv = my_accept(fd, (struct sockaddr *)&addr, &addrlen); + VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0); - if (NIL_P(rv)) - return rv; + in_addr_set(rv, &addr); + return rv; +} - host = rb_str_new(0, INET_ADDRSTRLEN); - addrlen = (socklen_t)INET_ADDRSTRLEN; - name = inet_ntop(AF_INET, &addr.sin_addr, RSTRING_PTR(host), addrlen); - if (name == NULL) - rb_sys_fail("inet_ntop"); - rb_str_set_len(host, strlen(name)); - rb_ivar_set(rv, iv_kgio_addr, host); +static VALUE unix_tryaccept(VALUE io) +{ + VALUE rv = my_accept(io, NULL, NULL, 1); + + if (!NIL_P(rv)) + rb_ivar_set(rv, iv_kgio_addr, localhost); + return rv; +} + +static VALUE unix_accept(VALUE io) +{ + VALUE rv = my_accept(io, NULL, NULL, 0); + rb_ivar_set(rv, iv_kgio_addr, localhost); return rv; } @@ -697,10 +789,12 @@ void Init_kgio_ext(void) cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer")); cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer); + rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0); rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0); cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer")); cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer); + rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0); rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0); cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket")); diff --git a/test/lib_server_accept.rb b/test/lib_server_accept.rb new file mode 100644 index 0000000..1e6bf24 --- /dev/null +++ b/test/lib_server_accept.rb @@ -0,0 +1,70 @@ +require 'test/unit' +require 'io/nonblock' +$-w = true +require 'kgio' + +module LibServerAccept + + def teardown + @srv.close unless @srv.closed? + Kgio.accept_cloexec = true + Kgio.accept_nonblock = false + end + + def test_tryaccept_success + a = client_connect + IO.select([@srv]) + b = @srv.kgio_tryaccept + assert_kind_of Kgio::Socket, b + assert_equal @host, b.kgio_addr + end + + def test_tryaccept_fail + assert_equal nil, @srv.kgio_tryaccept + end + + def test_blocking_accept + t0 = Time.now + pid = fork { sleep 1; a = client_connect; sleep } + b = @srv.kgio_accept + elapsed = Time.now - t0 + assert_kind_of Kgio::Socket, b + assert_equal @host, b.kgio_addr + Process.kill(:TERM, pid) + Process.waitpid(pid) + assert elapsed >= 1, "elapsed: #{elapsed}" + end + + def test_blocking_accept_with_nonblock_socket + @srv.nonblock = true + t0 = Time.now + pid = fork { sleep 1; a = client_connect; sleep } + b = @srv.kgio_accept + elapsed = Time.now - t0 + assert_kind_of Kgio::Socket, b + assert_equal @host, b.kgio_addr + Process.kill(:TERM, pid) + Process.waitpid(pid) + assert elapsed >= 1, "elapsed: #{elapsed}" + + t0 = Time.now + pid = fork { sleep 6; a = client_connect; sleep } + b = @srv.kgio_accept + elapsed = Time.now - t0 + assert_kind_of Kgio::Socket, b + assert_equal @host, b.kgio_addr + Process.kill(:TERM, pid) + Process.waitpid(pid) + assert elapsed >= 6, "elapsed: #{elapsed}" + + t0 = Time.now + pid = fork { sleep 1; a = client_connect; sleep } + b = @srv.kgio_accept + elapsed = Time.now - t0 + assert_kind_of Kgio::Socket, b + assert_equal @host, b.kgio_addr + Process.kill(:TERM, pid) + Process.waitpid(pid) + assert elapsed >= 1, "elapsed: #{elapsed}" + end +end diff --git a/test/test_tcp_client_read_server_write.rb b/test/test_tcp_client_read_server_write.rb index 13714e9..6e97321 100644 --- a/test/test_tcp_client_read_server_write.rb +++ b/test/test_tcp_client_read_server_write.rb @@ -6,7 +6,7 @@ class TesTcpClientReadServerWrite < Test::Unit::TestCase @srv = Kgio::TCPServer.new(@host, 0) @port = @srv.addr[1] @wr = Kgio::TCPSocket.new(@host, @port) - @rd = @srv.kgio_accept + @rd = @srv.kgio_tryaccept end include LibReadWriteTest diff --git a/test/test_tcp_server.rb b/test/test_tcp_server.rb dissimilarity index 69% index c2bb518..eb6933e 100644 --- a/test/test_tcp_server.rb +++ b/test/test_tcp_server.rb @@ -1,31 +1,16 @@ -require 'test/unit' -require 'io/nonblock' -$-w = true -require 'kgio' - -class TestKgioTCPServer < Test::Unit::TestCase - - def setup - @host = ENV["TEST_HOST"] || '127.0.0.1' - @srv = Kgio::TCPServer.new(@host, 0) - @port = @srv.addr[1] - end - - def teardown - @srv.close unless @srv.closed? - Kgio.accept_cloexec = true - Kgio.accept_nonblock = false - end - - def test_accept - a = TCPSocket.new(@host, @port) - b = @srv.kgio_accept - assert_kind_of Kgio::Socket, b - assert_equal @host, b.kgio_addr - end - - def test_accept_nonblock - @srv.nonblock = true - assert_equal nil, @srv.kgio_accept - end -end +require './test/lib_server_accept' + +class TestKgioTCPServer < Test::Unit::TestCase + + def setup + @host = ENV["TEST_HOST"] || '127.0.0.1' + @srv = Kgio::TCPServer.new(@host, 0) + @port = @srv.addr[1] + end + + def client_connect + TCPSocket.new(@host, @port) + end + + include LibServerAccept +end diff --git a/test/test_tcp_server_read_client_write.rb b/test/test_tcp_server_read_client_write.rb index 68cada3..8a67917 100644 --- a/test/test_tcp_server_read_client_write.rb +++ b/test/test_tcp_server_read_client_write.rb @@ -6,7 +6,7 @@ class TesTcpServerReadClientWrite < Test::Unit::TestCase @srv = Kgio::TCPServer.new(@host, 0) @port = @srv.addr[1] @wr = Kgio::TCPSocket.new(@host, @port) - @rd = @srv.kgio_accept + @rd = @srv.kgio_tryaccept end include LibReadWriteTest diff --git a/test/test_unix_client_read_server_write.rb b/test/test_unix_client_read_server_write.rb index cf2c5f1..0f8e55b 100644 --- a/test/test_unix_client_read_server_write.rb +++ b/test/test_unix_client_read_server_write.rb @@ -9,7 +9,7 @@ class TestUnixServerReadClientWrite < Test::Unit::TestCase tmp.close rescue nil @srv = Kgio::UNIXServer.new(@path) @rd = Kgio::UNIXSocket.new(@path) - @wr = @srv.kgio_accept + @wr = @srv.kgio_tryaccept end include LibReadWriteTest diff --git a/test/test_unix_client_read_server_write.rb b/test/test_unix_server.rb similarity index 50% copy from test/test_unix_client_read_server_write.rb copy to test/test_unix_server.rb index cf2c5f1..faa8209 100644 --- a/test/test_unix_client_read_server_write.rb +++ b/test/test_unix_server.rb @@ -1,17 +1,20 @@ -require './test/lib_read_write' require 'tempfile' +require './test/lib_server_accept' + +class TestKgioUNIXServer < Test::Unit::TestCase -class TestUnixServerReadClientWrite < Test::Unit::TestCase def setup tmp = Tempfile.new('kgio_unix') @path = tmp.path File.unlink(@path) tmp.close rescue nil @srv = Kgio::UNIXServer.new(@path) - @rd = Kgio::UNIXSocket.new(@path) - @wr = @srv.kgio_accept + @host = '127.0.0.1' end - include LibReadWriteTest -end + def client_connect + UNIXSocket.new(@path) + end + include LibServerAccept +end diff --git a/test/test_unix_server_read_client_write.rb b/test/test_unix_server_read_client_write.rb index 532989e..db304a2 100644 --- a/test/test_unix_server_read_client_write.rb +++ b/test/test_unix_server_read_client_write.rb @@ -9,7 +9,7 @@ class TestUnixServerReadClientWrite < Test::Unit::TestCase tmp.close rescue nil @srv = Kgio::UNIXServer.new(@path) @wr = Kgio::UNIXSocket.new(@path) - @rd = @srv.kgio_accept + @rd = @srv.kgio_tryaccept end include LibReadWriteTest -- 2.11.4.GIT