From b33f28ac37b36d688f421ea574ea07a155559f6c Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sat, 26 Apr 2008 19:30:51 +0100 Subject: [PATCH] Change to mirror logic; keep waiting for primary In the previous code, if the upstream site didn't reply before the mirror then we stopped waiting for it. With a fast mirror and a slow upstream, you might never see the (newer) upstream version. Now, we start a mirror download if the primary fails or takes too long (as before), but we don't abort the primary if the mirror succeeds. We do abort the mirror if the primary succeeeds, though. --- zeroinstall/injector/fetch.py | 190 ++++++++++++++++++++-------------------- zeroinstall/injector/handler.py | 10 ++- 2 files changed, 102 insertions(+), 98 deletions(-) diff --git a/zeroinstall/injector/fetch.py b/zeroinstall/injector/fetch.py index 724325b..3080d7c 100644 --- a/zeroinstall/injector/fetch.py +++ b/zeroinstall/injector/fetch.py @@ -12,6 +12,7 @@ from zeroinstall.support import tasks, basedir from zeroinstall.injector.namespaces import XMLNS_IFACE, config_site from zeroinstall.injector.model import DownloadSource, Recipe, SafeException, network_offline, escape from zeroinstall.injector.iface_cache import PendingFeed, ReplayAttack +from zeroinstall.injector.handler import NoTrustedKeys def _escape_slashes(path): return path.replace('/', '%23') @@ -88,81 +89,6 @@ class Fetcher(object): """Return the URL of a mirror for this feed.""" return '%s/%s/latest.xml' % (self.feed_mirror, _get_feed_dir(url)) - def download_with_fallback(self, dl, fallback_url, force, results): - """Wait for dl to finish successfully and return dl.stream. - If it takes too long, or fails, try downloading from fallback_url. - Note this is a generator. - @param dl: the first download to try - @param fallback_url: URL of fallback download - @param results: an empty list to contain the results - @return: generates a series of blocker to be yielded. When exhausted, results contains (successful_stream, is_from_fallback)""" - dl_stream = dl.tempfile - mirror_dl = None - timeout = tasks.TimeoutBlocker(5, 'Mirror timeout') # 5 seconds - - dl_url = dl.url # Used for logging after dl is None - dl_ex = None # Saved failure, in case the mirror fails too - - # There are three possible states: - # - timeout, not mirror_dl : fallback dl not yet started - # - not timeout, mirror_dl : fallback dl in progress - # - not timeout, not mirror_dl: fallback dl failed - - while True: - blockers = [dl and dl.downloaded, mirror_dl and mirror_dl.downloaded, timeout] - blockers = filter(None, blockers) # Remove Nones - - if not blockers: - # Both downloads failed. Report the original error. - raise dl_ex[0], dl_ex[1], dl_ex[2] - - yield blockers - - if dl: - try: - tasks.check(dl.downloaded) - if dl.downloaded.happened: - # Main download succeeded; use it - results += [dl_stream, False] - if mirror_dl: mirror_dl.abort() - return - except Exception, ex: - # Main download failed. Store the error for later. - dl = None - dl_ex = sys.exc_info() - if timeout or mirror_dl: - warn("Download failed (will try mirror): %s", dl_ex[1]) - else: - warn("Download failed: %s", dl_ex[1]) - if mirror_dl: - try: - tasks.check(mirror_dl.downloaded) - if mirror_dl.downloaded.happened: - # Mirror download succeeded; use that - results += [mirror_stream, True] - if dl: dl.abort() - return - except Exception, ex: - # Mirror download failed. Ignore. - info("Mirror download failed: %s", ex) - mirror_dl = None - - if timeout: - tasks.check(timeout) - assert not mirror_dl - # Fallback dl not yet started. Start now? - if timeout.happened: - info("Download taking too long; trying mirror") - timeout = None - elif dl is None: - # Main has just failed. - timeout = None - if timeout is None: - # Start downloading from the mirror. - info("Trying mirror URL %s", fallback_url) - mirror_dl = self.handler.get_download(fallback_url, force = force, hint = dl_url) - mirror_stream = mirror_dl.tempfile - def download_and_import_feed(self, feed_url, iface_cache, force = False): """Download the feed, download any required keys, confirm trust if needed and import. @param feed_url: the feed to be downloaded @@ -174,19 +100,97 @@ class Fetcher(object): debug("download_and_import_feed %s (force = %d)", feed_url, force) assert not feed_url.startswith('/') - dl = self.handler.get_download(feed_url, force = force, hint = feed_url) + primary = self._download_and_import_feed(feed_url, iface_cache, force, use_mirror = False) - @tasks.named_async("fetch_feed " + feed_url) + @tasks.named_async("monitor feed downloads for " + feed_url) + def wait_for_downloads(primary): + # Download just the upstream feed, unless it takes too long... + timeout = tasks.TimeoutBlocker(5, 'Mirror timeout') # 5 seconds + + yield primary, timeout + tasks.check(timeout) + + try: + tasks.check(primary) + if primary.happened: + return # OK, primary succeeded! + # OK, maybe it's just being slow... + info("Feed download from %s is taking a long time. Trying mirror too...", feed_url) + primary_ex = None + except NoTrustedKeys, ex: + raise # Don't bother trying the mirror if we have a trust problem + except SafeException, ex: + # Primary failed + primary = None + primary_ex = ex + warn("Trying mirror, as feed download from %s failed: %s", feed_url, ex) + + # Start downloading from mirror... + mirror = self._download_and_import_feed(feed_url, iface_cache, force, use_mirror = True) + + # Wait until both mirror and primary tasks are complete... + while True: + blockers = filter(None, [primary, mirror]) + if not blockers: + break + yield blockers + + if primary: + try: + tasks.check(primary) + if primary.happened: + primary = None + # No point carrying on with the mirror once the primary has succeeded + if mirror: + info("Primary feed download succeeded; aborting mirror download for " + feed_url) + mirror.dl.abort() + except SafeException, ex: + primary = None + primary_ex = ex + info("Feed download from %s failed; still trying mirror: %s", feed_url, ex) + + if mirror: + try: + tasks.check(mirror) + if mirror.happened: + mirror = None + if primary_ex: + # We already warned; no need to raise an exception too, + # as the mirror download succeeded. + primary_ex = None + except ReplayAttack, ex: + info("Version from mirror is older than cached version; ignoring it: %s", ex) + mirror = None + primary_ex = None + except SafeException, ex: + info("Mirror download failed: %s", ex) + mirror = None + + if primary_ex: + raise primary_ex + + return wait_for_downloads(primary) + + def _download_and_import_feed(self, feed_url, iface_cache, force, use_mirror): + """Download and import a feed. + @param use_mirror: False to use primary location; True to use mirror.""" + if use_mirror: + url = self.get_feed_mirror(feed_url) + else: + url = feed_url + + dl = self.handler.get_download(url, force = force, hint = feed_url) + stream = dl.tempfile + + @tasks.named_async("fetch_feed " + url) def fetch_feed(): - results = [] - for x in self.download_with_fallback(dl, self.get_feed_mirror(feed_url), force = force, results = results): - yield x - stream, using_mirror = results + yield dl.downloaded + tasks.check(dl.downloaded) pending = PendingFeed(feed_url, stream) iface_cache.add_pending(pending) - if using_mirror: + if use_mirror: # If we got the feed from a mirror, get the key from there too key_mirror = self.feed_mirror + '/keys/' else: @@ -197,21 +201,17 @@ class Fetcher(object): tasks.check(keys_downloaded.finished) iface = iface_cache.get_interface(pending.url) - try: + if not iface_cache.update_interface_if_trusted(iface, pending.sigs, pending.new_xml): + blocker = self.handler.confirm_trust_keys(iface, pending.sigs, pending.new_xml) + if blocker: + yield blocker + tasks.check(blocker) if not iface_cache.update_interface_if_trusted(iface, pending.sigs, pending.new_xml): - blocker = self.handler.confirm_trust_keys(iface, pending.sigs, pending.new_xml) - if blocker: - yield blocker - tasks.check(blocker) - if not iface_cache.update_interface_if_trusted(iface, pending.sigs, pending.new_xml): - raise SafeException("No signing keys trusted; not importing") - except ReplayAttack, ex: - if using_mirror: - info("Version from mirror is older than cached version; ignoring it: %s", ex) - else: - raise + raise NoTrustedKeys("No signing keys trusted; not importing") - return fetch_feed() + task = fetch_feed() + task.dl = dl + return task def download_impl(self, impl, retrieval_method, stores, force = False): """Download an implementation. diff --git a/zeroinstall/injector/handler.py b/zeroinstall/injector/handler.py index 3c0fe87..c6a4180 100644 --- a/zeroinstall/injector/handler.py +++ b/zeroinstall/injector/handler.py @@ -18,6 +18,10 @@ from zeroinstall.support import tasks from zeroinstall.injector import model, download from zeroinstall.injector.iface_cache import iface_cache +class NoTrustedKeys(model.SafeException): + """Thrown by L{Handler.confirm_trust_keys} on failure.""" + pass + class Handler(object): """ This implementation uses the GLib mainloop. Note that QT4 can use the GLib mainloop too. @@ -47,7 +51,7 @@ class Handler(object): self.downloads_changed() @tasks.async - def download_done(): + def download_done_stats(): yield dl.downloaded # NB: we don't check for exceptions here; someone else should be doing that try: @@ -57,7 +61,7 @@ class Handler(object): self.downloads_changed() except Exception, ex: self.report_error(ex) - download_done() + download_done_stats() def impl_added_to_store(self, impl): """Called by the L{fetch.Fetcher} when adding an implementation. @@ -147,7 +151,7 @@ class Handler(object): i = raw_input("Trust [Y/N] ") if not i: continue if i in 'Nn': - raise model.SafeException('Not signed with a trusted key') + raise NoTrustedKeys('Not signed with a trusted key') if i in 'Yy': break for key in valid_sigs: -- 2.11.4.GIT