From cc8975bf656476cde2099a6ebee3bdfc7c3dad35 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 14 Sep 2011 18:04:16 +0100 Subject: [PATCH] Use threads, not processes, for downloads This is the first step towards supporting Aleksey Lim's efficiency improvements patch. --- zeroinstall/injector/_download_child.py | 41 ++++++------- zeroinstall/injector/download.py | 104 ++++++++++++++------------------ 2 files changed, 63 insertions(+), 82 deletions(-) diff --git a/zeroinstall/injector/_download_child.py b/zeroinstall/injector/_download_child.py index 0ab1bc2..e11369e 100644 --- a/zeroinstall/injector/_download_child.py +++ b/zeroinstall/injector/_download_child.py @@ -1,20 +1,15 @@ -# Copyright (C) 2010, Thomas Leonard +# Copyright (C) 2011, Thomas Leonard # See the README file for details, or visit http://0install.net. -import sys, os -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) +import sys from zeroinstall import _ +from zeroinstall.injector import download -# NB: duplicated in download.py -RESULT_OK = 0 -RESULT_FAILED = 1 -RESULT_NOT_MODIFIED = 2 - -def _download_as_child(url, if_modified_since): - from httplib import HTTPException - from urllib2 import urlopen, Request, HTTPError, URLError +def download_in_thread(url, target_file, if_modified_since, notify_done): try: + from httplib import HTTPException + from urllib2 import urlopen, Request, HTTPError, URLError #print "Child downloading", url if url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:'): req = Request(url) @@ -31,19 +26,17 @@ def _download_as_child(url, if_modified_since): while True: data = sock.recv(256) if not data: break - os.write(1, data) + target_file.write(data) + target_file.flush() - sys.exit(RESULT_OK) + notify_done(download.RESULT_OK) except (HTTPError, URLError, HTTPException) as ex: if isinstance(ex, HTTPError) and ex.code == 304: # Not modified - sys.exit(RESULT_NOT_MODIFIED) - print >>sys.stderr, "Error downloading '" + url + "': " + (str(ex) or str(ex.__class__.__name__)) - sys.exit(RESULT_FAILED) - -if __name__ == '__main__': - assert (len(sys.argv) == 2) or (len(sys.argv) == 3), "Usage: download URL [If-Modified-Since-Date], not %s" % sys.argv - if len(sys.argv) >= 3: - if_modified_since_date = sys.argv[2] - else: - if_modified_since_date = None - _download_as_child(sys.argv[1], if_modified_since_date) + notify_done(download.RESULT_NOT_MODIFIED) + else: + #print >>sys.stderr, "Error downloading '" + url + "': " + (str(ex) or str(ex.__class__.__name__)) + __, ex, tb = sys.exc_info() + notify_done(download.RESULT_FAILED, (download.DownloadError(unicode(ex)), tb)) + except Exception as ex: + __, ex, tb = sys.exc_info() + notify_done(download.RESULT_FAILED, (ex, tb)) diff --git a/zeroinstall/injector/download.py b/zeroinstall/injector/download.py index c039bb9..59efbda 100644 --- a/zeroinstall/injector/download.py +++ b/zeroinstall/injector/download.py @@ -9,10 +9,7 @@ This is the low-level interface for downloading interfaces, implementations, ico # Copyright (C) 2009, Thomas Leonard # See the README file for details, or visit http://0install.net. -import tempfile, os, sys, subprocess - -if __name__ == '__main__': - sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) +import tempfile, os, sys, threading from zeroinstall import SafeException from zeroinstall.support import tasks @@ -29,6 +26,8 @@ RESULT_OK = 0 RESULT_FAILED = 1 RESULT_NOT_MODIFIED = 2 +import gobject; gobject.threads_init() + class DownloadError(SafeException): """Download process failed.""" pass @@ -46,23 +45,19 @@ class Download(object): @type tempfile: file @ivar status: the status of the download @type status: (download_starting | download_fetching | download_failed | download_complete) - @ivar errors: data received from the child's stderr - @type errors: str @ivar expected_size: the expected final size of the file @type expected_size: int | None @ivar downloaded: triggered when the download ends (on success or failure) @type downloaded: L{tasks.Blocker} @ivar hint: hint passed by and for caller @type hint: object - @ivar child: the child process - @type child: subprocess.Popen @ivar aborted_by_user: whether anyone has called L{abort} @type aborted_by_user: bool @ivar unmodified: whether the resource was not modified since the modification_time given at construction @type unmodified: bool """ - __slots__ = ['url', 'tempfile', 'status', 'errors', 'expected_size', 'downloaded', - 'hint', 'child', '_final_total_size', 'aborted_by_user', + __slots__ = ['url', 'tempfile', 'status', 'expected_size', 'downloaded', + 'hint', '_final_total_size', 'aborted_by_user', 'modification_time', 'unmodified'] def __init__(self, url, hint = None, modification_time = None): @@ -80,13 +75,10 @@ class Download(object): self.unmodified = False self.tempfile = None # Stream for result - self.errors = None self.downloaded = None self.expected_size = None # Final size (excluding skipped bytes) self._final_total_size = None # Set when download is finished - - self.child = None def start(self): """Create a temporary file and begin the download. @@ -94,6 +86,7 @@ class Download(object): assert self.status == download_starting assert self.downloaded is None + self.status = download_fetching self.tempfile = tempfile.TemporaryFile(prefix = 'injector-dl-data-') task = tasks.Task(self._do_download(), "download " + self.url) @@ -101,38 +94,30 @@ class Download(object): def _do_download(self): """Will trigger L{downloaded} when done (on success or failure).""" - self.errors = '' - - # Can't use fork here, because Windows doesn't have it - assert self.child is None, self.child - my_dir = os.path.dirname(__file__) - child_args = [sys.executable, '-u', os.path.join(my_dir, '_download_child.py'), self.url] - if self.modification_time: child_args.append(self.modification_time) - self.child = subprocess.Popen(child_args, stderr = subprocess.PIPE, stdout = self.tempfile) - - self.status = download_fetching - - # Wait for child to exit, collecting error output as we go - - while True: - yield tasks.InputBlocker(self.child.stderr, "read data from " + self.url) - - data = os.read(self.child.stderr.fileno(), 100) - if not data: - break - self.errors += data + from ._download_child import download_in_thread + + result = [] + thread_blocker = tasks.Blocker("wait for thread " + self.url) + def notify_done(status, ex = None): + result.append(status) + def wake_up_main(): + thread_blocker.trigger(ex) + return False + gobject.idle_add(wake_up_main) + child = threading.Thread(target = lambda: download_in_thread(self.url, self.tempfile, self.modification_time, notify_done)) + child.daemon = True + child.start() + + # Wait for child to complete download. + yield thread_blocker # Download is complete... + child.join() assert self.status is download_fetching assert self.tempfile is not None - assert self.child is not None - - status = self.child.wait() - self.child = None - errors = self.errors - self.errors = None + status, = result if status == RESULT_NOT_MODIFIED: debug("%s not modified", self.url) @@ -143,30 +128,27 @@ class Download(object): self.downloaded.trigger() return - if status and not self.aborted_by_user and not errors: - errors = _('Download process exited with error status ' - 'code %s') % hex(status) - self._final_total_size = self.get_bytes_downloaded_so_far() - stream = self.tempfile self.tempfile = None + if self.aborted_by_user: + assert self.downloaded.happened + raise DownloadAborted() + try: - if self.aborted_by_user: - raise DownloadAborted(errors) - if errors: - raise DownloadError(errors.strip()) + tasks.check(thread_blocker) + + assert status == RESULT_OK # Check that the download has the correct size, if we know what it should be. if self.expected_size is not None: - size = os.fstat(stream.fileno()).st_size - if size != self.expected_size: + if self._final_total_size != self.expected_size: raise SafeException(_('Downloaded archive has incorrect size.\n' 'URL: %(url)s\n' 'Expected: %(expected_size)d bytes\n' - 'Received: %(size)d bytes') % {'url': self.url, 'expected_size': self.expected_size, 'size': size}) + 'Received: %(size)d bytes') % {'url': self.url, 'expected_size': self.expected_size, 'size': self._final_total_size}) except: self.status = download_failed _unused, ex, tb = sys.exc_info() @@ -178,13 +160,19 @@ class Download(object): def abort(self): """Signal the current download to stop. @postcondition: L{aborted_by_user}""" - if self.child is not None: - info(_("Killing download process %s"), self.child.pid) - import signal - os.kill(self.child.pid, signal.SIGTERM) + self.status = download_failed + + if self.tempfile is not None: + info(_("Aborting download of %s"), self.url) + # TODO: we currently just close the output file; the thread will end when it tries to + # write to it. We should try harder to stop the thread immediately (e.g. by closing its + # socket when known), although we can never cover all cases (e.g. a stuck DNS lookup). + # In any case, we don't wait for the child to exit before notifying tasks that are waiting + # on us. self.aborted_by_user = True - else: - self.status = download_failed + self.tempfile.close() + self.tempfile = None + self.downloaded.trigger((DownloadAborted(), None)) def get_current_fraction(self): """Returns the current fraction of this download that has been fetched (from 0 to 1), @@ -208,7 +196,7 @@ class Download(object): elif self.status is download_fetching: return os.fstat(self.tempfile.fileno()).st_size else: - return self._final_total_size + return self._final_total_size or 0 def __str__(self): return _("") % self.url -- 2.11.4.GIT