backend: require CRLF on every line we parse
[ruby-mogilefs-client.git] / lib / mogilefs / backend.rb
blobdb13b4528d0a17003c663a95a2b90f7b8e4f06b9
1 # -*- encoding: binary -*-
2 require 'mogilefs'
3 require 'mogilefs/util'
4 require 'mogilefs/socket'
5 require 'thread'
7 ##
8 # MogileFS::Backend communicates with the MogileFS trackers.
10 class MogileFS::Backend
12   ##
13   # Adds MogileFS commands +names+.
15   def self.add_command(*names)
16     names.each do |name|
17       define_method name do |*args|
18         do_request name, args.first || {}
19       end
20     end
21   end
23   BACKEND_ERRORS = {}
25   # this converts an error code from a mogilefsd tracker to an exception:
26   #
27   # Examples of some exceptions that get created:
28   #   class AfterMismatchError < MogileFS::Error; end
29   #   class DomainNotFoundError < MogileFS::Error; end
30   #   class InvalidCharsError < MogileFS::Error; end
31   def self.add_error(err_snake)
32     err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
33     err_camel << 'Error' unless /Error\z/ =~ err_camel
34     unless self.const_defined?(err_camel)
35       self.class_eval("class #{err_camel} < MogileFS::Error; end")
36     end
37     BACKEND_ERRORS[err_snake] = self.const_get(err_camel)
38   end
40   ##
41   # The last error
43   attr_reader :lasterr
45   ##
46   # The string attached to the last error
48   attr_reader :lasterrstr
50   ##
51   # Creates a new MogileFS::Backend.
52   #
53   # :hosts is a required argument and must be an Array containing one or more
54   # 'hostname:port' pairs as Strings.
55   #
56   # :timeout adjusts the request timeout before an error is returned.
58   def initialize(args)
59     @hosts = args[:hosts]
60     raise ArgumentError, "must specify at least one host" unless @hosts
61     raise ArgumentError, "must specify at least one host" if @hosts.empty?
62     unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
63       raise ArgumentError, ":hosts must be in 'host:port' form"
64     end
66     @mutex = Mutex.new
67     @timeout = args[:timeout] || 3
68     @socket = nil
69     @lasterr = nil
70     @lasterrstr = nil
72     @dead = {}
73   end
75   ##
76   # Closes this backend's socket.
78   def shutdown
79     @mutex.synchronize { shutdown_unlocked }
80   end
82   # MogileFS::MogileFS commands
84   add_command :create_open
85   add_command :create_close
86   add_command :get_paths
87   add_command :delete
88   add_command :sleep
89   add_command :rename
90   add_command :list_keys
92   # MogileFS::Backend commands
94   add_command :get_hosts
95   add_command :get_devices
96   add_command :list_fids
97   add_command :stats
98   add_command :get_domains
99   add_command :create_domain
100   add_command :delete_domain
101   add_command :create_class
102   add_command :update_class
103   add_command :delete_class
104   add_command :create_host
105   add_command :update_host
106   add_command :delete_host
107   add_command :set_state
109   # Errors copied from MogileFS/Worker/Query.pm
110   add_error 'dup'
111   add_error 'after_mismatch'
112   add_error 'bad_params'
113   add_error 'class_exists'
114   add_error 'class_has_files'
115   add_error 'class_not_found'
116   add_error 'db'
117   add_error 'domain_has_files'
118   add_error 'domain_exists'
119   add_error 'domain_not_empty'
120   add_error 'domain_not_found'
121   add_error 'failure'
122   add_error 'host_exists'
123   add_error 'host_mismatch'
124   add_error 'host_not_empty'
125   add_error 'host_not_found'
126   add_error 'invalid_chars'
127   add_error 'invalid_checker_level'
128   add_error 'invalid_mindevcount'
129   add_error 'key_exists'
130   add_error 'no_class'
131   add_error 'no_devices'
132   add_error 'no_domain'
133   add_error 'no_host'
134   add_error 'no_ip'
135   add_error 'no_key'
136   add_error 'no_port'
137   add_error 'none_match'
138   add_error 'plugin_aborted'
139   add_error 'state_too_high'
140   add_error 'size_verify_error'
141   add_error 'unknown_command'
142   add_error 'unknown_host'
143   add_error 'unknown_key'
144   add_error 'unknown_state'
145   add_error 'unreg_domain'
147   private unless defined? $TESTING
149   def shutdown_unlocked # :nodoc:
150     if @socket
151       @socket.close rescue nil # ignore errors
152       @socket = nil
153     end
154   end
156   ##
157   # Performs the +cmd+ request with +args+.
159   def do_request(cmd, args)
160     response = nil
161     request = make_request cmd, args
162     @mutex.synchronize do
163       begin
164         io = socket
165         begin
166           io.timed_write(request, @timeout)
167         rescue SystemCallError
168           @dead[@active_host] = Time.now
169           shutdown_unlocked
170           io = socket
171           retry
172         end
174         response = io.timed_gets(@timeout) and return parse_response(response)
175       ensure
176         # we DO NOT want the response we timed out waiting for, to crop up later
177         # on, on the same socket, intersperesed with a subsequent request!
178         # we close the socket if it times out like this
179         response or shutdown_unlocked
180       end
181     end # @mutex.synchronize
182   end
184   ##
185   # Makes a new request string for +cmd+ and +args+.
187   def make_request(cmd, args)
188     "#{cmd} #{url_encode args}\r\n"
189   end
191   # this converts an error code from a mogilefsd tracker to an exception
192   # Most of these exceptions should already be defined, but since the
193   # MogileFS server code is liable to change and we may not always be
194   # able to keep up with the changes
195   def error(err_snake)
196     BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
197   end
199   ##
200   # Turns the +line+ response from the server into a Hash of options, an
201   # error, or raises, as appropriate.
203   def parse_response(line)
204     if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
205       @lasterr = $1
206       @lasterrstr = $2 ? url_unescape($2) : nil
207       raise error(@lasterr), @lasterrstr
208     end
210     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
212     raise MogileFS::InvalidResponseError,
213           "Invalid response from server: #{line.inspect}"
214   end
216   ##
217   # Returns a socket connected to a MogileFS tracker.
219   def socket
220     return @socket if @socket and not @socket.closed?
222     now = Time.now
224     @hosts.shuffle.each do |host|
225       next if @dead.include? host and @dead[host] > now - 5
227       begin
228         addr, port = host.split(/:/)
229         @socket = MogileFS::Socket.tcp(addr, port, @timeout)
230         @active_host = host
231       rescue SystemCallError, MogileFS::Timeout
232         @dead[host] = now
233         next
234       end
236       return @socket
237     end
239     raise MogileFS::UnreachableBackendError
240   end
242   ##
243   # Turns a url params string into a Hash.
245   def url_decode(str)
246     Hash[*(str.split(/&/).map { |pair|
247       pair.split(/=/, 2).map { |x| url_unescape(x) }
248     } ).flatten]
249   end
251   ##
252   # Turns a Hash (or Array of pairs) into a url params string.
254   def url_encode(params)
255     params.map do |k,v|
256       "#{url_escape k.to_s}=#{url_escape v.to_s}"
257     end.join("&")
258   end
260   ##
261   # Escapes naughty URL characters.
262   if ''.respond_to?(:ord) # Ruby 1.9
263     def url_escape(str)
264       str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
265     end
266   else # Ruby 1.8
267     def url_escape(str)
268       str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
269     end
270   end
272   ##
273   # Unescapes naughty URL characters.
275   def url_unescape(str)
276     str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ')
277   end