1 # -*- encoding: binary -*-
4 # This class communicates with the MogileFS trackers.
5 # You should not have to use this directly unless you are developing
6 # support for new commands or plugins for MogileFS
7 class MogileFS::Backend
# Adds MogileFS commands +names+.
#
# For every entry in +names+ an instance method of the same name is
# defined.  The generated method accepts an optional Hash of request
# arguments (an empty Hash is used when none is given) and forwards it to
# +do_request+ with the idempotent flag set to +false+.
def self.add_command(*names)
  names.each do |name|
    define_method(name) do |*args|
      do_request(name, args[0] || {}, false)
    end
  end
end
# adds idempotent MogileFS commands +names+, these commands may be retried
# transparently on a different tracker if there is a network/server error.
#
# For every entry in +names+ an instance method of the same name is
# defined.  The generated method accepts an optional Hash of request
# arguments (an empty Hash is used when none is given) and forwards it to
# +do_request+ with the idempotent flag set to +true+.
def self.add_idempotent_command(*names)
  names.each do |name|
    define_method(name) do |*args|
      do_request(name, args[0] || {}, true)
    end
  end
end
# Registry mapping tracker error codes (snake_case Strings) to the exception
# classes created for them; entries are added by add_error.
BACKEND_ERRORS = {} # :nodoc:
# this converts an error code from a mogilefsd tracker to an exception:
#
# Examples of some exceptions that get created:
#   class AfterMismatchError < MogileFS::Error; end
#   class DomainNotFoundError < MogileFS::Error; end
#   class InvalidCharsError < MogileFS::Error; end
#
# +err_snake+ is the snake_case error code.  It is camel-cased, given an
# "Error" suffix unless it already ends in one, and a MogileFS::Error
# subclass of that name is defined unless a constant of that name is already
# resolvable.  The class is recorded in BACKEND_ERRORS under +err_snake+ and
# returned.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel
  unless const_defined?(err_camel)
    const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = const_get(err_camel)
end
# Hook Ruby invokes when an undefined constant is referenced on this class.
# A +name+ ending in "Error" is defined on the fly as a new subclass of
# MogileFS::Error.
# NOTE(review): the handling of names that do not end in "Error" is not
# visible in this listing -- confirm against the full file.
45 def self.const_missing(name) # :nodoc:
46 if /Error\z/ =~ name.to_s
47 const_set(name, Class.new(MogileFS::Error))
59 # The string attached to the last error
# (parse_response assigns it from the text that follows the error code on
# an ERR line, or nil when there is none).
61 attr_reader :lasterrstr
64 # Creates a new MogileFS::Backend.
66 # :hosts is a required argument and must be an Array containing one or more
67 # 'hostname:port' pairs as Strings.
69 # :timeout adjusts the request timeout before an error is returned.
# It defaults to 3 when not given.
#
# :fail_timeout defaults to 5 when not given; the connection code in this
# class compares it against the time a host was recorded in @dead.
#
# Raises ArgumentError when @hosts is nil or empty, or when any entry fails
# to match /:\d+$/.
# NOTE(review): the method header and the assignment of @hosts are not
# visible in this listing.
73 @fail_timeout = args[:fail_timeout] || 5
74 raise ArgumentError, "must specify at least one host" unless @hosts
75 raise ArgumentError, "must specify at least one host" if @hosts.empty?
# every entry must survive the filter, i.e. all hosts carry a numeric port
76 unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
77 raise ArgumentError, ":hosts must be in 'host:port' form"
81 @timeout = args[:timeout] || 3
91 # Closes this backend's socket.
# The close runs while holding @mutex, like the other socket operations in
# this class.
# NOTE(review): the method header is not visible in this listing.
94 @mutex.synchronize { shutdown_unlocked }
# MogileFS::MogileFS commands
#
# Each call below defines an instance method named after the command.
# Commands registered through add_idempotent_command are dispatched with the
# idempotent flag set; those registered through add_command are not.

add_command :create_open
add_command :create_close
add_idempotent_command :get_paths
add_idempotent_command :noop
add_idempotent_command :sleep
add_idempotent_command :list_keys
add_idempotent_command :file_info
add_idempotent_command :file_debug

# MogileFS::Backend commands

