Apply error handling patch from Matthew Willson
[ruby-mogilefs-client.git] / lib / mogilefs / backend.rb
blob88e4f2f466304bf7473940dbe047956389eb0445
1 require 'socket'
2 require 'thread'
3 require 'mogilefs'
5 ##
6 # MogileFS::Backend communicates with the MogileFS trackers.
8 class MogileFS::Backend
10   ##
11   # Adds MogileFS commands +names+.
13   def self.add_command(*names)
14     names.each do |name|
15       define_method name do |*args|
16         do_request name, args.first || {}
17       end
18     end
19   end
21   ##
22   # The last error
23   #--
24   # TODO Use Exceptions
26   attr_reader :lasterr
28   ##
29   # The string attached to the last error
30   #--
31   # TODO Use Exceptions
33   attr_reader :lasterrstr
35   ##
36   # Creates a new MogileFS::Backend.
37   #
38   # :hosts is a required argument and must be an Array containing one or more
39   # 'hostname:port' pairs as Strings.
40   #
41   # :timeout adjusts the request timeout before an error is returned.
43   def initialize(args)
44     @hosts = args[:hosts]
45     raise ArgumentError, "must specify at least one host" unless @hosts
46     raise ArgumentError, "must specify at least one host" if @hosts.empty?
47     unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
48       raise ArgumentError, ":hosts must be in 'host:port' form"
49     end
51     @mutex = Mutex.new
52     @timeout = args[:timeout] || 3
53     @socket = nil
54     @lasterr = nil
55     @lasterrstr = nil
57     @dead = {}
58   end
60   ##
61   # Closes this backend's socket.
63   def shutdown
64     @socket.close unless @socket.nil? or @socket.closed?
65     @socket = nil
66   end
68   # MogileFS::MogileFS commands
70   add_command :create_open
71   add_command :create_close
72   add_command :get_paths
73   add_command :delete
74   add_command :sleep
75   add_command :rename
76   add_command :list_keys
78   # MogileFS::Backend commands
79   
80   add_command :get_hosts
81   add_command :get_devices
82   add_command :list_fids
83   add_command :stats
84   add_command :get_domains
85   add_command :create_domain
86   add_command :delete_domain
87   add_command :create_class
88   add_command :update_class
89   add_command :delete_class
90   add_command :create_host
91   add_command :update_host
92   add_command :delete_host
93   add_command :set_state
95   private unless defined? $TESTING
97   ##
98   # Returns a new TCPSocket connected to +port+ on +host+.
100   def connect_to(host, port)
101     return TCPSocket.new(host, port)
102   end
104   ##
105   # Performs the +cmd+ request with +args+.
107   def do_request(cmd, args)
108     @mutex.synchronize do
109       request = make_request cmd, args
111       begin
112         bytes_sent = socket.send request, 0
113       rescue SystemCallError
114         @socket = nil
115         raise "couldn't connect to mogilefsd backend"
116       end
118       unless bytes_sent == request.length then
119         raise "request truncated (sent #{bytes_sent} expected #{request.length})"
120       end
122       readable?
124       return parse_response(socket.gets)
125     end
126   end
128   ##
129   # Makes a new request string for +cmd+ and +args+.
131   def make_request(cmd, args)
132     return "#{cmd} #{url_encode args}\r\n"
133   end
135   ##
136   # Turns the +line+ response from the server into a Hash of options, an
137   # error, or raises, as appropriate.
139   def parse_response(line)
140     if line =~ /^ERR\s+(\w+)\s*(.*)/ then
141       @lasterr = $1
142       @lasterrstr = $2 ? url_unescape($2) : nil
143       return nil
144     end
146     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)/
148     raise "Invalid response from server: #{line.inspect}"
149   end
151   ##
152   # Raises if the socket does not become readable in +@timeout+ seconds.
154   def readable?
155     found = select [socket], nil, nil, @timeout
156     if found.nil? or found.empty? then
157       peer = (@socket ? "#{@socket.peeraddr[3]}:#{@socket.peeraddr[1]} " : nil)
158       socket.close  # we DO NOT want the response we timed out waiting for, to crop up later on, on the same socket, intersperesed with a subsequent request! so, we close the socket if it times out like this
159       raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
160     end
161     return true
162   end
164   ##
165   # Returns a socket connected to a MogileFS tracker.
167   def socket
168     return @socket if @socket and not @socket.closed?
170     now = Time.now
172     @hosts.sort_by { rand(3) - 1 }.each do |host|
173       next if @dead.include? host and @dead[host] > now - 5
175       begin
176         @socket = connect_to(*host.split(':'))
177       rescue SystemCallError
178         @dead[host] = now
179         next
180       end
182       return @socket
183     end
185     raise "couldn't connect to mogilefsd backend"
186   end
188   ##
189   # Turns a url params string into a Hash.
191   def url_decode(str)
192     pairs = str.split('&').map do |pair|
193       pair.split('=', 2).map { |v| url_unescape v }
194     end
196     return Hash[*pairs.flatten]
197   end
199   ##
200   # Turns a Hash (or Array of pairs) into a url params string.
202   def url_encode(params)
203     return params.map do |k,v|
204       "#{url_escape k.to_s}=#{url_escape v.to_s}"
205     end.join("&")
206   end
208   ##
209   # Escapes naughty URL characters.
211   def url_escape(str)
212     return str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
213   end
215   ##
216   # Unescapes naughty URL characters.
218   def url_unescape(str)
219     return str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ')
220   end