From c07be896889f13235bda1bd32983261e73513a70 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 24 Mar 2009 18:56:50 -0700 Subject: [PATCH] Respect timeout when doing get_file_data This adds a sysread_full utility method with configurable timeouts. Individual reads can be timed out as well as the entire sysread_full call. --- lib/mogilefs/mogilefs.rb | 14 +++-------- lib/mogilefs/util.rb | 23 +++++++++++++++++- test/test_util.rb | 61 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 12 deletions(-) diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index c7c33ed..14fc0c2 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -71,17 +71,8 @@ class MogileFS::MogileFS < MogileFS::Client when /^http:\/\// then begin sock = http_get_sock(URI.parse(path)) - return( if block_given? - yield(sock) - else - begin - sock.read - rescue Errno::EAGAIN - IO.select([sock]) - retry - end - end ) - # return block_given? ? yield(sock) : sock.read + return yield(sock) if block_given? + return sysread_full(sock, sock.mogilefs_size, @get_file_data_timeout) rescue MogileFS::Timeout, Errno::ECONNREFUSED, EOFError, SystemCallError, MogileFS::InvalidResponseError next @@ -288,6 +279,7 @@ class MogileFS::MogileFS < MogileFS::Client buf = sock.recv(4096, Socket::MSG_PEEK) head, body = buf.split(/\r\n\r\n/, 2) if head =~ HTTP_200_OK + sock.mogilefs_size = head[/^Content-Length:\s*(\d+)/i, 1].to_i sock.recv(head.size + 4, 0) return sock end diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb index deaf6b0..9ccd4f0 100644 --- a/lib/mogilefs/util.rb +++ b/lib/mogilefs/util.rb @@ -67,6 +67,27 @@ module MogileFS::Util # should never get here end + def sysread_full(io_rd, size, timeout = nil, full_timeout = false) + tmp = [] # avoid expensive string concatenation with every loop iteration + reader = io_rd.method(timeout ? :read_nonblock : :sysread) + begin + while size > 0 + tmp << reader.call(size) + size -= tmp.last.size + end + rescue Errno::EAGAIN, Errno::EINTR + t0 = Time.now + ready = IO.select([ io_rd ], nil, nil, timeout) + timeout -= (Time.now - t0) if full_timeout + if ready != [ io_rd ] || timeout < 0 + raise MogileFS::Timeout, 'sysread_full timeout' + end + retry + rescue EOFError + end + tmp.join('') + end + class StoreContent < Proc def initialize(total_size, &writer_proc) @total_size = total_size @@ -87,7 +108,7 @@ require 'timeout' class MogileFS::Timeout < Timeout::Error; end class Socket - attr_accessor :mogilefs_addr, :mogilefs_connected + attr_accessor :mogilefs_addr, :mogilefs_connected, :mogilefs_size TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/ diff --git a/test/test_util.rb b/test/test_util.rb index 49c1735..55cac66 100644 --- a/test/test_util.rb +++ b/test/test_util.rb @@ -56,4 +56,65 @@ class TestMogileFS__Util < Test::Unit::TestCase t.destroy! end + def test_sysread_slowly + nr = 10 + str = 'abcde' + expect = str * nr + rd, wr = IO.pipe + pid = fork do + rd.close + nr.times do + syswrite_full(wr, str) + sleep(0.1) + end + end + wr.close + buf = sysread_full(rd, expect.size) + assert_equal expect, buf + rd.close + ensure + Process.kill('TERM', pid) rescue nil + Process.waitpid(pid) rescue nil + end + + def test_sysread_timeout + nr = 10 + str = 'abcde' + expect = str * nr + rd, wr = IO.pipe + pid = fork do + rd.close + nr.times do + syswrite_full(wr, str) + sleep 1 + end + end + wr.close + assert_raises(MogileFS::Timeout) { sysread_full(rd, expect.size, 0.1) } + rd.close + ensure + Process.kill('TERM', pid) rescue nil + Process.waitpid(pid) rescue nil + end + + def test_sysread_full_timeout + nr = 100 + str = 'abcde' + expect = str * nr + rd, wr = IO.pipe + pid = fork do + rd.close + nr.times do + syswrite_full(wr, str) + sleep 0.01 + end + end + wr.close + assert_raises(MogileFS::Timeout) { sysread_full(rd,expect.size,0.1,true) } + rd.close + ensure + Process.kill('TERM', pid) rescue nil + Process.waitpid(pid) rescue nil + end + end -- 2.11.4.GIT