2 require 'mogilefs/util'
6 # MogileFS::Backend communicates with the MogileFS trackers.
8 class MogileFS::Backend
11 # Adds MogileFS commands +names+.
13 def self.add_command(*names)
15 define_method name do |*args|
16 do_request name, args.first || {}
23 # this converts an error code from a mogilefsd tracker to an exception:
25 # Examples of some exceptions that get created:
26 # class AfterMismatchError < MogileFS::Error; end
27 # class DomainNotFoundError < MogileFS::Error; end
28 # class InvalidCharsError < MogileFS::Error; end
29 def self.add_error(err_snake)
30 err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase } << 'Error'
31 unless self.const_defined?(err_camel)
32 self.class_eval("class #{err_camel} < MogileFS::Error; end")
34 BACKEND_ERRORS[err_snake] = self.const_get(err_camel)
43 # The string attached to the last error
45 attr_reader :lasterrstr
48 # Creates a new MogileFS::Backend.
50 # :hosts is a required argument and must be an Array containing one or more
51 # 'hostname:port' pairs as Strings.
53 # :timeout adjusts the request timeout before an error is returned.
57 raise ArgumentError, "must specify at least one host" unless @hosts
58 raise ArgumentError, "must specify at least one host" if @hosts.empty?
59 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
60 raise ArgumentError, ":hosts must be in 'host:port' form"
64 @timeout = args[:timeout] || 3
73 # Closes this backend's socket.
77 @socket.close rescue nil # ignore errors
82 # MogileFS::MogileFS commands
84 add_command :create_open
85 add_command :create_close
86 add_command :get_paths
90 add_command :list_keys
92 # MogileFS::Backend commands
94 add_command :get_hosts
95 add_command :get_devices
96 add_command :list_fids
98 add_command :get_domains
99 add_command :create_domain
100 add_command :delete_domain
101 add_command :create_class
102 add_command :update_class
103 add_command :delete_class
104 add_command :create_host
105 add_command :update_host
106 add_command :delete_host
107 add_command :set_state
109 # Errors copied from MogileFS/Worker/Query.pm
111 add_error 'after_mismatch'
112 add_error 'bad_params'
113 add_error 'class_exists'
114 add_error 'class_has_files'
115 add_error 'class_not_found'
117 add_error 'domain_has_files'
118 add_error 'domain_exists'
119 add_error 'domain_not_empty'
120 add_error 'domain_not_found'
122 add_error 'host_exists'
123 add_error 'host_mismatch'
124 add_error 'host_not_empty'
125 add_error 'host_not_found'
126 add_error 'invalid_chars'
127 add_error 'invalid_checker_level'
128 add_error 'invalid_mindevcount'
129 add_error 'key_exists'
131 add_error 'no_devices'
132 add_error 'no_domain'
137 add_error 'none_match'
138 add_error 'plugin_aborted'
139 add_error 'state_too_high'
140 add_error 'unknown_command'
141 add_error 'unknown_host'
142 add_error 'unknown_key'
143 add_error 'unknown_state'
144 add_error 'unreg_domain'
146 private unless defined? $TESTING
149 # Returns a new Socket (TCP) connected to +port+ on +host+.
151 def connect_to(host, port)
152 Socket.mogilefs_new(host, port, @timeout)
156 # Performs the +cmd+ request with +args+.
158 def do_request(cmd, args)
159 @mutex.synchronize do
160 request = make_request cmd, args
163 bytes_sent = socket.send request, 0
164 rescue SystemCallError
166 raise MogileFS::UnreachableBackendError
169 unless bytes_sent == request.length then
170 raise MogileFS::RequestTruncatedError,
171 "request truncated (sent #{bytes_sent} expected #{request.length})"
176 return parse_response(socket.gets)
181 # Makes a new request string for +cmd+ and +args+.
183 def make_request(cmd, args)
184 return "#{cmd} #{url_encode args}\r\n"
187 # this converts an error code from a mogilefsd tracker to an exception
188 # Most of these exceptions should already be defined, but since the
189 # MogileFS server code is liable to change and we may not always be
190 # able to keep up with the changes
192 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
196 # Turns the +line+ response from the server into a Hash of options, an
197 # error, or raises, as appropriate.
199 def parse_response(line)
200 if line =~ /^ERR\s+(\w+)\s*(.*)/ then
202 @lasterrstr = $2 ? url_unescape($2) : nil
203 raise error(@lasterr)
207 return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)/
209 raise MogileFS::InvalidResponseError,
210 "Invalid response from server: #{line.inspect}"
214 # Raises if the socket does not become readable in +@timeout+ seconds.
221 found = select [socket], nil, nil, timeleft
222 return true if found && found[0]
223 timeleft -= (Time.now - t0)
226 peer = @socket ? "#{@socket.mogilefs_peername} " : nil
228 # we DO NOT want the response we timed out waiting for, to crop up later
229 # on, on the same socket, intersperesed with a subsequent request! so,
230 # we close the socket if it times out like this
232 raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
241 # Returns a socket connected to a MogileFS tracker.
244 return @socket if @socket and not @socket.closed?
248 @hosts.sort_by { rand(3) - 1 }.each do |host|
249 next if @dead.include? host and @dead[host] > now - 5
252 @socket = connect_to(*host.split(':'))
253 rescue SystemCallError, MogileFS::Timeout
261 raise MogileFS::UnreachableBackendError
265 # Turns a url params string into a Hash.
268 pairs = str.split('&').map do |pair|
269 pair.split('=', 2).map { |v| url_unescape v }
272 return Hash[*pairs.flatten]
276 # Turns a Hash (or Array of pairs) into a url params string.
278 def url_encode(params)
279 return params.map do |k,v|
280 "#{url_escape k.to_s}=#{url_escape v.to_s}"
285 # Escapes naughty URL characters.
288 return str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
292 # Unescapes naughty URL characters.
294 def url_unescape(str)
295 return str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ')