1 # -*- encoding: binary -*-
4 # This class communicates with the MogileFS trackers.
5 # You should not have to use this directly unless you are developing
6 # support for new commands or plugins for MogileFS
7 class MogileFS::Backend
9 # Adds MogileFS commands +names+.  The generated instance method passes its
# first argument (or an empty Hash when that argument is missing or nil) to
# do_request with the idempotent flag set to false.
10 def self.add_command(*names)
12 define_method name do |*args|
13 do_request(name, args[0] || {}, false)
18 # adds idempotent MogileFS commands +names+, these commands may be retried
19 # transparently on a different tracker if there is a network/server error.
# The generated instance method passes its first argument (or an empty Hash
# when that argument is missing or nil) to do_request with the idempotent
# flag set to true.
20 def self.add_idempotent_command(*names)
22 define_method name do |*args|
23 do_request(name, args[0] || {}, true)
# Registry filled by add_error: tracker error code => exception class.
28 BACKEND_ERRORS = {} # :nodoc:
# Converts an error code from a mogilefsd tracker into an exception class
# and registers that class in BACKEND_ERRORS under the original code.
#
# +err_snake+ is the snake_case error code; the class name is its CamelCase
# form with "Error" appended unless it already ends in "Error".
#
# Examples of some exceptions that get created:
#   class AfterMismatchError < MogileFS::Error; end
#   class DomainNotFoundError < MogileFS::Error; end
#   class InvalidCharsError < MogileFS::Error; end
#
# Returns the exception class.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel

  # const_defined?/const_get also search top-level constants, so a code such
  # as "name" or "argument" would resolve to ::NameError / ::ArgumentError.
  # Only reuse a constant that really is a MogileFS::Error; in every other
  # case define our own subclass under this namespace.
  klass = const_defined?(err_camel) ? const_get(err_camel) : nil
  unless klass.is_a?(Class) && klass <= MogileFS::Error
    klass = const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = klass
end
# Hook for references to undefined constants: a name ending in "Error" is
# defined on the spot as a new MogileFS::Error subclass.  The handling of
# other names is not visible in this excerpt.
45 def self.const_missing(name) # :nodoc:
46 if /Error\z/ =~ name.to_s
47 const_set(name, Class.new(MogileFS::Error))
59 # The string attached to the last error
# (assigned by parse_response when the tracker answers with an ERR line).
61 attr_reader :lasterrstr
64 # Creates a new MogileFS::Backend.
66 # :hosts is a required argument and must be an Array containing one or more
67 # 'hostname:port' pairs as Strings.
69 # :timeout adjusts the request timeout before an error is returned.
# It defaults to 3 when not given.
# :connect_timeout defaults to the value used for :timeout.
# :fail_timeout defaults to 5.
#
# Raises ArgumentError when @hosts is unset or empty, or when any entry
# fails the /:\d+$/ check below.
73 @fail_timeout = args[:fail_timeout] || 5
74 raise ArgumentError, "must specify at least one host" unless @hosts
75 raise ArgumentError, "must specify at least one host" if @hosts.empty?
# NOTE(review): `$` matches at the end of any line, so an entry with an
# embedded newline such as "host:80\njunk" passes; \z would be stricter.
76 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
77 raise ArgumentError, ":hosts must be in 'host:port' form"
81 @timeout = args[:timeout] || 3
82 @connect_timeout = args[:connect_timeout] || @timeout
92 # Closes this backend's socket.
# Runs shutdown_unlocked while holding @mutex.
95 @mutex.synchronize { shutdown_unlocked }
# Command declarations.  add_command defines a method that calls do_request
# with idempotent=false; add_idempotent_command passes idempotent=true.
98 # MogileFS::MogileFS commands
100 add_command :create_open
101 add_command :create_close
102 add_idempotent_command :get_paths
103 add_idempotent_command :noop
105 add_idempotent_command :sleep
107 add_idempotent_command :list_keys
108 add_idempotent_command :file_info
109 add_idempotent_command :file_debug
111 # MogileFS::Backend commands
113 add_idempotent_command :get_hosts
114 add_idempotent_command :get_devices
115 add_idempotent_command :list_fids
116 add_idempotent_command :stats
117 add_idempotent_command :get_domains
118 add_command :create_device
119 add_command :create_domain
120 add_command :delete_domain
121 add_command :create_class
122 add_command :update_class
# NOTE(review): both :update_class and :updateclass are declared; confirm
# against the tracker protocol whether both spellings are still needed.
123 add_command :updateclass
124 add_command :delete_class
125 add_command :create_host
126 add_command :update_host
127 add_command :delete_host
128 add_command :set_state
129 add_command :set_weight
130 add_command :replicate_now
# Closes @socket; the caller is expected to hold @mutex (see shutdown).
# How +do_raise+ is used is not visible in this excerpt.
132 def shutdown_unlocked(do_raise = false) # :nodoc:
135 @socket.close rescue nil # ignore errors
# Writes +request+ to a tracker connection, allowing +timeout+ (defaults to
# @timeout) for the write.  do_request uses the return value as the IO to
# read the response from.
141 def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
145 io.timed_write(request, timeout)
147 rescue SystemCallError, MogileFS::RequestTruncatedError => err
# per-host failure counter, created lazily on the first failure
148 tries ||= Hash.new { |hash,host| hash[host] = 0 }
149 nr = tries[@active_host] += 1
# remember when and why the active host failed
151 @dead[@active_host] = [ MogileFS.now, err ]
# Reads one response line from +io+ (waiting at most +timeout+) and hands the
# parsed response to the callback of the oldest entry in @pending.
#
# Each @pending entry is a [request, callback] pair; the entry is removed
# from the queue before its callback runs.
#
# Raises MogileFS::PipelineError when no line could be read.
def pipeline_gets_unlocked(io, timeout) # :nodoc:
  line = io.timed_gets(timeout)
  unless line
    raise MogileFS::PipelineError,
          "EOF with #{@pending.size} requests in-flight"
  end
  oldest = @pending.shift
  oldest[1].call(parse_response(line, oldest[0]))
