tests: speedup tests that require hitting mogstored
[ruby-mogilefs-client.git] / lib / mogilefs / put / stream.rb
blob590a24f6f514291b8d018bfb8dcc157ef08e7291
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
4 require 'mogilefs/new_file_common'
5 require 'mogilefs/new_file_writer'
7 class MogileFS::Put::Stream
8   attr_reader :to_io
9   attr_reader :md5
11   include MogileFS::NewFileWriter
12   include MogileFS::NewFileCommon
14   def initialize(dests, opts)
15     @opts = opts
16     @md5 = nil
17     @bytes_uploaded = 0
18     dests.each do |devid, path|
19       begin
20         uri = URI.parse(path)
21         sock = MogileFS::Socket.tcp(uri.host, uri.port)
22         set_socket_options(sock)
23         start_sock(sock, uri) # raise on errors
24         @to_io = sock
25         @uri = uri
26         @devid = devid
27         if ! @md5 && @opts[:content_length]
28           @writer = @to_io
29         else
30           @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
31         end
32         return
33       rescue SystemCallError => e
34         sock.close if sock && ! sock.closed?
35         errors ||= []
36         errors << "#{path} - #{e.message} (#{e.class})"
37       end
38     end
40     raise NoStorageNodesError,
41           "all paths failed with PUT: #{errors.join(', ')}", []
42   end
44   def write(buf)
45     buf = String buf
46     return 0 if 0 == buf.size
47     rv = @writer.write(buf)
48     @bytes_uploaded += rv
49     rv
50   end
52   def commit
53     @writer.flush
55     clen = @opts[:content_length]
56     if clen && @bytes_uploaded != clen
57       raise MogileFS::SizeMismatchError,
58            "did not upload expected content_length: #{clen} uploaded: " \
59            "#@bytes_uploaded"
60     end
61     read_response(@to_io) # raises on errors
62     create_close(@devid, @uri, @bytes_uploaded)
63     ensure
64       @to_io.close if @to_io && ! @to_io.closed?
65   end
67   def start_sock(sock, uri)
68     host_with_port = "#{uri.host}:#{uri.port}"
69     headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
70               "Host: #{host_with_port}\r\n" \
72     content_md5 = @opts[:content_md5]
73     if String === content_md5
74       headers << "Content-MD5: #{content_md5}\r\n"
75     elsif content_md5.respond_to?(:call) ||
76           :trailer == content_md5 ||
77           MD5_TRAILER_NODES[host_with_port]
78       @md5 = Digest::MD5.new
79       headers << "Trailer: Content-MD5\r\n"
80     end
82     if ! @md5 && clen = @opts[:content_length]
83       headers << "Content-Length: #{clen}\r\n"
84     else
85       headers << "Transfer-Encoding: chunked\r\n"
86     end
88     sock.write(headers << "\r\n")
89   end
91   alias syswrite write
92 end