new_file/stream: correct declare errors array correctly
[ruby-mogilefs-client.git] / lib / mogilefs / new_file / stream.rb
bloba71be74b42abce31d1894bbf8d2fcab579b15b70
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
4 require 'mogilefs/new_file/writer'
# Streams an upload to a storage destination over a raw socket using an
# HTTP/1.1 PUT request.  The first destination that accepts a connection
# and the request headers is used; the body is pushed with #write and the
# upload is finalized with #commit.
class MogileFS::NewFile::Stream
  # The socket connected to the destination chosen in #initialize.
  attr_reader :to_io

  # A Digest::MD5 instance when a Content-MD5 trailer was announced to the
  # destination, otherwise nil.
  attr_reader :md5

  include MogileFS::NewFile::Writer
  include MogileFS::NewFile::Common

  # Connects to the first usable destination and sends the PUT headers.
  #
  # +dests+ is iterated as [devid, path] pairs, where +path+ is parsed as a
  # URI.  +opts+ is read for :content_length and :content_md5.
  #
  # Raises NoStorageNodesError if every destination fails with a
  # SystemCallError; the per-destination failures are listed in the message.
  def initialize(dests, opts)
    @opts = opts
    @md5 = nil
    @bytes_uploaded = 0

    # Must live outside the block: a variable first assigned inside the
    # block would be block-local and invisible to the raise below.
    errors = []

    dests.each do |devid, path|
      sock = nil
      # start_sock may have set @md5 for a destination that failed
      # afterwards; do not let that decision leak into the next attempt.
      @md5 = nil
      begin
        uri = URI.parse(path)
        sock = MogileFS::Socket.tcp(uri.host, uri.port)
        start_sock(sock, uri) # raise on errors
        @to_io = sock
        @uri = uri
        @devid = devid
        # Write straight to the socket only when a Content-Length header
        # was sent (see start_sock); otherwise go through the Chunker.
        if !@md5 && @opts[:content_length]
          @writer = @to_io
        else
          @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
        end
        return
      rescue SystemCallError => e
        sock.close if sock && !sock.closed?
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end

    # the trailing [] is the backtrace argument of Kernel#raise
    raise NoStorageNodesError,
          "all paths failed with PUT: #{errors.join(', ')}", []
  end

  # Converts +buf+ with Kernel#String and hands it to the underlying writer.
  # Returns the value reported by the writer (0 for empty input, in which
  # case the writer is not called) and adds it to the running upload total.
  def write(buf)
    buf = String(buf)
    return 0 if buf.empty?

    rv = @writer.write(buf)
    @bytes_uploaded += rv
    rv
  end

  # Flushes the writer, verifies the uploaded size against
  # opts[:content_length] (when given), reads the response on the socket
  # and then calls create_close.  Returns the result of create_close.
  #
  # Raises MogileFS::SizeMismatchError if the uploaded byte count differs
  # from the declared content length.  The socket is closed in all cases.
  def commit
    @writer.flush

    clen = @opts[:content_length]
    if clen && @bytes_uploaded != clen
      raise MogileFS::SizeMismatchError,
            "did not upload expected content_length: #{clen} uploaded: " \
            "#@bytes_uploaded"
    end
    read_response(@to_io) # raises on errors
    create_close(@devid, @uri, @bytes_uploaded)
  ensure
    @to_io.close if @to_io && !@to_io.closed?
  end

  # Writes the PUT request line and headers for +uri+ to +sock+.
  #
  # A String opts[:content_md5] is sent as a Content-MD5 header.  If it is
  # callable, is :trailer, or the host:port is listed in MD5_TRAILER_NODES,
  # a Content-MD5 trailer is announced and @md5 is set to a fresh digest.
  # Content-Length is sent only when no trailer is used and
  # opts[:content_length] is set; otherwise the body is declared chunked.
  def start_sock(sock, uri)
    host_with_port = "#{uri.host}:#{uri.port}"
    headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
              "Host: #{host_with_port}\r\n"

    content_md5 = @opts[:content_md5]
    if String === content_md5
      headers << "Content-MD5: #{content_md5}\r\n"
    elsif content_md5.respond_to?(:call) ||
          :trailer == content_md5 ||
          MD5_TRAILER_NODES[host_with_port]
      @md5 = Digest::MD5.new
      headers << "Trailer: Content-MD5\r\n"
    end

    clen = @opts[:content_length]
    if !@md5 && clen
      headers << "Content-Length: #{clen}\r\n"
    else
      headers << "Transfer-Encoding: chunked\r\n"
    end

    sock.write(headers << "\r\n")
  end

  alias syswrite write
end