From f61cef65b8a8816160c622324b4f1aad55034e4a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 27 Dec 2012 01:16:56 +0000 Subject: [PATCH] implement TCP Fast Open support (client + server) Server support just requires exposing one constant for setsockopt: Kgio::TCP_FASTOPEN Client support implements a new Kgio::Socket#fastopen method. This new method wraps the the sendto() syscall. With TCP Fast Open, the sendto() syscall is overloaded for stream sockets to implement the functionality of both connect() + write() Since it only makes sense to use _blocking_ I/O for sendto(), TFO clients are only supported in Ruby implementations with native threads. --- ext/kgio/connect.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++------ ext/kgio/kgio.h | 23 +++++++++++++++ ext/kgio/kgio_ext.c | 35 +++++++++++++++++++++++ test/test_tfo.rb | 70 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 test/test_tfo.rb diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c index 42ab44c..21b3f7c 100644 --- a/ext/kgio/connect.c +++ b/ext/kgio/connect.c @@ -1,5 +1,7 @@ #include "kgio.h" +#include "my_fileno.h" #include "sock_for_fd.h" +#include "blocking_io_region.h" static void close_fail(int fd, const char *msg) { @@ -131,6 +133,72 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) &addr, hints.ai_addrlen); } +static struct sockaddr *sockaddr_from(socklen_t *addrlen, VALUE addr) +{ + if (TYPE(addr) == T_STRING) { + *addrlen = (socklen_t)RSTRING_LEN(addr); + return (struct sockaddr *)(RSTRING_PTR(addr)); + } + rb_raise(rb_eTypeError, "invalid address"); + return NULL; +} + +#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION) +#ifndef HAVE_RB_STR_SUBSEQ +#define rb_str_subseq rb_str_substr +#endif +struct tfo_args { + int fd; + void *buf; + size_t buflen; + struct sockaddr *addr; + socklen_t addrlen; +}; + +static VALUE tfo_sendto(void *_a) +{ + struct tfo_args *a = _a; + ssize_t w; + + w = sendto(a->fd, a->buf, a->buflen, MSG_FASTOPEN, a->addr, a->addrlen); + return (VALUE)w; +} + +/* + * call-seq: + * + * s = Kgio::Socket.new(:INET, :STREAM) + * addr = Socket.pack_sockaddr_in("example.com", 80) + * s.fastopen("hello world", addr) -> nil + * + * Starts a TCP connection using TCP Fast Open. This uses a blocking + * sendto() syscall and is only available on Ruby 1.9 or later. + * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN) + * on errors. Using this is only recommended for blocking sockets. + * s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_")) + */ +static VALUE fastopen(VALUE sock, VALUE buf, VALUE addr) +{ + struct tfo_args a; + VALUE str = (TYPE(buf) == T_STRING) ? buf : rb_obj_as_string(buf); + ssize_t w; + + a.fd = my_fileno(sock); + a.buf = RSTRING_PTR(str); + a.buflen = (size_t)RSTRING_LEN(str); + a.addr = sockaddr_from(&a.addrlen, addr); + + /* n.b. rb_thread_blocking_region preserves errno */ + w = (ssize_t)rb_thread_io_blocking_region(tfo_sendto, &a, a.fd); + if (w < 0) + rb_sys_fail("sendto"); + if ((size_t)w == a.buflen) + return Qnil; + + return rb_str_subseq(str, w, a.buflen - w); +} +#endif /* MSG_FASTOPEN */ + /* * call-seq: * @@ -225,14 +293,8 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) { int domain; socklen_t addrlen; - struct sockaddr *sockaddr; + struct sockaddr *sockaddr = sockaddr_from(&addrlen, addr); - if (TYPE(addr) == T_STRING) { - sockaddr = (struct sockaddr *)(RSTRING_PTR(addr)); - addrlen = (socklen_t)RSTRING_LEN(addr); - } else { - rb_raise(rb_eTypeError, "invalid address"); - } switch (((struct sockaddr_storage *)(sockaddr))->ss_family) { case AF_UNIX: domain = PF_UNIX; break; case AF_INET: domain = PF_INET; break; @@ -316,7 +378,9 @@ void init_kgio_connect(void) rb_define_singleton_method(cKgio_Socket, "new", kgio_new, -1); rb_define_singleton_method(cKgio_Socket, "connect", kgio_connect, 1); rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1); - +#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION) + rb_define_method(cKgio_Socket, "fastopen", fastopen, 2); +#endif /* * Document-class: Kgio::TCPSocket * diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h index fcdf0fe..983280d 100644 --- a/ext/kgio/kgio.h +++ b/ext/kgio/kgio.h @@ -49,4 +49,27 @@ VALUE kgio_call_wait_readable(VALUE io); #ifndef HAVE_RB_UPDATE_MAX_FD # define rb_update_max_fd(fd) for (;0;) #endif + +/* + * 2012/12/13 - Linux 3.7 was released on 2012/12/10 with TFO. + * Headers distributed with glibc will take some time to catch up and + * be officially released. Most GNU/Linux distros will take a few months + * to a year longer. "Enterprise" distros will probably take 5-7 years. + * So keep these until 2017 at least... + */ +#ifdef __linux__ +# ifndef MSG_FASTOPEN +# define MSG_FASTOPEN 0x20000000 /* for clients */ +# endif +# ifndef TCP_FASTOPEN +# define TCP_FASTOPEN 23 /* for listeners */ +# endif + /* we _may_ have TFO support */ +# define KGIO_TFO_MAYBE (1) +#else /* rely entirely on standard system headers */ +# define KGIO_TFO_MAYBE (0) +#endif + +extern unsigned kgio_tfo; + #endif /* KGIO_H */ diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index 2365fd0..03b30e5 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -1,7 +1,42 @@ #include "kgio.h" +#include +#include +/* true if TCP Fast Open is usable */ +unsigned kgio_tfo; + +static void tfo_maybe(void) +{ + VALUE mKgio = rb_define_module("Kgio"); + + /* Deal with the case where system headers have not caught up */ + if (KGIO_TFO_MAYBE) { + /* Ensure Linux 3.7 or later for TCP_FASTOPEN */ + struct utsname buf; + unsigned maj, min; + + if (uname(&buf) != 0) + rb_sys_fail("uname"); + if (sscanf(buf.release, "%u.%u", &maj, &min) != 2) + return; + if (maj < 3 || (maj == 3 && min < 7)) + return; + } + + /* + * KGIO_TFO_MAYBE will be false if a distro backports TFO + * to a pre-3.7 kernel, but includes the necessary constants + * in system headers + */ +#if defined(MSG_FASTOPEN) && defined(TCP_FASTOPEN) + rb_define_const(mKgio, "TCP_FASTOPEN", INT2NUM(TCP_FASTOPEN)); + rb_define_const(mKgio, "MSG_FASTOPEN", INT2NUM(MSG_FASTOPEN)); + kgio_tfo = 1; +#endif +} void Init_kgio_ext(void) { + tfo_maybe(); init_kgio_wait(); init_kgio_read_write(); init_kgio_connect(); diff --git a/test/test_tfo.rb b/test/test_tfo.rb new file mode 100644 index 0000000..846e273 --- /dev/null +++ b/test/test_tfo.rb @@ -0,0 +1,70 @@ +require 'test/unit' +require 'kgio' + +class TestTFO < Test::Unit::TestCase + def test_constants + if `uname -s`.chomp == "Linux" && `uname -r`.to_f >= 3.7 + assert_equal 23, Kgio::TCP_FASTOPEN + assert_equal 0x20000000, Kgio::MSG_FASTOPEN + end + end + + def fastopen_ok? + if RUBY_PLATFORM =~ /linux/ + tfo = File.read("/proc/sys/net/ipv4/tcp_fastopen").to_i + client_enable = 1 + server_enable = 2 + enable = client_enable | server_enable + (tfo & enable) == enable + else + false + end + end + + def test_tfo_client_server + unless fastopen_ok? + warn "TCP Fast Open not enabled on this system (check kernel docs)" + return + end + addr = '127.0.0.1' + qlen = 1024 + s = Kgio::TCPServer.new(addr, 0) + s.setsockopt(:TCP, Kgio::TCP_FASTOPEN, qlen) + port = s.local_address.ip_port + addr = Socket.pack_sockaddr_in(port, addr) + c = Kgio::Socket.new(:INET, :STREAM) + assert_nil c.fastopen("HELLO", addr) + a = s.accept + assert_equal "HELLO", a.read(5) + c.close + a.close + + # ensure empty sends work + c = Kgio::Socket.new(:INET, :STREAM) + assert_nil c.fastopen("", addr) + a = s.accept + Thread.new { c.close } + assert_nil a.read(1) + a.close + + # try a monster packet + buf = 'x' * (1024 * 1024 * 320) + + c = Kgio::Socket.new(:INET, :STREAM) + thr = Thread.new do + a = s.accept + assert_equal buf.size, a.read(buf.size).size + a.close + end + assert_nil c.fastopen(buf, addr) + thr.join + c.close + + # allow timeouts + c = Kgio::Socket.new(:INET, :STREAM) + c.setsockopt(:SOCKET, :SNDTIMEO, [ 0, 10 ].pack("l_l_")) + unsent = c.fastopen(buf, addr) + c.close + assert_equal s.accept.read.size + unsent.size, buf.size + end if defined?(Addrinfo) && defined?(Kgio::TCP_FASTOPEN) +end -- 2.11.4.GIT