1 # -*- encoding: binary -*-
2 # These are internal implementation details; do not use them in your code.
6 require 'mogilefs/chunker'
9 # HTTPFile wraps up the new file operations for storing files onto an HTTP storage node.
12 # You really don't want to create an HTTPFile by hand. Instead you want to
13 # create a new file using MogileFS::MogileFS.new_file.
# StringIO subclass that uploads its contents (or an external IO) to a
# storage node with an HTTP PUT request.
15 class MogileFS::HTTPFile < StringIO
# Base class for failures that +upload+ rescues (together with
# SystemCallError) and hands to +rewind_or_raise!+.
16 class RetryableError < MogileFS::Error; end
# Raised by +upload+ when no response line could be read from the server.
17 class EmptyResponseError < RetryableError; end
# Raised by +upload+ when the response line carries a status code that
# did not match the 2xx success pattern.
18 class BadResponseError < RetryableError; end
# Raised by +upload+ when the response line matches no known pattern.
19 class UnparseableResponseError < RetryableError; end
# Raised once every destination in @dests has failed.
20 class NoStorageNodesError < MogileFS::Error
# NOTE(review): overriding #message with a fixed string means callers of
# #message see this text rather than the string given to +raise+ —
# confirm that is intended.
21 def message; 'Unable to open socket to storage node'; end
# Raised by +rewind_or_raise!+ when the active input cannot be rewound.
23 class NonRetryableError < MogileFS::Error; end
# Looked up by "host:port" in +request_put+; a truthy entry makes the
# upload compute an MD5 digest of the body.
25 MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
28 # The big_io name in case we have file > 256M
# NOTE(review): no accessor for big_io is visible below the comment
# above, although @big_io is read in +upload+ — confirm it was not lost.

