http_file: wait for the destination server to respond
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blobb08abaa555960f4cf60c49cb6da600fc0a1ba336
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'uri'
5 require 'mogilefs/chunker'
7 ##
8 # HTTPFile wraps up the new file operations for storing files onto an HTTP
9 # storage node.
11 # You really don't want to create an HTTPFile by hand.  Instead you want to
12 # create a new file using MogileFS::MogileFS.new_file.
14 class MogileFS::HTTPFile < StringIO
15   class EmptyResponseError < MogileFS::Error; end
16   class BadResponseError < MogileFS::Error; end
17   class UnparseableResponseError < MogileFS::Error; end
18   class NoStorageNodesError < MogileFS::Error
19     def message; 'Unable to open socket to storage node'; end
20   end
21   class NonRetryableError < MogileFS::Error; end
23   # :stopdoc:
24   MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
25   class << self
26     attr_accessor :response_timeout_cb
27   end
29   @response_timeout_cb = lambda do |elapsed_time, bytes_uploaded|
30     mbytes_uploaded = bytes_uploaded / (1024.0 * 1024.0)
31     # assumes worst case is 10M/s on the remote storage disk
32     t = mbytes_uploaded * 10 + elapsed_time
33     t < 5 ? 5 : t
34   end
35   # :startdoc:
37   ##
38   # The URI this file will be stored to.
40   attr_reader :uri
42   attr_reader :devid
44   ##
45   # The big_io name in case we have file > 256M
47   attr_accessor :big_io
49   attr_accessor :streaming_io
51   ##
52   # Creates a new HTTPFile with MogileFS-specific data.  Use
53   # MogileFS::MogileFS#new_file instead of this method.
55   def initialize(dests, content_length)
56     super ""
57     @streaming_io = @big_io = @uri = @devid = @active = nil
58     @dests = dests
59   end
61   def request_put(sock, uri, file_size, input = nil)
62     host_with_port = "#{uri.host}:#{uri.port}"
63     md5 = false
64     if MD5_TRAILER_NODES[host_with_port]
65       file_size = nil
66       md5 = true
67     end
69     if file_size
70       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
71                  "Content-Length: #{file_size}\r\n\r\n")
72       input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
73     else
74       trailers = md5 ? "Trailer: Content-MD5\r\n" : ""
75       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
76                  "Host: #{host_with_port}\r\n#{trailers}" \
77                  "Transfer-Encoding: chunked\r\n\r\n")
78       tmp = MogileFS::Chunker.new(sock, md5)
79       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
80       tmp.flush
81       rv
82     end
83   end
85   def put_streaming_io(sock, uri) # unlikely to be used
86     file_size = @streaming_io.length
87     written = 0
88     request_put(sock, uri, file_size) do |wr|
89       @streaming_io.call(Proc.new do |data_to_write|
90         written += wr.write(data_to_write)
91       end)
92     end
93     file_size ? file_size : written
94   end
96   def rewind_or_raise!(uri, err)
97     @active.rewind if @active
98     rescue => e
99       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
100             "retrying is impossible as rewind on " \
101             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
102       raise NonRetryableError, msg, e.backtrace
103   end
105   ##
106   # Writes an HTTP PUT request to +sock+ to upload the file and
107   # returns file size if the socket finished writing
108   def upload(devid, uri) # :nodoc:
109     start = Time.now
110     sock = MogileFS::Socket.tcp(uri.host, uri.port)
111     file_size = length
113     if @streaming_io
114       file_size = put_streaming_io(sock, uri)
115     elsif @big_io
116       if String === @big_io || @big_io.respond_to?(:to_path)
117         file = File.open(@big_io)
118         stat = file.stat
119         file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
120       else
121         size = nil
122         if @big_io.respond_to?(:stat)
123           stat = @big_io.stat
124           size = stat.size if stat.file?
125         elsif @big_io.respond_to?(:size)
126           size = @big_io.size
127         end
128         file_size = request_put(sock, uri, size, @big_io)
129       end
130     else
131       rewind
132       request_put(sock, uri, file_size, self)
133     end
135     tout = self.class.response_timeout_cb.call(Time.now - start, file_size)
137     case line = sock.timed_read(23, "", tout)
138     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
139       file_size
140     when nil
141       raise EmptyResponseError, 'Unable to read response line from server'
142     when %r{^HTTP/\d\.\d\s+(\d+)}
143       raise BadResponseError, "HTTP response status from upload: #$1"
144     else
145       raise UnparseableResponseError,
146             "Response line not understood: #{line.inspect}"
147     end
148     rescue => err
149       rewind_or_raise!(uri, err)
150       raise
151     ensure
152       file.close if file
153       sock.close if sock
154   end
156   def commit
157     errors = nil
158     @dests.each do |devid, path|
159       begin
160         uri = URI.parse(path)
161         bytes_uploaded = upload(devid, uri)
162         @devid, @uri = devid, uri
163         return bytes_uploaded
164       rescue NonRetryableError
165         raise
166       rescue => e
167         errors ||= []
168         errors << "#{path} - #{e.message} (#{e.class})"
169       end
170     end
172     raise NoStorageNodesError,
173           "all paths failed with PUT: #{errors.join(', ')}", []
174   end