new_file: don't pass private field to create_open
[ruby-mogilefs-client.git] / lib / mogilefs / mysql.rb
blob9be8b560801c9efc3ba2854da3364284138646ef
1 # -*- encoding: binary -*-
2 # Consider this deprecated, to be removed at some point...
4 # read-only interface that can be a backend for MogileFS::MogileFS
6 # This provides direct, read-only access to any slave MySQL database to
7 # provide better performance, scalability and eliminate mogilefsd as a
8 # point of failure
9 class MogileFS::Mysql
11   attr_reader :my
12   attr_reader :query_method
14   ##
15   # Creates a new MogileFS::Mysql instance.  +args+ must include a key
16   # :domain specifying the domain of this client and :mysql, specifying
17   # an already-initialized Mysql object.
18   #
19   # The Mysql object can be either the standard Mysql driver or the
20   # Mysqlplus one supporting c_async_query.
21   def initialize(args = {})
22     @my = args[:mysql]
23     @query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
24     @last_update_device = @last_update_domain = Time.at(0)
25     @cache_domain = @cache_device = nil
26   end
28   ##
29   # Lists keys starting with +prefix+ follwing +after+ up to +limit+.  If
30   # +after+ is nil the list starts at the beginning.
31   def _list_keys(domain, prefix = '', after = '', limit = 1000)
32     # this code is based on server/lib/MogileFS/Worker/Query.pm
33     dmid = get_dmid(domain)
35     # don't modify passed arguments
36     limit ||= 1000
37     limit = limit.to_i
38     limit = 1000 if limit > 1000 || limit <= 0
39     after, prefix = "#{after}", "#{prefix}"
41     if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
42       raise MogileFS::Backend::AfterMismatchError
43     end
45     raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
46     prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
48     sql = <<-EOS
49     SELECT dkey,length,devcount FROM file
50     WHERE dmid = #{dmid}
51       AND dkey LIKE '#{@my.quote(prefix)}%'
52       AND dkey > '#{@my.quote(after)}'
53     ORDER BY dkey LIMIT #{limit}
54     EOS
56     keys = []
57     query(sql).each do |dkey,length,devcount|
58       yield(dkey, length.to_i, devcount.to_i) if block_given?
59       keys << dkey
60     end
62     keys.empty? ? nil : [ keys, (keys.last || '') ]
63   end
65   ##
66   # Returns the size of +key+.
67   def _size(domain, key)
68     dmid = get_dmid(domain)
70     sql = <<-EOS
71     SELECT length FROM file
72     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
73     LIMIT 1
74     EOS
76     res = query(sql).fetch_row
77     return res[0].to_i if res && res[0]
78     raise MogileFS::Backend::UnknownKeyError
79   end
81   ##
82   # Get the paths for +key+.
83   def _get_paths(params = {})
84     zone = params[:zone]
85     noverify = (params[:noverify] == 1) # TODO this is unused atm
86     dmid = get_dmid(params[:domain])
87     devices = refresh_device or raise MogileFS::Backend::NoDevicesError
88     urls = []
89     sql = <<-EOS
90     SELECT fid FROM file
91     WHERE dmid = #{dmid} AND dkey = '#{@my.quote(params[:key])}'
92     LIMIT 1
93     EOS
95     res = query(sql).fetch_row
96     res && res[0] or raise MogileFS::Backend::UnknownKeyError
97     fid = res[0]
98     sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
99     query(sql).each do |devid,|
100       unless devinfo = devices[devid.to_i]
101         devices = refresh_device(true)
102         devinfo = devices[devid.to_i] or next
103       end
104       devinfo[:readable] or next
105       port = devinfo[:http_get_port]
106       host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
107       nfid = '%010u' % fid
108       b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
109       uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
110       urls << "http://#{host}:#{port}#{uri}"
111     end
112     urls
113   end
115   def sleep(params); Kernel.sleep(params[:duration] || 10); {}; end
117   private
119     unless defined? GET_DEVICES
120       GET_DOMAINS = 'SELECT dmid,namespace FROM domain'.freeze
122       GET_DEVICES = <<-EOS
123         SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port,
124           d.status, h.status
125         FROM device d
126           LEFT JOIN host h ON d.hostid = h.hostid
127       EOS
128       GET_DEVICES.freeze
129     end
131     def query(sql)
132       @my.send(@query_method, sql)
133     end
135     DEV_STATUS_READABLE = {
136       "alive" => true,
137       "readonly" => true,
138       "drain" => true,
139     }.freeze
141     def refresh_device(force = false)
142       return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
143       tmp = {}
144       res = query(GET_DEVICES)
145       res.each do |devid, hostip, altip, http_port, http_get_port,
146                    dev_status, host_status|
147         http_port = http_port ? http_port.to_i : 80
148         tmp[devid.to_i] = {
149           :hostip => hostip.freeze,
150           :altip => (altip || hostip).freeze,
151           :readable => (host_status == "alive" &&
152                         DEV_STATUS_READABLE.include?(dev_status)),
153           :http_port => http_port,
154           :http_get_port => http_get_port ?  http_get_port.to_i : http_port,
155         }.freeze
156       end
157       @last_update_device = Time.now
158       @cache_device = tmp.freeze
159     end
161     def refresh_domain(force = false)
162       return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
163       tmp = {}
164       res = query(GET_DOMAINS)
165       res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
166       @last_update_domain = Time.now
167       @cache_domain = tmp.freeze
168     end
170     def get_dmid(domain)
171       refresh_domain[domain] || refresh_domain(true)[domain] or \
172         raise MogileFS::Backend::DomainNotFoundError, domain
173     end