end
# Returns what is left of +timeout+ after subtracting the time elapsed since
# +t0+ (measured with MogileFS.now).  The result is never negative: once the
# budget is used up, 0 is returned.
def timeout_update(timeout, t0) # :nodoc:
  remaining = timeout - (MogileFS.now - t0)
  remaining < 0 ? 0 : remaining
end
171 # try to read any responses we have pending already before filling
172 # the pipeline with more requests.  This usually takes very little time,
173 # but trackers may return huge responses and we could be on a slow
175 def pipeline_drain_unlocked(io, timeout) # :nodoc:
# loop for as long as requests are still awaiting a response
177 while @pending.size > 0
179 r = IO.select(set, set, nil, timeout)
# recompute the remaining timeout after waiting in select
180 timeout = timeout_update(timeout, t0)
# consume one response and run the callback queued for it
184 pipeline_gets_unlocked(io, timeout)
185 timeout = timeout_update(timeout, t0)
193 # dispatch a request like do_request, but queue +block+ for execution
194 # upon receiving a response.  It is the user's responsibility to ensure
195 # &block is executed in the correct order.  Trackers with multiple
196 # queryworkers are not guaranteed to return responses in the same
197 # order they were requested.
198 def pipeline_dispatch(cmd, args, &block) # :nodoc:
199 request = make_request(cmd, args)
202 @mutex.synchronize do
# read responses that are already pending before writing more
204 timeout = pipeline_drain_unlocked(io, timeout)
206 # send the request out...
208 io.timed_write(request, timeout)
# only queue the callback once the request has been written
209 @pending << [ request, block ]
210 rescue SystemCallError, MogileFS::RequestTruncatedError => err
# record the failure time and cause for the active host, then shut down
211 @dead[@active_host] = [ MogileFS.now, err ]
212 shutdown_unlocked(@pending[0])
# Reads +count+ pipelined responses (all pending ones when +count+ is nil),
# running the queued callback for each.  Raises MogileFS::Error when fewer
# than +count+ requests are pending.
221 def pipeline_wait(count = nil) # :nodoc:
222 @mutex.synchronize do
224 count ||= @pending.size
225 @pending.size < count and
226 raise MogileFS::Error,
227 "pending=#{@pending.size} < expected=#{count} failed"
# each read is allowed up to @timeout
229 count.times { pipeline_gets_unlocked(io, @timeout) }
231 shutdown_unlocked(true)
236 # Performs the +cmd+ request with +args+.
# The :ruby_no_raise key is removed from +args+ (the caller's Hash is
# modified) and is not sent to the tracker.  When it was set, the request
# string is handed to parse_response instead of nil.
# Returns the result of parse_response for the response line.
237 def do_request(cmd, args, idempotent = false)
238 no_raise = args.delete(:ruby_no_raise)
239 request = make_request(cmd, args)
242 @mutex.synchronize do
244 io = dispatch_unlocked(request)
245 line = io.timed_gets(@timeout)
# a complete response ends with a newline
246 break if /\n\z/ =~ line
# a non-nil line without the trailing newline is a truncated response
248 line and raise MogileFS::InvalidResponseError,
249 "Invalid response from server: #{line.inspect}"
252 raise EOFError, "end of file reached after: #{request.inspect}"
253 # fall through to retry in loop
254 rescue SystemCallError,
255 MogileFS::InvalidResponseError # truncated response
256 # we got a successful timed_write, but not a timed_gets
259 shutdown_unlocked(false)
262 shutdown_unlocked(true)
263 rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
264 shutdown_unlocked(true)
266 # we DO NOT want the response we timed out waiting for, to crop up later
267 # on, on the same socket, interspersed with a subsequent request! we
268 # close the socket if there's any error.
269 shutdown_unlocked(true)
271 shutdown_unlocked if failed
272 end # @mutex.synchronize
273 parse_response(line, no_raise ? request : nil)
# Makes a new request string for +cmd+ and +args+.
#
# The result is "<cmd> <url-encoded args>" terminated by CRLF; +args+ is
# passed to url_encode unchanged.
def make_request(cmd, args)
  "#{cmd} #{url_encode(args)}\r\n"
