From e07b59855a61a4053810f72e5a9f2000d54964e5 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 30 Dec 2008 14:39:31 -0800 Subject: [PATCH] new MogileFS::Network module, move verify_uris there This will include networks and optimal URI selection support --- Manifest.txt | 1 + lib/mogilefs/network.rb | 58 +++++++++++++++++++++++++++++++++ lib/mogilefs/util.rb | 53 ------------------------------ test/{test_utils.rb => test_network.rb} | 6 ++-- 4 files changed, 62 insertions(+), 56 deletions(-) create mode 100644 lib/mogilefs/network.rb rename test/{test_utils.rb => test_network.rb} (86%) diff --git a/Manifest.txt b/Manifest.txt index 9dd4500..930cf2e 100644 --- a/Manifest.txt +++ b/Manifest.txt @@ -13,6 +13,7 @@ lib/mogilefs/mogilefs.rb lib/mogilefs/mysql.rb lib/mogilefs/pool.rb lib/mogilefs/util.rb +lib/mogilefs/network.rb test/setup.rb test/test_admin.rb test/test_backend.rb diff --git a/lib/mogilefs/network.rb b/lib/mogilefs/network.rb new file mode 100644 index 0000000..a3c3bd3 --- /dev/null +++ b/lib/mogilefs/network.rb @@ -0,0 +1,58 @@ +require 'mogilefs' +require 'socket' + +module MogileFS::Network + # given an array of URIs, verify that at least one of them is accessible + # with the expected HTTP code within the timeout period (in seconds). + def verify_uris(uris = [], expect = '200', timeout = 2.00) + uri_socks = {} + ok_uris = [] + sockets = [] + + # first, we asynchronously connect to all of them + uris.each do |uri| + sock = Socket.mogilefs_new_nonblock(uri.host, uri.port) rescue next + uri_socks[sock] = uri + end + + # wait for at least one of them to finish connecting and send + # HTTP requests to the connected ones + begin + t0 = Time.now + r = IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0) + timeout -= (Time.now - t0) + break unless r && r[1] + r[1].each do |sock| + begin + sock.syswrite "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n" + sockets << sock + rescue + sock.close rescue nil + end + end + end until sockets[0] || timeout < 0 + + # Await a response from the sockets we had written to, we only need one + # valid response, but we'll take more if they return simultaneously + if sockets[0] + begin + t0 = Time.now + r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0) + timeout -= (Time.now - t0) + break unless r && r[0] + r[0].each do |sock| + buf = sock.recv_nonblock(128, Socket::MSG_PEEK) rescue next + if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf + ok_uris << uri_socks.delete(sock) + sock.close rescue nil + end + end + end + end until ok_uris[0] || timeout < 0 + + ok_uris + ensure + uri_socks.keys.each { |sock| sock.close rescue nil } + end + +end # module MogileFS::Network diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb index 32add14..1ba36b6 100644 --- a/lib/mogilefs/util.rb +++ b/lib/mogilefs/util.rb @@ -42,59 +42,6 @@ module MogileFS::Util copied end # sysrwloop - # given an array of URIs, verify that at least one of them is accessible - # with the expected HTTP code within the timeout period (in seconds). - def verify_uris(uris = [], expect = '200', timeout = 2.00) - uri_socks = {} - ok_uris = [] - sockets = [] - - # first, we asynchronously connect to all of them - uris.each do |uri| - sock = Socket.mogilefs_new_nonblock(uri.host, uri.port) rescue next - uri_socks[sock] = uri - end - - # wait for at least one of them to finish connecting and send - # HTTP requests to the connected ones - begin - t0 = Time.now - r = IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0) - timeout -= (Time.now - t0) - break unless r && r[1] - r[1].each do |sock| - begin - sock.syswrite "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n" - sockets << sock - rescue - sock.close rescue nil - end - end - end until sockets[0] || timeout < 0 - - # Await a response from the sockets we had written to, we only need one - # valid response, but we'll take more if they return simultaneously - if sockets[0] - begin - t0 = Time.now - r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0) - timeout -= (Time.now - t0) - break unless r && r[0] - r[0].each do |sock| - buf = sock.recv_nonblock(128, Socket::MSG_PEEK) rescue next - if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf - ok_uris << uri_socks.delete(sock) - sock.close rescue nil - end - end - end - end until ok_uris[0] || timeout < 0 - - ok_uris - ensure - uri_socks.keys.each { |sock| sock.close rescue nil } - end - private # writes the contents of buf to io_wr in full w/o blocking diff --git a/test/test_utils.rb b/test/test_network.rb similarity index 86% rename from test/test_utils.rb rename to test/test_network.rb index 146d420..f479ca0 100644 --- a/test/test_utils.rb +++ b/test/test_network.rb @@ -1,9 +1,9 @@ require 'test/setup' require 'mogilefs' -require 'mogilefs/util' +require 'mogilefs/network' -class TestUtils < Test::Unit::TestCase - include MogileFS::Util +class TestNetwork < Test::Unit::TestCase + include MogileFS::Network def test_verify_uris good = TempServer.new(Proc.new do |serv,port| -- 2.11.4.GIT