1 # -*- encoding: binary -*-
4 # This class communicates with the MogileFS trackers.
5 # You should not have to use this directly unless you are developing
6 # support for new commands or plugins for MogileFS
7 class MogileFS::Backend
# Adds MogileFS commands +names+.
#
# For every name given, an instance method of that name is defined.  The
# generated method takes an optional Hash of arguments (an empty Hash is
# used when none is passed) and forwards to do_request with the
# idempotent flag set to false.
def self.add_command(*names)
  names.each do |name|
    define_method(name) do |*args|
      do_request(name, args[0] || {}, false)
    end
  end
end
# adds idempotent MogileFS commands +names+, these commands may be retried
# transparently on a different tracker if there is a network/server error.
#
# For every name given, an instance method of that name is defined.  The
# generated method takes an optional Hash of arguments (an empty Hash is
# used when none is passed) and forwards to do_request with the
# idempotent flag set to true.
def self.add_idempotent_command(*names)
  names.each do |name|
    define_method(name) do |*args|
      do_request(name, args[0] || {}, true)
    end
  end
end
# Maps snake_case tracker error codes to their exception classes.
BACKEND_ERRORS = {} # :nodoc:

# this converts an error code from a mogilefsd tracker to an exception:
#
# Examples of some exceptions that get created:
#   class AfterMismatchError < MogileFS::Error; end
#   class DomainNotFoundError < MogileFS::Error; end
#   class InvalidCharsError < MogileFS::Error; end
#
# +err_snake+ is the snake_case error code (a String).  The class name is
# the CamelCase form of the code with "Error" appended unless it already
# ends in "Error".  The class is created only if a constant of that name
# is not already defined.  Returns the exception class, which is also
# stored in BACKEND_ERRORS under +err_snake+.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel
  unless const_defined?(err_camel)
    const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = const_get(err_camel)
end
51 # The string attached to the last error
# (parse_response assigns @lasterrstr while handling an ERR line from the
# tracker).
53 attr_reader :lasterrstr
56 # Creates a new MogileFS::Backend.
58 # :hosts is a required argument and must be an Array containing one or more
59 # 'hostname:port' pairs as Strings.
61 # :timeout adjusts the request timeout before an error is returned.
# It falls back to 3 when not given (presumably seconds -- TODO confirm).
#
# Raises ArgumentError when :hosts is missing, empty, or contains an entry
# that does not end in ":<digits>".
# NOTE(review): the method header and the assignment of @hosts are not
# visible in this excerpt.
65 raise ArgumentError, "must specify at least one host" unless @hosts
66 raise ArgumentError, "must specify at least one host" if @hosts.empty?
# comparing @hosts with its filtered copy detects any entry that fails the
# /:\d+$/ check
67 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
68 raise ArgumentError, ":hosts must be in 'host:port' form"
72 @timeout = args[:timeout] || 3
82 # Closes this backend's socket.
# The actual close is done by shutdown_unlocked while @mutex is held.
# NOTE(review): the method header is not visible in this excerpt.
85 @mutex.synchronize { shutdown_unlocked }
# Each call below defines an instance method named after the command.
# add_command methods call do_request with idempotent=false;
# add_idempotent_command methods call it with idempotent=true.
88 # MogileFS::MogileFS commands
90 add_command :create_open
91 add_command :create_close
92 add_idempotent_command :get_paths
94 add_idempotent_command :sleep
96 add_idempotent_command :list_keys
97 add_idempotent_command :file_info
98 add_idempotent_command :file_debug
100 # MogileFS::Backend commands
102 add_idempotent_command :get_hosts
103 add_idempotent_command :get_devices
104 add_idempotent_command :list_fids
105 add_idempotent_command :stats
106 add_idempotent_command :get_domains
107 add_command :create_domain
108 add_command :delete_domain
109 add_command :create_class
110 add_command :update_class
111 add_command :delete_class
112 add_command :create_host
113 add_command :update_host
114 add_command :delete_host
115 add_command :set_state
117 # Errors copied from MogileFS/Worker/Query.pm
# Each add_error call stores an exception class in BACKEND_ERRORS under the
# given snake_case code; the class is named after the CamelCase form of the
# code with an "Error" suffix (e.g. 'bad_params' => BadParamsError).
119 add_error 'after_mismatch'
120 add_error 'bad_params'
121 add_error 'class_exists'
122 add_error 'class_has_files'
123 add_error 'class_not_found'
125 add_error 'domain_has_files'
126 add_error 'domain_exists'
127 add_error 'domain_not_empty'
128 add_error 'domain_not_found'
130 add_error 'host_exists'
131 add_error 'host_mismatch'
132 add_error 'host_not_empty'
133 add_error 'host_not_found'
134 add_error 'invalid_chars'
135 add_error 'invalid_checker_level'
136 add_error 'invalid_mindevcount'
137 add_error 'key_exists'
139 add_error 'no_devices'
140 add_error 'no_domain'
145 add_error 'none_match'
146 add_error 'plugin_aborted'
147 add_error 'state_too_high'
148 add_error 'size_verify_error'
149 add_error 'unknown_command'
150 add_error 'unknown_host'
151 add_error 'unknown_key'
152 add_error 'unknown_state'
153 add_error 'unreg_domain'
# Closes @socket; any exception raised by the close itself is swallowed.
# The name suggests the caller is expected to hold @mutex already (the
# visible callers invoke it inside @mutex.synchronize).
# NOTE(review): how +do_raise+ is used is not visible in this excerpt.
155 def shutdown_unlocked(do_raise = false) # :nodoc:
158 @socket.close rescue nil # ignore errors
# Writes +request+ to a tracker connection, allowing +timeout+ for the write
# (defaults to @timeout).
# On SystemCallError or MogileFS::RequestTruncatedError the active host is
# recorded in @dead together with the current time and the error.
# NOTE(review): where +io+ comes from, the return value (do_request uses it
# as the IO to read from) and what follows the rescue are not visible here.
164 def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
167 io.timed_write(request, timeout)
169 rescue SystemCallError, MogileFS::RequestTruncatedError => err
170 @dead[@active_host] = [ Time.now, err ]
# Reads one response line from +io+ (waiting up to +timeout+) and hands the
# parsed result to the callback of the oldest pending request.
#
# @pending holds [request, callback] pairs in dispatch order; the first
# pair is removed and its callback is called with
# parse_response(line, request).
#
# Raises MogileFS::PipelineError when the read yields nothing (EOF) while
# requests are still in flight.
def pipeline_gets_unlocked(io, timeout) # :nodoc:
  line = io.timed_gets(timeout) or
    raise MogileFS::PipelineError,
          "EOF with #{@pending.size} requests in-flight"
  ready = @pending.shift
  ready[1].call(parse_response(line, ready[0]))
end
# Returns how much of +timeout+ is left after the time elapsed since +t0+
# (a Time), never going below 0.
def timeout_update(timeout, t0) # :nodoc:
  remaining = timeout - (Time.now - t0)
  remaining < 0 ? 0 : remaining
end
189 # try to read any responses we have pending already before filling
190 # the pipeline with more requests. This usually takes very little time,
191 # but trackers may return huge responses and we could be on a slow
# NOTE(review): the end of the sentence above is not visible in this excerpt.
#
# Loops while @pending is non-empty, waiting on IO.select and reading
# responses with pipeline_gets_unlocked.  After each wait and each read the
# remaining +timeout+ is recomputed with timeout_update relative to +t0+.
# pipeline_dispatch assigns this method's return value back to its timeout.
# NOTE(review): the setup of +set+ and +t0+, the handling of the select
# result and the loop exit are not visible here.
193 def pipeline_drain_unlocked(io, timeout) # :nodoc:
195 while @pending.size > 0
197 r = IO.select(set, set, nil, timeout)
198 timeout = timeout_update(timeout, t0)
202 pipeline_gets_unlocked(io, timeout)
203 timeout = timeout_update(timeout, t0)
211 # dispatch a request like do_request, but queue +block+ for execution
212 # upon receiving a response.
#
# The request string is built before @mutex is taken.  Under the lock,
# already-pending responses are drained first (which also yields the
# remaining timeout), then the request is written and [request, block] is
# appended to @pending.
# On SystemCallError or MogileFS::RequestTruncatedError the active host is
# recorded in @dead and the socket is shut down.
# NOTE(review): how +io+ and the initial +timeout+ are obtained is not
# visible in this excerpt.
213 def pipeline_dispatch(cmd, args, &block) # :nodoc:
214 request = make_request(cmd, args)
217 @mutex.synchronize do
219 timeout = pipeline_drain_unlocked(io, timeout)
221 # send the request out...
223 io.timed_write(request, timeout)
224 @pending << [ request, block ]
225 rescue SystemCallError, MogileFS::RequestTruncatedError => err
226 @dead[@active_host] = [ Time.now, err ]
# @pending[0] serves as the do_raise flag: it is truthy only while at least
# one request is still queued
227 shutdown_unlocked(@pending[0])
# Reads +count+ pipelined responses while holding @mutex; +count+ defaults
# to the number of requests currently pending.  Each response is read with
# pipeline_gets_unlocked using @timeout.
# Raises MogileFS::PipelineError when fewer than +count+ requests are
# pending.
# NOTE(review): the condition under which shutdown_unlocked(true) runs is
# not visible in this excerpt.
236 def pipeline_wait(count = nil) # :nodoc:
237 @mutex.synchronize do
239 count ||= @pending.size
240 @pending.size < count and
241 raise MogileFS::PipelineError,
242 "pending=#{@pending.size} < expected=#{count} failed"
244 count.times { pipeline_gets_unlocked(io, @timeout) }
246 shutdown_unlocked(true)
251 # Performs the +cmd+ request with +args+.
#
# The request string is built with make_request, then sent and answered
# while holding @mutex.  A response line, when one is read, is returned via
# parse_response; a missing line raises EOFError.
# NOTE(review): the retry loop and the use of +idempotent+ are not visible
# in this excerpt.
252 def do_request(cmd, args, idempotent = false)
253 request = make_request cmd, args
254 @mutex.synchronize do
256 io = dispatch_unlocked(request)
257 line = io.timed_gets(@timeout) and return parse_response(line)
260 raise EOFError, "end of file reached after: #{request.inspect}"
261 # fall through to retry in loop
262 rescue SystemCallError,
263 MogileFS::UnreadableSocketError,
264 MogileFS::InvalidResponseError, # truncated response
266 # we got a successful timed_write, but not a timed_gets
268 shutdown_unlocked(true)
270 # we DO NOT want the response we timed out waiting for, to crop up later
271 # on, on the same socket, interspersed with a subsequent request! we
272 # close the socket if there's any error.
273 shutdown_unlocked(true)
275 end # @mutex.synchronize
# Makes a new request string for +cmd+ and +args+.
#
# The result is the command name, a single space, the url-encoded +args+
# and a terminating CRLF.
def make_request(cmd, args)
  "#{cmd} #{url_encode args}\r\n"
