1 # -*- encoding: binary -*-
3 # \MogileFS file manipulation client.
5 # Create a new instance that will communicate with these trackers:
6 # hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
7 # mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)
9 # # Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
10 # mg.store_content('some_key', 'text', "A bunch of text to store")
12 # # Retrieve data from 'some_key' as a string
13 # data = mg.get_file_data('some_key')
15 # # Store the contents of 'image.jpeg' into the key 'my_image' with a class of 'image'.
17 # mg.store_file('my_image', 'image', 'image.jpeg')
19 # # Store the contents of 'image.jpeg' into the key 'my_image' with a
20 # # class of 'image' using an open IO object.
21 # File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }
23 # # Retrieve the contents of 'my_image' into '/path/to/huge_file'
24 # # without slurping the entire contents into memory:
25 # mg.get_file_data('my_image', '/path/to/huge_file')
27 # # Remove the key 'my_image' and 'some_key'.
28 # mg.delete('my_image')
29 # mg.delete('some_key')
31 class MogileFS::MogileFS < MogileFS::Client
# NOTE(review): MogileFS::Bigfile is defined elsewhere; presumably it adds
# large-file helpers to this client — confirm.
32 include MogileFS::Bigfile
34 # The domain of keys for this MogileFS client.
37 # The timeout for get_file_data. Defaults to five seconds.
# The value is passed straight through to MogileFS::HTTPReader.first.
38 attr_accessor :get_file_data_timeout
40 # Creates a new MogileFS::MogileFS instance. +args+ must include a key
41 # :domain specifying the domain of this client.
# Raises ArgumentError when :domain is missing. If :db_backend is given,
# it is assigned as this client's backend.
42 def initialize(args = {})
43 @domain = args[:domain]
# default get_file_data timeout, in seconds
45 @get_file_data_timeout = 5
47 raise ArgumentError, "you must specify a domain" unless @domain
# intentional assignment-in-condition: adopt a caller-supplied backend
49 if @backend = args[:db_backend]
56 # Enumerates keys, limited by optional +prefix+
# Keys are fetched through list_keys in pages of up to 1000. +block+ is
# handed to list_keys, which yields the keys. Paging resumes from the
# +after+ cursor that list_keys returns.
57 def each_key(prefix = "", &block)
60 keys, after = list_keys(prefix, after, 1000, &block)
# do-while: stop once a page comes back nil or empty
61 end while keys && keys[0]
65 # Retrieves the contents of +key+. If +dst+ is specified, +dst+
66 # should be an IO-like object capable of receiving the +write+ method
67 # or a path name. +copy_length+ may be specified to limit the number of
68 # bytes to retrieve, and +src_offset+ can be specified to specify the
69 # start position of the copy.
70 def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
# candidate URLs come from the tracker (or the backend's _get_paths)
71 paths = get_paths(key)
# NOTE(review): HTTPReader.first presumably opens the first usable URL in
# +paths+, honoring the timeout and the optional byte range — confirm.
72 sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout,
73 copy_length, src_offset)
# close the reader socket unless it is nil or already closed
# (presumably in an ensure clause — TODO confirm)
82 sock.close if sock && ! sock.closed?
85 # Get the paths (URLs as strings) for +key+. If +args+ is specified,
# it may contain:
87 # - :noverify -> boolean, whether or not the tracker checks (default: true)
# (true means the tracker skips its own path verification)
88 # - :pathcount -> a positive integer of URLs to retrieve (default: 2)
89 # - :zone -> "alt" or nil (default: nil)
91 # :noverify defaults to true because this client library is capable of
92 # verifying paths for readability itself. It is also faster and more
93 # reliable to verify paths on the client.
94 def get_paths(key, *args)
103 opts[:noverify] = args[:noverify]
104 opts[:zone] = args[:zone]
# :pathcount is only sent when given, coerced to an Integer
105 pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
# send noverify as 0/1: only an explicit +false+ yields 0; nil (unset) and
# true both yield 1, which makes noverify the default
108 opts[:noverify] = false == opts[:noverify] ? 0 : 1
# backends implementing _get_paths return the path list directly
109 @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
110 res = @backend.get_paths(opts)
# the reply carries 'paths' => N plus 'path1'..'pathN'; collect them in order
111 (1..res['paths'].to_i).map { |i| res["path#{i}"] }
114 # Returns +true+ if +key+ exists, +false+ if not
# Existence is probed by issuing get_paths for +key+ in this client's domain.
117 args = { :key => key, :domain => @domain }
# the callback records true only when the reply is a Hash
# (presumably error replies arrive as non-Hash objects — confirm)
118 @backend.pipeline_dispatch(:get_paths, args) { |x| rv = (Hash === x) }
# NOTE(review): wait for the dispatched request before reading rv; the
# argument presumably bounds how many replies to wait for — confirm.
119 @backend.pipeline_wait(1)
123 # Get the URIs for +key+ (paths) as URI::HTTP objects
# +args+ are forwarded unchanged to get_paths.
124 def get_uris(key, *args)
# map! converts the array returned by get_paths in place
125 get_paths(key, *args).map! { |path| URI.parse(path) }
128 # Creates a new file +key+. +args+ is either an options Hash or a class name.
# +bytes+, when given, supplies :content_length only if +args+ did not.
129 # Consider using store_file instead of this method for large files.
130 # This requires a block passed to it and operates like File.open.
131 # This atomically replaces existing data stored as +key+ when
133 # +args+ may contain the following options:
134 # * :content_length => Integer
135 # * :largefile => [ :stream, :content_range, :tempfile ]
136 # (see MogileFS::NewFile)
137 # * :content_md5 => String, Proc, or :trailer
# Raises MogileFS::ReadOnlyError on a read-only client,
# MogileFS::EmptyPathError or MogileFS::UnsupportedPathError when the
# backend's destination path cannot be used.
138 def new_file(key, args = nil, bytes = nil) # :yields: file
139 raise MogileFS::ReadOnlyError if readonly?
# ask create_open for multiple candidate destinations
140 opts = { :key => key, :multi_dest => 1 }
143 opts[:domain] = args[:domain]
# a class of "default" is never sent to the tracker
144 klass = args[:class] and "default" != klass and opts[:class] = klass
# non-Hash +args+ is treated as the class name (presumably a String)
146 opts[:class] = args if "default" != args
# fall back to this client's domain
148 opts[:domain] ||= @domain
149 res = @backend.create_open(opts)
151 dests = if dev_count = res['dev_count'] # multi_dest succeeded
152 (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
153 else # single destination returned
154 # 0x0040: d0e4 4f4b 2064 6576 6964 3d31 2666 6964 ..OK.devid=1&fid
155 # 0x0050: 3d33 2670 6174 683d 6874 7470 3a2f 2f31 =3&path=http://1
156 # 0x0060: 3932 2e31 3638 2e31 2e37 323a 3735 3030 92.168.1.72:7500
157 # 0x0070: 2f64 6576 312f 302f 3030 302f 3030 302f /dev1/0/000/000/
158 # 0x0080: 3030 3030 3030 3030 3033 2e66 6964 0d0a 0000000003.fid..
160 [[res['devid'], res['path']]]
# caller options (e.g. :content_length, :largefile) pass through to NewFile
163 opts.merge!(args) if Hash === args
164 opts[:backend] = @backend
165 opts[:fid] = res['fid']
166 opts[:content_length] ||= bytes if bytes
# dispatch on the first destination's path; the inline rescue yields nil
# when no destination was returned at all
168 case (dests[0][1] rescue nil)
# pick the NewFile implementation for the requested :largefile mode
170 http_file = MogileFS::NewFile[opts[:largefile]].new(dests, opts)
173 return http_file.commit # calls create_close
178 raise MogileFS::EmptyPathError,
179 "Empty path for mogile upload res=#{res.inspect}"
181 raise MogileFS::UnsupportedPathError,
182 "paths '#{dests.inspect}' returned by backend is not supported"
186 # Copies the contents of +file+ into +key+ in class +klass+. +file+ can be
187 # either a path name (String or Pathname object) or an IO-like object that
188 # responds to #read or #readpartial. Returns size of +file+ stored.
189 # This atomically replaces existing data stored as +key+
# Raises MogileFS::ReadOnlyError on a read-only client.
190 def store_file(key, klass, file, opts = nil)
191 raise MogileFS::ReadOnlyError if readonly?
# only a String +klass+ is recorded; it is written into +opts+ (created
# when nil), so a caller-supplied options Hash is modified in place
192 (opts ||= {})[:class] = klass if String === klass
# hand +file+ to the new file's big_io writer
194 new_file(key, opts) { |mfp| mfp.big_io = file }
197 # Stores +content+ into +key+ in class +klass+, where +content+ is a String.
# A MogileFS::Util::StoreContent is also accepted and is handed to the new
# file as its streaming_io.
198 # This atomically replaces existing data stored as +key+
# Raises MogileFS::ReadOnlyError on a read-only client.
199 def store_content(key, klass, content, opts = nil)
200 raise MogileFS::ReadOnlyError if readonly?
# a String +klass+ is written into +opts+ (created when nil)
201 (opts ||= {})[:class] = klass if String === klass
203 new_file(key, opts) do |mfp|
# NOTE(review): other content is presumably written directly in the
# branch that follows — confirm.
204 if content.is_a?(MogileFS::Util::StoreContent)
205 mfp.streaming_io = content
# Removes +key+ from this client's domain; refused with
# MogileFS::ReadOnlyError on a read-only client.
214 raise MogileFS::ReadOnlyError if readonly?
216 @backend.delete :domain => @domain, :key => key
220 # Sleeps +duration+, only used for testing
# Forwarded to the backend's sleep command; NOTE(review): the unit of
# +duration+ is whatever the tracker expects — confirm.
221 def sleep(duration) # :nodoc:
222 @backend.sleep :duration => duration
225 # Renames a key +from+ to key +to+.
# Both keys are in this client's domain; refused with
# MogileFS::ReadOnlyError on a read-only client.
227 raise MogileFS::ReadOnlyError if readonly?
229 @backend.rename :domain => @domain, :from_key => from, :to_key => to
233 # Returns the size of +key+.
# backends implementing _size answer directly
235 @backend.respond_to?(:_size) and return @backend._size(domain, key)
# otherwise use the "length" reported by file_info
237 file_info(key)["length"].to_i
# trackers that do not know file_info: measure the stored copies instead
238 rescue MogileFS::Backend::UnknownCommandError
239 paths_size(get_paths(key))
# Size of the file reachable at +paths+, as computed by MogileFS::PathsSize.
243 def paths_size(paths) # :nodoc:
# loaded lazily; only the fallback paths in size and list_keys_verbose use it
244 require "mogilefs/paths_size"
245 MogileFS::PathsSize.call(paths)
248 # Lists keys starting with +prefix+ following +after+ up to +limit+. If
249 # +after+ is nil the list starts at the beginning.
# Returns [keys, next_after]; next_after can be passed back as +after+ to
# fetch the following page. With a block, keys are yielded either one by
# one or via list_keys_verbose as (key, length, devcount).
250 def list_keys(prefix = "", after = nil, limit = 1000, &block)
# backends implementing _list_keys handle the whole request themselves
251 @backend.respond_to?(:_list_keys) and
252 return @backend._list_keys(domain, prefix, after, limit, &block)
255 res = @backend.list_keys(:domain => domain, :prefix => prefix,
256 :after => after, :limit => limit)
# NOTE(review): no matching keys; presumably treated as an empty result — confirm
257 rescue MogileFS::Backend::NoneMatchError
# the reply carries 'key_count' => N plus 'key_1'..'key_N'
261 keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
264 keys.each { |key| block.call(key) }
266 list_keys_verbose(keys, block)
270 [ keys, res['next_after'] ]
# Calls +block+ with (key, length, devcount) for each of +keys+, in their
# original order, using pipelined file_info requests.
273 def list_keys_verbose(keys, block) # :nodoc:
274 # emulate the MogileFS::Mysql interface, slowly...
277 on_file_info = lambda do |info|
# a non-Hash reply is an error object; re-raise it
278 Hash === info or raise info
279 file_info_cleanup(info)
281 # deal with trackers with multiple queryworkers responding out-of-order
282 ready[info["key"]] = info
# flush every buffered reply at the head of the ordering, so +block+
# always sees keys in the order they were requested
283 while info = ready.delete(ordered[0])
284 block.call(ordered.shift, info["length"], info["devcount"])
287 opts = { :domain => @domain }
291 @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
293 @backend.pipeline_wait
294 rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
295 @backend.shutdown # reset the socket
# 0x7fffffff (largest signed 32-bit integer) asks for every path, so
# paths.size stands in for devcount below
296 args = { :pathcount => 0x7fffffff }
298 paths = get_paths(key, args)
299 block.call(key, paths_size(paths), paths.size)
301 rescue MogileFS::PipelineError, SystemCallError,
302 MogileFS::RequestTruncatedError,
303 MogileFS::UnreadableSocketError,
304 MogileFS::InvalidResponseError, # truncated response
# keys still awaiting a reply; NOTE(review): presumably retried — confirm
307 keys = ordered - ready.keys
315 # Return metadata about a file as a hash.
316 # Returns the domain, class, length, devcount, etc. as keys.
317 # Optionally, device ids (not paths) can be returned as
318 # well if :devices is specified and +true+.
320 # This should only be used for informational purposes, and not usually
321 # for dynamically serving files.
323 # mg.file_info("bar")
330 # "class" => "default",
334 def file_info(key, args = nil)
335 opts = { :domain => @domain, :key => key }
# only a truthy :devices is forwarded (as 1); false or nil short-circuits
# the chain, so the option is omitted rather than sent as 0
336 args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
# numeric fields and "devids" are converted by file_info_cleanup
337 file_info_cleanup(@backend.file_info(opts))
# Normalizes a file_info reply in place: fid, length and devcount become
# Integers (a missing field becomes 0 via to_i), and a comma-separated
# "devids" string becomes an Array of Integers.
340 def file_info_cleanup(rv) # :nodoc:
341 %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
342 devids = rv["devids"] and
343 rv["devids"] = devids.split(/,/).map! { |x| x.to_i }
347 # Given an Integer +fid+ or String +key+ and domain, thoroughly search
348 # the database for all occurrences of a particular fid.
350 # Use this sparingly: this command hits the master database numerous
351 # times and is very expensive. This is not for production use, only
352 # troubleshooting and debugging.
354 # Searches for fid=666:
356 # client.file_debug(666)
358 # Search for key=foo using the default domain for this object:
360 # client.file_debug("foo")
362 # Search for key=foo in domain="bar":
364 # client.file_debug(:key => "foo", :domain => "bar")
368 when Integer then args = { "fid" => args }
369 when String then args = { "key" => args }
# the String-keyed hashes built above never carry :domain, so they fall
# back to this client's domain; a caller Hash with :domain overrides it
371 opts = { :domain => args[:domain] || @domain }.merge!(args)
373 rv = @backend.file_debug(opts)
# reply fields with these numeric suffixes get converted
376 when /_(?:classid|devcount|dmid|fid|length|
377 nexttry|fromdevid|failcount|flags|devid|type)\z/x
# comma-separated lists become Arrays of Integers
380 rv[k] = v.split(/,/).map! { |x| x.to_i }