From 53b991b56162e2ef7e5ae16a5f01cf0170289098 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sat, 31 Dec 2011 11:36:14 +0000 Subject: [PATCH] Added DownloadScheduler --- zeroinstall/injector/download.py | 67 +++--------------------- zeroinstall/injector/fetch.py | 11 +++- zeroinstall/injector/scheduler.py | 107 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 61 deletions(-) create mode 100644 zeroinstall/injector/scheduler.py diff --git a/zeroinstall/injector/download.py b/zeroinstall/injector/download.py index f14f93f..a6f2314 100644 --- a/zeroinstall/injector/download.py +++ b/zeroinstall/injector/download.py @@ -56,7 +56,7 @@ class Download(object): """ __slots__ = ['url', 'tempfile', 'status', 'expected_size', 'downloaded', 'hint', '_final_total_size', 'aborted_by_user', - 'modification_time', 'unmodified'] + 'modification_time', 'unmodified', '_aborted'] def __init__(self, url, hint = None, modification_time = None, expected_size = None): """Create a new download object. @@ -67,7 +67,7 @@ class Download(object): @postcondition: L{status} == L{download_fetching}.""" self.url = url self.hint = hint - self.aborted_by_user = False + self.aborted_by_user = False # replace with _aborted? self.modification_time = modification_time self.unmodified = False @@ -80,55 +80,12 @@ class Download(object): self.status = download_fetching self.tempfile = tempfile.TemporaryFile(prefix = 'injector-dl-data-') - task = tasks.Task(self._do_download(), "download " + self.url) - self.downloaded = task.finished - - def _do_download(self): - """Will trigger L{downloaded} when done (on success or failure).""" - from ._download_child import download_in_thread - - # (changed if we get redirected) - current_url = self.url - - redirections_remaining = 10 - - while True: - result = [] - thread_blocker = tasks.Blocker("wait for thread " + current_url) - def notify_done(status, ex = None, redirect = None): - result.append((status, redirect)) - def wake_up_main(): - thread_blocker.trigger(ex) - return False - gobject.idle_add(wake_up_main) - child = threading.Thread(target = lambda: download_in_thread(current_url, self.tempfile, self.modification_time, notify_done)) - child.daemon = True - child.start() - - # Wait for child to complete download. - yield thread_blocker - - # Download is complete... - child.join() - - (status, redirect), = result - - if status != RESULT_REDIRECT: - assert not redirect, redirect - break - - assert redirect - current_url = redirect - - if redirections_remaining == 0: - raise DownloadError("Too many redirections {url} -> {current}".format( - url = self.url, - current = current_url)) - redirections_remaining -= 1 - # (else go around the loop again) + self._aborted = tasks.Blocker("abort " + url) + def _finish(self, status): assert self.status is download_fetching assert self.tempfile is not None + assert not self.aborted_by_user if status == RESULT_NOT_MODIFIED: debug("%s not modified", self.url) @@ -136,21 +93,13 @@ class Download(object): self.unmodified = True self.status = download_complete self._final_total_size = 0 - self.downloaded.trigger() return self._final_total_size = self.get_bytes_downloaded_so_far() self.tempfile = None - if self.aborted_by_user: - assert self.downloaded.happened - raise DownloadAborted() - try: - - tasks.check(thread_blocker) - assert status == RESULT_OK # Check that the download has the correct size, if we know what it should be. @@ -162,11 +111,9 @@ class Download(object): 'Received: %(size)d bytes') % {'url': self.url, 'expected_size': self.expected_size, 'size': self._final_total_size}) except: self.status = download_failed - _unused, ex, tb = sys.exc_info() - self.downloaded.trigger(exception = (ex, tb)) + raise else: self.status = download_complete - self.downloaded.trigger() def abort(self): """Signal the current download to stop. @@ -183,7 +130,7 @@ class Download(object): self.aborted_by_user = True self.tempfile.close() self.tempfile = None - self.downloaded.trigger((DownloadAborted(), None)) + self._aborted.trigger() def get_current_fraction(self): """Returns the current fraction of this download that has been fetched (from 0 to 1), diff --git a/zeroinstall/injector/fetch.py b/zeroinstall/injector/fetch.py index 99b3962..0bf6bc5 100644 --- a/zeroinstall/injector/fetch.py +++ b/zeroinstall/injector/fetch.py @@ -85,17 +85,25 @@ class Fetcher(object): @ivar key_info: caches information about GPG keys @type key_info: {str: L{KeyInfoFetcher}} """ - __slots__ = ['config', 'key_info'] + __slots__ = ['config', 'key_info', '_scheduler'] def __init__(self, config): assert config.handler, "API change!" self.config = config self.key_info = {} + self._scheduler = None @property def handler(self): return self.config.handler + @property + def scheduler(self): + if self._scheduler is None: + from . import scheduler + self._scheduler = scheduler.DownloadScheduler() + return self._scheduler + @tasks.async def cook(self, required_digest, recipe, stores, force = False, impl_hint = None): """Follow a Recipe. @@ -512,4 +520,5 @@ class Fetcher(object): dl = download.Download(url, hint = hint, modification_time = modification_time, expected_size = expected_size) self.handler.monitor_download(dl) + dl.downloaded = self.scheduler.download(dl) return dl diff --git a/zeroinstall/injector/scheduler.py b/zeroinstall/injector/scheduler.py new file mode 100644 index 0000000..3c276a6 --- /dev/null +++ b/zeroinstall/injector/scheduler.py @@ -0,0 +1,107 @@ +""" +Manage pools of connections so that we can limit the number of requests per site and reuse +connections. +@since: 1.6 +""" + +# Copyright (C) 2011, Thomas Leonard +# See the README file for details, or visit http://0install.net. + +import urlparse +from collections import defaultdict +import threading, gobject + +from zeroinstall.support import tasks +from zeroinstall.injector import download + +default_port = { + 'http': 80, + 'https': 443, +} + +class DownloadStep: + url = None + status = None + redirect = None + +class DownloadScheduler: + """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling. + @since: 1.6""" + def __init__(self): + self._sites = defaultdict(lambda: Site()) # (scheme://host:port) -> Site + + @tasks.async + def download(self, dl): + # (changed if we get redirected) + current_url = dl.url + + redirections_remaining = 10 + + # Assign the Download to a Site based on its scheme, host and port. If the result is a redirect, + # reassign it to the appropriate new site. Note that proxy handling happens later; we want to group + # and limit by the target site, not treat everything as going to a single site (the proxy). + while True: + location_parts = urlparse.urlparse(current_url) + + site_key = (location_parts.scheme, + location_parts.hostname, + location_parts.port or default_port.get(location_parts.scheme, None)) + + step = DownloadStep() + step.dl = dl + step.url = current_url + blocker = self._sites[site_key].download(step) + yield blocker + tasks.check(blocker) + + if not step.redirect: + break + + current_url = step.redirect + + if redirections_remaining == 0: + raise download.DownloadError("Too many redirections {url} -> {current}".format( + url = dl.url, + current = current_url)) + redirections_remaining -= 1 + # (else go around the loop again) + +class Site: + """Represents a service accepting download requests. All requests with the same scheme, host and port are + handled by the same Site object, allowing it to do connection pooling and queuing, although the current + implementation doesn't do either.""" + @tasks.async + def download(self, step): + from ._download_child import download_in_thread + + thread_blocker = tasks.Blocker("wait for thread " + step.url) + def notify_done(status, ex = None, redirect = None): + step.status = status + step.redirect = redirect + def wake_up_main(): + thread_blocker.trigger(ex) + return False + gobject.idle_add(wake_up_main) + child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done)) + child.daemon = True + child.start() + + # Wait for child to complete download. + yield thread_blocker, step.dl._aborted + + if step.dl._aborted.happened: + # Don't wait for child to finish (might be stuck doing IO) + raise download.DownloadAborted() + + # Download is complete... + child.join() + + tasks.check(thread_blocker) + + if step.status == download.RESULT_REDIRECT: + assert step.redirect + return # DownloadScheduler will handle it + + assert not step.redirect, step.redirect + + step.dl._finish(step.status) -- 2.11.4.GIT