7871940a0e2a3ff49a56198fe410815bf0904d81
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blob7871940a0e2a3ff49a56198fe410815bf0904d81
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'mogilefs/new_file'
6 ##
7 # HTTPFile wraps up the new file operations for storing files onto an HTTP
8 # storage node.
10 # You really don't want to create an HTTPFile by hand.  Instead you want to
11 # create a new file using MogileFS::MogileFS.new_file.
13 class MogileFS::HTTPFile < StringIO
14   include MogileFS::NewFile::Common
16   ##
17   # The big_io name in case we have file > 256M
19   attr_accessor :big_io
21   attr_accessor :streaming_io
23   ##
24   # Creates a new HTTPFile with MogileFS-specific data.  Use
25   # MogileFS::MogileFS#new_file instead of this method.
27   def initialize(dests, opts = nil)
28     super ""
29     @md5 = @streaming_io = @big_io = @active = nil
30     @dests = dests
31     @opts = Integer === opts ? { :content_length => opts } : opts
32   end
34   def request_put(sock, uri, file_size, input = nil)
35     host_with_port = "#{uri.host}:#{uri.port}"
36     clen = @opts[:content_length]
37     file_size ||= clen
39     content_md5 = @opts[:content_md5]
40     if String === content_md5
41       file_size or
42         raise ArgumentError,
43               ":content_length must be specified with :content_md5 String"
44       file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
45     elsif content_md5.respond_to?(:call) ||
46           :trailer == content_md5 ||
47           MD5_TRAILER_NODES[host_with_port]
48       file_size = nil
49       @md5 = Digest::MD5.new
50     end
52     if file_size
53       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
54                  "Content-Length: #{file_size}\r\n\r\n")
55       rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
56     else
57       trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
58       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
59                  "Host: #{host_with_port}\r\n#{trailers}" \
60                  "Transfer-Encoding: chunked\r\n\r\n")
61       tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
62       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
63       tmp.flush
64     end
66     if clen && clen != rv
67       raise MogileFS::SizeMismatchError,
68             ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
69     end
70     rv
71   end
73   def put_streaming_io(sock, uri) # unlikely to be used
74     file_size = @streaming_io.length
75     written = 0
76     request_put(sock, uri, file_size) do |wr|
77       @streaming_io.call(Proc.new do |data_to_write|
78         written += wr.write(data_to_write)
79       end)
80     end
81     file_size ? file_size : written
82   end
84   def rewind_or_raise!(uri, err)
85     @active.rewind if @active
86     rescue => e
87       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
88             "retrying is impossible as rewind on " \
89             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
90       raise NonRetryableError, msg, e.backtrace
91   end
93   ##
94   # Writes an HTTP PUT request to +sock+ to upload the file and
95   # returns file size if the socket finished writing
96   def upload(devid, uri) # :nodoc:
97     sock = MogileFS::Socket.tcp(uri.host, uri.port)
98     set_socket_options(sock)
99     file_size = length
101     if @streaming_io
102       file_size = put_streaming_io(sock, uri)
103     elsif @big_io
104       stat = file = size = nil
105       if @big_io.respond_to?(:stat)
106         stat = @big_io.stat
107       elsif String === @big_io || @big_io.respond_to?(:to_path)
108         begin
109           file = File.open(@big_io)
110         rescue => e
111           msg = "Failed to open input (#{@big_io.inspect}): " \
112                 "#{e.message} (#{e.class})"
113           raise NonRetryableError, msg, e.backtrace
114         end
115         stat = file.stat
116       elsif @big_io.respond_to?(:size)
117         size = @big_io.size
118       end
119       if stat && stat.file?
120         size ||= stat.size
121         file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
122       end
123       file_size = request_put(sock, uri, size, file || @big_io)
124     else
125       rewind
126       request_put(sock, uri, file_size, self)
127     end
129     read_response(sock) # raises on errors
130     file_size
131     rescue SystemCallError, RetryableError => err
132       rewind_or_raise!(uri, err)
133       raise
134     ensure
135       file.close if file && @big_io != file
136       sock.close if sock
137   end
139   def commit
140     errors = nil
141     @dests.each do |devid, path|
142       begin
143         uri = URI.parse(path)
144         bytes_uploaded = upload(devid, uri)
145         return create_close(devid, uri, bytes_uploaded)
146       rescue SystemCallError, RetryableError => e
147         errors ||= []
148         errors << "#{path} - #{e.message} (#{e.class})"
149       end
150     end
152     raise NoStorageNodesError,
153           "all paths failed with PUT: #{errors.join(', ')}", []
154   end
156   def close
157     commit
158     super
159   end