bigfile: better factor this for use with IO.copy_stream
[ruby-mogilefs-client.git] / lib / mogilefs / mysql.rb
blob4fe96061e8a2bf8bba5ec1e556341459ec6d9fd1
1 # -*- encoding: binary -*-
2 require 'mogilefs'
3 require 'mogilefs/backend' # for the exceptions
5 # read-only interface that can be a backend for MogileFS::MogileFS
7 # This provides direct, read-only access to any slave MySQL database to
8 # provide better performance, scalability and eliminate mogilefsd as a
9 # point of failure
10 class MogileFS::Mysql
12   attr_reader :my
13   attr_reader :query_method
15   ##
16   # Creates a new MogileFS::Mysql instance.  +args+ must include a key
17   # :domain specifying the domain of this client and :mysql, specifying
18   # an already-initialized Mysql object.
19   #
20   # The Mysql object can be either the standard Mysql driver or the
21   # Mysqlplus one supporting c_async_query.
22   def initialize(args = {})
23     @my = args[:mysql]
24     @query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
25     @last_update_device = @last_update_domain = Time.at(0)
26     @cache_domain = @cache_device = nil
27   end
29   ##
30   # Lists keys starting with +prefix+ follwing +after+ up to +limit+.  If
31   # +after+ is nil the list starts at the beginning.
32   def _list_keys(domain, prefix = '', after = '', limit = 1000, &block)
33     # this code is based on server/lib/MogileFS/Worker/Query.pm
34     dmid = get_dmid(domain)
36     # don't modify passed arguments
37     limit ||= 1000
38     limit = limit.to_i
39     limit = 1000 if limit > 1000 || limit <= 0
40     after, prefix = "#{after}", "#{prefix}"
42     if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
43       raise MogileFS::Backend::AfterMismatchError
44     end
46     raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
47     prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
49     sql = <<-EOS
50     SELECT dkey,length,devcount FROM file
51     WHERE dmid = #{dmid}
52       AND dkey LIKE '#{@my.quote(prefix)}%'
53       AND dkey > '#{@my.quote(after)}'
54     ORDER BY dkey LIMIT #{limit}
55     EOS
57     keys = []
58     query(sql).each do |dkey,length,devcount|
59       yield(dkey, length.to_i, devcount.to_i) if block_given?
60       keys << dkey
61     end
63     keys.empty? ? nil : [ keys, (keys.last || '') ]
64   end
66   ##
67   # Returns the size of +key+.
68   def _size(domain, key)
69     dmid = get_dmid(domain)
71     sql = <<-EOS
72     SELECT length FROM file
73     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
74     LIMIT 1
75     EOS
77     res = query(sql).fetch_row
78     return res[0].to_i if res && res[0]
79     raise MogileFS::Backend::UnknownKeyError
80   end
82   ##
83   # Get the paths for +key+.
84   def _get_paths(params = {})
85     zone = params[:zone]
86     noverify = (params[:noverify] == 1) # TODO this is unused atm
87     dmid = get_dmid(params[:domain])
88     devices = refresh_device or raise MogileFS::Backend::NoDevicesError
89     urls = []
90     sql = <<-EOS
91     SELECT fid FROM file
92     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(params[:key])}'
93     LIMIT 1
94     EOS
96     res = query(sql).fetch_row
97     res && res[0] or raise MogileFS::Backend::UnknownKeyError
98     fid = res[0]
99     sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
100     query(sql).each do |devid,|
101       unless devinfo = devices[devid.to_i]
102         devices = refresh_device(true)
103         devinfo = devices[devid.to_i] or next
104       end
105       devinfo[:readable] or next
106       port = devinfo[:http_get_port]
107       host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
108       nfid = '%010u' % fid
109       b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
110       uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
111       urls << "http://#{host}:#{port}#{uri}"
112     end
113     urls
114   end
116   def sleep(params); Kernel.sleep(params[:duration] || 10); {}; end
118   private
120     unless defined? GET_DEVICES
121       GET_DOMAINS = 'SELECT dmid,namespace FROM domain'.freeze
123       GET_DEVICES = <<-EOS
124         SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port,
125           d.status, h.status
126         FROM device d
127           LEFT JOIN host h ON d.hostid = h.hostid
128       EOS
129       GET_DEVICES.freeze
130     end
132     def query(sql)
133       @my.send(@query_method, sql)
134     end
136     DEV_STATUS_READABLE = {
137       "alive" => true,
138       "readonly" => true,
139       "drain" => true,
140     }.freeze
142     def refresh_device(force = false)
143       return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
144       tmp = {}
145       res = query(GET_DEVICES)
146       res.each do |devid, hostip, altip, http_port, http_get_port,
147                    dev_status, host_status|
148         http_port = http_port ? http_port.to_i : 80
149         tmp[devid.to_i] = {
150           :hostip => hostip.freeze,
151           :altip => (altip || hostip).freeze,
152           :readable => (host_status == "alive" &&
153                         DEV_STATUS_READABLE.include?(dev_status)),
154           :http_port => http_port,
155           :http_get_port => http_get_port ?  http_get_port.to_i : http_port,
156         }.freeze
157       end
158       @last_update_device = Time.now
159       @cache_device = tmp.freeze
160     end
162     def refresh_domain(force = false)
163       return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
164       tmp = {}
165       res = query(GET_DOMAINS)
166       res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
167       @last_update_domain = Time.now
168       @cache_domain = tmp.freeze
169     end
171     def get_dmid(domain)
172       refresh_domain[domain] || refresh_domain(true)[domain] or \
173         raise MogileFS::Backend::DomainNotFoundError, domain
174     end