1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
5 require 'mogilefs/chunker'
8 # HTTPFile wraps up the new file operations for storing files onto an HTTP storage node.
11 # You really don't want to create an HTTPFile by hand. Instead you want to
12 # create a new file using MogileFS::MogileFS.new_file.
14 class MogileFS::HTTPFile < StringIO
# Raised by upload when no response line could be read from the server.
15 class EmptyResponseError < MogileFS::Error; end
# Raised by upload when the response line is an HTTP status line that did not
# match the 2xx success pattern.
16 class BadResponseError < MogileFS::Error; end
# Raised by upload when the response line is not understood at all.
17 class UnparseableResponseError < MogileFS::Error; end
# Raised once all destination paths have failed with PUT.
18 class NoStorageNodesError < MogileFS::Error
# Always reports this fixed text, whatever message the error was raised with.
19 def message; 'Unable to open socket to storage node'; end
# Raised by rewind_or_raise! when the input being uploaded cannot be rewound,
# which makes retrying impossible.
21 class NonRetryableError < MogileFS::Error; end
# Looked up by "host:port" string in request_put for the destination node.
24 MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
# Callable that upload invokes as call(elapsed_time, uploaded_size); the
# result is handed to sock.timed_read as the timeout for the response line.
# NOTE(review): presumably a class-level accessor, since upload reads it via
# self.class.response_timeout_cb -- confirm.
26 attr_accessor :response_timeout_cb
# Default callback: derives a value from the amount uploaded and the time
# already spent.
29 @response_timeout_cb = lambda do |elapsed_time, bytes_uploaded|
# convert bytes to megabytes (1024 * 1024 bytes each)
30 mbytes_uploaded = bytes_uploaded / (1024.0 * 1024.0)
31 # assumes worst case is 10M/s on the remote storage disk
# NOTE(review): this adds 10 per megabyte uploaded to elapsed_time, which
# does not obviously match the 10M/s figure above -- confirm the intended
# rate.
32 t = mbytes_uploaded * 10 + elapsed_time
38 # The URI this file will be stored to.
45 # The big_io name, used in case we have a file > 256M
# Callback object used by put_streaming_io: it is asked for +length+ and is
# invoked via +call+ with a Proc that takes each piece of data to write.
49 attr_accessor :streaming_io
52 # Creates a new HTTPFile with MogileFS-specific data. Use
53 # MogileFS::MogileFS#new_file instead of this method.
#
# NOTE(review): +dests+ is presumably kept as @dests, which is later iterated
# as (devid, path) pairs -- confirm against the caller.
55 def initialize(dests, content_length)
# no streaming source, big_io, chosen destination or active input yet
57 @streaming_io = @big_io = @uri = @devid = @active = nil
# Writes an HTTP PUT request for +uri+ to +sock+ and sends the request body.
#
# Two request forms are written here: HTTP/1.0 with a Content-Length header
# of +file_size+ and the body sent straight to +sock+, and HTTP/1.1 with
# "Transfer-Encoding: chunked" and the body sent through a MogileFS::Chunker
# wrapping +sock+. In both forms the body is copied from +input+ when it is
# given (and +input+ is remembered in @active); otherwise the writer is
# yielded to the caller's block.
#
# sock      - object the request is written to
# uri       - destination; its host, port and request_uri are used
# file_size - value sent as Content-Length in the HTTP/1.0 form
# input     - optional source copied with MogileFS.io.copy_stream
#
# NOTE(review): callers in upload assign the result to file_size, so the
# return value is presumably a byte count -- confirm.
61 def request_put(sock, uri, file_size, input = nil)
# the same "host:port" string serves the MD5_TRAILER_NODES lookup and the
# Host header
62 host_with_port = "#{uri.host}:#{uri.port}"
64 if MD5_TRAILER_NODES[host_with_port]
# HTTP/1.0 form: explicit Content-Length, body written directly to sock
70 sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
71 "Content-Length: #{file_size}\r\n\r\n")
72 input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
# announce a Content-MD5 trailer only when md5 is set
74 trailers = md5 ? "Trailer: Content-MD5\r\n" : ""
# HTTP/1.1 form: chunked transfer encoding, no Content-Length
75 sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
76 "Host: #{host_with_port}\r\n#{trailers}" \
77 "Transfer-Encoding: chunked\r\n\r\n")
# the body goes through a Chunker built from the socket and the md5 setting
78 tmp = MogileFS::Chunker.new(sock, md5)
79 rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
# Uploads the body produced by the @streaming_io callback.
#
# Takes the size from @streaming_io.length, issues request_put with a block,
# and inside it invokes @streaming_io.call with a Proc that writes each piece
# of data to the yielded writer and adds the result of +write+ to +written+.
#
# Returns +file_size+ when it is truthy, otherwise +written+.
85 def put_streaming_io(sock, uri) # unlikely to be used
86 file_size = @streaming_io.length
88 request_put(sock, uri, file_size) do |wr|
# the callback receives a Proc it can call once per piece of data
89 @streaming_io.call(Proc.new do |data_to_write|
90 written += wr.write(data_to_write)
# prefer the declared length; fall back to the count accumulated above
93 file_size ? file_size : written
# Rewinds the input currently being uploaded (@active), if there is one.
#
# When the rewind itself fails, raises NonRetryableError whose message names
# +uri+, the original error +err+ and the rewind failure, and whose backtrace
# is that of the rewind failure.
# NOTE(review): +e+ is presumably bound by a rescue clause around the
# rewind -- confirm.
96 def rewind_or_raise!(uri, err)
97 @active.rewind if @active
# report both the original failure and the reason the rewind failed
99 msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
100 "retrying is impossible as rewind on " \
101 "#{@active.inspect} failed with: #{e.message} (#{e.class})"
102 raise NonRetryableError, msg, e.backtrace
106 # Writes an HTTP PUT request to +sock+ to upload the file and
107 # returns file size if the socket finished writing
#
# The body comes from one of: the @streaming_io callback, @big_io given as a
# String or an object responding to +to_path+ (opened with File.open),
# @big_io as an object responding to +stat+ or +size+, or this object itself.
# After the body is sent, one response line is read and checked.
#
# The response handling raises EmptyResponseError, BadResponseError or
# UnparseableResponseError depending on the response line.
108 def upload(devid, uri) # :nodoc:
# connect to the host and port named by the destination URI
110 sock = MogileFS::Socket.tcp(uri.host, uri.port)
# streaming callback path
114 file_size = put_streaming_io(sock, uri)
# @big_io names a file (String or path-like): open it and upload its contents
116 if String === @big_io || @big_io.respond_to?(:to_path)
117 file = File.open(@big_io)
# a size is only passed for regular files; otherwise nil is passed
119 file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
# otherwise @big_io itself is the input: take the size from stat for regular
# files, or consult +size+ when that is all it offers
122 if @big_io.respond_to?(:stat)
124 size = stat.size if stat.file?
125 elsif @big_io.respond_to?(:size)
128 file_size = request_put(sock, uri, size, @big_io)
# upload this object itself as the request body
132 request_put(sock, uri, file_size, self)
# ask the class-level callback for the response timeout, given the time
# elapsed since +start+ and the uploaded size
135 tout = self.class.response_timeout_cb.call(Time.now - start, file_size)
# read the response line and classify it
137 case line = sock.timed_read(23, "", tout)
138 when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
141 raise EmptyResponseError, 'Unable to read response line from server'
142 when %r{^HTTP/\d\.\d\s+(\d+)}
143 raise BadResponseError, "HTTP response status from upload: #$1"
145 raise UnparseableResponseError,
146 "Response line not understood: #{line.inspect}"
# hand the failure to rewind_or_raise!, which rewinds the active input or
# raises NonRetryableError
149 rewind_or_raise!(uri, err)
# Try each (devid, path) destination in turn; the first successful upload
# records @devid and @uri and returns what upload returned.
158 @dests.each do |devid, path|
160 uri = URI.parse(path)
161 bytes_uploaded = upload(devid, uri)
162 @devid, @uri = devid, uri
163 return bytes_uploaded
# NonRetryableError (raised when the input could not be rewound) is handled
# by its own rescue clause
164 rescue NonRetryableError
# remember why this path failed, for the final error message
168 errors << "#{path} - #{e.message} (#{e.class})"
# no destination succeeded: raise with the collected failures and an empty
# backtrace
172 raise NoStorageNodesError,
173 "all paths failed with PUT: #{errors.join(', ')}", []