remove the rest of the Socket monkeypatches
[ruby-mogilefs-client.git] / lib / mogilefs / util.rb
blob8f315aa5b210c97861d8cb782f24de100ba28dac
1 # -*- encoding: binary -*-
2 require 'mogilefs'
3 require 'socket'
5 module MogileFS::Util
7   CHUNK_SIZE = 65536
9   # TODO: cleanup
10   if IO.respond_to?(:copy_stream)
11     def copy_stream(src, dst)
12       IO.copy_stream(src, dst)
13     end
14   else
15     def copy_stream(src, dst)
16       sysrwloop(src, dst)
17     end
18   end
20   # for copying large files while avoiding GC thrashing as much as possible
21   # writes the contents of io_rd into io_wr, running through filter if
22   # it is a Proc object.  The filter proc must respond to a string
23   # argument (and return a string) and to nil (possibly returning a
24   # string or nil).  This can be used to filter I/O through an
25   # Zlib::Inflate or Digest::MD5 object
26   def sysrwloop(io_rd, io_wr, filter = nil)
27     copied = 0
28     # avoid making sysread repeatedly allocate a new String
29     # This is not well-documented, but both read/sysread can take
30     # an optional second argument to use as the buffer to avoid
31     # GC overhead of creating new strings in a loop
32     buf = ' ' * CHUNK_SIZE # preallocate to avoid GC thrashing
33     io_rd.flush rescue nil # flush may be needed for sockets/pipes, be safe
34     io_wr.flush
35     io_rd.sync = io_wr.sync = true
36     loop do
37       b = begin
38         io_rd.sysread(CHUNK_SIZE, buf)
39       rescue Errno::EAGAIN, Errno::EINTR
40         IO.select([io_rd], nil, nil, nil)
41         retry
42       rescue EOFError
43         break
44       end
45       b = filter.call(b) if filter
46       copied += syswrite_full(io_wr, b)
47     end
49     # filter must take nil as a possible argument to indicate EOF
50     if filter
51       b = filter.call(nil)
52       copied += syswrite_full(io_wr, b) if b && b.length > 0
53     end
54     copied
55   end # sysrwloop
57   # writes the contents of buf to io_wr in full w/o blocking
58   def syswrite_full(io_wr, buf, timeout = nil)
59     written = 0
60     loop do
61       begin
62         w = io_wr.syswrite(buf)
63         written += w
64         return written if w == buf.size
65         buf = buf[w..-1]
67         # a short syswrite means the next syswrite will likely block
68         # inside the interpreter.  so force an IO.select on it so we can
69         # timeout there if one was specified
70         raise Errno::EAGAIN if timeout
71       rescue Errno::EAGAIN, Errno::EINTR
72         t0 = Time.now if timeout
73         IO.select(nil, [io_wr], nil, timeout)
74         if timeout && ((timeout -= (Time.now - t0)) < 0)
75           raise MogileFS::Timeout, 'syswrite_full timeout'
76         end
77       end
78     end
79     # should never get here
80   end
82   class StoreContent < Proc
83     def initialize(total_size, &writer_proc)
84       @total_size = total_size
85       super(&writer_proc)
86     end
87     def length
88       @total_size
89     end
90   end
92 end
94 require 'timeout'
96 # Timeout error class.  Subclassing it from Timeout::Error is the only
97 # reason we require the 'timeout' module, otherwise that module is
98 # broken and worthless to us.
99 class MogileFS::Timeout < Timeout::Error; end
101 class Socket
102   attr_accessor :mogilefs_size
104   def mogilefs_init(host = nil, port = nil)
105     mogilefs_addr = Socket.sockaddr_in(port, host)
107     begin
108       connect_nonblock(mogilefs_addr)
109     rescue Errno::EINPROGRESS
110     rescue Errno::EISCONN
111       true
112     end
113   end