Ruby mogilefs-client 3.12.2
[ruby-mogilefs-client.git] / lib / mogilefs / backend.rb
blob6d5fe17d29875f47fa311147817860ab38c1ba7c
1 # -*- encoding: binary -*-
2 require 'thread'
4 # This class communicates with the MogileFS trackers.
5 # You should not have to use this directly unless you are developing
6 # support for new commands or plugins for MogileFS
7 class MogileFS::Backend
9   # Adds MogileFS commands +names+.
10   def self.add_command(*names)
11     names.each do |name|
12       define_method name do |*args|
13         do_request(name, args[0] || {}, false)
14       end
15     end
16   end
18   # adds idempotent MogileFS commands +names+, these commands may be retried
19   # transparently on a different tracker if there is a network/server error.
20   def self.add_idempotent_command(*names)
21     names.each do |name|
22       define_method name do |*args|
23         do_request(name, args[0] || {}, true)
24       end
25     end
26   end
28   BACKEND_ERRORS = {} # :nodoc:
30   # this converts an error code from a mogilefsd tracker to an exception:
31   #
32   # Examples of some exceptions that get created:
33   #   class AfterMismatchError < MogileFS::Error; end
34   #   class DomainNotFoundError < MogileFS::Error; end
35   #   class InvalidCharsError < MogileFS::Error; end
36   def self.add_error(err_snake)
37     err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
38     err_camel << 'Error' unless /Error\z/ =~ err_camel
39     unless const_defined?(err_camel)
40       const_set(err_camel, Class.new(MogileFS::Error))
41     end
42     BACKEND_ERRORS[err_snake] = const_get(err_camel)
43   end
45   def self.const_missing(name) # :nodoc:
46     if /Error\z/ =~ name.to_s
47       const_set(name, Class.new(MogileFS::Error))
48     else
49       super name
50     end
51   end
53   ##
54   # The last error
56   attr_reader :lasterr
58   ##
59   # The string attached to the last error
61   attr_reader :lasterrstr
63   ##
64   # Creates a new MogileFS::Backend.
65   #
66   # :hosts is a required argument and must be an Array containing one or more
67   # 'hostname:port' pairs as Strings.
68   #
69   # :timeout adjusts the request timeout before an error is returned.
71   def initialize(args)
72     @hosts = args[:hosts]
73     @fail_timeout = args[:fail_timeout] || 5
74     raise ArgumentError, "must specify at least one host" unless @hosts
75     raise ArgumentError, "must specify at least one host" if @hosts.empty?
76     unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
77       raise ArgumentError, ":hosts must be in 'host:port' form"
78     end
80     @mutex = Mutex.new
81     @timeout = args[:timeout] || 3
82     @connect_timeout = args[:connect_timeout] || @timeout
83     @socket = nil
84     @lasterr = nil
85     @lasterrstr = nil
86     @pending = []
88     @dead = {}
89   end
91   ##
92   # Closes this backend's socket.
94   def shutdown
95     @mutex.synchronize { shutdown_unlocked }
96   end
98   # MogileFS::MogileFS commands
100   add_command :create_open
101   add_command :create_close
102   add_idempotent_command :get_paths
103   add_idempotent_command :noop
104   add_command :delete
105   add_idempotent_command :sleep
106   add_command :rename
107   add_idempotent_command :list_keys
108   add_idempotent_command :file_info
109   add_idempotent_command :file_debug
111   # MogileFS::Backend commands
113   add_idempotent_command :get_hosts
114   add_idempotent_command :get_devices
115   add_idempotent_command :list_fids
116   add_idempotent_command :stats
117   add_idempotent_command :get_domains
118   add_command :create_device
119   add_command :create_domain
120   add_command :delete_domain
121   add_command :create_class
122   add_command :update_class
123   add_command :updateclass
124   add_command :delete_class
125   add_command :create_host
126   add_command :update_host
127   add_command :delete_host
128   add_command :set_state
129   add_command :set_weight
130   add_command :replicate_now
132   def shutdown_unlocked(do_raise = false) # :nodoc:
133     @pending = []
134     if @socket
135       @socket.close rescue nil # ignore errors
136       @socket = nil
137     end
138     raise if do_raise
139   end
141   def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
142     tries = nil
143     begin
144       io = socket
145       io.timed_write(request, timeout)
146       io
147     rescue SystemCallError, MogileFS::RequestTruncatedError => err
148       tries ||= Hash.new { |hash,host| hash[host] = 0 }
149       nr = tries[@active_host] += 1
150       if nr >= 2
151         @dead[@active_host] = [ MogileFS.now, err ]
152       end
153       shutdown_unlocked
154       retry
155     end
156   end
158   def pipeline_gets_unlocked(io, timeout) # :nodoc:
159     line = io.timed_gets(timeout) or
160       raise MogileFS::PipelineError,
161             "EOF with #{@pending.size} requests in-flight"
162     ready = @pending.shift
163     ready[1].call(parse_response(line, ready[0]))
164   end
166   def timeout_update(timeout, t0) # :nodoc:
167     timeout -= (MogileFS.now - t0)
168     timeout < 0 ? 0 : timeout
169   end
171   # try to read any responses we have pending already before filling
172   # the pipeline more requests.  This usually takes very little time,
173   # but trackers may return huge responses and we could be on a slow
174   # network.
175   def pipeline_drain_unlocked(io, timeout) # :nodoc:
176     set = [ io ]
177     while @pending.size > 0
178       t0 = MogileFS.now
179       r = IO.select(set, set, nil, timeout)
180       timeout = timeout_update(timeout, t0)
182       if r && r[0][0]
183         t0 = MogileFS.now
184         pipeline_gets_unlocked(io, timeout)
185         timeout = timeout_update(timeout, t0)
186       else
187         return timeout
188       end
189     end
190     timeout
191   end
193   # dispatch a request like do_request, but queue +block+ for execution
194   # upon receiving a response.  It is the users' responsibility to ensure
195   # &block is executed in the correct order.  Trackers with multiple
196   # queryworkers are not guaranteed to return responses in the same
197   # order they were requested.
198   def pipeline_dispatch(cmd, args, &block) # :nodoc:
199     request = make_request(cmd, args)
200     timeout = @timeout
202     @mutex.synchronize do
203       io = socket
204       timeout = pipeline_drain_unlocked(io, timeout)
206       # send the request out...
207       begin
208         io.timed_write(request, timeout)
209         @pending << [ request, block ]
210       rescue SystemCallError, MogileFS::RequestTruncatedError => err
211         @dead[@active_host] = [ MogileFS.now, err ]
212         shutdown_unlocked(@pending[0])
213         io = socket
214         retry
215       end
217       @pending.size
218     end
219   end
221   def pipeline_wait(count = nil) # :nodoc:
222     @mutex.synchronize do
223       io = socket
224       count ||= @pending.size
225       @pending.size < count and
226         raise MogileFS::Error,
227               "pending=#{@pending.size} < expected=#{count} failed"
228       begin
229         count.times { pipeline_gets_unlocked(io, @timeout) }
230       rescue
231         shutdown_unlocked(true)
232       end
233     end
234   end
236   # Performs the +cmd+ request with +args+.
237   def do_request(cmd, args, idempotent = false)
238     no_raise = args.delete(:ruby_no_raise)
239     request = make_request(cmd, args)
240     line = nil
241     failed = false
242     @mutex.synchronize do
243       begin
244         io = dispatch_unlocked(request)
245         line = io.timed_gets(@timeout)
246         break if /\n\z/ =~ line
248         line and raise MogileFS::InvalidResponseError,
249                        "Invalid response from server: #{line.inspect}"
251         idempotent or
252           raise EOFError, "end of file reached after: #{request.inspect}"
253         # fall through to retry in loop
254       rescue SystemCallError,
255              MogileFS::InvalidResponseError # truncated response
256         # we got a successful timed_write, but not a timed_gets
257         if idempotent
258           failed = true
259           shutdown_unlocked(false)
260           retry
261         end
262         shutdown_unlocked(true)
263       rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
264         shutdown_unlocked(true)
265       rescue
266         # we DO NOT want the response we timed out waiting for, to crop up later
267         # on, on the same socket, intersperesed with a subsequent request!  we
268         # close the socket if there's any error.
269         shutdown_unlocked(true)
270       end while idempotent
271       shutdown_unlocked if failed
272     end # @mutex.synchronize
273     parse_response(line, no_raise ? request : nil)
274   end
276   # Makes a new request string for +cmd+ and +args+.
277   def make_request(cmd, args)
278     "#{cmd} #{url_encode args}\r\n"
279   end
281   # this converts an error code from a mogilefsd tracker to an exception
282   # Most of these exceptions should already be defined, but since the
283   # MogileFS server code is liable to change and we may not always be
284   # able to keep up with the changes
285   def error(err_snake)
286     BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
287   end
289   # Turns the +line+ response from the server into a Hash of options, an
290   # error, or raises, as appropriate.
291   def parse_response(line, request = nil)
292     case line
293     when /\AOK\s+\d*\s*(\S*)\r?\n\z/
294       url_decode($1)
295     when /\AERR\s+(\w+)\s*([^\r\n]*)/
296       @lasterr = $1
297       @lasterrstr = $2 ? url_unescape($2) : nil
298       if request
299         request = " request=#{request.strip}"
300         @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
301         return error(@lasterr).new(@lasterrstr)
302       end
303       raise error(@lasterr).new(@lasterrstr)
304     else
305       raise MogileFS::InvalidResponseError,
306             "Invalid response from server: #{line.inspect}"
307     end
308   end
310   # this command is special since the cache is per-tracker, so we connect
311   # to all backends and not just one
312   def clear_cache(types = %w(all))
313     opts = {}
314     types.each { |type| opts[type] = 1 }
316     sockets = @hosts.map do |host|
317       MogileFS::Socket.start(*(host.split(':'.freeze))) rescue nil
318     end
319     sockets.compact!
321     wpending = sockets
322     rpending = []
323     request = make_request("clear_cache", opts)
324     while wpending[0] || rpending[0]
325       r = IO.select(rpending, wpending, nil, @timeout) or return
326       rpending -= r[0]
327       wpending -= r[1]
328       r[0].each { |io| io.timed_gets(0) rescue nil }
329       r[1].each do |io|
330         begin
331           io.timed_write(request, 0)
332           rpending << io
333         rescue
334         end
335       end
336     end
337     nil
338   ensure
339     sockets.each { |io| io.close }
340   end
342   # Returns a socket connected to a MogileFS tracker.
343   def socket
344     return @socket if @socket and not @socket.closed?
346     @hosts.shuffle.each do |host|
347       next if dead = @dead[host] and dead[0] > (MogileFS.now - @fail_timeout)
349       begin
350         addr, port = host.split(':'.freeze)
351         @socket = MogileFS::Socket.tcp(addr, port, @connect_timeout)
352         @active_host = host
353       rescue SystemCallError, MogileFS::Timeout => err
354         @dead[host] = [ MogileFS.now, err ]
355         next
356       end
358       return @socket
359     end
361     errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
362     raise MogileFS::UnreachableBackendError,
363           "couldn't connect to any tracker: #{errors.join(', ')}"
364   end
366   # Turns a url params string into a Hash.
367   def url_decode(str) # :nodoc:
368     rv = {}
369     str.split('&'.freeze).each do |pair|
370       k, v = pair.split('='.freeze, 2).map! { |x| url_unescape(x) }
371       rv[k.freeze] = v
372     end
373     rv
374   end
376   # :stopdoc:
377   # TODO: see if we can use existing URL-escape/unescaping routines
378   # in the Ruby standard library, Perl MogileFS seems to NIH these
379   #  routines, too
380   # :startdoc:
382   # Turns a Hash (or Array of pairs) into a url params string.
383   def url_encode(params) # :nodoc:
384     params.map do |k,v|
385       "#{url_escape k.to_s}=#{url_escape v.to_s}"
386     end.join('&'.freeze)
387   end
389   # Escapes naughty URL characters.
390   if ''.respond_to?(:ord) # Ruby 1.9
391     def url_escape(str) # :nodoc:
392       str = str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x".freeze % $1.ord }
393       str.tr!(' '.freeze, '+'.freeze)
394       str
395     end
396   else # Ruby 1.8
397     def url_escape(str) # :nodoc:
398       str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
399     end
400   end
402   # Unescapes naughty URL characters.
403   def url_unescape(str) # :nodoc:
404     str = str.tr('+'.freeze, ' '.freeze)
405     str.gsub!(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack('C'.freeze) }
406     str
407   end