sysrwloop: make sure descriptors are flushed and synced
[ruby-mogilefs-client.git] / lib / mogilefs / util.rb
blob32add148cb9c275f922845fe1fae23a9c68e5641
1 require 'mogilefs'
2 require 'socket'
4 module MogileFS::Util
6   CHUNK_SIZE = 65536
8   # for copying large files while avoiding GC thrashing as much as possible
9   # writes the contents of io_rd into io_wr, running through filter if
10   # it is a Proc object.  The filter proc must respond to a string
11   # argument (and return a string) and to nil (possibly returning a
12   # string or nil).  This can be used to filter I/O through an
13   # Zlib::Inflate or Digest::MD5 object
14   def sysrwloop(io_rd, io_wr, filter = nil)
15     copied = 0
16     # avoid making sysread repeatedly allocate a new String
17     # This is not well-documented, but both read/sysread can take
18     # an optional second argument to use as the buffer to avoid
19     # GC overhead of creating new strings in a loop
20     buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
21     io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
22     io_wr.flush
23     io_rd.sync = io_wr.sync = true
24     loop do
25       b = begin
26         io_rd.sysread(CHUNK_SIZE, buf)
27       rescue Errno::EAGAIN, Errno::EINTR
28         IO.select([io_rd], nil, nil, nil)
29         retry
30       rescue EOFError
31         break
32       end
33       b = filter.call(b) if filter
34       copied += syswrite_full(io_wr, b)
35     end
37     # filter must take nil as a possible argument to indicate EOF
38     if filter
39       b = filter.call(nil)
40       copied += syswrite_full(io_wr, b) if b && b.length > 0
41     end
42     copied
43   end # sysrwloop
45   # given an array of URIs, verify that at least one of them is accessible
46   # with the expected HTTP code within the timeout period (in seconds).
47   def verify_uris(uris = [], expect = '200', timeout = 2.00)
48     uri_socks = {}
49     ok_uris = []
50     sockets = []
52     # first, we asynchronously connect to all of them
53     uris.each do |uri|
54       sock = Socket.mogilefs_new_nonblock(uri.host, uri.port) rescue next
55       uri_socks[sock] = uri
56     end
58     # wait for at least one of them to finish connecting and send
59     # HTTP requests to the connected ones
60     begin
61       t0 = Time.now
62       r = IO.select(nil, uri_socks.keys, nil, timeout > 0 ? timeout : 0)
63       timeout -= (Time.now - t0)
64       break unless r && r[1]
65       r[1].each do |sock|
66         begin
67           sock.syswrite "HEAD #{uri_socks[sock].request_uri} HTTP/1.0\r\n\r\n"
68           sockets << sock
69         rescue
70           sock.close rescue nil
71         end
72       end
73     end until sockets[0] || timeout < 0
75     # Await a response from the sockets we had written to, we only need one
76     # valid response, but we'll take more if they return simultaneously
77     if sockets[0]
78       begin
79         t0 = Time.now
80         r = IO.select(sockets, nil, nil, timeout > 0 ? timeout : 0)
81         timeout -= (Time.now - t0)
82         break unless r && r[0]
83         r[0].each do |sock|
84           buf = sock.recv_nonblock(128, Socket::MSG_PEEK) rescue next
85           if buf && /\AHTTP\/[\d\.]+ #{expect} / =~ buf
86             ok_uris << uri_socks.delete(sock)
87             sock.close rescue nil
88           end
89         end
90       end
91     end until ok_uris[0] || timeout < 0
93     ok_uris
94     ensure
95       uri_socks.keys.each { |sock| sock.close rescue nil }
96   end
98   private
100     # writes the contents of buf to io_wr in full w/o blocking
101     def syswrite_full(io_wr, buf)
102       written = 0
103       loop do
104         w = begin
105           io_wr.syswrite(buf)
106         rescue Errno::EAGAIN, Errno::EINTR
107           IO.select(nil, [io_wr], nil, nil)
108           retry
109         end
110         written += w
111         break if w == buf.size
112         buf = buf[w..-1]
113       end
115       written
116     end
120 require 'timeout'
122 # Timeout error class.  Subclassing it from Timeout::Error is the only
123 # reason we require the 'timeout' module, otherwise that module is
124 # broken and worthless to us.
125 class MogileFS::Timeout < Timeout::Error; end
127 class Socket
128   attr_accessor :mogilefs_addr, :mogilefs_connected
130   TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/
132   def mogilefs_tcp_cork=(set)
133     if defined?(TCP_CORK)
134       self.setsockopt(SOL_TCP, TCP_CORK, set ? 1 : 0) rescue nil
135     end
136     set
137   end
139   # Socket lacks peeraddr method of the IPSocket/TCPSocket classes
140   def mogilefs_peername
141     Socket.unpack_sockaddr_in(getpeername).reverse.map {|x| x.to_s }.join(':')
142   end
144   def mogilefs_init(host = nil, port = nil)
145     return true if defined?(@mogilefs_connected)
147     @mogilefs_addr = Socket.sockaddr_in(port, host).freeze if port && host
149     begin
150       connect_nonblock(@mogilefs_addr)
151       @mogilefs_connected = true
152     rescue Errno::EINPROGRESS
153       nil
154     rescue Errno::EISCONN
155       @mogilefs_connected = true
156     end
157   end
159   class << self
161     # Creates a new (TCP) Socket and initiates (but does not wait for) the
162     # connection
163     def mogilefs_new_nonblock(host, port)
164       sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
165       sock.sync = true
166       if defined?(Socket::TCP_NODELAY)
167         sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
168       end
169       sock.mogilefs_init(host, port)
170       sock
171     end
173     # Like TCPSocket.new(host, port), but with an explicit timeout
174     # (and we don't care for local address/port we're binding to).
175     # This raises MogileFS::Timeout if timeout expires
176     def mogilefs_new(host, port, timeout = 5.0)
177       sock = mogilefs_new_nonblock(host, port) or return sock
179       while timeout > 0
180         t0 = Time.now
181         r = IO.select(nil, [sock], nil, timeout)
182         return sock if r && r[1] && sock.mogilefs_init
183         timeout -= (Time.now - t0)
184       end
186       sock.close rescue nil
187       raise MogileFS::Timeout, 'socket write timeout'
188     end
190     # Makes a request on a new TCP Socket and returns with a readble socket
191     # within the given timeout.
192     # This raises MogileFS::Timeout if timeout expires
193     def mogilefs_new_request(host, port, request, timeout = 5.0)
194       t0 = Time.now
195       sock = mogilefs_new(host, port, timeout)
196       sock.syswrite(request)
197       timeout -= (Time.now - t0)
198       raise MogileFS::Timeout, 'socket read timeout' if timeout < 0
199       r = IO.select([sock], nil, nil, timeout)
200       return sock if r && r[0]
201       raise MogileFS::Timeout, 'socket read timeout'
202     end
204   end