From 592963a5da3dc428959af5829693ccdb43eaed94 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sun, 27 Jan 2008 19:33:12 +0000 Subject: [PATCH] Downloads manage their own scheduling. Handler just records what's being downloaded; it doesn't actually forward data and control the process. --- zeroinstall/injector/download.py | 83 ++++++++++++++++++++----------------- zeroinstall/injector/handler.py | 43 ++++--------------- zeroinstall/injector/iface_cache.py | 20 ++++----- zeroinstall/injector/policy.py | 1 + zeroinstall/support/tasks.py | 12 +++--- 5 files changed, 70 insertions(+), 89 deletions(-) diff --git a/zeroinstall/injector/download.py b/zeroinstall/injector/download.py index e82dd5c..4721271 100644 --- a/zeroinstall/injector/download.py +++ b/zeroinstall/injector/download.py @@ -34,10 +34,10 @@ class Download(object): "Initial status is starting." self.url = url self.status = download_starting - self.downloaded = tasks.Blocker("Download " + url) self.tempfile = None # Stream for result self.errors = None + self.downloaded = None self.expected_size = None # Final size (excluding skipped bytes) @@ -45,11 +45,16 @@ class Download(object): self.child_stderr = None def start(self): - """Returns stderr stream from child. Call error_stream_closed() when - it returns EOF.""" assert self.status == download_starting + assert self.downloaded is None + self.tempfile = tempfile.TemporaryFile(prefix = 'injector-dl-data-') + task = tasks.Task(self._do_download(), "download " + self.url) + self.downloaded = task.finished + + def _do_download(self): + """Will trigger L{downloaded} when done (on success or failure).""" error_r, error_w = os.pipe() self.errors = '' @@ -67,42 +72,21 @@ class Download(object): # We are the parent os.close(error_w) self.status = download_fetching - return os.fdopen(error_r, 'r') - - def download_as_child(self): - from urllib2 import urlopen, HTTPError, URLError - try: - import shutil - #print "Child downloading", self.url - if self.url.startswith('/'): - if not os.path.isfile(self.url): - print >>sys.stderr, "File '%s' does not " \ - "exist!" % self.url - return - src = file(self.url) - elif self.url.startswith('http:') or self.url.startswith('ftp:'): - src = urlopen(self.url) - else: - raise Exception('Unsupported URL protocol in: ' + self.url) - shutil.copyfileobj(src, self.tempfile) - self.tempfile.flush() - - os._exit(0) - except (HTTPError, URLError), ex: - print >>sys.stderr, "Error downloading '" + self.url + "': " + str(ex) - except: - traceback.print_exc() - - def error_stream_data(self, data): - """Passed with result of os.read(error_stream, n). Can be - called multiple times, once for each read.""" - assert data - assert self.status is download_fetching - self.errors += data + #stream = os.fdopen(error_r, 'r') + + # Wait for child to exit, collecting error output as we go + + while True: + yield tasks.InputBlocker(error_r, "read data from " + self.url) + + data = os.read(error_r, 100) + if not data: + break + self.errors += data + + # Download is complete... - def error_stream_closed(self): - """Ends a download. Status changes from fetching to checking.""" assert self.status is download_fetching assert self.tempfile is not None assert self.child_pid is not None @@ -141,6 +125,31 @@ class Download(object): self.status = download_checking self.downloaded.trigger() + def download_as_child(self): + from urllib2 import urlopen, HTTPError, URLError + try: + import shutil + #print "Child downloading", self.url + if self.url.startswith('/'): + if not os.path.isfile(self.url): + print >>sys.stderr, "File '%s' does not " \ + "exist!" % self.url + return + src = file(self.url) + elif self.url.startswith('http:') or self.url.startswith('ftp:'): + src = urlopen(self.url) + else: + raise Exception('Unsupported URL protocol in: ' + self.url) + + shutil.copyfileobj(src, self.tempfile) + self.tempfile.flush() + + os._exit(0) + except (HTTPError, URLError), ex: + print >>sys.stderr, "Error downloading '" + self.url + "': " + str(ex) + except: + traceback.print_exc() + def abort(self): if self.child_pid is not None: info("Killing download process %s", self.child_pid) diff --git a/zeroinstall/injector/handler.py b/zeroinstall/injector/handler.py index ff70da3..653ad33 100644 --- a/zeroinstall/injector/handler.py +++ b/zeroinstall/injector/handler.py @@ -34,17 +34,13 @@ class Handler(object): def monitor_download(self, dl): """Called when a new L{download} is started. - Call L{download.Download.start} to start the download and get the error - stream, and then call L{download.Download.error_stream_data} whenever - you read any data from it, including nothing (end-of-file), which - indicates that the download is finished.""" - error_stream = dl.start() - self.monitored_downloads[dl.url] = (error_stream, dl) - - import gobject - gobject.io_add_watch(error_stream.fileno(), - gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, - self._error_stream_ready, dl) + This is mainly used by the GUI to display the progress bar.""" + dl.start() + self.monitored_downloads[dl.url] = dl + + def download_done(): + yield dl.downloaded + monitor = tasks.Task(download_done(), "download monitor") def _error_stream_ready(self, fd, cond, dl): debug("Download stream for %s is ready...", dl) @@ -83,31 +79,6 @@ class Handler(object): tasks.check(blocker) - def __wait_for_downloads(self): - """Monitor all downloads, waiting until they are complete. This is suitable - for use by non-interactive programs. - @return: list of error messages, one per failed download (since 0.32) - @rtype: [str] or None""" - - import gobject - - if self.monitored_downloads: - assert self._loop is None # Avoid recursion - self._loop_errors = [] - self._loop = gobject.MainLoop(gobject.main_context_default()) - try: - debug("Entering mainloop, waiting for %d download(s)", len(self.monitored_downloads)) - self._loop.run() - finally: - self._loop = None - errors = self._loop_errors - self._loop_errors = None - if errors: - return errors - else: - debug("No downloads in progress, so not waiting") - return None - def get_download(self, url, force = False): """Return the Download object currently downloading 'url'. If no download for this URL has been started, start one now (and diff --git a/zeroinstall/injector/iface_cache.py b/zeroinstall/injector/iface_cache.py index e991d08..6c7d201 100644 --- a/zeroinstall/injector/iface_cache.py +++ b/zeroinstall/injector/iface_cache.py @@ -95,26 +95,26 @@ class PendingFeed(object): while blockers: yield blockers - tasks.check(blockers) old_blockers = blockers blockers = [] for b in old_blockers: - if b.happened: - dl, stream = downloads[b] - try: + try: + tasks.check(b) + if b.happened: + dl, stream = downloads[b] stream.seek(0) self._downloaded_key(stream) any_success = True - except Exception, ex: - warn("Failed to import key for '%s': %s", self.url, str(ex)) - exception = ex - else: - blockers.append(b) + else: + blockers.append(b) + except Exception: + warn("Failed to import key for '%s': %s", self.url, str(ex)) + _, exception, tb = sys.exc_info() if exception and not any_success: - raise exception + raise exception, None, tb self.recheck() diff --git a/zeroinstall/injector/policy.py b/zeroinstall/injector/policy.py index 1df8946..4404600 100644 --- a/zeroinstall/injector/policy.py +++ b/zeroinstall/injector/policy.py @@ -35,6 +35,7 @@ def _cook(policy, required_digest, recipe, force = False): blockers = [] for step in recipe.steps: blocker, stream = policy.download_archive(step, force = force) + assert stream blockers.append(blocker) streams[step] = stream diff --git a/zeroinstall/support/tasks.py b/zeroinstall/support/tasks.py index 4603ac5..42eb12c 100644 --- a/zeroinstall/support/tasks.py +++ b/zeroinstall/support/tasks.py @@ -156,9 +156,9 @@ class IdleBlocker(Blocker): class TimeoutBlocker(Blocker): """Triggers after a set number of seconds.""" - def __init__(self, timeout): + def __init__(self, timeout, name): """Trigger after 'timeout' seconds (may be a fraction).""" - Blocker.__init__(self) + Blocker.__init__(self, name) gobject.timeout_add(long(timeout * 1000), self._timeout) def _timeout(self): @@ -172,8 +172,8 @@ class InputBlocker(Blocker): """Triggers when os.read(stream) would not block.""" _tag = None _stream = None - def __init__(self, stream): - Blocker.__init__(self) + def __init__(self, stream, name): + Blocker.__init__(self, name) self._stream = stream def add_task(self, task): @@ -192,8 +192,8 @@ class OutputBlocker(Blocker): """Triggers when os.write(stream) would not block.""" _tag = None _stream = None - def __init__(self, stream): - Blocker.__init__(self) + def __init__(self, stream, name): + Blocker.__init__(self, name) self._stream = stream def add_task(self, task): -- 2.11.4.GIT