"new_file" gets many options for Content-MD5 handling
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blob2e418e4cd8ffb55c69961565459c4d7b41f22f8f
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'uri'
5 require 'digest/md5'
6 require 'mogilefs/chunker'
8 ##
9 # HTTPFile wraps up the new file operations for storing files onto an HTTP
10 # storage node.
12 # You really don't want to create an HTTPFile by hand.  Instead you want to
13 # create a new file using MogileFS::MogileFS.new_file.
15 class MogileFS::HTTPFile < StringIO
16   class RetryableError < MogileFS::Error; end
17   class EmptyResponseError < RetryableError; end
18   class BadResponseError < RetryableError; end
19   class UnparseableResponseError < RetryableError; end
20   class NoStorageNodesError < MogileFS::Error
21     def message; 'Unable to open socket to storage node'; end
22   end
23   class NonRetryableError < MogileFS::Error; end
25   class HTTPSock < MogileFS::Socket
26     attr_accessor :start
28     # Increase timeout as we become more invested in uploading with
29     # this socket.  The server could be experiencing I/O delays
30     # from large uploads because the sysadmin forgot to tune the
31     # VM sysctls for handling large files.
32     def write(buf)
33       timed_write(buf, Time.now - @start + 5.0)
34     end
35   end
37   # :stopdoc:
38   MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
39   class << self
40     attr_accessor :response_timeout_cb
41   end
43   # temporary directories (nginx) may not be configured on the
44   # same device, necessitating a time-consuming full file copy
45   # instead of a quick rename(2)/link(2) operation
46   @response_timeout_cb = lambda do |elapsed_time, bytes_uploaded|
47     mbytes_uploaded = bytes_uploaded / (1024.0 * 1024.0)
48     # assumes worst case is 10M/s on the remote storage disk
49     t = mbytes_uploaded * 10 + elapsed_time
50     t < 5 ? 5 : t
51   end
52   # :startdoc:
54   ##
55   # The URI this file will be stored to.
57   attr_reader :uri
59   attr_reader :devid
61   ##
62   # The big_io name in case we have file > 256M
64   attr_accessor :big_io
66   attr_accessor :streaming_io
68   ##
69   # Creates a new HTTPFile with MogileFS-specific data.  Use
70   # MogileFS::MogileFS#new_file instead of this method.
72   def initialize(dests, opts = nil)
73     super ""
74     @md5 = @streaming_io = @big_io = @active = nil
75     @dests = dests
76     @opts = Integer === opts ? { :content_length => opts } : opts
77   end
79   def request_put(sock, uri, file_size, input = nil)
80     host_with_port = "#{uri.host}:#{uri.port}"
81     clen = @opts[:content_length]
82     file_size ||= clen
84     content_md5 = @opts[:content_md5]
85     if String === content_md5
86       file_size or
87         raise ArgumentError,
88               ":content_length must be specified with :content_md5 String"
89       file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
90     elsif content_md5.respond_to?(:call) ||
91           :trailer == content_md5 ||
92           MD5_TRAILER_NODES[host_with_port]
93       file_size = nil
94       @md5 = Digest::MD5.new
95     end
97     if file_size
98       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
99                  "Content-Length: #{file_size}\r\n\r\n")
100       rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
101     else
102       trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
103       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
104                  "Host: #{host_with_port}\r\n#{trailers}" \
105                  "Transfer-Encoding: chunked\r\n\r\n")
106       tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
107       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
108       tmp.flush
109     end
111     if clen && clen != rv
112       raise MogileFS::SizeMismatchError,
113             ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
114     end
115     rv
116   end
118   def put_streaming_io(sock, uri) # unlikely to be used
119     file_size = @streaming_io.length
120     written = 0
121     request_put(sock, uri, file_size) do |wr|
122       @streaming_io.call(Proc.new do |data_to_write|
123         written += wr.write(data_to_write)
124       end)
125     end
126     file_size ? file_size : written
127   end
129   def rewind_or_raise!(uri, err)
130     @active.rewind if @active
131     rescue => e
132       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
133             "retrying is impossible as rewind on " \
134             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
135       raise NonRetryableError, msg, e.backtrace
136   end
138   ##
139   # Writes an HTTP PUT request to +sock+ to upload the file and
140   # returns file size if the socket finished writing
141   def upload(devid, uri) # :nodoc:
142     start = Time.now
143     sock = HTTPSock.tcp(uri.host, uri.port)
144     sock.start = start
145     file_size = length
147     if @streaming_io
148       file_size = put_streaming_io(sock, uri)
149     elsif @big_io
150       if String === @big_io || @big_io.respond_to?(:to_path)
151         file = File.open(@big_io)
152         stat = file.stat
153         file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
154       else
155         size = nil
156         if @big_io.respond_to?(:stat)
157           stat = @big_io.stat
158           size = stat.size if stat.file?
159         elsif @big_io.respond_to?(:size)
160           size = @big_io.size
161         end
162         file_size = request_put(sock, uri, size, @big_io)
163       end
164     else
165       rewind
166       request_put(sock, uri, file_size, self)
167     end
169     tout = self.class.response_timeout_cb.call(Time.now - start, file_size)
171     case line = sock.timed_read(23, "", tout)
172     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
173       file_size
174     when nil
175       raise EmptyResponseError, 'Unable to read response line from server'
176     when %r{^HTTP/\d\.\d\s+(\d+)}
177       raise BadResponseError, "HTTP response status from upload: #$1"
178     else
179       raise UnparseableResponseError,
180             "Response line not understood: #{line.inspect}"
181     end
182     rescue SystemCallError, RetryableError => err
183       rewind_or_raise!(uri, err)
184       raise
185     ensure
186       file.close if file
187       sock.close if sock
188   end
190   def commit
191     errors = nil
192     @dests.each do |devid, path|
193       begin
194         uri = URI.parse(path)
195         bytes_uploaded = upload(devid, uri)
196         return create_close(devid, uri, bytes_uploaded)
197       rescue SystemCallError, RetryableError => e
198         errors ||= []
199         errors << "#{path} - #{e.message} (#{e.class})"
200       end
201     end
203     raise NoStorageNodesError,
204           "all paths failed with PUT: #{errors.join(', ')}", []
205   end
207   def create_close(devid, uri, bytes_uploaded)
208     args = {
209       :fid => @opts[:fid],
210       :devid => devid,
211       :key => @opts[:key],
212       :domain => @opts[:domain],
213       :size => bytes_uploaded,
214       :path => uri.to_s,
215     }
216     if @md5
217       args[:checksum] = "MD5:#{@md5.hexdigest}"
218     elsif String === @opts[:content_md5]
219       hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
220       args[:checksum] = "MD5:#{hex}"
221     end
222     args[:checksumverify] = 1 if @opts[:checksumverify]
223     @opts[:backend].create_close(args)
224     bytes_uploaded
225   end
227   def close
228     commit
229     super
230   end