2 require 'mogilefs/backend' # for the exceptions
4 # Read-only interface that looks like MogileFS::MogileFS. This provides
5 # direct, read-only access to any slave MySQL database to provide better
6 # performance and eliminate extra points of failure
# Name of the driver method used to run SQL. initialize sets it to
# :c_async_query when the database handle responds to that method and to
# :query otherwise.
12 attr_reader :query_method
15 # Creates a new MogileFS::Mysql instance. +args+ must include a key
16 # :domain specifying the domain of this client and :mysql, specifying
17 # an already-initialized Mysql object.
19 # The Mysql object can be either the standard Mysql driver or the
20 # Mysqlplus one supporting c_async_query.
21 def initialize(args = {})
# @domain is the key looked up in the domain cache by the query methods;
# @my is the database handle every query is sent to. Neither value is
# validated here.
22 @domain, @my = args[:domain], args[:mysql]
# Decide once which driver method to call: c_async_query when the handle
# offers it, plain query otherwise.
23 @query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
# Epoch timestamps make the age checks in refresh_device and refresh_domain
# fail on first use, so the first call of each goes to the database.
24 @last_update_device = @last_update_domain = Time.at(0)
# Both caches start out empty.
25 @cache_domain = @cache_device = nil
29 # Lists keys starting with +prefix+ following +after+ up to +limit+. If
30 # +after+ is nil the list starts at the beginning.
#
# Yields +dkey+, +length+ and +devcount+ for each result row when a block
# is given. Returns a two-element Array: +keys+ and the last key (or ''
# when +keys.last+ is nil).
#
# Raises MogileFS::Backend::DomainNotFoundError when refresh_domain has no
# entry for @domain, MogileFS::Backend::AfterMismatchError when a non-empty
# +after+ does not start with +prefix+, and
# MogileFS::Backend::InvalidCharsError when +prefix+ contains '%' or a
# backslash.
#
# NOTE(review): the lines that assign +sql+ and +keys+, and any argument
# copying announced by the "don't modify passed arguments" comment, are not
# visible in this excerpt -- confirm against the full file.
31 def list_keys(prefix, after = '', limit = 1000, &block)
32 # this code is based on server/lib/MogileFS/Worker/Query.pm
33 dmid = refresh_domain[@domain] or \
34 raise MogileFS::Backend::DomainNotFoundError
36 # don't modify passed arguments
# Limits above 1000, zero or negative all fall back to 1000.
39 limit = 1000 if limit > 1000 || limit <= 0
# A non-empty +after+ must itself begin with +prefix+.
43 if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
44 raise MogileFS::Backend::AfterMismatchError
# '%' and backslash are rejected outright and '_' is backslash-escaped.
# Presumably this is because +prefix+ is embedded in the LIKE pattern of the
# query, where '%' and '_' are wildcards -- TODO confirm.
47 raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
48 prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
51 SELECT dkey,length,devcount FROM file
53 AND dkey LIKE '#{@my.quote(prefix)}%'
54 AND dkey > '#{@my.quote(after)}'
55 ORDER BY dkey LIMIT #{limit}
59 query(sql).each do |dkey,length,devcount|
# The block is optional; rows are only yielded when one was passed.
60 yield(dkey, length, devcount) if block_given?
63 return [ keys, keys.last || '']
67 # Returns the size of +key+.
# NOTE(review): the +def+ line for this body and the assignment of +sql+ are
# not visible in this excerpt; +key+ is presumably the method's parameter --
# confirm against the full file.
#
# Selects the +length+ column of the file row matching +key+ in the current
# domain and returns it as an Integer.
#
# Raises MogileFS::Backend::DomainNotFoundError when refresh_domain has no
# entry for @domain and MogileFS::Backend::UnknownKeyError when no usable
# row comes back.
69 dmid = refresh_domain[@domain] or \
70 raise MogileFS::Backend::DomainNotFoundError
73 SELECT length FROM file
74 WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
78 res = query(sql).fetch_row
# A missing row or a nil first column falls through to UnknownKeyError.
79 return res[0].to_i if res && res[0]
80 raise MogileFS::Backend::UnknownKeyError
84 # Get the paths for +key+.
#
# Builds one "http://host:port/dev<devid>/..." URL per file_on row of the
# file and appends it to +urls+. +zone+ selects the host address: 'alt'
# uses the host's +altip+, anything else its +hostip+. +noverify+ is not
# referenced in the visible lines.
#
# Raises MogileFS::Backend::DomainNotFoundError when refresh_domain has no
# entry for @domain, MogileFS::Backend::NoDevicesError when refresh_device
# returns nil or false, and MogileFS::Backend::UnknownKeyError when no
# usable file row comes back.
#
# NOTE(review): the lines that assign +urls+, +fid+, +nfid+ and the first
# +sql+, as well as the method's return value, are not visible in this
# excerpt -- confirm against the full file.
85 def get_paths(key, noverify = true, zone = nil)
86 dmid = refresh_domain[@domain] or \
87 raise MogileFS::Backend::DomainNotFoundError
88 devices = refresh_device or raise MogileFS::Backend::NoDevicesError
92 WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
96 res = query(sql).fetch_row
# A missing row or a nil first column means the key is unknown.
97 res && res[0] or raise MogileFS::Backend::UnknownKeyError
99 sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
# The trailing comma in |devid,| unpacks each row so that devid is its
# first column rather than the whole row.
100 query(sql).each do |devid,|
# NOTE(review): the lookup returns nil for a devid that is not in the device
# cache (e.g. a device whose status is outside the GET_DEVICES filter); the
# next line would then raise NoMethodError -- verify this cannot happen.
101 devinfo = devices[devid.to_i]
# Port preference: dedicated GET port, then the general HTTP port, then 80.
102 port = devinfo[:http_get_port] || devinfo[:http_port] || 80
103 host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
# Takes the first run of ten digits in +nfid+ and captures its first digit
# and the two following three-digit groups; the last three digits are
# matched but not captured. With fewer than ten consecutive digits, match
# returns nil and the [1..3] call raises NoMethodError.
105 b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
106 uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
107 urls << "http://#{host}:#{port}#{uri}"
# SQL run by refresh_device: one row per device with the address and port
# columns of its host (LEFT JOIN, so the host columns may be NULL). Only
# devices whose status is 'alive', 'readonly' or 'drain' are returned. The
# +unless defined?+ guard skips this section when the constant already
# exists.
# NOTE(review): the constant assignment, the start and end of the SQL string
# and the FROM clause are not visible in this excerpt -- confirm against the
# full file.
114 unless defined? GET_DEVICES
116 SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port
118 LEFT JOIN host h ON d.hostid = h.hostid
119 WHERE d.status IN ('alive','readonly','drain');
# Sends +sql+ to the database handle through the driver method chosen in
# initialize (@query_method) and returns whatever that call returns.
# NOTE(review): the enclosing +def+ line is not visible in this excerpt;
# the other methods in this file invoke it as query(sql) -- confirm.
125 @my.send(@query_method, sql)
# Returns the device cache, reloading it from the database when +force+ is
# true or the previous reload is 60 or more seconds old.
#
# NOTE(review): the creation of +tmp+ and the line that stores each entry in
# it are not visible in this excerpt; get_paths indexes the result with
# devid.to_i, so entries are presumably keyed by integer device id --
# confirm against the full file.
128 def refresh_device(force = false)
# Serve the cached value while it is younger than 60 seconds.
129 return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
131 res = query(GET_DEVICES)
132 res.each do |devid, hostip, altip, http_port, http_get_port|
134 :hostip => hostip.freeze,
135 :altip => altip.freeze,
# Ports become Integers; a nil column value stays nil so that get_paths can
# fall back to the next port choice.
136 :http_port => http_port ? http_port.to_i : nil,
137 :http_get_port => http_get_port ? http_get_port.to_i : nil,
# Record the reload time and store the frozen result as the new cache.
140 @last_update_device = Time.now
141 @cache_device = tmp.freeze
# Returns the domain cache, which maps each +namespace+ from the domain
# table to its +dmid+ as an Integer. The cache is reloaded when +force+ is
# true or the previous reload is 5 or more seconds old.
#
# NOTE(review): the creation of +tmp+ is not visible in this excerpt --
# confirm against the full file.
144 def refresh_domain(force = false)
# Serve the cached value while it is younger than 5 seconds.
145 return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
147 res = query('SELECT dmid,namespace FROM domain')
148 res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
# Record the reload time and store the frozen result as the new cache.
149 @last_update_domain = Time.now
150 @cache_domain = tmp.freeze