# zeroinstall/injector/iface_cache.py
1 """
2 Manages the feed cache.
4 @var iface_cache: A singleton cache object. You should normally use this rather than
5 creating new cache objects.
7 """
# Copyright (C) 2009, Thomas Leonard
# See the README file for details, or visit http://0install.net.

# Note:
#
# We need to know the modification time of each interface, because we refuse
# to update to an older version (this prevents an attack where the attacker
# sends back an old version which is correctly signed but has a known bug).
#
# The way we store this is a bit complicated due to backward compatibility:
#
# - GPG-signed interfaces have their signatures removed and a last-modified
#   attribute is stored containing the date from the signature.
#
# - XML-signed interfaces are stored unmodified with their signatures. The
#   date is extracted from the signature when needed.
#
# - Older versions used to add the last-modified attribute even to files
#   with XML signatures - these files therefore have invalid signatures and
#   we extract from the attribute for these.
#
# Eventually, support for the first and third cases will be removed.
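
# For illustration only (hypothetical URL and timestamp): an old-style
# GPG-signed feed is cached with the signature stripped and a last-modified
# attribute added, e.g.
#
#   <?xml version="1.0" ?>
#   <interface last-modified="1299110400" uri="http://example.com/prog.xml" ...>
#
# while a new-style XML-signed feed is stored verbatim and ends with a
# "<!-- Base64 Signature" comment block, from which the date is extracted.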

import os, sys, time
from logging import debug, info, warn
from cStringIO import StringIO

from zeroinstall import _
from zeroinstall.support import basedir
from zeroinstall.injector import reader, model
from zeroinstall.injector.namespaces import config_site, config_prog
from zeroinstall.injector.model import Interface, escape, unescape
from zeroinstall import SafeException

# If we started a check within this period, don't start another one:
FAILED_CHECK_DELAY = 60 * 60	# 1 Hour

def _pretty_time(t):
	assert isinstance(t, (int, long)), t
	return time.strftime('%Y-%m-%d %H:%M:%S UTC', time.localtime(t))

class ReplayAttack(SafeException):
	"""Attempt to import a feed that's older than the one in the cache."""
	pass

class PendingFeed(object):
	"""A feed that has been downloaded but not yet added to the interface cache.
	Feeds remain in this state until the user confirms that they trust at least
	one of the signatures.
	@ivar url: URL for the feed
	@type url: str
	@ivar signed_data: the untrusted data
	@type signed_data: stream
	@ivar sigs: signatures extracted from signed_data
	@type sigs: [L{gpg.Signature}]
	@ivar new_xml: the payload of the signed_data, or the whole thing if XML
	@type new_xml: str
	@since: 0.25"""
	__slots__ = ['url', 'signed_data', 'sigs', 'new_xml']

	def __init__(self, url, signed_data):
		"""Downloaded data is a GPG-signed message.
		@param url: the URL of the downloaded feed
		@type url: str
		@param signed_data: the downloaded data (not yet trusted)
		@type signed_data: stream
		@raise SafeException: if the data is not signed, and logs the actual data"""
		self.url = url
		self.signed_data = signed_data
		self.recheck()

	def download_keys(self, fetcher, feed_hint = None, key_mirror = None):
		"""Download any required GPG keys not already on our keyring.
		When all downloads are done (successful or otherwise), add any new keys
		to the keyring, L{recheck}.
		@param fetcher: fetcher to manage the download (was Handler before version 1.5)
		@type fetcher: L{fetch.Fetcher}
		@param key_mirror: URL of directory containing keys, or None to use feed's directory
		@type key_mirror: str
		"""
		downloads = {}
		blockers = []
		for x in self.sigs:
			key_id = x.need_key()
			if key_id:
				import urlparse
				key_url = urlparse.urljoin(key_mirror or self.url, '%s.gpg' % key_id)
				info(_("Fetching key from %s"), key_url)
				dl = fetcher.download_url(key_url, hint = feed_hint)
				downloads[dl.downloaded] = (dl, dl.tempfile)
				blockers.append(dl.downloaded)

		exception = None
		any_success = False

		from zeroinstall.support import tasks

		while blockers:
			yield blockers

			old_blockers = blockers
			blockers = []

			for b in old_blockers:
				try:
					tasks.check(b)
					if b.happened:
						dl, stream = downloads[b]
						stream.seek(0)
						self._downloaded_key(stream)
						any_success = True
					else:
						blockers.append(b)
				except Exception:
					_type, exception, tb = sys.exc_info()
					warn(_("Failed to import key for '%(url)s': %(exception)s"), {'url': self.url, 'exception': str(exception)})

		if exception and not any_success:
			raise exception, None, tb

		self.recheck()
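
	# Illustrative only (not executed): a caller would normally wrap this
	# coroutine in a tasks.Task and wait for it before re-checking trust.
	# `fetcher` and `pending` are assumed to exist in the calling code.
	#
	#   task = tasks.Task(pending.download_keys(fetcher), "download keys")
	#   yield task.finished
	#   tasks.check(task.finished)
	#   # pending.sigs has now been refreshed by recheck()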

	def _downloaded_key(self, stream):
		import shutil, tempfile
		from zeroinstall.injector import gpg

		info(_("Importing key for feed '%s'"), self.url)

		# Python2.4: can't call fileno() on stream, so save to tmp file instead
		tmpfile = tempfile.TemporaryFile(prefix = 'injector-dl-data-')
		try:
			shutil.copyfileobj(stream, tmpfile)
			tmpfile.flush()

			tmpfile.seek(0)
			gpg.import_key(tmpfile)
		finally:
			tmpfile.close()

	def recheck(self):
		"""Set new_xml and sigs by reading signed_data.
		You need to call this when previously-missing keys are added to the GPG keyring."""
		from . import gpg
		try:
			self.signed_data.seek(0)
			stream, sigs = gpg.check_stream(self.signed_data)
			assert sigs

			data = stream.read()
			if stream is not self.signed_data:
				stream.close()

			self.new_xml = data
			self.sigs = sigs
		except:
			self.signed_data.seek(0)
			info(_("Failed to check GPG signature. Data received was:\n") + repr(self.signed_data.read()))
			raise

class IfaceCache(object):
	"""
	The interface cache stores downloaded and verified interfaces in
	~/.cache/0install.net/interfaces (by default).

	There are methods to query the cache, add to it, check signatures, etc.

	The cache is updated by L{fetch.Fetcher}.

	Confusingly, this class is really two caches combined: the in-memory
	cache of L{model.Interface} objects, and an on-disk cache of L{model.ZeroInstallFeed}s.
	It will probably be split into two in future.

	@ivar distro: the native distribution proxy
	@type distro: L{distro.Distribution}

	@see: L{iface_cache} - the singleton IfaceCache instance.
	"""

	__slots__ = ['_interfaces', '_feeds', '_distro', '_config']

	def __init__(self, distro = None):
		"""@param distro: distribution used to resolve "distribution:" feeds (since 0.49)
		@type distro: L{distro.Distribution}, or None to use the host distribution
		"""
		self._interfaces = {}
		self._feeds = {}
		self._distro = distro

	@property
	def stores(self):
		from zeroinstall.injector import policy
		return policy.get_deprecated_singleton_config().stores

	@property
	def distro(self):
		if self._distro is None:
			from zeroinstall.injector.distro import get_host_distribution
			self._distro = get_host_distribution()
		return self._distro

	def update_interface_if_trusted(self, interface, sigs, xml):
		import warnings
		warnings.warn("Use update_feed_if_trusted instead", DeprecationWarning, stacklevel = 2)
		return self.update_feed_if_trusted(interface.uri, sigs, xml)

	def update_feed_if_trusted(self, feed_url, sigs, xml):
		"""Update a cached feed (using L{update_feed_from_network})
		if we trust the signatures.
		If we don't trust any of the signatures, do nothing.
		@param feed_url: the feed being updated
		@type feed_url: str
		@param sigs: signatures from L{gpg.check_stream}
		@type sigs: [L{gpg.Signature}]
		@param xml: the downloaded replacement feed document
		@type xml: str
		@return: True if the feed was updated
		@rtype: bool
		@since: 0.48
		"""
		from . import trust
		updated = self._oldest_trusted(sigs, trust.domain_from_url(feed_url))
		if updated is None: return False	# None are trusted

		self.update_feed_from_network(feed_url, xml, updated)
		return True
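
	# Sketch of the expected calling sequence (hypothetical variable names):
	# a PendingFeed supplies the URL, signatures and XML, and the feed is only
	# written to the cache if at least one signature is already trusted.
	#
	#   pending = PendingFeed(feed_url, stream)
	#   # ... download any missing keys and confirm them with the user ...
	#   if not iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
	#       info("No trusted signatures; feed not updated")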

	def update_interface_from_network(self, interface, new_xml, modified_time):
		import warnings
		warnings.warn("Use update_feed_from_network instead", DeprecationWarning, stacklevel = 2)
		self.update_feed_from_network(interface.uri, new_xml, modified_time)

	def update_feed_from_network(self, feed_url, new_xml, modified_time):
		"""Update a cached feed.
		Called by L{update_feed_if_trusted} if we trust this data.
		After a successful update, L{writer} is used to update the feed's
		last_checked time.
		@param feed_url: the feed being updated
		@type feed_url: str
		@param new_xml: the downloaded replacement feed document
		@type new_xml: str
		@param modified_time: the timestamp of the oldest trusted signature
		(used as an approximation to the feed's modification time)
		@type modified_time: long
		@raises ReplayAttack: if modified_time is older than the currently cached time
		@since: 0.48
		"""
		debug(_("Updating '%(interface)s' from network; modified at %(time)s") %
			{'interface': feed_url, 'time': _pretty_time(modified_time)})

		if '\n<!-- Base64 Signature' not in new_xml:
			# Only do this for old-style feeds without signatures.
			# Otherwise, we can get the time from the signature, and
			# adding this attribute just makes the signature invalid.
			from xml.dom import minidom
			doc = minidom.parseString(new_xml)
			doc.documentElement.setAttribute('last-modified', str(modified_time))
			new_xml = StringIO()
			doc.writexml(new_xml)
			new_xml = new_xml.getvalue()

		self._import_new_feed(feed_url, new_xml, modified_time)

		feed = self.get_feed(feed_url)

		from . import writer
		feed.last_checked = int(time.time())
		writer.save_feed(feed)

		info(_("Updated feed cache entry for %(interface)s (modified %(time)s)"),
			{'interface': feed.get_name(), 'time': _pretty_time(modified_time)})

	def _import_new_feed(self, feed_url, new_xml, modified_time):
		"""Write new_xml into the cache.
		@param feed_url: the URL for the feed being updated
		@param new_xml: the data to write
		@param modified_time: when new_xml was modified
		@raises ReplayAttack: if the new mtime is older than the current one
		"""
		assert modified_time

		upstream_dir = basedir.save_cache_path(config_site, 'interfaces')
		cached = os.path.join(upstream_dir, escape(feed_url))

		old_modified = None
		if os.path.exists(cached):
			old_xml = open(cached).read()
			if old_xml == new_xml:
				debug(_("No change"))
				# Update in-memory copy, in case someone else updated the disk copy
				self.get_feed(feed_url, force = True)
				return
			old_modified = int(os.stat(cached).st_mtime)

		# Do we need to write this temporary file now?
		stream = open(cached + '.new', 'w')
		try:
			stream.write(new_xml)
			stream.close()
			os.utime(cached + '.new', (modified_time, modified_time))
			new_mtime = reader.check_readable(feed_url, cached + '.new')
			assert new_mtime == modified_time

			old_modified = self._get_signature_date(feed_url) or old_modified

			if old_modified:
				if new_mtime < old_modified:
					raise ReplayAttack(_("New feed's modification time is "
						"before old version!\nInterface: %(iface)s\nOld time: %(old_time)s\nNew time: %(new_time)s\n"
						"Refusing update.")
						% {'iface': feed_url, 'old_time': _pretty_time(old_modified), 'new_time': _pretty_time(new_mtime)})
				if new_mtime == old_modified:
					# You used to have to update the modification time manually.
					# Now it comes from the signature, this check isn't useful
					# and often causes problems when the stored format changes
					# (e.g., when we stopped writing last-modified attributes)
					pass
					#raise SafeException("Interface has changed, but modification time "
					#		    "hasn't! Refusing update.")
		except:
			os.unlink(cached + '.new')
			raise

		os.rename(cached + '.new', cached)
		debug(_("Saved as %s") % cached)

		self.get_feed(feed_url, force = True)

	def get_feed(self, url, force = False, selections_ok = False):
		"""Get a feed from the cache.
		@param url: the URL of the feed
		@param force: load the file from disk again
		@param selections_ok: if url is a local selections file, return that instead
		@return: the feed, or None if it isn't cached
		@rtype: L{model.ZeroInstallFeed}"""
		if not force:
			feed = self._feeds.get(url, False)
			if feed != False:
				return feed

		if url.startswith('distribution:'):
			master_feed = self.get_feed(url.split(':', 1)[1])
			if not master_feed:
				return None	# Can't happen?
			feed = self.distro.get_feed(master_feed)
		else:
			feed = reader.load_feed_from_cache(url, selections_ok = selections_ok)
			if selections_ok and feed and not isinstance(feed, model.ZeroInstallFeed):
				assert feed.selections is not None
				return feed	# (it's actually a selections document)
		if feed:
			reader.update_user_feed_overrides(feed)
		self._feeds[url] = feed
		return feed
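
	# Example (hypothetical URLs): a plain URL is looked up in the on-disk
	# cache, while a "distribution:" URL is resolved through the distro
	# object; pass force = True to discard the in-memory copy and re-read
	# the feed from disk.
	#
	#   feed = iface_cache.get_feed('http://example.com/prog.xml')
	#   distro_feed = iface_cache.get_feed('distribution:http://example.com/prog.xml')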

	def get_interface(self, uri):
		"""Get the interface for uri, creating a new one if required.
		New interfaces are initialised from the disk cache, but not from
		the network.
		@param uri: the URI of the interface to find
		@rtype: L{model.Interface}
		"""
		if type(uri) == str:
			uri = unicode(uri)
		assert isinstance(uri, unicode)

		if uri in self._interfaces:
			return self._interfaces[uri]

		debug(_("Initialising new interface object for %s"), uri)
		self._interfaces[uri] = Interface(uri)
		reader.update_from_cache(self._interfaces[uri], iface_cache = self)
		return self._interfaces[uri]

	def list_all_interfaces(self):
		"""List all interfaces in the cache.
		@rtype: [str]
		"""
		all = set()
		for d in basedir.load_cache_paths(config_site, 'interfaces'):
			for leaf in os.listdir(d):
				if not leaf.startswith('.'):
					all.add(unescape(leaf))
		return list(all)	# Why not just return the set?

	def get_icon_path(self, iface):
		"""Get the path of a cached icon for an interface.
		@param iface: interface whose icon we want
		@return: the path of the cached icon, or None if not cached.
		@rtype: str"""
		return basedir.load_first_cache(config_site, 'interface_icons',
						escape(iface.uri))

	def get_cached_signatures(self, uri):
		"""Verify the cached interface using GPG.
		Only new-style XML-signed interfaces retain their signatures in the cache.
		@param uri: the feed to check
		@type uri: str
		@return: a list of signatures, or None
		@rtype: [L{gpg.Signature}] or None
		@since: 0.25"""
		from . import gpg
		if os.path.isabs(uri):
			old_iface = uri
		else:
			old_iface = basedir.load_first_cache(config_site, 'interfaces', escape(uri))
			if old_iface is None:
				return None
		try:
			return gpg.check_stream(open(old_iface))[1]
		except SafeException as ex:
			debug(_("No signatures (old-style interface): %s") % ex)
			return None

	def _get_signature_date(self, uri):
		"""Read the date-stamp from the signature of the cached interface.
		If the date-stamp is unavailable, returns None."""
		from . import trust
		sigs = self.get_cached_signatures(uri)
		if sigs:
			return self._oldest_trusted(sigs, trust.domain_from_url(uri))

	def _oldest_trusted(self, sigs, domain):
		"""Return the date of the oldest trusted signature in the list, or None if there
		are no trusted sigs in the list."""
		trusted = [s.get_timestamp() for s in sigs if s.is_trusted(domain)]
		if trusted:
			return min(trusted)
		return None

	def mark_as_checking(self, url):
		"""Touch a 'last-check-attempt' timestamp file for this feed.
		If url is a local path, nothing happens.
		This prevents us from repeatedly trying to download a failing feed many
		times in a short period."""
		if os.path.isabs(url):
			return
		feeds_dir = basedir.save_cache_path(config_site, config_prog, 'last-check-attempt')
		timestamp_path = os.path.join(feeds_dir, model._pretty_escape(url))
		fd = os.open(timestamp_path, os.O_WRONLY | os.O_CREAT, 0o644)
		os.close(fd)
		os.utime(timestamp_path, None)	# In case file already exists

	def get_last_check_attempt(self, url):
		"""Return the time of the most recent update attempt for a feed.
		@see: L{mark_as_checking}
		@return: The time, or None if none is recorded
		@rtype: float | None"""
		timestamp_path = basedir.load_first_cache(config_site, config_prog, 'last-check-attempt', model._pretty_escape(url))
		if timestamp_path:
			return os.stat(timestamp_path).st_mtime
		return None

	def get_feed_imports(self, iface):
		"""Get all feeds that add to this interface.
		These are the feeds explicitly added by the user, feeds added by the distribution,
		and feeds imported by a <feed> in the main feed (but not recursively, at present).
		@rtype: [L{Feed}]
		@since: 0.48"""
		main_feed = self.get_feed(iface.uri)
		if main_feed:
			return iface.extra_feeds + main_feed.feeds
		else:
			return iface.extra_feeds

	def get_feeds(self, iface):
		"""Get all feeds for this interface. This is a mapping from feed URLs
		to ZeroInstallFeeds. It includes the interface's main feed, plus the
		resolution of every feed returned by L{get_feed_imports}. Uncached
		feeds are indicated by a value of None.
		@rtype: {str: L{ZeroInstallFeed} | None}
		@since: 0.48"""
		main_feed = self.get_feed(iface.uri)
		results = {iface.uri: main_feed}
		for imp in iface.extra_feeds:
			try:
				results[imp.uri] = self.get_feed(imp.uri)
			except SafeException as ex:
				warn("Failed to load feed '%s': %s", imp.uri, ex)
		if main_feed:
			for imp in main_feed.feeds:
				results[imp.uri] = self.get_feed(imp.uri)
		return results

	def get_implementations(self, iface):
		"""Return all implementations from all of iface's feeds.
		@rtype: [L{Implementation}]
		@since: 0.48"""
		impls = []
		for feed in self.get_feeds(iface).itervalues():
			if feed:
				impls += feed.implementations.values()
		return impls
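
	# Example (hypothetical URL): listing every cached implementation and the
	# feed it came from, skipping feeds that aren't in the cache.
	#
	#   iface = iface_cache.get_interface('http://example.com/prog.xml')
	#   for feed_url, feed in iface_cache.get_feeds(iface).iteritems():
	#       if feed is None:
	#           continue		# feed not cached
	#       for impl in feed.implementations.itervalues():
	#           print feed_url, impl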

	def get_feed_targets(self, feed):
		"""Return a list of Interfaces for which feed can be a feed.
		This is used by B{0install add-feed}.
		@param feed: the feed
		@type feed: L{model.ZeroInstallFeed} (or, deprecated, a URL)
		@rtype: [model.Interface]
		@raise SafeException: If there are no known feeds.
		@since: 0.53"""

		if not isinstance(feed, model.ZeroInstallFeed):
			# (deprecated)
			feed = self.get_feed(feed)
			if feed is None:
				raise SafeException("Feed is not cached and using deprecated API")

		if not feed.feed_for:
			raise SafeException(_("Missing <feed-for> element in '%s'; "
					"it can't be used as a feed for any other interface.") % feed.url)
		feed_targets = feed.feed_for
		debug(_("Feed targets: %s"), feed_targets)
		return [self.get_interface(uri) for uri in feed_targets]
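
	# Sketch of the add-feed flow this supports (hypothetical URL): load the
	# extra feed, resolve its <feed-for> targets, and let the user pick which
	# interface to register it against.
	#
	#   feed = iface_cache.get_feed('http://example.com/prog-extras.xml')
	#   for target in iface_cache.get_feed_targets(feed):
	#       print target.uri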

	def is_stale(self, feed_url, freshness_threshold):
		"""Check whether feed needs updating, based on the configured L{config.Config.freshness}.
		None is considered to be stale.
		If we already tried to update the feed within FAILED_CHECK_DELAY, returns False.
		@return: True if feed should be updated
		@since: 0.53"""
		if isinstance(feed_url, model.ZeroInstallFeed):
			feed_url = feed_url.url		# old API
		elif feed_url is None:
			return True			# old API

		now = time.time()

		feed = self.get_feed(feed_url)
		if feed is not None:
			if feed.local_path is not None:
				return False		# Local feeds are never stale

			if feed.last_modified is not None:
				staleness = now - (feed.last_checked or 0)
				debug(_("Staleness for %(feed)s is %(staleness).2f hours"), {'feed': feed, 'staleness': staleness / 3600.0})

				if freshness_threshold <= 0 or staleness < freshness_threshold:
					return False	# Fresh enough for us
			# else we've never had it

		last_check_attempt = self.get_last_check_attempt(feed_url)
		if last_check_attempt and last_check_attempt > now - FAILED_CHECK_DELAY:
			debug(_("Stale, but tried to check recently (%s) so not rechecking now."), time.ctime(last_check_attempt))
			return False

		return True
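
	# Example update check (hypothetical URL, 30-day threshold in seconds):
	# mark_as_checking should be called once a background update actually
	# starts, so a failing feed is not re-tried within FAILED_CHECK_DELAY.
	#
	#   if iface_cache.is_stale('http://example.com/prog.xml', 30 * 24 * 60 * 60):
	#       iface_cache.mark_as_checking('http://example.com/prog.xml')
	#       # ... start the download via fetch.Fetcher ...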

	def usable_feeds(self, iface, arch):
		"""Generator for C{iface.feeds} that are valid for this architecture.
		@rtype: generator
		@see: L{arch}
		@since: 0.53"""
		for f in self.get_feed_imports(iface):
			if f.os in arch.os_ranks and f.machine in arch.machine_ranks:
				yield f
			else:
				debug(_("Skipping '%(feed)s'; unsupported architecture %(os)s-%(machine)s"),
					{'feed': f, 'os': f.os, 'machine': f.machine})

iface_cache = IfaceCache()
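
# A minimal sketch of typical read-only use of the singleton (hypothetical URL);
# network updates go through L{fetch.Fetcher} rather than this module.
#
#   from zeroinstall.injector.iface_cache import iface_cache
#   iface = iface_cache.get_interface('http://example.com/prog.xml')
#   for impl in iface_cache.get_implementations(iface):
#       print impl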