call IO.select([sock]) if sock is not ready.
[ruby-mogilefs-client.git] / lib / mogilefs / mogilefs.rb
blobc7c33edbe56550218bffa5b648ce4585c37c8cea
1 require 'mogilefs/client'
2 require 'mogilefs/util'
4 ##
5 # MogileFS File manipulation client.
7 class MogileFS::MogileFS < MogileFS::Client
9   include MogileFS::Util
10   include MogileFS::Bigfile
12   ##
13   # The domain of keys for this MogileFS client.
15   attr_reader :domain
17   ##
18   # The timeout for get_file_data.  Defaults to five seconds.
20   attr_accessor :get_file_data_timeout
22   ##
23   # internal Regexp for matching an "HTTP 200 OK" head response
24   HTTP_200_OK = %r{\AHTTP/\d+\.\d+\s+200\s+}.freeze
26   ##
27   # Creates a new MogileFS::MogileFS instance.  +args+ must include a key
28   # :domain specifying the domain of this client.
30   def initialize(args = {})
31     @domain = args[:domain]
33     @get_file_data_timeout = 5
35     raise ArgumentError, "you must specify a domain" unless @domain
37     if @backend = args[:db_backend]
38       @readonly = true
39     else
40       super
41     end
42   end
44   ##
45   # Enumerates keys starting with +key+.
47   def each_key(prefix)
48     after = nil
50     keys, after = list_keys prefix
52     until keys.nil? or keys.empty? do
53       keys.each { |k| yield k }
54       keys, after = list_keys prefix, after
55     end
57     nil
58   end
60   ##
61   # Retrieves the contents of +key+.
63   def get_file_data(key, &block)
64     paths = get_paths key
66     return nil unless paths
68     paths.each do |path|
69       next unless path
70       case path
71       when /^http:\/\// then
72         begin
73           sock = http_get_sock(URI.parse(path))
74           return( if block_given?
75                     yield(sock)
76                   else
77                     begin
78                       sock.read
79                     rescue Errno::EAGAIN
80                       IO.select([sock])
81                       retry
82                     end
83                   end )
84           # return block_given? ? yield(sock) : sock.read
85         rescue MogileFS::Timeout, Errno::ECONNREFUSED,
86                EOFError, SystemCallError, MogileFS::InvalidResponseError
87           next
88         end
89       else
90         next unless File.exist? path
91         return File.read(path)
92       end
93     end
95     nil
96   end
98   ##
99   # Get the paths for +key+.
101   def get_paths(key, noverify = true, zone = nil)
102     opts = { :domain => @domain, :key => key,
103              :noverify => noverify ? 1 : 0, :zone => zone }
104     @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
105     res = @backend.get_paths(opts)
106     (1..res['paths'].to_i).map { |i| res["path#{i}"] }
107   end
109   ##
110   # Creates a new file +key+ in +klass+.  +bytes+ is currently unused.
111   #
112   # The +block+ operates like File.open.
114   def new_file(key, klass = nil, bytes = 0, &block) # :yields: file
115     raise MogileFS::ReadOnlyError if readonly?
116     opts = { :domain => @domain, :key => key, :multi_dest => 1 }
117     opts[:class] = klass if klass
118     res = @backend.create_open(opts)
120     dests = if dev_count = res['dev_count'] # multi_dest succeeded
121       (1..dev_count.to_i).map do |i|
122         [res["devid_#{i}"], res["path_#{i}"]]
123       end
124     else # single destination returned
125       # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
126       # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
127       # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
128       # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
129       # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..
131       [[res['devid'], res['path']]]
132     end
134     case (dests[0][1] rescue nil)
135     when nil, '' then
136       raise MogileFS::EmptyPathError
137     when /^http:\/\// then
138       MogileFS::HTTPFile.open(self, res['fid'], klass, key,
139                               dests, bytes, &block)
140     else
141       raise MogileFS::UnsupportedPathError,
142             "paths '#{dests.inspect}' returned by backend is not supported"
143     end
144   end
146   ##
147   # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
148   # either a file name or an object that responds to #read.
150   def store_file(key, klass, file)
151     raise MogileFS::ReadOnlyError if readonly?
153     new_file key, klass do |mfp|
154       if file.respond_to? :sysread then
155         return sysrwloop(file, mfp)
156       else
157         if File.size(file) > 0x10000 # Bigass file, handle differently
158           mfp.big_io = file
159           return
160         else
161           return File.open(file, "rb") { |fp| sysrwloop(fp, mfp) }
162         end
163       end
164     end
165   end
167   ##
168   # Stores +content+ into +key+ in class +klass+.
170   def store_content(key, klass, content)
171     raise MogileFS::ReadOnlyError if readonly?
173     new_file key, klass do |mfp|
174       if content.is_a?(MogileFS::Util::StoreContent)
175         mfp.streaming_io = content
176       else
177         mfp << content
178       end
179     end
181     content.length
182   end
184   ##
185   # Removes +key+.
187   def delete(key)
188     raise MogileFS::ReadOnlyError if readonly?
190     @backend.delete :domain => @domain, :key => key
191   end
193   ##
194   # Sleeps +duration+.
196   def sleep(duration)
197     @backend.sleep :duration => duration
198   end
200   ##
201   # Renames a key +from+ to key +to+.
203   def rename(from, to)
204     raise MogileFS::ReadOnlyError if readonly?
206     @backend.rename :domain => @domain, :from_key => from, :to_key => to
207     nil
208   end
210   ##
211   # Returns the size of +key+.
212   def size(key)
213     @backend.respond_to?(:_size) and return @backend._size(domain, key)
214     paths = get_paths(key) or return nil
215     paths_size(paths)
216   end
218   def paths_size(paths)
219     paths.each do |path|
220       next unless path
221       case path
222       when /^http:\/\// then
223         begin
224           url = URI.parse path
225           s = Socket.mogilefs_new_request(url.host, url.port,
226                                    "HEAD #{url.request_uri} HTTP/1.0\r\n\r\n",
227                                    @get_file_data_timeout)
228           res = s.recv(4096, 0)
229           if res =~ HTTP_200_OK
230             head, body = res.split(/\r\n\r\n/, 2)
231             if head =~ /^Content-Length:\s*(\d+)/i
232               return $1.to_i
233             end
234           end
235           next
236         rescue MogileFS::Timeout, Errno::ECONNREFUSED,
237                EOFError, SystemCallError
238           next
239         ensure
240           s.close rescue nil
241         end
242       else
243         next unless File.exist? path
244         return File.size(path)
245       end
246     end
248     nil
249   end
251   ##
252   # Lists keys starting with +prefix+ follwing +after+ up to +limit+.  If
253   # +after+ is nil the list starts at the beginning.
255   def list_keys(prefix, after = nil, limit = 1000, &block)
256     if @backend.respond_to?(:_list_keys)
257       return @backend._list_keys(domain, prefix, after, limit, &block)
258     end
260     res = begin
261       @backend.list_keys(:domain => domain, :prefix => prefix,
262                          :after => after, :limit => limit)
263     rescue MogileFS::Backend::NoneMatchError
264       return nil
265     end
267     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
268     if block_given?
269       # emulate the MogileFS::Mysql interface, slowly...
270       keys.each do |key|
271         paths = get_paths(key) or next
272         length = paths_size(paths) or next
273         yield key, length, paths.size
274       end
275     end
277     [ keys, res['next_after'] ]
278   end
280   protected
282     # given a URI, this returns a readable socket with ready data from the
283     # body of the response.
284     def http_get_sock(uri)
285       sock = Socket.mogilefs_new_request(uri.host, uri.port,
286                                     "GET #{uri.request_uri} HTTP/1.0\r\n\r\n",
287                                     @get_file_data_timeout)
288       buf = sock.recv(4096, Socket::MSG_PEEK)
289       head, body = buf.split(/\r\n\r\n/, 2)
290       if head =~ HTTP_200_OK
291         sock.recv(head.size + 4, 0)
292         return sock
293       end
294       raise MogileFS::InvalidResponseError,
295             "GET on #{uri} returned: #{head.inspect}"
296     end # def http_get_sock