Implemented "mark all as read".
[straw.git] / straw / Fetcher.py
blobaab9b7b528ae5a3af4da9f6958c92dbee9f73ccb
1 """ Fetcher.py
3 """
5 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
6 __license__ = """
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
10 version.
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
import os
import sys
import urlparse
from threading import Semaphore

import httplib2

import Config
import JobManager
from JobManager import Job, TaskResult, TaskThread, ThreadPoolJobHandler
28 def _get_url_schema(url):
29 return urlparse.urlsplit(url, "http")[0]
class Fetcher(object):
    """Abstract base for protocol-specific fetchers.

    A concrete fetcher says which task descriptions it handles (supports),
    builds a task object bound to itself (create_task), and performs the
    retrieval for such a task (fetch).
    """

    def __init__(self):
        pass

    def supports(self, **kwargs):
        """Return True if this fetcher can handle the described task.

        kwargs are the same keyword arguments later given to create_task();
        the fetchers in this module decide based on kwargs["url"].
        """
        raise NotImplementedError

    def create_task(self, **kwargs):
        """Return a task object bound to this fetcher, built from kwargs."""
        raise NotImplementedError

    def fetch(self, task):
        """Retrieve the resource named by task.url and return a FetchResult."""
        raise NotImplementedError
class FetchTask(object):
    """One unit of fetch work: a URL, opaque caller data, and the fetcher
    that will retrieve it."""

    def __init__(self, fetcher, url, user_data):
        self.fetcher, self.url, self.user_data = fetcher, url, user_data

    def __str__(self):
        return "[fetch: url = (%s)]" % self.url

    def fetch(self):
        """Run this task through its bound fetcher and return the result."""
        owner = self.fetcher
        return owner.fetch(self)
class FetchResult(object):
    """Outcome of a fetch.

    content -- the retrieved data, or None if nothing was retrieved.
    error   -- None on success; otherwise whatever the fetcher reports as
               the failure (an exception, or an error response object).
    """

    def __init__(self, content, error):
        self.error = error
        self.content = content
class HttpFetcher(Fetcher):
    """Fetcher for http:// and https:// URLs, implemented with httplib2.

    Each fetch uses an httplib2.Http whose cache directory is
    <Config.straw_home()>/cache.
    """

    def __init__(self):
        Fetcher.__init__(self)

    def supports(self, **kwargs):
        """Return True when kwargs["url"] uses the http or https scheme."""
        return _get_url_schema(kwargs["url"]) in ("http", "https")

    def create_task(self, **kwargs):
        """Build an HttpFetchTask bound to this fetcher from kwargs."""
        return HttpFetchTask(self, **kwargs)

    def fetch(self, task):
        """GET task.url and wrap the outcome in a FetchResult.

        If task.credentials is set, it is unpacked as a
        (username, password, domain) triple and passed to
        httplib2's add_credentials() before the request.

        Outcomes:
          * success                  -> content = body, error = None
          * request raised           -> content = None, error = the exception
          * response status >= 400   -> content = body, error = the response
        Errors raised while building the Http object or adding credentials
        are not caught and propagate to the caller.
        """
        cache_dir = os.path.join(Config.straw_home(), 'cache')
        http = httplib2.Http(cache_dir)

        content = None
        error = None

        if task.credentials:
            username, password, domain = task.credentials
            http.add_credentials(username, password, domain)

        try:
            response, content = http.request(task.url, "GET")
        except Exception:
            # sys.exc_info() instead of "except Exception, e" / "as e" keeps
            # this parseable on every Python 2.x release as well as Python 3.
            error = sys.exc_info()[1]
        else:
            # Only inspect the response when the request actually returned
            # one, rather than relying on the truthiness of an exception.
            if response.status >= 400:
                error = response

        return FetchResult(content, error)
class HttpFetchTask(FetchTask):
    """FetchTask for HTTP(S) URLs, optionally carrying login credentials.

    Keyword arguments:
      url, user_data -- required; forwarded to FetchTask.
      credentials    -- optional (username, password, domain) triple consumed
                        by HttpFetcher.fetch(); None when not supplied.
    """

    def __init__(self, fetcher, **kwargs):
        FetchTask.__init__(self, fetcher, kwargs["url"], kwargs["user_data"])

        # dict.get replaces dict.has_key (removed in Python 3); a missing key
        # and an explicit None both leave the task without credentials.
        self.credentials = kwargs.get("credentials")
class FileFetchTask(FetchTask):
    """FetchTask for local file:// URLs; takes the required url and
    user_data keyword arguments and nothing else."""

    def __init__(self, fetcher, **kwargs):
        url, user_data = kwargs["url"], kwargs["user_data"]
        FetchTask.__init__(self, fetcher, url, user_data)
class FileFetcher(Fetcher):
    """Fetcher for local file:// URLs."""

    def __init__(self):
        Fetcher.__init__(self)

    def supports(self, **kwargs):
        """Return True when kwargs["url"] uses the file scheme."""
        return _get_url_schema(kwargs["url"]) == "file"

    def create_task(self, **kwargs):
        """Build a FileFetchTask bound to this fetcher from kwargs."""
        return FileFetchTask(self, **kwargs)

    def fetch(self, task):
        """Read the file named by task.url and wrap it in a FetchResult.

        The path is whatever follows the literal "file://" prefix, so
        "file:///home/me/feed.xml" reads "/home/me/feed.xml". Any exception
        raised while opening or reading becomes the result's error, with
        content left as None.
        """
        path = task.url[len("file://"):]
        content = None
        error = None

        try:
            # Binary mode returns the raw bytes, matching the HTTP fetcher's
            # raw response body; decoding is left to the consumer.
            f = open(path, "rb")
            try:
                content = f.read()
            finally:
                # Close even when read() raises so the handle is not leaked.
                f.close()
        except Exception:
            # Portable across all Python 2.x releases and Python 3.
            error = sys.exc_info()[1]

        return FetchResult(content, error)
