admin: add clear_cache command
[ruby-mogilefs-client.git] / lib / mogilefs / backend.rb
blobb754bbcc4d3e3d342bc5add6e96bbd398aa9ecfc
1 # -*- encoding: binary -*-
2 require 'thread'
4 # This class communicates with the MogileFS trackers.
5 # You should not have to use this directly unless you are developing
6 # support for new commands or plugins for MogileFS
7 class MogileFS::Backend
9   # Adds MogileFS commands +names+.
10   def self.add_command(*names)
11     names.each do |name|
12       define_method name do |*args|
13         do_request(name, args[0] || {}, false)
14       end
15     end
16   end
18   # adds idempotent MogileFS commands +names+, these commands may be retried
19   # transparently on a different tracker if there is a network/server error.
20   def self.add_idempotent_command(*names)
21     names.each do |name|
22       define_method name do |*args|
23         do_request(name, args[0] || {}, true)
24       end
25     end
26   end
28   BACKEND_ERRORS = {} # :nodoc:
30   # this converts an error code from a mogilefsd tracker to an exception:
31   #
32   # Examples of some exceptions that get created:
33   #   class AfterMismatchError < MogileFS::Error; end
34   #   class DomainNotFoundError < MogileFS::Error; end
35   #   class InvalidCharsError < MogileFS::Error; end
36   def self.add_error(err_snake)
37     err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
38     err_camel << 'Error' unless /Error\z/ =~ err_camel
39     unless const_defined?(err_camel)
40       const_set(err_camel, Class.new(MogileFS::Error))
41     end
42     BACKEND_ERRORS[err_snake] = const_get(err_camel)
43   end
45   ##
46   # The last error
48   attr_reader :lasterr
50   ##
51   # The string attached to the last error
53   attr_reader :lasterrstr
55   ##
56   # Creates a new MogileFS::Backend.
57   #
58   # :hosts is a required argument and must be an Array containing one or more
59   # 'hostname:port' pairs as Strings.
60   #
61   # :timeout adjusts the request timeout before an error is returned.
63   def initialize(args)
64     @hosts = args[:hosts]
65     raise ArgumentError, "must specify at least one host" unless @hosts
66     raise ArgumentError, "must specify at least one host" if @hosts.empty?
67     unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
68       raise ArgumentError, ":hosts must be in 'host:port' form"
69     end
71     @mutex = Mutex.new
72     @timeout = args[:timeout] || 3
73     @socket = nil
74     @lasterr = nil
75     @lasterrstr = nil
76     @pending = []
78     @dead = {}
79   end
81   ##
82   # Closes this backend's socket.
84   def shutdown
85     @mutex.synchronize { shutdown_unlocked }
86   end
88   # MogileFS::MogileFS commands
90   add_command :create_open
91   add_command :create_close
92   add_idempotent_command :get_paths
93   add_command :delete
94   add_idempotent_command :sleep
95   add_command :rename
96   add_idempotent_command :list_keys
97   add_idempotent_command :file_info
98   add_idempotent_command :file_debug
100   # MogileFS::Backend commands
102   add_idempotent_command :get_hosts
103   add_idempotent_command :get_devices
104   add_idempotent_command :list_fids
105   add_idempotent_command :stats
106   add_idempotent_command :get_domains
107   add_command :create_domain
108   add_command :delete_domain
109   add_command :create_class
110   add_command :update_class
111   add_command :delete_class
112   add_command :create_host
113   add_command :update_host
114   add_command :delete_host
115   add_command :set_state
116   add_command :replicate_now
118   # Errors copied from MogileFS/Worker/Query.pm
119   add_error 'dup'
120   add_error 'after_mismatch'
121   add_error 'bad_params'
122   add_error 'class_exists'
123   add_error 'class_has_files'
124   add_error 'class_not_found'
125   add_error 'db'
126   add_error 'domain_has_files'
127   add_error 'domain_exists'
128   add_error 'domain_not_empty'
129   add_error 'domain_not_found'
130   add_error 'failure'
131   add_error 'host_exists'
132   add_error 'host_mismatch'
133   add_error 'host_not_empty'
134   add_error 'host_not_found'
135   add_error 'invalid_chars'
136   add_error 'invalid_checker_level'
137   add_error 'invalid_mindevcount'
138   add_error 'key_exists'
139   add_error 'no_class'
140   add_error 'no_devices'
141   add_error 'no_domain'
142   add_error 'no_host'
143   add_error 'no_ip'
144   add_error 'no_key'
145   add_error 'no_port'
146   add_error 'none_match'
147   add_error 'plugin_aborted'
148   add_error 'state_too_high'
149   add_error 'size_verify_error'
150   add_error 'unknown_command'
151   add_error 'unknown_host'
152   add_error 'unknown_key'
153   add_error 'unknown_state'
154   add_error 'unreg_domain'
156   def shutdown_unlocked(do_raise = false) # :nodoc:
157     @pending = []
158     if @socket
159       @socket.close rescue nil # ignore errors
160       @socket = nil
161     end
162     raise if do_raise
163   end
165   def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
166     begin
167       io = socket
168       io.timed_write(request, timeout)
169       io
170     rescue SystemCallError, MogileFS::RequestTruncatedError  => err
171       @dead[@active_host] = [ Time.now, err ]
172       shutdown_unlocked
173       retry
174     end
175   end
177   def pipeline_gets_unlocked(io, timeout) # :nodoc:
178     line = io.timed_gets(timeout) or
179       raise MogileFS::PipelineError,
180             "EOF with #{@pending.size} requests in-flight"
181     ready = @pending.shift
182     ready[1].call(parse_response(line, ready[0]))
183   end
185   def timeout_update(timeout, t0) # :nodoc:
186     timeout -= (Time.now - t0)
187     timeout < 0 ? 0 : timeout
188   end
190   # try to read any responses we have pending already before filling
191   # the pipeline more requests.  This usually takes very little time,
192   # but trackers may return huge responses and we could be on a slow
193   # network.
194   def pipeline_drain_unlocked(io, timeout) # :nodoc:
195     set = [ io ]
196     while @pending.size > 0
197       t0 = Time.now
198       r = IO.select(set, set, nil, timeout)
199       timeout = timeout_update(timeout, t0)
201       if r && r[0][0]
202         t0 = Time.now
203         pipeline_gets_unlocked(io, timeout)
204         timeout = timeout_update(timeout, t0)
205       else
206         return timeout
207       end
208     end
209     timeout
210   end
212   # dispatch a request like do_request, but queue +block+ for execution
213   # upon receiving a response.  It is the users' responsibility to ensure
214   # &block is executed in the correct order.  Trackers with multiple
215   # queryworkers are not guaranteed to return responses in the same
216   # order they were requested.
217   def pipeline_dispatch(cmd, args, &block) # :nodoc:
218     request = make_request(cmd, args)
219     timeout = @timeout
221     @mutex.synchronize do
222       io = socket
223       timeout = pipeline_drain_unlocked(io, timeout)
225       # send the request out...
226       begin
227         io.timed_write(request, timeout)
228         @pending << [ request, block ]
229       rescue SystemCallError, MogileFS::RequestTruncatedError => err
230         @dead[@active_host] = [ Time.now, err ]
231         shutdown_unlocked(@pending[0])
232         io = socket
233         retry
234       end
236       @pending.size
237     end
238   end
240   def pipeline_wait(count = nil) # :nodoc:
241     @mutex.synchronize do
242       io = socket
243       count ||= @pending.size
244       @pending.size < count and
245         raise MogileFS::Error,
246               "pending=#{@pending.size} < expected=#{count} failed"
247       begin
248         count.times { pipeline_gets_unlocked(io, @timeout) }
249       rescue
250         shutdown_unlocked(true)
251       end
252     end
253   end
255   # Performs the +cmd+ request with +args+.
256   def do_request(cmd, args, idempotent = false)
257     request = make_request cmd, args
258     @mutex.synchronize do
259       begin
260         io = dispatch_unlocked(request)
261         line = io.timed_gets(@timeout) and return parse_response(line)
263         idempotent or
264           raise EOFError, "end of file reached after: #{request.inspect}"
265         # fall through to retry in loop
266       rescue SystemCallError,
267              MogileFS::UnreadableSocketError,
268              MogileFS::InvalidResponseError, # truncated response
269              MogileFS::Timeout
270         # we got a successful timed_write, but not a timed_gets
271         retry if idempotent
272         shutdown_unlocked(true)
273       rescue
274         # we DO NOT want the response we timed out waiting for, to crop up later
275         # on, on the same socket, intersperesed with a subsequent request!  we
276         # close the socket if there's any error.
277         shutdown_unlocked(true)
278       end while idempotent
279     end # @mutex.synchronize
280   end
282   # Makes a new request string for +cmd+ and +args+.
283   def make_request(cmd, args)
284     "#{cmd} #{url_encode args}\r\n"
285   end
287   # this converts an error code from a mogilefsd tracker to an exception
288   # Most of these exceptions should already be defined, but since the
289   # MogileFS server code is liable to change and we may not always be
290   # able to keep up with the changes
291   def error(err_snake)
292     BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
293   end
295   # Turns the +line+ response from the server into a Hash of options, an
296   # error, or raises, as appropriate.
297   def parse_response(line, request = nil)
298     if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
299       @lasterr = $1
300       @lasterrstr = $2 ? url_unescape($2) : nil
301       if request
302         request = " request=#{request.strip}"
303         @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
304         return error(@lasterr).new(@lasterrstr)
305       end
306       raise error(@lasterr).new(@lasterrstr)
307     end
309     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
311     raise MogileFS::InvalidResponseError,
312           "Invalid response from server: #{line.inspect}"
313   end
315   # this command is special since the cache is per-tracker, so we connect
316   # to all backends and not just one
317   def clear_cache(types = %w(all))
318     opts = {}
319     types.each { |type| opts[type] = 1 }
321     sockets = @hosts.map do |host|
322       MogileFS::Socket.start(*(host.split(/:/))) rescue nil
323     end
324     sockets.compact!
326     wpending = sockets
327     rpending = []
328     request = make_request("clear_cache", opts)
329     while wpending[0] || rpending[0]
330       r = IO.select(rpending, wpending, nil, @timeout) or return
331       rpending -= r[0]
332       wpending -= r[1]
333       r[0].each { |io| io.timed_gets(0) rescue nil }
334       r[1].each do |io|
335         begin
336           io.timed_write(request, 0)
337           rpending << io
338         rescue
339         end
340       end
341     end
342     nil
343     ensure
344       sockets.each { |io| io.close }
345   end
347   # Returns a socket connected to a MogileFS tracker.
348   def socket
349     return @socket if @socket and not @socket.closed?
351     now = Time.now
353     @hosts.shuffle.each do |host|
354       next if @dead.include?(host) and @dead[host][0] > now - 5
356       begin
357         addr, port = host.split(/:/)
358         @socket = MogileFS::Socket.tcp(addr, port, @timeout)
359         @active_host = host
360       rescue SystemCallError, MogileFS::Timeout => err
361         @dead[host] = [ now, err ]
362         next
363       end
365       return @socket
366     end
368     errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
369     raise MogileFS::UnreachableBackendError,
370           "couldn't connect to any tracker: #{errors.join(', ')}"
371   end
373   # Turns a url params string into a Hash.
374   def url_decode(str) # :nodoc:
375     Hash[*(str.split(/&/).map! { |pair|
376       pair.split(/=/, 2).map! { |x| url_unescape(x) }
377     } ).flatten]
378   end
380   # :stopdoc:
381   # TODO: see if we can use existing URL-escape/unescaping routines
382   # in the Ruby standard library, Perl MogileFS seems to NIH these
383   #  routines, too
384   # :startdoc:
386   # Turns a Hash (or Array of pairs) into a url params string.
387   def url_encode(params) # :nodoc:
388     params.map do |k,v|
389       "#{url_escape k.to_s}=#{url_escape v.to_s}"
390     end.join("&")
391   end
393   # Escapes naughty URL characters.
394   if ''.respond_to?(:ord) # Ruby 1.9
395     def url_escape(str) # :nodoc:
396       str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
397     end
398   else # Ruby 1.8
399     def url_escape(str) # :nodoc:
400       str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
401     end
402   end
404   # Unescapes naughty URL characters.
405   def url_unescape(str) # :nodoc:
406     str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }
407   end