From 4a5794447a682b8034b2381054df28bd6296c4ee Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 22 Dec 2008 18:28:27 -0800 Subject: [PATCH] Replace TCPSocket + timeout code with Socket + IO.select This removes the dependency on unsafe methods used in the Timeout class. Charles makes some good points here: http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html And even matz agrees: http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/294446 Of course, I strongly dislike any unnecessary use of threads, and implementations using native threads to do timeouts makes me even more uncomfortable. --- lib/mogilefs.rb | 2 -- lib/mogilefs/backend.rb | 11 +++--- lib/mogilefs/httpfile.rb | 4 +-- lib/mogilefs/mogilefs.rb | 32 ++++++----------- lib/mogilefs/util.rb | 90 +++++++++++++++++++++++++++++++++++++++++++----- 5 files changed, 98 insertions(+), 41 deletions(-) diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb index 6a568b0..1778497 100644 --- a/lib/mogilefs.rb +++ b/lib/mogilefs.rb @@ -30,8 +30,6 @@ module MogileFS end -require 'socket' - require 'mogilefs/backend' require 'mogilefs/nfsfile' require 'mogilefs/httpfile' diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb index 6fc52fb..9618f27 100644 --- a/lib/mogilefs/backend.rb +++ b/lib/mogilefs/backend.rb @@ -1,6 +1,5 @@ -require 'socket' -require 'thread' require 'mogilefs' +require 'mogilefs/util' ## # MogileFS::Backend communicates with the MogileFS trackers. @@ -146,10 +145,10 @@ class MogileFS::Backend private unless defined? $TESTING ## - # Returns a new TCPSocket connected to +port+ on +host+. + # Returns a new Socket (TCP) connected to +port+ on +host+. def connect_to(host, port) - return TCPSocket.new(host, port) + Socket.mogilefs_new(host, port, @timeout) end ## @@ -223,7 +222,7 @@ class MogileFS::Backend timeleft -= (Time.now - t0) if timeleft < 0 - peer = @socket ? "#{@socket.peeraddr[3]}:#{@socket.peeraddr[1]} " : nil + peer = @socket ? "#{@socket.mogilefs_peername} " : nil # we DO NOT want the response we timed out waiting for, to crop up later # on, on the same socket, intersperesed with a subsequent request! so, @@ -250,7 +249,7 @@ class MogileFS::Backend begin @socket = connect_to(*host.split(':')) - rescue SystemCallError + rescue SystemCallError, MogileFS::Timeout @dead[host] = now next end diff --git a/lib/mogilefs/httpfile.rb b/lib/mogilefs/httpfile.rb index 0833481..2ab50a1 100644 --- a/lib/mogilefs/httpfile.rb +++ b/lib/mogilefs/httpfile.rb @@ -1,5 +1,3 @@ -require 'fcntl' -require 'socket' require 'stringio' require 'uri' require 'mogilefs/backend' @@ -144,7 +142,7 @@ class MogileFS::HTTPFile < StringIO raise NoStorageNodesError if @path.nil? end - @socket = TCPSocket.new @path.host, @path.port + @socket = Socket.mogilefs_new @path.host, @path.port end def next_path diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index 2901213..20756f9 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -1,16 +1,8 @@ -require 'socket' -require 'timeout' - require 'mogilefs/client' require 'mogilefs/nfsfile' require 'mogilefs/util' ## -# Timeout error class. - -class MogileFS::Timeout < Timeout::Error; end - -## # MogileFS File manipulation client. class MogileFS::MogileFS < MogileFS::Client @@ -226,12 +218,11 @@ class MogileFS::MogileFS < MogileFS::Client when /^http:\/\// then begin url = URI.parse path + s = Socket.mogilefs_new_request(url.host, url.port, + "HEAD #{url.request_uri} HTTP/1.0\r\n\r\n", + @get_file_data_timeout) + res = s.recv(4096, 0) - res = timeout @get_file_data_timeout, MogileFS::Timeout do - s = TCPSocket.new(url.host, url.port) - s.syswrite("HEAD #{url.request_uri} HTTP/1.0\r\n\r\n") - s.sysread(4096) - end if cl = /^Content-Length:\s*(\d+)/i.match(res) return cl[1].to_i end @@ -281,15 +272,12 @@ class MogileFS::MogileFS < MogileFS::Client protected def http_get_sock(uri) - sock = nil - timeout @get_file_data_timeout, MogileFS::Timeout do - sock = TCPSocket.new(uri.host, uri.port) - sock.sync = true - sock.syswrite("GET #{uri.request_uri} HTTP/1.0\r\n\r\n") - buf = sock.recv(4096, Socket::MSG_PEEK) - head, body = buf.split(/\r\n\r\n/, 2) - head = sock.recv(head.size + 4) - end + sock = Socket.mogilefs_new_request(uri.host, uri.port, + "GET #{uri.request_uri} HTTP/1.0\r\n\r\n", + @get_file_data_timeout) + buf = sock.recv(4096, Socket::MSG_PEEK) + head, body = buf.split(/\r\n\r\n/, 2) + head = sock.recv(head.size + 4, 0) sock end # def http_get_sock diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb index e1e2ee5..7a10ef3 100644 --- a/lib/mogilefs/util.rb +++ b/lib/mogilefs/util.rb @@ -1,3 +1,6 @@ +require 'mogilefs' +require 'socket' + module MogileFS::Util CHUNK_SIZE = 65536 @@ -48,14 +51,8 @@ module MogileFS::Util # first, we asynchronously connect to all of them uris.each do |uri| - sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) - sock.fcntl(Fcntl::F_SETFL, sock.fcntl(Fcntl::F_GETFL) | Fcntl::O_NONBLOCK) - begin - sock.connect(Socket.pack_sockaddr_in(uri.port, uri.host)) - uri_socks[sock] = uri - rescue - sock.close rescue nil - end + sock = Socket.mogilefs_new_nonblock(uri.host, uri.port) rescue next + uri_socks[sock] = uri end # wait for at least one of them to finish connecting and send @@ -119,3 +116,80 @@ module MogileFS::Util end end + +require 'timeout' +## +# Timeout error class. Subclassing it from Timeout::Error is the only +# reason we require the 'timeout' module, otherwise that module is +# broken and worthless to us. +class MogileFS::Timeout < Timeout::Error; end + +class Socket + attr_accessor :mogilefs_addr, :mogilefs_connected + + # Socket lacks peeraddr method of the IPSocket/TCPSocket classes + def mogilefs_peername + Socket.unpack_sockaddr_in(getpeername).reverse.map {|x| x.to_s }.join(':') + end + + def mogilefs_init(host = nil, port = nil) + return true if defined?(@mogilefs_connected) + + @mogilefs_addr = Socket.sockaddr_in(port, host).freeze if port && host + + begin + connect_nonblock(@mogilefs_addr) + @mogilefs_connected = true + rescue Errno::EINPROGRESS + nil + rescue Errno::EISCONN + @mogilefs_connected = true + end + end + + class << self + + # Creates a new (TCP) Socket and initiates (but does not wait for) the + # connection + def mogilefs_new_nonblock(host, port) + sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) + sock.sync = true + sock.mogilefs_init(host, port) + sock + end + + # Like TCPSocket.new(host, port), but with an explicit timeout + # (and we don't care for local address/port we're binding to). + # This raises MogileFS::Timeout if timeout expires + def mogilefs_new(host, port, timeout = 5.0) + sock = mogilefs_new_nonblock(host, port) or return sock + + while timeout > 0 + t0 = Time.now + r = IO.select(nil, [sock], nil, timeout) + return sock if r && r[1] && sock.mogilefs_init + timeout -= (Time.now - t0) + end + + sock.close rescue nil + raise MogileFS::Timeout, 'socket write timeout' + end + + # Makes a request on a new TCP Socket and returns with a readble socket + # within the given timeout. + # This raises MogileFS::Timeout if timeout expires + def mogilefs_new_request(host, port, request, timeout = 5.0) + t0 = Time.now + sock = mogilefs_new(host, port, timeout) + sock.syswrite(request) + timeout -= (Time.now - t0) + raise MogileFS::Timeout, 'socket read timeout' if timeout < 0 + r = IO.select([sock], nil, nil, timeout) + return sock if r && r[0] + raise MogileFS::Timeout, 'socket read timeout' + end + + end + +end + -- 2.11.4.GIT