From 73673d200a36caea3cbf7e4c1983406b737961ae Mon Sep 17 00:00:00 2001 From: DrFrasierCrane Date: Sun, 13 Jan 2008 14:35:49 +0100 Subject: [PATCH] Implemented Fetcher job, refactored JobManager, code cleanup. --- data/straw.glade | 62 ++++---- straw/Application.py | 14 +- straw/Config.py | 2 + straw/FeedDiscovery.py | 24 ++- straw/FeedListView.py | 8 +- straw/FeedManager.py | 61 ++++---- straw/FeedUpdater.py | 197 +++++++++++-------------- straw/Fetcher.py | 181 +++++++++++++++++++++++ straw/JobManager.py | 220 ++++++++++++++-------------- straw/__init__.py | 1 + straw/opml.py | 62 ++++---- straw/socks/BUGS | 25 ++++ straw/socks/LICENSE | 22 +++ straw/socks/README | 201 +++++++++++++++++++++++++ straw/socks/socks.py | 387 +++++++++++++++++++++++++++++++++++++++++++++++++ straw/subscribe.py | 15 +- test/TestFeedparser.py | 100 ------------- 17 files changed, 1150 insertions(+), 432 deletions(-) rewrite straw/FeedUpdater.py (69%) create mode 100644 straw/Fetcher.py create mode 100644 straw/socks/BUGS create mode 100644 straw/socks/LICENSE create mode 100644 straw/socks/README create mode 100644 straw/socks/socks.py delete mode 100644 test/TestFeedparser.py diff --git a/data/straw.glade b/data/straw.glade index bf1d706..2a098fc 100644 --- a/data/straw.glade +++ b/data/straw.glade @@ -5,12 +5,14 @@ True + GDK_EXPOSURE_MASK | GDK_POINTER_MOTION_MASK | GDK_POINTER_MOTION_HINT_MASK | GDK_BUTTON_MOTION_MASK | GDK_BUTTON1_MOTION_MASK | GDK_BUTTON2_MOTION_MASK | GDK_BUTTON3_MOTION_MASK | GDK_BUTTON_PRESS_MASK | GDK_BUTTON_RELEASE_MASK | GDK_KEY_PRESS_MASK | GDK_KEY_RELEASE_MASK | GDK_ENTER_NOTIFY_MASK | GDK_LEAVE_NOTIFY_MASK | GDK_FOCUS_CHANGE_MASK | GDK_STRUCTURE_MASK | GDK_PROPERTY_CHANGE_MASK | GDK_VISIBILITY_NOTIFY_MASK | GDK_PROXIMITY_IN_MASK | GDK_PROXIMITY_OUT_MASK | GDK_SUBSTRUCTURE_MASK | GDK_SCROLL_MASK Straw straw.png + True @@ -437,6 +439,8 @@ True True + True + True False True False @@ -529,34 +533,17 @@ 8 8 - - True - True - Earliest date - True - 0 - True - - - - GTK_FILL - - - - - + True - True - Latest date - True - 0 - True - + False + GNOME_DATE_EDIT_24_HR | GNOME_DATE_EDIT_WEEK_STARTS_ON_MONDAY + + 1 + 2 1 2 - GTK_FILL @@ -574,17 +561,34 @@ - + True - False - GNOME_DATE_EDIT_24_HR | GNOME_DATE_EDIT_WEEK_STARTS_ON_MONDAY - + True + Latest date + True + 0 + True + - 1 - 2 1 2 + GTK_FILL + + + + + + True + True + Earliest date + True + 0 + True + + + + GTK_FILL diff --git a/straw/Application.py b/straw/Application.py index 4fcec24..470e567 100644 --- a/straw/Application.py +++ b/straw/Application.py @@ -295,7 +295,7 @@ class ApplicationPresenter(MVP.BasicPresenter): self.update_all_button.set_stock_id(gtk.STOCK_STOP) else: self.update_all_button.set_sensitive(False) - JobManager.stop_jobs() + FeedManager.stop_update_all() def _on_feed_poll_started(self, handler, feed): pass @@ -597,13 +597,15 @@ class ApplicationView(MVP.WidgetView): def _on_straw_main_delete_event(self, *args): return self._presenter.quit() + + def _on_straw_main_window_state_event(self, widget, event): + is_maximized = widget.window.get_state() == gtk.gdk.WINDOW_STATE_MAXIMIZED + Config.set(OPTION_WINDOW_MAX, is_maximized) def _on_straw_main_configure_event(self, widget, event, *args): - if widget.window.get_state() is not gtk.gdk.WINDOW_STATE_MAXIMIZED: - Config.set(OPTION_WINDOW_MAX, False) - self._presenter.check_allocation(widget, event) - else: - Config.set(OPTION_WINDOW_MAX, True) + #if not (widget.window.get_state() & gtk.gdk.WINDOW_STATE_MAXIMIZED): + # self._presenter.check_allocation(widget, event) + pass def _on_main_main_pane_size_allocate(self, widget, *args): self._presenter.check_main_pane_position(widget) diff --git a/straw/Config.py b/straw/Config.py index e5c8d20..656c871 100644 --- a/straw/Config.py +++ b/straw/Config.py @@ -245,6 +245,8 @@ class Config(gobject.GObject): self._defaults = defaults self._options = defaults self.persistence = persistence + + self.initialize_proxy() self._apply_defaults() diff --git a/straw/FeedDiscovery.py b/straw/FeedDiscovery.py index 92a95c5..5760570 100644 --- a/straw/FeedDiscovery.py +++ b/straw/FeedDiscovery.py @@ -17,7 +17,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -from JobManager import Job, TaskResult, TaskThread, TaskInfo, ThreadPoolJobHandler +from JobManager import Job, TaskResult, TaskThread, ThreadPoolJobHandler import JobManager import SummaryParser import feedfinder @@ -25,20 +25,15 @@ import feedfinder class FeedDiscoveryJobHandler(ThreadPoolJobHandler): job_id = "feed-discovery" - def __init__(self, job): - ThreadPoolJobHandler.__init__(self, job) + def __init__(self, id, job): + ThreadPoolJobHandler.__init__(self, id, job) self.pool_size = 1 self.result_class = FeedDiscoveryTaskResult self.task_class = FeedDiscoveryTaskThread - def _split(self): - ti = TaskInfo(1, { "url" : self.job.data }) - self.task_queue.put(ti) - - def _prepare_result(self): - task_result = self.result_queue.get() - return task_result.result + def _prepare(self): + self.task_queue.put(self.job.data) class FeedDiscoveryTaskResult(TaskResult): def __init__(self, task_info, result): @@ -49,11 +44,10 @@ class FeedDiscoveryTaskResult(TaskResult): raise NotImplementedError class FeedDiscoveryTaskThread(TaskThread): - def __init__(self, handler, task_queue, result_queue): - TaskThread.__init__(self, handler, task_queue, result_queue) + def __init__(self, handler): + TaskThread.__init__(self, handler) - def _process(self, task): - url = task.data["url"] + def _process(self, url): data = feedfinder.feeds(url, True) feeds = [SummaryParser.parse(content, location = url) for url, content in data] return feeds @@ -64,4 +58,4 @@ def discover(url, observers): update = Job("feed-discovery") update.data = url update.observers = observers - JobManager.start_job(update) + JobManager.start(update) diff --git a/straw/FeedListView.py b/straw/FeedListView.py index 4f3329c..0e61ce1 100644 --- a/straw/FeedListView.py +++ b/straw/FeedListView.py @@ -51,6 +51,9 @@ class TreeViewNode(object): #print "obj_changed: %s" % threading.currentThread() #debug("obj %d changed: property.name = %s, self.path = %s, self.store[path].id = %s" % (obj.id, property.name, str(self.path), str(self.store[self.path][Column.object].node.id))) #debug("obj %d changed: property.name = %s, self.path = %s, self.store[path].id = %s" % (obj.id, property.name, str(self.path), str(self.store[self.path][Column.object].node.id))) + + #self.store.set(self.iter, Column.pixbuf, self.pixbuf) + if property.name == "unread-count": #debug("obj %d changed: property.name = %s, self.path = %s, self.store[path].id = %s" % (obj.id, property.name, str(self.path), str(self.store[self.path][Column.object].node.id))) iter = self.iter @@ -88,7 +91,10 @@ class TreeViewNode(object): global _tmp_widget if isinstance(self.node, Feed): - return _tmp_widget.render_icon(gtk.STOCK_FILE, gtk.ICON_SIZE_MENU) + if self.node.status == straw.FS_ERROR: + return _tmp_widget.render_icon(gtk.STOCK_CANCEL, gtk.ICON_SIZE_MENU) + else: + return _tmp_widget.render_icon(gtk.STOCK_FILE, gtk.ICON_SIZE_MENU) else: return _tmp_widget.render_icon(gtk.STOCK_DIRECTORY, gtk.ICON_SIZE_MENU) diff --git a/straw/FeedManager.py b/straw/FeedManager.py index ec5e13f..313c0d7 100644 --- a/straw/FeedManager.py +++ b/straw/FeedManager.py @@ -46,7 +46,11 @@ def update_all_feeds(observers): def is_update_all_running(): fm = _get_instance() - return fm.update_all_running + return fm.update_all_id != None + +def stop_update_all(): + fm = _get_instance() + return fm.stop_update_all() def update_nodes(nodes, observers = {}): fm = _get_instance() @@ -110,7 +114,7 @@ class FeedManager(GObject): GObject.__init__(self) self.storage = None - self.update_all_running = False + self.update_all_id = None def emit(self, *args): gobject.idle_add(gobject.GObject.emit, self, *args) @@ -132,24 +136,28 @@ class FeedManager(GObject): else: node.parent = None + if isinstance(node, Feed): + node.status = straw.FS_IDLE + def _setup_node_signals(self, node): node.connect("parent-changed", self.on_parent_changed) node.connect("norder-changed", self.on_norder_changed) def import_opml(self, path, category): job = Job("opml-parse") - job.data = (path, category) - job.observers = [ { "job-done": [ self.import_opml_save ] } ] + job.data = ("file://" + path, category) + job.observers = [ { "task-done": [ self._on_opml_imported ] } ] - JobManager.start_job(job) + JobManager.start(job) - def import_opml_save(self, handler, opml_data): - save_list, category = opml_data + def _on_opml_imported(self, handler, task_result): + save_list = task_result.result + + if save_list: + self.dao.tx_begin() + self.save_all(save_list) + self.dao.tx_commit() - self.dao.tx_begin() - self.save_all(save_list) - self.dao.tx_commit() - def export_opml(self, root_id, filename): if not self.nodes.has_key(root_id): return None @@ -253,12 +261,10 @@ class FeedManager(GObject): def update_all_feeds(self, observers): feeds = [node for node in self.nodes.values() if node.type == "F"] - self.update_all_running = True - - FeedUpdater.update_feeds(feeds, [{ + self.update_all_id = FeedUpdater.update(feeds, [{ "job-done": [ self._on_update_all_done ], - "task-start": [ self._on_update_feed_start ], - "task-done": [ self._on_update_feed_done ] + "update-started": [ self._on_update_feed_start ], + "update-done": [ self._on_update_feed_done ] }, observers]) def update_nodes(self, nodes, observers): @@ -270,31 +276,36 @@ class FeedManager(GObject): else: feeds.extend([child_node for child_node in node.all_children() if child_node.type == "F"]) - FeedUpdater.update_feeds(feeds, [{ - "task-start": [ self._on_update_feed_start ], - "task-done": [ self._on_update_feed_done ] + FeedUpdater.update(feeds, [{ + "update-started": [ self._on_update_feed_start ], + "update-done": [ self._on_update_feed_done ] }, observers]) + def stop_update_all(self): + FeedUpdater.stop(self.update_all_id) + def _on_update_all_done(self, handler, data): - self.update_all_running = False + self.update_all_id = None self.emit("update-all-done") def _on_update_feed_start(self, handler, feed): feed.props.status = straw.FS_UPDATING - def _on_update_feed_done(self, handler, data): - feed = data.task_info.data["feed"] - feed.props.status = straw.FS_IDLE + def _on_update_feed_done(self, handler, update_result): + feed = update_result.feed - if not data.result: + if update_result.error: + feed.props.status = straw.FS_ERROR return + feed.props.status = straw.FS_IDLE + self.dao.tx_begin() # Some metadata could change between the updates. self.save_feed(feed) - for item in data.result.items: + for item in feed.items: item.feed_id = feed.id if not ItemManager.feed_item_exists(item): diff --git a/straw/FeedUpdater.py b/straw/FeedUpdater.py dissimilarity index 69% index 2b0a605..9ea4efb 100644 --- a/straw/FeedUpdater.py +++ b/straw/FeedUpdater.py @@ -1,110 +1,87 @@ -""" FeedUpdater.py - -""" - -__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc." -__license__ = """ -Straw is free software; you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free Software -Foundation; either version 2 of the License, or (at your option) any later -version. - -Straw is distributed in the hope that it will be useful, but WITHOUT ANY -WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -A PARTICULAR PURPOSE. See the GNU General Public License for more details. - -You should have received a copy of the GNU General Public License along with -this program; if not, write to the Free Software Foundation, Inc., 59 Temple -Place - Suite 330, Boston, MA 02111-1307, USA. """ - -from JobManager import Job, TaskResult, TaskThread, TaskInfo, ThreadPoolJobHandler -import Config -import JobManager -import httplib2 -import os -import urlparse - -class FeedUpdateJobHandler(ThreadPoolJobHandler): - job_id = "feed-update" - - def __init__(self, job): - ThreadPoolJobHandler.__init__(self, job) - - self.pool_size = 5 - self.result_class = FeedUpdateTaskResult - self.task_class = FeedUpdateTaskThread - - def _split(self): - i = 0 - for a in self.job.data: - i += 1 - ti = TaskInfo(i, { "feed" : a }) - self.task_queue.put(ti) - - def _prepare_result(self): - list = [] - - while not self.result_queue.empty(): - list.append(self.result_queue.get()) - - return list - -class FeedUpdateTaskResult(TaskResult): - def __init__(self, task_info, result): - self.task_info = task_info - self.result = result - - def get(self): - raise NotImplementedError - -class FeedUpdateTaskThread(TaskThread): - def __init__(self, handler, task_queue, result_queue): - TaskThread.__init__(self, handler, task_queue, result_queue) - - def _get_feed_content(self, url): - schema = urlparse.urlsplit(url, "http")[0] - content = None - - if schema == "http": - CACHE_DIR = os.path.join(Config.straw_home(), 'cache') - h = httplib2.Http(CACHE_DIR) - content = h.request(url, "GET")[1] - elif schema == "file": - try: - f = open(url[len("file://"):], "r") - content = f.read() - f.close() - except Exception: - # TODO: error reporting! - pass - - return content - - def _process(self, task): - feed = task.data["feed"] - - content = self._get_feed_content(feed.location) - - import SummaryParser - parsed = SummaryParser.parse(content, feed) - - #i = 0 - #for image in sum([item.images for item in parsed.items], []): - #resp, content = h.request(image, "GET") - # i = i + 1 - #f = open("/home/ppawel/Desktop/test/%s" % i, "w") - #f.write(content) - #f.close() - - return parsed - - def _prepare_task_info(self, task_info): - return task_info.data["feed"] - -JobManager.register_handler(FeedUpdateJobHandler) - -def update_feeds(feeds, observers): - update = Job("feed-update") - update.data = feeds - update.observers = observers - JobManager.start_job(update) +""" FeedUpdater.py + +""" + +__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc." +__license__ = """ +Straw is free software; you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free Software +Foundation; either version 2 of the License, or (at your option) any later +version. + +Straw is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 59 Temple +Place - Suite 330, Boston, MA 02111-1307, USA. """ + +from Fetcher import FetchTask +from JobManager import Job, TaskResult, TaskThread, JobHandler +import Config +import Fetcher +import JobManager +import SummaryParser +import gobject +import httplib2 +import os +import urlparse + +class FeedUpdateJobHandler(JobHandler): + job_id = "feed-update" + + __gsignals__ = { + "update-started" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)), + "update-done" : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) + } + + def __init__(self, id, job): + JobHandler.__init__(self, id, job) + + def _on_fetch_started(self, handler, task): + self._notify("update-started", task.user_data) + + def _on_url_fetched(self, handler, fetch_result): + feed = fetch_result.task.user_data + update_result = None + + if fetch_result.result.error: + update_result = FeedUpdateResult("fetch-error", feed, fetch_result.result) + else: + update_result = FeedUpdateResult(None, feed, fetch_result.result) + feed = SummaryParser.parse(fetch_result.result.content, feed) + + self._notify("update-done", update_result) + + def _on_fetch_done(self, handler, data): + pass + + def _prepare(self): + self.fetch_tasks = [Fetcher.create_task(url = feed.location, user_data = feed) for feed in self.job.tasks] + + self.observers = [{ "task-done": [ self._on_url_fetched ], + "task-start": [ self._on_fetch_started ], + "job-done": [ self._on_fetch_done ]}] + + def _run(self): + print "starting fetch..." + self.fetcher_id = Fetcher.fetch(self.fetch_tasks, observers = self.observers, wait = True) + print "fetch finished!" + +class FeedUpdateResult(object): + def __init__(self, error, feed, fetch_result): + self.error = error + self.feed = feed + self.fetch_result = fetch_result + +JobManager.register_handler(FeedUpdateJobHandler) + +def update(feeds, observers): + update = Job("feed-update") + update.tasks = feeds + update.observers = observers + return JobManager.start(update) + +def stop(id): + JobManager.stop(id) diff --git a/straw/Fetcher.py b/straw/Fetcher.py new file mode 100644 index 0000000..9e97fa8 --- /dev/null +++ b/straw/Fetcher.py @@ -0,0 +1,181 @@ +""" Fetcher.py + +""" + +__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc." +__license__ = """ +Straw is free software; you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free Software +Foundation; either version 2 of the License, or (at your option) any later +version. + +Straw is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 59 Temple +Place - Suite 330, Boston, MA 02111-1307, USA. """ + +from JobManager import Job, TaskResult, TaskThread, ThreadPoolJobHandler +from threading import Semaphore +import Config +import JobManager +import httplib2 +import os +import urlparse + +def _get_url_schema(url): + return urlparse.urlsplit(url, "http")[0] + +class Fetcher(object): + def __init__(self): + pass + + def supports(self, **kwargs): + raise NotImplementedError + + def __str__(self): + return "[%s]" % self.url + +class FetchTask(object): + def __init__(self, fetcher, url, user_data): + self.fetcher = fetcher + self.url = url + self.user_data = user_data + +class FetchResult(object): + def __init__(self, content, error): + self.content = content + self.error = error + +class HttpFetcher(Fetcher): + def __init__(self): + Fetcher.__init__(self) + + def supports(self, **kwargs): + schema = _get_url_schema(kwargs["url"]) + return schema == "http" or schema == "https" + + def create_task(self, **kwargs): + return HttpFetchTask(self, **kwargs) + + def fetch(self, task): + url = task.url + CACHE_DIR = os.path.join(Config.straw_home(), 'cache') + h = httplib2.Http(CACHE_DIR) + content = h.request(url, "GET")[1] + error = None + return FetchResult(content, error) + +class HttpFetchTask(FetchTask): + def __init__(self, fetcher, **kwargs): + FetchTask.__init__(self, fetcher, kwargs["url"], kwargs["user_data"]) + +class FileFetchTask(FetchTask): + def __init__(self, fetcher, **kwargs): + FetchTask.__init__(self, fetcher, kwargs["url"], kwargs["user_data"]) + +class FileFetcher(Fetcher): + def __init__(self): + Fetcher.__init__(self) + + def supports(self, **kwargs): + schema = _get_url_schema(kwargs["url"]) + return schema == "file" + + def create_task(self, **kwargs): + return FileFetchTask(self, **kwargs) + + def fetch(self, task): + url = task.url + content = None + error = None + + try: + f = open(url[len("file://"):], "r") + content = f.read() + f.close() + except Exception, e: + error = e + + return FetchResult(content, error) + +_file_fetcher = FileFetcher() +_http_fetcher = HttpFetcher() + +class FetchJobHandler(ThreadPoolJobHandler): + job_id = "fetch" + + def __init__(self, id, job): + ThreadPoolJobHandler.__init__(self, id, job) + + self.pool_size = 5 + self.task_class = FetchTaskThread + + def _prepare(self): + for task in self.job.tasks: + self.task_queue.put(task) + +class FetchTaskThread(TaskThread): + def __init__(self, handler): + TaskThread.__init__(self, handler) + + def _process(self, task): + return task.fetcher.fetch(task) + +class FetcherUsingJobHandler(ThreadPoolJobHandler): + def __init__(self, id, job): + ThreadPoolJobHandler.__init__(self, id, job) + + self.fetcher_id = None + self.fetcher_running = Semaphore(0) + + def _on_fetch_started(self, handler, task): + pass + + def _on_fetch_done(self, handler, data): + self.fetcher_id = None + self.fetcher_running.release() + + def _on_url_fetched(self, handler, task_result): + self.task_queue.put((task_result.task.user_data, task_result.result)) + + def _prepare_fetch_tasks(self): + raise NotImplementedError + + def _prepare(self): + fetch_tasks = self._prepare_fetch_tasks() + self.job_size = len(fetch_tasks) + + observers = [{ "task-done": [ self._on_url_fetched ], + "task-start": [ self._on_fetch_started ], + "job-done": [ self._on_fetch_done ]}] + + self.fetcher_id = fetch(fetch_tasks, observers = observers) + + def _shutdown(self): + self.fetcher_running.acquire() + ThreadPoolJobHandler._shutdown(self) + + def stop(self): + if self.fetcher_id: + stop(self.fetcher_id) + + ThreadPoolJobHandler.stop(self) + +JobManager.register_handler(FetchJobHandler) + +def create_task(**kwargs): + for fetcher in [ _http_fetcher, _file_fetcher ]: + if fetcher.supports(**kwargs): + return fetcher.create_task(**kwargs) + +def fetch(tasks, observers = {}, wait = False): + job = Job("fetch") + job.tasks = tasks + job.observers = observers + return JobManager.start(job, wait = wait) + +def stop(id): + JobManager.stop(id) diff --git a/straw/JobManager.py b/straw/JobManager.py index 9756e11..49df3ee 100644 --- a/straw/JobManager.py +++ b/straw/JobManager.py @@ -22,12 +22,12 @@ Place - Suite 330, Boston, MA 02111-1307, USA. """ from straw.Queue import Queue, Empty # our copy of the python 2.5 version from gobject import GObject, SIGNAL_RUN_FIRST -from threading import Thread, Lock +from threading import Thread import error import gobject import time -class JobManager(object): +class JobManager(GObject): """ Main entry point for the subsystem. JobManager is responsible managing JobHandlers, starting/stopping jobs. @@ -38,27 +38,40 @@ class JobManager(object): self.handlers = {} self.active = {} + def _get_id(self): + return time.time() + def register_handler(self, clazz): self.handlers[clazz.job_id] = clazz - def start_job(self, job, wait = False): - t = HandlerThread(self.handlers[job.job_id](job)) - - self.active[job.job_id] = t + def start(self, job, wait = False): + id = self._get_id() + handler = self.handlers[job.job_id](id, job) + handler_thread = HandlerThread(handler) - t.start() + handler.connect("cleanup", self._on_cleanup) + self.active[id] = handler_thread + handler_thread.start() if wait: - t.join() + handler_thread.join() + + return id + + def stop(self, id): + self.active[id].stop() def stop_jobs(self): for handler_thread in self.active.values(): handler_thread.stop() + def _on_cleanup(self, handler, data): + del self.active[handler.id] + class Job(object): def __init__(self, job_id): self.job_id = job_id - self.data = None + self.tasks = None self.observers = None class JobHandler(GObject): @@ -76,14 +89,17 @@ class JobHandler(GObject): __gsignals__ = { 'job-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)), 'task-done' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)), - 'task-start' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) + 'task-start' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)), + 'cleanup' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) } - def __init__(self, job): + def __init__(self, id, job): gobject.GObject.__init__(self) + self.id = id self.job = job self.task_queue = Queue() self.result_queue = Queue() + self.job_size = None if job.observers != None: for observer_dict in job.observers: @@ -92,70 +108,43 @@ class JobHandler(GObject): self.connect(signal, callable) def start(self): - self._split() + self._prepare() self._run() + self._shutdown() + self._cleanup() def stop(self): raise NotImplementedError - def _split(self): + def _prepare(self): raise NotImplementedError def _run(self): raise NotImplementedError - def _notify(self, event, data): + def _shutdown(self): + pass + + def _cleanup(self): + self._notify("cleanup") + + def _notify(self, event, data = None): self.emit(event, data) def _post_result(self, task, result): - self.result_queue.push(result) - - def _prepare_result(self): - return self.result_queue + self.result_queue.put(result) class HandlerThread(Thread): def __init__(self, handler): Thread.__init__(self) self.handler = handler - self.control_queue = Queue() - self.stopped = False def run(self): - worker = HandlerWorkerThread(self) - worker.start() - - self.control_queue.put_nowait("STARTED") - error.debug("CONTROL -- waiting on control queue") - self.control_queue.join() - error.debug("CONTROL -- joined control queue") - - for t in self.handler.task_threads: - t.job_done() - - for t in self.handler.task_threads: - t.join() - - while not self.handler.task_queue.empty(): - self.handler.task_queue.get_nowait() - self.handler.task_queue.task_done() + self.handler.start() def stop(self): - error.debug("CONTROL -- STOP") - - if not self.stopped: - self.control_queue.task_done() - - self.stopped = True - -class HandlerWorkerThread(Thread): - def __init__(self, handler_thread): - Thread.__init__(self) - self.handler_thread = handler_thread - - def run(self): - self.handler_thread.handler.start() - self.handler_thread.stop() + self.handler.stop() class ThreadPoolJobHandler(JobHandler): """ @@ -165,108 +154,126 @@ class ThreadPoolJobHandler(JobHandler): """ - def __init__(self, job): - JobHandler.__init__(self, job) + def __init__(self, id, job): + JobHandler.__init__(self, id, job) self.pool_size = 3 + self.running_queue = Queue() self.task_threads = [] self.task_class = None - self.queue_lock = Lock() + + def _debug(self, msg): + error.debug("%s %s" % (self.job_id, msg)) def _run(self): + if not self.job_size: + self.job_size = self.task_queue.qsize() + for i in xrange(self.pool_size): - t = self.task_class(self, self.task_queue, self.result_queue) + t = self.task_class(self) t.setName(str(i)) t.start() self.task_threads.append(t) - error.debug("pool -- created threads, now waiting on the queue") - - self.task_queue.join() + self._debug("created %d thread(s), waits for results" % len(self.task_threads)) + + for t in xrange(self.job_size): + self._debug("waiting...") + task_result = self.result_queue.get() + self._debug("got result [%s]" % str(task_result)) + + if task_result: + self._notify("task-done", task_result) + else: + self._debug("stopping...") + break - error.debug("pool -- work is finished, wait for workers") + self._debug("got the job done, waits for %d worker(s)" % len(self.task_threads)) for t in self.task_threads: - t.job_done() - t.join() + self.task_queue.put("job-done") + self._debug("put job-done") - error.debug("pool -- done, sending job-done") + self.running_queue.join() - self._notify("job-done", self._prepare_result()) + def _shutdown(self): + self._debug("shutting down...") + self._notify("job-done") -class TaskInfo: - def __init__(self, id, data): - self.id = id - self.data = data + def stop(self): + for t in self.task_threads: + t.job_done() class TaskResult: - def __init__(self, task_info, result): - self.task_info = task_info + def __init__(self, task, result): + self.task = task self.result = result - def get(self): - raise NotImplementedError - class TaskThread(Thread): - def __init__(self, handler, task_queue, result_queue): + def __init__(self, handler): self.handler = handler - self.task_queue = task_queue - self.result_queue = result_queue + self.task_queue = handler.task_queue + self.result_queue = handler.result_queue + self.running_queue = handler.running_queue self.quit = False Thread.__init__(self) + def _debug(self, msg): + error.debug("%s-%s %s" % (self.handler.job_id, self.getName(), msg)) + def run(self): - while 1: - error.debug(self.getName() + " trying to acquire lock!") - self.handler.queue_lock.acquire() - error.debug(self.getName() + " acquired lock!") + self.running_queue.put(True) - if self.quit or self.task_queue.empty(): - self.handler.queue_lock.release() - error.debug(self.getName() + " is breaking!") - break + while 1: + self._debug("waiting to get a task") - task = None + task = self.task_queue.get() - try: - task = self.task_queue.get_nowait() - except Empty: - error.debug(self.getName() + " missed the party!") + self._debug("got a task [%s]" % str(task)[:10]) - self.handler.queue_lock.release() - error.debug(self.getName() + " released lock!") + if self.quit or task == "job-done": + self._debug("breaking") + self._post_result("job-done", None) + break if task: - error.debug(self.getName() + " is munching task with id = %d" % task.id) + self._debug("munching task [%s]" % str(task)[:10]) self._task_started(task) result = None try: result = self._process(task) except Exception, e: - error.debug(self.getName() + " threw exception while processing task with id = %d [%s]" % (task.id, str(e))) - error.log_exc("Details:") + self._debug(" threw exception while processing task [%s] [e = %s]" % (str(task)[:10], str(e))) + #error.log_exc("Details:") self._post_result(task, result) + self.running_queue.get() + self.running_queue.task_done() + self._debug("terminating, bye") + def job_done(self): - error.debug(self.getName() + " got job_done") + self._debug("got job_done") self.quit = True def _post_result(self, task, result): - error.debug(self.getName() + " is posting result to task with id = %d" % task.id) + if task == "job-done": + self.result_queue.put(None) + self._debug("posted None") + else: + self._debug("posting result to task [%s]" % str(task)[:10]) - task_result = TaskResult(task, result) - self.result_queue.put_nowait(task_result) - self.task_queue.task_done() - self.handler._notify("task-done", task_result) + task_result = TaskResult(task, result) + self.result_queue.put(task_result) + self.task_queue.task_done() - def _prepare_task_info(self, task_info): - return task_info + def _prepare_task(self, task): + return task def _task_started(self, task): - self.handler._notify("task-start", self._prepare_task_info(task)) + self.handler._notify("task-start", self._prepare_task(task)) def _process(self, task): raise NotImplementedError @@ -282,8 +289,11 @@ def _get_instance(): def register_handler(clazz): _get_instance().register_handler(clazz) -def start_job(job, wait = False): - _get_instance().start_job(job, wait) +def start(job, wait = False): + return _get_instance().start(job, wait) + +def stop(id): + _get_instance().stop(id) def stop_jobs(): _get_instance().stop_jobs() diff --git a/straw/__init__.py b/straw/__init__.py index 73ab84f..033bf1f 100644 --- a/straw/__init__.py +++ b/straw/__init__.py @@ -4,3 +4,4 @@ FS_OK = 1 FS_IDLE = 2 FS_UPDATING = 4 +FS_ERROR = 8 diff --git a/straw/opml.py b/straw/opml.py index 67e8ea3..ea303ac 100644 --- a/straw/opml.py +++ b/straw/opml.py @@ -18,14 +18,15 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ - -from straw.JobManager import Job, TaskThread, TaskInfo, ThreadPoolJobHandler +from Fetcher import FetchTask +from straw.JobManager import Job, TaskThread, ThreadPoolJobHandler from straw.model import Category, Feed from threading import Lock from xml.sax import saxutils, make_parser, SAXParseException from xml.sax.handler import feature_namespaces, feature_namespace_prefixes from xml.sax.saxutils import XMLGenerator from xml.sax.xmlreader import AttributesImpl +import Fetcher import gnomevfs import straw.JobManager as JobManager import sys @@ -44,15 +45,25 @@ class OPMLParseJobHandler(ThreadPoolJobHandler): self.pool_size = 1 self.task_class = OPMLParseTaskThread - def _split(self): - ti = TaskInfo(0, { "file_path": self.job.data[0], "category": self.job.data[1] }) - self.task_queue.put(ti) + def _on_url_fetched(self, handler, task_result): + self.task_queue.put((task_result.task.user_data, task_result.result)) + + def _prepare(self): + category = self.job.data[1] + url = self.job.data[0] + self.job_size = 1 + fetch_tasks = [FetchTask(url = url, user_data = category)] + observers = [{ "task-done": [ self._on_url_fetched ]}] + Fetcher.fetch(fetch_tasks, observers = observers) - def _prepare_result(self): - task_result = self.result_queue.get() - category = task_result.task_info.data["category"] - tree = self._build_tree(task_result.result.roots(), parent = category) - return (tree, task_result.task_info.data["category"]) +class OPMLParseTaskThread(TaskThread): + def __init__(self, handler): + TaskThread.__init__(self, handler) + + def _process(self, task): + opml = read(task[1]) + tree = self._build_tree(opml.roots(), parent = task[0]) + return tree def _build_tree(self, outlines, parent = None): save_list = [] @@ -107,21 +118,6 @@ class OPMLParseJobHandler(ThreadPoolJobHandler): return save_list -class OPMLParseTaskThread(TaskThread): - def __init__(self, handler, task_queue, result_queue): - TaskThread.__init__(self, handler, task_queue, result_queue) - - def _process(self, task): - opml = None - - try: - fstream = open(task.data["file_path"]) - opml = read(fstream) - except Exception, inst: - print inst - - return opml - JobManager.register_handler(OPMLParseJobHandler) class OPML(dict): @@ -249,12 +245,13 @@ class OPMLHandler(xml.sax.handler.ContentHandler): return self._outlines def parse(stream): - parser = make_parser() + """parser = make_parser() parser.setFeature(feature_namespaces, 0) handler = OPMLHandler() - parser.setContentHandler(handler) - parser.parse(stream) - + parser.setContentHandler(handler)""" + handler = OPMLHandler() + xml.sax.parseString(stream, handler) + print handler.get_outlines() return handler.get_outlines() def export(root, filename): @@ -342,11 +339,8 @@ def find_entries(outlines): return entries def read(stream): - try: - o = parse(stream) - return o - except ValueError: - return None + o = parse(stream) + return o entries = find_entries(o.outlines) ret = list() edict = dict() diff --git a/straw/socks/BUGS b/straw/socks/BUGS new file mode 100644 index 0000000..2412351 --- /dev/null +++ b/straw/socks/BUGS @@ -0,0 +1,25 @@ +SocksiPy version 1.00 +A Python SOCKS module. +(C) 2006 Dan-Haim. All rights reserved. +See LICENSE file for details. + + +KNOWN BUGS AND ISSUES +---------------------- + +There are no currently known bugs in this module. +There are some limits though: + +1) Only outgoing connections are supported - This module currently only supports +outgoing TCP connections, though some servers may support incoming connections +as well. UDP is not supported either. + +2) GSSAPI Socks5 authenticaion is not supported. + + +If you find any new bugs, please contact the author at: + +negativeiq@users.sourceforge.net + + +Thank you! diff --git a/straw/socks/LICENSE b/straw/socks/LICENSE new file mode 100644 index 0000000..fc33078 --- /dev/null +++ b/straw/socks/LICENSE @@ -0,0 +1,22 @@ +Copyright 2006 Dan-Haim. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. +3. Neither the name of Dan Haim nor the names of his contributors may be used + to endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY DAN HAIM "AS IS" AND ANY EXPRESS OR IMPLIED +WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +EVENT SHALL DAN HAIM OR HIS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA +OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE. diff --git a/straw/socks/README b/straw/socks/README new file mode 100644 index 0000000..deae6f6 --- /dev/null +++ b/straw/socks/README @@ -0,0 +1,201 @@ +SocksiPy version 1.00 +A Python SOCKS module. +(C) 2006 Dan-Haim. All rights reserved. +See LICENSE file for details. + + +WHAT IS A SOCKS PROXY? +A SOCKS proxy is a proxy server at the TCP level. In other words, it acts as +a tunnel, relaying all traffic going through it without modifying it. +SOCKS proxies can be used to relay traffic using any network protocol that +uses TCP. + +WHAT IS SOCKSIPY? +This Python module allows you to create TCP connections through a SOCKS +proxy without any special effort. + +PROXY COMPATIBILITY +SocksiPy is compatible with three different types of proxies: +1. SOCKS Version 4 (Socks4), including the Socks4a extension. +2. SOCKS Version 5 (Socks5). +3. HTTP Proxies which support tunneling using the CONNECT method. + +SYSTEM REQUIREMENTS +Being written in Python, SocksiPy can run on any platform that has a Python +interpreter and TCP/IP support. +This module has been tested with Python 2.3 and should work with greater versions +just as well. + + +INSTALLATION +------------- + +Simply copy the file "socks.py" to your Python's lib/site-packages directory, +and you're ready to go. + + +USAGE +------ + +First load the socks module with the command: + +>>> import socks +>>> + +The socks module provides a class called "socksocket", which is the base to +all of the module's functionality. +The socksocket object has the same initialization parameters as the normal socket +object to ensure maximal compatibility, however it should be noted that socksocket +will only function with family being AF_INET and type being SOCK_STREAM. +Generally, it is best to initialize the socksocket object with no parameters + +>>> s = socks.socksocket() +>>> + +The socksocket object has an interface which is very similiar to socket's (in fact +the socksocket class is derived from socket) with a few extra methods. +To select the proxy server you would like to use, use the setproxy method, whose +syntax is: + +setproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) + +Explaination of the parameters: + +proxytype - The type of the proxy server. This can be one of three possible +choices: PROXY_TYPE_SOCKS4, PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP for Socks4, +Socks5 and HTTP servers respectively. + +addr - The IP address or DNS name of the proxy server. + +port - The port of the proxy server. Defaults to 1080 for socks and 8080 for http. + +rdns - This is a boolean flag than modifies the behavior regarding DNS resolving. +If it is set to True, DNS resolving will be preformed remotely, on the server. +If it is set to False, DNS resolving will be preformed locally. Please note that +setting this to True with Socks4 servers actually use an extension to the protocol, +called Socks4a, which may not be supported on all servers (Socks5 and http servers +always support DNS). The default is True. + +username - For Socks5 servers, this allows simple username / password authentication +with the server. For Socks4 servers, this parameter will be sent as the userid. +This parameter is ignored if an HTTP server is being used. If it is not provided, +authentication will not be used (servers may accept unauthentication requests). + +password - This parameter is valid only for Socks5 servers and specifies the +respective password for the username provided. + +Example of usage: + +>>> s.setproxy(socks.PROXY_TYPE_SOCKS5,"socks.example.com") +>>> + +After the setproxy method has been called, simply call the connect method with the +traditional parameters to establish a connection through the proxy: + +>>> s.connect(("www.sourceforge.net",80)) +>>> + +Connection will take a bit longer to allow negotiation with the proxy server. +Please note that calling connect without calling setproxy earlier will connect +without a proxy (just like a regular socket). + +Errors: Any errors in the connection process will trigger exceptions. The exception +may either be generated by the underlying socket layer or may be custom module +exceptions, whose details follow: + +class ProxyError - This is a base exception class. It is not raised directly but +rather all other exception classes raised by this module are derived from it. +This allows an easy way to catch all proxy-related errors. + +class GeneralProxyError - When thrown, it indicates a problem which does not fall +into another category. The parameter is a tuple containing an error code and a +description of the error, from the following list: +1 - invalid data - This error means that unexpected data has been received from +the server. The most common reason is that the server specified as the proxy is +not really a Socks4/Socks5/HTTP proxy, or maybe the proxy type specified is wrong. +4 - bad proxy type - This will be raised if the type of the proxy supplied to the +setproxy function was not PROXY_TYPE_SOCKS4/PROXY_TYPE_SOCKS5/PROXY_TYPE_HTTP. +5 - bad input - This will be raised if the connect method is called with bad input +parameters. + +class Socks5AuthError - This indicates that the connection through a Socks5 server +failed due to an authentication problem. The parameter is a tuple containing a +code and a description message according to the following list: + +1 - authentication is required - This will happen if you use a Socks5 server which +requires authentication without providing a username / password at all. +2 - all offered authentication methods were rejected - This will happen if the proxy +requires a special authentication method which is not supported by this module. +3 - unknown username or invalid password - Self descriptive. + +class Socks5Error - This will be raised for Socks5 errors which are not related to +authentication. The parameter is a tuple containing a code and a description of the +error, as given by the server. The possible errors, according to the RFC are: + +1 - General SOCKS server failure - If for any reason the proxy server is unable to +fulfill your request (internal server error). +2 - connection not allowed by ruleset - If the address you're trying to connect to +is blacklisted on the server or requires authentication. +3 - Network unreachable - The target could not be contacted. A router on the network +had replied with a destination net unreachable error. +4 - Host unreachable - The target could not be contacted. A router on the network +had replied with a destination host unreachable error. +5 - Connection refused - The target server has actively refused the connection +(the requested port is closed). +6 - TTL expired - The TTL value of the SYN packet from the proxy to the target server +has expired. This usually means that there are network problems causing the packet +to be caught in a router-to-router "ping-pong". +7 - Command not supported - The client has issued an invalid command. When using this +module, this error should not occur. +8 - Address type not supported - The client has provided an invalid address type. +When using this module, this error should not occur. + +class Socks4Error - This will be raised for Socks4 errors. The parameter is a tuple +containing a code and a description of the error, as given by the server. The +possible error, according to the specification are: + +1 - Request rejected or failed - Will be raised in the event of an failure for any +reason other then the two mentioned next. +2 - request rejected because SOCKS server cannot connect to identd on the client - +The Socks server had tried an ident lookup on your computer and has failed. In this +case you should run an identd server and/or configure your firewall to allow incoming +connections to local port 113 from the remote server. +3 - request rejected because the client program and identd report different user-ids - +The Socks server had performed an ident lookup on your computer and has received a +different userid than the one you have provided. Change your userid (through the +username parameter of the setproxy method) to match and try again. + +class HTTPError - This will be raised for HTTP errors. The parameter is a tuple +containing the HTTP status code and the description of the server. + + +After establishing the connection, the object behaves like a standard socket. +Call the close method to close the connection. + +In addition to the socksocket class, an additional function worth mentioning is the +setdefaultproxy function. The parameters are the same as the setproxy method. +This function will set default proxy settings for newly created socksocket objects, +in which the proxy settings haven't been changed via the setproxy method. +This is quite useful if you wish to force 3rd party modules to use a socks proxy, +by overriding the socket object. +For example: + +>>> socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5,"socks.example.com") +>>> socket.socket = socks.socksocket +>>> urllib.urlopen("http://www.sourceforge.net/") + + +PROBLEMS +--------- + +If you have any problems using this module, please first refer to the BUGS file +(containing current bugs and issues). If your problem is not mentioned you may +contact the author at the following E-Mail address: + +negativeiq@users.sourceforge.net + +Please allow some time for your question to be received and handled. + + +Dan-Haim, +Author. diff --git a/straw/socks/socks.py b/straw/socks/socks.py new file mode 100644 index 0000000..7cc1bc3 --- /dev/null +++ b/straw/socks/socks.py @@ -0,0 +1,387 @@ +"""SocksiPy - Python SOCKS module. +Version 1.00 + +Copyright 2006 Dan-Haim. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. +3. Neither the name of Dan Haim nor the names of his contributors may be used + to endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY DAN HAIM "AS IS" AND ANY EXPRESS OR IMPLIED +WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +EVENT SHALL DAN HAIM OR HIS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA +OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE. + + +This module provides a standard socket-like interface for Python +for tunneling connections through SOCKS proxies. + +""" + +import socket +import struct + +PROXY_TYPE_SOCKS4 = 1 +PROXY_TYPE_SOCKS5 = 2 +PROXY_TYPE_HTTP = 3 + +_defaultproxy = None +_orgsocket = socket.socket + +class ProxyError(Exception): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +class GeneralProxyError(ProxyError): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +class Socks5AuthError(ProxyError): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +class Socks5Error(ProxyError): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +class Socks4Error(ProxyError): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +class HTTPError(ProxyError): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +_generalerrors = ("success", + "invalid data", + "not connected", + "not available", + "bad proxy type", + "bad input") + +_socks5errors = ("succeeded", + "general SOCKS server failure", + "connection not allowed by ruleset", + "Network unreachable", + "Host unreachable", + "Connection refused", + "TTL expired", + "Command not supported", + "Address type not supported", + "Unknown error") + +_socks5autherrors = ("succeeded", + "authentication is required", + "all offered authentication methods were rejected", + "unknown username or invalid password", + "unknown error") + +_socks4errors = ("request granted", + "request rejected or failed", + "request rejected because SOCKS server cannot connect to identd on the client", + "request rejected because the client program and identd report different user-ids", + "unknown error") + +def setdefaultproxy(proxytype=None,addr=None,port=None,rdns=True,username=None,password=None): + """setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) + Sets a default proxy which all further socksocket objects will use, + unless explicitly changed. + """ + global _defaultproxy + _defaultproxy = (proxytype,addr,port,rdns,username,password) + +class socksocket(socket.socket): + """socksocket([family[, type[, proto]]]) -> socket object + + Open a SOCKS enabled socket. The parameters are the same as + those of the standard socket init. In order for SOCKS to work, + you must specify family=AF_INET, type=SOCK_STREAM and proto=0. + """ + + def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None): + _orgsocket.__init__(self,family,type,proto,_sock) + if _defaultproxy != None: + self.__proxy = _defaultproxy + else: + self.__proxy = (None, None, None, None, None, None) + self.__proxysockname = None + self.__proxypeername = None + + def __recvall(self, bytes): + """__recvall(bytes) -> data + Receive EXACTLY the number of bytes requested from the socket. + Blocks until the required number of bytes have been received. + """ + data = "" + while len(data) < bytes: + data = data + self.recv(bytes-len(data)) + return data + + def setproxy(self,proxytype=None,addr=None,port=None,rdns=True,username=None,password=None): + """setproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) + Sets the proxy to be used. + proxytype - The type of the proxy to be used. Three types + are supported: PROXY_TYPE_SOCKS4 (including socks4a), + PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP + addr - The address of the server (IP or DNS). + port - The port of the server. Defaults to 1080 for SOCKS + servers and 8080 for HTTP proxy servers. + rdns - Should DNS queries be preformed on the remote side + (rather than the local side). The default is True. + Note: This has no effect with SOCKS4 servers. + username - Username to authenticate with to the server. + The default is no authentication. + password - Password to authenticate with to the server. + Only relevant when username is also provided. + """ + self.__proxy = (proxytype,addr,port,rdns,username,password) + + def __negotiatesocks5(self,destaddr,destport): + """__negotiatesocks5(self,destaddr,destport) + Negotiates a connection through a SOCKS5 server. + """ + # First we'll send the authentication packages we support. + if (self.__proxy[4]!=None) and (self.__proxy[5]!=None): + # The username/password details were supplied to the + # setproxy method so we support the USERNAME/PASSWORD + # authentication (in addition to the standard none). + self.sendall("\x05\x02\x00\x02") + else: + # No username/password were entered, therefore we + # only support connections with no authentication. + self.sendall("\x05\x01\x00") + # We'll receive the server's response to determine which + # method was selected + chosenauth = self.__recvall(2) + if chosenauth[0] != "\x05": + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + # Check the chosen authentication method + if chosenauth[1] == "\x00": + # No authentication is required + pass + elif chosenauth[1] == "\x02": + # Okay, we need to perform a basic username/password + # authentication. + self.sendall("\x01" + chr(len(self.__proxy[4])) + self.__proxy[4] + chr(len(self.proxy[5])) + self.__proxy[5]) + authstat = self.__recvall(2) + if authstat[0] != "\x01": + # Bad response + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + if authstat[1] != "\x00": + # Authentication failed + self.close() + raise Socks5AuthError,((3,_socks5autherrors[3])) + # Authentication succeeded + else: + # Reaching here is always bad + self.close() + if chosenauth[1] == "\xFF": + raise Socks5AuthError((2,_socks5autherrors[2])) + else: + raise GeneralProxyError((1,_generalerrors[1])) + # Now we can request the actual connection + req = "\x05\x01\x00" + # If the given destination address is an IP address, we'll + # use the IPv4 address request even if remote resolving was specified. + try: + ipaddr = socket.inet_aton(destaddr) + req = req + "\x01" + ipaddr + except socket.error: + # Well it's not an IP number, so it's probably a DNS name. + if self.__proxy[3]==True: + # Resolve remotely + ipaddr = None + req = req + "\x03" + chr(len(destaddr)) + destaddr + else: + # Resolve locally + ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) + req = req + "\x01" + ipaddr + req = req + struct.pack(">H",destport) + self.sendall(req) + # Get the response + resp = self.__recvall(4) + if resp[0] != "\x05": + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + elif resp[1] != "\x00": + # Connection failed + self.close() + if ord(resp[1])<=8: + raise Socks5Error(ord(resp[1]),_generalerrors[ord(resp[1])]) + else: + raise Socks5Error(9,_generalerrors[9]) + # Get the bound address/port + elif resp[3] == "\x01": + boundaddr = self.__recvall(4) + elif resp[3] == "\x03": + resp = resp + self.recv(1) + boundaddr = self.__recvall(resp[4]) + else: + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + boundport = struct.unpack(">H",self.__recvall(2))[0] + self.__proxysockname = (boundaddr,boundport) + if ipaddr != None: + self.__proxypeername = (socket.inet_ntoa(ipaddr),destport) + else: + self.__proxypeername = (destaddr,destport) + + def getproxysockname(self): + """getsockname() -> address info + Returns the bound IP address and port number at the proxy. + """ + return self.__proxysockname + + def getproxypeername(self): + """getproxypeername() -> address info + Returns the IP and port number of the proxy. + """ + return _orgsocket.getpeername(self) + + def getpeername(self): + """getpeername() -> address info + Returns the IP address and port number of the destination + machine (note: getproxypeername returns the proxy) + """ + return self.__proxypeername + + def __negotiatesocks4(self,destaddr,destport): + """__negotiatesocks4(self,destaddr,destport) + Negotiates a connection through a SOCKS4 server. + """ + # Check if the destination address provided is an IP address + rmtrslv = False + try: + ipaddr = socket.inet_aton(destaddr) + except socket.error: + # It's a DNS name. Check where it should be resolved. + if self.__proxy[3]==True: + ipaddr = "\x00\x00\x00\x01" + rmtrslv = True + else: + ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) + # Construct the request packet + req = "\x04\x01" + struct.pack(">H",destport) + ipaddr + # The username parameter is considered userid for SOCKS4 + if self.__proxy[4] != None: + req = req + self.__proxy[4] + req = req + "\x00" + # DNS name if remote resolving is required + # NOTE: This is actually an extension to the SOCKS4 protocol + # called SOCKS4A and may not be supported in all cases. + if rmtrslv==True: + req = req + destaddr + "\x00" + self.sendall(req) + # Get the response from the server + resp = self.__recvall(8) + if resp[0] != "\x00": + # Bad data + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + if resp[1] != "\x5A": + # Server returned an error + self.close() + if ord(resp[1]) in (91,92,93): + self.close() + raise Socks4Error((ord(resp[1]),_socks4errors[ord(resp[1])-90])) + else: + raise Socks4Error((94,_socks4errors[4])) + # Get the bound address/port + self.__proxysockname = (socket.inet_ntoa(resp[4:]),struct.unpack(">H",resp[2:4])[0]) + if rmtrslv != None: + self.__proxypeername = (socket.inet_ntoa(ipaddr),destport) + else: + self.__proxypeername = (destaddr,destport) + + def __negotiatehttp(self,destaddr,destport): + """__negotiatehttp(self,destaddr,destport) + Negotiates a connection through an HTTP server. + """ + # If we need to resolve locally, we do this now + if self.__proxy[3] == False: + addr = socket.gethostbyname(destaddr) + else: + addr = destaddr + self.sendall("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n") + # We read the response until we get the string "\r\n\r\n" + resp = self.recv(1) + while resp.find("\r\n\r\n")==-1: + resp = resp + self.recv(1) + # We just need the first line to check if the connection + # was successful + statusline = resp.splitlines()[0].split(" ",2) + if statusline[0] not in ("HTTP/1.0","HTTP/1.1"): + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + try: + statuscode = int(statusline[1]) + except ValueError: + self.close() + raise GeneralProxyError((1,_generalerrors[1])) + if statuscode != 200: + self.close() + raise HTTPError((statuscode,statusline[2])) + self.__proxysockname = ("0.0.0.0",0) + self.__proxypeername = (addr,destport) + + def connect(self,destpair): + """connect(self,despair) + Connects to the specified destination through a proxy. + destpar - A tuple of the IP/DNS address and the port number. + (identical to socket's connect). + To select the proxy server use setproxy(). + """ + # Do a minimal input check first + if (type(destpair) in (list,tuple)==False) or (len(destpair)<2) or (type(destpair[0])!=str) or (type(destpair[1])!=int): + raise GeneralProxyError((5,_generalerrors[5])) + if self.__proxy[0] == PROXY_TYPE_SOCKS5: + if self.__proxy[2] != None: + portnum = self.__proxy[2] + else: + portnum = 1080 + _orgsocket.connect(self,(self.__proxy[1],portnum)) + self.__negotiatesocks5(destpair[0],destpair[1]) + elif self.__proxy[0] == PROXY_TYPE_SOCKS4: + if self.__proxy[2] != None: + portnum = self.__proxy[2] + else: + portnum = 1080 + _orgsocket.connect(self,(self.__proxy[1],portnum)) + self.__negotiatesocks4(destpair[0],destpair[1]) + elif self.__proxy[0] == PROXY_TYPE_HTTP: + if self.__proxy[2] != None: + portnum = self.__proxy[2] + else: + portnum = 8080 + _orgsocket.connect(self,(self.__proxy[1],portnum)) + self.__negotiatehttp(destpair[0],destpair[1]) + elif self.__proxy[0] == None: + _orgsocket.connect(self,(destpair[0],destpair[1])) + else: + raise GeneralProxyError((4,_generalerrors[4])) diff --git a/straw/subscribe.py b/straw/subscribe.py index ce0c179..a70767e 100644 --- a/straw/subscribe.py +++ b/straw/subscribe.py @@ -18,7 +18,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ - +from ConfigOptions import * from gtk.glade import XML import Config import FeedManager @@ -36,7 +36,6 @@ import time import urllib import urlparse import xml - pygtk.require('2.0') STATE_INTRO = 1 @@ -85,17 +84,18 @@ class SubscribeView(MVP.GladeView): password = self._password_entry.get_text() password = password.strip() - config = Config.get_instance() - if config.offline: + if Config.get(OPTION_OFFLINE): self._window.hide() response = helpers.report_offline_status(self._window) + if response == gtk.RESPONSE_CANCEL: self.show() return False - config.offline = not config.offline + + Config.set(OPTION_OFFLINE, True) import FeedDiscovery - FeedDiscovery.discover(location, [ { "job-done": [ self._discovery_finished ] } ]) + FeedDiscovery.discover(location, [ { "task-done": [ self._discovery_finished ] } ]) #self._presenter.subscribe(location, username, password) return True @@ -222,8 +222,9 @@ class SubscribeView(MVP.GladeView): self._window.hide() - def _discovery_finished(self, handler, feeds): + def _discovery_finished(self, handler, task_result): gtk.gdk.threads_enter() + feeds = task_result.result self._result_size_label.set_text(self._result_size_text % len(feeds)) diff --git a/test/TestFeedparser.py b/test/TestFeedparser.py deleted file mode 100644 index 78c2c4b..0000000 --- a/test/TestFeedparser.py +++ /dev/null @@ -1,100 +0,0 @@ -""" Feedparser threading problem testcase - -This is a simple (non-scientific :P) testcase that helps reproduce the problem we -encountered when developing Straw feed reader. - -The problem probably isn't specific to feedparser, but rather to SAX parser or -sgmllib (with both this problem occurs) that feedparser uses. - -To run the testcase, enter the main Straw directory and execute the following command: - -:~ nosetests -s test/TestFeedparser.py - -Note: if using CountingThread, it's best to redirect test output to a file. Like this: - -:~ nosetests -s test/TestFeedparser.py > result.txt - -Description of this testcase: - -- there are sample feeds in TEST_DATA_PATH directory -- testcase passes these feeds to feedparser's parse method -- before than testcase launches either GUIThread (Straw's main window from Glade file) - or CountingThread (simple thread that counts :-) - -To observe the problem, do the following: - -- GUIThread - open File menu and hold arrow down key - selection in the menu should - cycle smoothly, but it freezes from time to time - this effect can be also observed - in other parts of the GUI - -- CountingThread - set some interval (default is 0.05), dump the counting results into - a file and observe that delays are often more than 0.1 or the other interval that has - been set - -It is possible to control test CPU load by modifying sleep interval in parsing loop. - -Also try uncommenting the line "while 1: pass". In that case, CPU load is constantly at -100%, but the background thread is working smoothly. - -If you have any ideas on what might be the cause of this problem, please come to #straw -at irc.freenode.net and kindly enlighten us ;-) - -""" - -from gtk import glade -from straw import feedparser -from threading import Thread -import gtk -import os -import time - -TEST_DATA_PATH = "test/feeds/" - -class TestFeedparser(object): - def setUp(self): - pass - - def tearDown(self): - pass - - def testPerformance(self): - gtk.gdk.threads_init() - gtk.gdk.threads_enter() - - t = CountingThread() - #t = GUIThread() - - t.start() - - while 1: pass - - for file in os.listdir(TEST_DATA_PATH): - #time.sleep(0.1) - fl = open(TEST_DATA_PATH + file, "r") - content = fl.read() - fl.close() - - feedparser.parse(content) - - t.join() - - gtk.gdk.threads_leave() - -class GUIThread(Thread): - def __init__(self): - Thread.__init__(self) - - def run(self): - xml = glade.XML('data/straw.glade', "straw_main") - window = xml.get_widget('straw_main') - window.show_all() - gtk.main() - -class CountingThread(Thread): - def __init__(self): - Thread.__init__(self) - - def run(self): - while 1: - time.sleep(0.05) - print time.time() -- 2.11.4.GIT