8 # for copying large files while avoiding GC thrashing as much as possible
9 # writes the contents of io_rd into io_wr, running through filter if
10 # it is a Proc object. The filter proc must respond to a string
11 # argument (and return a string) and to nil (possibly returning a
12 # string or nil). This can be used to filter I/O through a
13 # Zlib::Inflate or Digest::MD5 object
14 def sysrwloop(io_rd, io_wr, filter = nil)
16 # avoid making sysread repeatedly allocate a new String
17 # This is not well-documented, but both read/sysread can take
18 # an optional second argument to use as the buffer to avoid
19 # GC overhead of creating new strings in a loop
20 buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
21 io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
23 io_rd.sync = io_wr.sync = true
26 io_rd.sysread(CHUNK_SIZE, buf)
27 rescue Errno::EAGAIN, Errno::EINTR
28 IO.select([io_rd], nil, nil, nil)
33 b = filter.call(b) if filter
34 copied += syswrite_full(io_wr, b)
37 # filter must take nil as a possible argument to indicate EOF
40 copied += syswrite_full(io_wr, b) if b && b.length > 0
45 # writes the contents of buf to io_wr in full w/o blocking
46 def syswrite_full(io_wr, buf, timeout = nil)
50 w = io_wr.syswrite(buf)
52 return written if w == buf.size
55 # a short syswrite means the next syswrite will likely block
56 # inside the interpreter. so force an IO.select on it so we can
57 # timeout there if one was specified
58 raise Errno::EAGAIN if timeout
59 rescue Errno::EAGAIN, Errno::EINTR
60 t0 = Time.now if timeout
61 IO.select(nil, [io_wr], nil, timeout)
62 if timeout && ((timeout -= (Time.now - t0)) < 0)
63 raise MogileFS::Timeout, 'syswrite_full timeout'
67 # should never get here
70 def sysread_full(io_rd, size, timeout = nil, full_timeout = false)
71 tmp = [] # avoid expensive string concatenation with every loop iteration
72 reader = io_rd.method(timeout ? :read_nonblock : :sysread)
75 tmp << reader.call(size)
78 rescue Errno::EAGAIN, Errno::EINTR
80 ready = IO.select([ io_rd ], nil, nil, timeout)
81 timeout -= (Time.now - t0) if full_timeout
82 if ready != [ io_rd ] || timeout < 0
83 raise MogileFS::Timeout, 'sysread_full timeout'
91 class StoreContent < Proc
92 def initialize(total_size, &writer_proc)
93 @total_size = total_size
105 # Timeout error class. Subclassing it from Timeout::Error is the only
106 # reason we require the 'timeout' module, otherwise that module is
107 # broken and worthless to us.
108 class MogileFS::Timeout < Timeout::Error; end
111 attr_accessor :mogilefs_addr, :mogilefs_connected, :mogilefs_size
113 TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/
115 def mogilefs_tcp_cork=(set)
116 if defined?(TCP_CORK)
117 self.setsockopt(SOL_TCP, TCP_CORK, set ? 1 : 0) rescue nil
122 # Socket lacks peeraddr method of the IPSocket/TCPSocket classes
123 def mogilefs_peername
124 Socket.unpack_sockaddr_in(getpeername).reverse.map {|x| x.to_s }.join(':')
127 def mogilefs_init(host = nil, port = nil)
128 return true if defined?(@mogilefs_connected)
130 @mogilefs_addr = Socket.sockaddr_in(port, host).freeze if port && host
133 connect_nonblock(@mogilefs_addr)
134 @mogilefs_connected = true
135 rescue Errno::EINPROGRESS
137 rescue Errno::EISCONN
138 @mogilefs_connected = true
144 # Creates a new (TCP) Socket and initiates (but does not wait for) the
146 def mogilefs_new_nonblock(host, port)
147 sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
149 if defined?(Socket::TCP_NODELAY)
150 sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
152 sock.mogilefs_init(host, port)
156 # Like TCPSocket.new(host, port), but with an explicit timeout
157 # (and we don't care for local address/port we're binding to).
158 # This raises MogileFS::Timeout if timeout expires
159 def mogilefs_new(host, port, timeout = 5.0)
160 sock = mogilefs_new_nonblock(host, port) or return sock
164 r = IO.select(nil, [sock], nil, timeout)
165 return sock if r && r[1] && sock.mogilefs_init
166 timeout -= (Time.now - t0)
169 sock.close rescue nil
170 raise MogileFS::Timeout, 'socket write timeout'
173 include MogileFS::Util
175 # Makes a request on a new TCP Socket and returns with a readable socket
176 # within the given timeout.
177 # This raises MogileFS::Timeout if timeout expires
178 def mogilefs_new_request(host, port, request, timeout = 5.0)
180 sock = mogilefs_new(host, port, timeout)
181 syswrite_full(sock, request, timeout)
182 timeout -= (Time.now - t0)
183 raise MogileFS::Timeout, 'socket read timeout' if timeout < 0
184 r = IO.select([sock], nil, nil, timeout)
185 return sock if r && r[0]
186 raise MogileFS::Timeout, 'socket read timeout'