1 # -*- encoding: binary -*-
2 # These are internal implementation details; do not use them in your code.
4 require 'mogilefs/new_file/writer'
6 class MogileFS::NewFile::Stream
10 include MogileFS::NewFile::Writer
11 include MogileFS::NewFile::Common
# Iterates over +dests+ and, for each destination, opens a TCP socket and
# sends the PUT request headers through start_sock.
#
# dests - enumerable yielding |devid, path| pairs
# opts  - options object; the visible lines read @opts[:content_length] and
#         @opts[:content_md5]
#
# Raises NoStorageNodesError with the collected per-path error messages.
#
# NOTE(review): this excerpt omits several lines of the method (the
# assignments of uri, errors, @to_io, @md5 and @opts, plus the
# begin/else/return/end structure). The comments below describe only the
# lines that are visible -- confirm against the full file.
13 def initialize(dests, opts)
18 dests.each do |devid, path|
21 sock = MogileFS::Socket.tcp(uri.host, uri.port)
22 start_sock(sock, uri) # raise on errors
# @md5 may have been set by start_sock. The branch bodies between this test
# and the Chunker assignment are not all visible; presumably the Chunker is
# the writer for the chunked/MD5 case -- TODO confirm.
26 if ! @md5 && @opts[:content_length]
29 @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
# On a system-call level failure: close the socket if one was opened and is
# still open, then record "<path> - <message> (<class>)" in errors.
32 rescue SystemCallError => e
33 sock.close if sock && ! sock.closed?
35 errors << "#{path} - #{e.message} (#{e.class})"
# Presumably reached only when no destination succeeded -- TODO confirm.
# The trailing [] is raise's third argument, i.e. an empty backtrace.
39 raise NoStorageNodesError,
40 "all paths failed with PUT: #{errors.join(', ')}", []
# NOTE(review): the enclosing `def` line and the remainder of this method
# are not visible in this excerpt; only the two lines below can be described.
# An empty buffer is a no-op that returns 0 without touching @writer.
45 return 0 if 0 == buf.size
# Delegate the write to @writer and keep its return value in rv.
46 rv = @writer.write(buf)
# NOTE(review): the enclosing `def` line, the end of the error-message
# string, and any `end`/`ensure` keywords of this method are not visible in
# this excerpt.
#
# When a :content_length option was given, it must equal @bytes_uploaded;
# otherwise MogileFS::SizeMismatchError is raised.
54 clen = @opts[:content_length]
55 if clen && @bytes_uploaded != clen
56 raise MogileFS::SizeMismatchError,
57 "did not upload expected content_length: #{clen} uploaded: " \
60 read_response(@to_io) # raises on errors
61 create_close(@devid, @uri, @bytes_uploaded)
# Close the socket unless it was never set or is already closed.
63 @to_io.close if @to_io && ! @to_io.closed?
# Builds the HTTP/1.1 PUT request headers for +uri+ and writes them to +sock+.
#
# sock - object that responds to write
# uri  - object providing host, port and request_uri
#
# Header selection, as far as the visible lines show:
# * a String @opts[:content_md5] is sent verbatim as a Content-MD5 header
# * a callable @opts[:content_md5], the symbol :trailer, or a truthy
#   MD5_TRAILER_NODES entry for "host:port" sets @md5 to a new Digest::MD5
#   and announces a Content-MD5 trailer
# * Content-Length is sent only when @md5 is unset and a :content_length
#   option is present
#
# NOTE(review): this excerpt omits lines (the end of the header string
# continuation and the else/end keywords). The Transfer-Encoding: chunked
# header is presumably the alternative to Content-Length -- TODO confirm.
66 def start_sock(sock, uri)
67 host_with_port = "#{uri.host}:#{uri.port}"
68 headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
69 "Host: #{host_with_port}\r\n" \
71 content_md5 = @opts[:content_md5]
72 if String === content_md5
73 headers << "Content-MD5: #{content_md5}\r\n"
74 elsif content_md5.respond_to?(:call) ||
75 :trailer == content_md5 ||
76 MD5_TRAILER_NODES[host_with_port]
77 @md5 = Digest::MD5.new
78 headers << "Trailer: Content-MD5\r\n"
# clen is assigned inside the condition so it can be interpolated below.
81 if ! @md5 && clen = @opts[:content_length]
82 headers << "Content-Length: #{clen}\r\n"
84 headers << "Transfer-Encoding: chunked\r\n"
# The extra "\r\n" is the blank line that terminates the header section.
87 sock.write(headers << "\r\n")