Respect timeout when doing get_file_data
[ruby-mogilefs-client.git] / lib / mogilefs / mogilefs.rb
blob14fc0c2d5a8bd60de663612146805854ac40dbdd
1 require 'mogilefs/client'
2 require 'mogilefs/util'
4 ##
5 # MogileFS File manipulation client.
7 class MogileFS::MogileFS < MogileFS::Client
9   include MogileFS::Util
10   include MogileFS::Bigfile
12   ##
13   # The domain of keys for this MogileFS client.
15   attr_reader :domain
17   ##
18   # The timeout for get_file_data.  Defaults to five seconds.
20   attr_accessor :get_file_data_timeout
22   ##
23   # internal Regexp for matching an "HTTP 200 OK" head response
24   HTTP_200_OK = %r{\AHTTP/\d+\.\d+\s+200\s+}.freeze
26   ##
27   # Creates a new MogileFS::MogileFS instance.  +args+ must include a key
28   # :domain specifying the domain of this client.
30   def initialize(args = {})
31     @domain = args[:domain]
33     @get_file_data_timeout = 5
35     raise ArgumentError, "you must specify a domain" unless @domain
37     if @backend = args[:db_backend]
38       @readonly = true
39     else
40       super
41     end
42   end
44   ##
45   # Enumerates keys starting with +key+.
47   def each_key(prefix)
48     after = nil
50     keys, after = list_keys prefix
52     until keys.nil? or keys.empty? do
53       keys.each { |k| yield k }
54       keys, after = list_keys prefix, after
55     end
57     nil
58   end
60   ##
61   # Retrieves the contents of +key+.
63   def get_file_data(key, &block)
64     paths = get_paths key
66     return nil unless paths
68     paths.each do |path|
69       next unless path
70       case path
71       when /^http:\/\// then
72         begin
73           sock = http_get_sock(URI.parse(path))
74           return yield(sock) if block_given?
75           return sysread_full(sock, sock.mogilefs_size, @get_file_data_timeout)
76         rescue MogileFS::Timeout, Errno::ECONNREFUSED,
77                EOFError, SystemCallError, MogileFS::InvalidResponseError
78           next
79         end
80       else
81         next unless File.exist? path
82         return File.read(path)
83       end
84     end
86     nil
87   end
89   ##
90   # Get the paths for +key+.
92   def get_paths(key, noverify = true, zone = nil)
93     opts = { :domain => @domain, :key => key,
94              :noverify => noverify ? 1 : 0, :zone => zone }
95     @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
96     res = @backend.get_paths(opts)
97     (1..res['paths'].to_i).map { |i| res["path#{i}"] }
98   end
100   ##
101   # Creates a new file +key+ in +klass+.  +bytes+ is currently unused.
102   #
103   # The +block+ operates like File.open.
105   def new_file(key, klass = nil, bytes = 0, &block) # :yields: file
106     raise MogileFS::ReadOnlyError if readonly?
107     opts = { :domain => @domain, :key => key, :multi_dest => 1 }
108     opts[:class] = klass if klass
109     res = @backend.create_open(opts)
111     dests = if dev_count = res['dev_count'] # multi_dest succeeded
112       (1..dev_count.to_i).map do |i|
113         [res["devid_#{i}"], res["path_#{i}"]]
114       end
115     else # single destination returned
116       # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
117       # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
118       # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
119       # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
120       # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..
122       [[res['devid'], res['path']]]
123     end
125     case (dests[0][1] rescue nil)
126     when nil, '' then
127       raise MogileFS::EmptyPathError
128     when /^http:\/\// then
129       MogileFS::HTTPFile.open(self, res['fid'], klass, key,
130                               dests, bytes, &block)
131     else
132       raise MogileFS::UnsupportedPathError,
133             "paths '#{dests.inspect}' returned by backend is not supported"
134     end
135   end
137   ##
138   # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
139   # either a file name or an object that responds to #read.
141   def store_file(key, klass, file)
142     raise MogileFS::ReadOnlyError if readonly?
144     new_file key, klass do |mfp|
145       if file.respond_to? :sysread then
146         return sysrwloop(file, mfp)
147       else
148         if File.size(file) > 0x10000 # Bigass file, handle differently
149           mfp.big_io = file
150           return
151         else
152           return File.open(file, "rb") { |fp| sysrwloop(fp, mfp) }
153         end
154       end
155     end
156   end
158   ##
159   # Stores +content+ into +key+ in class +klass+.
161   def store_content(key, klass, content)
162     raise MogileFS::ReadOnlyError if readonly?
164     new_file key, klass do |mfp|
165       if content.is_a?(MogileFS::Util::StoreContent)
166         mfp.streaming_io = content
167       else
168         mfp << content
169       end
170     end
172     content.length
173   end
175   ##
176   # Removes +key+.
178   def delete(key)
179     raise MogileFS::ReadOnlyError if readonly?
181     @backend.delete :domain => @domain, :key => key
182   end
184   ##
185   # Sleeps +duration+.
187   def sleep(duration)
188     @backend.sleep :duration => duration
189   end
191   ##
192   # Renames a key +from+ to key +to+.
194   def rename(from, to)
195     raise MogileFS::ReadOnlyError if readonly?
197     @backend.rename :domain => @domain, :from_key => from, :to_key => to
198     nil
199   end
201   ##
202   # Returns the size of +key+.
203   def size(key)
204     @backend.respond_to?(:_size) and return @backend._size(domain, key)
205     paths = get_paths(key) or return nil
206     paths_size(paths)
207   end
209   def paths_size(paths)
210     paths.each do |path|
211       next unless path
212       case path
213       when /^http:\/\// then
214         begin
215           url = URI.parse path
216           s = Socket.mogilefs_new_request(url.host, url.port,
217                                    "HEAD #{url.request_uri} HTTP/1.0\r\n\r\n",
218                                    @get_file_data_timeout)
219           res = s.recv(4096, 0)
220           if res =~ HTTP_200_OK
221             head, body = res.split(/\r\n\r\n/, 2)
222             if head =~ /^Content-Length:\s*(\d+)/i
223               return $1.to_i
224             end
225           end
226           next
227         rescue MogileFS::Timeout, Errno::ECONNREFUSED,
228                EOFError, SystemCallError
229           next
230         ensure
231           s.close rescue nil
232         end
233       else
234         next unless File.exist? path
235         return File.size(path)
236       end
237     end
239     nil
240   end
242   ##
243   # Lists keys starting with +prefix+ follwing +after+ up to +limit+.  If
244   # +after+ is nil the list starts at the beginning.
246   def list_keys(prefix, after = nil, limit = 1000, &block)
247     if @backend.respond_to?(:_list_keys)
248       return @backend._list_keys(domain, prefix, after, limit, &block)
249     end
251     res = begin
252       @backend.list_keys(:domain => domain, :prefix => prefix,
253                          :after => after, :limit => limit)
254     rescue MogileFS::Backend::NoneMatchError
255       return nil
256     end
258     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
259     if block_given?
260       # emulate the MogileFS::Mysql interface, slowly...
261       keys.each do |key|
262         paths = get_paths(key) or next
263         length = paths_size(paths) or next
264         yield key, length, paths.size
265       end
266     end
268     [ keys, res['next_after'] ]
269   end
271   protected
273     # given a URI, this returns a readable socket with ready data from the
274     # body of the response.
275     def http_get_sock(uri)
276       sock = Socket.mogilefs_new_request(uri.host, uri.port,
277                                     "GET #{uri.request_uri} HTTP/1.0\r\n\r\n",
278                                     @get_file_data_timeout)
279       buf = sock.recv(4096, Socket::MSG_PEEK)
280       head, body = buf.split(/\r\n\r\n/, 2)
281       if head =~ HTTP_200_OK
282         sock.mogilefs_size = head[/^Content-Length:\s*(\d+)/i, 1].to_i
283         sock.recv(head.size + 4, 0)
284         return sock
285       end
286       raise MogileFS::InvalidResponseError,
287             "GET on #{uri} returned: #{head.inspect}"
288     end # def http_get_sock