MogileFS::Mysql: more flexible Mysql driver handling
[ruby-mogilefs-client.git] / lib / mogilefs / mysql.rb
blob95ce32b5c02e9c29dafd8b66e955c48998f5ad5c
1 require 'mogilefs'
2 require 'mogilefs/backend' # for the exceptions
4 # read-only interface that looks like MogileFS::MogileFS This provides
5 # direct, read-only access to any slave MySQL database to provide better
6 # performance and eliminate extra points of failure
8 class MogileFS::Mysql
10   attr_accessor :domain
11   attr_reader :my
12   attr_reader :query_method
14   ##
15   # Creates a new MogileFS::Mysql instance.  +args+ must include a key
16   # :domain specifying the domain of this client and :mysql, specifying
17   # an already-initialized Mysql object.
18   #
19   # The Mysql object can be either the standard Mysql driver or the
20   # Mysqlplus one supporting c_async_query.
21   def initialize(args = {})
22     @domain, @my = args[:domain], args[:mysql]
23     @query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
24     @last_update_device = @last_update_domain = Time.at(0)
25     @cache_domain = @cache_device = nil
26   end
28   ##
29   # Lists keys starting with +prefix+ follwing +after+ up to +limit+.  If
30   # +after+ is nil the list starts at the beginning.
31   def list_keys(prefix, after = '', limit = 1000, &block)
32     # this code is based on server/lib/MogileFS/Worker/Query.pm
33     dmid = refresh_domain[@domain] or \
34       raise MogileFS::Backend::DomainNotFoundError
36     # don't modify passed arguments
37     limit ||= 1000
38     limit = limit.to_i
39     limit = 1000 if limit > 1000 || limit <= 0
40     after = "#{after}"
41     prefix = "#{prefix}"
43     if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
44       raise MogileFS::Backend::AfterMismatchError
45     end
47     raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
48     prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
50     sql = <<-EOS
51     SELECT dkey,length,devcount FROM file
52     WHERE dmid = #{dmid}
53       AND dkey LIKE '#{@my.quote(prefix)}%'
54       AND dkey > '#{@my.quote(after)}'
55     ORDER BY dkey LIMIT #{limit}
56     EOS
58     keys = []
59     query(sql).each do |dkey,length,devcount|
60       yield(dkey, length, devcount) if block_given?
61       keys << dkey
62     end
63     return [ keys, keys.last || '']
64   end
66   ##
67   # Returns the size of +key+.
68   def size(key)
69     dmid = refresh_domain[@domain] or \
70       raise MogileFS::Backend::DomainNotFoundError
72     sql = <<-EOS
73     SELECT length FROM file
74     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
75     LIMIT 1
76     EOS
78     res = query(sql).fetch_row
79     return res[0].to_i if res && res[0]
80     raise MogileFS::Backend::UnknownKeyError
81   end
83   ##
84   # Get the paths for +key+.
85   def get_paths(key, noverify = true, zone = nil)
86     dmid = refresh_domain[@domain] or \
87       raise MogileFS::Backend::DomainNotFoundError
88     devices = refresh_device or raise MogileFS::Backend::NoDevicesError
89     urls = []
90     sql = <<-EOS
91     SELECT fid FROM file
92     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
93     LIMIT 1
94     EOS
96     res = query(sql).fetch_row
97     res && res[0] or raise MogileFS::Backend::UnknownKeyError
98     fid = res[0]
99     sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
100     query(sql).each do |devid,|
101       devinfo = devices[devid.to_i]
102       port = devinfo[:http_get_port] || devinfo[:http_port] || 80
103       host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
104       nfid = '%010u' % fid
105       b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
106       uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
107       urls << "http://#{host}:#{port}#{uri}"
108     end
109     urls
110   end
112   private
114     unless defined? GET_DEVICES
115       GET_DEVICES = <<-EOS
116         SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port
117         FROM device d
118           LEFT JOIN host h ON d.hostid = h.hostid
119         WHERE d.status IN ('alive','readonly','drain');
120       EOS
121       GET_DEVICES.freeze
122     end
124     def query(sql)
125       @my.send(@query_method, sql)
126     end
128     def refresh_device(force = false)
129       return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
130       tmp = {}
131       res = query(GET_DEVICES)
132       res.each do |devid, hostip, altip, http_port, http_get_port|
133         tmp[devid.to_i] = {
134           :hostip => hostip.freeze,
135           :altip => altip.freeze,
136           :http_port => http_port ? http_port.to_i : nil,
137           :http_get_port => http_get_port ? http_get_port.to_i : nil,
138         }.freeze
139       end
140       @last_update_device = Time.now
141       @cache_device = tmp.freeze
142     end
144     def refresh_domain(force = false)
145       return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
146       tmp = {}
147       res = query('SELECT dmid,namespace FROM domain')
148       res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
149       @last_update_domain = Time.now
150       @cache_domain = tmp.freeze
151     end