From 2100c652a63517601894efbe1ca8a64f8fee0d02 Mon Sep 17 00:00:00 2001
From: Aleksey Lim
Date: Fri, 28 Oct 2011 20:11:05 +0000
Subject: [PATCH] Support a pool of multithreaded downloaders

* use httplib for HTTP, with support for persistent connections, and
  fall back to urllib2 otherwise
* do not reset tempfile in Download, to prevent race conditions
* use a DNS resolution cache
---
 tests/basetest.py                |   3 +-
 zeroinstall/injector/download.py | 128 ++++++++---------
 zeroinstall/injector/wget.py     | 294 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 357 insertions(+), 68 deletions(-)
 create mode 100644 zeroinstall/injector/wget.py

diff --git a/tests/basetest.py b/tests/basetest.py
index d1954e9..edb07db 100755
--- a/tests/basetest.py
+++ b/tests/basetest.py
@@ -13,7 +13,7 @@ os.environ['LANGUAGE'] = 'C'
 sys.path.insert(0, '..')
 
 from zeroinstall.injector import qdom
-from zeroinstall.injector import iface_cache, download, distro, model, handler, policy, reader, trust
+from zeroinstall.injector import iface_cache, download, distro, model, handler, policy, reader, trust, wget
 from zeroinstall.zerostore import NotStored, Store, Stores; Store._add_with_helper = lambda *unused: False
 from zeroinstall import support
 from zeroinstall.support import basedir, tasks
@@ -196,6 +196,7 @@ class BaseTest(unittest.TestCase):
 		distro._host_distribution._packagekit = DummyPackageKit()
 
 		my_dbus.system_services = {}
+		wget.shutdown()
 
 	def tearDown(self):
 		assert self.config.handler.ex is None, self.config.handler.ex
diff --git a/zeroinstall/injector/download.py b/zeroinstall/injector/download.py
index 1b286a9..1531ae5 100644
--- a/zeroinstall/injector/download.py
+++ b/zeroinstall/injector/download.py
@@ -13,6 +13,7 @@ import tempfile, os, sys, threading, gobject
 
 from zeroinstall import SafeException
 from zeroinstall.support import tasks
+from zeroinstall.injector import wget
 from logging import info, debug
 from zeroinstall import _
 
@@ -35,7 +36,7 @@ class DownloadAborted(DownloadError):
 	def __init__(self, message = None):
 		SafeException.__init__(self, message or _("Download aborted at user's request"))
 
-class Download(object):
+class Download(gobject.GObject):
 	"""A download of a single resource to a temporary file.
 	@ivar url: the URL of the resource being fetched
 	@type url: str
 	@ivar status: the status of the download
 	@type status: (download_fetching, download_failed, download_complete)
 	@ivar expected_size: the expected final size of the file
 	@type expected_size: int | None
 	@ivar downloaded: triggered when the download ends (on success or failure)
 	@type downloaded: L{tasks.Blocker}
 	@ivar hint: hint passed by and for caller
 	@type hint: object
+	@ivar child: the child process
+	@type child: subprocess.Popen
 	@ivar aborted_by_user: whether anyone has called L{abort}
 	@type aborted_by_user: bool
 	@ivar unmodified: whether the resource was not modified since the modification_time given at construction
 	@type unmodified: bool
 	"""
 	__slots__ = ['url', 'tempfile', 'status', 'expected_size', 'downloaded',
+		'child',
 		'hint', '_final_total_size', 'aborted_by_user',
 		'modification_time', 'unmodified']
 
+	__gsignals__ = {
+		'done': (
+			gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
+			[object, object, object]),
+	}
+
 	def __init__(self, url, hint = None, modification_time = None, expected_size = None):
 		"""Create a new download object.
 		@param url: the resource to download
 		@param modification_time: string with HTTP date that indicates last modification time.
 		  The resource will not be downloaded if it was not modified since that date.
 		@postcondition: L{status} == L{download_fetching}."""
+		gobject.GObject.__init__(self)
+
 		self.url = url
 		self.hint = hint
 		self.aborted_by_user = False
 		self.modification_time = modification_time
 		self.unmodified = False
+		self._done_hid = None
 
 		self.tempfile = None	# Stream for result
 		self.downloaded = None
@@ -79,75 +92,56 @@ class Download(object):
 		self.status = download_fetching
 		self.tempfile = tempfile.TemporaryFile(prefix = 'injector-dl-data-')
+		self.downloaded = tasks.Blocker('download %s' % self.url)
 
-		task = tasks.Task(self._do_download(), "download " + self.url)
-		self.downloaded = task.finished
-
-	def _do_download(self):
-		"""Will trigger L{downloaded} when done (on success or failure)."""
-		from ._download_child import download_in_thread
-
-		result = []
-		thread_blocker = tasks.Blocker("wait for thread " + self.url)
-		def notify_done(status, ex = None):
-			result.append(status)
-			def wake_up_main():
-				thread_blocker.trigger(ex)
-				return False
-			gobject.idle_add(wake_up_main)
-		child = threading.Thread(target = lambda: download_in_thread(self.url, self.tempfile, self.modification_time, notify_done))
-		child.daemon = True
-		child.start()
-
-		# Wait for child to complete download.
-		yield thread_blocker
-
-		# Download is complete...
-		child.join()
-
-		assert self.status is download_fetching
-		assert self.tempfile is not None
-
-		status, = result
-
-		if status == RESULT_NOT_MODIFIED:
-			debug("%s not modified", self.url)
-			self.tempfile = None
-			self.unmodified = True
-			self.status = download_complete
-			self._final_total_size = 0
-			self.downloaded.trigger()
-			return
-
-		self._final_total_size = self.get_bytes_downloaded_so_far()
-
-		self.tempfile = None
+		self._done_hid = self.connect('done', self.__done_cb)
+		# Let the caller read the tempfile before we close the connection
+		# TODO: eliminate this unreliable workflow
+		gobject.idle_add(wget.start, self.url, self.modification_time,
+				self.tempfile.fileno(), self)
 
-		if self.aborted_by_user:
-			assert self.downloaded.happened
-			raise DownloadAborted()
+	def __done_cb(self, *args):
+		if self._done_hid is not None:
+			self.disconnect(self._done_hid)
+			self._done_hid = None
+		gobject.idle_add(self._done_cb, *args)
 
+	def _done_cb(self, sender, status, reason, exception):
 		try:
-
-			tasks.check(thread_blocker)
-
-			assert status == RESULT_OK
-
-			# Check that the download has the correct size, if we know what it should be.
-			if self.expected_size is not None:
-				if self._final_total_size != self.expected_size:
-					raise SafeException(_('Downloaded archive has incorrect size.\n'
-							'URL: %(url)s\n'
-							'Expected: %(expected_size)d bytes\n'
-							'Received: %(size)d bytes') % {'url': self.url, 'expected_size': self.expected_size, 'size': self._final_total_size})
-		except:
-			self.status = download_failed
-			_unused, ex, tb = sys.exc_info()
-			self.downloaded.trigger(exception = (ex, tb))
-		else:
+			self._final_total_size = 0
+			if self.aborted_by_user:
+				raise DownloadAborted()
+			elif status == 304:
+				debug("%s not modified", self.url)
+				self.unmodified = True
+			elif status == 200:
+				self._final_total_size = self.get_bytes_downloaded_so_far()
+				# Check that the download has the correct size,
+				# if we know what it should be.
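+				# (A size mismatch usually means the transfer was
+				# truncated, so it is treated as a failure below.)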
+				if self.expected_size is not None and \
+						self.expected_size != self._final_total_size:
+					raise SafeException(
+							_('Downloaded archive has incorrect size.\n'
+							'URL: %(url)s\n'
+							'Expected: %(expected_size)d bytes\n'
+							'Received: %(size)d bytes') % {
+								'url': self.url,
+								'expected_size': self.expected_size,
+								'size': self._final_total_size})
+			elif exception is None:
+				raise DownloadError(_('Download %s failed: %s') % \
+						(self.url, reason))
+		except Exception:
+			__, ex, tb = sys.exc_info()
+			exception = (ex, tb)
+
+		if exception is None:
 			self.status = download_complete
 			self.downloaded.trigger()
-
+		else:
+			self.status = download_failed
+			self.downloaded.trigger(exception=exception)
+
 	def abort(self):
 		"""Signal the current download to stop.
 		@postcondition: L{aborted_by_user}"""
@@ -161,9 +155,9 @@ class Download(object):
 		# In any case, we don't wait for the child to exit before notifying tasks that are waiting
 		# on us.
 		self.aborted_by_user = True
-		self.tempfile.close()
-		self.tempfile = None
-		self.downloaded.trigger((DownloadAborted(), None))
+		info(_("Aborting download of %s"), self.url)
+		self.__done_cb(None, None, None, None)
+		wget.abort(self.url)
 
 	def get_current_fraction(self):
 		"""Returns the current fraction of this download that has been fetched (from 0 to 1),
diff --git a/zeroinstall/injector/wget.py b/zeroinstall/injector/wget.py
new file mode 100644
index 0000000..3a3e7e3
--- /dev/null
+++ b/zeroinstall/injector/wget.py
@@ -0,0 +1,294 @@
+# Copyright (C) 2011, Aleksey Lim
+# See the README file for details, or visit http://0install.net.
+
+import os
+import sys
+import atexit
+import thread
+import socket
+import urllib2
+import urlparse
+import threading
+from select import select
+from httplib import HTTPConnection
+
+
+PAGE_SIZE = 4096
+# Convenient way to set the maximum number of workers and the maximum
+# number of simultaneous connections per domain at the same time;
+# limiting ourselves to 15 connections per host is basic netiquette.
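+# NOTE: each worker drives at most one connection at a time, so this
+# single constant bounds both the size of the worker pool and the number
+# of idle keep-alive connections kept per host (_ConnectionsPool.push
+# asserts that invariant).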
+MAX_RUN_WORKERS_AND_POOL = 15
+
+_queue = None
+_http_proxy_host = None
+_http_proxy_port = None
+_resolve_cache = {}
+
+
+def start(url, modification_time, fd, receiver):
+    _init()
+    _queue.push({'requested_url': url,
+                 'modification_time': modification_time,
+                 'fd': fd,
+                 'receiver': receiver,
+                 })
+
+
+def abort(url):
+    _init()
+    _queue.abort(url)
+
+
+def shutdown():
+    global _queue
+    if _queue is not None:
+        _queue.clear()
+        _queue = None
+
+
+def _init():
+    global _queue, _http_proxy_host, _http_proxy_port
+
+    if _queue is not None:
+        return
+
+    proxy_detector = urllib2.ProxyHandler()
+    if 'http' in proxy_detector.proxies:
+        proxy = urlparse.urlparse(proxy_detector.proxies['http'])
+        _http_proxy_host = proxy.hostname
+        _http_proxy_port = proxy.port
+
+    _queue = _RequestsQueue()
+    atexit.register(shutdown)
+
+
+class _RequestsQueue(object):
+
+    def __init__(self):
+        self._mutex = threading.Lock()
+        self._condition = threading.Condition(self._mutex)
+        self._workers = []
+        self._requests = {}
+        self._requests_in_process = {}
+        self._workers_in_wait = 0
+        self._pool = _ConnectionsPool()
+        self._exiting = False
+
+    def push(self, request):
+        worker = None
+
+        self._mutex.acquire()
+        try:
+            self._requests[request['requested_url']] = request
+            if self._workers_in_wait:
+                self._condition.notify()
+            if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
+                worker = _Worker(self._pop)
+                self._workers.append(worker)
+        finally:
+            self._mutex.release()
+
+        if worker is not None:
+            worker.start()
+
+    def abort(self, url):
+        self._mutex.acquire()
+        try:
+            if url in self._requests:
+                del self._requests[url]
+            if url in self._requests_in_process:
+                self._requests_in_process[url].close()
+        finally:
+            self._mutex.release()
+
+    def clear(self):
+        self._mutex.acquire()
+        try:
+            self._exiting = True
+            self._requests.clear()
+            for connection in self._requests_in_process.values():
+                connection.close()
+            self._condition.notify_all()
+        finally:
+            self._mutex.release()
+
+    def _pop(self, prev_connection):
+        self._mutex.acquire()
+        try:
+            if prev_connection is not None:
+                del self._requests_in_process[
+                        prev_connection.requested['requested_url']]
+                self._pool.push(prev_connection)
+
+            if hasattr(prev_connection, 'redirect'):
+                location_url, request = prev_connection.redirect
+                delattr(prev_connection, 'redirect')
+            else:
+                while not self._requests:
+                    if self._exiting:
+                        return None, None, None
+                    self._workers_in_wait += 1
+                    self._condition.wait()
+                    self._workers_in_wait -= 1
+                location_url, request = self._requests.popitem()
+
+            location_parts = urlparse.urlparse(location_url)
+            if _http_proxy_host and location_parts.scheme == 'http':
+                connection_url = (location_parts.scheme,
+                        _http_proxy_host, _http_proxy_port)
+            else:
+                connection_url = (location_parts.scheme,
+                        location_parts.hostname, location_parts.port or 80)
+            connection = self._pool.pop(connection_url)
+        finally:
+            self._mutex.release()
+
+        request['location_url'] = location_url
+        request['connection_url'] = connection_url
+
+        scheme, host, port = connection_url
+        if connection is None and scheme == 'http':
+            connection = HTTPConnection(_resolve(host), port)
+
+        if connection is None:
+            opener = _urllib_opener
+        else:
+            connection.requested = request
+            self._requests_in_process[request['requested_url']] = connection
+            opener = _http_opener
+
+        return request, connection, opener
+
+
+class _Redirect(Exception):
+
+    def __init__(self, location):
+        self.location = location
+
+
+class _ConnectionsPool(object):
+
+    def __init__(self):
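+        # Maps a connection_url tuple, i.e. (scheme, host, port), to a
+        # list of idle keep-alive connections available for reuse.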
+        self._connections = {}
+
+    def __iter__(self):
+        for i in self._connections.values():
+            yield i
+
+    def __getitem__(self, connection_url):
+        pool = self._connections.get(connection_url)
+        if pool is None:
+            pool = self._connections[connection_url] = []
+        return pool
+
+    def push(self, connection):
+        if connection is None:
+            return
+        pool = self[connection.requested['connection_url']]
+        # This should not happen, because the maximum number of workers
+        # equals the maximum number of simultaneous connections per domain
+        assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
+        pool.insert(0, connection)
+
+    def pop(self, connection_url):
+        pool = self[connection_url]
+        if pool:
+            connection = pool.pop()
+            if isinstance(connection, HTTPConnection) and \
+                    connection.sock is not None and \
+                    select([connection.sock], [], [], 0.0)[0]:
+                # Either data is buffered (bad), or the connection is
+                # dropped; close it here and let httplib reopen it on
+                # the next request
+                connection.close()
+            return connection
+
+
+class _Worker(threading.Thread):
+
+    def __init__(self, pop_request_cb):
+        threading.Thread.__init__(self)
+        # Do not make the process wait for this thread on exit
+        self.daemon = True
+        self._pop_request = pop_request_cb
+
+    def run(self):
+        try:
+            connection = None
+            while True:
+                request, connection, opener = self._pop_request(connection)
+                if opener is None:
+                    break
+
+                try:
+                    status, reason = opener(connection, request)
+                    exception = None
+                except _Redirect, redirect:
+                    connection.redirect = (redirect.location, request)
+                    continue
+                except Exception, error:
+                    if isinstance(error, urllib2.HTTPError):
+                        status = error.code
+                    else:
+                        status = None
+                    reason = '%s %r' % (error, request)
+                    __, ex, tb = sys.exc_info()
+                    exception = (ex, tb)
+
+                request['receiver'].emit('done', status, reason, exception)
+        except KeyboardInterrupt:
+            thread.interrupt_main()
+            raise
+        except Exception:
+            thread.interrupt_main()
+            raise
+
+
+def _http_opener(connection, request):
+    headers = {'connection': 'keep-alive'}
+    if request.get('modification_time'):
+        headers['If-Modified-Since'] = request['modification_time']
+    connection.request('GET', request['location_url'], None, headers)
+
+    response = connection.getresponse()
+    try:
+        # Handle redirection
+        if response.status in [301, 302, 303, 307] and \
+                response.getheader('location'):
+            raise _Redirect(response.getheader('location'))
+        if response.status == 200:
+            _read_file(request, response)
+        return response.status, response.reason
+    finally:
+        response.close()
+
+
+def _urllib_opener(connection, request):
+    url_request = urllib2.Request(request['location_url'])
+    if request['location_url'].startswith('http:') and \
+            request.get('modification_time'):
+        url_request.add_header(
+                'If-Modified-Since', request['modification_time'])
+
+    response = urllib2.urlopen(url_request)
+    try:
+        _read_file(request, response)
+    finally:
+        response.close()
+
+    return 200, None
+
+
+def _read_file(request, response):
+    while True:
+        data = response.read(PAGE_SIZE)
+        if not data:
+            break
+        while data:
+            # os.write() may write fewer bytes than requested, so keep
+            # writing until the whole chunk has been flushed
+            data = data[os.write(request['fd'], data):]
+
+
+def _resolve(hostname):
+    addr = _resolve_cache.get(hostname)
+    if not addr:
+        addrinfo = socket.getaddrinfo(hostname, 0)[0]
+        addr = _resolve_cache[hostname] = addrinfo[4][0]
+    return addr
-- 
2.11.4.GIT
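
As a usage illustration (not part of the patch), here is a minimal sketch
of how the new wget module is driven. DummyReceiver merely stands in for
Download (anything exposing a compatible 'done' gsignal works), and
done_cb and the feed URL are made up for this example:

    import tempfile

    import gobject

    from zeroinstall.injector import wget

    gobject.threads_init()  # 'done' is emitted from a worker thread

    class DummyReceiver(gobject.GObject):
        """Stands in for Download; wget emits 'done' on this object."""
        __gsignals__ = {
            'done': (
                gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
                [object, object, object]),
        }

    def done_cb(receiver, status, reason, exception):
        # status is an HTTP code (200, 304, ...) or None on transport errors
        print 'status=%r reason=%r exception=%r' % (status, reason, exception)
        wget.shutdown()
        loop.quit()

    loop = gobject.MainLoop()
    receiver = DummyReceiver()
    receiver.connect('done', done_cb)
    tmp = tempfile.TemporaryFile(prefix = 'wget-demo-')
    # Queue the request; a pooled worker thread performs the fetch, writes
    # the body to the given file descriptor and then emits 'done'.
    gobject.idle_add(wget.start, 'http://example.com/feed.xml', None,
            tmp.fileno(), receiver)
    loop.run()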