copy_stream: always use SPLICE_F_NONBLOCK for partial copy
authorEric Wong <normalperson@yhbt.net>
Tue, 17 May 2011 03:18:12 +0000 (16 20:18 -0700)
committerEric Wong <normalperson@yhbt.net>
Tue, 17 May 2011 21:14:26 +0000 (17 14:14 -0700)
The pipe may be full from small buffers due to how TCP
works, so we need to use non-blocking I/O on the pipe.

ref: http://lkml.org/lkml/2009/1/13/478

lib/io/splice.rb
test/test_tcp_splice.rb [new file with mode: 0644]

index 1ff2f6a..89fab92 100644 (file)
@@ -94,16 +94,15 @@ module IO::Splice
   # Returns the number of bytes actually spliced.
   # Like IO#readpartial, this never returns Errno::EAGAIN
   def self.partial(src, dst, len, src_offset)
-    IO.splice(src, src_offset, dst, nil, len, F_MOVE)
-    rescue EOFError
-      nil
-    rescue Errno::EAGAIN
-      begin
-        src.to_io.wait
-        IO.select(nil, [dst])
-        rv = IO.trysplice(src, src_offset, dst, nil, len, F_MOVE)
-      end while rv == :EAGAIN
-      rv
+    case rv = IO.trysplice(src, src_offset, dst, nil, len, F_MOVE)
+    when :EAGAIN
+      src.to_io.wait
+      IO.select(nil, [dst])
+    when Integer
+      return rv
+    else
+      return nil
+    end while true
   end
 end
 if (! defined?(RUBY_ENGINE) || RUBY_ENGINE == "ruby") &&
diff --git a/test/test_tcp_splice.rb b/test/test_tcp_splice.rb
new file mode 100644 (file)
index 0000000..2ddbfcb
--- /dev/null
@@ -0,0 +1,49 @@
+require 'socket'
+require 'io/wait'
+require 'io/splice'
+require 'io/nonblock'
+require "test/unit"
+
+class TestTCPCopyStream < Test::Unit::TestCase
+  def setup
+    host = ENV["TEST_HOST"] || "127.0.0.1"
+    @srv = TCPServer.new(host, 0)
+    @port = @srv.addr[1]
+    @client = TCPSocket.new(host, @port)
+    @client.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
+    @accept = @srv.accept
+    @accept.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
+    @client.sync = @accept.sync = true
+    @r, @w = IO.pipe
+  end
+
+  def teardown
+    @srv.close
+    [ @client, @accept, @r, @w ].each { |io| io.close unless io.closed? }
+  end
+
+  def test_client_to_server_eof
+    nr = 2000
+    buf = '0123456789abcdef' * 1024
+    expect = buf.size * nr
+    thr = Thread.new do
+      nr.times { @client.write(buf) }
+      @client.close
+    end
+    sleep 1 # wait for rcvbuf to fill up
+    bytes = IO::Splice.copy_stream(@accept, "/dev/null")
+    assert_equal expect, bytes
+  end
+
+  def test_client_to_server_expect
+    nr = 2000
+    buf = '0123456789abcdef' * 1024
+    expect = buf.size * nr
+    thr = Thread.new do
+      nr.times { @client.write(buf) }
+    end
+    sleep 1 # wait for rcvbuf to fill up
+    bytes = IO::Splice.copy_stream(@accept, "/dev/null", expect)
+    assert_equal expect, bytes
+  end
+end