1 # -*- encoding: binary -*-
4 # This class communicates with the MogileFS trackers.
5 # You should not have to use this directly unless you are developing
6 # support for new commands or plugins for MogileFS
7 class MogileFS::Backend
9 # Adds MogileFS commands +names+.
10 def self.add_command(*names)
12 define_method name do |*args|
13 do_request(name, args[0] || {}, false)
18 # adds idempotent MogileFS commands +names+, these commands may be retried
19 # transparently on a different tracker if there is a network/server error.
20 def self.add_idempotent_command(*names)
22 define_method name do |*args|
23 do_request(name, args[0] || {}, true)
28 BACKEND_ERRORS = {} # :nodoc:
30 # this converts an error code from a mogilefsd tracker to an exception:
32 # Examples of some exceptions that get created:
33 # class AfterMismatchError < MogileFS::Error; end
34 # class DomainNotFoundError < MogileFS::Error; end
35 # class InvalidCharsError < MogileFS::Error; end
36 def self.add_error(err_snake)
37 err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
38 err_camel << 'Error' unless /Error\z/ =~ err_camel
39 unless const_defined?(err_camel)
40 const_set(err_camel, Class.new(MogileFS::Error))
42 BACKEND_ERRORS[err_snake] = const_get(err_camel)
45 def self.const_missing(name) # :nodoc:
46 if /Error\z/ =~ name.to_s
47 const_set(name, Class.new(MogileFS::Error))
59 # The string attached to the last error
61 attr_reader :lasterrstr
64 # Creates a new MogileFS::Backend.
66 # :hosts is a required argument and must be an Array containing one or more
67 # 'hostname:port' pairs as Strings.
69 # :timeout adjusts the request timeout before an error is returned.
73 @fail_timeout = args[:fail_timeout] || 5
74 raise ArgumentError, "must specify at least one host" unless @hosts
75 raise ArgumentError, "must specify at least one host" if @hosts.empty?
76 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
77 raise ArgumentError, ":hosts must be in 'host:port' form"
81 @timeout = args[:timeout] || 3
91 # Closes this backend's socket.
94 @mutex.synchronize { shutdown_unlocked }
97 # MogileFS::MogileFS commands
99 add_command :create_open
100 add_command :create_close
101 add_idempotent_command :get_paths
102 add_idempotent_command :noop
104 add_idempotent_command :sleep
106 add_idempotent_command :list_keys
107 add_idempotent_command :file_info
108 add_idempotent_command :file_debug
110 # MogileFS::Backend commands
112 add_idempotent_command :get_hosts
113 add_idempotent_command :get_devices
114 add_idempotent_command :list_fids
115 add_idempotent_command :stats
116 add_idempotent_command :get_domains
117 add_command :create_device
118 add_command :create_domain
119 add_command :delete_domain
120 add_command :create_class
121 add_command :update_class
122 add_command :updateclass
123 add_command :delete_class
124 add_command :create_host
125 add_command :update_host
126 add_command :delete_host
127 add_command :set_state
128 add_command :set_weight
129 add_command :replicate_now
131 def shutdown_unlocked(do_raise = false) # :nodoc:
134 @socket.close rescue nil # ignore errors
140 def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
143 io.timed_write(request, timeout)
145 rescue SystemCallError, MogileFS::RequestTruncatedError => err
146 @dead[@active_host] = [ Time.now, err ]
152 def pipeline_gets_unlocked(io, timeout) # :nodoc:
153 line = io.timed_gets(timeout) or
154 raise MogileFS::PipelineError,
155 "EOF with #{@pending.size} requests in-flight"
156 ready = @pending.shift
157 ready[1].call(parse_response(line, ready[0]))
160 def timeout_update(timeout, t0) # :nodoc:
161 timeout -= (Time.now - t0)
162 timeout < 0 ? 0 : timeout
165 # try to read any responses we have pending already before filling
166 # the pipeline more requests. This usually takes very little time,
167 # but trackers may return huge responses and we could be on a slow
169 def pipeline_drain_unlocked(io, timeout) # :nodoc:
171 while @pending.size > 0
173 r = IO.select(set, set, nil, timeout)
174 timeout = timeout_update(timeout, t0)
178 pipeline_gets_unlocked(io, timeout)
179 timeout = timeout_update(timeout, t0)
187 # dispatch a request like do_request, but queue +block+ for execution
188 # upon receiving a response. It is the users' responsibility to ensure
189 # &block is executed in the correct order. Trackers with multiple
190 # queryworkers are not guaranteed to return responses in the same
191 # order they were requested.
192 def pipeline_dispatch(cmd, args, &block) # :nodoc:
193 request = make_request(cmd, args)
196 @mutex.synchronize do
198 timeout = pipeline_drain_unlocked(io, timeout)
200 # send the request out...
202 io.timed_write(request, timeout)
203 @pending << [ request, block ]
204 rescue SystemCallError, MogileFS::RequestTruncatedError => err
205 @dead[@active_host] = [ Time.now, err ]
206 shutdown_unlocked(@pending[0])
215 def pipeline_wait(count = nil) # :nodoc:
216 @mutex.synchronize do
218 count ||= @pending.size
219 @pending.size < count and
220 raise MogileFS::Error,
221 "pending=#{@pending.size} < expected=#{count} failed"
223 count.times { pipeline_gets_unlocked(io, @timeout) }
225 shutdown_unlocked(true)
230 # Performs the +cmd+ request with +args+.
231 def do_request(cmd, args, idempotent = false)
232 no_raise = args.delete(:ruby_no_raise)
233 request = make_request(cmd, args)
236 @mutex.synchronize do
238 io = dispatch_unlocked(request)
239 line = io.timed_gets(@timeout)
240 break if /\r?\n\z/ =~ line
242 line and raise MogileFS::InvalidResponseError,
243 "Invalid response from server: #{line.inspect}"
246 raise EOFError, "end of file reached after: #{request.inspect}"
247 # fall through to retry in loop
248 rescue SystemCallError,
249 MogileFS::InvalidResponseError # truncated response
250 # we got a successful timed_write, but not a timed_gets
253 shutdown_unlocked(false)
256 shutdown_unlocked(true)
257 rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
258 shutdown_unlocked(true)
260 # we DO NOT want the response we timed out waiting for, to crop up later
261 # on, on the same socket, intersperesed with a subsequent request! we
262 # close the socket if there's any error.
263 shutdown_unlocked(true)
265 shutdown_unlocked if failed
266 end # @mutex.synchronize
267 parse_response(line, no_raise ? request : nil)
270 # Makes a new request string for +cmd+ and +args+.
271 def make_request(cmd, args)
272 "#{cmd} #{url_encode args}\r\n"
275 # this converts an error code from a mogilefsd tracker to an exception
276 # Most of these exceptions should already be defined, but since the
277 # MogileFS server code is liable to change and we may not always be
278 # able to keep up with the changes
280 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
283 # Turns the +line+ response from the server into a Hash of options, an
284 # error, or raises, as appropriate.
285 def parse_response(line, request = nil)
287 when /\AOK\s+\d*\s*(\S*)\r?\n\z/
289 when /\AERR\s+(\w+)\s*([^\r\n]*)/
291 @lasterrstr = $2 ? url_unescape($2) : nil
293 request = " request=#{request.strip}"
294 @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
295 return error(@lasterr).new(@lasterrstr)
297 raise error(@lasterr).new(@lasterrstr)
299 raise MogileFS::InvalidResponseError,
300 "Invalid response from server: #{line.inspect}"
304 # this command is special since the cache is per-tracker, so we connect
305 # to all backends and not just one
306 def clear_cache(types = %w(all))
308 types.each { |type| opts[type] = 1 }
310 sockets = @hosts.map do |host|
311 MogileFS::Socket.start(*(host.split(/:/))) rescue nil
317 request = make_request("clear_cache", opts)
318 while wpending[0] || rpending[0]
319 r = IO.select(rpending, wpending, nil, @timeout) or return
322 r[0].each { |io| io.timed_gets(0) rescue nil }
325 io.timed_write(request, 0)
333 sockets.each { |io| io.close }
336 # Returns a socket connected to a MogileFS tracker.
338 return @socket if @socket and not @socket.closed?
340 @hosts.shuffle.each do |host|
341 next if dead = @dead[host] and dead[0] > (Time.now - @fail_timeout)
344 addr, port = host.split(/:/)
345 @socket = MogileFS::Socket.tcp(addr, port, @timeout)
347 rescue SystemCallError, MogileFS::Timeout => err
348 @dead[host] = [ Time.now, err ]
355 errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
356 raise MogileFS::UnreachableBackendError,
357 "couldn't connect to any tracker: #{errors.join(', ')}"
360 # Turns a url params string into a Hash.
361 def url_decode(str) # :nodoc:
363 str.split(/&/).each do |pair|
364 k, v = pair.split(/=/, 2).map! { |x| url_unescape(x) }
371 # TODO: see if we can use existing URL-escape/unescaping routines
372 # in the Ruby standard library, Perl MogileFS seems to NIH these
376 # Turns a Hash (or Array of pairs) into a url params string.
377 def url_encode(params) # :nodoc:
379 "#{url_escape k.to_s}=#{url_escape v.to_s}"
383 # Escapes naughty URL characters.
384 if ''.respond_to?(:ord) # Ruby 1.9
385 def url_escape(str) # :nodoc:
386 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
389 def url_escape(str) # :nodoc:
390 str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
394 # Unescapes naughty URL characters.
395 def url_unescape(str) # :nodoc:
396 str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }