1 require 'mogilefs/client'
2 require 'mogilefs/util'
5 # MogileFS File manipulation client.
7 class MogileFS::MogileFS < MogileFS::Client
10 include MogileFS::Bigfile
13 # The domain of keys for this MogileFS client.
18 # The timeout for get_file_data. Defaults to five seconds.
20 attr_accessor :get_file_data_timeout
23 # internal Regexp for matching an "HTTP 200 OK" head response
24 HTTP_200_OK = %r{\AHTTP/\d+\.\d+\s+200\s+}.freeze
27 # Creates a new MogileFS::MogileFS instance. +args+ must include a key
28 # :domain specifying the domain of this client.
#
# A truthy +args+[:db_backend] is kept in @backend.
#
# Raises ArgumentError when +args+ has no :domain.
30 def initialize(args = {})
31 @domain = args[:domain]
# Default timeout, in seconds, exposed through #get_file_data_timeout.
33 @get_file_data_timeout = 5
35 raise ArgumentError, "you must specify a domain" unless @domain
# Assignment, not comparison: stores the supplied backend and branches on
# whether one was given.
37 if @backend = args[:db_backend]
45 # Enumerates keys starting with +key+.
50 keys, after = list_keys prefix
52 until keys.nil? or keys.empty? do
53 keys.each { |k| yield k }
54 keys, after = list_keys prefix, after
61 # Retrieves the contents of +key+.
#
# Returns nil when +paths+ is nil or false. For an HTTP path the socket
# obtained from http_get_sock is yielded to the block when one is given and
# the block's value is returned; without a block, sock.mogilefs_size bytes
# are read with sysread_full under @get_file_data_timeout and returned.
# A path that reaches the File.exist? check is read with File.read.
63 def get_file_data(key, &block)
66 return nil unless paths
71 when /^http:\/\// then
73 sock = http_get_sock(URI.parse(path))
# With a block the caller consumes the socket itself.
74 return yield(sock) if block_given?
75 return sysread_full(sock, sock.mogilefs_size, @get_file_data_timeout)
# NOTE(review): the handler body for these errors is not visible here;
# presumably a failing path is skipped so the next one is tried -- confirm.
76 rescue MogileFS::Timeout, Errno::ECONNREFUSED,
77 EOFError, SystemCallError, MogileFS::InvalidResponseError
# Paths that do not exist on the local filesystem are skipped.
81 next unless File.exist? path
82 return File.read(path)
90 # Get the paths for +key+.
#
# +noverify+ is sent to the backend as 1 when truthy and 0 otherwise;
# +zone+ is passed through unchanged.
#
# When the backend responds to _get_paths, its return value is returned
# as-is. Otherwise an Array is built from the 'path1'..'pathN' entries of
# the get_paths reply, where N is the reply's 'paths' value.
92 def get_paths(key, noverify = true, zone = nil)
93 opts = { :domain => @domain, :key => key,
94 :noverify => noverify ? 1 : 0, :zone => zone }
# Prefer the backend's own _get_paths implementation when it has one.
95 @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
96 res = @backend.get_paths(opts)
# res['paths'] holds the count; the individual paths are numbered from 1.
97 (1..res['paths'].to_i).map { |i| res["path#{i}"] }
101 # Creates a new file +key+ in +klass+. +bytes+ is not used by this method
# itself; it is forwarded to MogileFS::HTTPFile.open.
103 # The +block+ operates like File.open.
#
# Raises MogileFS::ReadOnlyError when readonly? is true. Also raises
# MogileFS::EmptyPathError or MogileFS::UnsupportedPathError depending on
# the first destination path returned by the backend.
105 def new_file(key, klass = nil, bytes = 0, &block) # :yields: file
106 raise MogileFS::ReadOnlyError if readonly?
# :multi_dest => 1 is sent with every create_open request.
107 opts = { :domain => @domain, :key => key, :multi_dest => 1 }
108 opts[:class] = klass if klass
109 res = @backend.create_open(opts)
# Build a list of [devid, path] pairs. A reply carrying 'dev_count' has
# numbered devid_N/path_N entries; otherwise a single devid/path is used.
111 dests = if dev_count = res['dev_count'] # multi_dest succeeded
112 (1..dev_count.to_i).map do |i|
113 [res["devid_#{i}"], res["path_#{i}"]]
115 else # single destination returned
# Hex dump of a sample reply with a single devid, fid and path:
116 # 0x0040: d0e4 4f4b 2064 6576 6964 3d31 2666 6964 ..OK.devid=1&fid
117 # 0x0050: 3d33 2670 6174 683d 6874 7470 3a2f 2f31 =3&path=http://1
118 # 0x0060: 3932 2e31 3638 2e31 2e37 323a 3735 3030 92.168.1.72:7500
119 # 0x0070: 2f64 6576 312f 302f 3030 302f 3030 302f /dev1/0/000/000/
120 # 0x0080: 3030 3030 3030 3030 3033 2e66 6964 0d0a 0000000003.fid..
122 [[res['devid'], res['path']]]
# Dispatch on the first destination's path; the inline rescue turns a
# failed lookup (e.g. an empty +dests+) into nil.
125 case (dests[0][1] rescue nil)
127 raise MogileFS::EmptyPathError
128 when /^http:\/\// then
129 MogileFS::HTTPFile.open(self, res['fid'], klass, key,
130 dests, bytes, &block)
132 raise MogileFS::UnsupportedPathError,
133 "paths '#{dests.inspect}' returned by backend is not supported"
138 # Copies the contents of +file+ into +key+ in class +klass+. +file+ can be
139 # either a file name or an object that responds to #sysread.
#
# Raises MogileFS::ReadOnlyError when readonly? is true.
141 def store_file(key, klass, file)
142 raise MogileFS::ReadOnlyError if readonly?
144 new_file key, klass do |mfp|
# IO-like objects are copied straight into the new file.
145 if file.respond_to? :sysread then
146 return sysrwloop(file, mfp)
# Otherwise +file+ is treated as a file name. 0x10000 is 65536 bytes.
148 if File.size(file) > 0x10000 # Bigass file, handle differently
# Opened in binary mode; the block form of File.open closes +fp+ afterwards.
152 return File.open(file, "rb") { |fp| sysrwloop(fp, mfp) }
159 # Stores +content+ into +key+ in class +klass+.
#
# Raises MogileFS::ReadOnlyError when readonly? is true.
161 def store_content(key, klass, content)
162 raise MogileFS::ReadOnlyError if readonly?
164 new_file key, klass do |mfp|
# A MogileFS::Util::StoreContent is handed to the new file as its
# streaming_io rather than being written here.
165 if content.is_a?(MogileFS::Util::StoreContent)
166 mfp.streaming_io = content
179 raise MogileFS::ReadOnlyError if readonly?
181 @backend.delete :domain => @domain, :key => key
188 @backend.sleep :duration => duration
192 # Renames a key +from+ to key +to+.
195 raise MogileFS::ReadOnlyError if readonly?
197 @backend.rename :domain => @domain, :from_key => from, :to_key => to
202 # Returns the size of +key+.
204 @backend.respond_to?(:_size) and return @backend._size(domain, key)
205 paths = get_paths(key) or return nil
# Determines a size from the given +paths+.
#
# An HTTP path is probed with an HTTP/1.0 HEAD request whose reply is
# checked for a 200 status and a Content-Length header. A path that reaches
# the File.exist? check is measured with File.size.
209 def paths_size(paths)
213 when /^http:\/\// then
# The request uses the same timeout as get_file_data.
216 s = Socket.mogilefs_new_request(url.host, url.port,
217 "HEAD #{url.request_uri} HTTP/1.0\r\n\r\n",
218 @get_file_data_timeout)
# Read up to 4096 bytes of the reply in a single recv call.
219 res = s.recv(4096, 0)
220 if res =~ HTTP_200_OK
# Split the headers from whatever follows the first blank line.
221 head, body = res.split(/\r\n\r\n/, 2)
# NOTE(review): the use of the captured length is not visible here;
# presumably it is returned as the size -- confirm.
222 if head =~ /^Content-Length:\s*(\d+)/i
# NOTE(review): the handler body for these errors is not visible here.
227 rescue MogileFS::Timeout, Errno::ECONNREFUSED,
228 EOFError, SystemCallError
# Paths that do not exist on the local filesystem are skipped.
234 next unless File.exist? path
235 return File.size(path)
243 # Lists keys starting with +prefix+ following +after+ up to +limit+. If
244 # +after+ is nil the list starts at the beginning.
#
# When the backend responds to _list_keys, the call (including +block+) is
# delegated to it and its result is returned. Otherwise returns a two-element
# Array: the keys from the reply and the reply's 'next_after' value.
246 def list_keys(prefix, after = nil, limit = 1000, &block)
247 if @backend.respond_to?(:_list_keys)
248 return @backend._list_keys(domain, prefix, after, limit, &block)
252 @backend.list_keys(:domain => domain, :prefix => prefix,
253 :after => after, :limit => limit)
# NOTE(review): the handler body for NoneMatchError is not visible here.
254 rescue MogileFS::Backend::NoneMatchError
# res['key_count'] holds the count; the keys are numbered from 1.
258 keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
260 # emulate the MogileFS::Mysql interface, slowly...
# A key is skipped when its paths or its size cannot be obtained; otherwise
# the key, its size and its number of paths are yielded.
262 paths = get_paths(key) or next
263 length = paths_size(paths) or next
264 yield key, length, paths.size
268 [ keys, res['next_after'] ]
273 # given a URI, this returns a readable socket with ready data from the
274 # body of the response.
#
# Sends an HTTP/1.0 GET for +uri+ using @get_file_data_timeout. On a 200
# reply, sock.mogilefs_size is set from the Content-Length header (0 when
# the header is absent, since nil.to_i is 0).
#
# Raises MogileFS::InvalidResponseError for a reply that is not a 200.
275 def http_get_sock(uri)
276 sock = Socket.mogilefs_new_request(uri.host, uri.port,
277 "GET #{uri.request_uri} HTTP/1.0\r\n\r\n",
278 @get_file_data_timeout)
# MSG_PEEK leaves the peeked bytes in the socket's receive queue.
279 buf = sock.recv(4096, Socket::MSG_PEEK)
280 head, body = buf.split(/\r\n\r\n/, 2)
281 if head =~ HTTP_200_OK
282 sock.mogilefs_size = head[/^Content-Length:\s*(\d+)/i, 1].to_i
# Consume the headers plus the 4-byte "\r\n\r\n" separator so the next
# read from the socket starts at the body.
283 sock.recv(head.size + 4, 0)
286 raise MogileFS::InvalidResponseError,
287 "GET on #{uri} returned: #{head.inspect}"
288 end # def http_get_sock