Ruby mogilefs-client 3.12.2
[ruby-mogilefs-client.git] / lib / mogilefs / mogilefs.rb
blob990ae387e8526b2f6b2549cd2402e5af4e5bec49
1 # -*- encoding: binary -*-
3 # \MogileFS file manipulation client.
5 #   Create a new instance that will communicate with these trackers:
6 #   hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
7 #   mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)
9 #   # Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
10 #   mg.store_content('some_key', 'text', "A bunch of text to store")
12 #   # Retrieve data from 'some_key' as a string
13 #   data = mg.get_file_data('some_key')
15 #   # Store the contents of 'image.jpeg' into the key 'my_image' with a
16 #   # class of 'image'.
17 #   mg.store_file('my_image', 'image', 'image.jpeg')
19 #   # Store the contents of 'image.jpeg' into the key 'my_image' with a
20 #   # class of 'image' using an open IO object.
21 #   File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }
23 #   # Retrieve the contents of 'my_image' into '/path/to/huge_file'
24 #   # without slurping the entire contents into memory:
25 #   mg.get_file_data('my_image', '/path/to/huge_file')
27 #   # Remove the key 'my_image' and 'some_key'.
28 #   mg.delete('my_image')
29 #   mg.delete('some_key')
31 class MogileFS::MogileFS < MogileFS::Client
32   include MogileFS::Bigfile
34   # The domain of keys for this MogileFS client.
35   attr_accessor :domain
37   # The timeout for get_file_data (per-read() system call).
38   # Defaults to five seconds.
39   attr_accessor :get_file_data_timeout
41   # The maximum allowed time for creating a new_file.  Defaults to 1 hour.
42   attr_accessor :new_file_max_time
44   # Creates a new MogileFS::MogileFS instance.  +args+ must include a key
45   # :domain specifying the domain of this client.
46   #
47   # Optional parameters for +args+:
48   #
49   # [:get_file_data_timeout => Numeric]
50   #
51   #   See get_file_data_timeout
52   #
53   # [:new_file_max_time => Numeric]
54   #
55   #   See new_file_max_time
56   #
57   # [:fail_timeout => Numeric]
58   #
59   #   Delay before retrying a failed tracker backends.
60   #   Defaults to 5 seconds.
61   #
62   # [:timeout => Numeric]
63   #
64   #   Timeout for tracker backend responses.
65   #   Defaults to 3 seconds.
66   #
67   # [:connect_timeout => Integer]
68   #
69   #   Timeout for connecting to a tracker
70   #   Defaults to 3 seconds
71   def initialize(args = {})
72     @domain = args[:domain]
74     @get_file_data_timeout = args[:get_file_data_timeout] || 5
75     @new_file_max_time = args[:new_file_max_time] || 3600.0
76     @nhp_get = nhp_new('get')
77     @nhp_get.open_timeout = @nhp_get.read_timeout = @get_file_data_timeout
78     @nhp_put = nhp_new('put')
79     @nhp_put.open_timeout = @nhp_put.read_timeout = @new_file_max_time
81     raise ArgumentError, "you must specify a domain" unless @domain
83     if @backend = args[:db_backend]
84       @readonly = true
85     else
86       super
87     end
88   end
90   # Enumerates keys, limited by optional +prefix+
91   # +args+ may also be specified for an optional +:limit+
92   # and +:after+ (default: nil)
93   def each_key(prefix = "", args = nil, &block)
94     after = limit = nil
95     if args
96       after = args[:after]
97       limit = args[:limit]
98     end
99     begin
100       keys, after = list_keys(prefix, after, limit || 1000, &block)
101       return unless keys && keys[0]
102       limit -= keys.size if limit
103     end while limit == nil || limit > 0
104     nil
105   end
107   # Enumerates keys and yields a +file_info+ hash for each key matched by
108   # +prefix+
109   def each_file_info(prefix = "", args = nil)
110     # FIXME: there's a lot of duplicate code from list_keys_verbose here...
111     raise ArgumentError, "need block" unless block_given?
112     ordered = ready = nil
113     on_file_info = lambda do |info|
114       Hash === info or raise info
115       file_info_cleanup(info)
117       # deal with trackers with multiple queryworkers responding out-of-order
118       ready[info["key"]] = info
119       while info = ready.delete(ordered[-1])
120         ordered.pop
121         yield info
122       end
123     end
125     nr = 0
126     opts = { :domain => @domain }
127     opts[:devices] = 1 if args && args[:devices]
128     after = args ? args[:after] : nil
129     limit = args ? args[:limit] : nil
131     begin
132       keys, after = list_keys(prefix, after, limit || 1000)
133       return nr unless keys && keys[0]
134       ordered = keys.reverse
135       ready = {}
136       nr += keys.size
137       limit -= keys.size if limit
139       keys.each do |key|
140         opts[:key] = key
141         @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
142       end
143       @backend.pipeline_wait
144     rescue MogileFS::PipelineError, SystemCallError,
145            MogileFS::RequestTruncatedError,
146            MogileFS::UnreadableSocketError,
147            MogileFS::InvalidResponseError, # truncated response
148            MogileFS::Timeout
149       @backend.shutdown
150       keys = (ordered - ready.keys).reverse!
151       retry
152     end while limit == nil || limit > 0
153   rescue
154     @backend.shutdown
155     raise
156   end
158   # Retrieves the contents of +key+.  If +dst+ is specified, +dst+
159   # should be an IO-like object capable of receiving the +write+ method
160   # or a path name.  +copy_length+ may be specified to limit the number of
161   # bytes to retrieve, and +src_offset+ can be specified to specified the
162   # start position of the copy.
163   def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
164     paths = get_paths(key)
165     if src_offset || copy_length
166       src_offset ||= 0
167       range_end = copy_length ? src_offset + copy_length - 1 : nil
168       range = [ src_offset, range_end ]
169     end
171     if dst
172       sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
173       sock.stream_to(dst)
174     elsif block_given?
175       sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
176       yield(sock)
177     else
178       errors = nil
179       paths.each do |path|
180         uri = URI.parse(path)
181         get = Net::HTTP::Get.new(uri.path)
182         get["range"] = "bytes=#{range[0]}-#{range[1]}" if range
183         begin
184           res = @nhp_get.request(uri, get)
185           case res.code.to_i
186           when 200, 206
187             return res.body
188           end
189           (errors ||= []) << "#{path} - #{res.message} (#{res.class})"
190         rescue => e
191           (errors ||= []) << "#{path} - #{e.message} (#{e.class})"
192         end
193       end
194       raise MogileFS::Error,
195             "all paths failed with GET: #{errors.join(', ')}", []
196     end
197   ensure
198     sock.close if sock && ! sock.closed?
199   end
201   # Get the paths (URLs as strings) for +key+.  If +args+ is specified,
202   # it may contain:
203   # - :noverify -> boolean, whether or not the tracker checks (default: true)
204   # - :pathcount -> a positive integer of URLs to retrieve (default: 2)
205   # - :zone -> "alt" or nil (default: nil)
206   #
207   # :noverify defaults to true because this client library is capable of
208   # verifying paths for readability itself.  It is also faster and more
209   # reliable to verify paths on the client.
210   def get_paths(key, *args)
211     opts = {
212       :domain => @domain,
213       :key => key,
214       :noverify => args[0],
215       :zone => args[1],
216     }
217     if Hash === args[0]
218       args = args[0]
219       opts[:noverify] = args[:noverify]
220       zone = args[:zone] and opts[:zone] = zone
221       pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
222     end
224     opts[:noverify] = false == opts[:noverify] ? 0 : 1
225     @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
226     res = @backend.get_paths(opts)
227     (1..res['paths'].to_i).map { |i| res["path#{i}"] }
228   end
230   # Returns +true+ if +key+ exists, +false+ if not
231   def exist?(key)
232     args = { :key => key, :domain => @domain , :ruby_no_raise => true}
233     case rv = @backend.get_paths(args)
234     when Hash
235       true
236     when MogileFS::Backend::UnknownKeyError
237       false
238     else
239       raise rv
240     end
241   end
243   # Get the URIs for +key+ (paths) as URI::HTTP objects
244   def get_uris(key, *args)
245     get_paths(key, *args).map! { |path| URI.parse(path) }
246   end
248   # Creates a new file +key+ in the domain of this object.
249   #
250   # +bytes+ is the expected size of the file if known in advance
251   #
252   # It operates like File.open(..., "w") and may take an optional
253   # block, yielding an IO-like object with support for the methods
254   # documented in MogileFS::NewFile::Writer.
255   #
256   # This atomically replaces existing data stored as +key+
257   # when the block exits or when the returned object is closed.
258   #
259   # +args+ may contain the following options:
260   #
261   # [:content_length => Integer]
262   #
263   #   This has the same effect as the (deprecated) +bytes+ parameter.
264   #
265   # [ :largefile => :stream, :content_range or :tempfile ]
266   #
267   #   See MogileFS::NewFile for more information on this
268   #
269   # [ :class => String]
270   #
271   #   The MogileFS storage class of the object.
272   #
273   # [:content_md5 => String, Proc, or :trailer]
274   #
275   #   This can either be a Base64-encoded String, a Proc object, or
276   #   the :trailer symbol.  If given a String, it will be used as the
277   #   Content-MD5 HTTP header.  If given the :trailer symbol, this library
278   #   will automatically generate an Content-MD5 HTTP trailer.  If given
279   #   a Proc object, this Proc object should give a Base64-encoded string
280   #   which can be used as the Content-MD5 HTTP trailer when called at the
281   #   end of the request.
282   #
283   #   Keep in mind most HTTP servers do not support HTTP trailers, so
284   #   passing a String is usually the safest way to use this.
285   #
286   # [:info => Hash]
287   #
288   #   This is an empty hash that will be filled the same information
289   #   MogileFS::MogileFS#file_info.
290   #
291   #   Additionally, it contains one additional key: :uris,
292   #   an array of URI::HTTP objects to the stored destinations
293   def new_file(key, args = nil, bytes = nil) # :yields: file
294     raise MogileFS::ReadOnlyError if readonly?
295     opts = { :key => key, :multi_dest => 1 }
296     case args
297     when Hash
298       opts[:domain] = args[:domain]
299       open_args = args[:create_open_args]
300       klass = args[:class] and "default" != klass and opts[:class] = klass
301     when String
302       opts[:class] = args if "default" != args
303     end
304     opts[:domain] ||= @domain
305     res = @backend.create_open(open_args ? open_args.merge(opts) : opts)
306     opts[:nhp_put] = @nhp_put
308     dests = if dev_count = res['dev_count'] # multi_dest succeeded
309       (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
310     else # single destination returned
311       # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
312       # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
313       # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
314       # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
315       # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..
317       [[res['devid'], res['path']]]
318     end
320     opts.merge!(args) if Hash === args
321     opts[:backend] = @backend
322     opts[:fid] = res['fid']
323     opts[:content_length] ||= bytes if bytes
324     opts[:new_file_max_time] ||= @new_file_max_time
325     opts[:start_time] = MogileFS.now
326     info = opts[:info] and info["class"] = klass || "default"
328     case (dests[0][1] rescue nil)
329     when %r{\Ahttp://}
330       http_file = MogileFS::NewFile.new(dests, opts)
331       if block_given?
332         yield http_file
333         return http_file.commit # calls create_close
334       else
335         return http_file
336       end
337     when nil, ''
338       raise MogileFS::EmptyPathError,
339             "Empty path for mogile upload res=#{res.inspect}"
340     else
341       raise MogileFS::UnsupportedPathError,
342             "paths '#{dests.inspect}' returned by backend is not supported"
343     end
344   end
346   # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
347   # either a path name (String or Pathname object) or an IO-like object that
348   # responds to #read or #readpartial.  Returns size of +file+ stored.
349   # This atomically replaces existing data stored as +key+
350   def store_file(key, klass, file, opts = nil)
351     raise MogileFS::ReadOnlyError if readonly?
352     (opts ||= {})[:class] = klass if String === klass
354     new_file(key, opts) { |mfp| mfp.big_io = file }
355   end
357   # Stores +content+ into +key+ in class +klass+, where +content+ is a String
358   # This atomically replaces existing data stored as +key+
359   def store_content(key, klass, content, opts = nil)
360     raise MogileFS::ReadOnlyError if readonly?
361     (opts ||= {})[:class] = klass if String === klass
363     new_file(key, opts) do |mfp|
364       if content.is_a?(MogileFS::Util::StoreContent)
365         mfp.streaming_io = content
366       else
367         mfp << content
368       end
369     end
370   end
372   # Removes +key+.
373   def delete(key)
374     raise MogileFS::ReadOnlyError if readonly?
376     @backend.delete :domain => @domain, :key => key
377     true
378   end
380   # Updates +key+ to +newclass+
381   def updateclass(key, newclass)
382     raise MogileFS::ReadOnlyError if readonly?
384     @backend.updateclass(:domain => @domain, :key => key, :class => newclass)
385     true
386   end
388   # Sleeps +duration+, only used for testing
389   def sleep(duration) # :nodoc:
390     @backend.sleep :duration => duration
391   end
393   # Renames a key +from+ to key +to+.
394   def rename(from, to)
395     raise MogileFS::ReadOnlyError if readonly?
397     @backend.rename :domain => @domain, :from_key => from, :to_key => to
398     nil
399   end
401   # Returns the size of +key+.
402   def size(key)
403     @backend.respond_to?(:_size) and return @backend._size(domain, key)
404     begin
405       file_info(key)["length"].to_i
406     rescue MogileFS::Backend::UnknownCommandError
407       paths_size(get_paths(key))
408     end
409   end
411   def paths_size(paths) # :nodoc:
412     require "mogilefs/paths_size"
413     MogileFS::PathsSize.call(paths)
414   end
416   # Lists keys starting with +prefix+ following +after+ up to +limit+.  If
417   # +after+ is nil the list starts at the beginning.
418   def list_keys(prefix = "", after = nil, limit = 1000, &block)
419     @backend.respond_to?(:_list_keys) and
420       return @backend._list_keys(domain, prefix, after, limit, &block)
422     res = @backend.list_keys(:domain => domain, :prefix => prefix,
423                              :after => after, :limit => limit,
424                              :ruby_no_raise => true)
425     MogileFS::Backend::NoneMatchError === res and return
426     raise res if MogileFS::Error === res
428     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
429     if block
430       if 1 == block.arity
431         keys.each { |key| block.call(key) }
432       else
433         list_keys_verbose(keys, block)
434       end
435     end
437     [ keys, res['next_after'] ]
438   end
440   def list_keys_verbose(keys, block) # :nodoc:
441     # emulate the MogileFS::Mysql interface, slowly...
442     ordered = keys.reverse
443     ready = {}
444     on_file_info = lambda do |info|
445       Hash === info or raise info
446       file_info_cleanup(info)
448       # deal with trackers with multiple queryworkers responding out-of-order
449       ready[info["key"]] = info
450       while info = ready.delete(ordered[-1])
451         block.call(ordered.pop, info["length"], info["devcount"])
452       end
453     end
454     opts = { :domain => @domain }
455     begin
456       keys.each do |key|
457         opts[:key] = key
458         @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
459       end
460       @backend.pipeline_wait
461     rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
462       @backend.shutdown # reset the socket
463       args = { :pathcount => 0x7fffffff }
464       keys.each do |key|
465         paths = get_paths(key, args)
466         block.call(key, paths_size(paths), paths.size)
467       end
468     rescue MogileFS::PipelineError, SystemCallError,
469            MogileFS::RequestTruncatedError,
470            MogileFS::UnreadableSocketError,
471            MogileFS::InvalidResponseError, # truncated response
472            MogileFS::Timeout
473       @backend.shutdown
474       keys = (ordered - ready.keys).reverse!
475       retry
476     rescue
477       @backend.shutdown
478       raise
479     end
480   end
482   # Return metadata about a file as a hash.
483   # Returns the domain, class, length, devcount, etc. as keys.
484   # Optionally, device ids (not paths) can be returned as
485   # well if :devices is specified and +true+.
486   #
487   # This should only be used for informational purposes, and not usually
488   # for dynamically serving files.
489   #
490   #   mg.file_info("bar")
491   #
492   # Returns:
493   #
494   #   {
495   #     "domain" => "foo",
496   #     "key" => "bar",
497   #     "class" => "default",
498   #     "devcount" => 2,
499   #     "length => 666
500   #   }
501   def file_info(key, args = nil)
502     opts = { :domain => @domain, :key => key }
503     args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
504     file_info_cleanup(@backend.file_info(opts))
505   end
507   def file_info_cleanup(rv) # :nodoc:
508     %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
509     devids = rv["devids"] and
510       rv["devids"] = devids.split(','.freeze).map! { |x| x.to_i }
511     rv
512   end
514   # Given an Integer +fid+ or String +key+ and domain, thorougly search
515   # the database for all occurences of a particular fid.
516   #
517   # Use this sparingly, this command hits the master database numerous
518   # times and is very expensive.  This is not for production use, only
519   # troubleshooting and debugging.
520   #
521   # Searches for fid=666:
522   #
523   #   client.file_debug(666)
524   #
525   # Search for key=foo using the default domain for this object:
526   #
527   #   client.file_debug("foo")
528   #
529   # Search for key=foo in domain="bar":
530   #
531   #   client.file_debug(:key => "foo", :domain => "bar")
532   #
533   def file_debug(args)
534     case args
535     when Integer then args = { "fid" => args }
536     when String then args = { "key" => args }
537     end
538     opts = { :domain => args[:domain] || @domain }.merge!(args)
540     rv = @backend.file_debug(opts)
541     rv.each do |k,v|
542       case k
543       when /_(?:classid|devcount|dmid|fid|length|
544             nexttry|fromdevid|failcount|flags|devid|type)\z/x
545         rv[k] = v.to_i
546       when /devids\z/
547         rv[k] = v.split(','.freeze).map! { |x| x.to_i }
548       end
549     end
550   end
552   def nhp_new(name) # :nodoc:
553     if Net::HTTP::Persistent::VERSION.to_f >= 3.0
554       MogileFS::NHP.new(:name => name)
555     else
556       MogileFS::NHP.new(name)
557     end
558   rescue NameError
559     MogileFS::NHP.new(name)
560   end