1 # -*- encoding: binary -*-
5 # MogileFS::Backend communicates with the MogileFS trackers.
7 class MogileFS::Backend
10 # Adds MogileFS commands +names+.
12 def self.add_command(*names)
# Each generated instance method accepts an optional params Hash (args[0],
# defaulting to {}) and forwards it to do_request with idempotent = false.
# NOTE(review): +name+ is presumably bound by iterating over +names+; that
# loop is not visible in this listing.
14 define_method name do |*args|
15 do_request(name, args[0] || {}, false)
20 def self.add_idempotent_command(*names)
# Same shape as add_command, except that the generated methods pass
# idempotent = true to do_request.
# NOTE(review): +name+ is presumably bound by iterating over +names+; that
# loop is not visible in this listing.
22 define_method name do |*args|
23 do_request(name, args[0] || {}, true)
30 # this converts an error code from a mogilefsd tracker to an exception:
32 # Examples of some exceptions that get created:
33 # class AfterMismatchError < MogileFS::Error; end
34 # class DomainNotFoundError < MogileFS::Error; end
35 # class InvalidCharsError < MogileFS::Error; end
# Registers the exception class for the tracker error code +err_snake+ and
# returns that class.
#
# The snake_case code is camel-cased and suffixed with "Error" unless it
# already ends that way ('domain_not_found' => DomainNotFoundError). A new
# MogileFS::Error subclass is created only when no constant of that name is
# already visible from this class; in both cases the class is stored in
# BACKEND_ERRORS under +err_snake+.
#
# Raises NameError when the code does not camel-case into a valid constant
# name.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel
  unless const_defined?(err_camel)
    # Build the class object directly instead of eval'ing a source string
    # that interpolates text which can originate from a tracker response.
    const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = const_get(err_camel)
end
51 # The string attached to the last error
53 attr_reader :lasterrstr
56 # Creates a new MogileFS::Backend.
58 # :hosts is a required argument and must be an Array containing one or more
59 # 'hostname:port' pairs as Strings.
61 # :timeout adjusts the request timeout before an error is returned.
# A missing (nil) or empty host list is rejected up front.
65 raise ArgumentError, "must specify at least one host" unless @hosts
66 raise ArgumentError, "must specify at least one host" if @hosts.empty?
# select keeps only the entries ending in ":<digits>", so a single entry
# without a port makes the comparison fail.
67 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
68 raise ArgumentError, ":hosts must be in 'host:port' form"
# The request timeout defaults to 3 when :timeout is not given
# (presumably seconds -- the unit is not shown here).
72 @timeout = args[:timeout] || 3
81 # Closes this backend's socket.
# Take @mutex first so the close cannot interleave with a request that is
# running inside do_request's synchronize block.
84 @mutex.synchronize { shutdown_unlocked }
87 # MogileFS::MogileFS commands
# Methods declared with add_command call do_request with idempotent = false;
# those declared with add_idempotent_command call it with idempotent = true.
89 add_command :create_open
90 add_command :create_close
91 add_idempotent_command :get_paths
93 add_idempotent_command :sleep
95 add_idempotent_command :list_keys
96 add_idempotent_command :file_info
97 add_idempotent_command :file_debug
99 # MogileFS::Backend commands
# Read-only queries are declared idempotent; the create/update/delete and
# set_state commands are declared with add_command (idempotent = false).
101 add_idempotent_command :get_hosts
102 add_idempotent_command :get_devices
103 add_idempotent_command :list_fids
104 add_idempotent_command :stats
105 add_idempotent_command :get_domains
106 add_command :create_domain
107 add_command :delete_domain
108 add_command :create_class
109 add_command :update_class
110 add_command :delete_class
111 add_command :create_host
112 add_command :update_host
113 add_command :delete_host
114 add_command :set_state
116 # Errors copied from MogileFS/Worker/Query.pm
# Each add_error call maps a tracker error code to an exception class named
# after it (e.g. 'after_mismatch' => AfterMismatchError) and records the
# mapping in BACKEND_ERRORS.
118 add_error 'after_mismatch'
119 add_error 'bad_params'
120 add_error 'class_exists'
121 add_error 'class_has_files'
122 add_error 'class_not_found'
124 add_error 'domain_has_files'
125 add_error 'domain_exists'
126 add_error 'domain_not_empty'
127 add_error 'domain_not_found'
129 add_error 'host_exists'
130 add_error 'host_mismatch'
131 add_error 'host_not_empty'
132 add_error 'host_not_found'
133 add_error 'invalid_chars'
134 add_error 'invalid_checker_level'
135 add_error 'invalid_mindevcount'
136 add_error 'key_exists'
138 add_error 'no_devices'
139 add_error 'no_domain'
144 add_error 'none_match'
145 add_error 'plugin_aborted'
146 add_error 'state_too_high'
147 add_error 'size_verify_error'
148 add_error 'unknown_command'
149 add_error 'unknown_host'
150 add_error 'unknown_key'
151 add_error 'unknown_state'
152 add_error 'unreg_domain'
# Methods defined after this point are private, unless the $TESTING global
# is defined (presumably so tests can call the helpers directly).
154 private unless defined? $TESTING
156 def shutdown_unlocked # :nodoc:
# Best-effort close: any exception raised by close is discarded.
# NOTE(review): the "_unlocked" suffix suggests the caller must already hold
# @mutex (shutdown does) -- confirm against the lines not visible here.
158 @socket.close rescue nil # ignore errors
164 # Performs the +cmd+ request with +args+.
166 def do_request(cmd, args, idempotent = false)
# +idempotent+ is supplied by the methods that add_command (false) and
# add_idempotent_command (true) generate.
# NOTE(review): how the flag influences retrying is on lines not visible in
# this listing.
# The request string is built before the lock is taken.
167 request = make_request cmd, args
# Only one request/response exchange runs at a time per backend.
168 @mutex.synchronize do
172 io.timed_write(request, @timeout)
173 rescue SystemCallError => err
# Remember which host failed, when, and why; the connection code skips
# hosts that were marked dead very recently.
174 @dead[@active_host] = [ Time.now, err ]
# A response line is parsed and returned right away; falling through means
# timed_gets gave back nil/false, which is reported as EOF below.
180 line = io.timed_gets(@timeout) and return parse_response(line)
183 raise EOFError, "end of file reached after: #{request.inspect}"
184 rescue SystemCallError,
185 MogileFS::UnreadableSocketError,
186 MogileFS::InvalidResponseError, # truncated response
188 # we got a successful timed_write, but not a timed_gets
193 # we DO NOT want the response we timed out waiting for, to crop up later
194 # on, on the same socket, interspersed with a subsequent request! we
195 # close the socket if there's any error.
199 end # @mutex.synchronize
203 # Makes a new request string for +cmd+ and +args+.
# Builds the wire form of a request: the command name, a single space, the
# url-encoded +args+, and a CRLF terminator.
def make_request(cmd, args)
  encoded_args = url_encode(args)
  "#{cmd} #{encoded_args}\r\n"
