From 60c22ca86f8fcea8b51c90d5cb759cc2f29cac59 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 12 Jan 2011 17:49:32 -0800 Subject: [PATCH] refactor backend socket/connection handling We now take steps to ensure the socket we wrote to is always the same as the socket we'll read from. And if a read ever fails we'll close the socket to avoid having responses. We've also switched from Socket#send to IO#write as IO#write guarantees write-in-full behavior as long as the socket is alive (MogileFS queries are small) The public MogileFS::Backend#shutdown method is now thread-safe, too. --- lib/mogilefs/backend.rb | 71 ++++++++++++++++++++++++++----------------------- test/test_backend.rb | 23 +++------------- 2 files changed, 40 insertions(+), 54 deletions(-) diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb index fe5949c..80f714f 100644 --- a/lib/mogilefs/backend.rb +++ b/lib/mogilefs/backend.rb @@ -75,10 +75,7 @@ class MogileFS::Backend # Closes this backend's socket. def shutdown - if @socket - @socket.close rescue nil # ignore errors - @socket = nil - end + @mutex.synchronize { shutdown_unlocked } end # MogileFS::MogileFS commands @@ -148,29 +145,44 @@ class MogileFS::Backend private unless defined? $TESTING + # record-separator for mogilefsd responses, update this if the protocol + # changes + RS = "\n" + + def shutdown_unlocked # :nodoc: + if @socket + @socket.close rescue nil # ignore errors + @socket = nil + end + end + ## # Performs the +cmd+ request with +args+. def do_request(cmd, args) + response = nil + request = make_request cmd, args @mutex.synchronize do - request = make_request cmd, args - begin - bytes_sent = socket.send request, 0 - rescue SystemCallError - shutdown - raise MogileFS::UnreachableBackendError - end - - unless bytes_sent == request.length then - raise MogileFS::RequestTruncatedError, - "request truncated (sent #{bytes_sent} expected #{request.length})" + io = socket + begin + bytes_sent = io.write request + bytes_sent == request.size or + raise MogileFS::RequestTruncatedError, + "request truncated (sent #{bytes_sent} expected #{request.size})" + rescue SystemCallError + raise MogileFS::UnreachableBackendError + end + + readable?(io) + response = io.gets(RS) and return parse_response(response) + ensure + # we DO NOT want the response we timed out waiting for, to crop up later + # on, on the same socket, intersperesed with a subsequent request! + # we close the socket if it times out like this + response or shutdown_unlocked end - - readable? - - parse_response(socket.gets("\n")) - end + end # @mutex.synchronize end ## @@ -197,7 +209,6 @@ class MogileFS::Backend @lasterr = $1 @lasterrstr = $2 ? url_unescape($2) : nil raise error(@lasterr), @lasterrstr - return nil end return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)/ @@ -209,26 +220,18 @@ class MogileFS::Backend ## # Raises if the socket does not become readable in +@timeout+ seconds. - def readable? + def readable?(io = @socket) timeleft = @timeout peer = nil loop do t0 = Time.now - found = IO.select([socket], nil, nil, timeleft) + found = IO.select([io], nil, nil, timeleft) return true if found && found[0] timeleft -= (Time.now - t0) + timeleft >= 0 and next + peer = io ? "#{io.mogilefs_peername} " : nil - if timeleft < 0 - peer = @socket ? "#{@socket.mogilefs_peername} " : nil - - # we DO NOT want the response we timed out waiting for, to crop up later - # on, on the same socket, intersperesed with a subsequent request! so, - # we close the socket if it times out like this - shutdown - raise MogileFS::UnreadableSocketError, "#{peer}never became readable" - break - end - shutdown + raise MogileFS::UnreadableSocketError, "#{peer}never became readable" end false end diff --git a/test/test_backend.rb b/test/test_backend.rb index 01f0fff..7858af6 100644 --- a/test/test_backend.rb +++ b/test/test_backend.rb @@ -58,7 +58,7 @@ class TestBackend < Test::Unit::TestCase socket_request = '' socket = Object.new def socket.closed?() false end - def socket.send(request, flags) raise SystemCallError, 'dummy' end + def socket.write(request) raise SystemCallError, 'dummy' end @backend.instance_variable_set '@socket', socket @@ -93,7 +93,7 @@ class TestBackend < Test::Unit::TestCase socket_request = '' socket = Object.new def socket.closed?() false end - def socket.send(request, flags) return request.length - 1 end + def socket.write(request) return request.length - 1 end @backend.instance_variable_set '@socket', socket @@ -140,29 +140,12 @@ class TestBackend < Test::Unit::TestCase assert_equal 'totally suck', @backend.lasterrstr end - def test_readable_eh_readable - accept = Tempfile.new('accept') - tmp = TempServer.new(Proc.new do |serv, port| - client, client_addr = serv.accept - client.sync = true - accept.syswrite('.') - client.send('.', 0) - sleep - end) - - @backend = MogileFS::Backend.new :hosts => [ "127.0.0.1:#{tmp.port}" ] - assert_equal true, @backend.readable? - assert_equal 1, accept.stat.size - ensure - TempServer.destroy_all! - end - def test_readable_eh_not_readable tmp = TempServer.new(Proc.new { |serv,port| serv.accept; sleep }) @backend = MogileFS::Backend.new(:hosts => [ "127.0.0.1:#{tmp.port}" ], :timeout => 0.5) begin - @backend.readable? + @backend.do_request 'foo', {} rescue MogileFS::UnreadableSocketError => e assert_equal "127.0.0.1:#{tmp.port} never became readable", e.message rescue Exception => err -- 2.11.4.GIT