From 7b50489a1bca82ccb0b2e374f3d4f95bb651be5a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 21 Oct 2011 08:03:58 +0000 Subject: [PATCH] introduce MogileFS::Socket class More work towards _not_ monkey-patching core classes. --- lib/mogilefs/socket.rb | 5 +++ lib/mogilefs/socket/kgio.rb | 67 ++++++++++++++++++++++++++++ lib/mogilefs/socket/pure_ruby.rb | 69 +++++++++++++++++++++++++++++ test/socket_test.rb | 93 +++++++++++++++++++++++++++++++++++++++ test/test_mogilefs_socket_kgio.rb | 11 +++++ test/test_mogilefs_socket_pure.rb | 10 +++++ 6 files changed, 255 insertions(+) create mode 100644 lib/mogilefs/socket.rb create mode 100644 lib/mogilefs/socket/kgio.rb create mode 100644 lib/mogilefs/socket/pure_ruby.rb create mode 100644 test/socket_test.rb create mode 100644 test/test_mogilefs_socket_kgio.rb create mode 100644 test/test_mogilefs_socket_pure.rb diff --git a/lib/mogilefs/socket.rb b/lib/mogilefs/socket.rb new file mode 100644 index 0000000..88712e8 --- /dev/null +++ b/lib/mogilefs/socket.rb @@ -0,0 +1,5 @@ +begin + require "mogilefs/socket/kgio" +rescue LoadError + require "mogilefs/socket/pure_ruby" +end diff --git a/lib/mogilefs/socket/kgio.rb b/lib/mogilefs/socket/kgio.rb new file mode 100644 index 0000000..2ccade3 --- /dev/null +++ b/lib/mogilefs/socket/kgio.rb @@ -0,0 +1,67 @@ +# -*- encoding: binary -*- +require "kgio" + +class MogileFS::Socket < Kgio::Socket + def self.start(host, port) + sock = super(Socket.sockaddr_in(port, host)) + Socket.const_defined?(:TCP_NODELAY) and + sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + sock + end + + def self.tcp(host, port, timeout = 5) + sock = start(host, port) + unless sock.kgio_wait_writable(timeout) + sock.close + raise MogileFS::Timeout, 'socket connect timeout' + end + sock + end + + def timed_read(len, dst = "", timeout = 5) + case rc = kgio_tryread(len, dst) + when :wait_readable + kgio_wait_readable(timeout) or raise MogileFS::Timeout, "read timeout" + else + return rc + end while true + end + + def timed_peek(len, dst, timeout = 5) + case rc = kgio_trypeek(len, dst) + when :wait_readable + kgio_wait_readable(timeout) or raise MogileFS::Timeout, "peek timeout" + else + return rc + end while true + end + + def timed_write(buffer, timeout = 5) + case rc = kgio_trywrite(buffer) + when :wait_writable + kgio_wait_writable(timeout) or raise MogileFS::Timeout, "write timeout" + when String + buffer = rc + else + return rc + end while true + end + + SEP_RE = /\A(.*?#{Regexp.escape("\n")})/ + def timed_gets(timeout = 5) + unless defined?(@rbuf) + @rbuf = timed_read(1024, "", timeout) or return # EOF + end + begin + @rbuf.sub!(SEP_RE, "") and return $1 + tmp ||= "" + if timed_read(1024, tmp, timeout) + @rbuf << tmp + else + # EOF, return the last buffered bit even without SEP_RE matching + # (not ideal for MogileFS, this is an error) + return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) + end + end while true + end +end diff --git a/lib/mogilefs/socket/pure_ruby.rb b/lib/mogilefs/socket/pure_ruby.rb new file mode 100644 index 0000000..9176e46 --- /dev/null +++ b/lib/mogilefs/socket/pure_ruby.rb @@ -0,0 +1,69 @@ +require "socket" +require "io/wait" +require "timeout" + +class MogileFS::Socket < Socket + + def self.start(host, port) + sock = new(Socket::AF_INET, Socket::SOCK_STREAM, 0) + begin + sock.connect_nonblock(sockaddr_in(port, host)) + rescue Errno::EINPROGRESS + end + Socket.const_defined?(:TCP_NODELAY) and + sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + sock + end + + def self.tcp(host, port, timeout = 5) + sock = start(host, port) + unless IO.select(nil, [ sock ], nil, timeout) + sock.close + raise MogileFS::Timeout, 'socket connect timeout' + end + sock + end + + def timed_read(len, dst = "", timeout = 5) + begin + return read_nonblock(len, dst) + rescue Errno::EAGAIN + wait(timeout) or raise MogileFS::Timeout, "read timeout" + rescue EOFError + return + end while true + end + + def timed_peek(len, dst, timeout = 5) + begin + rc = recv_nonblock(len, Socket::MSG_PEEK) + return rc.empty? ? nil : dst.replace(rc) + rescue Errno::EAGAIN + wait(timeout) or raise MogileFS::Timeout, "peek timeout" + rescue EOFError + dst.replace("") + return + end while true + end + + def timed_write(buf, timeout = 5) + begin + rc = write_nonblock(buf) + if buf.respond_to?(:encoding) + return if rc == buf.bytesize + if buf.encoding != Encoding::BINARY + buf = buf.dup.force_encoding(Encoding::BINARY) + end + end + return if rc == buf.size + buf = buf.slice(rc, buf.size) + rescue Errno::EAGAIN + IO.select(nil, [self], nil, timeout) or + raise MogileFS::Timeout, "write timeout" + end while true + end + + def timed_gets(timeout = 5) + Timeout.timeout(timeout, MogileFS::Timeout) { gets("\n") } + end +end diff --git a/test/socket_test.rb b/test/socket_test.rb new file mode 100644 index 0000000..984fc99 --- /dev/null +++ b/test/socket_test.rb @@ -0,0 +1,93 @@ +require "socket" +require "test/unit" +module SocketTest + + def setup + @host = ENV["TEST_HOST"] || '127.0.0.1' + @srv = TCPServer.new(@host, 0) + @port = @srv.addr[1] + end + + def test_start + sock = MogileFS::Socket.start(@host, @port) + assert_instance_of MogileFS::Socket, sock, sock.inspect + assert_nothing_raised do + begin + sock.write_nonblock("a") + rescue Errno::EAGAIN + end + end + thr = Thread.new { @srv.accept } + accepted = thr.value + assert_instance_of TCPSocket, accepted, accepted.inspect + assert_nil sock.close + end + + def test_new + sock = MogileFS::Socket.tcp(@host, @port) + assert_instance_of MogileFS::Socket, sock, sock.inspect + assert_nothing_raised do + sock.write_nonblock("a") + end + thr = Thread.new { @srv.accept } + accepted = thr.value + assert_instance_of TCPSocket, accepted, accepted.inspect + assert_equal "a", accepted.read(1) + assert_nil sock.close + end + + def test_timed_peek + sock = MogileFS::Socket.tcp(@host, @port) + accepted = @srv.accept + buf = "" + assert_raises(MogileFS::Timeout) { sock.timed_peek(2, buf, 0.01) } + accepted.write "HI" + assert_equal "HI", sock.timed_peek(2, buf, 0.1) + assert_equal "HI", buf + assert_equal "HI", sock.timed_peek(2, buf) + assert_equal "HI", sock.timed_read(2, buf) + accepted.close + assert_nil sock.timed_peek(2, buf) + end + + def test_timed_read + sock = MogileFS::Socket.tcp(@host, @port) + accepted = @srv.accept + buf = "" + assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) } + accepted.write "HI" + assert_equal "HI", sock.timed_read(2, buf, 0.1) + assert_equal "HI", buf + assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) } + accepted.close + assert_nil sock.timed_read(2, buf) + end + + def test_timed_write + sock = MogileFS::Socket.tcp(@host, @port) + accepted = @srv.accept + buf = "A" * 100000 + written = 0 + assert_raises(MogileFS::Timeout) do + loop do + sock.timed_write(buf, 0.01) + written += buf.size + end + end + tmp = accepted.read(written) + assert_equal written, tmp.size + end + + def timed_gets + sock = MogileFS::Socket.tcp(@host, @port) + accepted = @srv.accept + buf = "" + assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) } + accepted.write "HI" + assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) } + accepted.write "\n" + assert_equal "HI\n", sock.timed_gets + accepted.close + assert_nil sock.timed_gets + end +end diff --git a/test/test_mogilefs_socket_kgio.rb b/test/test_mogilefs_socket_kgio.rb new file mode 100644 index 0000000..e44ed0a --- /dev/null +++ b/test/test_mogilefs_socket_kgio.rb @@ -0,0 +1,11 @@ +require "./test/socket_test" +require "mogilefs" +begin + require "kgio" + require "mogilefs/socket/kgio" +rescue LoadError +end + +class TestSocketKgio < Test::Unit::TestCase + include SocketTest +end if defined?(Kgio) diff --git a/test/test_mogilefs_socket_pure.rb b/test/test_mogilefs_socket_pure.rb new file mode 100644 index 0000000..c60ecec --- /dev/null +++ b/test/test_mogilefs_socket_pure.rb @@ -0,0 +1,10 @@ +require "./test/socket_test" +require "mogilefs" +begin + require "mogilefs/socket/pure_ruby" +rescue LoadError +end + +class TestSocketPure < Test::Unit::TestCase + include SocketTest +end -- 2.11.4.GIT