Respect timeout when doing get_file_data
[ruby-mogilefs-client.git] / lib / mogilefs / util.rb
blob9ccd4f094b100accb99a07a41f391c8e2dbf5f62
1 require 'mogilefs'
2 require 'socket'
4 module MogileFS::Util
6   CHUNK_SIZE = 65536
8   # for copying large files while avoiding GC thrashing as much as possible
9   # writes the contents of io_rd into io_wr, running through filter if
10   # it is a Proc object.  The filter proc must respond to a string
11   # argument (and return a string) and to nil (possibly returning a
12   # string or nil).  This can be used to filter I/O through an
13   # Zlib::Inflate or Digest::MD5 object
14   def sysrwloop(io_rd, io_wr, filter = nil)
15     copied = 0
16     # avoid making sysread repeatedly allocate a new String
17     # This is not well-documented, but both read/sysread can take
18     # an optional second argument to use as the buffer to avoid
19     # GC overhead of creating new strings in a loop
20     buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
21     io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
22     io_wr.flush
23     io_rd.sync = io_wr.sync = true
24     loop do
25       b = begin
26         io_rd.sysread(CHUNK_SIZE, buf)
27       rescue Errno::EAGAIN, Errno::EINTR
28         IO.select([io_rd], nil, nil, nil)
29         retry
30       rescue EOFError
31         break
32       end
33       b = filter.call(b) if filter
34       copied += syswrite_full(io_wr, b)
35     end
37     # filter must take nil as a possible argument to indicate EOF
38     if filter
39       b = filter.call(nil)
40       copied += syswrite_full(io_wr, b) if b && b.length > 0
41     end
42     copied
43   end # sysrwloop
45   # writes the contents of buf to io_wr in full w/o blocking
46   def syswrite_full(io_wr, buf, timeout = nil)
47     written = 0
48     loop do
49       begin
50         w = io_wr.syswrite(buf)
51         written += w
52         return written if w == buf.size
53         buf = buf[w..-1]
55         # a short syswrite means the next syswrite will likely block
56         # inside the interpreter.  so force an IO.select on it so we can
57         # timeout there if one was specified
58         raise Errno::EAGAIN if timeout
59       rescue Errno::EAGAIN, Errno::EINTR
60         t0 = Time.now if timeout
61         IO.select(nil, [io_wr], nil, timeout)
62         if timeout && ((timeout -= (Time.now - t0)) < 0)
63           raise MogileFS::Timeout, 'syswrite_full timeout'
64         end
65       end
66     end
67     # should never get here
68   end
70   def sysread_full(io_rd, size, timeout = nil, full_timeout = false)
71     tmp = [] # avoid expensive string concatenation with every loop iteration
72     reader = io_rd.method(timeout ? :read_nonblock : :sysread)
73     begin
74       while size > 0
75         tmp << reader.call(size)
76         size -= tmp.last.size
77       end
78     rescue Errno::EAGAIN, Errno::EINTR
79       t0 = Time.now
80       ready = IO.select([ io_rd ], nil, nil, timeout)
81       timeout -= (Time.now - t0) if full_timeout
82       if ready != [ io_rd ] || timeout < 0
83         raise MogileFS::Timeout, 'sysread_full timeout'
84       end
85       retry
86     rescue EOFError
87     end
88     tmp.join('')
89   end
91   class StoreContent < Proc
92     def initialize(total_size, &writer_proc)
93       @total_size = total_size
94       super(&writer_proc)
95     end
96     def length
97       @total_size
98     end
99   end
103 require 'timeout'
105 # Timeout error class.  Subclassing it from Timeout::Error is the only
106 # reason we require the 'timeout' module, otherwise that module is
107 # broken and worthless to us.
108 class MogileFS::Timeout < Timeout::Error; end
110 class Socket
111   attr_accessor :mogilefs_addr, :mogilefs_connected, :mogilefs_size
113   TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/
115   def mogilefs_tcp_cork=(set)
116     if defined?(TCP_CORK)
117       self.setsockopt(SOL_TCP, TCP_CORK, set ? 1 : 0) rescue nil
118     end
119     set
120   end
122   # Socket lacks peeraddr method of the IPSocket/TCPSocket classes
123   def mogilefs_peername
124     Socket.unpack_sockaddr_in(getpeername).reverse.map {|x| x.to_s }.join(':')
125   end
127   def mogilefs_init(host = nil, port = nil)
128     return true if defined?(@mogilefs_connected)
130     @mogilefs_addr = Socket.sockaddr_in(port, host).freeze if port && host
132     begin
133       connect_nonblock(@mogilefs_addr)
134       @mogilefs_connected = true
135     rescue Errno::EINPROGRESS
136       nil
137     rescue Errno::EISCONN
138       @mogilefs_connected = true
139     end
140   end
142   class << self
144     # Creates a new (TCP) Socket and initiates (but does not wait for) the
145     # connection
146     def mogilefs_new_nonblock(host, port)
147       sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
148       sock.sync = true
149       if defined?(Socket::TCP_NODELAY)
150         sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
151       end
152       sock.mogilefs_init(host, port)
153       sock
154     end
156     # Like TCPSocket.new(host, port), but with an explicit timeout
157     # (and we don't care for local address/port we're binding to).
158     # This raises MogileFS::Timeout if timeout expires
159     def mogilefs_new(host, port, timeout = 5.0)
160       sock = mogilefs_new_nonblock(host, port) or return sock
162       while timeout > 0
163         t0 = Time.now
164         r = IO.select(nil, [sock], nil, timeout)
165         return sock if r && r[1] && sock.mogilefs_init
166         timeout -= (Time.now - t0)
167       end
169       sock.close rescue nil
170       raise MogileFS::Timeout, 'socket write timeout'
171     end
173     include MogileFS::Util
175     # Makes a request on a new TCP Socket and returns with a readble socket
176     # within the given timeout.
177     # This raises MogileFS::Timeout if timeout expires
178     def mogilefs_new_request(host, port, request, timeout = 5.0)
179       t0 = Time.now
180       sock = mogilefs_new(host, port, timeout)
181       syswrite_full(sock, request, timeout)
182       timeout -= (Time.now - t0)
183       raise MogileFS::Timeout, 'socket read timeout' if timeout < 0
184       r = IO.select([sock], nil, nil, timeout)
185       return sock if r && r[0]
186       raise MogileFS::Timeout, 'socket read timeout'
187     end
189   end