Branching mogilefs-client to version 1.2.1
[ruby-mogilefs-client.git] / lib / mogilefs / backend.rb
blob04708e95cea9973b2bf42bf6c14f72f7ca4f91f7
1 require 'socket'
2 require 'thread'
3 require 'mogilefs'
5 ##
6 # MogileFS::Backend communicates with the MogileFS trackers.
8 class MogileFS::Backend
10   ##
11   # Adds MogileFS commands +names+.
13   def self.add_command(*names)
14     names.each do |name|
15       define_method name do |*args|
16         do_request name, args.first || {}
17       end
18     end
19   end
21   ##
22   # The last error
23   #--
24   # TODO Use Exceptions
26   attr_reader :lasterr
28   ##
29   # The string attached to the last error
30   #--
31   # TODO Use Exceptions
33   attr_reader :lasterrstr
35   ##
36   # Creates a new MogileFS::Backend.
37   #
38   # :hosts is a required argument and must be an Array containing one or more
39   # 'hostname:port' pairs as Strings.
40   #
41   # :timeout adjusts the request timeout before an error is returned.
43   def initialize(args)
44     @hosts = args[:hosts]
45     raise ArgumentError, "must specify at least one host" unless @hosts
46     raise ArgumentError, "must specify at least one host" if @hosts.empty?
47     unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
48       raise ArgumentError, ":hosts must be in 'host:port' form"
49     end
51     @mutex = Mutex.new
52     @timeout = args[:timeout] || 3
53     @socket = nil
54     @lasterr = nil
55     @lasterrstr = nil
57     @dead = {}
58   end
60   ##
61   # Closes this backend's socket.
63   def shutdown
64     @socket.close unless @socket.nil? or @socket.closed?
65     @socket = nil
66   end
68   # MogileFS::MogileFS commands
70   add_command :create_open
71   add_command :create_close
72   add_command :get_paths
73   add_command :delete
74   add_command :sleep
75   add_command :rename
76   add_command :list_keys
78   # MogileFS::Backend commands
79   
80   add_command :get_hosts
81   add_command :get_devices
82   add_command :list_fids
83   add_command :stats
84   add_command :get_domains
85   add_command :create_domain
86   add_command :delete_domain
87   add_command :create_class
88   add_command :update_class
89   add_command :delete_class
90   add_command :create_host
91   add_command :update_host
92   add_command :delete_host
93   add_command :set_state
95   private unless defined? $TESTING
97   ##
98   # Returns a new TCPSocket connected to +port+ on +host+.
100   def connect_to(host, port)
101     return TCPSocket.new(host, port)
102   end
104   ##
105   # Performs the +cmd+ request with +args+.
107   def do_request(cmd, args)
108     @mutex.synchronize do
109       request = make_request cmd, args
111       begin
112         bytes_sent = socket.send request, 0
113       rescue SystemCallError
114         @socket = nil
115         raise "couldn't connect to mogilefsd backend"
116       end
118       unless bytes_sent == request.length then
119         raise "request truncated (sent #{bytes_sent} expected #{request.length})"
120       end
122       readable?
124       return parse_response(socket.gets)
125     end
126   end
128   ##
129   # Makes a new request string for +cmd+ and +args+.
131   def make_request(cmd, args)
132     return "#{cmd} #{url_encode args}\r\n"
133   end
135   ##
136   # Turns the +line+ response from the server into a Hash of options, an
137   # error, or raises, as appropriate.
139   def parse_response(line)
140     if line =~ /^ERR\s+(\w+)\s*(.*)/ then
141       @lasterr = $1
142       @lasterrstr = $2 ? url_unescape($2) : nil
143       return nil
144     end
146     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)/
148     raise "Invalid response from server: #{line.inspect}"
149   end
151   ##
152   # Raises if the socket does not become readable in +@timeout+ seconds.
154   def readable?
155     found = select [socket], nil, nil, @timeout
156     if found.nil? or found.empty? then
157       peer = (@socket ? "#{@socket.peeraddr[3]}:#{@socket.peeraddr[1]} " : nil)
158       raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
159     end
160     return true
161   end
163   ##
164   # Returns a socket connected to a MogileFS tracker.
166   def socket
167     return @socket if @socket and not @socket.closed?
169     now = Time.now
171     @hosts.sort_by { rand(3) - 1 }.each do |host|
172       next if @dead.include? host and @dead[host] > now - 5
174       begin
175         @socket = connect_to(*host.split(':'))
176       rescue SystemCallError
177         @dead[host] = now
178         next
179       end
181       return @socket
182     end
184     raise "couldn't connect to mogilefsd backend"
185   end
187   ##
188   # Turns a url params string into a Hash.
190   def url_decode(str)
191     pairs = str.split('&').map do |pair|
192       pair.split('=', 2).map { |v| url_unescape v }
193     end
195     return Hash[*pairs.flatten]
196   end
198   ##
199   # Turns a Hash (or Array of pairs) into a url params string.
201   def url_encode(params)
202     return params.map do |k,v|
203       "#{url_escape k.to_s}=#{url_escape v.to_s}"
204     end.join("&")
205   end
207   ##
208   # Escapes naughty URL characters.
210   def url_escape(str)
211     return str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1[0] }.tr(' ', '+')
212   end
214   ##
215   # Unescapes naughty URL characters.
217   def url_unescape(str)
218     return str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ')
219   end