rename "Put" namespace to "NewFile"
[ruby-mogilefs-client.git] / lib / mogilefs / mogilefs.rb
blobb1926057b8fa9f0bf38ed6a1a8c1b039da045cce
1 # -*- encoding: binary -*-
3 # \MogileFS file manipulation client.
5 #   Create a new instance that will communicate with these trackers:
6 #   hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
7 #   mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)
9 #   # Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
10 #   mg.store_content('some_key', 'text', "A bunch of text to store")
12 #   # Retrieve data from 'some_key' as a string
13 #   data = mg.get_file_data('some_key')
15 #   # Store the contents of 'image.jpeg' into the key 'my_image' with a
16 #   # class of 'image'.
17 #   mg.store_file('my_image', 'image', 'image.jpeg')
19 #   # Store the contents of 'image.jpeg' into the key 'my_image' with a
20 #   # class of 'image' using an open IO object.
21 #   File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }
23 #   # Retrieve the contents of 'my_image' into '/path/to/huge_file'
24 #   # without slurping the entire contents into memory:
25 #   mg.get_file_data('my_image', '/path/to/huge_file')
27 #   # Remove the key 'my_image' and 'some_key'.
28 #   mg.delete('my_image')
29 #   mg.delete('some_key')
31 class MogileFS::MogileFS < MogileFS::Client
32   include MogileFS::Bigfile
34   # The domain of keys for this MogileFS client.
35   attr_accessor :domain
37   # The timeout for get_file_data.  Defaults to five seconds.
38   attr_accessor :get_file_data_timeout
40   # Creates a new MogileFS::MogileFS instance.  +args+ must include a key
41   # :domain specifying the domain of this client.
42   def initialize(args = {})
43     @domain = args[:domain]
45     @get_file_data_timeout = 5
47     raise ArgumentError, "you must specify a domain" unless @domain
49     if @backend = args[:db_backend]
50       @readonly = true
51     else
52       super
53     end
54   end
56   # Enumerates keys, limited by optional +prefix+
57   def each_key(prefix = "", &block)
58     after = nil
59     begin
60       keys, after = list_keys(prefix, after, 1000, &block)
61     end while keys && keys[0]
62     nil
63   end
65   # Retrieves the contents of +key+.  If +dst+ is specified, +dst+
66   # should be an IO-like object capable of receiving the +write+ method
67   # or a path name.  +copy_length+ may be specified to limit the number of
68   # bytes to retrieve, and +src_offset+ can be specified to specified the
69   # start position of the copy.
70   def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
71     paths = get_paths(key)
72     sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout,
73                                       copy_length, src_offset)
74     if dst
75       sock.stream_to(dst)
76     elsif block_given?
77       yield(sock)
78     else
79       sock.to_s
80     end
81     ensure
82       sock.close if sock && ! sock.closed?
83   end
85   # Get the paths (URLs as strings) for +key+.  If +args+ is specified,
86   # it may contain:
87   # - :noverify -> boolean, whether or not the tracker checks (default: true)
88   # - :pathcount -> a positive integer of URLs to retrieve (default: 2)
89   # - :zone -> "alt" or nil (default: nil)
90   #
91   # :noverify defaults to false because this client library is capable of
92   # verifying paths for readability itself.  It is also faster and more
93   # reliable to verify paths on the client.
94   def get_paths(key, *args)
95     opts = {
96       :domain => @domain,
97       :key => key,
98       :noverify => args[0],
99       :zone => args[1],
100     }
101     if Hash === args[0]
102       args = args[0]
103       opts[:noverify] = args[:noverify]
104       opts[:zone] = args[:zone]
105       pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
106     end
108     opts[:noverify] = false == opts[:noverify] ? 0 : 1
109     @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
110     res = @backend.get_paths(opts)
111     (1..res['paths'].to_i).map { |i| res["path#{i}"] }
112   end
114   # Returns +true+ if +key+ exists, +false+ if not
115   def exist?(key)
116     rv = nil
117     args = { :key => key, :domain => @domain }
118     @backend.pipeline_dispatch(:get_paths, args) { |x| rv = (Hash === x) }
119     @backend.pipeline_wait(1)
120     rv
121   end
123   # Get the URIs for +key+ (paths) as URI::HTTP objects
124   def get_uris(key, *args)
125     get_paths(key, *args).map! { |path| URI.parse(path) }
126   end
128   # Creates a new file +key+ in +klass+.  +bytes+ is currently unused.
129   # Consider using store_file instead of this method for large files.
130   # This requires a block passed to it and operates like File.open.
131   # This atomically replaces existing data stored as +key+ when
132   #
133   # +args+ may contain the following options:
134   # * :content_length => Integer
135   # * :largefile => [ :stream, :content_range, :tempfile ]
136   #    (see # MogileFS::NewFile)
137   # * :content_md5 => String, Proc, or :trailer
138   def new_file(key, args = nil, bytes = nil) # :yields: file
139     raise MogileFS::ReadOnlyError if readonly?
140     opts = { :key => key, :multi_dest => 1 }
141     case args
142     when Hash
143       opts[:domain] = args[:domain]
144       klass = args[:class] and "default" != klass and opts[:class] = klass
145     when String
146       opts[:class] = args if "default" != args
147     end
148     opts[:domain] ||= @domain
149     res = @backend.create_open(opts)
151     dests = if dev_count = res['dev_count'] # multi_dest succeeded
152       (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
153     else # single destination returned
154       # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
155       # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
156       # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
157       # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
158       # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..
160       [[res['devid'], res['path']]]
161     end
163     opts.merge!(args) if Hash === args
164     opts[:backend] = @backend
165     opts[:fid] = res['fid']
166     opts[:content_length] ||= bytes if bytes
168     case (dests[0][1] rescue nil)
169     when %r{\Ahttp://}
170       http_file = MogileFS::NewFile[opts[:largefile]].new(dests, opts)
171       if block_given?
172         yield http_file
173         return http_file.commit # calls create_close
174       else
175         return http_file
176       end
177     when nil, ''
178       raise MogileFS::EmptyPathError,
179             "Empty path for mogile upload res=#{res.inspect}"
180     else
181       raise MogileFS::UnsupportedPathError,
182             "paths '#{dests.inspect}' returned by backend is not supported"
183     end
184   end
186   # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
187   # either a path name (String or Pathname object) or an IO-like object that
188   # responds to #read or #readpartial.  Returns size of +file+ stored.
189   # This atomically replaces existing data stored as +key+
190   def store_file(key, klass, file, opts = nil)
191     raise MogileFS::ReadOnlyError if readonly?
192     (opts ||= {})[:class] = klass if String === klass
194     new_file(key, opts) { |mfp| mfp.big_io = file }
195   end
197   # Stores +content+ into +key+ in class +klass+, where +content+ is a String
198   # This atomically replaces existing data stored as +key+
199   def store_content(key, klass, content, opts = nil)
200     raise MogileFS::ReadOnlyError if readonly?
201     (opts ||= {})[:class] = klass if String === klass
203     new_file(key, opts) do |mfp|
204       if content.is_a?(MogileFS::Util::StoreContent)
205         mfp.streaming_io = content
206       else
207         mfp << content
208       end
209     end
210   end
212   # Removes +key+.
213   def delete(key)
214     raise MogileFS::ReadOnlyError if readonly?
216     @backend.delete :domain => @domain, :key => key
217     true
218   end
220   # Sleeps +duration+, only used for testing
221   def sleep(duration) # :nodoc:
222     @backend.sleep :duration => duration
223   end
225   # Renames a key +from+ to key +to+.
226   def rename(from, to)
227     raise MogileFS::ReadOnlyError if readonly?
229     @backend.rename :domain => @domain, :from_key => from, :to_key => to
230     nil
231   end
233   # Returns the size of +key+.
234   def size(key)
235     @backend.respond_to?(:_size) and return @backend._size(domain, key)
236     begin
237       file_info(key)["length"].to_i
238     rescue MogileFS::Backend::UnknownCommandError
239       paths_size(get_paths(key))
240     end
241   end
243   def paths_size(paths) # :nodoc:
244     require "mogilefs/paths_size"
245     MogileFS::PathsSize.call(paths)
246   end
248   # Lists keys starting with +prefix+ following +after+ up to +limit+.  If
249   # +after+ is nil the list starts at the beginning.
250   def list_keys(prefix = "", after = nil, limit = 1000, &block)
251     @backend.respond_to?(:_list_keys) and
252       return @backend._list_keys(domain, prefix, after, limit, &block)
254     begin
255       res = @backend.list_keys(:domain => domain, :prefix => prefix,
256                                :after => after, :limit => limit)
257     rescue MogileFS::Backend::NoneMatchError
258       return
259     end
261     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
262     if block
263       if 1 == block.arity
264         keys.each { |key| block.call(key) }
265       else
266         list_keys_verbose(keys, block)
267       end
268     end
270     [ keys, res['next_after'] ]
271   end
273   def list_keys_verbose(keys, block) # :nodoc:
274     # emulate the MogileFS::Mysql interface, slowly...
275     ordered = keys.dup
276     ready = {}
277     on_file_info = lambda do |info|
278       Hash === info or raise info
279       file_info_cleanup(info)
281       # deal with trackers with multiple queryworkers responding out-of-order
282       ready[info["key"]] = info
283       while info = ready.delete(ordered[0])
284         block.call(ordered.shift, info["length"], info["devcount"])
285       end
286     end
287     opts = { :domain => @domain }
288     begin
289       keys.each do |key|
290         opts[:key] = key
291         @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
292       end
293       @backend.pipeline_wait
294     rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
295       @backend.shutdown # reset the socket
296       args = { :pathcount => 0x7fffffff }
297       keys.each do |key|
298         paths = get_paths(key, args)
299         block.call(key, paths_size(paths), paths.size)
300       end
301     rescue MogileFS::PipelineError, SystemCallError,
302            MogileFS::RequestTruncatedError,
303            MogileFS::UnreadableSocketError,
304            MogileFS::InvalidResponseError, # truncated response
305            MogileFS::Timeout
306       @backend.shutdown
307       keys = ordered - ready.keys
308       retry
309     rescue
310       @backend.shutdown
311       raise
312     end
313   end
315   # Return metadata about a file as a hash.
316   # Returns the domain, class, length, devcount, etc. as keys.
317   # Optionally, device ids (not paths) can be returned as
318   # well if :devices is specified and +true+.
319   #
320   # This should only be used for informational purposes, and not usually
321   # for dynamically serving files.
322   #
323   #   mg.file_info("bar")
324   #
325   # Returns:
326   #
327   #   {
328   #     "domain" => "foo",
329   #     "key" => "bar",
330   #     "class" => "default",
331   #     "devcount" => 2,
332   #     "length => 666
333   #   }
334   def file_info(key, args = nil)
335     opts = { :domain => @domain, :key => key }
336     args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
337     file_info_cleanup(@backend.file_info(opts))
338   end
340   def file_info_cleanup(rv) # :nodoc:
341     %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
342     devids = rv["devids"] and
343       rv["devids"] = devids.split(/,/).map! { |x| x.to_i }
344     rv
345   end
347   # Given an Integer +fid+ or String +key+ and domain, thorougly search
348   # the database for all occurences of a particular fid.
349   #
350   # Use this sparingly, this command hits the master database numerous
351   # times and is very expensive.  This is not for production use, only
352   # troubleshooting and debugging.
353   #
354   # Searches for fid=666:
355   #
356   #   client.file_debug(666)
357   #
358   # Search for key=foo using the default domain for this object:
359   #
360   #   client.file_debug("foo")
361   #
362   # Search for key=foo in domain="bar":
363   #
364   #   client.file_debug(:key => "foo", :domain => "bar")
365   #
366   def file_debug(args)
367     case args
368     when Integer then args = { "fid" => args }
369     when String then args = { "key" => args }
370     end
371     opts = { :domain => args[:domain] || @domain }.merge!(args)
373     rv = @backend.file_debug(opts)
374     rv.each do |k,v|
375       case k
376       when /_(?:classid|devcount|dmid|fid|length|
377             nexttry|fromdevid|failcount|flags|devid|type)\z/x
378         rv[k] = v.to_i
379       when /devids\z/
380         rv[k] = v.split(/,/).map! { |x| x.to_i }
381       end
382     end
383   end