1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
4 require 'mogilefs/new_file_common'
5 require 'mogilefs/new_file_writer'
7 class MogileFS::Put::Stream
11 include MogileFS::NewFileWriter
12 include MogileFS::NewFileCommon
14 def initialize(dests, opts)
18 dests.each do |devid, path|
21 sock = MogileFS::Socket.tcp(uri.host, uri.port)
22 set_socket_options(sock)
23 start_sock(sock, uri) # raise on errors
27 if ! @md5 && @opts[:content_length]
30 @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
33 rescue SystemCallError => e
34 sock.close if sock && ! sock.closed?
36 errors << "#{path} - #{e.message} (#{e.class})"
40 raise NoStorageNodesError,
41 "all paths failed with PUT: #{errors.join(', ')}", []
46 return 0 if 0 == buf.size
47 rv = @writer.write(buf)
55 clen = @opts[:content_length]
56 if clen && @bytes_uploaded != clen
57 raise MogileFS::SizeMismatchError,
58 "did not upload expected content_length: #{clen} uploaded: " \
61 read_response(@to_io) # raises on errors
62 create_close(@devid, @uri, @bytes_uploaded)
64 @to_io.close if @to_io && ! @to_io.closed?
67 def start_sock(sock, uri)
68 host_with_port = "#{uri.host}:#{uri.port}"
69 headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
70 "Host: #{host_with_port}\r\n" \
72 content_md5 = @opts[:content_md5]
73 if String === content_md5
74 headers << "Content-MD5: #{content_md5}\r\n"
75 elsif content_md5.respond_to?(:call) ||
76 :trailer == content_md5 ||
77 MD5_TRAILER_NODES[host_with_port]
78 @md5 = Digest::MD5.new
79 headers << "Trailer: Content-MD5\r\n"
82 if ! @md5 && clen = @opts[:content_length]
83 headers << "Content-Length: #{clen}\r\n"
85 headers << "Transfer-Encoding: chunked\r\n"
88 sock.write(headers << "\r\n")