add_idempotent_command :get_hosts
add_idempotent_command :get_devices
add_idempotent_command :list_fids
add_idempotent_command :stats
add_idempotent_command :get_domains
add_command :create_device
add_command :create_domain
add_command :delete_domain
add_command :create_class
add_command :update_class
add_command :updateclass
add_command :delete_class
add_command :create_host
add_command :update_host
add_command :delete_host
add_command :set_state
add_command :set_weight
add_command :replicate_now
# Closes @socket; the caller is expected to hold @mutex (hence "unlocked").
# An exception raised by the close itself is swallowed.
# NOTE(review): only part of this method is visible in this listing; what
# +do_raise+ controls cannot be determined from here.
131 def shutdown_unlocked(do_raise = false) # :nodoc:
134 @socket.close rescue nil # ignore errors
# Writes +request+ to a tracker socket, bounded by +timeout+ (defaults to
# @timeout).  do_request uses the return value as the IO to read the
# response from.
# NOTE(review): how +io+ is obtained, the retry policy and the final return
# expression are not visible in this listing.
140 def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
144 io.timed_write(request, timeout)
146 rescue SystemCallError, MogileFS::RequestTruncatedError => err
# per-host failure counter, created lazily on the first failure
147 tries ||= Hash.new { |hash,host| hash[host] = 0 }
148 nr = tries[@active_host] += 1
# remember when and why the active host failed
150 @dead[@active_host] = [ Time.now, err ]
# Reads one response line from +io+ (waiting at most +timeout+) and hands
# the parsed response to the callback of the oldest pending request.
#
# @pending holds [request, callback] pairs; the first pair is removed and
# its callback is called with parse_response(line, request).  The
# callback's result is returned.
#
# Raises MogileFS::PipelineError when +io+ yields no line (EOF) while
# requests are still in flight.
def pipeline_gets_unlocked(io, timeout) # :nodoc:
  line = io.timed_gets(timeout) or
    raise MogileFS::PipelineError,
          "EOF with #{@pending.size} requests in-flight"
  ready = @pending.shift
  ready[1].call(parse_response(line, ready[0]))
end
# Returns what is left of +timeout+ after subtracting the wall-clock time
# elapsed since +t0+ (a Time), never going below zero.
def timeout_update(timeout, t0) # :nodoc:
  remaining = timeout - (Time.now - t0)
  remaining < 0 ? 0 : remaining
