http_file: use String#inspect on broken server response
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blobe2aa8dc73b804ff2e6c77db3f0b1f486ca60009f
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'uri'
5 require 'mogilefs/chunker'
7 ##
8 # HTTPFile wraps up the new file operations for storing files onto an HTTP
9 # storage node.
11 # You really don't want to create an HTTPFile by hand.  Instead you want to
12 # create a new file using MogileFS::MogileFS.new_file.
14 class MogileFS::HTTPFile < StringIO
15   class EmptyResponseError < MogileFS::Error; end
16   class BadResponseError < MogileFS::Error; end
17   class UnparseableResponseError < MogileFS::Error; end
18   class NoStorageNodesError < MogileFS::Error
19     def message; 'Unable to open socket to storage node'; end
20   end
21   class NonRetryableError < MogileFS::Error; end
23   ##
24   # The URI this file will be stored to.
26   attr_reader :uri
28   attr_reader :devid
30   ##
31   # The big_io name in case we have file > 256M
33   attr_accessor :big_io
35   attr_accessor :streaming_io
37   ##
38   # Creates a new HTTPFile with MogileFS-specific data.  Use
39   # MogileFS::MogileFS#new_file instead of this method.
41   def initialize(dests, content_length)
42     super ""
43     @streaming_io = @big_io = @uri = @devid = @active = nil
44     @dests = dests
45   end
47   def request_put(sock, uri, file_size, input = nil)
48     if file_size
49       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
50                  "Content-Length: #{file_size}\r\n\r\n")
51       input ? MogileFS::X.copy_stream(@active = input, sock) : yield(sock)
52     else
53       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
54                  "Host: #{uri.host}:#{uri.port}\r\n" \
55                  "Transfer-Encoding: chunked\r\n\r\n")
56       tmp = MogileFS::Chunker.new(sock)
57       rv = input ? MogileFS::X.copy_stream(@active = input, tmp) : yield(tmp)
58       tmp.flush
59       rv
60     end
61   end
63   def put_streaming_io(sock, uri) # unlikely to be used
64     file_size = @streaming_io.length
65     written = 0
66     request_put(sock, uri, file_size) do |wr|
67       @streaming_io.call(Proc.new do |data_to_write|
68         written += wr.write(data_to_write)
69       end)
70     end
71     file_size ? file_size : written
72   end
74   def rewind_or_raise!(uri, err)
75     @active.rewind if @active
76     rescue => e
77       msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
78             "retrying is impossible as rewind on " \
79             "#{@active.inspect} failed with: #{e.message} (#{e.class})"
80       raise NonRetryableError, msg, e.backtrace
81   end
83   ##
84   # Writes an HTTP PUT request to +sock+ to upload the file and
85   # returns file size if the socket finished writing
86   def upload(devid, uri) # :nodoc:
87     sock = MogileFS::Socket.tcp(uri.host, uri.port)
88     file_size = length
90     if @streaming_io
91       file_size = put_streaming_io(sock, uri)
92     elsif @big_io
93       if String === @big_io || @big_io.respond_to?(:to_path)
94         file = File.open(@big_io)
95         stat = file.stat
96         file_size = request_put(sock, uri, stat.file? ? stat.size : nil, file)
97       else
98         size = nil
99         if @big_io.respond_to?(:stat)
100           stat = @big_io.stat
101           size = stat.size if stat.file?
102         elsif @big_io.respond_to?(:size)
103           size = @big_io.size
104         end
105         file_size = request_put(sock, uri, size, @big_io)
106       end
107     else
108       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
109                  "Content-Length: #{file_size}\r\n\r\n#{string}")
110     end
112     case line = sock.timed_read(23, "")
113     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
114       file_size
115     when nil
116       raise EmptyResponseError, 'Unable to read response line from server'
117     when %r{^HTTP/\d\.\d\s+(\d+)}
118       raise BadResponseError, "HTTP response status from upload: #$1"
119     else
120       raise UnparseableResponseError,
121             "Response line not understood: #{line.inspect}"
122     end
123     rescue => err
124       rewind_or_raise!(uri, err)
125       raise
126     ensure
127       file.close if file
128       sock.close if sock
129   end
131   def commit
132     errors = nil
133     @dests.each do |devid, path|
134       begin
135         uri = URI.parse(path)
136         bytes_uploaded = upload(devid, uri)
137         @devid, @uri = devid, uri
138         return bytes_uploaded
139       rescue NonRetryableError
140         raise
141       rescue => e
142         errors ||= []
143         errors << "#{path} - #{e.message} (#{e.class})"
144       end
145     end
147     raise NoStorageNodesError,
148           "all paths failed with PUT: #{errors.join(', ')}", []
149   end