end
281 # this converts an error code from a mogilefsd tracker to an exception
282 # Most of these exceptions should already be defined, but since the
283 # MogileFS server code is liable to change and we may not always be
284 # able to keep up with the changes, codes missing from BACKEND_ERRORS
# are registered on the fly through add_error.
286 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
289 # Turns the +line+ response from the server into a Hash of options, an
290 # error, or raises, as appropriate.
# Anything that is neither an OK nor an ERR line raises
# MogileFS::InvalidResponseError.
291 def parse_response(line, request = nil)
# success: "OK", an optional run of digits, then one whitespace-free payload
293 when /\AOK\s+\d*\s*(\S*)\r?\n\z/
# failure: $1 is the error code, $2 the remainder of the line
295 when /\AERR\s+(\w+)\s*([^\r\n]*)/
297 @lasterrstr = $2 ? url_unescape($2) : nil
# append the stripped request text to the error string
299 request = " request=#{request.strip}"
300 @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
# this path hands the exception object back instead of raising it
301 return error(@lasterr).new(@lasterrstr)
303 raise error(@lasterr).new(@lasterrstr)
305 raise MogileFS::InvalidResponseError,
306 "Invalid response from server: #{line.inspect}"
310 # this command is special since the cache is per-tracker, so we connect
311 # to all backends and not just one
# +types+ lists the cache types to clear; each becomes a "<type>=1" argument
# of the "clear_cache" request.
312 def clear_cache(types = %w(all))
314 types.each { |type| opts[type] = 1 }
# one connection attempt per configured host; a failed attempt yields nil
316 sockets = @hosts.map do |host|
317 MogileFS::Socket.start(*(host.split(':'.freeze))) rescue nil
323 request = make_request("clear_cache", opts)
324 while wpending[0] || rpending[0]
# give up (returning nil) when nothing becomes ready within @timeout
325 r = IO.select(rpending, wpending, nil, @timeout) or return
# best-effort read of each response; errors are ignored
328 r[0].each { |io| io.timed_gets(0) rescue nil }
331 io.timed_write(request, 0)
339 sockets.each { |io| io.close }
342 # Returns a socket connected to a MogileFS tracker.
# An existing @socket is reused while it is still open.  Otherwise the hosts
# are tried in random order.
# Raises MogileFS::UnreachableBackendError listing the error recorded for
# each dead host.
344 return @socket if @socket and not @socket.closed?
346 @hosts.shuffle.each do |host|
# skip hosts that failed less than @fail_timeout ago
347 next if dead = @dead[host] and dead[0] > (MogileFS.now - @fail_timeout)
350 addr, port = host.split(':'.freeze)
351 @socket = MogileFS::Socket.tcp(addr, port, @connect_timeout)
353 rescue SystemCallError, MogileFS::Timeout => err
# remember when and why this host failed
354 @dead[host] = [ MogileFS.now, err ]
361 errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
362 raise MogileFS::UnreachableBackendError,
363 "couldn't connect to any tracker: #{errors.join(', ')}"
366 # Turns a url params string into a Hash.
# Pairs are separated by "&"; each pair is split at its first "=" only, and
# both halves are passed through url_unescape.
367 def url_decode(str) # :nodoc:
369 str.split('&'.freeze).each do |pair|
370 k, v = pair.split('='.freeze, 2).map! { |x| url_unescape(x) }
377 # TODO: see if we can use existing URL-escape/unescaping routines
378 # in the Ruby standard library, Perl MogileFS seems to NIH these
382 # Turns a Hash (or Array of pairs) into a url params string.
# Keys and values are converted with to_s and escaped with url_escape.
383 def url_encode(params) # :nodoc:
385 "#{url_escape k.to_s}=#{url_escape v.to_s}"
389 # Escapes naughty URL characters.
# Every character other than word characters, ",", "-", ".", "/", "\", ":"
# and space is replaced by "%" plus two lowercase hex digits; spaces are
# then turned into "+".  Two variants exist because String#ord is not
# available on every Ruby this file targets.
390 if ''.respond_to?(:ord) # Ruby 1.9
391 def url_escape(str) # :nodoc:
# NOTE(review): String#ord returns a code point, so a multibyte character in
# a non-binary string would format to more than two hex digits -- confirm
# that callers only pass binary/ASCII strings.
392 str = str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x".freeze % $1.ord }
393 str.tr!(' '.freeze, '+'.freeze)
397 def url_escape(str) # :nodoc:
398 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
402 # Unescapes naughty URL characters.
# "+" becomes a space first; afterwards each "%XX" sequence (hex digits in
# either case) is replaced by the single byte with that value.
403 def url_unescape(str) # :nodoc:
# tr returns a copy, so the caller's string is left untouched by gsub!
404 str = str.tr('+'.freeze, ' '.freeze)
405 str.gsub!(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack('C'.freeze) }