1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
4 require 'mogilefs/new_file'
7 # HTTPFile wraps up the new file operations for storing files onto an HTTP
10 # You really don't want to create an HTTPFile by hand. Instead you want to
11 # create a new file using MogileFS::MogileFS.new_file.
13 class MogileFS::HTTPFile < StringIO
14 include MogileFS::NewFile::Common
17 # The big_io name in case we have file > 256M
21 attr_accessor :streaming_io
24 # Creates a new HTTPFile with MogileFS-specific data. Use
25 # MogileFS::MogileFS#new_file instead of this method.
27 def initialize(dests, opts = nil)
29 @md5 = @streaming_io = @big_io = @active = nil
31 @opts = Integer === opts ? { :content_length => opts } : opts
34 def request_put(sock, uri, file_size, input = nil)
35 host_with_port = "#{uri.host}:#{uri.port}"
36 clen = @opts[:content_length]
39 content_md5 = @opts[:content_md5]
40 if String === content_md5
43 ":content_length must be specified with :content_md5 String"
44 file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
45 elsif content_md5.respond_to?(:call) ||
46 :trailer == content_md5 ||
47 MD5_TRAILER_NODES[host_with_port]
49 @md5 = Digest::MD5.new
53 sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
54 "Content-Length: #{file_size}\r\n\r\n")
55 rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
57 trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
58 sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
59 "Host: #{host_with_port}\r\n#{trailers}" \
60 "Transfer-Encoding: chunked\r\n\r\n")
61 tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
62 rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
67 raise MogileFS::SizeMismatchError,
68 ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
73 def put_streaming_io(sock, uri) # unlikely to be used
74 file_size = @streaming_io.length
76 request_put(sock, uri, file_size) do |wr|
77 @streaming_io.call(Proc.new do |data_to_write|
78 written += wr.write(data_to_write)
81 file_size ? file_size : written
84 def rewind_or_raise!(uri, err)
85 @active.rewind if @active
87 msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
88 "retrying is impossible as rewind on " \
89 "#{@active.inspect} failed with: #{e.message} (#{e.class})"
90 raise NonRetryableError, msg, e.backtrace
94 # Writes an HTTP PUT request to +sock+ to upload the file and
95 # returns file size if the socket finished writing
96 def upload(devid, uri) # :nodoc:
97 sock = MogileFS::Socket.tcp(uri.host, uri.port)
98 set_socket_options(sock)
102 file_size = put_streaming_io(sock, uri)
104 stat = file = size = nil
105 if @big_io.respond_to?(:stat)
107 elsif String === @big_io || @big_io.respond_to?(:to_path)
109 file = File.open(@big_io)
111 msg = "Failed to open input (#{@big_io.inspect}): " \
112 "#{e.message} (#{e.class})"
113 raise NonRetryableError, msg, e.backtrace
116 elsif @big_io.respond_to?(:size)
119 if stat && stat.file?
121 file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
123 file_size = request_put(sock, uri, size, file || @big_io)
126 request_put(sock, uri, file_size, self)
129 read_response(sock) # raises on errors
131 rescue SystemCallError, RetryableError => err
132 rewind_or_raise!(uri, err)
135 file.close if file && @big_io != file
139 def nhp_put(devid, uri)
140 clen = @opts[:content_length]
141 if clen && clen != size
142 raise MogileFS::SizeMismatchError,
143 ":content_length expected: #{clen.inspect}, actual: #{size}"
146 put = Net::HTTP::Put.new(uri.path)
147 put["Content-Type"] = "application/octet-stream"
148 if md5 = @opts[:content_md5]
149 if md5.respond_to?(:call)
151 elsif md5 == :trailer
152 md5 = [ Digest::MD5.digest(string) ].pack("m").chomp!
154 put["Content-MD5"] = md5
157 res = MogileFS::NHP.request(uri, put)
158 return size if Net::HTTPSuccess === res
159 raise BadResponseError, "#{res.code} #{res.message}"
161 /\ANet::/ =~ "#{e.class}" and
162 raise RetryableError, "#{e.message} (#{e.class})", e.backtrace
168 @dests.each do |devid, path|
170 uri = URI.parse(path)
171 bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri)
172 return create_close(devid, uri, bytes_uploaded)
173 rescue Timeout::Error, SystemCallError, RetryableError => e
175 errors << "#{path} - #{e.message} (#{e.class})"
179 raise NoStorageNodesError,
180 "all paths failed with PUT: #{errors.join(', ')}", []