From: Thomas Perl Date: Mon, 22 Aug 2011 12:01:35 +0000 (+0200) Subject: Use concurrent.futures for quicker job completion X-Git-Tag: 1.13~3 X-Git-Url: https://repo.or.cz/w/urlwatch.git/commitdiff_plain/d323d2cbae2028c1bec61b6b194348d0a1d4f65f Use concurrent.futures for quicker job completion --- diff --git a/README b/README index 1a01e3e..5fc21d0 100644 --- a/README +++ b/README @@ -9,6 +9,19 @@ change. Ideal for watching web pages of university courses, so you always know when lecture dates have changed or new tasks are online :) +DEPENDENCIES +------------ + +This package requires the "concurrent.futures" module as included in Python +3.2. For Python versions < 3.2, you can install it using: + + pip install futures + +or download and install it manually from its project page at + + http://code.google.com/p/pythonfutures/ + + QUICK START ----------- diff --git a/urlwatch b/urlwatch index b820591..a339bf2 100755 --- a/urlwatch +++ b/urlwatch @@ -86,6 +86,10 @@ import optparse import logging import imp +# Python 3.2 includes "concurrent.futures", for older versions, +# use "pip install futures" or http://code.google.com/p/pythonfutures/ +import concurrent.futures + from urlwatch import handler # One minute (=60 seconds) timeout for each request to avoid hanging @@ -100,6 +104,20 @@ class NullHandler(logging.Handler): log.addHandler(NullHandler()) +ERROR_MESSAGE_URLS_TXT = """ +Error: You need to create a urls.txt file first.' + +Place it in %s +An example is available in %s +""" + +ERROR_MESSAGE_HOOKS_PY = """ +You can also create %s +An example is available in %s +""" + +MAX_WORKERS = 10 + def foutput(type, url, content=None, summary=None, c='*', n=line_length): """Format output messages @@ -182,15 +200,9 @@ if __name__ == '__main__': log.warning('not a file: %s' % urls_txt) urls_txt_fn = os.path.join(os.path.dirname(urls_txt), os.path.basename(urls_txt_example)) hooks_py_fn = os.path.join(os.path.dirname(hooks_py), os.path.basename(hooks_py_example)) - print 'Error: You need to create a urls.txt file first.' - print '' - print 'Place it in %s' % (urls_txt) - print 'An example is available in %s' % (urls_txt_fn) - print '' + print ERROR_MESSAGE_URLS_TXT % (urls_txt, urls_txt_fn) if not options.hooks: - print 'You can also create %s' % (hooks_py) - print 'An example is available in %s' % (hooks_py_fn) - print '' + print ERROR_MESSAGE_HOOKS_PY % (hooks_py, hooks_py_fn) if os.path.exists(urls_txt_example) and not os.path.exists(urls_txt_fn): shutil.copy(urls_txt_example, urls_txt_fn) if not options.hooks and os.path.exists(hooks_py_example) and not os.path.exists(hooks_py_fn): @@ -205,31 +217,49 @@ if __name__ == '__main__': details = [] count = 0 + filter_func = lambda x, y: y + if os.path.exists(hooks_py): log.info('using hooks.py from %s' % hooks_py) hooks = imp.load_source('hooks', hooks_py) if hasattr(hooks, 'filter'): log.info('found and enabled filter function from hooks.py') - filter = hooks.filter + filter_func = hooks.filter else: log.warning('hooks.py has no filter function - ignoring') - filter = lambda x, y: y else: log.info('not using hooks.py (file not found)') - filter = lambda x, y: y - for job in handler.parse_urls_txt(urls_txt): - log.info('processing job: %s' % job.location) + def process_job(job): + log.info('now processing: %s', job.location) filename = os.path.join(cache_dir, job.get_guid()) + timestamp = None + + if os.path.exists(filename): + timestamp = os.stat(filename)[stat.ST_MTIME] + + data = job.retrieve(timestamp, filter_func, headers, log) + return filename, timestamp, data + + jobs = handler.parse_urls_txt(urls_txt) + log.info('processing %d jobs', len(jobs)) + + executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) + + future_to_job = dict((executor.submit(process_job, job), job) + for job in jobs) + + for future in concurrent.futures.as_completed(future_to_job): + job = future_to_job[future] + + log.info('job finished: %s' % job.location) + try: - if os.path.exists(filename): - st = os.stat(filename) - timestamp = st[stat.ST_MTIME] - else: - timestamp = None + exception = future.exception() + if exception is not None: + raise exception - # Retrieve the data - data = job.retrieve(timestamp, filter, headers, log) + filename, timestamp, data = future.result() if os.path.exists(filename): log.info('%s exists - creating unified diff' % filename)