From 6461acde05a5ecbd608be3f4ec1fc0b91a2ad8e0 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sun, 19 Aug 2012 12:41:26 +0100 Subject: [PATCH] Fixed some ResourceWarnings on Python 3 Ensure that all temporary files and sockets are closed promptly, without relying on the GC to do it. This avoids ResourceWarnings. Note that urllib fails to close sockets in some cases (http://bugs.python.org/issue12133), so we cannot enable the warnings for all tests yet. --- tests/basetest.py | 4 - tests/testdownload.py | 164 ++++++++++++++++++-------------- tests/teststore.py | 3 +- zeroinstall/injector/_download_child.py | 4 + zeroinstall/injector/download.py | 5 +- zeroinstall/injector/fetch.py | 140 +++++++++++++++------------ zeroinstall/injector/iface_cache.py | 4 +- zeroinstall/zerostore/optimise.py | 2 + 8 files changed, 187 insertions(+), 139 deletions(-) diff --git a/tests/basetest.py b/tests/basetest.py index bd04256..8399db2 100755 --- a/tests/basetest.py +++ b/tests/basetest.py @@ -170,10 +170,6 @@ class BaseTest(unittest.TestCase): def setUp(self): warnings.resetwarnings() - if sys.version_info[0] > 2: - # Currently, we rely on the GC to close download streams automatically, so don't warn about it. - warnings.filterwarnings("ignore", category = ResourceWarning) - self.config_home = tempfile.mktemp() self.cache_home = tempfile.mktemp() self.cache_system = tempfile.mktemp() diff --git a/tests/testdownload.py b/tests/testdownload.py index ccad3b4..f134a61 100755 --- a/tests/testdownload.py +++ b/tests/testdownload.py @@ -3,7 +3,7 @@ from __future__ import with_statement from basetest import BaseTest, StringIO import sys, tempfile, os import unittest -import logging +import logging, warnings from logging import getLogger, WARN, ERROR from contextlib import contextmanager @@ -80,6 +80,17 @@ def trapped_exit(expected_exit_status): finally: os._exit = old_exit +@contextmanager +def resourcewarnings_suppressed(): + import gc + if sys.version_info[0] < 3: + yield + else: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category = ResourceWarning) + yield + gc.collect() + class Reply: def __init__(self, reply): self.reply = reply @@ -157,6 +168,9 @@ class TestDownload(BaseTest): BaseTest.tearDown(self) kill_server_process() + # Flush out ResourceWarnings + import gc; gc.collect() + def testRejectKey(self): with output_suppressed(): run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B') @@ -403,92 +417,96 @@ class TestDownload(BaseTest): sys.stdout = old_out def testRecipeFailure(self): - old_out = sys.stdout - try: - run_server('*') - driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config) + with resourcewarnings_suppressed(): + old_out = sys.stdout try: - download_and_execute(driver, []) - assert False - except download.DownloadError as ex: - if "Connection" not in str(ex): - raise - finally: - sys.stdout = old_out + run_server('*') + driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config) + try: + download_and_execute(driver, []) + assert False + except download.DownloadError as ex: + if "Connection" not in str(ex): + raise + finally: + sys.stdout = old_out def testMirrors(self): - getLogger().setLevel(logging.ERROR) - trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000') - run_server(server.Give404('/Hello.xml'), - '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml', - '/0mirror/keys/6FCF121BE2390E0B.gpg', - server.Give404('/HelloWorld.tgz'), - '/0mirror/archive/http%3A%23%23example.com%3A8000%23HelloWorld.tgz') - driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config) - self.config.mirror = 'http://example.com:8000/0mirror' - - refreshed = driver.solve_with_downloads() - tasks.wait_for_blocker(refreshed) - assert driver.solver.ready - - #getLogger().setLevel(logging.WARN) - downloaded = driver.download_uncached_implementations() - tasks.wait_for_blocker(downloaded) - path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests) - assert os.path.exists(os.path.join(path, 'HelloWorld', 'main')) + with resourcewarnings_suppressed(): + getLogger().setLevel(logging.ERROR) + trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000') + run_server(server.Give404('/Hello.xml'), + '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml', + '/0mirror/keys/6FCF121BE2390E0B.gpg', + server.Give404('/HelloWorld.tgz'), + '/0mirror/archive/http%3A%23%23example.com%3A8000%23HelloWorld.tgz') + driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config) + self.config.mirror = 'http://example.com:8000/0mirror' - def testImplMirror(self): - # This is like testMirror, except we have a different archive (that generates the same content), - # rather than an exact copy of the unavailable archive. - trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000') - run_server('/Hello.xml', - '/6FCF121BE2390E0B.gpg', - server.Give404('/HelloWorld.tgz'), - server.Give404('/0mirror/archive/http%3A%2F%2Flocalhost%3A8000%2FHelloWorld.tgz'), - '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a') - driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config) - self.config.mirror = 'http://example.com:8000/0mirror' - - refreshed = driver.solve_with_downloads() - tasks.wait_for_blocker(refreshed) - assert driver.solver.ready - - getLogger().setLevel(logging.ERROR) - downloaded = driver.download_uncached_implementations() - tasks.wait_for_blocker(downloaded) - path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests) - assert os.path.exists(os.path.join(path, 'HelloWorld', 'main')) + refreshed = driver.solve_with_downloads() + tasks.wait_for_blocker(refreshed) + assert driver.solver.ready - def testReplay(self): - old_out = sys.stdout - try: - sys.stdout = StringIO() - getLogger().setLevel(ERROR) - iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml') - mtime = int(os.stat('Hello-new.xml').st_mtime) - with open('Hello-new.xml', 'rb') as stream: - self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000) + #getLogger().setLevel(logging.WARN) + downloaded = driver.download_uncached_implementations() + tasks.wait_for_blocker(downloaded) + path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests) + assert os.path.exists(os.path.join(path, 'HelloWorld', 'main')) + def testImplMirror(self): + with resourcewarnings_suppressed(): + # This is like testMirror, except we have a different archive (that generates the same content), + # rather than an exact copy of the unavailable archive. trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000') - run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml') + run_server('/Hello.xml', + '/6FCF121BE2390E0B.gpg', + server.Give404('/HelloWorld.tgz'), + server.Give404('/0mirror/archive/http%3A%2F%2Flocalhost%3A8000%2FHelloWorld.tgz'), + '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a') + driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config) self.config.mirror = 'http://example.com:8000/0mirror' - # Update from mirror (should ignore out-of-date timestamp) - refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache) + refreshed = driver.solve_with_downloads() tasks.wait_for_blocker(refreshed) + assert driver.solver.ready - # Update from upstream (should report an error) - refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache) + getLogger().setLevel(logging.ERROR) + downloaded = driver.download_uncached_implementations() + tasks.wait_for_blocker(downloaded) + path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests) + assert os.path.exists(os.path.join(path, 'HelloWorld', 'main')) + + def testReplay(self): + with resourcewarnings_suppressed(): + old_out = sys.stdout try: + sys.stdout = StringIO() + getLogger().setLevel(ERROR) + iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml') + mtime = int(os.stat('Hello-new.xml').st_mtime) + with open('Hello-new.xml', 'rb') as stream: + self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000) + + trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000') + run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml') + self.config.mirror = 'http://example.com:8000/0mirror' + + # Update from mirror (should ignore out-of-date timestamp) + refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache) tasks.wait_for_blocker(refreshed) - raise Exception("Should have been rejected!") - except model.SafeException as ex: - assert "New feed's modification time is before old version" in str(ex) - # Must finish with the newest version - self.assertEqual(1342285569, self.config.iface_cache._get_signature_date(iface.uri)) - finally: - sys.stdout = old_out + # Update from upstream (should report an error) + refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache) + try: + tasks.wait_for_blocker(refreshed) + raise Exception("Should have been rejected!") + except model.SafeException as ex: + assert "New feed's modification time is before old version" in str(ex) + + # Must finish with the newest version + self.assertEqual(1342285569, self.config.iface_cache._get_signature_date(iface.uri)) + finally: + sys.stdout = old_out def testBackground(self, verbose = False): r = Requirements('http://example.com:8000/Hello.xml') diff --git a/tests/teststore.py b/tests/teststore.py index bd92f00..a5f5a95 100755 --- a/tests/teststore.py +++ b/tests/teststore.py @@ -312,7 +312,8 @@ class TestStore(BaseTest): cli.do_copy([source, copy]) - self.assertEqual('Hello', open(os.path.join(copy, digest, 'MyFile')).read()) + with open(os.path.join(copy, digest, 'MyFile'), 'rt') as stream: + self.assertEqual('Hello', stream.read()) finally: support.ro_rmtree(copy) diff --git a/zeroinstall/injector/_download_child.py b/zeroinstall/injector/_download_child.py index 8d30073..8c983dd 100644 --- a/zeroinstall/injector/_download_child.py +++ b/zeroinstall/injector/_download_child.py @@ -76,6 +76,7 @@ for klass in [urllib2.ProxyHandler, urllib2.UnknownHandler, urllib2.HTTPHandler, _my_urlopen.add_handler(klass()) def download_in_thread(url, target_file, if_modified_since, notify_done): + src = None try: #print "Child downloading", url if url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:'): @@ -112,3 +113,6 @@ def download_in_thread(url, target_file, if_modified_since, notify_done): except Exception as ex: __, ex, tb = sys.exc_info() notify_done(download.RESULT_FAILED, (ex, tb)) + finally: + if src is not None: + src.close() diff --git a/zeroinstall/injector/download.py b/zeroinstall/injector/download.py index ddd69f3..84e5c1b 100644 --- a/zeroinstall/injector/download.py +++ b/zeroinstall/injector/download.py @@ -155,7 +155,10 @@ class Download(object): """Get the download progress. Will be zero if the download has not yet started. @rtype: int""" if self.status is download_fetching: - return os.fstat(self.tempfile.fileno()).st_size + if self.tempfile.closed: + return 1 + else: + return os.fstat(self.tempfile.fileno()).st_size else: return self._final_total_size or 0 diff --git a/zeroinstall/injector/fetch.py b/zeroinstall/injector/fetch.py index 7c6ef08..942e8f7 100644 --- a/zeroinstall/injector/fetch.py +++ b/zeroinstall/injector/fetch.py @@ -61,8 +61,8 @@ class KeyInfoFetcher: @tasks.async def fetch_key_info(): + tempfile = dl.tempfile try: - tempfile = dl.tempfile yield dl.downloaded self.blocker = None tasks.check(dl.downloaded) @@ -76,6 +76,8 @@ class KeyInfoFetcher: root = doc.documentElement root.appendChild(doc.createTextNode(_('Error getting key information: %s') % ex)) self.info.append(root) + finally: + tempfile.close() self.blocker = fetch_key_info() @@ -117,37 +119,41 @@ class Fetcher(object): # Start a download for each ingredient blockers = [] steps = [] - for stepdata in recipe.steps: - cls = StepRunner.class_for(stepdata) - step = cls(stepdata, impl_hint=impl_hint) - step.prepare(self, blockers) - steps.append(step) - - while blockers: - yield blockers - tasks.check(blockers) - blockers = [b for b in blockers if not b.happened] - - - if self.external_store: - # Note: external_store will not yet work with non- steps. - streams = [step.stream for step in steps] - self._add_to_external_store(required_digest, recipe.steps, streams) - else: - # Create an empty directory for the new implementation - store = stores.stores[0] - tmpdir = store.get_tmp_dir_for(required_digest) - try: - # Unpack each of the downloaded archives into it in turn - for step in steps: - step.apply(tmpdir) - # Check that the result is correct and store it in the cache - store.check_manifest_and_rename(required_digest, tmpdir) - tmpdir = None - finally: - # If unpacking fails, remove the temporary directory - if tmpdir is not None: - support.ro_rmtree(tmpdir) + try: + for stepdata in recipe.steps: + cls = StepRunner.class_for(stepdata) + step = cls(stepdata, impl_hint=impl_hint) + step.prepare(self, blockers) + steps.append(step) + + while blockers: + yield blockers + tasks.check(blockers) + blockers = [b for b in blockers if not b.happened] + + + if self.external_store: + # Note: external_store will not yet work with non- steps. + streams = [step.stream for step in steps] + self._add_to_external_store(required_digest, recipe.steps, streams) + else: + # Create an empty directory for the new implementation + store = stores.stores[0] + tmpdir = store.get_tmp_dir_for(required_digest) + try: + # Unpack each of the downloaded archives into it in turn + for step in steps: + step.apply(tmpdir) + # Check that the result is correct and store it in the cache + store.check_manifest_and_rename(required_digest, tmpdir) + tmpdir = None + finally: + # If unpacking fails, remove the temporary directory + if tmpdir is not None: + support.ro_rmtree(tmpdir) + finally: + for step in steps: + step.close() def _get_mirror_url(self, feed_url, resource): """Return the URL of a mirror for this feed.""" @@ -300,28 +306,31 @@ class Fetcher(object): @tasks.named_async("fetch_feed " + url) def fetch_feed(): - yield dl.downloaded - tasks.check(dl.downloaded) + try: + yield dl.downloaded + tasks.check(dl.downloaded) - pending = PendingFeed(feed_url, stream) + pending = PendingFeed(feed_url, stream) - if use_mirror: - # If we got the feed from a mirror, get the key from there too - key_mirror = self.config.mirror + '/keys/' - else: - key_mirror = None + if use_mirror: + # If we got the feed from a mirror, get the key from there too + key_mirror = self.config.mirror + '/keys/' + else: + key_mirror = None - keys_downloaded = tasks.Task(pending.download_keys(self, feed_hint = feed_url, key_mirror = key_mirror), _("download keys for %s") % feed_url) - yield keys_downloaded.finished - tasks.check(keys_downloaded.finished) + keys_downloaded = tasks.Task(pending.download_keys(self, feed_hint = feed_url, key_mirror = key_mirror), _("download keys for %s") % feed_url) + yield keys_downloaded.finished + tasks.check(keys_downloaded.finished) - if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml): - blocker = self.config.trust_mgr.confirm_keys(pending) - if blocker: - yield blocker - tasks.check(blocker) if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml): - raise NoTrustedKeys(_("No signing keys trusted; not importing")) + blocker = self.config.trust_mgr.confirm_keys(pending) + if blocker: + yield blocker + tasks.check(blocker) + if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml): + raise NoTrustedKeys(_("No signing keys trusted; not importing")) + finally: + stream.close() task = fetch_feed() task.dl = dl @@ -375,14 +384,17 @@ class Fetcher(object): if isinstance(method, DownloadSource): blocker, stream = self.download_archive(method, impl_hint = impl, may_use_mirror = original_exception is None) - yield blocker - tasks.check(blocker) - - stream.seek(0) - if self.external_store: - self._add_to_external_store(required_digest, [method], [stream]) - else: - self._add_to_cache(required_digest, stores, method, stream) + try: + yield blocker + tasks.check(blocker) + + stream.seek(0) + if self.external_store: + self._add_to_external_store(required_digest, [method], [stream]) + else: + self._add_to_cache(required_digest, stores, method, stream) + finally: + stream.close() elif isinstance(method, Recipe): blocker = self.cook(required_digest, method, stores, impl_hint = impl) yield blocker @@ -439,7 +451,9 @@ class Fetcher(object): # (force is deprecated and ignored) def download_archive(self, download_source, force = False, impl_hint = None, may_use_mirror = False): """Fetch an archive. You should normally call L{download_impl} - instead, since it handles other kinds of retrieval method too.""" + instead, since it handles other kinds of retrieval method too. + It is the caller's responsibility to ensure that the returned stream is closed. + """ from zeroinstall.zerostore import unpack url = download_source.url @@ -499,8 +513,8 @@ class Fetcher(object): @tasks.async def download_and_add_icon(): stream = dl.tempfile - yield dl.downloaded try: + yield dl.downloaded tasks.check(dl.downloaded) if dl.unmodified: return stream.seek(0) @@ -588,6 +602,7 @@ class Fetcher(object): def download_url(self, url, hint = None, modification_time = None, expected_size = None, mirror_url = None): """The most low-level method here; just download a raw URL. + It is the caller's responsibility to ensure that dl.stream is closed. @param url: the location to download from @param hint: user-defined data to store on the Download (e.g. used by the GUI) @param modification_time: don't download unless newer than this @@ -622,6 +637,10 @@ class StepRunner(object): if subcls.model_type == type(model): return subcls assert False, "Couldn't find step runner for %s" % (type(model),) + + def close(self): + """Release any resources (called on success or failure).""" + pass class RenameStepRunner(StepRunner): """A step runner for the step. @@ -652,6 +671,9 @@ class DownloadStepRunner(StepRunner): extract = self.stepdata.extract, type=self.stepdata.type, start_offset = self.stepdata.start_offset or 0) + + def close(self): + self.stream.close() def native_path_within_base(base, crossplatform_path): """Takes a cross-platform relative path (i.e using forward slashes, even on windows) diff --git a/zeroinstall/injector/iface_cache.py b/zeroinstall/injector/iface_cache.py index f9caa19..dd069df 100644 --- a/zeroinstall/injector/iface_cache.py +++ b/zeroinstall/injector/iface_cache.py @@ -110,18 +110,20 @@ class PendingFeed(object): blockers = [] for b in old_blockers: + dl, stream = downloads[b] try: tasks.check(b) if b.happened: - dl, stream = downloads[b] stream.seek(0) self._downloaded_key(stream) any_success = True + stream.close() else: blockers.append(b) except Exception: _type, exception, tb = sys.exc_info() logger.warn(_("Failed to import key for '%(url)s': %(exception)s"), {'url': self.url, 'exception': str(exception)}) + stream.close() if exception and not any_success: raise_with_traceback(exception, tb) diff --git a/zeroinstall/zerostore/optimise.py b/zeroinstall/zerostore/optimise.py index 8ba46a3..cbf4fc6 100644 --- a/zeroinstall/zerostore/optimise.py +++ b/zeroinstall/zerostore/optimise.py @@ -124,5 +124,7 @@ def optimise(impl_dir): else: first_copy[key] = loc_path uniq_size += size + + ms.close() clear() return (uniq_size, dup_size, already_linked, man_size) -- 2.11.4.GIT