Make sure select(2) calls are IO.select
[ruby-mogilefs-client.git] / lib / mogilefs / util.rb
blobf67ccd7e56a66e6d283e75c45100f8d71ec90a1a
1 require 'mogilefs'
2 require 'socket'
4 module MogileFS::Util
6   CHUNK_SIZE = 65536
8   # for copying large files while avoiding GC thrashing as much as possible
9   # writes the contents of io_rd into io_wr, running through filter if
10   # it is a Proc object.  The filter proc must respond to a string
11   # argument (and return a string) and to nil (possibly returning a
12   # string or nil).  This can be used to filter I/O through an
13   # Zlib::Inflate or Digest::MD5 object
14   def sysrwloop(io_rd, io_wr, filter = nil)
15     copied = 0
16     # avoid making sysread repeatedly allocate a new String
17     # This is not well-documented, but both read/sysread can take
18     # an optional second argument to use as the buffer to avoid
19     # GC overhead of creating new strings in a loop
20     buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
21     io_wr.sync = true
22     loop do
23       b = begin
24         io_rd.sysread(CHUNK_SIZE, buf)
25       rescue Errno::EAGAIN, Errno::EINTR
26         IO.select([io_rd], nil, nil, nil)
27         retry
28       rescue EOFError
29         break
30       end
31       b = filter.call(b) if filter
32       copied += syswrite_full(io_wr, b)
33     end
35     # filter must take nil as a possible argument to indicate EOF
36     if filter
37       b = filter.call(nil)
38       copied += syswrite_full(io_wr, b) if b && b.length > 0
39     end
40     copied
41   end # sysrwloop
43   # given an array of URIs, verify that at least one of them is accessible
44   # with the expected HTTP code within the timeout period (in seconds).
45   def verify_uris(uris = [], expect = '200', timeout = 2.00)
46     uri_socks = {}
47     ok_uris = []
48     sockets = []
50     # first, we asynchronously connect to all of them
51     uris.each do |uri|
52       sock = Socket.mogilefs_new_nonblock(uri.host, uri.port) rescue next
53       uri_socks[sock] = uri
54     end
56     # wait for at least one of them to finish connecting and send
57     # HTTP requests to the connected ones
58     begin
59       t0 = Time.now
60       r = IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0)
61       timeout -= (Time.now - t0)
62       break unless r && r[1]
63       r[1].each do |sock|
64         begin
65           sock.syswrite "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n"
66           sockets << sock
67         rescue
68           sock.close rescue nil
69         end
70       end
71     end until sockets[0] || timeout < 0
73     # Await a response from the sockets we had written to, we only need one
74     # valid response, but we'll take more if they return simultaneously
75     if sockets[0]
76       begin
77         t0 = Time.now
78         r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0)
79         timeout -= (Time.now - t0)
80         break unless r && r[0]
81         r[0].each do |sock|
82           buf = sock.recv_nonblock(128, Socket::MSG_PEEK) rescue next
83           if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf
84             ok_uris << uri_socks.delete(sock)
85             sock.close rescue nil
86           end
87         end
88       end
89     end until ok_uris[0] || timeout < 0
91     ok_uris
92     ensure
93       uri_socks.keys.each { |sock| sock.close rescue nil }
94   end
96   private
98     # writes the contents of buf to io_wr in full w/o blocking
99     def syswrite_full(io_wr, buf)
100       written = 0
101       loop do
102         w = begin
103           io_wr.syswrite(buf)
104         rescue Errno::EAGAIN, Errno::EINTR
105           IO.select(nil, [io_wr], nil, nil)
106           retry
107         end
108         written += w
109         break if w == buf.size
110         buf = buf[w..-1]
111       end
113       written
114     end
118 require 'timeout'
120 # Timeout error class.  Subclassing it from Timeout::Error is the only
121 # reason we require the 'timeout' module, otherwise that module is
122 # broken and worthless to us.
123 class MogileFS::Timeout < Timeout::Error; end
125 class Socket
126   attr_accessor :mogilefs_addr, :mogilefs_connected
128   TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/
130   def mogilefs_tcp_cork=(set)
131     if defined?(TCP_CORK)
132       self.setsockopt(SOL_TCP, TCP_CORK, set ? 1 : 0) rescue nil
133     end
134     set
135   end
137   # Socket lacks peeraddr method of the IPSocket/TCPSocket classes
138   def mogilefs_peername
139     Socket.unpack_sockaddr_in(getpeername).reverse.map {|x| x.to_s }.join(':')
140   end
142   def mogilefs_init(host = nil, port = nil)
143     return true if defined?(@mogilefs_connected)
145     @mogilefs_addr = Socket.sockaddr_in(port, host).freeze if port && host
147     begin
148       connect_nonblock(@mogilefs_addr)
149       @mogilefs_connected = true
150     rescue Errno::EINPROGRESS
151       nil
152     rescue Errno::EISCONN
153       @mogilefs_connected = true
154     end
155   end
157   class << self
159     # Creates a new (TCP) Socket and initiates (but does not wait for) the
160     # connection
161     def mogilefs_new_nonblock(host, port)
162       sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
163       sock.sync = true
164       if defined?(Socket::TCP_NODELAY)
165         sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
166       end
167       sock.mogilefs_init(host, port)
168       sock
169     end
171     # Like TCPSocket.new(host, port), but with an explicit timeout
172     # (and we don't care for local address/port we're binding to).
173     # This raises MogileFS::Timeout if timeout expires
174     def mogilefs_new(host, port, timeout = 5.0)
175       sock = mogilefs_new_nonblock(host, port) or return sock
177       while timeout > 0
178         t0 = Time.now
179         r = IO.select(nil, [sock], nil, timeout)
180         return sock if r && r[1] && sock.mogilefs_init
181         timeout -= (Time.now - t0)
182       end
184       sock.close rescue nil
185       raise MogileFS::Timeout, 'socket write timeout'
186     end
188     # Makes a request on a new TCP Socket and returns with a readble socket
189     # within the given timeout.
190     # This raises MogileFS::Timeout if timeout expires
191     def mogilefs_new_request(host, port, request, timeout = 5.0)
192       t0 = Time.now
193       sock = mogilefs_new(host, port, timeout)
194       sock.syswrite(request)
195       timeout -= (Time.now - t0)
196       raise MogileFS::Timeout, 'socket read timeout' if timeout < 0
197       r = IO.select([sock], nil, nil, timeout)
198       return sock if r && r[0]
199       raise MogileFS::Timeout, 'socket read timeout'
200     end
202   end