end
170 # try to read any responses we have pending already before filling
171 # the pipeline with more requests. This usually takes very little time,
172 # but trackers may return huge responses and we could be on a slow
# NOTE(review): the sentence above is cut off in this listing.
#
# Loops while @pending is non-empty: waits on +set+ with IO.select, reads
# responses through pipeline_gets_unlocked, and shrinks +timeout+ by the
# elapsed time after each step.  pipeline_dispatch treats the return value
# as the remaining timeout.
# NOTE(review): the setup of +set+ / +t0+ and the branch taken on the
# IO.select result are not visible in this listing.
174 def pipeline_drain_unlocked(io, timeout) # :nodoc:
176 while @pending.size > 0
178 r = IO.select(set, set, nil, timeout)
179 timeout = timeout_update(timeout, t0)
183 pipeline_gets_unlocked(io, timeout)
184 timeout = timeout_update(timeout, t0)
192 # dispatch a request like do_request, but queue +block+ for execution
193 # upon receiving a response. It is the users' responsibility to ensure
194 # &block is executed in the correct order. Trackers with multiple
195 # queryworkers are not guaranteed to return responses in the same
196 # order they were requested.
#
# Under @mutex: responses already pending are drained first, the request is
# written, and [request, block] is appended to @pending.  On
# SystemCallError or MogileFS::RequestTruncatedError the active host is
# recorded in @dead and the socket is shut down.
# NOTE(review): how +io+ and the initial +timeout+ are obtained is not
# visible in this listing.
197 def pipeline_dispatch(cmd, args, &block) # :nodoc:
198 request = make_request(cmd, args)
201 @mutex.synchronize do
203 timeout = pipeline_drain_unlocked(io, timeout)
205 # send the request out...
207 io.timed_write(request, timeout)
208 @pending << [ request, block ]
209 rescue SystemCallError, MogileFS::RequestTruncatedError => err
210 @dead[@active_host] = [ Time.now, err ]
# do_raise is truthy only when at least one request is still pending
211 shutdown_unlocked(@pending[0])
# Reads +count+ pipelined responses (all pending ones when +count+ is nil),
# running each queued callback through pipeline_gets_unlocked with @timeout.
# Raises MogileFS::Error when fewer than +count+ requests are pending.
# NOTE(review): the condition under which shutdown_unlocked(true) runs is
# not visible in this listing.
220 def pipeline_wait(count = nil) # :nodoc:
221 @mutex.synchronize do
223 count ||= @pending.size
224 @pending.size < count and
225 raise MogileFS::Error,
226 "pending=#{@pending.size} < expected=#{count} failed"
228 count.times { pipeline_gets_unlocked(io, @timeout) }
230 shutdown_unlocked(true)
235 # Performs the +cmd+ request with +args+.
#
# The :ruby_no_raise key is removed from +args+ before the request is
# encoded; this mutates the Hash passed in by the caller.  When it was set,
# the request text is handed to parse_response (which has a code path that
# returns the error object instead of raising it).
#
# Under @mutex the request is dispatched and one line is read with
# @timeout; a line ending in a newline ends the attempt, any other non-nil
# line raises MogileFS::InvalidResponseError.
# NOTE(review): the loop header, the condition guarding the EOFError, and
# how +idempotent+ selects between the shutdown_unlocked calls are not
# visible in this listing.
236 def do_request(cmd, args, idempotent = false)
237 no_raise = args.delete(:ruby_no_raise)
238 request = make_request(cmd, args)
241 @mutex.synchronize do
243 io = dispatch_unlocked(request)
244 line = io.timed_gets(@timeout)
# a complete response line ends with LF (optionally preceded by CR)
245 break if /\r?\n\z/ =~ line
247 line and raise MogileFS::InvalidResponseError,
248 "Invalid response from server: #{line.inspect}"
251 raise EOFError, "end of file reached after: #{request.inspect}"
252 # fall through to retry in loop
253 rescue SystemCallError,
254 MogileFS::InvalidResponseError # truncated response
255 # we got a successful timed_write, but not a timed_gets
258 shutdown_unlocked(false)
261 shutdown_unlocked(true)
262 rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
263 shutdown_unlocked(true)
265 # we DO NOT want the response we timed out waiting for, to crop up later
266 # on, on the same socket, interspersed with a subsequent request! we
267 # close the socket if there's any error.
268 shutdown_unlocked(true)
270 shutdown_unlocked if failed
271 end # @mutex.synchronize
272 parse_response(line, no_raise ? request : nil)
# Makes a new request string for +cmd+ and +args+.
#
# The result is the command name, a single space, the url-encoded +args+
# and a terminating CRLF.
def make_request(cmd, args)
  "#{cmd} #{url_encode args}\r\n"
