1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
4 require 'mogilefs/new_file/writer'
6 class MogileFS::NewFile::Stream
10 include MogileFS::NewFile::Writer
11 include MogileFS::NewFile::Common
13 def initialize(dests, opts)
17 dests.each do |devid, path|
20 sock = MogileFS::Socket.tcp(uri.host, uri.port)
21 start_sock(sock, uri) # raise on errors
25 if ! @md5 && @opts[:content_length]
28 @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
31 rescue SystemCallError => e
32 sock.close if sock && ! sock.closed?
34 errors << "#{path} - #{e.message} (#{e.class})"
38 raise NoStorageNodesError,
39 "all paths failed with PUT: #{errors.join(', ')}", []
44 return 0 if 0 == buf.size
45 rv = @writer.write(buf)
53 clen = @opts[:content_length]
54 if clen && @bytes_uploaded != clen
55 raise MogileFS::SizeMismatchError,
56 "did not upload expected content_length: #{clen} uploaded: " \
59 read_response(@to_io) # raises on errors
60 create_close(@devid, @uri, @bytes_uploaded)
62 @to_io.close if @to_io && ! @to_io.closed?
65 def start_sock(sock, uri)
66 host_with_port = "#{uri.host}:#{uri.port}"
67 headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
68 "Host: #{host_with_port}\r\n" \
70 content_md5 = @opts[:content_md5]
71 if String === content_md5
72 headers << "Content-MD5: #{content_md5}\r\n"
73 elsif content_md5.respond_to?(:call) ||
74 :trailer == content_md5 ||
75 MD5_TRAILER_NODES[host_with_port]
76 @md5 = Digest::MD5.new
77 headers << "Trailer: Content-MD5\r\n"
80 if ! @md5 && clen = @opts[:content_length]
81 headers << "Content-Length: #{clen}\r\n"
83 headers << "Transfer-Encoding: chunked\r\n"
86 sock.write(headers << "\r\n")