end
209 # This converts an error code from a mogilefsd tracker to an exception.
210 # Most of these exceptions should already be defined, but since the
211 # MogileFS server code is liable to change and we may not always be
212 # able to keep up with the changes, a code that is not yet in
# BACKEND_ERRORS is registered on the fly with add_error, whose return value
# (the exception class) is used instead.
214 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
218 # Turns the +line+ response from the server into a Hash of options, an
219 # error, or raises, as appropriate.
221 def parse_response(line)
# "ERR <code> <message>": $1 is the error code, $2 the url-escaped message.
222 if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
224 @lasterrstr = $2 ? url_unescape($2) : nil
# Raise the exception class mapped to @lasterr, with the unescaped message.
# NOTE(review): @lasterr is assigned on a line not visible in this listing.
225 raise error(@lasterr), @lasterrstr
# "OK [<digits>] <params>\r\n": the url-encoded params are decoded and
# returned.
228 return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
# Anything that is neither an ERR nor a well-formed OK line is rejected.
230 raise MogileFS::InvalidResponseError,
231 "Invalid response from server: #{line.inspect}"
235 # Returns a socket connected to a MogileFS tracker.
# Reuse the current connection while it is still open.
238 return @socket if @socket and not @socket.closed?
# Try the configured trackers in random order.
242 @hosts.shuffle.each do |host|
# Skip a host that was marked dead less than 5 time units ago
# (presumably seconds: +now+ is compared with the Time stored in @dead).
243 next if @dead.include?(host) and @dead[host][0] > now - 5
246 addr, port = host.split(/:/)
247 @socket = MogileFS::Socket.tcp(addr, port, @timeout)
# A failed or timed-out connect marks the host dead, recording when and why.
249 rescue SystemCallError, MogileFS::Timeout => err
250 @dead[host] = [ now, err ]
# Presumably reached only when no tracker accepted a connection: the message
# lists every recorded failure as "host - message (ExceptionClass)".
257 errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
258 raise MogileFS::UnreachableBackendError,
259 "couldn't connect to any tracker: #{errors.join(', ')}"
263 # Turns a url params string into a Hash.
# Split on "&" into pairs, split each pair on the first "=" only (limit 2),
# and unescape both halves before the result is handed to Hash[].
# NOTE(review): the closing part of this expression is not visible in this
# listing.
266 Hash[*(str.split(/&/).map! { |pair|
267 pair.split(/=/, 2).map! { |x| url_unescape(x) }
272 # Turns a Hash (or Array of pairs) into a url params string.
274 def url_encode(params)
# Each key and value is stringified with to_s, escaped with url_escape, and
# emitted as "key=value".
# NOTE(review): the enumeration over +params+ and the joining of the pairs
# are on lines not visible in this listing.
276 "#{url_escape k.to_s}=#{url_escape v.to_s}"
281 # Escapes naughty URL characters.
# One of two implementations is chosen at load time, depending on whether
# String#ord is available.
282 if ''.respond_to?(:ord) # Ruby 1.9
# Every character other than a word character, ",", "-", ".", "/", "\", ":"
# or space is replaced by "%" plus its two-digit lowercase hex code; spaces
# are then turned into "+".
284 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
# Same escaping for Rubies without String#ord, where $1[0] is expected to
# yield the character's numeric value.
288 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
293 # Unescapes naughty URL characters.
# Reverses url_escape: "+" becomes a space and each %XX pair (hex digits in
# either case) becomes the byte with that value.
#
# "+" is translated BEFORE the percent sequences are expanded, so an escaped
# plus sign ("%2b") decodes to a literal "+" instead of being turned into a
# space afterwards.
def url_unescape(str)
  str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }
end