rely on SO_KEEPALIVE for upload timeouts
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blobb8bfd4858ea49c3de981a06b1e19fbe1306105a0
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'uri'
5 require 'digest/md5'
6 require 'mogilefs/chunker'
8 ##
9 # HTTPFile wraps up the new file operations for storing files onto an HTTP
10 # storage node.
12 # You really don't want to create an HTTPFile by hand.  Instead you want to
13 # create a new file using MogileFS::MogileFS.new_file.
15 class MogileFS::HTTPFile < StringIO
16   class RetryableError < MogileFS::Error; end
17   class EmptyResponseError < RetryableError; end
18   class BadResponseError < RetryableError; end
19   class UnparseableResponseError < RetryableError; end
20   class NoStorageNodesError < MogileFS::Error
21     def message; 'Unable to open socket to storage node'; end
22   end
23   class NonRetryableError < MogileFS::Error; end
25   MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
27   ##
28   # The big_io name in case we have file > 256M
30   attr_accessor :big_io
32   attr_accessor :streaming_io
34   ##
35   # Creates a new HTTPFile with MogileFS-specific data.  Use
36   # MogileFS::MogileFS#new_file instead of this method.
38   def initialize(dests, opts = nil)
39     super ""
40     @md5 = @streaming_io = @big_io = @active = nil
41     @dests = dests
42     @opts = Integer === opts ? { :content_length => opts } : opts
43   end
45   def request_put(sock, uri, file_size, input = nil)
46     host_with_port = "#{uri.host}:#{uri.port}"
47     clen = @opts[:content_length]
48     file_size ||= clen
50     content_md5 = @opts[:content_md5]
51     if String === content_md5
52       file_size or
53         raise ArgumentError,
54               ":content_length must be specified with :content_md5 String"
55       file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
56     elsif content_md5.respond_to?(:call) ||
57           :trailer == content_md5 ||
58           MD5_TRAILER_NODES[host_with_port]
59       file_size = nil
60       @md5 = Digest::MD5.new
61     end
63     if file_size
64       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
65                  "Content-Length: #{file_size}\r\n\r\n")
66       rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
67     else
68       trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
69       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
70                  "Host: #{host_with_port}\r\n#{trailers}" \
71                  "Transfer-Encoding: chunked\r\n\r\n")
72       tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
73       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
74       tmp.flush
75     end
77     if clen && clen != rv
78       raise MogileFS::SizeMismatchError,
79             ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
80     end
81     rv
82   end
84   def put_streaming_io(sock, uri) # unlikely to be used
85     file_size = @streaming_io.length
86     written = 0
87     request_put(sock, uri, file_size) do |wr|
88       @streaming_io.call(Proc.new do |data_to_write|
89         written += wr.write(data_to_write)
90       end)
91     end
92     file_size ? file_size : written
93   end
95   def rewind_or_raise!(uri, err)
96     @active.rewind if @active
97     rescue => e
98       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
99             "retrying is impossible as rewind on " \
100             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
101       raise NonRetryableError, msg, e.backtrace
102   end
104   ##
105   # Writes an HTTP PUT request to +sock+ to upload the file and
106   # returns file size if the socket finished writing
107   def upload(devid, uri) # :nodoc:
108     sock = MogileFS::Socket.tcp(uri.host, uri.port)
109     sock.setsockopt(Socket::IPPROTO_TCP, Socket::SO_KEEPALIVE, 1)
110     file_size = length
112     if @streaming_io
113       file_size = put_streaming_io(sock, uri)
114     elsif @big_io
115       if String === @big_io || @big_io.respond_to?(:to_path)
116         file = File.open(@big_io)
117         stat = file.stat
118         file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
119       else
120         size = nil
121         if @big_io.respond_to?(:stat)
122           stat = @big_io.stat
123           size = stat.size if stat.file?
124         elsif @big_io.respond_to?(:size)
125           size = @big_io.size
126         end
127         file_size = request_put(sock, uri, size, @big_io)
128       end
129     else
130       rewind
131       request_put(sock, uri, file_size, self)
132     end
134     # mostly relying on SO_KEEPALIVE to timeout
135     case line = sock.timed_read(23, "", 7200)
136     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
137       file_size
138     when nil
139       raise EmptyResponseError, 'Unable to read response line from server'
140     when %r{^HTTP/\d\.\d\s+(\d+)}
141       raise BadResponseError, "HTTP response status from upload: #$1"
142     else
143       raise UnparseableResponseError,
144             "Response line not understood: #{line.inspect}"
145     end
146     rescue SystemCallError, RetryableError => err
147       rewind_or_raise!(uri, err)
148       raise
149     ensure
150       file.close if file
151       sock.close if sock
152   end
154   def commit
155     errors = nil
156     @dests.each do |devid, path|
157       begin
158         uri = URI.parse(path)
159         bytes_uploaded = upload(devid, uri)
160         return create_close(devid, uri, bytes_uploaded)
161       rescue SystemCallError, RetryableError => e
162         errors ||= []
163         errors << "#{path} - #{e.message} (#{e.class})"
164       end
165     end
167     raise NoStorageNodesError,
168           "all paths failed with PUT: #{errors.join(', ')}", []
169   end
171   def create_close(devid, uri, bytes_uploaded)
172     args = {
173       :fid => @opts[:fid],
174       :devid => devid,
175       :key => @opts[:key],
176       :domain => @opts[:domain],
177       :size => bytes_uploaded,
178       :path => uri.to_s,
179     }
180     if @md5
181       args[:checksum] = "MD5:#{@md5.hexdigest}"
182     elsif String === @opts[:content_md5]
183       hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
184       args[:checksum] = "MD5:#{hex}"
185     end
186     args[:checksumverify] = 1 if @opts[:checksumverify]
187     @opts[:backend].create_close(args)
188     bytes_uploaded
189   end
191   def close
192     commit
193     super
194   end