new_file: support create_open_args and create_close_args
[ruby-mogilefs-client.git] / lib / mogilefs / new_file / common.rb
blobb901f67973bd8e0c87364144f56138109b78f90d
1 # -*- encoding: binary -*-
2 # here are internal implementation details, do not use them in your code
3 require 'socket'
4 require 'uri'
5 require 'digest/md5'
6 require 'mogilefs/chunker'
8 module MogileFS::NewFile::Common
9   # :stopdoc:
10   class RetryableError < MogileFS::Error; end
11   class EmptyResponseError < RetryableError; end
12   class BadResponseError < RetryableError; end
13   class UnparseableResponseError < RetryableError; end
14   class NoStorageNodesError < MogileFS::Error
15     def message; 'Unable to open socket to storage node'; end
16   end
17   class NonRetryableError < MogileFS::Error; end
19   MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
21   def read_response(sock)
22     tout = @opts[:new_file_max_time] || 3600.0
23     start_time = @opts[:start_time] and tout -= Time.now - start_time
24     set_socket_options(sock)
25     case line = sock.timed_read(23, "", tout > 0.0 ? tout : 0)
26     when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
27     when nil
28       raise EmptyResponseError, 'Unable to read response line from server'
29     when %r{^HTTP/\d\.\d\s+(\d+)}
30       raise BadResponseError, "HTTP response status from upload: #$1"
31     else
32       raise UnparseableResponseError,
33             "Response line not understood: #{line.inspect}"
34     end
35   end
37   def create_close(devid, uri, bytes_uploaded)
38     close_args = @opts[:create_close_args]
40     dest_info = @opts[:info] ||= {}
41     dest_info["fid"] = @opts[:fid].to_i
42     dest_info["key"] = @opts[:key]
43     dest_info["domain"] = @opts[:domain]
44     dest_info[:devid] = devid
45     dest_info[:path] = uri.to_s
46     dest_info[:size] = bytes_uploaded
47     if @md5
48       dest_info["checksum"] = "MD5:#{@md5.hexdigest}"
49     elsif String === @opts[:content_md5]
50       hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
51       dest_info["checksum"] = "MD5:#{hex}"
52     end
54     dest_info[:checksumverify] = 1 if @opts[:checksumverify]
55     backend = @opts[:backend]
57     # upload could've taken a long time, ping and try to ensure socket
58     # is valid to minimize (but not completely eliminate) the chance
59     # create_close hits a stale socket (while reading the response after
60     # writing to it) and becomes non-retryable.  We treat create_close
61     # specially as its less idempotent than any other command
62     # (even other non-idempotent ones).  There may be no hope of retrying
63     # the upload at all if data was streamed and calling create_close
64     # twice will hurt us...
65     backend.noop
67     backend.create_close(close_args ? close_args.merge(dest_info) : dest_info)
69     # make this look like file_info + get_uris
70     dest_info.delete(:checksumverify)
71     dest_info.delete(:path)
72     dest_info[:uris] = [ uri ]
73     dest_info["devcount"] = 1
74     dest_info["devids"] = [ dest_info.delete(:devid).to_i ]
75     dest_info["length"] = dest_info.delete(:size)
77     bytes_uploaded
78   end
80   # aggressive keepalive settings on Linux + Ruby 1.9.2+
81   TCP_KEEPALIVE = {
82     :TCP_KEEPIDLE => 60, # seconds time before keepalive packet is sent
83     :TCP_KEEPINTVL => 5,
84     :TCP_KEEPCNT => 2,  # number of retries
85   }
87   req_consts = TCP_KEEPALIVE.keys
88   if (Socket.constants & req_consts).size == req_consts.size
89     def set_socket_options(sock)
90       sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
91       TCP_KEEPALIVE.each do |k,v|
92         sock.setsockopt(:IPPROTO_TCP, k, v)
93       end
94     end
95   else
96     def set_socket_options(sock)
97       sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
98     end
99   end
100   # :startdoc: