1 # -*- encoding: binary -*-
5 # MogileFS::Backend communicates with the MogileFS trackers.
7 class MogileFS::Backend
10 # Adds MogileFS commands +names+.
# Defines one instance method per entry of +names+.  Each generated method
# accepts an optional params Hash (an empty Hash is used when it is omitted
# or nil) and forwards it to do_request with the idempotent flag set to false.
def self.add_command(*names)
  names.each do |cmd|
    define_method(cmd) do |*call_args|
      params = call_args.first || {}
      do_request(cmd, params, false)
    end
  end
end
# Defines one instance method per entry of +names+, exactly like add_command
# except that the generated methods pass true as do_request's +idempotent+
# argument.
def self.add_idempotent_command(*names)
  names.each do |cmd|
    define_method(cmd) do |*call_args|
      params = call_args.first || {}
      do_request(cmd, params, true)
    end
  end
end
30 # this converts an error code from a mogilefsd tracker to an exception:
32 # Examples of some exceptions that get created:
33 # class AfterMismatchError < MogileFS::Error; end
34 # class DomainNotFoundError < MogileFS::Error; end
35 # class InvalidCharsError < MogileFS::Error; end
# Registers the exception class for the tracker error code +err_snake+ and
# returns that class.
#
# The snake_case code is camel-cased and suffixed with "Error" (unless it
# already ends that way), e.g. 'domain_not_found' => DomainNotFoundError.
# The class is created as a MogileFS::Error subclass in this namespace when
# needed, and is recorded in BACKEND_ERRORS under +err_snake+.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel

  # const_defined?/const_get may also resolve constants inherited from
  # Object, so a code such as 'name' or 'argument' would otherwise be mapped
  # to Ruby's own NameError/ArgumentError.  Only reuse a constant that really
  # is a MogileFS::Error subclass.
  klass = const_get(err_camel) if const_defined?(err_camel)
  unless klass.is_a?(Class) && klass <= MogileFS::Error
    # Build the class directly instead of eval-ing generated source text.
    klass = const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = klass
end
51 # The string attached to the last error
# (assigned in parse_response from the text that follows the error code on an
# ERR response line).
53 attr_reader :lasterrstr
56 # Creates a new MogileFS::Backend.
58 # :hosts is a required argument and must be an Array containing one or more
59 # 'hostname:port' pairs as Strings.
61 # :timeout adjusts the request timeout before an error is returned.
# NOTE(review): the constructor's `def` line and the assignment of @hosts are
# not part of this excerpt -- presumably @hosts comes from args[:hosts]; confirm.
# Reject a missing or empty host list up front.
65 raise ArgumentError, "must specify at least one host" unless @hosts
66 raise ArgumentError, "must specify at least one host" if @hosts.empty?
# Every entry must match ":<digits>" at a line end; the select keeps only such
# entries, so any difference from @hosts means at least one is malformed.
# NOTE(review): `$` also matches before an embedded newline, so `\z` would be
# a stricter anchor for this validation.
67 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
68 raise ArgumentError, ":hosts must be in 'host:port' form"
# Timeout falls back to 3 when args[:timeout] is nil or false
# (presumably seconds -- confirm against the socket helpers).
72 @timeout = args[:timeout] || 3
81 # Closes this backend's socket.
# The close itself is delegated to shutdown_unlocked while holding @mutex.
# NOTE(review): the `def` line of this method is not part of this excerpt.
84 @mutex.synchronize { shutdown_unlocked }
# MogileFS::MogileFS commands
#
# Each name becomes an instance method that sends the same-named command
# through do_request; the idempotent registrations pass true as do_request's
# +idempotent+ argument.
add_command :create_open, :create_close
add_idempotent_command :get_paths, :sleep, :list_keys, :file_info, :file_debug
# MogileFS::Backend commands
#
# Read-style commands are registered as idempotent; commands that create,
# update or delete tracker state are registered as plain commands.
add_idempotent_command :get_hosts, :get_devices, :list_fids, :stats,
                       :get_domains
add_command :create_domain, :delete_domain,
            :create_class, :update_class, :delete_class,
            :create_host, :update_host, :delete_host,
            :set_state
# Errors copied from MogileFS/Worker/Query.pm
#
# Each code is passed to add_error, in the order listed.
%w[
  after_mismatch
  bad_params
  class_exists
  class_has_files
  class_not_found
  domain_has_files
  domain_exists
  domain_not_empty
  domain_not_found
  host_exists
  host_mismatch
  host_not_empty
  host_not_found
  invalid_chars
  invalid_checker_level
  invalid_mindevcount
  key_exists
  no_devices
  no_domain
  none_match
  plugin_aborted
  state_too_high
  size_verify_error
  unknown_command
  unknown_host
  unknown_key
  unknown_state
  unreg_domain
].each { |code| add_error code }
# Methods defined after this point are private, except when the global
# $TESTING has been defined (presumably so tests can call the helpers
# directly -- confirm against the test suite).
unless defined?($TESTING)
  private
end
# Closes @socket without taking @mutex itself; the visible call site wraps
# the call in @mutex.synchronize.
# NOTE(review): lines between the ones shown are missing from this excerpt, so
# any guard around the close or reset of @socket is not visible here.
156 def shutdown_unlocked # :nodoc:
# Best-effort close: the `rescue nil` modifier discards any StandardError.
158 @socket.close rescue nil # ignore errors
164 # Performs the +cmd+ request with +args+.
# The request string is built before taking @mutex; the write, the read of
# the reply line and its parsing all happen inside @mutex.synchronize.
# NOTE(review): several lines of this method (the begin/rescue structure, the
# assignment of `io`, and how +idempotent+ is used) are missing from this
# excerpt; the comments below cover only the visible lines.
166 def do_request(cmd, args, idempotent = false)
167 request = make_request cmd, args
168 @mutex.synchronize do
172 io.timed_write(request, @timeout)
173 rescue SystemCallError
# A failed write records the failure time for the active host in @dead.
174 @dead[@active_host] = Time.now
# Returns the parsed response as soon as a line is read; a nil/false result
# from timed_gets falls through to the code below.
180 line = io.timed_gets(@timeout) and return parse_response(line)
183 raise EOFError, "end of file reached after: #{request.inspect}"
185 # we DO NOT want the response we timed out waiting for, to crop up later
186 # on, on the same socket, interspersed with a subsequent request! we
187 # close the socket if there's any error.
191 end # @mutex.synchronize
195 # Makes a new request string for +cmd+ and +args+.
# Builds the request line for +cmd+: the command name, a single space, the
# url-encoded +args+, and a terminating CRLF.
def make_request(cmd, args)
  encoded_args = url_encode(args)
  "#{cmd} #{encoded_args}\r\n"
end
201 # Converts an error code from a mogilefsd tracker to an exception class.
202 # Most of these exceptions should already be defined, but since the
203 # MogileFS server code is liable to change and we may not always be
204 # able to keep up with the changes, unknown codes are registered on demand.
# NOTE(review): the `def` line of this method is missing from this excerpt.
206 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
210 # Turns the +line+ response from the server into a Hash of options, an
211 # error, or raises, as appropriate.
# NOTE(review): the line that assigns @lasterr (presumably from the first
# capture of the ERR pattern) is missing from this excerpt -- confirm.
213 def parse_response(line)
# "ERR <code> <message>": capture 1 is the code, capture 2 the rest of the line.
214 if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
216 @lasterrstr = $2 ? url_unescape($2) : nil
# Raise the exception class looked up for the code, with the message text.
217 raise error(@lasterr), @lasterrstr
# "OK [digits] <params>\r\n": the url-encoded params are decoded and returned.
220 return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
# Anything else is reported as an invalid response.
222 raise MogileFS::InvalidResponseError,
223 "Invalid response from server: #{line.inspect}"
227 # Returns a socket connected to a MogileFS tracker.
# NOTE(review): the `def` line, the assignment of `now`, and the bodies of the
# begin/rescue are missing from this excerpt.
# Reuse the current socket while it is still open.
230 return @socket if @socket and not @socket.closed?
# Try the hosts in random order, skipping any whose @dead timestamp is newer
# than `now - 5` (presumably 5 seconds -- `now` is not visible here).
234 @hosts.shuffle.each do |host|
235 next if @dead.include? host and @dead[host] > now - 5
238 addr, port = host.split(/:/)
239 @socket = MogileFS::Socket.tcp(addr, port, @timeout)
241 rescue SystemCallError, MogileFS::Timeout
# Presumably reached only when no host yielded a socket -- confirm.
249 raise MogileFS::UnreachableBackendError
253 # Turns a url params string into a Hash.
# Splits +str+ on '&' into pairs, splits each pair on its first '=' and
# unescapes both halves.
# NOTE(review): the `def` line and the closing of this expression are missing
# from this excerpt.
256 Hash[*(str.split(/&/).map! { |pair|
257 pair.split(/=/, 2).map! { |x| url_unescape(x) }
262 # Turns a Hash (or Array of pairs) into a url params string.
# NOTE(review): the iteration over +params+ and the joining of the pairs are
# missing from this excerpt; only the per-pair formatting is visible.
264 def url_encode(params)
# Each pair becomes "key=value" with both sides stringified and escaped.
266 "#{url_escape k.to_s}=#{url_escape v.to_s}"
271 # Escapes naughty URL characters.
# Every character other than a word character, ',', '-', '.', '/', '\', ':'
# or space is replaced by "%" plus its code formatted as lowercase hex
# (zero-padded to two digits); spaces are then turned into '+'.
# NOTE(review): the `def`/`else`/`end` lines are missing from this excerpt;
# the two bodies differ only in how the character code is obtained.
# NOTE(review): String#ord returns a code point, so a multibyte character
# would not be escaped byte by byte -- confirm inputs are binary/ASCII.
272 if ''.respond_to?(:ord) # Ruby 1.9
274 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
278 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
283 # Unescapes naughty URL characters.
# Reverses url_escape: '+' becomes a space and each %XX hex escape becomes
# the byte it encodes.
#
# The '+' translation must run before the %XX expansion; otherwise a literal
# plus sign that was escaped as "%2b" would be decoded to '+' and then
# wrongly turned into a space.
def url_unescape(str)
  str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack('C') }
end