1 # -*- encoding: binary -*-
3 # \MogileFS file manipulation client.
5 # Create a new instance that will communicate with these trackers:
6 # hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
7 # mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)
9 # # Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
10 # mg.store_content('some_key', 'text', "A bunch of text to store")
12 # # Retrieve data from 'some_key' as a string
13 # data = mg.get_file_data('some_key')
15 # # Store the contents of 'image.jpeg' into the key 'my_image' with a
17 # mg.store_file('my_image', 'image', 'image.jpeg')
19 # # Store the contents of 'image.jpeg' into the key 'my_image' with a
20 # # class of 'image' using an open IO object.
21 # File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }
23 # # Retrieve the contents of 'my_image' into '/path/to/huge_file'
24 # # without slurping the entire contents into memory:
25 # mg.get_file_data('my_image', '/path/to/huge_file')
27 # # Remove the key 'my_image' and 'some_key'.
28 # mg.delete('my_image')
29 # mg.delete('some_key')
31 class MogileFS::MogileFS < MogileFS::Client
32 include MogileFS::Bigfile
34 # The domain of keys for this MogileFS client.
37 # The timeout for get_file_data (per-read() system call).
38 # Defaults to five seconds.
39 attr_accessor :get_file_data_timeout
41 # The maximum allowed time for creating a new_file. Defaults to 1 hour.
42 attr_accessor :new_file_max_time
44 # Creates a new MogileFS::MogileFS instance. +args+ must include a key
45 # :domain specifying the domain of this client.
47 # Optional parameters for +args+:
49 # [:get_file_data_timeout => Numeric]
51 # See get_file_data_timeout
53 # [:new_file_max_time => Numeric]
55 # See new_file_max_time
57 # [:fail_timeout => Numeric]
59 # Delay before retrying a failed tracker backend.
60 # Defaults to 5 seconds.
62 # [:timeout => Numeric]
64 # Timeout for tracker backend responses.
65 # Defaults to 3 seconds.
67 # [:connect_timeout => Integer]
69 # Timeout for connecting to a tracker
70 # Defaults to 3 seconds
71 def initialize(args = {})
72 @domain = args[:domain]
# Timeouts fall back to 5 and 3600.0 seconds when the caller passes none.
74 @get_file_data_timeout = args[:get_file_data_timeout] || 5
75 @new_file_max_time = args[:new_file_max_time] || 3600.0
# One helper per direction, built by nhp_new; each timeout is applied to
# both the open_timeout and the read_timeout of its helper.
76 @nhp_get = nhp_new('get')
77 @nhp_get.open_timeout = @nhp_get.read_timeout = @get_file_data_timeout
78 @nhp_put = nhp_new('put')
79 @nhp_put.open_timeout = @nhp_put.read_timeout = @new_file_max_time
# The domain is validated only after the helpers above have been created.
81 raise ArgumentError, "you must specify a domain" unless @domain
# Assignment inside the condition: @backend takes args[:db_backend] and the
# branch is chosen by whether a value was supplied.
83 if @backend = args[:db_backend]
90 # Enumerates keys, limited by optional +prefix+
91 # +args+ may also be specified for an optional +:limit+
92 # and +:after+ (default: nil)
93 def each_key(prefix = "", args = nil, &block)
# Fetch one batch, capped at 1000 keys when no limit is set; the caller's
# block is forwarded to list_keys, and +after+ is updated from its result.
100 keys, after = list_keys(prefix, after, limit || 1000, &block)
# Stop as soon as a batch comes back nil or empty.
101 return unless keys && keys[0]
# Count this batch against the remaining limit, if there is one.
102 limit -= keys.size if limit
# Keep going until the limit is used up; with no limit, only an empty
# batch (the return above) ends the loop.
103 end while limit == nil || limit > 0
107 # Enumerates keys and yields a +file_info+ hash for each key matched by
109 def each_file_info(prefix = "", args = nil)
110 # FIXME: there's a lot of duplicate code from list_keys_verbose here...
111 raise ArgumentError, "need block" unless block_given?
112 ordered = ready = nil
# Callback handed to pipeline_dispatch below. Anything that is not a Hash is
# raised as-is; Hashes are normalized by file_info_cleanup.
113 on_file_info = lambda do |info|
114 Hash === info or raise info
115 file_info_cleanup(info)
117 # deal with trackers with multiple queryworkers responding out-of-order
# Responses are buffered in +ready+ by key. +ordered+ holds the keys
# reversed, so ordered[-1] is the next key that is due.
118 ready[info["key"]] = info
119 while info = ready.delete(ordered[-1])
126 opts = { :domain => @domain }
# :devices is forwarded (as 1) only when the caller passed a truthy value.
127 opts[:devices] = 1 if args && args[:devices]
128 after = args ? args[:after] : nil
129 limit = args ? args[:limit] : nil
# Fetch the next batch of keys, capped at 1000 when no limit is set.
132 keys, after = list_keys(prefix, after, limit || 1000)
133 return nr unless keys && keys[0]
# Reverse the batch so the next expected key sits at the end of the array.
134 ordered = keys.reverse
137 limit -= keys.size if limit
141 @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
143 @backend.pipeline_wait
144 rescue MogileFS::PipelineError, SystemCallError,
145 MogileFS::RequestTruncatedError,
146 MogileFS::UnreadableSocketError,
147 MogileFS::InvalidResponseError, # truncated response
# Rebuild +keys+ from the entries still in +ordered+ that are not buffered
# in +ready+, restored to their original order.
# NOTE(review): presumably these keys are then retried -- confirm.
150 keys = (ordered - ready.keys).reverse!
152 end while limit == nil || limit > 0
158 # Retrieves the contents of +key+. If +dst+ is specified, +dst+
159 # should be an IO-like object capable of receiving the +write+ method
160 # or a path name. +copy_length+ may be specified to limit the number of
161 # bytes to retrieve, and +src_offset+ can be specified to set the
162 # start position of the copy.
163 def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
164 paths = get_paths(key)
165 if src_offset || copy_length
# Inclusive end offset of the requested range; nil when no copy_length was
# given, which leaves the range open-ended.
167 range_end = copy_length ? src_offset + copy_length - 1 : nil
168 range = [ src_offset, range_end ]
# Open a reader on +paths+ using the per-read timeout and optional range.
172 sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
175 sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
# Build a GET for this path and send it through the 'get' helper.
180 uri = URI.parse(path)
181 get = Net::HTTP::Get.new(uri.path)
# A nil end offset interpolates as an empty string, giving "bytes=N-".
182 get["range"] = "bytes=#{range[0]}-#{range[1]}" if range
184 res = @nhp_get.request(uri, get)
# Record one message per failed path, from the response or the exception.
189 (errors ||= []) << "#{path} - #{res.message} (#{res.class})"
191 (errors ||= []) << "#{path} - #{e.message} (#{e.class})"
# The third argument to raise is the backtrace; an empty array is passed.
194 raise MogileFS::Error,
195 "all paths failed with GET: #{errors.join(', ')}", []
# Close the reader only if one was opened and it is still open.
198 sock.close if sock && ! sock.closed?
201 # Get the paths (URLs as strings) for +key+. If +args+ is specified,
203 # - :noverify -> boolean, whether or not the tracker checks (default: true)
204 # - :pathcount -> a positive integer of URLs to retrieve (default: 2)
205 # - :zone -> "alt" or nil (default: nil)
207 # :noverify defaults to true because this client library is capable of
208 # verifying paths for readability itself. It is also faster and more
209 # reliable to verify paths on the client.
210 def get_paths(key, *args)
# Positional form: the first extra argument is taken as :noverify.
214 :noverify => args[0],
# Hash form: :noverify is copied as given; :zone and :pathcount are copied
# only when truthy, with :pathcount converted to an Integer.
219 opts[:noverify] = args[:noverify]
220 zone = args[:zone] and opts[:zone] = zone
221 pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
# Only an explicit false becomes 0; nil and every other value become 1.
224 opts[:noverify] = false == opts[:noverify] ? 0 : 1
# Use the backend's own _get_paths when it provides one.
225 @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
226 res = @backend.get_paths(opts)
# The response carries a count under 'paths' and the entries under
# 'path1', 'path2', ... up to that count.
227 (1..res['paths'].to_i).map { |i| res["path#{i}"] }
230 # Returns +true+ if +key+ exists, +false+ if not
232 args = { :key => key, :domain => @domain , :ruby_no_raise => true}
233 case rv = @backend.get_paths(args)
236 when MogileFS::Backend::UnknownKeyError
243 # Get the URIs for +key+ (paths) as URI::HTTP objects
244 def get_uris(key, *args)
# Extra arguments are forwarded unchanged to get_paths; the returned array is
# converted in place (map!) by parsing each path with URI.parse.
245 get_paths(key, *args).map! { |path| URI.parse(path) }
248 # Creates a new file +key+ in the domain of this object.
250 # +bytes+ is the expected size of the file if known in advance
252 # It operates like File.open(..., "w") and may take an optional
253 # block, yielding an IO-like object with support for the methods
254 # documented in MogileFS::NewFile::Writer.
256 # This atomically replaces existing data stored as +key+
257 # when the block exits or when the returned object is closed.
259 # +args+ may contain the following options:
261 # [:content_length => Integer]
263 # This has the same effect as the (deprecated) +bytes+ parameter.
265 # [ :largefile => :stream, :content_range or :tempfile ]
267 # See MogileFS::NewFile for more information on this
269 # [ :class => String]
271 # The MogileFS storage class of the object.
273 # [:content_md5 => String, Proc, or :trailer]
275 # This can either be a Base64-encoded String, a Proc object, or
276 # the :trailer symbol. If given a String, it will be used as the
277 # Content-MD5 HTTP header. If given the :trailer symbol, this library
278 # will automatically generate a Content-MD5 HTTP trailer. If given
279 # a Proc object, this Proc object should give a Base64-encoded string
280 # which can be used as the Content-MD5 HTTP trailer when called at the
281 # end of the request.
283 # Keep in mind most HTTP servers do not support HTTP trailers, so
284 # passing a String is usually the safest way to use this.
288 # This is an empty hash that will be filled with the same information as
289 # MogileFS::MogileFS#file_info.
291 # Additionally, it contains one additional key: :uris,
292 # an array of URI::HTTP objects to the stored destinations
293 def new_file(key, args = nil, bytes = nil) # :yields: file
294 raise MogileFS::ReadOnlyError if readonly?
# Request options for create_open; :multi_dest is always set to 1.
295 opts = { :key => key, :multi_dest => 1 }
# Hash form of +args+: pick up :domain, :create_open_args and :class.
# A class of "default" is never written into opts.
298 opts[:domain] = args[:domain]
299 open_args = args[:create_open_args]
300 klass = args[:class] and "default" != klass and opts[:class] = klass
# Non-Hash form: +args+ itself is used as the class unless it is "default".
302 opts[:class] = args if "default" != args
# Fall back to this client's domain when none was supplied.
304 opts[:domain] ||= @domain
# With create_open_args present, opts is merged over them, so opts wins on
# any key present in both.
305 res = @backend.create_open(open_args ? open_args.merge(opts) : opts)
306 opts[:nhp_put] = @nhp_put
# Build a list of [devid, path] pairs from the response.
308 dests = if dev_count = res['dev_count'] # multi_dest succeeded
309 (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
310 else # single destination returned
311 # 0x0040: d0e4 4f4b 2064 6576 6964 3d31 2666 6964 ..OK.devid=1&fid
312 # 0x0050: 3d33 2670 6174 683d 6874 7470 3a2f 2f31 =3&path=http://1
313 # 0x0060: 3932 2e31 3638 2e31 2e37 323a 3735 3030 92.168.1.72:7500
314 # 0x0070: 2f64 6576 312f 302f 3030 302f 3030 302f /dev1/0/000/000/
315 # 0x0080: 3030 3030 3030 3030 3033 2e66 6964 0d0a 0000000003.fid..
317 [[res['devid'], res['path']]]
# From here on opts doubles as the option hash passed to MogileFS::NewFile:
# caller options are merged in, then backend, fid, size and timing are set.
320 opts.merge!(args) if Hash === args
321 opts[:backend] = @backend
322 opts[:fid] = res['fid']
# An explicit :content_length takes precedence over the +bytes+ parameter.
323 opts[:content_length] ||= bytes if bytes
324 opts[:new_file_max_time] ||= @new_file_max_time
325 opts[:start_time] = MogileFS.now
# When the caller supplied an :info hash, record the class name in it.
326 info = opts[:info] and info["class"] = klass || "default"
# Dispatch on the first destination's path; any error raised while reading
# it is turned into nil by the rescue modifier.
328 case (dests[0][1] rescue nil)
330 http_file = MogileFS::NewFile.new(dests, opts)
333 return http_file.commit # calls create_close
338 raise MogileFS::EmptyPathError,
339 "Empty path for mogile upload res=#{res.inspect}"
341 raise MogileFS::UnsupportedPathError,
342 "paths '#{dests.inspect}' returned by backend is not supported"
346 # Copies the contents of +file+ into +key+ in class +klass+. +file+ can be
347 # either a path name (String or Pathname object) or an IO-like object that
348 # responds to #read or #readpartial. Returns size of +file+ stored.
349 # This atomically replaces existing data stored as +key+
350 def store_file(key, klass, file, opts = nil)
351 raise MogileFS::ReadOnlyError if readonly?
# A String +klass+ is stored in opts as :class (creating opts if needed);
# any other +klass+ value leaves opts exactly as passed.
352 (opts ||= {})[:class] = klass if String === klass
# The source is attached to the yielded object through big_io=.
354 new_file(key, opts) { |mfp| mfp.big_io = file }
357 # Stores +content+ into +key+ in class +klass+, where +content+ is a String
358 # This atomically replaces existing data stored as +key+
359 def store_content(key, klass, content, opts = nil)
360 raise MogileFS::ReadOnlyError if readonly?
# A String +klass+ is stored in opts as :class (creating opts if needed);
# any other +klass+ value leaves opts exactly as passed.
361 (opts ||= {})[:class] = klass if String === klass
363 new_file(key, opts) do |mfp|
# MogileFS::Util::StoreContent objects are attached via streaming_io=.
364 if content.is_a?(MogileFS::Util::StoreContent)
365 mfp.streaming_io = content
374 raise MogileFS::ReadOnlyError if readonly?
376 @backend.delete :domain => @domain, :key => key
380 # Updates +key+ to +newclass+
381 def updateclass(key, newclass)
# Refused up front when the client is read-only.
382 raise MogileFS::ReadOnlyError if readonly?
# Sends the key, this client's domain and the new class to the backend.
384 @backend.updateclass(:domain => @domain, :key => key, :class => newclass)
388 # Sleeps +duration+, only used for testing
389 def sleep(duration) # :nodoc:
# Forwards +duration+ to the backend's sleep call. Defining this method
# overrides Kernel#sleep for instances of this class.
390 @backend.sleep :duration => duration
393 # Renames a key +from+ to key +to+.
395 raise MogileFS::ReadOnlyError if readonly?
397 @backend.rename :domain => @domain, :from_key => from, :to_key => to
401 # Returns the size of +key+.
403 @backend.respond_to?(:_size) and return @backend._size(domain, key)
405 file_info(key)["length"].to_i
406 rescue MogileFS::Backend::UnknownCommandError
407 paths_size(get_paths(key))
# Delegates to MogileFS::PathsSize.call with the given +paths+.
411 def paths_size(paths) # :nodoc:
# Required at call time rather than when this file is loaded.
412 require "mogilefs/paths_size"
413 MogileFS::PathsSize.call(paths)
416 # Lists keys starting with +prefix+ following +after+ up to +limit+. If
417 # +after+ is nil the list starts at the beginning.
418 def list_keys(prefix = "", after = nil, limit = 1000, &block)
# Delegate entirely to the backend when it implements _list_keys.
419 @backend.respond_to?(:_list_keys) and
420 return @backend._list_keys(domain, prefix, after, limit, &block)
# :ruby_no_raise is set, and the result is checked for error objects below
# instead of relying on the call to raise.
422 res = @backend.list_keys(:domain => domain, :prefix => prefix,
423 :after => after, :limit => limit,
424 :ruby_no_raise => true)
# A NoneMatchError result means "no keys": return nil. Any other
# MogileFS::Error result is raised to the caller.
425 MogileFS::Backend::NoneMatchError === res and return
426 raise res if MogileFS::Error === res
# Keys arrive as 'key_1'..'key_N', with N given by 'key_count'.
428 keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
# Simple form: the block receives each key on its own.
431 keys.each { |key| block.call(key) }
# Verbose form: list_keys_verbose calls the block with extra per-key data.
433 list_keys_verbose(keys, block)
# Result pair: the keys and the response's 'next_after' value.
437 [ keys, res['next_after'] ]
# Calls +block+ once per key with three arguments: the key, a length and a
# device count, both taken from pipelined file_info responses.
440 def list_keys_verbose(keys, block) # :nodoc:
441 # emulate the MogileFS::Mysql interface, slowly...
# Reversed so that the next key due for delivery is at the end (ordered[-1]).
442 ordered = keys.reverse
# Callback for each pipelined response. Anything that is not a Hash is
# raised as-is; Hashes are normalized by file_info_cleanup.
444 on_file_info = lambda do |info|
445 Hash === info or raise info
446 file_info_cleanup(info)
448 # deal with trackers with multiple queryworkers responding out-of-order
# Buffer the response by key, then deliver every buffered response that is
# next in line, popping its key off +ordered+ as it goes.
449 ready[info["key"]] = info
450 while info = ready.delete(ordered[-1])
451 block.call(ordered.pop, info["length"], info["devcount"])
454 opts = { :domain => @domain }
458 @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
460 @backend.pipeline_wait
# Fallback when the backend reports file_info as an unknown command: after
# resetting the backend connection, get_paths is called with a very large
# :pathcount and the block receives paths_size(paths) and the path count.
461 rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
462 @backend.shutdown # reset the socket
463 args = { :pathcount => 0x7fffffff }
465 paths = get_paths(key, args)
466 block.call(key, paths_size(paths), paths.size)
468 rescue MogileFS::PipelineError, SystemCallError,
469 MogileFS::RequestTruncatedError,
470 MogileFS::UnreadableSocketError,
471 MogileFS::InvalidResponseError, # truncated response
# Rebuild +keys+ from the entries still in +ordered+ that are not buffered
# in +ready+, restored to their original order.
# NOTE(review): presumably these keys are then retried -- confirm.
474 keys = (ordered - ready.keys).reverse!
482 # Return metadata about a file as a hash.
483 # Returns the domain, class, length, devcount, etc. as keys.
484 # Optionally, device ids (not paths) can be returned as
485 # well if :devices is specified and +true+.
487 # This should only be used for informational purposes, and not usually
488 # for dynamically serving files.
490 # mg.file_info("bar")
497 # "class" => "default",
501 def file_info(key, args = nil)
502 opts = { :domain => @domain, :key => key }
# The `and` chain stops when +args+ or args[:devices] is nil/false, so
# opts[:devices] is only ever assigned when :devices is truthy (as 1).
503 args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
# The backend response is normalized by file_info_cleanup.
504 file_info_cleanup(@backend.file_info(opts))
# Normalizes a file_info response hash in place.
507 def file_info_cleanup(rv) # :nodoc:
# 'fid', 'length' and 'devcount' are converted with to_i; a missing entry
# (nil) becomes 0.
508 %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
# 'devids', when present, is split on commas into an array of Integers.
509 devids = rv["devids"] and
510 rv["devids"] = devids.split(','.freeze).map! { |x| x.to_i }
514 # Given an Integer +fid+ or String +key+ and domain, thoroughly search
515 # the database for all occurrences of a particular fid.
517 # Use this sparingly, this command hits the master database numerous
518 # times and is very expensive. This is not for production use, only
519 # troubleshooting and debugging.
521 # Searches for fid=666:
523 # client.file_debug(666)
525 # Search for key=foo using the default domain for this object:
527 # client.file_debug("foo")
529 # Search for key=foo in domain="bar":
531 # client.file_debug(:key => "foo", :domain => "bar")
535 when Integer then args = { "fid" => args }
536 when String then args = { "key" => args }
538 opts = { :domain => args[:domain] || @domain }.merge!(args)
540 rv = @backend.file_debug(opts)
543 when /_(?:classid|devcount|dmid|fid|length|
544 nexttry|fromdevid|failcount|flags|devid|type)\z/x
547 rv[k] = v.split(','.freeze).map! { |x| x.to_i }
# Builds a MogileFS::NHP identified by +name+.
552 def nhp_new(name) # :nodoc:
# With Net::HTTP::Persistent 3.0 or newer the name is passed as a :name
# option; otherwise it is passed positionally.
553 if Net::HTTP::Persistent::VERSION.to_f >= 3.0
554 MogileFS::NHP.new(:name => name)
556 MogileFS::NHP.new(name)
559 MogileFS::NHP.new(name)