426217e2605e02687174304a433d4b8c90ac2e26
[ruby-mogilefs-client.git] / lib / mogilefs / mogilefs.rb
blob426217e2605e02687174304a433d4b8c90ac2e26
1 # -*- encoding: binary -*-
3 # \MogileFS file manipulation client.
5 #   Create a new instance that will communicate with these trackers:
6 #   hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
7 #   mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)
9 #   # Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
10 #   mg.store_content('some_key', 'text', "A bunch of text to store")
12 #   # Retrieve data from 'some_key' as a string
13 #   data = mg.get_file_data('some_key')
15 #   # Store the contents of 'image.jpeg' into the key 'my_image' with a
16 #   # class of 'image'.
17 #   mg.store_file('my_image', 'image', 'image.jpeg')
19 #   # Store the contents of 'image.jpeg' into the key 'my_image' with a
20 #   # class of 'image' using an open IO object.
21 #   File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }
23 #   # Retrieve the contents of 'my_image' into '/path/to/huge_file'
24 #   # without slurping the entire contents into memory:
25 #   mg.get_file_data('my_image', '/path/to/huge_file')
27 #   # Remove the key 'my_image' and 'some_key'.
28 #   mg.delete('my_image')
29 #   mg.delete('some_key')
31 class MogileFS::MogileFS < MogileFS::Client
32   include MogileFS::Bigfile
34   # The domain of keys for this MogileFS client.
35   attr_accessor :domain
37   # The timeout for get_file_data (per-read() system call).
38   # Defaults to five seconds.
39   attr_accessor :get_file_data_timeout
41   # The maximum allowed time for creating a new_file.  Defaults to 1 hour.
42   attr_accessor :new_file_max_time
44   # Creates a new MogileFS::MogileFS instance.  +args+ must include a key
45   # :domain specifying the domain of this client.
46   #
47   # Optional parameters for +args+:
48   #
49   # [:get_file_data_timeout => Numeric]
50   #
51   #   See get_file_data_timeout
52   #
53   # [:new_file_max_time => Numeric]
54   #
55   #   See new_file_max_time
56   #
57   # [:fail_timeout => Numeric]
58   #
59   #   Delay before retrying a failed tracker backends.
60   #   Defaults to 5 seconds.
61   #
62   # [:timeout => Numeric]
63   #
64   #   Timeout for tracker backend responses.
65   #   Defaults to 3 seconds.
66   #
67   def initialize(args = {})
68     @domain = args[:domain]
70     @get_file_data_timeout = args[:get_file_data_timeout] || 5
71     @new_file_max_time = args[:new_file_max_time] || 3600.0
73     raise ArgumentError, "you must specify a domain" unless @domain
75     if @backend = args[:db_backend]
76       @readonly = true
77     else
78       super
79     end
80   end
82   # Enumerates keys, limited by optional +prefix+
83   def each_key(prefix = "", &block)
84     after = nil
85     begin
86       keys, after = list_keys(prefix, after, 1000, &block)
87     end while keys && keys[0]
88     nil
89   end
91   # Enumerates keys and yields a +file_info+ hash for each key matched by
92   # +prefix+
93   def each_file_info(prefix = "", args = nil)
94     # FIXME: there's a lot of duplicate code from list_keys_verbose here...
95     raise ArgumentError, "need block" unless block_given?
96     ordered = ready = nil
97     on_file_info = lambda do |info|
98       Hash === info or raise info
99       file_info_cleanup(info)
101       # deal with trackers with multiple queryworkers responding out-of-order
102       ready[info["key"]] = info
103       while info = ready.delete(ordered[-1])
104         ordered.pop
105         yield info
106       end
107     end
109     nr = 0
110     opts = { :domain => @domain }
111     opts[:devices] = 1 if args && args[:devices]
112     after = args ? args[:after] : nil
113     limit = args ? args[:limit] : nil
115     begin
116       keys, after = list_keys(prefix, after, limit || 1000)
117       return nr unless keys && keys[0]
118       ordered = keys.reverse
119       ready = {}
120       nr += keys.size
121       limit -= keys.size if limit
123       keys.each do |key|
124         opts[:key] = key
125         @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
126       end
127       @backend.pipeline_wait
128     rescue MogileFS::PipelineError, SystemCallError,
129            MogileFS::RequestTruncatedError,
130            MogileFS::UnreadableSocketError,
131            MogileFS::InvalidResponseError, # truncated response
132            MogileFS::Timeout
133       @backend.shutdown
134       keys = ordered - ready.keys
135       retry
136     end while limit == nil || limit > 0
137   rescue
138     @backend.shutdown
139     raise
140   end
142   # Retrieves the contents of +key+.  If +dst+ is specified, +dst+
143   # should be an IO-like object capable of receiving the +write+ method
144   # or a path name.  +copy_length+ may be specified to limit the number of
145   # bytes to retrieve, and +src_offset+ can be specified to specified the
146   # start position of the copy.
147   def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
148     paths = get_paths(key)
149     sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout,
150                                       copy_length, src_offset)
151     if dst
152       sock.stream_to(dst)
153     elsif block_given?
154       yield(sock)
155     else
156       sock.to_s
157     end
158     ensure
159       sock.close if sock && ! sock.closed?
160   end
162   # Get the paths (URLs as strings) for +key+.  If +args+ is specified,
163   # it may contain:
164   # - :noverify -> boolean, whether or not the tracker checks (default: true)
165   # - :pathcount -> a positive integer of URLs to retrieve (default: 2)
166   # - :zone -> "alt" or nil (default: nil)
167   #
168   # :noverify defaults to true because this client library is capable of
169   # verifying paths for readability itself.  It is also faster and more
170   # reliable to verify paths on the client.
171   def get_paths(key, *args)
172     opts = {
173       :domain => @domain,
174       :key => key,
175       :noverify => args[0],
176       :zone => args[1],
177     }
178     if Hash === args[0]
179       args = args[0]
180       opts[:noverify] = args[:noverify]
181       opts[:zone] = args[:zone]
182       pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
183     end
185     opts[:noverify] = false == opts[:noverify] ? 0 : 1
186     @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
187     res = @backend.get_paths(opts)
188     (1..res['paths'].to_i).map { |i| res["path#{i}"] }
189   end
191   # Returns +true+ if +key+ exists, +false+ if not
192   def exist?(key)
193     args = { :key => key, :domain => @domain , :ruby_no_raise => true}
194     case rv = @backend.get_paths(args)
195     when Hash
196       true
197     when MogileFS::Backend::UnknownKeyError
198       false
199     else
200       raise rv
201     end
202   end
204   # Get the URIs for +key+ (paths) as URI::HTTP objects
205   def get_uris(key, *args)
206     get_paths(key, *args).map! { |path| URI.parse(path) }
207   end
209   # Creates a new file +key+ in the domain of this object.
210   #
211   # +bytes+ is the expected size of the file if known in advance
212   #
213   # It operates like File.open(..., "w") and may take an optional
214   # block, yielding an IO-like object with support for the methods
215   # documented in MogileFS::NewFile::Writer.
216   #
217   # This atomically replaces existing data stored as +key+
218   # when the block exits or when the returned object is closed.
219   #
220   # +args+ may contain the following options:
221   #
222   # [:content_length => Integer]
223   #
224   #   This has the same effect as the (deprecated) +bytes+ parameter.
225   #
226   # [ :largefile => :stream, :content_range or :tempfile ]
227   #
228   #   See MogileFS::NewFile for more information on this
229   #
230   # [ :class => String]
231   #
232   #   The MogileFS storage class of the object.
233   #
234   # [:content_md5 => String, Proc, or :trailer]
235   #
236   #   This can either be a Base64-encoded String, a Proc object, or
237   #   the :trailer symbol.  If given a String, it will be used as the
238   #   Content-MD5 HTTP header.  If given the :trailer symbol, this library
239   #   will automatically generate an Content-MD5 HTTP trailer.  If given
240   #   a Proc object, this Proc object should give a Base64-encoded string
241   #   which can be used as the Content-MD5 HTTP trailer when called at the
242   #   end of the request.
243   #
244   #   Keep in mind most HTTP servers do not support HTTP trailers, so
245   #   passing a String is usually the safest way to use this.
246   #
247   def new_file(key, args = nil, bytes = nil) # :yields: file
248     raise MogileFS::ReadOnlyError if readonly?
249     opts = { :key => key, :multi_dest => 1 }
250     case args
251     when Hash
252       opts[:domain] = args[:domain]
253       klass = args[:class] and "default" != klass and opts[:class] = klass
254     when String
255       opts[:class] = args if "default" != args
256     end
257     opts[:domain] ||= @domain
258     res = @backend.create_open(opts)
260     dests = if dev_count = res['dev_count'] # multi_dest succeeded
261       (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
262     else # single destination returned
263       # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
264       # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
265       # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
266       # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
267       # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..
269       [[res['devid'], res['path']]]
270     end
272     opts.merge!(args) if Hash === args
273     opts[:backend] = @backend
274     opts[:fid] = res['fid']
275     opts[:content_length] ||= bytes if bytes
276     opts[:new_file_max_time] ||= @new_file_max_time
277     opts[:start_time] = Time.now
279     case (dests[0][1] rescue nil)
280     when %r{\Ahttp://}
281       http_file = MogileFS::NewFile.new(dests, opts)
282       if block_given?
283         yield http_file
284         return http_file.commit # calls create_close
285       else
286         return http_file
287       end
288     when nil, ''
289       raise MogileFS::EmptyPathError,
290             "Empty path for mogile upload res=#{res.inspect}"
291     else
292       raise MogileFS::UnsupportedPathError,
293             "paths '#{dests.inspect}' returned by backend is not supported"
294     end
295   end
297   # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
298   # either a path name (String or Pathname object) or an IO-like object that
299   # responds to #read or #readpartial.  Returns size of +file+ stored.
300   # This atomically replaces existing data stored as +key+
301   def store_file(key, klass, file, opts = nil)
302     raise MogileFS::ReadOnlyError if readonly?
303     (opts ||= {})[:class] = klass if String === klass
305     new_file(key, opts) { |mfp| mfp.big_io = file }
306   end
308   # Stores +content+ into +key+ in class +klass+, where +content+ is a String
309   # This atomically replaces existing data stored as +key+
310   def store_content(key, klass, content, opts = nil)
311     raise MogileFS::ReadOnlyError if readonly?
312     (opts ||= {})[:class] = klass if String === klass
314     new_file(key, opts) do |mfp|
315       if content.is_a?(MogileFS::Util::StoreContent)
316         mfp.streaming_io = content
317       else
318         mfp << content
319       end
320     end
321   end
323   # Removes +key+.
324   def delete(key)
325     raise MogileFS::ReadOnlyError if readonly?
327     @backend.delete :domain => @domain, :key => key
328     true
329   end
331   # Updates +key+ to +newclass+
332   def updateclass(key, newclass)
333     raise MogileFS::ReadOnlyError if readonly?
335     @backend.updateclass(:domain => @domain, :key => key, :class => newclass)
336     true
337   end
339   # Sleeps +duration+, only used for testing
340   def sleep(duration) # :nodoc:
341     @backend.sleep :duration => duration
342   end
344   # Renames a key +from+ to key +to+.
345   def rename(from, to)
346     raise MogileFS::ReadOnlyError if readonly?
348     @backend.rename :domain => @domain, :from_key => from, :to_key => to
349     nil
350   end
352   # Returns the size of +key+.
353   def size(key)
354     @backend.respond_to?(:_size) and return @backend._size(domain, key)
355     begin
356       file_info(key)["length"].to_i
357     rescue MogileFS::Backend::UnknownCommandError
358       paths_size(get_paths(key))
359     end
360   end
362   def paths_size(paths) # :nodoc:
363     require "mogilefs/paths_size"
364     MogileFS::PathsSize.call(paths)
365   end
367   # Lists keys starting with +prefix+ following +after+ up to +limit+.  If
368   # +after+ is nil the list starts at the beginning.
369   def list_keys(prefix = "", after = nil, limit = 1000, &block)
370     @backend.respond_to?(:_list_keys) and
371       return @backend._list_keys(domain, prefix, after, limit, &block)
373     res = @backend.list_keys(:domain => domain, :prefix => prefix,
374                              :after => after, :limit => limit,
375                              :ruby_no_raise => true)
376     MogileFS::Backend::NoneMatchError === res and return
377     raise res if MogileFS::Error === res
379     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
380     if block
381       if 1 == block.arity
382         keys.each { |key| block.call(key) }
383       else
384         list_keys_verbose(keys, block)
385       end
386     end
388     [ keys, res['next_after'] ]
389   end
391   def list_keys_verbose(keys, block) # :nodoc:
392     # emulate the MogileFS::Mysql interface, slowly...
393     ordered = keys.dup
394     ready = {}
395     on_file_info = lambda do |info|
396       Hash === info or raise info
397       file_info_cleanup(info)
399       # deal with trackers with multiple queryworkers responding out-of-order
400       ready[info["key"]] = info
401       while info = ready.delete(ordered[0])
402         block.call(ordered.shift, info["length"], info["devcount"])
403       end
404     end
405     opts = { :domain => @domain }
406     begin
407       keys.each do |key|
408         opts[:key] = key
409         @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
410       end
411       @backend.pipeline_wait
412     rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
413       @backend.shutdown # reset the socket
414       args = { :pathcount => 0x7fffffff }
415       keys.each do |key|
416         paths = get_paths(key, args)
417         block.call(key, paths_size(paths), paths.size)
418       end
419     rescue MogileFS::PipelineError, SystemCallError,
420            MogileFS::RequestTruncatedError,
421            MogileFS::UnreadableSocketError,
422            MogileFS::InvalidResponseError, # truncated response
423            MogileFS::Timeout
424       @backend.shutdown
425       keys = ordered - ready.keys
426       retry
427     rescue
428       @backend.shutdown
429       raise
430     end
431   end
433   # Return metadata about a file as a hash.
434   # Returns the domain, class, length, devcount, etc. as keys.
435   # Optionally, device ids (not paths) can be returned as
436   # well if :devices is specified and +true+.
437   #
438   # This should only be used for informational purposes, and not usually
439   # for dynamically serving files.
440   #
441   #   mg.file_info("bar")
442   #
443   # Returns:
444   #
445   #   {
446   #     "domain" => "foo",
447   #     "key" => "bar",
448   #     "class" => "default",
449   #     "devcount" => 2,
450   #     "length => 666
451   #   }
452   def file_info(key, args = nil)
453     opts = { :domain => @domain, :key => key }
454     args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
455     file_info_cleanup(@backend.file_info(opts))
456   end
458   def file_info_cleanup(rv) # :nodoc:
459     %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
460     devids = rv["devids"] and
461       rv["devids"] = devids.split(/,/).map! { |x| x.to_i }
462     rv
463   end
465   # Given an Integer +fid+ or String +key+ and domain, thorougly search
466   # the database for all occurences of a particular fid.
467   #
468   # Use this sparingly, this command hits the master database numerous
469   # times and is very expensive.  This is not for production use, only
470   # troubleshooting and debugging.
471   #
472   # Searches for fid=666:
473   #
474   #   client.file_debug(666)
475   #
476   # Search for key=foo using the default domain for this object:
477   #
478   #   client.file_debug("foo")
479   #
480   # Search for key=foo in domain="bar":
481   #
482   #   client.file_debug(:key => "foo", :domain => "bar")
483   #
484   def file_debug(args)
485     case args
486     when Integer then args = { "fid" => args }
487     when String then args = { "key" => args }
488     end
489     opts = { :domain => args[:domain] || @domain }.merge!(args)
491     rv = @backend.file_debug(opts)
492     rv.each do |k,v|
493       case k
494       when /_(?:classid|devcount|dmid|fid|length|
495             nexttry|fromdevid|failcount|flags|devid|type)\z/x
496         rv[k] = v.to_i
497       when /devids\z/
498         rv[k] = v.split(/,/).map! { |x| x.to_i }
499       end
500     end
501   end