1 # -*- encoding: binary -*-
9 # for copying large files while avoiding GC thrashing as much as possible
10 # writes the contents of io_rd into io_wr, running through filter if
11 # it is a Proc object. The filter proc must respond to a string
12 # argument (and return a string) and to nil (possibly returning a
13 # string or nil). This can be used to filter I/O through a
14 # Zlib::Inflate or Digest::MD5 object
# Copies data from io_rd to io_wr: each chunk is read with
# sysread(CHUNK_SIZE, buf), optionally passed through +filter+, and written
# with syswrite_full, whose return value is added to +copied+.
#
# io_rd::  source; must respond to sysread, flush and sync=
# io_wr::  destination; must respond to sync= and is handed to syswrite_full
# filter:: optional callable applied to each chunk before it is written
#
# NOTE(review): this listing does not show every line of the method (the
# loop/begin/end structure, the initialisation of +copied+ and the return
# value are not visible) -- confirm details against the complete file.
15 def sysrwloop(io_rd, io_wr, filter = nil)
17 # avoid making sysread repeatedly allocate a new String
18 # This is not well-documented, but both read/sysread can take
19 # an optional second argument to use as the buffer to avoid
20 # GC overhead of creating new strings in a loop
21 buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
22 io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
# switch both ends to sync mode
# (presumably so Ruby-level buffering cannot interfere with the raw
# sysread/syswrite calls -- confirm)
24 io_rd.sync = io_wr.sync = true
27 io_rd.sysread(CHUNK_SIZE, buf)
28 rescue Errno::EAGAIN, Errno::EINTR
# nothing to read yet, or the call was interrupted: wait until io_rd is
# readable (a nil timeout makes IO.select wait indefinitely)
29 IO.select([io_rd], nil, nil, nil)
34 b = filter.call(b) if filter
35 copied += syswrite_full(io_wr, b)
38 # filter must take nil as a possible argument to indicate EOF
# write +b+ only when it is non-nil and non-empty
# (presumably +b+ is the filter's response to the nil/EOF call -- the
# assignment is not visible here, confirm)
41 copied += syswrite_full(io_wr, b) if b && b.length > 0
46 # writes the contents of buf to io_wr in full w/o blocking
#
# io_wr::   object responding to syswrite and usable with IO.select
# buf::     data to write
# timeout:: seconds handed to IO.select while waiting for io_wr to become
#           writable; nil means wait without a limit
#
# Returns +written+ once a syswrite call reports having written buf.size
# bytes. Raises MogileFS::Timeout when a timeout was given and the time
# spent waiting exceeds it.
#
# NOTE(review): this listing does not show every line of the method (how
# +written+ is accumulated and how the write is retried are not visible)
# -- confirm against the complete file.
47 def syswrite_full(io_wr, buf, timeout = nil)
51 w = io_wr.syswrite(buf)
53 return written if w == buf.size
56 # a short syswrite means the next syswrite will likely block
57 # inside the interpreter. so force an IO.select on it so we can
58 # timeout there if one was specified
# raising EAGAIN ourselves transfers control to the rescue clause below,
# which is where the IO.select wait lives
59 raise Errno::EAGAIN if timeout
60 rescue Errno::EAGAIN, Errno::EINTR
61 t0 = Time.now if timeout
62 IO.select(nil, [io_wr], nil, timeout)
# deduct the time just spent waiting from the remaining timeout; a negative
# remainder means the deadline has passed
63 if timeout && ((timeout -= (Time.now - t0)) < 0)
64 raise MogileFS::Timeout, 'syswrite_full timeout'
68 # should never get here
# Reads from io_rd, collecting the pieces returned by the reader in +tmp+.
#
# io_rd::        source; must respond to read_nonblock / sysread and be
#                usable with IO.select
# size::         byte count passed to the reader
# timeout::      when given, read_nonblock is used and this value is the
#                IO.select timeout (seconds); when nil, sysread is used
# full_timeout:: when true, time spent waiting in IO.select is subtracted
#                from +timeout+, so the budget shrinks across waits
#
# Raises MogileFS::Timeout when IO.select does not report io_rd as readable
# or the remaining timeout has dropped below zero.
#
# NOTE(review): this listing does not show every line of the method (the
# loop, the assignment of +t0+, any guard for a nil +timeout+ around the
# timeout arithmetic, and the return value are not visible) -- confirm
# against the complete file.
71 def sysread_full(io_rd, size, timeout = nil, full_timeout = false)
72 tmp = [] # avoid expensive string concatenation with every loop iteration
# pick the read primitive once, up front: non-blocking only if a timeout
# has to be enforced
73 reader = io_rd.method(timeout ? :read_nonblock : :sysread)
76 tmp << reader.call(size)
79 rescue Errno::EAGAIN, Errno::EINTR
# no data available yet (or interrupted): wait for io_rd to become readable
81 r = IO.select([ io_rd ], nil, nil, timeout)
83 timeout -= (Time.now - t0) if full_timeout
# IO.select returns nil on timeout; r[0] holds the readable IOs
84 if !(r && r[0]) || timeout < 0
85 raise MogileFS::Timeout, 'sysread_full timeout'
# Proc subclass that remembers a total size alongside the block it wraps.
# NOTE(review): only the start of this class is visible in this listing;
# how +writer_proc+ is used (presumably handed on to Proc's initializer)
# is not shown -- confirm against the complete file.
94 class StoreContent < Proc
# total_size::  stored in @total_size
# writer_proc:: block captured from the caller
95 def initialize(total_size, &writer_proc)
96 @total_size = total_size
108 # Timeout error class. Subclassing it from Timeout::Error is the only
109 # reason we require the 'timeout' module, otherwise that module is
110 # broken and worthless to us.
# Raised in this file by syswrite_full, sysread_full, mogilefs_new and
# mogilefs_new_request when their timeout runs out.
111 class MogileFS::Timeout < Timeout::Error; end
# Reader and writer for +mogilefs_size+.
# NOTE(review): the enclosing class and what this size refers to are not
# visible in this listing -- confirm against the complete file.
114 attr_accessor :mogilefs_size
# Starts a non-blocking connect of this socket to host:port.
#
# host:: host passed to Socket.sockaddr_in (default nil)
# port:: port passed to Socket.sockaddr_in (default nil)
#
# NOTE(review): this listing does not show every line of the method (the
# bodies of the rescue clauses and the return value are not visible) --
# confirm against the complete file.
116 def mogilefs_init(host = nil, port = nil)
117 mogilefs_addr = Socket.sockaddr_in(port, host)
120 connect_nonblock(mogilefs_addr)
# EINPROGRESS: the connect has been started but has not completed yet
121 rescue Errno::EINPROGRESS
# EISCONN: the socket is already connected
122 rescue Errno::EISCONN
129 # Creates a new (TCP) Socket and initiates (but does not wait for) the
#
# host, port:: forwarded unchanged to mogilefs_init on the new socket
#
# NOTE(review): this listing does not show every line of the method (the
# rest of the sentence above and the return value are not visible) --
# confirm against the complete file.
131 def mogilefs_new_nonblock(host, port)
# IPv4 stream socket
132 sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
# set TCP_NODELAY only where the constant is defined
133 if defined?(Socket::TCP_NODELAY)
134 sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
136 sock.mogilefs_init(host, port)
140 # Like TCPSocket.new(host, port), but with an explicit timeout
141 # (and we don't care for local address/port we're binding to).
142 # This raises MogileFS::Timeout if timeout expires
#
# host, port:: destination, forwarded to mogilefs_new_nonblock
# timeout::    seconds to wait for the socket to become writable
#              (default 5.0)
#
# Returns the socket once IO.select reports it writable and a further
# mogilefs_init call returns a truthy value.
#
# NOTE(review): this listing does not show every line of the method (the
# retry loop and the assignment of +t0+ are not visible) -- confirm against
# the complete file.
143 def mogilefs_new(host, port, timeout = 5.0)
# if mogilefs_new_nonblock returns nil/false, return that value right away
144 sock = mogilefs_new_nonblock(host, port) or return sock
# wait for the socket to become writable
148 r = IO.select(nil, [sock], nil, timeout)
# calling mogilefs_init again presumably confirms that the connect has
# completed -- confirm against mogilefs_init's return value
149 return sock if r && r[1] && sock.mogilefs_init(host, port)
# charge the time spent waiting against the remaining timeout
150 timeout -= (Time.now - t0)
# best-effort close before reporting the timeout; close errors are ignored
153 sock.close rescue nil
154 raise MogileFS::Timeout, 'socket write timeout'
# Mixes MogileFS::Util into the enclosing class/module.
# NOTE(review): presumably this is what makes syswrite_full callable from
# mogilefs_new_request; the enclosing scope is not visible -- confirm.
157 include MogileFS::Util
159 # Makes a request on a new TCP Socket and returns with a readable socket
160 # within the given timeout.
161 # This raises MogileFS::Timeout if timeout expires
#
# host, port:: destination, forwarded to mogilefs_new
# request::    data written to the socket with syswrite_full
# timeout::    seconds (default 5.0); passed to mogilefs_new, syswrite_full
#              and the final IO.select, and reduced by elapsed time in
#              between
#
# NOTE(review): this listing does not show every line of the method (the
# assignment of +t0+ and the condition guarding the first close/raise pair
# are not visible) -- confirm against the complete file.
162 def mogilefs_new_request(host, port, request, timeout = 5.0)
164 sock = mogilefs_new(host, port, timeout)
165 syswrite_full(sock, request, timeout)
# charge the time spent so far against the remaining timeout
166 timeout -= (Time.now - t0)
# best-effort close before reporting the timeout; close errors are ignored
168 sock.close rescue nil
169 raise MogileFS::Timeout, 'socket read timeout'
# wait for the socket to become readable
171 r = IO.select([sock], nil, nil, timeout)
172 return sock if r && r[0]
# the socket did not become readable in time: close it and report
174 sock.close rescue nil
175 raise MogileFS::Timeout, 'socket read timeout'