mysql: allow specifying connect,read,write timeouts
[ruby-mogilefs-client.git] / lib / mogilefs / mysql.rb
blobd303628a3fb69f13be5874e9d9e0f3db545d5673
1 require 'mogilefs'
2 require 'mogilefs/backend' # for the exceptions
3 require 'mysql'
5 # read-only interface that looks like MogileFS::MogileFS This provides
6 # direct, read-only access to any slave MySQL database to provide better
7 # performance and eliminate extra points of failure
9 class MogileFS::Mysql
11   attr_accessor :domain
12   attr_reader :my
14   def initialize(param = {})
15     @domain = param[:domain]
16     @my = Mysql.new(param[:host], param[:user], param[:passwd],
17                     param[:db], param[:port], param[:sock], param[:flag])
18     @my.reconnect = param[:reconnect] if param.include?(:reconnect)
19     @my.options(Mysql::OPT_CONNECT_TIMEOUT,
20                 param[:connect_timeout] ? param[:connect_timeout] : 1)
21     @my.options(Mysql::OPT_READ_TIMEOUT,
22                 param[:read_timeout] ? param[:read_timeout] : 1)
23     @my.options(Mysql::OPT_WRITE_TIMEOUT,
24                 param[:write_timeout] ? param[:write_timeout] : 1)
25     @last_update_device = @last_update_domain = Time.at(0)
26     @cache_domain = @cache_device = nil
27   end
29   def list_keys(prefix, after = '', limit = 1000, &block)
30     # this code is based on server/lib/MogileFS/Worker/Query.pm
31     dmid = refresh_domain[@domain] or \
32       raise MogileFS::Backend::DomainNotFoundError
34     # don't modify passed arguments
35     limit ||= 1000
36     limit = limit.to_i
37     limit = 1000 if limit > 1000 || limit <= 0
38     after = "#{after}"
39     prefix = "#{prefix}"
41     if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
42       raise MogileFS::Backend::AfterMismatchError
43     end
45     raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
46     prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
48     sql = <<-EOS
49     SELECT dkey,length,devcount FROM file
50     WHERE dmid = #{dmid}
51       AND dkey LIKE '#{@my.quote(prefix)}%'
52       AND dkey > '#{@my.quote(after)}'
53     ORDER BY dkey LIMIT #{limit}
54     EOS
56     keys = []
57     @my.c_async_query(sql).each do |dkey,length,devcount|
58       yield(dkey, length, devcount) if block_given?
59       keys << dkey
60     end
61     return [ keys, keys.last || '']
62   end
64   def size(key)
65     dmid = refresh_domain[@domain] or \
66       raise MogileFS::Backend::DomainNotFoundError
68     sql = <<-EOS
69     SELECT length FROM file
70     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
71     LIMIT 1
72     EOS
74     res = @my.c_async_query(sql).fetch_row
75     return res[0].to_i if res && res[0]
76     raise MogileFS::Backend::UnknownKeyError
77   end
79   def get_paths(key, noverify = true, zone = nil)
80     dmid = refresh_domain[@domain] or \
81       raise MogileFS::Backend::DomainNotFoundError
82     devices = refresh_device or raise MogileFS::Backend::NoDevicesError
83     urls = []
84     sql = <<-EOS
85     SELECT fid FROM file
86     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
87     LIMIT 1
88     EOS
90     res = @my.c_async_query(sql).fetch_row
91     res && res[0] or raise MogileFS::Backend::UnknownKeyError
92     fid = res[0]
93     sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
94     @my.c_async_query(sql).each do |devid,|
95       devinfo = devices[devid.to_i]
96       port = devinfo[:http_get_port] || devinfo[:http_port] || 80
97       host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
98       nfid = '%010u' % fid
99       b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
100       uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
101       urls << "http://#{host}:#{port}#{uri}"
102     end
103     urls
104   end
106   private
108     unless defined? GET_DEVICES
109       GET_DEVICES = <<-EOS
110         SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port
111         FROM device d
112           LEFT JOIN host h ON d.hostid = h.hostid
113         WHERE d.status IN ('alive','readonly','drain');
114       EOS
115       GET_DEVICES.freeze
116     end
118     def refresh_device(force = false)
119       return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
120       tmp = {}
121       res = @my.c_async_query(GET_DEVICES)
122       res.each do |devid, hostip, altip, http_port, http_get_port|
123         tmp[devid.to_i] = {
124           :hostip => hostip.freeze,
125           :altip => altip.freeze,
126           :http_port => http_port ? http_port.to_i : nil,
127           :http_get_port => http_get_port ? http_get_port.to_i : nil,
128         }.freeze
129       end
130       @last_update_device = Time.now
131       @cache_device = tmp.freeze
132     end
134     def refresh_domain(force = false)
135       return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
136       tmp = {}
137       res = @my.c_async_query('SELECT dmid,namespace FROM domain')
138       res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
139       @last_update_domain = Time.now
140       @cache_domain = tmp.freeze
141     end