# Module-wide fetcher instances consulted by create_task(); their
# constructors only chain to Fetcher.__init__, so they carry no state.
_file_fetcher, _http_fetcher = FileFetcher(), HttpFetcher()
class FetchJobHandler(ThreadPoolJobHandler):
    """Job handler for "fetch" jobs.

    Configures ThreadPoolJobHandler with 5 FetchTaskThread workers and feeds
    every task of the job into the handler's task queue.
    """

    job_id = "fetch"

    def __init__(self, id, job):
        ThreadPoolJobHandler.__init__(self, id, job)

        # Presumably read by ThreadPoolJobHandler when it starts its workers.
        self.pool_size = 5
        self.task_class = FetchTaskThread

    def _prepare(self):
        """Queue each of the job's tasks for the worker threads."""
        enqueue = self.task_queue.put
        for pending in self.job.tasks:
            enqueue(pending)
class FetchTaskThread(TaskThread):
    """Worker used by FetchJobHandler: processes a task by delegating to the
    fetcher bound to that task."""

    def __init__(self, handler):
        TaskThread.__init__(self, handler)

    def _process(self, task):
        """Fetch *task* with its own fetcher and return the FetchResult."""
        fetcher = task.fetcher
        return fetcher.fetch(task)
# Register the handler for "fetch" jobs (FetchJobHandler.job_id) at import
# time; presumably this is what lets JobManager.start() dispatch the
# Job("fetch") objects created by fetch() below.
JobManager.register_handler(FetchJobHandler)
def create_task(**kwargs):
    """Build a fetch task with the first fetcher whose supports() accepts kwargs.

    HTTP(S) is tried before file://. kwargs must include "url" and, for the
    task constructors, "user_data". Returns None when no fetcher claims the URL.
    """
    candidates = (_http_fetcher, _file_fetcher)
    for candidate in candidates:
        if candidate.supports(**kwargs):
            return candidate.create_task(**kwargs)
    return None
def fetch(tasks, observers=None, wait=False):
    """Start a "fetch" job for *tasks* through JobManager.

    tasks     -- task objects, typically built with create_task().
    observers -- observers attached to the job; defaults to a fresh empty
                 dict on every call.
    wait      -- forwarded unchanged to JobManager.start().

    Returns whatever JobManager.start() returns (presumably the job id that
    stop() accepts -- confirm against JobManager).
    """
    if observers is None:
        # A literal {} default is created once and shared by every call, so
        # anything that mutated job.observers would leak into later jobs.
        observers = {}

    job = Job("fetch")
    job.tasks = tasks
    job.observers = observers
    return JobManager.start(job, wait=wait)
def stop(id):
    """Ask JobManager to stop the job identified by *id*; returns None."""
    stop_job = JobManager.stop
    stop_job(id)