Make sure select(2) calls are IO.select
[ruby-mogilefs-client.git] / lib / mogilefs / backend.rb
blob9c6af1c978069f19f37c5d7a7c600009917dfbc3
1 require 'mogilefs'
2 require 'mogilefs/util'
3 require 'thread'
5 ##
6 # MogileFS::Backend communicates with the MogileFS trackers.
8 class MogileFS::Backend
10   ##
11   # Adds MogileFS commands +names+.
13   def self.add_command(*names)
14     names.each do |name|
15       define_method name do |*args|
16         do_request name, args.first || {}
17       end
18     end
19   end
21   BACKEND_ERRORS = {}
23   # this converts an error code from a mogilefsd tracker to an exception:
24   #
25   # Examples of some exceptions that get created:
26   #   class AfterMismatchError < MogileFS::Error; end
27   #   class DomainNotFoundError < MogileFS::Error; end
28   #   class InvalidCharsError < MogileFS::Error; end
29   def self.add_error(err_snake)
30     err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase } << 'Error'
31     unless self.const_defined?(err_camel)
32       self.class_eval("class #{err_camel} < MogileFS::Error; end")
33     end
34     BACKEND_ERRORS[err_snake] = self.const_get(err_camel)
35   end
37   ##
38   # The last error
40   attr_reader :lasterr
42   ##
43   # The string attached to the last error
45   attr_reader :lasterrstr
47   ##
48   # Creates a new MogileFS::Backend.
49   #
50   # :hosts is a required argument and must be an Array containing one or more
51   # 'hostname:port' pairs as Strings.
52   #
53   # :timeout adjusts the request timeout before an error is returned.
55   def initialize(args)
56     @hosts = args[:hosts]
57     raise ArgumentError, "must specify at least one host" unless @hosts
58     raise ArgumentError, "must specify at least one host" if @hosts.empty?
59     unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
60       raise ArgumentError, ":hosts must be in 'host:port' form"
61     end
63     @mutex = Mutex.new
64     @timeout = args[:timeout] || 3
65     @socket = nil
66     @lasterr = nil
67     @lasterrstr = nil
69     @dead = {}
70   end
72   ##
73   # Closes this backend's socket.
75   def shutdown
76     if @socket
77       @socket.close rescue nil # ignore errors
78       @socket = nil
79     end
80   end
82   # MogileFS::MogileFS commands
84   add_command :create_open
85   add_command :create_close
86   add_command :get_paths
87   add_command :delete
88   add_command :sleep
89   add_command :rename
90   add_command :list_keys
92   # MogileFS::Backend commands
93   
94   add_command :get_hosts
95   add_command :get_devices
96   add_command :list_fids
97   add_command :stats
98   add_command :get_domains
99   add_command :create_domain
100   add_command :delete_domain
101   add_command :create_class
102   add_command :update_class
103   add_command :delete_class
104   add_command :create_host
105   add_command :update_host
106   add_command :delete_host
107   add_command :set_state
109   # Errors copied from MogileFS/Worker/Query.pm
110   add_error 'dup'
111   add_error 'after_mismatch'
112   add_error 'bad_params'
113   add_error 'class_exists'
114   add_error 'class_has_files'
115   add_error 'class_not_found'
116   add_error 'db'
117   add_error 'domain_has_files'
118   add_error 'domain_exists'
119   add_error 'domain_not_empty'
120   add_error 'domain_not_found'
121   add_error 'failure'
122   add_error 'host_exists'
123   add_error 'host_mismatch'
124   add_error 'host_not_empty'
125   add_error 'host_not_found'
126   add_error 'invalid_chars'
127   add_error 'invalid_checker_level'
128   add_error 'invalid_mindevcount'
129   add_error 'key_exists'
130   add_error 'no_class'
131   add_error 'no_devices'
132   add_error 'no_domain'
133   add_error 'no_host'
134   add_error 'no_ip'
135   add_error 'no_key'
136   add_error 'no_port'
137   add_error 'none_match'
138   add_error 'plugin_aborted'
139   add_error 'state_too_high'
140   add_error 'unknown_command'
141   add_error 'unknown_host'
142   add_error 'unknown_key'
143   add_error 'unknown_state'
144   add_error 'unreg_domain'
146   private unless defined? $TESTING
148   ##
149   # Returns a new Socket (TCP) connected to +port+ on +host+.
151   def connect_to(host, port)
152     Socket.mogilefs_new(host, port, @timeout)
153   end
155   ##
156   # Performs the +cmd+ request with +args+.
158   def do_request(cmd, args)
159     @mutex.synchronize do
160       request = make_request cmd, args
162       begin
163         bytes_sent = socket.send request, 0
164       rescue SystemCallError
165         shutdown
166         raise MogileFS::UnreachableBackendError
167       end
169       unless bytes_sent == request.length then
170         raise MogileFS::RequestTruncatedError,
171           "request truncated (sent #{bytes_sent} expected #{request.length})"
172       end
174       readable?
176       return parse_response(socket.gets)
177     end
178   end
180   ##
181   # Makes a new request string for +cmd+ and +args+.
183   def make_request(cmd, args)
184     return "#{cmd} #{url_encode args}\r\n"
185   end
187   # this converts an error code from a mogilefsd tracker to an exception
188   # Most of these exceptions should already be defined, but since the
189   # MogileFS server code is liable to change and we may not always be
190   # able to keep up with the changes
191   def error(err_snake)
192     BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
193   end
195   ##
196   # Turns the +line+ response from the server into a Hash of options, an
197   # error, or raises, as appropriate.
199   def parse_response(line)
200     if line =~ /^ERR\s+(\w+)\s*(.*)/ then
201       @lasterr = $1
202       @lasterrstr = $2 ? url_unescape($2) : nil
203       raise error(@lasterr)
204       return nil
205     end
207     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)/
209     raise MogileFS::InvalidResponseError,
210           "Invalid response from server: #{line.inspect}"
211   end
213   ##
214   # Raises if the socket does not become readable in +@timeout+ seconds.
216   def readable?
217     timeleft = @timeout
218     peer = nil
219     loop do
220       t0 = Time.now
221       found = IO.select([socket], nil, nil, timeleft)
222       return true if found && found[0]
223       timeleft -= (Time.now - t0)
225       if timeleft < 0
226         peer = @socket ? "#{@socket.mogilefs_peername} " : nil
228         # we DO NOT want the response we timed out waiting for, to crop up later
229         # on, on the same socket, intersperesed with a subsequent request! so,
230         # we close the socket if it times out like this
231         shutdown
232         raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
233         break
234       end
235       shutdown
236     end
237     false
238   end
240   ##
241   # Returns a socket connected to a MogileFS tracker.
243   def socket
244     return @socket if @socket and not @socket.closed?
246     now = Time.now
248     @hosts.sort_by { rand(3) - 1 }.each do |host|
249       next if @dead.include? host and @dead[host] > now - 5
251       begin
252         @socket = connect_to(*host.split(':'))
253       rescue SystemCallError, MogileFS::Timeout
254         @dead[host] = now
255         next
256       end
258       return @socket
259     end
261     raise MogileFS::UnreachableBackendError
262   end
264   ##
265   # Turns a url params string into a Hash.
267   def url_decode(str)
268     pairs = str.split('&').map do |pair|
269       pair.split('=', 2).map { |v| url_unescape v }
270     end
272     return Hash[*pairs.flatten]
273   end
275   ##
276   # Turns a Hash (or Array of pairs) into a url params string.
278   def url_encode(params)
279     return params.map do |k,v|
280       "#{url_escape k.to_s}=#{url_escape v.to_s}"
281     end.join("&")
282   end
284   ##
285   # Escapes naughty URL characters.
287   def url_escape(str)
288     return str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
289   end
291   ##
292   # Unescapes naughty URL characters.
294   def url_unescape(str)
295     return str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ')
296   end