From 93242dbc664409e8bd78f021ba7b527f340fa1d7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 2 Nov 2011 23:20:58 +0000 Subject: [PATCH] remove sysrwloop for copy_stream We're trying to use as much as we can from Ruby 1.9 --- bin/mog | 2 +- lib/mogilefs/mogilefs.rb | 6 +- lib/mogilefs/socket_common.rb | 4 + lib/mogilefs/util.rb | 171 ++++++++++++++---------------------------- test/test_mogilefs.rb | 2 +- 5 files changed, 66 insertions(+), 119 deletions(-) rewrite lib/mogilefs/util.rb (68%) diff --git a/bin/mog b/bin/mog index 932e766..c62d818 100755 --- a/bin/mog +++ b/bin/mog @@ -129,7 +129,7 @@ begin if (!cat[:raw] && key =~ /^_big_info:/) mg.bigfile_write(key, STDOUT, {:verify => true}) else - mg.get_file_data(key) { |fp| sysrwloop(fp, STDOUT) } + mg.get_file_data(key) { |fp| copy_stream(fp, STDOUT) } end end when 'ls' diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index 6c4220b..f064d43 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -139,15 +139,15 @@ class MogileFS::MogileFS < MogileFS::Client raise MogileFS::ReadOnlyError if readonly? new_file key, klass do |mfp| - if file.respond_to? :sysread then - sysrwloop(file, mfp) + if file.respond_to?(:read) + copy_stream(file, mfp) else size = File.size(file) if size > 0x10000 # Bigass file, handle differently mfp.big_io = file size else - File.open(file, "rb") { |fp| sysrwloop(fp, mfp) } + File.open(file, "rb") { |fp| copy_stream(fp, mfp) } end end end diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb index 567e715..e724e8f 100644 --- a/lib/mogilefs/socket_common.rb +++ b/lib/mogilefs/socket_common.rb @@ -51,4 +51,8 @@ module MogileFS::SocketCommon buf # full read end + + def readpartial(size, buf = "", timeout = 5) + timed_read(size, buf, timeout) or raise EOFError, "end of file reached" + end end diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb dissimilarity index 68% index 8f315aa..ece5645 100644 --- a/lib/mogilefs/util.rb +++ b/lib/mogilefs/util.rb @@ -1,114 +1,57 @@ -# -*- encoding: binary -*- -require 'mogilefs' -require 'socket' - -module MogileFS::Util - - CHUNK_SIZE = 65536 - - # TODO: cleanup - if IO.respond_to?(:copy_stream) - def copy_stream(src, dst) - IO.copy_stream(src, dst) - end - else - def copy_stream(src, dst) - sysrwloop(src, dst) - end - end - - # for copying large files while avoiding GC thrashing as much as possible - # writes the contents of io_rd into io_wr, running through filter if - # it is a Proc object. The filter proc must respond to a string - # argument (and return a string) and to nil (possibly returning a - # string or nil). This can be used to filter I/O through an - # Zlib::Inflate or Digest::MD5 object - def sysrwloop(io_rd, io_wr, filter = nil) - copied = 0 - # avoid making sysread repeatedly allocate a new String - # This is not well-documented, but both read/sysread can take - # an optional second argument to use as the buffer to avoid - # GC overhead of creating new strings in a loop - buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing - io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe - io_wr.flush - io_rd.sync = io_wr.sync = true - loop do - b = begin - io_rd.sysread(CHUNK_SIZE, buf) - rescue Errno::EAGAIN, Errno::EINTR - IO.select([io_rd], nil, nil, nil) - retry - rescue EOFError - break - end - b = filter.call(b) if filter - copied += syswrite_full(io_wr, b) - end - - # filter must take nil as a possible argument to indicate EOF - if filter - b = filter.call(nil) - copied += syswrite_full(io_wr, b) if b && b.length > 0 - end - copied - end # sysrwloop - - # writes the contents of buf to io_wr in full w/o blocking - def syswrite_full(io_wr, buf, timeout = nil) - written = 0 - loop do - begin - w = io_wr.syswrite(buf) - written += w - return written if w == buf.size - buf = buf[w..-1] - - # a short syswrite means the next syswrite will likely block - # inside the interpreter. so force an IO.select on it so we can - # timeout there if one was specified - raise Errno::EAGAIN if timeout - rescue Errno::EAGAIN, Errno::EINTR - t0 = Time.now if timeout - IO.select(nil, [io_wr], nil, timeout) - if timeout && ((timeout -= (Time.now - t0)) < 0) - raise MogileFS::Timeout, 'syswrite_full timeout' - end - end - end - # should never get here - end - - class StoreContent < Proc - def initialize(total_size, &writer_proc) - @total_size = total_size - super(&writer_proc) - end - def length - @total_size - end - end - -end - -require 'timeout' -## -# Timeout error class. Subclassing it from Timeout::Error is the only -# reason we require the 'timeout' module, otherwise that module is -# broken and worthless to us. -class MogileFS::Timeout < Timeout::Error; end - -class Socket - attr_accessor :mogilefs_size - - def mogilefs_init(host = nil, port = nil) - mogilefs_addr = Socket.sockaddr_in(port, host) - - begin - connect_nonblock(mogilefs_addr) - rescue Errno::EINPROGRESS - rescue Errno::EISCONN - true - end - end -end +# -*- encoding: binary -*- +require 'mogilefs' +require 'socket' + +module MogileFS::Util + # TODO: cleanup + if IO.respond_to?(:copy_stream) + def copy_stream(src, dst) + IO.copy_stream(src, dst) + end + else + def copy_stream(src, dst) + buf = "" + written = 0 + begin + src.readpartial(0x4000, buf) + written += dst.write(buf) + rescue EOFError + break + end while true + written + end + end + + class StoreContent < Proc + def initialize(total_size, &writer_proc) + @total_size = total_size + super(&writer_proc) + end + def length + @total_size + end + end + +end + +require 'timeout' +## +# Timeout error class. Subclassing it from Timeout::Error is the only +# reason we require the 'timeout' module, otherwise that module is +# broken and worthless to us. +class MogileFS::Timeout < Timeout::Error; end + +class Socket + attr_accessor :mogilefs_size + + def mogilefs_init(host = nil, port = nil) + mogilefs_addr = Socket.sockaddr_in(port, host) + + begin + connect_nonblock(mogilefs_addr) + rescue Errno::EINPROGRESS + rescue Errno::EISCONN + true + end + end +end diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index 1749a97..7fe53e2 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -103,7 +103,7 @@ class TestMogileFS__MogileFS < TestMogileFS readed = client.recv(4096, 0) assert(readed =~ \ %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z}) - sysrwloop(tmpfp, client) + copy_stream(tmpfp, client) client.close exit 0 end -- 2.11.4.GIT