redo IO.copy_stream usage
[ruby-mogilefs-client.git] / lib / mogilefs / http_file.rb
blob261e2098def1df4ac833c2f690f79918d08378cd
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'stringio'
4 require 'uri'
5 require 'mogilefs/backend'
7 ##
8 # HTTPFile wraps up the new file operations for storing files onto an HTTP
9 # storage node.
11 # You really don't want to create an HTTPFile by hand.  Instead you want to
12 # create a new file using MogileFS::MogileFS.new_file.
14 class MogileFS::HTTPFile < StringIO
15   class EmptyResponseError < MogileFS::Error; end
16   class BadResponseError < MogileFS::Error; end
17   class UnparseableResponseError < MogileFS::Error; end
18   class NoStorageNodesError < MogileFS::Error
19     def message; 'Unable to open socket to storage node'; end
20   end
22   ##
23   # The URI this file will be stored to.
25   attr_reader :uri
27   attr_reader :devid
29   ##
30   # The big_io name in case we have file > 256M
32   attr_accessor :big_io
34   attr_accessor :streaming_io
36   ##
37   # Creates a new HTTPFile with MogileFS-specific data.  Use
38   # MogileFS::MogileFS#new_file instead of this method.
40   def initialize(dests, content_length)
41     super ""
42     @streaming_io = @big_io = @uri = @devid = nil
43     @dests = dests
44     @tried = {}
45   end
47   ##
48   # Writes an HTTP PUT request to +sock+ to upload the file and
49   # returns file size if the socket finished writing
50   def upload(devid, uri) # :nodoc:
51     file_size = length
52     sock = MogileFS::Socket.tcp(uri.host, uri.port)
54     if @streaming_io
55       file_size = @streaming_io.length
56       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
57                  "Content-Length: #{file_size}\r\n\r\n")
58       @streaming_io.call(Proc.new do |data_to_write|
59         sock.write(data_to_write)
60       end)
61     elsif @big_io
62       # Don't try to run out of memory
63       File.open(@big_io) do |fp|
64         file_size = fp.stat.size
65         sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
66                    "Content-Length: #{file_size}\r\n\r\n")
67         MogileFS::X.copy_stream(fp, sock)
68       end
69     else
70       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
71                  "Content-Length: #{length}\r\n\r\n#{string}")
72     end
74     case line = sock.timed_read(23, "")
75     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
76       file_size
77     when nil
78       raise EmptyResponseError, 'Unable to read response line from server'
79     when %r{^HTTP/\d\.\d\s+(\d+)}
80       raise BadResponseError, "HTTP response status from upload: #$1"
81     else
82       raise UnparseableResponseError, "Response line not understood: #{line}"
83     end
84     ensure
85       sock.close if sock && ! sock.closed?
86   end
88   def commit
89     errors = nil
90     @dests.each do |devid, path|
91       begin
92         uri = URI.parse(path)
93         bytes_uploaded = upload(devid, uri)
94         @devid, @uri = devid, uri
95         return bytes_uploaded
96       rescue => e
97         errors ||= []
98         errors << "#{path} failed with #{e.message} (#{e.class})"
99       end
100     end
102     raise NoStorageNodesError,
103           "all paths failed with PUT: #{errors.join(', ')}", []
104   end