From 21fc20b7d798c3eab6155b24dcb58c95b53ef856 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 20 Nov 2011 06:29:26 +0000 Subject: [PATCH] add mogstored_rack example This should allow any Rack server to become an HTTP server for mogstored. When using one of Unicorn/Rainbows!/Zbatery, the Content-MD5 HTTP Trailer will be supported. Otherwise, Content-MD5 can always be supported as a regular HTTP header (at the cost of requiring the client to read whatever they upload twice). --- Manifest.txt | 4 + bin/mog | 5 ++ examples/mogstored_rack.rb | 189 +++++++++++++++++++++++++++++++++++++++ lib/mogilefs/chunker.rb | 13 ++- lib/mogilefs/http_file.rb | 18 +++- test/fresh.rb | 10 ++- test/test_mogstored_rack.rb | 89 ++++++++++++++++++ test/test_unit_mogstored_rack.rb | 72 +++++++++++++++ 8 files changed, 391 insertions(+), 9 deletions(-) create mode 100644 examples/mogstored_rack.rb create mode 100644 test/test_mogstored_rack.rb create mode 100644 test/test_unit_mogstored_rack.rb diff --git a/Manifest.txt b/Manifest.txt index 23fe0ff..a5cbb84 100644 --- a/Manifest.txt +++ b/Manifest.txt @@ -11,6 +11,7 @@ README Rakefile TODO bin/mog +examples/mogstored_rack.rb lib/mogilefs.rb lib/mogilefs/admin.rb lib/mogilefs/backend.rb @@ -34,6 +35,7 @@ setup.rb test/.gitignore test/aggregate.rb test/exec.rb +test/fresh.rb test/integration.rb test/setup.rb test/socket_test.rb @@ -50,7 +52,9 @@ test/test_mogilefs_integration_large_pipe.rb test/test_mogilefs_integration_list_keys.rb test/test_mogilefs_socket_kgio.rb test/test_mogilefs_socket_pure.rb +test/test_mogstored_rack.rb test/test_mogtool_bigfile.rb test/test_mysql.rb test/test_pool.rb +test/test_unit_mogstored_rack.rb lib/mogilefs/version.rb diff --git a/bin/mog b/bin/mog index c312790..b511414 100755 --- a/bin/mog +++ b/bin/mog @@ -7,6 +7,11 @@ $stderr.sync = $stdout.sync = true trap('INT') { exit 130 } trap('PIPE') { exit 0 } +if md5_trailer_nodes = ENV["MD5_TRAILER_NODES"] + md5_trailer_nodes.split(/\s*,\s*/).each do |host| + MogileFS::HTTPFile::MD5_TRAILER_NODES[host] = true + end +end # this is to be compatible with config files used by the Perl tools def parse_config_file!(path, overwrite = false) diff --git a/examples/mogstored_rack.rb b/examples/mogstored_rack.rb new file mode 100644 index 0000000..0cdfbfb --- /dev/null +++ b/examples/mogstored_rack.rb @@ -0,0 +1,189 @@ +# -*- encoding: binary -*- +require 'tempfile' +require 'digest/md5' +require 'rack' + +# Rack application for handling HTTP PUT/DELETE/MKCOL operations needed +# for a MogileFS storage server. GET requests are handled by +# Rack::File and Rack::Head _must_ be in the middleware stack for +# mogilefsd fsck to work properly with keepalive. +# +# Usage in rackup config file (config.ru): +# +# require "./mogstored_rack" +# use Rack::Head +# run MogstoredRack.new("/var/mogdata") +class MogstoredRack + class ContentMD5 < Digest::MD5 + def content_md5 + [ digest ].pack("m").strip! + end + end + + def initialize(root, opts = {}) + @root = File.expand_path(root) + @rack_file = (opts[:app] || Rack::File.new(@root)) + @fsync = !! opts[:fsync] + @creat_perms = opts[:creat_perms] || (~File.umask & 0666) + @mkdir_perms = opts[:mkdir_perms] || (~File.umask & 0777) + @reread_verify = !! opts[:reread_verify] + end + + def call(env) + case env["REQUEST_METHOD"] + when "GET", "HEAD" + case env["PATH_INFO"] + when "/" + r(200, "") # MogileFS seems to need this... + else + @rack_file.call(env) + end + when "PUT" + put(env) + when "DELETE" + delete(env) + when "MKCOL" + mkcol(env) + else + r(405, "unsupported method", env) + end + rescue Errno::EPERM, Errno::EACCES => err + r(403, "#{err.message} (#{err.class})", env) + rescue => err + r(500, "#{err.message} (#{err.class})", env) + end + + def mkcol(env) + path = server_path(env) or return r(400) + Dir.mkdir(path, @mkdir_perms) + r(204) + rescue Errno::EEXIST # succeed (204) on race condition + File.directory?(path) ? r(204) : r(409) + end + + def delete(env) + path = server_path(env) or return r(400) + File.exist?(path) or return r(404) + File.directory?(path) ? Dir.rmdir(path) : File.unlink(path) + r(204) + rescue Errno::ENOENT # return 404 on race condition + File.exist?(path) ? r(500) : r(404) + end + + def put(env) + path = server_path(env) or return r(400) + dir = File.dirname(path) + File.directory?(dir) or return r(403) + + Tempfile.open([dir, "#{File.basename(path)}.tmp"]) do |tmp| + tmp = tmp.to_io # delegated method calls are slower + tmp.sync = true + tmp.binmode + buf = "" + received = put_loop(env["rack.input"], tmp, buf) + err = content_md5_fail?(env, received) and return err + if @reread_verify && err = reread_md5_fail?(env, tmp, received, buf) + return err + end + tmp.chmod(@creat_perms) + begin + File.link(tmp.path, path) + rescue Errno::EEXIST + err = rename_overwrite_fail?(tmp.path, path) and return err + end + fsync(dir, tmp) if @fsync + resp = r(201) + resp[1]["X-Received-Content-MD5"] = received + resp + end + end + + def put_loop(src, dst, buf) + md5 = ContentMD5.new + while src.read(0x4000, buf) + md5.update(buf) + dst.write(buf) + end + md5.content_md5 + end + + def server_path(env) + path = env['PATH_INFO'].squeeze('/') + path.split(%r{/}).include?("..") and return false + "#@root#{path}" + end + + # returns a plain-text HTTP response + def r(code, msg = nil, env = nil) + if env && logger = env["rack.logger"] + logger.warn("#{env['REQUEST_METHOD']} #{env['PATH_INFO']} " \ + "#{code} #{msg.inspect}") + end + if Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) + [ code, {}, [] ] + else + msg ||= Rack::Utils::HTTP_STATUS_CODES[code] || "" + msg += "\n" if msg.size > 0 + [ code, + { 'Content-Type' => 'text/plain', 'Content-Length' => msg.size.to_s }, + [ msg ] ] + end + end + + # Tries to detect filesystem/disk corruption. + # Unfortunately, posix_fadvise(2)/IO#advise is only advisory and + # can't guarantee we're not just reading the data in the kernel + # page cache. + def reread_md5_fail?(env, tmp, received, buf) + # try to force a reread from the storage device, not cache + tmp.fsync + tmp.rewind + tmp.advise(:dontneed) rescue nil # only in Ruby 1.9.3 and only advisory + + md5 = ContentMD5.new + while tmp.read(0x4000, buf) + md5.update(buf) + end + reread = md5.content_md5 + reread == received and return false # success + r(500, "reread MD5 mismatch\n" \ + "received: #{received}\n" \ + " reread: #{reread}", env) + end + + # Tries to detect network corruption by verifying the client-supplied + # Content-MD5 is correct. It's highly unlikely the MD5 can be corrupted + # in a way that also allows corrupt data to pass through. + # + # The Rainbows!/Unicorn HTTP servers will populate the HTTP_CONTENT_MD5 + # field in +env+ after env["rack.input"] is fully-consumed. Clients + # may also send Content-MD5 as a header and this will still work. + def content_md5_fail?(env, received) + expected = env["HTTP_CONTENT_MD5"] or return false + expected = expected.strip + expected == received and return false # success + r(400, "Content-MD5 mismatch\n" \ + "expected: #{expected}\n" \ + "received: #{received}", env) + end + + def rename_overwrite_fail?(src, dst) + 10.times do + begin + tmp_dst = "#{dst}.#{rand}" + File.link(src, tmp_dst) + rescue Errno::EEXIST + next + end + File.rename(tmp_dst, dst) + return false # success! + end + r(409) + end + + # fsync each and every directory component above us on the same device + def fsync(dir, tmp) + tmp.fsync + File.open(dir) { |io| io.fsync } + end +end diff --git a/lib/mogilefs/chunker.rb b/lib/mogilefs/chunker.rb index 27ff743..f735c68 100644 --- a/lib/mogilefs/chunker.rb +++ b/lib/mogilefs/chunker.rb @@ -1,21 +1,30 @@ # -*- encoding: binary -*- +require "digest/md5" class MogileFS::Chunker CRLF = "\r\n" attr_reader :io - def initialize(io) + def initialize(io, md5) @io = io + @md5 = md5 ? Digest::MD5.new : nil end def write(buf) rv = buf.bytesize @io.write("#{rv.to_s(16)}\r\n") @io.write(buf) + @md5.update(buf) if @md5 @io.write(CRLF) rv end def flush - @io.write("0\r\n\r\n") + if @md5 + content_md5 = [ @md5.digest ].pack('m').strip + warn "Content-MD5: #{content_md5}\r\n" if $DEBUG + @io.write("0\r\nContent-MD5: #{content_md5}\r\n\r\n") + else + @io.write("0\r\n\r\n") + end end end diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 5ed69a4..1611a5d 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -20,6 +20,8 @@ class MogileFS::HTTPFile < StringIO end class NonRetryableError < MogileFS::Error; end + MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL + ## # The URI this file will be stored to. @@ -45,15 +47,23 @@ class MogileFS::HTTPFile < StringIO end def request_put(sock, uri, file_size, input = nil) + host_with_port = "#{uri.host}:#{uri.port}" + md5 = false + if MD5_TRAILER_NODES[host_with_port] + file_size = nil + md5 = true + end + if file_size sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ "Content-Length: #{file_size}\r\n\r\n") input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock) else + trailers = md5 ? "Trailer: Content-MD5\r\n" : "" sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \ - "Host: #{uri.host}:#{uri.port}\r\n" \ + "Host: #{host_with_port}\r\n#{trailers}" \ "Transfer-Encoding: chunked\r\n\r\n") - tmp = MogileFS::Chunker.new(sock) + tmp = MogileFS::Chunker.new(sock, md5) rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp) tmp.flush rv @@ -105,8 +115,8 @@ class MogileFS::HTTPFile < StringIO file_size = request_put(sock, uri, size, @big_io) end else - sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ - "Content-Length: #{file_size}\r\n\r\n#{string}") + rewind + request_put(sock, uri, file_size, self) end case line = sock.timed_read(23, "") diff --git a/test/fresh.rb b/test/fresh.rb index 217a5c3..9490d34 100644 --- a/test/fresh.rb +++ b/test/fresh.rb @@ -115,14 +115,13 @@ EOF @admin.delete_class(domain, "klassy") rescue nil end - def test_device_file_add + def add_host_device_domain assert_equal [], @admin.get_hosts args = { :ip => @test_host, :port => @mogstored_http_port } args[:status] = "alive" @admin.create_host("me", args) Dir.mkdir("#@docroot/dev1") Dir.mkdir("#@docroot/dev2") - yield_for_monitor_update { @admin.get_hosts.empty? or break } # TODO: allow adding devices via our MogileFS::Admin class @@ -148,7 +147,12 @@ EOF domain = "rbmogtest.#$$" @admin.create_domain(domain) yield_for_monitor_update { @admin.get_domains.include?(domain) and break } - client = MogileFS::MogileFS.new :hosts => @hosts, :domain => domain + @domain = domain + end + + def test_device_file_add + add_host_device_domain + client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain r, w = IO.pipe thr = Thread.new do (0..9).each do |i| diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb new file mode 100644 index 0000000..491f372 --- /dev/null +++ b/test/test_mogstored_rack.rb @@ -0,0 +1,89 @@ +# -*- encoding: binary -*- +require "./test/fresh" + +class TestMogstoredRack < Test::Unit::TestCase + include TestFreshSetup + def setup + setup_mogilefs + end + + def test_md5_check + add_host_device_domain + client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain + node = "#@test_host:#@mogstored_http_port" + pid = fork do + # not modifying this hash in the same process + MogileFS::HTTPFile::MD5_TRAILER_NODES[node] = true + client.store_content("md5_me", nil, "HELLO WORLD") + end + _, status = Process.waitpid2(pid) + assert status.success?, status.inspect + assert_equal "HELLO WORLD", client.get_file_data("md5_me") + end + + def setup_mogstored + @docroot = Dir.mktmpdir(["mogfresh", "docroot"]) + @mogstored_mgmt = TCPServer.new(@test_host, 0) + @mogstored_http = TCPServer.new(@test_host, 0) + @mogstored_mgmt_port = @mogstored_mgmt.addr[1] + @mogstored_http_port = @mogstored_http.addr[1] + @mogstored_conf = Tempfile.new(["mogstored", "conf"]) + @mogstored_pid = Tempfile.new(["mogstored", "pid"]) + @mogstored_conf.write < 0 + sleep 0.1 + end + end + + def teardown + pid = File.read(@unicorn_pid.path).to_i + Process.kill(:QUIT, pid) if pid > 0 + teardown_mogilefs + puts(@unicorn_stderr.read) if $DEBUG + end +end if `which unicorn`.chomp.size > 0 diff --git a/test/test_unit_mogstored_rack.rb b/test/test_unit_mogstored_rack.rb new file mode 100644 index 0000000..cc99939 --- /dev/null +++ b/test/test_unit_mogstored_rack.rb @@ -0,0 +1,72 @@ +# -*- encoding: binary -*- +require "test/unit" +require "tmpdir" +require "fileutils" +begin + require "./examples/mogstored_rack" +rescue LoadError +end + +class TestUnitMogstoredRack < Test::Unit::TestCase + attr_reader :req + + def setup + @docroot = Dir.mktmpdir(["mogstored_rack", ".docroot"]) + end + + def test_defaults + req = Rack::MockRequest.new(MogstoredRack.new(@docroot)) + all_methods(req) + end + + def test_fsync_true + req = Rack::MockRequest.new(MogstoredRack.new(@docroot, :fsync=>true)) + all_methods(req) + end + + def test_reread_verify + app = MogstoredRack.new(@docroot, :reread_verify=>true) + req = Rack::MockRequest.new(app) + all_methods(req) + end + + def all_methods(req) + assert_equal 200, req.get("/").status + assert ! File.directory?("#@docroot/dev666") + assert_equal 204, req.request("MKCOL", "/dev666").status + assert File.directory?("#@docroot/dev666") + + io = StringIO.new("HELLO") + r = req.request("PUT", "/dev666/666.fid", :input => io) + assert_equal 201, r.status + assert_equal "HELLO", IO.read("#@docroot/dev666/666.fid") + + # invalid MD5 + io = StringIO.new("WORLD") + md5 = [ Digest::MD5.new.digest ].pack("m").strip! + opts = { :input => io, "HTTP_CONTENT_MD5" => md5 } + r = req.request("PUT", "/dev666/666.fid", opts) + assert_equal 400, r.status + assert_equal "HELLO", IO.read("#@docroot/dev666/666.fid") + + # valid MD5 + io = StringIO.new("VALID") + md5 = [ Digest::MD5.digest("VALID") ].pack("m").strip! + opts = { :input => io, "HTTP_CONTENT_MD5" => md5 } + r = req.request("PUT", "/dev666/666.fid", opts) + assert_equal 201, r.status + assert_equal "VALID", IO.read("#@docroot/dev666/666.fid") + + r = req.request("GET", "/dev666/666.fid") + assert_equal 200, r.status + assert_equal "VALID", r.body + + r = req.request("DELETE", "/dev666/666.fid") + assert_equal 204, r.status + assert ! File.exist?("#@docroot/dev666/666.fid") + end + + def teardown + FileUtils.rmtree(@docroot) + end +end if defined?(Rack) -- 2.11.4.GIT