http_file: timeout scaling for "write" method
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blob4d4a850e01488887bb4e0d704fb99fb0724c3eb2
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'uri'
5 require 'mogilefs/chunker'
7 ##
8 # HTTPFile wraps up the new file operations for storing files onto an HTTP
9 # storage node.
11 # You really don't want to create an HTTPFile by hand.  Instead you want to
12 # create a new file using MogileFS::MogileFS.new_file.
14 class MogileFS::HTTPFile < StringIO
15   class EmptyResponseError < MogileFS::Error; end
16   class BadResponseError < MogileFS::Error; end
17   class UnparseableResponseError < MogileFS::Error; end
18   class NoStorageNodesError < MogileFS::Error
19     def message; 'Unable to open socket to storage node'; end
20   end
21   class NonRetryableError < MogileFS::Error; end
23   class HTTPSock < MogileFS::Socket
24     attr_accessor :start
26     # Increase timeout as we become more invested in uploading with
27     # this socket.  The server could be experiencing I/O delays
28     # from large uploads because the sysadmin forgot to tune the
29     # VM sysctls for handling large files.
30     def write(buf)
31       timed_write(buf, Time.now - @start + 5.0)
32     end
33   end
35   # :stopdoc:
36   MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
37   class << self
38     attr_accessor :response_timeout_cb
39   end
41   # temporary directories (nginx) may not be configured on the
42   # same device, necessitating a time-consuming full file copy
43   # instead of a quick rename(2)/link(2) operation
44   @response_timeout_cb = lambda do |elapsed_time, bytes_uploaded|
45     mbytes_uploaded = bytes_uploaded / (1024.0 * 1024.0)
46     # assumes worst case is 10M/s on the remote storage disk
47     t = mbytes_uploaded * 10 + elapsed_time
48     t < 5 ? 5 : t
49   end
50   # :startdoc:
52   ##
53   # The URI this file will be stored to.
55   attr_reader :uri
57   attr_reader :devid
59   ##
60   # The big_io name in case we have file > 256M
62   attr_accessor :big_io
64   attr_accessor :streaming_io
66   ##
67   # Creates a new HTTPFile with MogileFS-specific data.  Use
68   # MogileFS::MogileFS#new_file instead of this method.
70   def initialize(dests, content_length)
71     super ""
72     @streaming_io = @big_io = @uri = @devid = @active = nil
73     @dests = dests
74   end
76   def request_put(sock, uri, file_size, input = nil)
77     host_with_port = "#{uri.host}:#{uri.port}"
78     md5 = false
79     if MD5_TRAILER_NODES[host_with_port]
80       file_size = nil
81       md5 = true
82     end
84     if file_size
85       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
86                  "Content-Length: #{file_size}\r\n\r\n")
87       input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
88     else
89       trailers = md5 ? "Trailer: Content-MD5\r\n" : ""
90       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
91                  "Host: #{host_with_port}\r\n#{trailers}" \
92                  "Transfer-Encoding: chunked\r\n\r\n")
93       tmp = MogileFS::Chunker.new(sock, md5)
94       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
95       tmp.flush
96       rv
97     end
98   end
100   def put_streaming_io(sock, uri) # unlikely to be used
101     file_size = @streaming_io.length
102     written = 0
103     request_put(sock, uri, file_size) do |wr|
104       @streaming_io.call(Proc.new do |data_to_write|
105         written += wr.write(data_to_write)
106       end)
107     end
108     file_size ? file_size : written
109   end
111   def rewind_or_raise!(uri, err)
112     @active.rewind if @active
113     rescue => e
114       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
115             "retrying is impossible as rewind on " \
116             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
117       raise NonRetryableError, msg, e.backtrace
118   end
120   ##
121   # Writes an HTTP PUT request to +sock+ to upload the file and
122   # returns file size if the socket finished writing
123   def upload(devid, uri) # :nodoc:
124     start = Time.now
125     sock = HTTPSock.tcp(uri.host, uri.port)
126     sock.start = start
127     file_size = length
129     if @streaming_io
130       file_size = put_streaming_io(sock, uri)
131     elsif @big_io
132       if String === @big_io || @big_io.respond_to?(:to_path)
133         file = File.open(@big_io)
134         stat = file.stat
135         file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
136       else
137         size = nil
138         if @big_io.respond_to?(:stat)
139           stat = @big_io.stat
140           size = stat.size if stat.file?
141         elsif @big_io.respond_to?(:size)
142           size = @big_io.size
143         end
144         file_size = request_put(sock, uri, size, @big_io)
145       end
146     else
147       rewind
148       request_put(sock, uri, file_size, self)
149     end
151     tout = self.class.response_timeout_cb.call(Time.now - start, file_size)
153     case line = sock.timed_read(23, "", tout)
154     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
155       file_size
156     when nil
157       raise EmptyResponseError, 'Unable to read response line from server'
158     when %r{^HTTP/\d\.\d\s+(\d+)}
159       raise BadResponseError, "HTTP response status from upload: #$1"
160     else
161       raise UnparseableResponseError,
162             "Response line not understood: #{line.inspect}"
163     end
164     rescue => err
165       rewind_or_raise!(uri, err)
166       raise
167     ensure
168       file.close if file
169       sock.close if sock
170   end
172   def commit
173     errors = nil
174     @dests.each do |devid, path|
175       begin
176         uri = URI.parse(path)
177         bytes_uploaded = upload(devid, uri)
178         @devid, @uri = devid, uri
179         return bytes_uploaded
180       rescue NonRetryableError
181         raise
182       rescue => e
183         errors ||= []
184         errors << "#{path} - #{e.message} (#{e.class})"
185       end
186     end
188     raise NoStorageNodesError,
189           "all paths failed with PUT: #{errors.join(', ')}", []
190   end