end
280 # this converts an error code from a mogilefsd tracker to an exception
281 # Most of these exceptions should already be defined, but since the
282 # MogileFS server code is liable to change and we may not always be
283 # able to keep up with the changes, unknown codes get an exception class
# defined for them on demand via add_error.
# NOTE(review): the method header is not visible in this listing;
# parse_response calls it as error(@lasterr).
285 BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
288 # Turns the +line+ response from the server into a Hash of options, an
289 # error, or raises, as appropriate.
#
# Three shapes of +line+ are recognised:
# * "OK ..." -- the first capture is the url-encoded payload.
# * "ERR <code> <text>" -- <text> is unescaped into @lasterrstr; there is a
#   path that appends " request=<stripped request>" and returns the
#   exception instance, and a path that raises it.
# * anything else raises MogileFS::InvalidResponseError.
# NOTE(review): the `case` header, the OK branch body, the assignment of
# @lasterr and the condition choosing return vs. raise are not visible in
# this listing -- presumably the latter tests +request+; confirm.
290 def parse_response(line, request = nil)
292 when /\AOK\s+\d*\s*(\S*)\r?\n\z/
294 when /\AERR\s+(\w+)\s*([^\r\n]*)/
296 @lasterrstr = $2 ? url_unescape($2) : nil
298 request = " request=#{request.strip}"
# appends in place when an error string exists, else uses the request text
299 @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
300 return error(@lasterr).new(@lasterrstr)
302 raise error(@lasterr).new(@lasterrstr)
304 raise MogileFS::InvalidResponseError,
305 "Invalid response from server: #{line.inspect}"
309 # this command is special since the cache is per-tracker, so we connect
310 # to all backends and not just one
#
# +types+ lists the cache types to clear; each becomes a key with value 1
# in the request options.  One socket is started per entry in @hosts (a
# host whose socket cannot be started yields nil), the same "clear_cache"
# request is written to the sockets, responses are read and discarded, and
# the sockets are closed.  Returns early when IO.select times out after
# @timeout.
# NOTE(review): the initialisation of +opts+, +wpending+ and +rpending+ and
# the bookkeeping inside the loop are not visible in this listing.
311 def clear_cache(types = %w(all))
313 types.each { |type| opts[type] = 1 }
315 sockets = @hosts.map do |host|
316 MogileFS::Socket.start(*(host.split(/:/))) rescue nil
322 request = make_request("clear_cache", opts)
323 while wpending[0] || rpending[0]
324 r = IO.select(rpending, wpending, nil, @timeout) or return
# readable sockets: consume the reply without waiting, ignoring errors
327 r[0].each { |io| io.timed_gets(0) rescue nil }
330 io.timed_write(request, 0)
338 sockets.each { |io| io.close }
341 # Returns a socket connected to a MogileFS tracker.
#
# An existing, still-open @socket is reused.  Otherwise the hosts are tried
# in random order, skipping any host recorded in @dead less than
# @fail_timeout ago; a host that fails with SystemCallError or
# MogileFS::Timeout is recorded in @dead with the time and the error.
# Raises MogileFS::UnreachableBackendError, listing every recorded failure,
# when no tracker could be reached.
# NOTE(review): the method header and the success path after connecting
# are not visible in this listing.
343 return @socket if @socket and not @socket.closed?
345 @hosts.shuffle.each do |host|
346 next if dead = @dead[host] and dead[0] > (Time.now - @fail_timeout)
349 addr, port = host.split(/:/)
350 @socket = MogileFS::Socket.tcp(addr, port, @timeout)
352 rescue SystemCallError, MogileFS::Timeout => err
353 @dead[host] = [ Time.now, err ]
360 errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
361 raise MogileFS::UnreachableBackendError,
362 "couldn't connect to any tracker: #{errors.join(', ')}"
365 # Turns a url params string into a Hash.
#
# +str+ is split on "&"; each pair is split on the first "=" only, and both
# halves are passed through url_unescape.
# NOTE(review): the construction and return of the result Hash are not
# visible in this listing.
366 def url_decode(str) # :nodoc:
368 str.split(/&/).each do |pair|
# limit of 2 keeps any further "=" characters inside the value
369 k, v = pair.split(/=/, 2).map! { |x| url_unescape(x) }
376 # TODO: see if we can use existing URL-escape/unescaping routines
377 # in the Ruby standard library, Perl MogileFS seems to NIH these
381 # Turns a Hash (or Array of pairs) into a url params string.
#
# Keys and values are converted with to_s and escaped with url_escape, then
# written as "key=value".
# NOTE(review): the iteration over +params+ and the joining of the pairs
# are not visible in this listing.
382 def url_encode(params) # :nodoc:
384 "#{url_escape k.to_s}=#{url_escape v.to_s}"
# Escapes naughty URL characters.
#
# ASCII letters, digits, underscore, comma, hyphen, dot, slash, backslash
# and colon pass through unchanged; a space becomes "+".  Every other
# character is replaced by one lowercase "%xx" escape per byte, so a
# multibyte character yields one escape for each of its bytes and
# url_unescape restores the original byte sequence.
#
# Unpacking bytes (rather than asking for the character's ordinal) gives
# the same result for single-byte characters on every Ruby version, so no
# version switch is needed.
def url_escape(str) # :nodoc:
  escaped = str.gsub(/([^\w\,\-.\/\\\: ])/) do
    $1.unpack('C*').map { |byte| "%%%02x" % byte }.join
  end
  escaped.tr(' ', '+')
end
# Unescapes naughty URL characters.
#
# Every "+" becomes a space, then each "%xx" escape (hex digits in either
# case) is replaced by the single byte it encodes.
def url_unescape(str) # :nodoc:
  str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }
end