util: small ivar reduction
[ruby-mogilefs-client.git] / lib / mogilefs / util.rb
blobbca502a6c6aa2c071d50b3f6ae72b0b387ba6b18
1 # -*- encoding: binary -*-
2 require 'mogilefs'
3 require 'socket'
5 module MogileFS::Util
7   CHUNK_SIZE = 65536
9   # for copying large files while avoiding GC thrashing as much as possible
10   # writes the contents of io_rd into io_wr, running through filter if
11   # it is a Proc object.  The filter proc must respond to a string
12   # argument (and return a string) and to nil (possibly returning a
13   # string or nil).  This can be used to filter I/O through an
14   # Zlib::Inflate or Digest::MD5 object
15   def sysrwloop(io_rd, io_wr, filter = nil)
16     copied = 0
17     # avoid making sysread repeatedly allocate a new String
18     # This is not well-documented, but both read/sysread can take
19     # an optional second argument to use as the buffer to avoid
20     # GC overhead of creating new strings in a loop
21     buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
22     io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
23     io_wr.flush
24     io_rd.sync = io_wr.sync = true
25     loop do
26       b = begin
27         io_rd.sysread(CHUNK_SIZE, buf)
28       rescue Errno::EAGAIN, Errno::EINTR
29         IO.select([io_rd], nil, nil, nil)
30         retry
31       rescue EOFError
32         break
33       end
34       b = filter.call(b) if filter
35       copied += syswrite_full(io_wr, b)
36     end
38     # filter must take nil as a possible argument to indicate EOF
39     if filter
40       b = filter.call(nil)
41       copied += syswrite_full(io_wr, b) if b && b.length > 0
42     end
43     copied
44   end # sysrwloop
46   # writes the contents of buf to io_wr in full w/o blocking
47   def syswrite_full(io_wr, buf, timeout = nil)
48     written = 0
49     loop do
50       begin
51         w = io_wr.syswrite(buf)
52         written += w
53         return written if w == buf.size
54         buf = buf[w..-1]
56         # a short syswrite means the next syswrite will likely block
57         # inside the interpreter.  so force an IO.select on it so we can
58         # timeout there if one was specified
59         raise Errno::EAGAIN if timeout
60       rescue Errno::EAGAIN, Errno::EINTR
61         t0 = Time.now if timeout
62         IO.select(nil, [io_wr], nil, timeout)
63         if timeout && ((timeout -= (Time.now - t0)) < 0)
64           raise MogileFS::Timeout, 'syswrite_full timeout'
65         end
66       end
67     end
68     # should never get here
69   end
71   def sysread_full(io_rd, size, timeout = nil, full_timeout = false)
72     tmp = [] # avoid expensive string concatenation with every loop iteration
73     reader = io_rd.method(timeout ? :read_nonblock : :sysread)
74     begin
75       while size > 0
76         tmp << reader.call(size)
77         size -= tmp.last.size
78       end
79     rescue Errno::EAGAIN, Errno::EINTR
80       t0 = Time.now
81       r = IO.select([ io_rd ], nil, nil, timeout)
82       if timeout
83         timeout -= (Time.now - t0) if full_timeout
84         if !(r && r[0]) || timeout < 0
85           raise MogileFS::Timeout, 'sysread_full timeout'
86         end
87       end
88       retry
89     rescue EOFError
90     end
91     tmp.join('')
92   end
94   class StoreContent < Proc
95     def initialize(total_size, &writer_proc)
96       @total_size = total_size
97       super(&writer_proc)
98     end
99     def length
100       @total_size
101     end
102   end
106 require 'timeout'
108 # Timeout error class.  Subclassing it from Timeout::Error is the only
109 # reason we require the 'timeout' module, otherwise that module is
110 # broken and worthless to us.
111 class MogileFS::Timeout < Timeout::Error; end
113 class Socket
114   attr_accessor :mogilefs_size
116   def mogilefs_init(host = nil, port = nil)
117     mogilefs_addr = Socket.sockaddr_in(port, host)
119     begin
120       connect_nonblock(mogilefs_addr)
121     rescue Errno::EINPROGRESS
122     rescue Errno::EISCONN
123       true
124     end
125   end
127   class << self
129     # Creates a new (TCP) Socket and initiates (but does not wait for) the
130     # connection
131     def mogilefs_new_nonblock(host, port)
132       sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
133       if defined?(Socket::TCP_NODELAY)
134         sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
135       end
136       sock.mogilefs_init(host, port)
137       sock
138     end
140     # Like TCPSocket.new(host, port), but with an explicit timeout
141     # (and we don't care for local address/port we're binding to).
142     # This raises MogileFS::Timeout if timeout expires
143     def mogilefs_new(host, port, timeout = 5.0)
144       sock = mogilefs_new_nonblock(host, port) or return sock
146       while timeout > 0
147         t0 = Time.now
148         r = IO.select(nil, [sock], nil, timeout)
149         return sock if r && r[1] && sock.mogilefs_init(host, port)
150         timeout -= (Time.now - t0)
151       end
153       sock.close rescue nil
154       raise MogileFS::Timeout, 'socket write timeout'
155     end
157     include MogileFS::Util
159     # Makes a request on a new TCP Socket and returns with a readble socket
160     # within the given timeout.
161     # This raises MogileFS::Timeout if timeout expires
162     def mogilefs_new_request(host, port, request, timeout = 5.0)
163       t0 = Time.now
164       sock = mogilefs_new(host, port, timeout)
165       syswrite_full(sock, request, timeout)
166       timeout -= (Time.now - t0)
167       if timeout < 0
168         sock.close rescue nil
169         raise MogileFS::Timeout, 'socket read timeout'
170       end
171       r = IO.select([sock], nil, nil, timeout)
172       return sock if r && r[0]
174       sock.close rescue nil
175       raise MogileFS::Timeout, 'socket read timeout'
176     end
178   end