rename "Put" namespace to "NewFile"
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blob8784f686d5414edd9a0df1d4f82bb2ee795cdd09
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'mogilefs/new_file'
6 ##
7 # HTTPFile wraps up the new file operations for storing files onto an HTTP
8 # storage node.
10 # You really don't want to create an HTTPFile by hand.  Instead you want to
11 # create a new file using MogileFS::MogileFS.new_file.
13 class MogileFS::HTTPFile < StringIO
14   include MogileFS::NewFile::Common
16   ##
17   # The big_io name in case we have file > 256M
19   attr_accessor :big_io
21   attr_accessor :streaming_io
23   ##
24   # Creates a new HTTPFile with MogileFS-specific data.  Use
25   # MogileFS::MogileFS#new_file instead of this method.
27   def initialize(dests, opts = nil)
28     super ""
29     @md5 = @streaming_io = @big_io = @active = nil
30     @dests = dests
31     @opts = Integer === opts ? { :content_length => opts } : opts
32   end
34   def request_put(sock, uri, file_size, input = nil)
35     host_with_port = "#{uri.host}:#{uri.port}"
36     clen = @opts[:content_length]
37     file_size ||= clen
39     content_md5 = @opts[:content_md5]
40     if String === content_md5
41       file_size or
42         raise ArgumentError,
43               ":content_length must be specified with :content_md5 String"
44       file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
45     elsif content_md5.respond_to?(:call) ||
46           :trailer == content_md5 ||
47           MD5_TRAILER_NODES[host_with_port]
48       file_size = nil
49       @md5 = Digest::MD5.new
50     end
52     if file_size
53       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
54                  "Content-Length: #{file_size}\r\n\r\n")
55       rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
56     else
57       trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
58       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
59                  "Host: #{host_with_port}\r\n#{trailers}" \
60                  "Transfer-Encoding: chunked\r\n\r\n")
61       tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
62       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
63       tmp.flush
64     end
66     if clen && clen != rv
67       raise MogileFS::SizeMismatchError,
68             ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
69     end
70     rv
71   end
73   def put_streaming_io(sock, uri) # unlikely to be used
74     file_size = @streaming_io.length
75     written = 0
76     request_put(sock, uri, file_size) do |wr|
77       @streaming_io.call(Proc.new do |data_to_write|
78         written += wr.write(data_to_write)
79       end)
80     end
81     file_size ? file_size : written
82   end
84   def rewind_or_raise!(uri, err)
85     @active.rewind if @active
86     rescue => e
87       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
88             "retrying is impossible as rewind on " \
89             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
90       raise NonRetryableError, msg, e.backtrace
91   end
93   ##
94   # Writes an HTTP PUT request to +sock+ to upload the file and
95   # returns file size if the socket finished writing
96   def upload(devid, uri) # :nodoc:
97     sock = MogileFS::Socket.tcp(uri.host, uri.port)
98     set_socket_options(sock)
99     file_size = length
101     if @streaming_io
102       file_size = put_streaming_io(sock, uri)
103     elsif @big_io
104       stat = file = size = nil
105       if @big_io.respond_to?(:stat)
106         stat = @big_io.stat
107       elsif String === @big_io || @big_io.respond_to?(:to_path)
108         file = File.open(@big_io)
109         stat = file.stat
110       elsif @big_io.respond_to?(:size)
111         size = @big_io.size
112       end
113       if stat && stat.file?
114         size ||= stat.size
115         file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
116       end
117       file_size = request_put(sock, uri, size, file || @big_io)
118     else
119       rewind
120       request_put(sock, uri, file_size, self)
121     end
123     read_response(sock) # raises on errors
124     file_size
125     rescue SystemCallError, RetryableError => err
126       rewind_or_raise!(uri, err)
127       raise
128     ensure
129       file.close if file && @big_io != file
130       sock.close if sock
131   end
133   def commit
134     errors = nil
135     @dests.each do |devid, path|
136       begin
137         uri = URI.parse(path)
138         bytes_uploaded = upload(devid, uri)
139         return create_close(devid, uri, bytes_uploaded)
140       rescue SystemCallError, RetryableError => e
141         errors ||= []
142         errors << "#{path} - #{e.message} (#{e.class})"
143       end
144     end
146     raise NoStorageNodesError,
147           "all paths failed with PUT: #{errors.join(', ')}", []
148   end
150   def close
151     commit
152     super
153   end