From: Eric Wong Date: Wed, 31 Oct 2012 19:38:09 +0000 (+0000) Subject: net-http-persistent usage respects timeouts X-Git-Tag: v3.5.0-rc1~1 X-Git-Url: https://repo.or.cz/w/ruby-mogilefs-client.git/commitdiff_plain/a8bc9cd0fce3169eccc67b191fdb40abfae59547 net-http-persistent usage respects timeouts We now have separate Net::HTTP::Persistent instances between clients that may have different timeouts and also between GET and PUT requests. This hurts our ability to reuse sockets, but correctness is probably more important. --- diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb index 825afc3..dd60e91 100644 --- a/lib/mogilefs.rb +++ b/lib/mogilefs.rb @@ -66,13 +66,10 @@ module MogileFS begin require 'net/http/persistent' - NHP = Net::HTTP::Persistent.new('mogilefs') # :nodoc: + NHP = Net::HTTP::Persistent rescue LoadError - require 'net/http' - NHP = Object.new # :nodoc: - def NHP.request(uri, req) # :nodoc: - Net::HTTP.start(uri.host, uri.port) { |h| h.request(req) } - end + require 'mogilefs/nhp_fake' + NHP = MogileFS::NhpFake end # autoload rarely-used things: diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 85b4578..caf85ca 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -153,7 +153,7 @@ class MogileFS::HTTPFile < StringIO put["Content-MD5"] = md5 end put.body = string - res = MogileFS::NHP.request(uri, put) + res = @opts[:nhp_put].request(uri, put) return size if Net::HTTPSuccess === res raise BadResponseError, "#{res.code} #{res.message}" rescue => e diff --git a/lib/mogilefs/http_reader.rb b/lib/mogilefs/http_reader.rb index 8a0337e..e6b037d 100644 --- a/lib/mogilefs/http_reader.rb +++ b/lib/mogilefs/http_reader.rb @@ -26,13 +26,9 @@ class MogileFS::HTTPReader < MogileFS::Socket "read=#{rv} bytes, expected=#@content_length from #@uri", [] end - def self.first(paths, timeout, count = nil, offset = nil) + def self.first(paths, timeout, range = nil) errors = nil - if offset || count - offset ||= 0 - range_end = count ? offset + count - 1 : "" - range = "Range: bytes=#{offset}-#{range_end}\r\n" - end + range = "Range: bytes=#{range[0]}-#{range[1]}\r\n" if range paths.each do |path| begin diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index 7503bba..8bab9ef 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -69,6 +69,10 @@ class MogileFS::MogileFS < MogileFS::Client @get_file_data_timeout = args[:get_file_data_timeout] || 5 @new_file_max_time = args[:new_file_max_time] || 3600.0 + @nhp_get = MogileFS::NHP.new('get') + @nhp_get.open_timeout = @nhp_get.read_timeout = @get_file_data_timeout + @nhp_put = MogileFS::NHP.new('put') + @nhp_put.open_timeout = @nhp_put.read_timeout = @new_file_max_time raise ArgumentError, "you must specify a domain" unless @domain @@ -146,17 +150,40 @@ class MogileFS::MogileFS < MogileFS::Client # start position of the copy. def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil) paths = get_paths(key) - sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, - copy_length, src_offset) + if src_offset || copy_length + src_offset ||= 0 + range_end = copy_length ? src_offset + copy_length - 1 : nil + range = [ src_offset, range_end ] + end + if dst + sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range) sock.stream_to(dst) elsif block_given? + sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range) yield(sock) else - sock.to_s + errors = nil + paths.each do |path| + uri = URI.parse(path) + get = Net::HTTP::Get.new(uri.path) + get["range"] = "bytes=#{range[0]}-#{range[1]}" if range + begin + res = @nhp_get.request(uri, get) + case res.code.to_i + when 200, 206 + return res.body + end + (errors ||= []) << "#{path} - #{res.message} (#{res.class})" + rescue => e + (errors ||= []) << "#{path} - #{e.message} (#{e.class})" + end + end + raise MogileFS::Error, + "all paths failed with GET: #{errors.join(', ')}", [] end - ensure - sock.close if sock && ! sock.closed? + ensure + sock.close if sock && ! sock.closed? end # Get the paths (URLs as strings) for +key+. If +args+ is specified, @@ -253,7 +280,7 @@ class MogileFS::MogileFS < MogileFS::Client # an array of URI::HTTP objects to the stored destinations def new_file(key, args = nil, bytes = nil) # :yields: file raise MogileFS::ReadOnlyError if readonly? - opts = { :key => key, :multi_dest => 1 } + opts = { :key => key, :multi_dest => 1, :nhp_put => @nhp_put } case args when Hash opts[:domain] = args[:domain] diff --git a/lib/mogilefs/new_file/content_range.rb b/lib/mogilefs/new_file/content_range.rb index 757fccc..437a801 100644 --- a/lib/mogilefs/new_file/content_range.rb +++ b/lib/mogilefs/new_file/content_range.rb @@ -9,7 +9,7 @@ class MogileFS::NewFile::ContentRange attr_reader :md5 def hit(uri, req) - MogileFS::NHP.request(uri, req).value + @opts[:nhp_put].request(uri, req).value end # :startdoc: diff --git a/lib/mogilefs/nhp_fake.rb b/lib/mogilefs/nhp_fake.rb new file mode 100644 index 0000000..9fe9faa --- /dev/null +++ b/lib/mogilefs/nhp_fake.rb @@ -0,0 +1,18 @@ +# :enddoc: +require 'net/http' + +# This is just for folks that don't have net-http-persistent +class MogileFS::NhpFake # :nodoc: + attr_accessor :read_timeout, :open_timeout + + def initialize(name) + @read_timeout = @open_timeout = nil + end + + def request(uri, req) # :nodoc: + http = Net::HTTP.new(uri.host, uri.port) + http.read_timeout = @read_timeout + http.open_timeout = @open_timeout + http.request(req) + end +end diff --git a/test/test_fresh.rb b/test/test_fresh.rb index 7585ab0..b619a6a 100644 --- a/test/test_fresh.rb +++ b/test/test_fresh.rb @@ -116,4 +116,35 @@ class TestMogFresh < Test::Unit::TestCase line = buf.split(/\r?\n/).grep(/\screate_close\s/)[0] assert_match(/\bfarewell=goodnight\b/, line) end + + def test_get_file_data_range + add_host_device_domain + client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain + data = "data" + client.store_content("key", "default", data) + + assert_equal data, client.get_file_data("key") + + # ensure offset/length matches IO.copy_stream + src = Tempfile.new("tmp") + src.write(data) + src.flush + [ [1,nil], [1,2], [3,1] ].each do |range| + dst2 = StringIO.new + client.get_file_data("key", dst2, *range) + + src.rewind + + if IO.respond_to?(:copy_stream) + # ensure we match IO.copy_stream semantics + dst = StringIO.new + IO.copy_stream(src, dst, *range) + assert_equal dst.string, dst2.string + assert_equal dst.string, client.get_file_data("key", nil, *range) + end + + assert_equal dst2.string, client.get_file_data("key", nil, *range) + end + src.close! + end end diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index dbaddf3..a6bcf35 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -10,6 +10,14 @@ class TestMogileFS__MogileFS < TestMogileFS super end + def read_headers(client) + headers = "" + while line = client.gets + headers << line + return headers if line == "\r\n" + end + end + def test_initialize assert_equal 'test', @client.domain @@ -24,9 +32,10 @@ class TestMogileFS__MogileFS < TestMogileFS svr = Proc.new do |serv, port| client, _ = serv.accept client.sync = true - readed = client.recv(4096, 0) - assert(readed =~ \ - %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z}) + readed = read_headers(client) + assert_match( + %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n}, + readed) accept.syswrite('.') client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0) client.close @@ -50,9 +59,10 @@ class TestMogileFS__MogileFS < TestMogileFS svr1 = Proc.new do |serv, port| client, _ = serv.accept client.sync = true - readed = client.recv(4096, 0) - assert(readed =~ \ - %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z}) + readed = read_headers(client) + assert_match( + %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n}, + readed) accept.syswrite('.') client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0) client.close @@ -61,9 +71,10 @@ class TestMogileFS__MogileFS < TestMogileFS svr2 = Proc.new do |serv, port| client, _ = serv.accept client.sync = true - readed = client.recv(4096, 0) - assert(readed =~ \ - %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z}) + readed = read_headers(client) + assert_match( + %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01].*\r\n}, + readed) accept.syswrite('.') client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0) client.close @@ -98,9 +109,10 @@ class TestMogileFS__MogileFS < TestMogileFS client, _ = serv.accept client.sync = true accept.syswrite('.') - readed = client.recv(4096, 0) - assert(readed =~ \ - %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z}) + readed = read_headers(client) + assert_match( + %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n}, + readed) MogileFS.io.copy_stream(tmpfp, client) client.close exit 0 @@ -406,7 +418,7 @@ class TestMogileFS__MogileFS < TestMogileFS t = TempServer.new(Proc.new do |serv, accept| client, _ = serv.accept client.sync = true - client.recv(4096, 0) + read_headers(client) client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0) client.close end)