new_file: delay keepalive sockopts until read
[ruby-mogilefs-client.git] / lib / mogilefs / new_file / stream.rb
blob32bd339cda647b19422091128d6d4a1f44c13535
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
4 require 'mogilefs/new_file/writer'
6 class MogileFS::NewFile::Stream
7   attr_reader :to_io
8   attr_reader :md5
10   include MogileFS::NewFile::Writer
11   include MogileFS::NewFile::Common
13   def initialize(dests, opts)
14     @opts = opts
15     @md5 = nil
16     @bytes_uploaded = 0
17     dests.each do |devid, path|
18       begin
19         uri = URI.parse(path)
20         sock = MogileFS::Socket.tcp(uri.host, uri.port)
21         start_sock(sock, uri) # raise on errors
22         @to_io = sock
23         @uri = uri
24         @devid = devid
25         if ! @md5 && @opts[:content_length]
26           @writer = @to_io
27         else
28           @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
29         end
30         return
31       rescue SystemCallError => e
32         sock.close if sock && ! sock.closed?
33         errors ||= []
34         errors << "#{path} - #{e.message} (#{e.class})"
35       end
36     end
38     raise NoStorageNodesError,
39           "all paths failed with PUT: #{errors.join(', ')}", []
40   end
42   def write(buf)
43     buf = String buf
44     return 0 if 0 == buf.size
45     rv = @writer.write(buf)
46     @bytes_uploaded += rv
47     rv
48   end
50   def commit
51     @writer.flush
53     clen = @opts[:content_length]
54     if clen && @bytes_uploaded != clen
55       raise MogileFS::SizeMismatchError,
56            "did not upload expected content_length: #{clen} uploaded: " \
57            "#@bytes_uploaded"
58     end
59     read_response(@to_io) # raises on errors
60     create_close(@devid, @uri, @bytes_uploaded)
61     ensure
62       @to_io.close if @to_io && ! @to_io.closed?
63   end
65   def start_sock(sock, uri)
66     host_with_port = "#{uri.host}:#{uri.port}"
67     headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
68               "Host: #{host_with_port}\r\n" \
70     content_md5 = @opts[:content_md5]
71     if String === content_md5
72       headers << "Content-MD5: #{content_md5}\r\n"
73     elsif content_md5.respond_to?(:call) ||
74           :trailer == content_md5 ||
75           MD5_TRAILER_NODES[host_with_port]
76       @md5 = Digest::MD5.new
77       headers << "Trailer: Content-MD5\r\n"
78     end
80     if ! @md5 && clen = @opts[:content_length]
81       headers << "Content-Length: #{clen}\r\n"
82     else
83       headers << "Transfer-Encoding: chunked\r\n"
84     end
86     sock.write(headers << "\r\n")
87   end
89   alias syswrite write
90 end