From ad6113719d37824bcad6b8e78edafd838c9f8a78 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 6 Dec 2011 20:54:03 +0000 Subject: [PATCH] "new_file" gets many options for Content-MD5 handling This still needs docs and integration with store_content and store_file, but seems to be mostly working. --- lib/mogilefs/chunker.rb | 13 +++++-- lib/mogilefs/http_file.rb | 76 +++++++++++++++++++++++++++++---------- lib/mogilefs/mogilefs.rb | 42 ++++++++++++---------- test/test_mogilefs_integration.rb | 44 +++++++++++++++++++++++ 4 files changed, 136 insertions(+), 39 deletions(-) diff --git a/lib/mogilefs/chunker.rb b/lib/mogilefs/chunker.rb index f735c68..baddcae 100644 --- a/lib/mogilefs/chunker.rb +++ b/lib/mogilefs/chunker.rb @@ -1,12 +1,12 @@ # -*- encoding: binary -*- -require "digest/md5" class MogileFS::Chunker CRLF = "\r\n" attr_reader :io - def initialize(io, md5) + def initialize(io, md5, expect_md5) @io = io - @md5 = md5 ? Digest::MD5.new : nil + @md5 = md5 + @expect_md5 = expect_md5 end def write(buf) @@ -21,6 +21,13 @@ class MogileFS::Chunker def flush if @md5 content_md5 = [ @md5.digest ].pack('m').strip + if @expect_md5.respond_to?(:call) + expect = @expect_md5.call + if expect != content_md5 + raise MogileFS::ChecksumMismatchError, + "expected: #{expect.inspect} actual: #{content_md5.inspect}" + end + end warn "Content-MD5: #{content_md5}\r\n" if $DEBUG @io.write("0\r\nContent-MD5: #{content_md5}\r\n\r\n") else diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 4d4a850..2e418e4 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -2,6 +2,7 @@ # here are internal implementation details, do not use them in your code require 'stringio' require 'uri' +require 'digest/md5' require 'mogilefs/chunker' ## @@ -12,9 +13,10 @@ require 'mogilefs/chunker' # create a new file using MogileFS::MogileFS.new_file. # class MogileFS::HTTPFile < StringIO - class EmptyResponseError < MogileFS::Error; end - class BadResponseError < MogileFS::Error; end - class UnparseableResponseError < MogileFS::Error; end + class RetryableError < MogileFS::Error; end + class EmptyResponseError < RetryableError; end + class BadResponseError < RetryableError; end + class UnparseableResponseError < RetryableError; end class NoStorageNodesError < MogileFS::Error def message; 'Unable to open socket to storage node'; end end @@ -67,34 +69,50 @@ class MogileFS::HTTPFile < StringIO # Creates a new HTTPFile with MogileFS-specific data. Use # MogileFS::MogileFS#new_file instead of this method. - def initialize(dests, content_length) + def initialize(dests, opts = nil) super "" - @streaming_io = @big_io = @uri = @devid = @active = nil + @md5 = @streaming_io = @big_io = @active = nil @dests = dests + @opts = Integer === opts ? { :content_length => opts } : opts end def request_put(sock, uri, file_size, input = nil) host_with_port = "#{uri.host}:#{uri.port}" - md5 = false - if MD5_TRAILER_NODES[host_with_port] + clen = @opts[:content_length] + file_size ||= clen + + content_md5 = @opts[:content_md5] + if String === content_md5 + file_size or + raise ArgumentError, + ":content_length must be specified with :content_md5 String" + file_size = "#{file_size}\r\nContent-MD5: #{content_md5}" + elsif content_md5.respond_to?(:call) || + :trailer == content_md5 || + MD5_TRAILER_NODES[host_with_port] file_size = nil - md5 = true + @md5 = Digest::MD5.new end if file_size sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ "Content-Length: #{file_size}\r\n\r\n") - input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock) + rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock) else - trailers = md5 ? "Trailer: Content-MD5\r\n" : "" + trailers = @md5 ? "Trailer: Content-MD5\r\n" : "" sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \ "Host: #{host_with_port}\r\n#{trailers}" \ "Transfer-Encoding: chunked\r\n\r\n") - tmp = MogileFS::Chunker.new(sock, md5) + tmp = MogileFS::Chunker.new(sock, @md5, content_md5) rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp) tmp.flush - rv end + + if clen && clen != rv + raise MogileFS::SizeMismatchError, + ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}" + end + rv end def put_streaming_io(sock, uri) # unlikely to be used @@ -161,7 +179,7 @@ class MogileFS::HTTPFile < StringIO raise UnparseableResponseError, "Response line not understood: #{line.inspect}" end - rescue => err + rescue SystemCallError, RetryableError => err rewind_or_raise!(uri, err) raise ensure @@ -175,11 +193,8 @@ class MogileFS::HTTPFile < StringIO begin uri = URI.parse(path) bytes_uploaded = upload(devid, uri) - @devid, @uri = devid, uri - return bytes_uploaded - rescue NonRetryableError - raise - rescue => e + return create_close(devid, uri, bytes_uploaded) + rescue SystemCallError, RetryableError => e errors ||= [] errors << "#{path} - #{e.message} (#{e.class})" end @@ -188,4 +203,29 @@ class MogileFS::HTTPFile < StringIO raise NoStorageNodesError, "all paths failed with PUT: #{errors.join(', ')}", [] end + + def create_close(devid, uri, bytes_uploaded) + args = { + :fid => @opts[:fid], + :devid => devid, + :key => @opts[:key], + :domain => @opts[:domain], + :size => bytes_uploaded, + :path => uri.to_s, + } + if @md5 + args[:checksum] = "MD5:#{@md5.hexdigest}" + elsif String === @opts[:content_md5] + hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0] + args[:checksum] = "MD5:#{hex}" + end + args[:checksumverify] = 1 if @opts[:checksumverify] + @opts[:backend].create_close(args) + bytes_uploaded + end + + def close + commit + super + end end diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index 40ee1ca..6059f5f 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -129,16 +129,21 @@ class MogileFS::MogileFS < MogileFS::Client # Consider using store_file instead of this method for large files. # This requires a block passed to it and operates like File.open. # This atomically replaces existing data stored as +key+ when - def new_file(key, klass = nil, bytes = 0) # :yields: file + def new_file(key, args = nil, bytes = 0) # :yields: file raise MogileFS::ReadOnlyError if readonly? - opts = { :domain => @domain, :key => key, :multi_dest => 1 } - opts[:class] = klass if klass && klass != "default" + opts = { :key => key, :multi_dest => 1 } + case args + when Hash + opts[:domain] = args[:domain] + klass = args[:class] and "default" != klass and opts[:class] = klass + when String + opts[:class] = args if "default" != args + end + opts[:domain] ||= @domain res = @backend.create_open(opts) dests = if dev_count = res['dev_count'] # multi_dest succeeded - (1..dev_count.to_i).map do |i| - [res["devid_#{i}"], res["path_#{i}"]] - end + (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] } else # single destination returned # 0x0040: d0e4 4f4b 2064 6576 6964 3d31 2666 6964 ..OK.devid=1&fid # 0x0050: 3d33 2670 6174 683d 6874 7470 3a2f 2f31 =3&path=http://1 @@ -149,19 +154,20 @@ class MogileFS::MogileFS < MogileFS::Client [[res['devid'], res['path']]] end + opts.merge!(args) if Hash === args + opts[:backend] = @backend + opts[:fid] = res['fid'] + case (dests[0][1] rescue nil) - when /^http:\/\// then - http_file = MogileFS::HTTPFile.new(dests, bytes) - yield http_file - rv = http_file.commit - @backend.create_close(:fid => res['fid'], - :devid => http_file.devid, - :domain => @domain, - :key => key, - :path => http_file.uri.to_s, - :size => rv) - rv - when nil, '' then + when %r{\Ahttp://} + http_file = MogileFS::HTTPFile.new(dests, opts) + if block_given? + yield http_file + return http_file.commit # calls create_close + else + return http_file + end + when nil, '' raise MogileFS::EmptyPathError, "Empty path for mogile upload res=#{res.inspect}" else diff --git a/test/test_mogilefs_integration.rb b/test/test_mogilefs_integration.rb index e7ca642..8b5e3c1 100644 --- a/test/test_mogilefs_integration.rb +++ b/test/test_mogilefs_integration.rb @@ -65,6 +65,9 @@ class TestMogileFSIntegration < TestMogIntegration def tmp.size 666 end + def tmp.read(len, buf = "") + raise Errno::EIO + end assert_raises(MogileFS::HTTPFile::NonRetryableError) do @client.store_file("non_rewindable", nil, tmp) @@ -169,4 +172,45 @@ class TestMogileFSIntegration < TestMogIntegration end assert_equal count, seen.size end if ENV["TEST_EXPENSIVE"] + + def test_new_file_no_block + rv = @client.new_file("no_block") + assert_nothing_raised { rv.write "HELLO" } + assert_nil rv.close + assert_equal "HELLO", @client.get_file_data("no_block") + end + + def test_new_file_known_content_length + rv = @client.new_file("a", :content_length => 5) + assert_nothing_raised { rv.write "HELLO" } + assert_nil rv.close + assert_equal "HELLO", @client.get_file_data("a") + + rv = @client.new_file("a", :content_length => 6) + assert_nothing_raised { rv.write "GOOD" } + assert_raises(MogileFS::SizeMismatchError) { rv.close } + assert_equal "HELLO", @client.get_file_data("a") + end + + def test_new_file_content_md5 + r, w = IO.pipe + b64digest = [ Digest::MD5.digest("HELLO") ].pack('m').strip + rv = @client.new_file("a", :content_md5 => b64digest, :content_length => 5) + assert_nothing_raised { rv.write "HELLO" } + assert_nil rv.close + assert_equal "HELLO", @client.get_file_data("a") + + assert_nothing_raised { w.write "HIHI"; w.close } + assert_raises(ArgumentError) do + @client.new_file("a", :content_md5 => b64digest) { |f| f.big_io = r } + end + assert_equal "HELLO", @client.get_file_data("a") + + assert_nothing_raised do + @client.new_file("a", :content_md5 => :trailer) { |f| f.big_io = r } + end + assert_equal "HIHI", @client.get_file_data("a") + ensure + r.close if r + end end -- 2.11.4.GIT