1 # -*- encoding: binary -*-
2 require 'mogilefs/client'
3 require 'mogilefs/util'
6 # MogileFS File manipulation client.
8 class MogileFS::MogileFS < MogileFS::Client
10 include MogileFS::Util
11 include MogileFS::Bigfile
14 # The domain of keys for this MogileFS client.
19 # The timeout for get_file_data. Defaults to five seconds.
21 attr_accessor :get_file_data_timeout
24 # Creates a new MogileFS::MogileFS instance. +args+ must include a key
25 # :domain specifying the domain of this client.
# Sets up the client from +args+: stores args[:domain] in @domain, sets the
# get_file_data timeout to 5, and raises ArgumentError when no domain was
# given.
27 def initialize(args = {})
28 @domain = args[:domain]
30 @get_file_data_timeout = 5
32 raise ArgumentError, "you must specify a domain" unless @domain
# The condition is an assignment, not a comparison: @backend receives
# args[:db_backend] and the branch is taken only when that value is truthy.
# NOTE(review): the body of this branch is not visible in this excerpt.
34 if @backend = args[:db_backend]
42 # Enumerates keys starting with +prefix+.
47 keys, after = list_keys prefix
49 until keys.nil? or keys.empty? do
50 keys.each { |k| yield k }
51 keys, after = list_keys prefix, after
58 # Retrieves the contents of +key+.
# Fetches the data stored under +key+. Returns nil when get_paths yields a
# falsy result. For a path, a socket is obtained from http_read_sock; when a
# block is given the socket is yielded and the block's value is returned,
# otherwise the result of reading sock.mogilefs_size bytes is returned.
60 def get_file_data(key, &block)
61 paths = get_paths(key) or return nil
# NOTE(review): +path+ is bound by an enclosing construct that is not visible
# in this excerpt (presumably an iteration over +paths+ -- confirm).
64 sock = http_read_sock(URI.parse(path))
66 return yield(sock) if block_given?
# read is called with (size, "", timeout); presumably a MogileFS socket
# method rather than plain IO#read -- confirm against MogileFS::Socket.
67 return sock.read(sock.mogilefs_size, "", @get_file_data_timeout)
# Errno::ECONNREFUSED is itself a SystemCallError, so naming both is
# redundant but harmless. NOTE(review): the handler body is not visible here.
71 rescue MogileFS::Timeout, MogileFS::InvalidResponseError,
72 Errno::ECONNREFUSED, EOFError, SystemCallError
79 # Get the paths for +key+.
# Asks the backend for the paths of +key+ in this client's domain.
# +noverify+ is sent as 1 when truthy and 0 otherwise; +zone+ is passed
# through unchanged (nil by default).
81 def get_paths(key, noverify = true, zone = nil)
82 opts = { :domain => @domain, :key => key,
83 :noverify => noverify ? 1 : 0, :zone => zone }
# A backend that implements _get_paths is used directly and its return value
# is handed back as-is.
84 @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
85 res = @backend.get_paths(opts)
# Collect res["path1"] .. res["pathN"], N = res['paths'].to_i, dropping nils.
86 (1..res['paths'].to_i).map { |i| res["path#{i}"] }.compact
90 # Get the URIs for +key+.
# Takes the same arguments as get_paths and runs every returned path through
# URI.parse.
# NOTE(review): other callers in this file guard get_paths with
# `or return nil`, while this one calls map on the result directly -- confirm
# get_paths cannot return nil on this route.
92 def get_uris(key, noverify = true, zone = nil)
93 get_paths(key, noverify, zone).map { |path| URI.parse(path) }
97 # Creates a new file +key+ in +klass+. +bytes+ is currently unused.
99 # The +block+ operates like File.open.
# Opens a new file for +key+, optionally in class +klass+. Raises
# MogileFS::ReadOnlyError when readonly? is true. Destinations are requested
# from the backend with create_open, and the first destination's path decides
# how the file is opened. +bytes+ and +block+ are forwarded to
# MogileFS::HTTPFile.open.
101 def new_file(key, klass = nil, bytes = 0, &block) # :yields: file
102 raise MogileFS::ReadOnlyError if readonly?
103 opts = { :domain => @domain, :key => key, :multi_dest => 1 }
# :class is only sent when the caller supplied one
104 opts[:class] = klass if klass
105 res = @backend.create_open(opts)
# Either branch leaves dests as an array of [devid, path] pairs.
107 dests = if dev_count = res['dev_count'] # multi_dest succeeded
108 (1..dev_count.to_i).map do |i|
109 [res["devid_#{i}"], res["path_#{i}"]]
111 else # single destination returned
112 # 0x0040: d0e4 4f4b 2064 6576 6964 3d31 2666 6964 ..OK.devid=1&fid
113 # 0x0050: 3d33 2670 6174 683d 6874 7470 3a2f 2f31 =3&path=http://1
114 # 0x0060: 3932 2e31 3638 2e31 2e37 323a 3735 3030 92.168.1.72:7500
115 # 0x0070: 2f64 6576 312f 302f 3030 302f 3030 302f /dev1/0/000/000/
116 # 0x0080: 3030 3030 3030 3030 3033 2e66 6964 0d0a 0000000003.fid..
118 [[res['devid'], res['path']]]
# The `rescue nil` modifier turns any StandardError raised while indexing
# dests into a nil case subject.
121 case (dests[0][1] rescue nil)
# NOTE(review): the `when` clause guarding this raise is not visible in this
# excerpt.
123 raise MogileFS::EmptyPathError
# Paths beginning with "http://" are handled by HTTPFile.
124 when /^http:\/\// then
125 MogileFS::HTTPFile.open(self, res['fid'], klass, key,
126 dests, bytes, &block)
# NOTE(review): presumably the fallback branch; its opening line is not
# visible in this excerpt.
128 raise MogileFS::UnsupportedPathError,
129 "paths '#{dests.inspect}' returned by backend is not supported"
134 # Copies the contents of +file+ into +key+ in class +klass+. +file+ can be
135 # either a file name or an object that responds to #sysread.
136 # Returns size of +file+ stored
# Writes +file+ into +key+ in class +klass+ through new_file. Raises
# MogileFS::ReadOnlyError when readonly? is true.
138 def store_file(key, klass, file)
139 raise MogileFS::ReadOnlyError if readonly?
141 new_file key, klass do |mfp|
# Objects that respond to sysread are treated as already-open IO-like
# sources. NOTE(review): the body of this branch is not visible here.
142 if file.respond_to? :sysread then
# Otherwise +file+ is used as a path name for File.size / File.open.
145 size = File.size(file)
# 0x10000 is 65536, so files larger than 64 KiB take a separate route.
# NOTE(review): that route is not visible in this excerpt.
146 if size > 0x10000 # Bigass file, handle differently
# Opens the file in binary read mode and hands both ends to sysrwloop.
150 File.open(file, "rb") { |fp| sysrwloop(fp, mfp) }
157 # Stores +content+ into +key+ in class +klass+.
# Writes +content+ into +key+ in class +klass+ through new_file. Raises
# MogileFS::ReadOnlyError when readonly? is true.
159 def store_content(key, klass, content)
160 raise MogileFS::ReadOnlyError if readonly?
162 new_file key, klass do |mfp|
# A MogileFS::Util::StoreContent is attached as the file's streaming_io
# instead of being written here.
# NOTE(review): the handling of other content types is not visible in this
# excerpt.
163 if content.is_a?(MogileFS::Util::StoreContent)
164 mfp.streaming_io = content
177 raise MogileFS::ReadOnlyError if readonly?
179 @backend.delete :domain => @domain, :key => key
186 @backend.sleep :duration => duration
190 # Renames a key +from+ to key +to+.
193 raise MogileFS::ReadOnlyError if readonly?
195 @backend.rename :domain => @domain, :from_key => from, :to_key => to
200 # Returns the size of +key+.
202 @backend.respond_to?(:_size) and return @backend._size(domain, key)
203 paths = get_paths(key) or return nil
# Determines a size from +paths+ by issuing an HTTP HEAD request through
# http_read_sock and returning the socket's mogilefs_size.
207 def paths_size(paths)
# NOTE(review): +path+ is bound by an enclosing construct that is not visible
# in this excerpt (presumably an iteration over +paths+ -- confirm).
210 return http_read_sock(URI.parse(path), "HEAD").mogilefs_size
# The exception is captured as +err+. NOTE(review): the handler body is not
# visible here, so how +err+ is used cannot be confirmed.
211 rescue MogileFS::InvalidResponseError, MogileFS::Timeout,
212 Errno::ECONNREFUSED, EOFError, SystemCallError => err
220 # Lists keys starting with +prefix+ following +after+ up to +limit+. If
221 # +after+ is nil the list starts at the beginning.
# Lists keys beginning with +prefix+. +after+, +limit+ and the optional block
# are forwarded to a backend that implements _list_keys; otherwise the
# generic backend list_keys call is used and the method returns a two-element
# array of the key list and res['next_after'].
223 def list_keys(prefix, after = nil, limit = 1000, &block)
# Backends with their own _list_keys handle the whole request, block included.
224 if @backend.respond_to?(:_list_keys)
225 return @backend._list_keys(domain, prefix, after, limit, &block)
# NOTE(review): the assignment receiving this call's result (used as +res+
# below) is not visible in this excerpt.
229 @backend.list_keys(:domain => domain, :prefix => prefix,
230 :after => after, :limit => limit)
# NOTE(review): the handler body for NoneMatchError is not visible here.
231 rescue MogileFS::Backend::NoneMatchError
# Collect res["key_1"] .. res["key_N"], N = res['key_count'].to_i.
235 keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
237 # emulate the MogileFS::Mysql interface, slowly...
# For each key, yields the key, its length and its number of paths; keys with
# no paths or no measurable size are skipped via `next`.
# NOTE(review): the enclosing loop and any block guard are not visible here.
239 paths = get_paths(key) or next
240 length = paths_size(paths) or next
241 yield key, length, paths.size
245 [ keys, res['next_after'] ]
250 # given a URI, this returns a readable socket with ready data from the
251 # body of the response.
# Opens a TCP socket to +uri+, sends a bare HTTP/1.0 request using
# +http_method+ ("GET" by default) and inspects the response header.
# @get_file_data_timeout is used for the connect, write and peek steps.
# Raises MogileFS::InvalidResponseError when the peek returns a falsy value,
# and again at the end of the visible code with the inspected header.
252 def http_read_sock(uri, http_method = "GET")
253 tout = @get_file_data_timeout
254 sock = MogileFS::Socket.tcp(uri.host, uri.port, tout)
# Request line only, with no headers.
255 buf = "#{http_method} #{uri.request_uri} HTTP/1.0\r\n\r\n" # no chunking
257 sock.timed_write(buf, tout)
# +buf+ is reused as the receive buffer for up to 4096 bytes.
258 sock.timed_peek(4096, buf, tout) or
259 raise MogileFS::InvalidResponseError, "EOF on #{http_method} #{uri}"
# Split at the first blank line into header text and whatever body followed.
261 head, body = buf.split(/\r\n\r\n/, 2)
263 # we're dealing with a seriously slow/stupid HTTP server if we can't
264 # get the header in a single recv(2) syscall.
# Requires a 200 status line and a Content-Length header; the header's value
# becomes the socket's mogilefs_size.
265 if head =~ %r{\AHTTP/\d+\.\d+\s+200\s*} &&
266 head =~ %r{^Content-Length:\s*(\d+)}i
267 sock.mogilefs_size = $1.to_i
# head.size + 4 covers the header text plus the 4-byte "\r\n\r\n" separator;
# presumably this consumes the bytes that were only peeked above -- confirm
# against MogileFS::Socket#timed_peek.
272 sock.read(head.size + 4) # will allow IO.copy_stream to work
# Best-effort close: errors from close are discarded before raising.
# NOTE(review): the lines separating the success path from this failure path
# are not visible in this excerpt.
276 sock.close rescue nil
277 raise MogileFS::InvalidResponseError,
278 "#{http_method} on #{uri} returned: #{head.inspect}"