1 # -*- encoding: binary -*-
10 if IO.respond_to?(:copy_stream)
11 def copy_stream(src, dst)
12 IO.copy_stream(src, dst)
15 def copy_stream(src, dst)
20 # For copying large files while avoiding GC thrashing as much as possible.
21 # Writes the contents of io_rd into io_wr, running them through filter if
22 # it is a Proc object. The filter proc must accept a String
23 # argument (and return a String) and nil (possibly returning a
24 # String or nil). This can be used to filter I/O through a
25 # Zlib::Inflate or Digest::MD5 object.
26 def sysrwloop(io_rd, io_wr, filter = nil)
28 # avoid making sysread repeatedly allocate a new String
29 # This is not well-documented, but both read/sysread can take
30 # an optional second argument to use as the buffer to avoid
31 # GC overhead of creating new strings in a loop
32 buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
33 io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
35 io_rd.sync = io_wr.sync = true
38 io_rd.sysread(CHUNK_SIZE, buf)
39 rescue Errno::EAGAIN, Errno::EINTR
40 IO.select([io_rd], nil, nil, nil)
45 b = filter.call(b) if filter
46 copied += syswrite_full(io_wr, b)
49 # filter must take nil as a possible argument to indicate EOF
52 copied += syswrite_full(io_wr, b) if b && b.length > 0
57 # writes the contents of buf to io_wr in full w/o blocking
58 def syswrite_full(io_wr, buf, timeout = nil)
62 w = io_wr.syswrite(buf)
64 return written if w == buf.size
67 # a short syswrite means the next syswrite will likely block
68 # inside the interpreter. so force an IO.select on it so we can
69 # timeout there if one was specified
70 raise Errno::EAGAIN if timeout
71 rescue Errno::EAGAIN, Errno::EINTR
72 t0 = Time.now if timeout
73 IO.select(nil, [io_wr], nil, timeout)
74 if timeout && ((timeout -= (Time.now - t0)) < 0)
75 raise MogileFS::Timeout, 'syswrite_full timeout'
79 # should never get here
82 class StoreContent < Proc
83 def initialize(total_size, &writer_proc)
84 @total_size = total_size
96 # Timeout error class, raised by syswrite_full when its timeout expires.
97 # Subclassing it from Timeout::Error is the only reason we require the
98 # 'timeout' module; otherwise that module is broken and worthless to us.
99 class MogileFS::Timeout < Timeout::Error; end
102 attr_accessor :mogilefs_size
104 def mogilefs_init(host = nil, port = nil)
105 mogilefs_addr = Socket.sockaddr_in(port, host)
108 connect_nonblock(mogilefs_addr)
109 rescue Errno::EINPROGRESS
110 rescue Errno::EISCONN