util: remove syswrite_full method
[ruby-mogilefs-client.git] / lib / mogilefs / util.rb
blob1fb5ffb8badeeda96db363b49a0a986e9f7fbc49
1 # -*- encoding: binary -*-
2 require 'mogilefs'
3 require 'socket'
5 module MogileFS::Util
7   CHUNK_SIZE = 65536
9   # TODO: cleanup
10   if IO.respond_to?(:copy_stream)
11     def copy_stream(src, dst)
12       IO.copy_stream(src, dst)
13     end
14   else
15     def copy_stream(src, dst)
16       sysrwloop(src, dst)
17     end
18   end
20   # for copying large files while avoiding GC thrashing as much as possible
21   # writes the contents of io_rd into io_wr, running through filter if
22   # it is a Proc object.  The filter proc must respond to a string
23   # argument (and return a string) and to nil (possibly returning a
24   # string or nil).  This can be used to filter I/O through an
25   # Zlib::Inflate or Digest::MD5 object
26   def sysrwloop(io_rd, io_wr, filter = nil)
27     copied = 0
28     # avoid making sysread repeatedly allocate a new String
29     # This is not well-documented, but both read/sysread can take
30     # an optional second argument to use as the buffer to avoid
31     # GC overhead of creating new strings in a loop
32     buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
33     io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
34     io_wr.flush
35     io_rd.sync = io_wr.sync = true
36     loop do
37       b = begin
38         io_rd.sysread(CHUNK_SIZE, buf)
39       rescue Errno::EAGAIN, Errno::EINTR
40         IO.select([io_rd], nil, nil, nil)
41         retry
42       rescue EOFError
43         break
44       end
45       b = filter.call(b) if filter
46       copied += syswrite_full(io_wr, b)
47     end
49     # filter must take nil as a possible argument to indicate EOF
50     if filter
51       b = filter.call(nil)
52       copied += syswrite_full(io_wr, b) if b && b.length > 0
53     end
54     copied
55   end # sysrwloop
57   # writes the contents of buf to io_wr in full w/o blocking
58   def syswrite_full(io_wr, buf, timeout = nil)
59     written = 0
60     loop do
61       begin
62         w = io_wr.syswrite(buf)
63         written += w
64         return written if w == buf.size
65         buf = buf[w..-1]
67         # a short syswrite means the next syswrite will likely block
68         # inside the interpreter.  so force an IO.select on it so we can
69         # timeout there if one was specified
70         raise Errno::EAGAIN if timeout
71       rescue Errno::EAGAIN, Errno::EINTR
72         t0 = Time.now if timeout
73         IO.select(nil, [io_wr], nil, timeout)
74         if timeout && ((timeout -= (Time.now - t0)) < 0)
75           raise MogileFS::Timeout, 'syswrite_full timeout'
76         end
77       end
78     end
79     # should never get here
80   end
82   class StoreContent < Proc
83     def initialize(total_size, &writer_proc)
84       @total_size = total_size
85       super(&writer_proc)
86     end
87     def length
88       @total_size
89     end
90   end
92 end
94 require 'timeout'
96 # Timeout error class.  Subclassing it from Timeout::Error is the only
97 # reason we require the 'timeout' module, otherwise that module is
98 # broken and worthless to us.
99 class MogileFS::Timeout < Timeout::Error; end
101 class Socket
102   attr_accessor :mogilefs_size
104   def mogilefs_init(host = nil, port = nil)
105     mogilefs_addr = Socket.sockaddr_in(port, host)
107     begin
108       connect_nonblock(mogilefs_addr)
109     rescue Errno::EINPROGRESS
110     rescue Errno::EISCONN
111       true
112     end
113   end
115   class << self
117     # Creates a new (TCP) Socket and initiates (but does not wait for) the
118     # connection
119     def mogilefs_new_nonblock(host, port)
120       sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
121       if defined?(Socket::TCP_NODELAY)
122         sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
123       end
124       sock.mogilefs_init(host, port)
125       sock
126     end
128     # Like TCPSocket.new(host, port), but with an explicit timeout
129     # (and we don't care for local address/port we're binding to).
130     # This raises MogileFS::Timeout if timeout expires
131     def mogilefs_new(host, port, timeout = 5.0)
132       sock = mogilefs_new_nonblock(host, port) or return sock
134       while timeout > 0
135         t0 = Time.now
136         r = IO.select(nil, [sock], nil, timeout)
137         return sock if r && r[1] && sock.mogilefs_init(host, port)
138         timeout -= (Time.now - t0)
139       end
141       sock.close rescue nil
142       raise MogileFS::Timeout, 'socket write timeout'
143     end
145     include MogileFS::Util
147     # Makes a request on a new TCP Socket and returns with a readble socket
148     # within the given timeout.
149     # This raises MogileFS::Timeout if timeout expires
150     def mogilefs_new_request(host, port, request, timeout = 5.0)
151       t0 = Time.now
152       sock = mogilefs_new(host, port, timeout)
153       syswrite_full(sock, request, timeout)
154       timeout -= (Time.now - t0)
155       if timeout < 0
156         sock.close rescue nil
157         raise MogileFS::Timeout, 'socket read timeout'
158       end
159       r = IO.select([sock], nil, nil, timeout)
160       return sock if r && r[0]
162       sock.close rescue nil
163       raise MogileFS::Timeout, 'socket read timeout'
164     end
166   end