8 # for copying large files while avoiding GC thrashing as much as possible
9 # writes the contents of io_rd into io_wr, running through filter if
10 # it is a Proc object. The filter proc must respond to a string
11 # argument (and return a string) and to nil (possibly returning a
12 # string or nil). This can be used to filter I/O through an
13 # Zlib::Inflate or Digest::MD5 object
14 def sysrwloop(io_rd, io_wr, filter = nil)
16 # avoid making sysread repeatedly allocate a new String
17 # This is not well-documented, but both read/sysread can take
18 # an optional second argument to use as the buffer to avoid
19 # GC overhead of creating new strings in a loop
20 buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
24 io_rd.sysread(CHUNK_SIZE, buf)
25 rescue Errno::EAGAIN, Errno::EINTR
26 IO.select([io_rd], nil, nil, nil)
31 b = filter.call(b) if filter
32 copied += syswrite_full(io_wr, b)
35 # filter must take nil as a possible argument to indicate EOF
38 copied += syswrite_full(io_wr, b) if b && b.length > 0
43 # given an array of URIs, verify that at least one of them is accessible
44 # with the expected HTTP code within the timeout period (in seconds).
45 def verify_uris(uris = [], expect = '200', timeout = 2.00)
50 # first, we asynchronously connect to all of them
52 sock = Socket.mogilefs_new_nonblock(uri.host, uri.port) rescue next
56 # wait for at least one of them to finish connecting and send
57 # HTTP requests to the connected ones
60 r = IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0)
61 timeout -= (Time.now - t0)
62 break unless r && r[1]
65 sock.syswrite "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n"
71 end until sockets[0] || timeout < 0
73 # Await a response from the sockets we had written to, we only need one
74 # valid response, but we'll take more if they return simultaneously
78 r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0)
79 timeout -= (Time.now - t0)
80 break unless r && r[0]
82 buf = sock.recv_nonblock(128, Socket::MSG_PEEK) rescue next
83 if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf
84 ok_uris << uri_socks.delete(sock)
89 end until ok_uris[0] || timeout < 0
93 uri_socks.keys.each { |sock| sock.close rescue nil }
98 # writes the contents of buf to io_wr in full w/o blocking
99 def syswrite_full(io_wr, buf)
104 rescue Errno::EAGAIN, Errno::EINTR
105 IO.select(nil, [io_wr], nil, nil)
109 break if w == buf.size
120 # Timeout error class. Subclassing it from Timeout::Error is the only
121 # reason we require the 'timeout' module, otherwise that module is
122 # broken and worthless to us.
123 class MogileFS::Timeout < Timeout::Error; end
126 attr_accessor :mogilefs_addr, :mogilefs_connected
128 TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/
130 def mogilefs_tcp_cork=(set)
131 if defined?(TCP_CORK)
132 self.setsockopt(SOL_TCP, TCP_CORK, set ? 1 : 0) rescue nil
137 # Socket lacks peeraddr method of the IPSocket/TCPSocket classes
138 def mogilefs_peername
139 Socket.unpack_sockaddr_in(getpeername).reverse.map {|x| x.to_s }.join(':')
142 def mogilefs_init(host = nil, port = nil)
143 return true if defined?(@mogilefs_connected)
145 @mogilefs_addr = Socket.sockaddr_in(port, host).freeze if port && host
148 connect_nonblock(@mogilefs_addr)
149 @mogilefs_connected = true
150 rescue Errno::EINPROGRESS
152 rescue Errno::EISCONN
153 @mogilefs_connected = true
159 # Creates a new (TCP) Socket and initiates (but does not wait for) the
161 def mogilefs_new_nonblock(host, port)
162 sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
164 if defined?(Socket::TCP_NODELAY)
165 sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
167 sock.mogilefs_init(host, port)
171 # Like TCPSocket.new(host, port), but with an explicit timeout
172 # (and we don't care for local address/port we're binding to).
173 # This raises MogileFS::Timeout if timeout expires
174 def mogilefs_new(host, port, timeout = 5.0)
175 sock = mogilefs_new_nonblock(host, port) or return sock
179 r = IO.select(nil, [sock], nil, timeout)
180 return sock if r && r[1] && sock.mogilefs_init
181 timeout -= (Time.now - t0)
184 sock.close rescue nil
185 raise MogileFS::Timeout, 'socket write timeout'
188 # Makes a request on a new TCP Socket and returns with a readble socket
189 # within the given timeout.
190 # This raises MogileFS::Timeout if timeout expires
191 def mogilefs_new_request(host, port, request, timeout = 5.0)
193 sock = mogilefs_new(host, port, timeout)
194 sock.syswrite(request)
195 timeout -= (Time.now - t0)
196 raise MogileFS::Timeout, 'socket read timeout' if timeout < 0
197 r = IO.select([sock], nil, nil, timeout)
198 return sock if r && r[0]
199 raise MogileFS::Timeout, 'socket read timeout'