1 # -*- encoding: binary -*-
3 require 'mogilefs/backend' # for the exceptions
5 # read-only interface that can be a backend for MogileFS::MogileFS
7 # This provides direct, read-only access to any slave MySQL database to
8 # provide better performance, scalability and eliminate mogilefsd as a
13 attr_reader :query_method
16 # Creates a new MogileFS::Mysql instance. +args+ must include a key
17 # :domain specifying the domain of this client and :mysql, specifying
18 # an already-initialized Mysql object.
20 # The Mysql object can be either the standard Mysql driver or the
21 # Mysqlplus one supporting c_async_query.
22 def initialize(args = {})
# NOTE(review): @my is read below but its assignment is not visible in this
# excerpt -- presumably it is taken from args[:mysql]; confirm.
# Pick :c_async_query when the connection object responds to it, else :query.
24 @query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
# Start both timestamps at the epoch so that the age checks in
# refresh_device (60s) and refresh_domain (5s) fail on first use and the
# caches get loaded.
25 @last_update_device = @last_update_domain = Time.at(0)
# Caches start out empty; refresh_device / refresh_domain assign them.
26 @cache_domain = @cache_device = nil
30 # Lists keys starting with +prefix+ following +after+ up to +limit+. If
31 # +after+ is nil the list starts at the beginning.
#
# +domain+ is resolved to a numeric dmid via get_dmid.
# +limit+ is reset to 1000 when it is greater than 1000 or not positive.
#
# When a block is given, it is called once per row with
# (dkey, length as Integer, devcount as Integer).
#
# Returns nil when +keys+ is empty, otherwise the two-element array
# [keys, last key].
# NOTE(review): +keys+ is filled on a line not visible in this excerpt --
# presumably with each dkey; confirm.
#
# Raises MogileFS::Backend::AfterMismatchError when +after+ is non-empty and
# does not match +prefix+ at the start of a line, and
# MogileFS::Backend::InvalidCharsError when +prefix+ contains '%' or '\'.
32 def _list_keys(domain, prefix = '', after = '', limit = 1000, &block)
33 # this code is based on server/lib/MogileFS/Worker/Query.pm
34 dmid = get_dmid(domain)
36 # don't modify passed arguments
39 limit = 1000 if limit > 1000 || limit <= 0
# Interpolating builds fresh String objects (and turns nil into ""), so the
# in-place gsub! further down cannot touch the caller's objects.
40 after, prefix = "#{after}", "#{prefix}"
# A resume point that does not begin with the prefix is rejected.
42 if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
43 raise MogileFS::Backend::AfterMismatchError
# '%' and '\' are refused outright; prefix is later placed inside a LIKE
# pattern, where '%' is a wildcard and '\' the default escape character.
46 raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
# '_' matches any single character in a SQL LIKE pattern, so it is escaped
# to be matched literally.
47 prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
50 SELECT dkey,length,devcount FROM file
52 AND dkey LIKE '#{@my.quote(prefix)}%'
53 AND dkey > '#{@my.quote(after)}'
54 ORDER BY dkey LIMIT #{limit}
58 query(sql).each do |dkey,length,devcount|
59 yield(dkey, length.to_i, devcount.to_i) if block_given?
63 keys.empty? ? nil : [ keys, (keys.last || '') ]
67 # Returns the size of +key+.
#
# +domain+ is resolved to a numeric dmid via get_dmid; +key+ is passed
# through @my.quote before being placed in the SQL text.
#
# Returns the +length+ column of the matching +file+ row as an Integer.
# Raises MogileFS::Backend::UnknownKeyError when no row is returned or the
# first column of the row is nil.
68 def _size(domain, key)
69 dmid = get_dmid(domain)
72 SELECT length FROM file
73 WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
77 res = query(sql).fetch_row
# Only the first row is consulted; a missing row or nil length means the
# key is unknown.
78 return res[0].to_i if res && res[0]
79 raise MogileFS::Backend::UnknownKeyError
83 # Get the paths for +key+.
#
# Keys read from +params+ in the visible code: :domain, :key and :noverify
# (the latter is computed but not used yet).
#
# For every device id recorded in +file_on+ for the file, a URL of the form
# "http://<host>:<port>/dev<devid>/<b>/<mmm>/<ttt>/<nfid>.fid" is appended
# to +urls+. Unknown or non-readable devices are skipped.
# NOTE(review): +zone+, +fid+, +nfid+, +urls+ and the return value are set
# on lines not visible in this excerpt -- presumably +urls+ is returned;
# confirm.
#
# Raises MogileFS::Backend::NoDevicesError when refresh_device returns
# nil/false, and MogileFS::Backend::UnknownKeyError when the file lookup
# yields no row or a nil first column.
84 def _get_paths(params = {})
86 noverify = (params[:noverify] == 1) # TODO this is unused atm
87 dmid = get_dmid(params[:domain])
88 devices = refresh_device or raise MogileFS::Backend::NoDevicesError
92 WHERE dmid = #{dmid} AND dkey = '#{@my.quote(params[:key])}'
96 res = query(sql).fetch_row
97 res && res[0] or raise MogileFS::Backend::UnknownKeyError
99 sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
# The trailing comma in |devid,| destructures each row, keeping column one.
100 query(sql).each do |devid,|
# Device missing from the cached map: force one reload of the device cache
# and skip this device if it is still unknown afterwards.
101 unless devinfo = devices[devid.to_i]
102 devices = refresh_device(true)
103 devinfo = devices[devid.to_i] or next
# Skip devices whose :readable flag is false.
105 devinfo[:readable] or next
106 port = devinfo[:http_get_port]
# Use the alternate IP only when the zone is exactly 'alt'.
107 host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
# Split nfid into 1-, 3- and 3-digit groups used as directory components.
# The pattern needs ten consecutive digits; if nfid lacks them, match
# returns nil and the [1..3] call raises NoMethodError.
109 b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
110 uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
111 urls << "http://#{host}:#{port}#{uri}"
# Sleeps for params[:duration] seconds, or 10 when that entry is nil/false,
# then returns an empty Hash. Kernel.sleep is called explicitly because this
# method has the same name.
116 def sleep(params); Kernel.sleep(params[:duration] || 10); {}; end
# Define the SQL constants only when GET_DEVICES is not yet defined.
# NOTE(review): presumably this guards against constant-redefinition
# warnings if the file is loaded more than once; confirm.
120 unless defined? GET_DEVICES
# Query used by refresh_domain to load every (dmid, namespace) pair.
121 GET_DOMAINS = 'SELECT dmid,namespace FROM domain'.freeze
124 SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port,
127 LEFT JOIN host h ON d.hostid = h.hostid
133 @my.send(@query_method, sql)
136 DEV_STATUS_READABLE = {
# Reloads the device cache from the database via the GET_DEVICES query.
#
# Returns the cached value untouched when +force+ is false and the cache was
# refreshed less than 60 seconds ago. Otherwise every row is turned into a
# Hash with the keys :hostip, :altip, :readable, :http_port and
# :http_get_port, and the frozen result is stored in @cache_device.
# NOTE(review): the creation of +tmp+ and the per-device key are on lines
# not visible in this excerpt -- callers index the result with devid.to_i,
# so it is presumably keyed by integer devid; confirm.
142 def refresh_device(force = false)
143 return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
145 res = query(GET_DEVICES)
146 res.each do |devid, hostip, altip, http_port, http_get_port,
147 dev_status, host_status|
# A nil http_port falls back to port 80.
148 http_port = http_port ? http_port.to_i : 80
150 :hostip => hostip.freeze,
# No alternate IP recorded: reuse the primary host IP.
151 :altip => (altip || hostip).freeze,
# Readable only when the host is "alive" and the device status is listed
# in DEV_STATUS_READABLE.
152 :readable => (host_status == "alive" &&
153 DEV_STATUS_READABLE.include?(dev_status)),
154 :http_port => http_port,
# A nil http_get_port falls back to the (already defaulted) http_port.
155 :http_get_port => http_get_port ? http_get_port.to_i : http_port,
# Record the refresh time and freeze the map before caching it.
158 @last_update_device = Time.now
159 @cache_device = tmp.freeze
# Reloads the namespace => dmid map from the database via GET_DOMAINS.
#
# Returns the cached value untouched when +force+ is false and the cache was
# refreshed less than 5 seconds ago. Otherwise the map is rebuilt with
# integer dmid values, frozen and stored in @cache_domain.
# NOTE(review): +tmp+ is created on a line not visible in this excerpt --
# presumably an empty Hash; confirm.
162 def refresh_domain(force = false)
163 return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
165 res = query(GET_DOMAINS)
166 res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
167 @last_update_domain = Time.now
168 @cache_domain = tmp.freeze
172 refresh_domain[domain] || refresh_domain(true)[domain] or \
173 raise MogileFS::Backend::DomainNotFoundError, domain