From 2cbeef0f29be4de815b802e705528b6f05d1bb29 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 24 Oct 2012 07:27:18 +0000 Subject: [PATCH] optionally use net-http-persistent for StringIO Given StringIO objects are already in memory, NHP can make small uploads which fit into memory faster. Large uploads (using big_io or :largefile => :stream still go through IO.copy_stream for now) --- lib/mogilefs.rb | 11 ++++++++ lib/mogilefs/http_file.rb | 31 +++++++++++++++++++-- lib/mogilefs/new_file/content_range.rb | 15 ++-------- test/test_mogilefs.rb | 51 ++++++++++++++++++++++++---------- 4 files changed, 79 insertions(+), 29 deletions(-) diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb index cd58c49..43a3433 100644 --- a/lib/mogilefs.rb +++ b/lib/mogilefs.rb @@ -64,6 +64,17 @@ module MogileFS @io = MogileFS::CopyStream end + begin + require 'net/http/persistent' + NHP = Net::HTTP::Persistent.new('mogilefs') + rescue LoadError + require 'net/http' + NHP = Object.new + def NHP.request(uri, req) + Net::HTTP.start(uri.host, uri.port) { |h| h.request(req) } + end + end + # autoload rarely-used things: autoload :Mysql, 'mogilefs/mysql' autoload :Pool, 'mogilefs/pool' diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 7871940..c96e695 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -136,14 +136,41 @@ class MogileFS::HTTPFile < StringIO sock.close if sock end + def nhp_put(devid, uri) + clen = @opts[:content_length] + if clen && clen != size + raise MogileFS::SizeMismatchError, + ":content_length expected: #{clen.inspect}, actual: #{size}" + end + + put = Net::HTTP::Put.new(uri.path) + put["Content-Type"] = "application/octet-stream" + if md5 = @opts[:content_md5] + if md5.respond_to?(:call) + md5 = md5.call.strip + elsif md5 == :trailer + md5 = [ Digest::MD5.digest(string) ].pack("m").chomp! + end + put["Content-MD5"] = md5 + end + put.body = string + res = MogileFS::NHP.request(uri, put) + return size if Net::HTTPSuccess === res + raise BadResponseError, "#{res.code} #{res.message}" + rescue => e + /\ANet::/ =~ "#{e.class}" and + raise RetryableError, "#{e.message} (#{e.class})", e.backtrace + raise + end + def commit errors = nil @dests.each do |devid, path| begin uri = URI.parse(path) - bytes_uploaded = upload(devid, uri) + bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri) return create_close(devid, uri, bytes_uploaded) - rescue SystemCallError, RetryableError => e + rescue Timeout::Error, SystemCallError, RetryableError => e errors ||= [] errors << "#{path} - #{e.message} (#{e.class})" end diff --git a/lib/mogilefs/new_file/content_range.rb b/lib/mogilefs/new_file/content_range.rb index 8dd4725..757fccc 100644 --- a/lib/mogilefs/new_file/content_range.rb +++ b/lib/mogilefs/new_file/content_range.rb @@ -1,6 +1,5 @@ # -*- encoding: binary -*- # here are internal implementation details, do not rely on them in your code -require 'net/http' require 'mogilefs/new_file/writer' # an IO-like object @@ -9,18 +8,8 @@ class MogileFS::NewFile::ContentRange include MogileFS::NewFile::Common attr_reader :md5 - # :stopdoc: - begin - require 'net/http/persistent' - NHP = Net::HTTP::Persistent.new('mogilefs') - - def hit(uri, req) - NHP.request(uri, req).value - end - rescue LoadError - def hit(uri, req) - Net::HTTP.start(uri.host, uri.port) { |h| h.request(req).value } - end + def hit(uri, req) + MogileFS::NHP.request(uri, req).value end # :startdoc: diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index 247feca..6000781 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -227,11 +227,12 @@ class TestMogileFS__MogileFS < TestMogileFS to_store = Tempfile.new('small') to_store.syswrite('data') - expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata" t = TempServer.new(Proc.new do |serv, accept| client, _ = serv.accept - client.sync = true - received.syswrite(client.read(expected.bytesize)) + while buf = client.readpartial(666) + received.syswrite(buf) + break if buf =~ /data/ + end client.send("HTTP/1.0 200 OK\r\n\r\n", 0) client.close end) @@ -243,14 +244,18 @@ class TestMogileFS__MogileFS < TestMogileFS nr = @client.store_file 'new_key', 'test', to_store.path assert_equal 4, nr received.sysseek(0) - assert_equal expected, received.sysread(4096) + + a = received.sysread(999999).split(/\r\n/) + assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0]) + assert_equal("data", a[-1]) + assert_equal("", a[-2]) + assert a.grep(%r{\AContent-Length: 4\z})[0] ensure TempServer.destroy_all! end def test_store_content_http received = Tempfile.new('received') - expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata" t = TempServer.new(Proc.new do |serv, accept| client, _ = serv.accept @@ -275,7 +280,11 @@ class TestMogileFS__MogileFS < TestMogileFS assert_equal 4, nr received.sysseek(0) - assert_equal expected, received.sysread(4096) + a = received.sysread(999999).split(/\r\n/) + assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0]) + assert_equal("data", a[-1]) + assert_equal("", a[-2]) + assert a.grep(%r{\AContent-Length: 4\z})[0] ensure TempServer.destroy_all! end @@ -291,12 +300,14 @@ class TestMogileFS__MogileFS < TestMogileFS client, _ = serv.accept client.sync = true nr = 0 + seen = '' loop do buf = client.readpartial(8192) or break break if buf.length == 0 assert_equal buf.length, received.syswrite(buf) nr += buf.length - break if nr >= expected.size + seen << buf + break if seen =~ /\r\n\r\n(?:data){10}/ end client.send("HTTP/1.0 200 OK\r\n\r\n", 0) client.close @@ -317,7 +328,11 @@ class TestMogileFS__MogileFS < TestMogileFS assert_equal 40, nr received.sysseek(0) - assert_equal expected, received.sysread(4096) + a = received.sysread(999999).split(/\r\n/) + assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0]) + assert_equal("data" * 10, a[-1]) + assert_equal("", a[-2]) + assert a.grep(%r{\AContent-Length: 40\z})[0] ensure TempServer.destroy_all! end @@ -329,7 +344,6 @@ class TestMogileFS__MogileFS < TestMogileFS def test_store_content_multi_dest_failover(big_io = false) received1 = Tempfile.new('received') received2 = Tempfile.new('received') - expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata" t1 = TempServer.new(Proc.new do |serv, accept| client, _ = serv.accept @@ -375,8 +389,17 @@ class TestMogileFS__MogileFS < TestMogileFS assert_equal 4, nr received1.sysseek(0) received2.sysseek(0) - assert_equal expected, received1.sysread(4096) - assert_equal expected, received2.sysread(4096) + a = received1.sysread(4096).split(/\r\n/) + b = received2.sysread(4096).split(/\r\n/) + assert_equal a[0], b[0] + assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0]) + assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0]) + assert_equal("data", a[-1]) + assert_equal("data", b[-1]) + assert_equal("", a[-2]) + assert_equal("", b[-2]) + assert a.grep(%r{\AContent-Length: 4\z})[0] + assert b.grep(%r{\AContent-Length: 4\z})[0] ensure TempServer.destroy_all! end @@ -402,7 +425,6 @@ class TestMogileFS__MogileFS < TestMogileFS def test_store_content_http_empty received = Tempfile.new('received') - expected = "PUT /path HTTP/1.0\r\nContent-Length: 0\r\n\r\n" t = TempServer.new(Proc.new do |serv, accept| client, _ = serv.accept client.sync = true @@ -419,7 +441,9 @@ class TestMogileFS__MogileFS < TestMogileFS nr = @client.store_content 'new_key', 'test', '' assert_equal 0, nr received.sysseek(0) - assert_equal expected, received.sysread(4096) + a = received.sysread(4096).split(/\r\n/) + assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0]) + assert a.grep(%r{\AContent-Length: 0\z})[0] end def test_store_content_nfs @@ -613,7 +637,6 @@ class TestMogileFS__MogileFS < TestMogileFS timeout = 1 args = { :hosts => hosts, :domain => "foo", :timeout => timeout } c = MogileFS::MogileFS.new(args) - received = [] secs = timeout + 1 th = Thread.new do close_later = [] -- 2.11.4.GIT