2 require 'mogilefs/backend' # for the exceptions
5 # read-only interface that looks like MogileFS::MogileFS This provides
6 # direct, read-only access to any slave MySQL database to provide better
7 # performance and eliminate extra points of failure
14 def initialize(param = {})
15 @domain = param[:domain]
16 @my = Mysql.new(param[:host], param[:user], param[:passwd],
17 param[:db], param[:port], param[:sock], param[:flag])
18 @my.reconnect = param[:reconnect] if param.include?(:reconnect)
19 @my.options(Mysql::OPT_CONNECT_TIMEOUT,
20 param[:connect_timeout] ? param[:connect_timeout] : 1)
21 @my.options(Mysql::OPT_READ_TIMEOUT,
22 param[:read_timeout] ? param[:read_timeout] : 1)
23 @my.options(Mysql::OPT_WRITE_TIMEOUT,
24 param[:write_timeout] ? param[:write_timeout] : 1)
25 @last_update_device = @last_update_domain = Time.at(0)
26 @cache_domain = @cache_device = nil
29 def list_keys(prefix, after = '', limit = 1000, &block)
30 # this code is based on server/lib/MogileFS/Worker/Query.pm
31 dmid = refresh_domain[@domain] or \
32 raise MogileFS::Backend::DomainNotFoundError
34 # don't modify passed arguments
37 limit = 1000 if limit > 1000 || limit <= 0
41 if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
42 raise MogileFS::Backend::AfterMismatchError
45 raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
46 prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
49 SELECT dkey,length,devcount FROM file
51 AND dkey LIKE '#{@my.quote(prefix)}%'
52 AND dkey > '#{@my.quote(after)}'
53 ORDER BY dkey LIMIT #{limit}
57 @my.c_async_query(sql).each do |dkey,length,devcount|
58 yield(dkey, length, devcount) if block_given?
61 return [ keys, keys.last || '']
65 dmid = refresh_domain[@domain] or \
66 raise MogileFS::Backend::DomainNotFoundError
69 SELECT length FROM file
70 WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
74 res = @my.c_async_query(sql).fetch_row
75 return res[0].to_i if res && res[0]
76 raise MogileFS::Backend::UnknownKeyError
79 def get_paths(key, noverify = true, zone = nil)
80 dmid = refresh_domain[@domain] or \
81 raise MogileFS::Backend::DomainNotFoundError
82 devices = refresh_device or raise MogileFS::Backend::NoDevicesError
86 WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
90 res = @my.c_async_query(sql).fetch_row
91 res && res[0] or raise MogileFS::Backend::UnknownKeyError
93 sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
94 @my.c_async_query(sql).each do |devid,|
95 devinfo = devices[devid.to_i]
96 port = devinfo[:http_get_port] || devinfo[:http_port] || 80
97 host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
99 b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
100 uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
101 urls << "http://#{host}:#{port}#{uri}"
108 unless defined? GET_DEVICES
110 SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port
112 LEFT JOIN host h ON d.hostid = h.hostid
113 WHERE d.status IN ('alive','readonly','drain');
118 def refresh_device(force = false)
119 return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
121 res = @my.c_async_query(GET_DEVICES)
122 res.each do |devid, hostip, altip, http_port, http_get_port|
124 :hostip => hostip.freeze,
125 :altip => altip.freeze,
126 :http_port => http_port ? http_port.to_i : nil,
127 :http_get_port => http_get_port ? http_get_port.to_i : nil,
130 @last_update_device = Time.now
131 @cache_device = tmp.freeze
134 def refresh_domain(force = false)
135 return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
137 res = @my.c_async_query('SELECT dmid,namespace FROM domain')
138 res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
139 @last_update_domain = Time.now
140 @cache_domain = tmp.freeze