# Object used by +put_streaming_io+, which calls +length+ and +call+ on it.
32 attr_accessor :streaming_io
35 # Creates a new HTTPFile with MogileFS-specific data. Use
36 # MogileFS::MogileFS#new_file instead of this method.
#
# +dests+ is not referenced on the lines visible here; presumably it is
# stored in @dests, which is iterated later in this class — TODO confirm.
# +opts+ may be an Integer, in which case it is wrapped as
# { :content_length => opts }; any other value is stored unchanged.
38 def initialize(dests, opts = nil)
# no digest, streaming source, big IO source or active input yet
40 @md5 = @streaming_io = @big_io = @active = nil
# NOTE(review): with the default opts = nil, @opts stays nil and
# +request_put+ would fail on @opts[:content_length] — confirm callers
# always pass a Hash or an Integer.
42 @opts = Integer === opts ? { :content_length => opts } : opts
# Writes an HTTP PUT request for +uri+ to +sock+ and sends the body.
#
# Two request forms are visible: HTTP/1.0 with a Content-Length header,
# and HTTP/1.1 with chunked Transfer-Encoding. The condition choosing
# between them is not visible here; presumably it depends on whether
# +file_size+ is known — TODO confirm.
#
# The body is copied from +input+ when given (which is also remembered
# in @active), otherwise the writer is yielded to the caller's block.
# The result of the copy or of the block is kept in +rv+.
#
# Raises MogileFS::SizeMismatchError with a message comparing
# @opts[:content_length] to +rv+ (the guarding condition is not visible
# here).
45 def request_put(sock, uri, file_size, input = nil)
# used both as the Host header value and as the MD5_TRAILER_NODES key
46 host_with_port = "#{uri.host}:#{uri.port}"
47 clen = @opts[:content_length]
50 content_md5 = @opts[:content_md5]
51 if String === content_md5
# message of an error raised on a line not visible here
54 ":content_length must be specified with :content_md5 String"
# piggy-back the Content-MD5 header onto the Content-Length value so
# the HTTP/1.0 request below emits both headers
55 file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
# a callable, the :trailer symbol, or a node listed in
# MD5_TRAILER_NODES all switch on digest computation
56 elsif content_md5.respond_to?(:call) ||
57 :trailer == content_md5 ||
58 MD5_TRAILER_NODES[host_with_port]
60 @md5 = Digest::MD5.new
# fixed-length request
64 sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
65 "Content-Length: #{file_size}\r\n\r\n")
66 rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
# chunked request; a Content-MD5 trailer is announced only when a
# digest is being computed
68 trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
69 sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
70 "Host: #{host_with_port}\r\n#{trailers}" \
71 "Transfer-Encoding: chunked\r\n\r\n")
# the body goes through a Chunker wrapping the socket
72 tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
73 rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
78 raise MogileFS::SizeMismatchError,
79 ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
# Uploads the body produced by @streaming_io through +request_put+.
#
# @streaming_io must respond to +length+ (used as the file size) and to
# +call+, which receives a Proc; each chunk passed to that Proc is
# written to the request writer.
#
# Returns +file_size+ when it is truthy, otherwise the byte count
# accumulated in +written+.
84 def put_streaming_io(sock, uri) # unlikely to be used
85 file_size = @streaming_io.length
87 request_put(sock, uri, file_size) do |wr|
88 @streaming_io.call(Proc.new do |data_to_write|
# NOTE(review): +written+ must be initialised on a line not visible here.
89 written += wr.write(data_to_write)
92 file_size ? file_size : written
# Rewinds the input last handed to +request_put+ (@active), if any, so
# that an upload can be attempted again after +err+.
#
# Raises NonRetryableError, carrying the backtrace of the rewind
# failure, with a message naming +uri+, +err+ and the rewind failure.
95 def rewind_or_raise!(uri, err)
96 @active.rewind if @active
# NOTE(review): +e+ is presumably bound by a rescue clause on a line not
# visible here — confirm which exception classes it covers.
98 msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
99 "retrying is impossible as rewind on " \
100 "#{@active.inspect} failed with: #{e.message} (#{e.class})"
101 raise NonRetryableError, msg, e.backtrace
105 # Writes an HTTP PUT request to +sock+ to upload the file and
106 # returns file size if the socket finished writing
107 def upload(devid, uri) # :nodoc:
108 sock = MogileFS::Socket.tcp(uri.host, uri.port)
109 sock.setsockopt(Socket::IPPROTO_TCP, Socket::SO_KEEPALIVE, 1)
113 file_size = put_streaming_io(sock, uri)
115 if String === @big_io || @big_io.respond_to?(:to_path)
116 file = File.open(@big_io)
118 file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
121 if @big_io.respond_to?(:stat)
123 size = stat.size if stat.file?
124 elsif @big_io.respond_to?(:size)
127 file_size = request_put(sock, uri, size, @big_io)
131 request_put(sock, uri, file_size, self)
134 # mostly relying on SO_KEEPALIVE to timeout
135 case line = sock.timed_read(23, "", 7200)
136 when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
139 raise EmptyResponseError, 'Unable to read response line from server'
140 when %r{^HTTP/\d\.\d\s+(\d+)}
141 raise BadResponseError, "HTTP response status from upload: #$1"
143 raise UnparseableResponseError,
144 "Response line not understood: #{line.inspect}"
146 rescue SystemCallError, RetryableError => err
147 rewind_or_raise!(uri, err)
# NOTE(review): the +def+ line of the method containing this loop is not
# visible here, and +errors+ must be initialised on a line not shown.
#
# Tries each (devid, path) pair in @dests in order: the path is parsed
# as a URI and uploaded, and the first success returns the result of
# +create_close+.
156 @dests.each do |devid, path|
158 uri = URI.parse(path)
159 bytes_uploaded = upload(devid, uri)
160 return create_close(devid, uri, bytes_uploaded)
# a failed destination is recorded; presumably the loop then moves on
# to the next one — TODO confirm
161 rescue SystemCallError, RetryableError => e
163 errors << "#{path} - #{e.message} (#{e.class})"
# every destination failed; the error is raised with an empty backtrace
167 raise NoStorageNodesError,
168 "all paths failed with PUT: #{errors.join(', ')}", []
# Reports a finished upload by calling +create_close+ on @opts[:backend]
# and returns its result.
#
# +args+ is built on lines not fully visible here; the visible entries
# are :domain (from @opts) and :size (+bytes_uploaded+). +devid+ and
# +uri+ are not referenced on the visible lines.
171 def create_close(devid, uri, bytes_uploaded)
176 :domain => @opts[:domain],
177 :size => bytes_uploaded,
# checksum taken from the digest in @md5 (the guarding +if+ is not
# visible here)
181 args[:checksum] = "MD5:#{@md5.hexdigest}"
182 elsif String === @opts[:content_md5]
# a String :content_md5 is base64-decoded and re-encoded as hex
183 hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
184 args[:checksum] = "MD5:#{hex}"
186 args[:checksumverify] = 1 if @opts[:checksumverify]
187 @opts[:backend].create_close(args)