end
283 # this converts an error code from a mogilefsd tracker to an exception
284 # Most of these exceptions should already be defined, but since the
285 # MogileFS server code is liable to change and we may not always be
286 # able to keep up with the changes, an unknown code gets its exception
# class created on demand through add_error.
# NOTE(review): the method header is not visible in this excerpt.
288 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
291 # Turns the +line+ response from the server into a Hash of options, an
292 # error, or raises, as appropriate.
#
# * "ERR <code> <text>" lines: the url-unescaped text is stored in
#   @lasterrstr and an exception of the class looked up via error() is
#   built.  One visible path appends " request=<request>" to the message
#   and returns the exception object; the other raises it.
#   NOTE(review): the assignment of @lasterr and the condition choosing
#   between the two paths are not visible here -- presumably whether
#   +request+ was given; confirm.
# * "OK ..." lines: the trailing parameter string is decoded with
#   url_decode and returned.
# * anything else raises MogileFS::InvalidResponseError.
293 def parse_response(line, request = nil)
294 if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
296 @lasterrstr = $2 ? url_unescape($2) : nil
298 request = " request=#{request.strip}"
299 @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
300 return error(@lasterr).new(@lasterrstr)
302 raise error(@lasterr).new(@lasterrstr)
305 return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
307 raise MogileFS::InvalidResponseError,
308 "Invalid response from server: #{line.inspect}"
311 # Returns a socket connected to a MogileFS tracker.
#
# An existing, still-open @socket is reused.  Otherwise the hosts are tried
# in random order; a host recorded in @dead is skipped while its recorded
# time is later than now - 5 (presumably seconds -- TODO confirm).
# A connection attempt failing with SystemCallError or MogileFS::Timeout
# records the host in @dead.
# Raises MogileFS::UnreachableBackendError, listing every recorded host
# error, when no tracker could be connected.
# NOTE(review): the method header, the assignment of +now+ and the success
# path inside the loop are not visible in this excerpt.
313 return @socket if @socket and not @socket.closed?
317 @hosts.shuffle.each do |host|
318 next if @dead.include?(host) and @dead[host][0] > now - 5
321 addr, port = host.split(/:/)
322 @socket = MogileFS::Socket.tcp(addr, port, @timeout)
324 rescue SystemCallError, MogileFS::Timeout => err
325 @dead[host] = [ now, err ]
332 errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
333 raise MogileFS::UnreachableBackendError,
334 "couldn't connect to any tracker: #{errors.join(', ')}"
337 # Turns a url params string into a Hash.
#
# +str+ is split on "&" into pairs; each pair is split on the first "="
# only (so values may themselves contain "="), and both parts are
# url-unescaped.
# NOTE(review): the closing part of the expression is not visible in this
# excerpt.
338 def url_decode(str) # :nodoc:
339 Hash[*(str.split(/&/).map! { |pair|
340 pair.split(/=/, 2).map! { |x| url_unescape(x) }
345 # TODO: see if we can use existing URL-escape/unescaping routines
346 # in the Ruby standard library, Perl MogileFS seems to NIH these
350 # Turns a Hash (or Array of pairs) into a url params string.
#
# Every key and value is converted with to_s and url-escaped, then written
# as "key=value".
# NOTE(review): the iteration over +params+ and the joining of the pairs
# are not visible in this excerpt.
351 def url_encode(params) # :nodoc:
353 "#{url_escape k.to_s}=#{url_escape v.to_s}"
# Escapes naughty URL characters.
#
# Word characters and , - . / \ : are kept as-is, spaces become "+", and
# everything else is written as lowercase "%xx" escapes.
#
# Escaping is done per byte, not per character: a multibyte character
# therefore yields one "%xx" per byte (e.g. "%e2%82%ac" for the Euro sign),
# which is what url_unescape reverses.  Formatting the character's code
# point instead would emit sequences such as "%20ac" that do not decode
# back to the original bytes.  String#unpack('C*') behaves the same on
# Ruby 1.8 and later, so a single definition covers both.
def url_escape(str) # :nodoc:
  str.gsub(/[^\w\,\-.\/\\\: ]/) { |char|
    char.unpack('C*').map { |byte| "%%%02x" % byte }.join
  }.tr(' ', '+')
end
# Unescapes naughty URL characters.
#
# "+" becomes a space and every "%xx" escape (hex digits in either case)
# is replaced by the single byte it denotes.  A literal "+" therefore has
# to arrive as "%2b".
def url_unescape(str) # :nodoc:
  str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }
end