new_file: delay keepalive sockopts until read
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blob85b4578e06b9423dab46b9166f83f777dba6523f
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'mogilefs/new_file'
6 ##
7 # HTTPFile wraps up the new file operations for storing files onto an HTTP
8 # storage node.
10 # You really don't want to create an HTTPFile by hand.  Instead you want to
11 # create a new file using MogileFS::MogileFS.new_file.
13 class MogileFS::HTTPFile < StringIO
14   include MogileFS::NewFile::Common
16   ##
17   # The big_io name in case we have file > 256M
19   attr_accessor :big_io
21   attr_accessor :streaming_io
23   ##
24   # Creates a new HTTPFile with MogileFS-specific data.  Use
25   # MogileFS::MogileFS#new_file instead of this method.
27   def initialize(dests, opts = nil)
28     super ""
29     @md5 = @streaming_io = @big_io = @active = nil
30     @dests = dests
31     @opts = Integer === opts ? { :content_length => opts } : opts
32   end
34   def request_put(sock, uri, file_size, input = nil)
35     host_with_port = "#{uri.host}:#{uri.port}"
36     clen = @opts[:content_length]
37     file_size ||= clen
39     content_md5 = @opts[:content_md5]
40     if String === content_md5
41       file_size or
42         raise ArgumentError,
43               ":content_length must be specified with :content_md5 String"
44       file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
45     elsif content_md5.respond_to?(:call) ||
46           :trailer == content_md5 ||
47           MD5_TRAILER_NODES[host_with_port]
48       file_size = nil
49       @md5 = Digest::MD5.new
50     end
52     if file_size
53       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
54                  "Content-Length: #{file_size}\r\n\r\n")
55       rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
56     else
57       trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
58       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
59                  "Host: #{host_with_port}\r\n#{trailers}" \
60                  "Transfer-Encoding: chunked\r\n\r\n")
61       tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
62       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
63       tmp.flush
64     end
66     if clen && clen != rv
67       raise MogileFS::SizeMismatchError,
68             ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
69     end
70     rv
71   end
73   def put_streaming_io(sock, uri) # unlikely to be used
74     file_size = @streaming_io.length
75     written = 0
76     request_put(sock, uri, file_size) do |wr|
77       @streaming_io.call(Proc.new do |data_to_write|
78         written += wr.write(data_to_write)
79       end)
80     end
81     file_size ? file_size : written
82   end
84   def rewind_or_raise!(uri, err)
85     @active.rewind if @active
86     rescue => e
87       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
88             "retrying is impossible as rewind on " \
89             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
90       raise NonRetryableError, msg, e.backtrace
91   end
93   ##
94   # Writes an HTTP PUT request to +sock+ to upload the file and
95   # returns file size if the socket finished writing
96   def upload(devid, uri) # :nodoc:
97     sock = MogileFS::Socket.tcp(uri.host, uri.port)
98     file_size = length
100     if @streaming_io
101       file_size = put_streaming_io(sock, uri)
102     elsif @big_io
103       stat = file = size = nil
104       if @big_io.respond_to?(:stat)
105         stat = @big_io.stat
106       elsif String === @big_io || @big_io.respond_to?(:to_path)
107         begin
108           file = File.open(@big_io)
109         rescue => e
110           msg = "Failed to open input (#{@big_io.inspect}): " \
111                 "#{e.message} (#{e.class})"
112           raise NonRetryableError, msg, e.backtrace
113         end
114         stat = file.stat
115       elsif @big_io.respond_to?(:size)
116         size = @big_io.size
117       end
118       if stat && stat.file?
119         size ||= stat.size
120         file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
121       end
122       file_size = request_put(sock, uri, size, file || @big_io)
123     else
124       rewind
125       request_put(sock, uri, file_size, self)
126     end
128     read_response(sock) # raises on errors
129     file_size
130     rescue SystemCallError, RetryableError => err
131       rewind_or_raise!(uri, err)
132       raise
133     ensure
134       file.close if file && @big_io != file
135       sock.close if sock
136   end
138   def nhp_put(devid, uri)
139     clen = @opts[:content_length]
140     if clen && clen != size
141       raise MogileFS::SizeMismatchError,
142         ":content_length expected: #{clen.inspect}, actual: #{size}"
143     end
145     put = Net::HTTP::Put.new(uri.path)
146     put["Content-Type"] = "application/octet-stream"
147     if md5 = @opts[:content_md5]
148       if md5.respond_to?(:call)
149         md5 = md5.call.strip
150       elsif md5 == :trailer
151         md5 = [ Digest::MD5.digest(string) ].pack("m").chomp!
152       end
153       put["Content-MD5"] = md5
154     end
155     put.body = string
156     res = MogileFS::NHP.request(uri, put)
157     return size if Net::HTTPSuccess === res
158     raise BadResponseError, "#{res.code} #{res.message}"
159   rescue => e
160     /\ANet::/ =~ "#{e.class}" and
161         raise RetryableError, "#{e.message} (#{e.class})", e.backtrace
162     raise
163   end
165   def commit
166     errors = nil
167     @dests.each do |devid, path|
168       begin
169         uri = URI.parse(path)
170         bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri)
171         return create_close(devid, uri, bytes_uploaded)
172       rescue Timeout::Error, SystemCallError, RetryableError => e
173         errors ||= []
174         errors << "#{path} - #{e.message} (#{e.class})"
175       end
176     end
178     raise NoStorageNodesError,
179           "all paths failed with PUT: #{errors.join(', ')}", []
180   end
182   def close
183     commit
184     super
185   end