Use concurrent.futures for quicker job completion
author    Thomas Perl <m@thp.io>  Mon, 22 Aug 2011 12:01:35 +0000 (14:01 +0200)
committer Thomas Perl <m@thp.io>  Mon, 22 Aug 2011 12:01:35 +0000 (14:01 +0200)
README
urlwatch

diff --git a/README b/README
index 1a01e3e..5fc21d0 100644 (file)
--- a/README
+++ b/README
@@ -9,6 +9,19 @@ change. Ideal for watching web pages of university courses, so you always
 know when lecture dates have changed or new tasks are online :)
 
 
+DEPENDENCIES
+------------
+
+This package requires the "concurrent.futures" module, which is included in
+Python 3.2 and later. For older Python versions, you can install it using:
+
+    pip install futures
+
+or download and install it manually from its project page at
+
+    http://code.google.com/p/pythonfutures/
+
+
 QUICK START
 -----------
 
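Note: the README points users on older interpreters at the "futures" backport. To see whether the backport is needed at all, a version test along these lines can help; a minimal sketch, not part of this commit (the messages are placeholders):

    import sys

    if sys.version_info >= (3, 2):
        print('concurrent.futures ships with this interpreter')
    else:
        print('older interpreter - install the backport: pip install futures')

Once installed, the backport is importable under the same "concurrent.futures" name, so code written for Python 3.2 needs no changes.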
diff --git a/urlwatch b/urlwatch
index b820591..a339bf2 100755 (executable)
--- a/urlwatch
+++ b/urlwatch
@@ -86,6 +86,10 @@ import optparse
 import logging
 import imp
 
+# Python 3.2 includes "concurrent.futures", for older versions,
+# use "pip install futures" or http://code.google.com/p/pythonfutures/
+import concurrent.futures
+
 from urlwatch import handler
 
 # One minute (=60 seconds) timeout for each request to avoid hanging
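The import above is unconditional, so an interpreter without the module dies at startup with a bare ImportError. If a friendlier failure were wanted, a guarded import along these lines would do; a sketch only, not part of this patch:

    import sys

    try:
        import concurrent.futures
    except ImportError:
        sys.stderr.write('Error: concurrent.futures not found.\n'
                         'Run "pip install futures" or see\n'
                         'http://code.google.com/p/pythonfutures/\n')
        sys.exit(1)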
@@ -100,6 +104,20 @@ class NullHandler(logging.Handler):
 
 log.addHandler(NullHandler())
 
+ERROR_MESSAGE_URLS_TXT = """
+Error: You need to create a urls.txt file first.
+
+Place it in %s
+An example is available in %s
+"""
+
+ERROR_MESSAGE_HOOKS_PY = """
+You can also create %s
+An example is available in %s
+"""
+
+MAX_WORKERS = 10  # upper bound on concurrent retrieval threads
+
 def foutput(type, url, content=None, summary=None, c='*', n=line_length):
     """Format output messages
     
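The two templates are ordinary %-format strings taking two positional values each. Filled in with hypothetical paths (illustration only, assuming the ERROR_MESSAGE_URLS_TXT constant from the hunk above), a single print reproduces the multi-line message that the individual print statements replaced in the next hunk used to build:

    # hypothetical paths, for illustration only
    print ERROR_MESSAGE_URLS_TXT % ('/home/user/.urlwatch/urls.txt',
            '/usr/share/urlwatch/examples/urls.txt.example')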
@@ -182,15 +200,9 @@ if __name__ == '__main__':
         log.warning('not a file: %s' % urls_txt)
         urls_txt_fn = os.path.join(os.path.dirname(urls_txt), os.path.basename(urls_txt_example))
         hooks_py_fn = os.path.join(os.path.dirname(hooks_py), os.path.basename(hooks_py_example))
-        print 'Error: You need to create a urls.txt file first.'
-        print ''
-        print 'Place it in %s' % (urls_txt)
-        print 'An example is available in %s' % (urls_txt_fn)
-        print ''
+        print ERROR_MESSAGE_URLS_TXT % (urls_txt, urls_txt_fn)
         if not options.hooks:
-            print 'You can also create %s' % (hooks_py)
-            print 'An example is available in %s' % (hooks_py_fn)
-            print ''
+            print ERROR_MESSAGE_HOOKS_PY % (hooks_py, hooks_py_fn)
         if os.path.exists(urls_txt_example) and not os.path.exists(urls_txt_fn):
             shutil.copy(urls_txt_example, urls_txt_fn)
         if not options.hooks and os.path.exists(hooks_py_example) and not os.path.exists(hooks_py_fn):
@@ -205,31 +217,49 @@ if __name__ == '__main__':
     details = []
     count = 0
 
+    filter_func = lambda x, y: y  # default filter: pass data through unchanged
+
     if os.path.exists(hooks_py):
         log.info('using hooks.py from %s' % hooks_py)
         hooks = imp.load_source('hooks', hooks_py)
         if hasattr(hooks, 'filter'):
             log.info('found and enabled filter function from hooks.py')
-            filter = hooks.filter
+            filter_func = hooks.filter
         else:
             log.warning('hooks.py has no filter function - ignoring')
-            filter = lambda x, y: y
     else:
         log.info('not using hooks.py (file not found)')
-        filter = lambda x, y: y
 
-    for job in handler.parse_urls_txt(urls_txt):
-        log.info('processing job: %s' % job.location)
+    def process_job(job):
+        log.info('now processing: %s', job.location)
         filename = os.path.join(cache_dir, job.get_guid())
+        timestamp = None
+
+        if os.path.exists(filename):
+            timestamp = os.stat(filename)[stat.ST_MTIME]
+
+        data = job.retrieve(timestamp, filter_func, headers, log)
+        return filename, timestamp, data
+
+    jobs = handler.parse_urls_txt(urls_txt)
+    log.info('processing %d jobs', len(jobs))
+
+    executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
+
+    future_to_job = dict((executor.submit(process_job, job), job)
+            for job in jobs)
+
+    for future in concurrent.futures.as_completed(future_to_job):
+        job = future_to_job[future]
+
+        log.info('job finished: %s', job.location)
+
         try:
-            if os.path.exists(filename):
-                st = os.stat(filename)
-                timestamp = st[stat.ST_MTIME]
-            else:
-                timestamp = None
+            exception = future.exception()
+            if exception is not None:
+                raise exception
 
-            # Retrieve the data
-            data = job.retrieve(timestamp, filter, headers, log)
+            filename, timestamp, data = future.result()
 
             if os.path.exists(filename):
                 log.info('%s exists - creating unified diff' % filename)
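The heart of the change is the submit/as_completed pattern: all jobs are handed to a bounded thread pool up front, and results are handled in whatever order they finish. A self-contained sketch of the same pattern, with a hypothetical fetch_size() standing in for urlwatch's process_job():

    import concurrent.futures
    import urllib2

    MAX_WORKERS = 10

    def fetch_size(url):
        # hypothetical job: download a page and report its length
        return url, len(urllib2.urlopen(url, timeout=60).read())

    urls = ['http://example.com/', 'http://thp.io/']

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)

    # map each future back to its input so results can be attributed
    future_to_url = dict((executor.submit(fetch_size, url), url)
            for url in urls)

    # as_completed yields futures in completion order, not submission order;
    # result() re-raises any exception from the worker thread here
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            location, size = future.result()
            print '%s: %d bytes' % (location, size)
        except Exception as e:
            print '%s failed: %s' % (url, e)

Iterating over the future_to_url dict yields its keys (the futures), which is exactly what as_completed() expects and what the patch does with future_to_job.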