From 3b691c49ec1376521bb535e680c5b35377226c0f Mon Sep 17 00:00:00 2001 From: DrFrasierCrane Date: Sun, 13 Jan 2008 22:56:29 +0100 Subject: [PATCH] Refactored FeedDiscovery, now it uses Fetcher; added support for HTTP Basic Auth in Fetcher. --- straw/FeedDiscovery.py | 78 ++++++++++++++++++++++++++++++++++---------------- straw/FeedManager.py | 1 + straw/FeedUpdater.py | 13 +++++++-- straw/Fetcher.py | 66 ++++++++++++++++-------------------------- straw/JobManager.py | 13 ++++----- straw/feedfinder.py | 57 ++++++++++++------------------------ straw/subscribe.py | 22 +++++++++++--- 7 files changed, 130 insertions(+), 120 deletions(-) diff --git a/straw/FeedDiscovery.py b/straw/FeedDiscovery.py index 5760570..d1fd078 100644 --- a/straw/FeedDiscovery.py +++ b/straw/FeedDiscovery.py @@ -17,45 +17,73 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -from JobManager import Job, TaskResult, TaskThread, ThreadPoolJobHandler +from JobManager import Job, TaskResult, TaskThread, JobHandler +import Fetcher import JobManager import SummaryParser import feedfinder +import gobject -class FeedDiscoveryJobHandler(ThreadPoolJobHandler): +class FeedDiscoveryJobHandler(JobHandler): job_id = "feed-discovery" + __gsignals__ = { + "feed-discovered" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) + } + def __init__(self, id, job): - ThreadPoolJobHandler.__init__(self, id, job) + JobHandler.__init__(self, id, job) + + def _on_fetch_started(self, handler, task): + pass + + def _on_url_fetched(self, handler, task_result): + url = task_result.task.user_data + fetch_result = task_result.result + + if feedfinder.couldBeFeedData(fetch_result.content): + feed = SummaryParser.parse(fetch_result.content, location = url) + self._notify("feed-discovered", feed) - self.pool_size = 1 - self.result_class = FeedDiscoveryTaskResult - self.task_class = FeedDiscoveryTaskThread + def _on_fetch_done(self, handler, data): + pass def _prepare(self): - self.task_queue.put(self.job.data) + self.observers = [{ "task-done": [ self._on_url_fetched ], + "task-start": [ self._on_fetch_started ], + "job-done": [ self._on_fetch_done ]}] + + def _run(self): + fetch_task = Fetcher.create_task(url = self.job.url, credentials = self.job.credentials, user_data = None) + fetch_result = fetch_task.fetch() + + if fetch_result.error: + return + + if feedfinder.couldBeFeedData(fetch_result.content): + feed = SummaryParser.parse(fetch_result.content, location = self.job.url) + + if self.job.credentials: + print self.job.credentials + feed.pdict["username"], feed.pdict["password"], domain = self.job.credentials -class FeedDiscoveryTaskResult(TaskResult): - def __init__(self, task_info, result): - self.task_info = task_info - self.result = result + self._notify("feed-discovered", feed) + return - def get(self): - raise NotImplementedError + urls = feedfinder.urls(self.job.url, fetch_result.content) + fetch_tasks = [Fetcher.create_task(url = url, user_data = url) for url in urls] + self.fetcher_id = Fetcher.fetch(fetch_tasks, observers = self.observers, wait = True) -class FeedDiscoveryTaskThread(TaskThread): - def __init__(self, handler): - TaskThread.__init__(self, handler) +class FeedDiscoveryJob(Job): + def __init__(self, url, credentials, observers): + Job.__init__(self, "feed-discovery") - def _process(self, url): - data = feedfinder.feeds(url, True) - feeds = [SummaryParser.parse(content, location = url) for url, content in data] - return feeds + self.observers = observers + self.url = url + self.credentials = credentials JobManager.register_handler(FeedDiscoveryJobHandler) -def discover(url, observers): - update = Job("feed-discovery") - update.data = url - update.observers = observers - JobManager.start(update) +def discover(url, credentials, observers): + job = FeedDiscoveryJob(url, credentials, observers) + JobManager.start(job) diff --git a/straw/FeedManager.py b/straw/FeedManager.py index 313c0d7..3056d06 100644 --- a/straw/FeedManager.py +++ b/straw/FeedManager.py @@ -211,6 +211,7 @@ class FeedManager(GObject): def save_all(self, save_list): for item in save_list: if isinstance(item, Feed): + item.status = straw.FS_IDLE item.parent_id = 1 if item.parent: diff --git a/straw/FeedUpdater.py b/straw/FeedUpdater.py index 9ea4efb..dd396b5 100644 --- a/straw/FeedUpdater.py +++ b/straw/FeedUpdater.py @@ -17,7 +17,6 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -from Fetcher import FetchTask from JobManager import Job, TaskResult, TaskThread, JobHandler import Config import Fetcher @@ -58,7 +57,17 @@ class FeedUpdateJobHandler(JobHandler): pass def _prepare(self): - self.fetch_tasks = [Fetcher.create_task(url = feed.location, user_data = feed) for feed in self.job.tasks] + self.fetch_tasks = [] + + for feed in self.job.tasks: + credentials = None + + if feed.pdict.has_key("username"): + credentials = feed.pdict["username"], feed.pdict["password"], "" + + task = Fetcher.create_task(url = feed.location, credentials = credentials, user_data = feed) + + self.fetch_tasks.append(task) self.observers = [{ "task-done": [ self._on_url_fetched ], "task-start": [ self._on_fetch_started ], diff --git a/straw/Fetcher.py b/straw/Fetcher.py index 9e97fa8..d986459 100644 --- a/straw/Fetcher.py +++ b/straw/Fetcher.py @@ -44,6 +44,9 @@ class FetchTask(object): self.url = url self.user_data = user_data + def fetch(self): + return self.fetcher.fetch(self) + class FetchResult(object): def __init__(self, content, error): self.content = content @@ -63,15 +66,34 @@ class HttpFetcher(Fetcher): def fetch(self, task): url = task.url CACHE_DIR = os.path.join(Config.straw_home(), 'cache') - h = httplib2.Http(CACHE_DIR) - content = h.request(url, "GET")[1] + http = httplib2.Http(CACHE_DIR) + + content = None error = None + + if task.credentials: + username, password, domain = task.credentials + http.add_credentials(username, password, domain) + + try: + response, content = http.request(url, "GET") + except Exception, e: + error = e + + if not error and response.status >= 400: + error = response + return FetchResult(content, error) class HttpFetchTask(FetchTask): def __init__(self, fetcher, **kwargs): FetchTask.__init__(self, fetcher, kwargs["url"], kwargs["user_data"]) + self.credentials = None + + if kwargs.has_key("credentials"): + self.credentials = kwargs["credentials"] + class FileFetchTask(FetchTask): def __init__(self, fetcher, **kwargs): FetchTask.__init__(self, fetcher, kwargs["url"], kwargs["user_data"]) @@ -124,46 +146,6 @@ class FetchTaskThread(TaskThread): def _process(self, task): return task.fetcher.fetch(task) -class FetcherUsingJobHandler(ThreadPoolJobHandler): - def __init__(self, id, job): - ThreadPoolJobHandler.__init__(self, id, job) - - self.fetcher_id = None - self.fetcher_running = Semaphore(0) - - def _on_fetch_started(self, handler, task): - pass - - def _on_fetch_done(self, handler, data): - self.fetcher_id = None - self.fetcher_running.release() - - def _on_url_fetched(self, handler, task_result): - self.task_queue.put((task_result.task.user_data, task_result.result)) - - def _prepare_fetch_tasks(self): - raise NotImplementedError - - def _prepare(self): - fetch_tasks = self._prepare_fetch_tasks() - self.job_size = len(fetch_tasks) - - observers = [{ "task-done": [ self._on_url_fetched ], - "task-start": [ self._on_fetch_started ], - "job-done": [ self._on_fetch_done ]}] - - self.fetcher_id = fetch(fetch_tasks, observers = observers) - - def _shutdown(self): - self.fetcher_running.acquire() - ThreadPoolJobHandler._shutdown(self) - - def stop(self): - if self.fetcher_id: - stop(self.fetcher_id) - - ThreadPoolJobHandler.stop(self) - JobManager.register_handler(FetchJobHandler) def create_task(**kwargs): diff --git a/straw/JobManager.py b/straw/JobManager.py index 49df3ee..252a571 100644 --- a/straw/JobManager.py +++ b/straw/JobManager.py @@ -107,6 +107,9 @@ class JobHandler(GObject): for callable in observer_dict[signal]: self.connect(signal, callable) + def _debug(self, msg): + error.debug("%s %s" % (self.job_id, msg)) + def start(self): self._prepare() self._run() @@ -123,7 +126,8 @@ class JobHandler(GObject): raise NotImplementedError def _shutdown(self): - pass + self._debug("shutting down...") + self._notify("job-done") def _cleanup(self): self._notify("cleanup") @@ -162,9 +166,6 @@ class ThreadPoolJobHandler(JobHandler): self.task_threads = [] self.task_class = None - def _debug(self, msg): - error.debug("%s %s" % (self.job_id, msg)) - def _run(self): if not self.job_size: self.job_size = self.task_queue.qsize() @@ -196,10 +197,6 @@ class ThreadPoolJobHandler(JobHandler): self.running_queue.join() - def _shutdown(self): - self._debug("shutting down...") - self._notify("job-done") - def stop(self): for t in self.task_threads: t.job_done() diff --git a/straw/feedfinder.py b/straw/feedfinder.py index 657f5e9..39bf916 100644 --- a/straw/feedfinder.py +++ b/straw/feedfinder.py @@ -118,10 +118,10 @@ class URLGatekeeper: try: rp.read() except: - pass + _debuglog('failed to fetch %s' % robotsurl) self.rpcache[domain] = rp return rp - + def can_fetch(self, url): rp = self._getrp(url) allow = rp.can_fetch(self.urlopener.version, url) @@ -243,6 +243,9 @@ def tryBrokenRedirect(data): if newuris: return newuris[0].strip() def couldBeFeedData(data): + if not data: + return False + data = data.lower() if data.count(' 0] -def feed(uri): - #todo: give preference to certain feed formats - feedlist = feeds(uri) - if feedlist: - return feedlist[0] - else: - return None - ##### test harness ###### def test(): @@ -372,17 +365,3 @@ def test(): uri = urlparse.urljoin(uri, data.split('