From a310742e6970a6dbcf1ce8a2120ef031b3094b3f Mon Sep 17 00:00:00 2001 From: jmalonzo Date: Wed, 9 May 2007 04:47:30 +0000 Subject: [PATCH] merged revision 274 of threaded_io branch in trunk git-svn-id: svn+ssh://svn.gnome.org/svn/straw/trunk@275 141a2093-ea25-0410-9ad2-d44d734a8f13 --- po/bg.po | 46 ++++------ src/lib/Application.py | 3 +- src/lib/ImageCache.py | 2 +- src/lib/LookupManager.py | 2 +- src/lib/MainloopManager.py | 17 ++++ src/lib/NetworkConstants.py | 2 +- src/lib/PollManager.py | 6 +- src/lib/URLFetch.py | 23 +++-- src/lib/URLFetch_threaded.py | 212 +++++++++++++++++++++++++++++++++++++++++++ src/lib/subscribe.py | 11 +-- 10 files changed, 274 insertions(+), 50 deletions(-) create mode 100644 src/lib/URLFetch_threaded.py diff --git a/po/bg.po b/po/bg.po index 22442e2..a0fd616 100644 --- a/po/bg.po +++ b/po/bg.po @@ -9,8 +9,8 @@ msgid "" msgstr "" "Project-Id-Version: straw HEAD\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2007-03-08 09:18+0200\n" -"PO-Revision-Date: 2007-03-08 09:17+0200\n" +"POT-Creation-Date: 2007-01-26 00:58+0200\n" +"PO-Revision-Date: 2007-01-19 17:15+0200\n" "Last-Translator: Yavor Doganov \n" "Language-Team: Bulgarian \n" "MIME-Version: 1.0\n" @@ -253,7 +253,7 @@ msgstr "Straw" msgid "Straw Preferences" msgstr "Настройки на Straw" -#: ../glade/straw.glade.h:60 ../src/lib/subscribe.py:180 +#: ../glade/straw.glade.h:60 ../src/lib/subscribe.py:173 msgid "Subscribe" msgstr "Абониране" @@ -415,8 +415,8 @@ msgstr "Грешка в емисията:" msgid "Next Refresh: %s" msgstr "Следваща актуализация: %s" -#: ../src/lib/Application.py:155 ../src/lib/subscribe.py:422 -#: ../src/lib/subscribe.py:423 +#: ../src/lib/Application.py:155 ../src/lib/subscribe.py:415 +#: ../src/lib/subscribe.py:416 #, python-format msgid "%s" msgstr "%s" @@ -731,62 +731,52 @@ msgstr "Не е въведен адрес на емисията" msgid "Please provide the URL of the feed you are trying to subscribe to." msgstr "Осигурете адрес на емисията, за която се опитвате да се абонирате." 
-#. bad feed url -#: ../src/lib/subscribe.py:84 -msgid "Bad Feed URL" -msgstr "Грешен адрес на емисия" - #: ../src/lib/subscribe.py:85 -#, python-format -msgid "Straw wasn't able to parse '%s'" -msgstr "Straw не съумя да анализира „%s“" - -#: ../src/lib/subscribe.py:92 msgid "Unsupported Scheme" msgstr "Неподдържана схема" -#: ../src/lib/subscribe.py:93 +#: ../src/lib/subscribe.py:86 #, python-format msgid "Subscribing to '%s://' is not supported" msgstr "Абонирането за „%s://“ не се поддържа" -#: ../src/lib/subscribe.py:185 +#: ../src/lib/subscribe.py:178 msgid "Title" msgstr "Заглавие" -#: ../src/lib/subscribe.py:337 +#: ../src/lib/subscribe.py:330 #, python-format msgid "Searching %s" msgstr "Търсене на „%s“" -#: ../src/lib/subscribe.py:341 +#: ../src/lib/subscribe.py:334 #, python-format msgid "Searching %s. This may take a while..." msgstr "Търсене на „%s“. Това може да отнеме време..." -#: ../src/lib/subscribe.py:367 +#: ../src/lib/subscribe.py:360 msgid "Unexpected Error Occurred" msgstr "Възникна неочаквана грешка" -#: ../src/lib/subscribe.py:375 +#: ../src/lib/subscribe.py:368 msgid "Unable to Find Feeds" msgstr "Емисиите не могат да бъдат намерени" -#: ../src/lib/subscribe.py:376 +#: ../src/lib/subscribe.py:369 #, python-format msgid "Straw was unable to find feeds in %s" msgstr "Не могат да бъдат намерени емисиите в %s" -#: ../src/lib/subscribe.py:398 +#: ../src/lib/subscribe.py:391 #, python-format msgid "Processing %d of %d feeds" msgstr "Обработване на %d от %d емисии" -#: ../src/lib/subscribe.py:418 +#: ../src/lib/subscribe.py:411 msgid "Error While Subscribing" msgstr "Грешка при абонирането" -#: ../src/lib/subscribe.py:518 +#: ../src/lib/subscribe.py:511 msgid "No Data" msgstr "Няма данни" @@ -827,15 +817,15 @@ msgstr "Емисията е празна." #. are strftime(3) parameters, the whole string is passed to the #. function, Straw does no additional interpretation) if you feel #. it's necessary in the translation file. 
-#: ../src/lib/utils.py:110 +#: ../src/lib/utils.py:112 msgid "%A %B %e %H:%M" msgstr "%A, %e %B, %H:%M" -#: ../src/lib/utils.py:180 +#: ../src/lib/utils.py:182 msgid "An error occurred while trying to open link" msgstr "Възникна грешка при отварянето на връзката" -#: ../src/lib/utils.py:181 +#: ../src/lib/utils.py:183 #, python-format msgid "" "There was a problem opening '%s'\n" diff --git a/src/lib/Application.py b/src/lib/Application.py index 79b4879..0569e6b 100644 --- a/src/lib/Application.py +++ b/src/lib/Application.py @@ -840,9 +840,10 @@ class Application: threads = os.getenv('STRAW_THREAD_DNS') is not None if threads: try: - gtk.threads_init() + gobject.threads_init() except: threads = False + print "Thread init failed" else: threads = True diff --git a/src/lib/ImageCache.py b/src/lib/ImageCache.py index f0448c0..3f6d1c7 100644 --- a/src/lib/ImageCache.py +++ b/src/lib/ImageCache.py @@ -100,7 +100,7 @@ class Cache(Event.SignalEmitter): if item and item.feed: headers['Referer'] = item.feed.location try: - stopper = URLFetch.connection_manager.request( + stopper = URLFetch.get_instance().request( key, ic, priority=NetworkConstants.PRIORITY_IMAGE, headers=headers) diff --git a/src/lib/LookupManager.py b/src/lib/LookupManager.py index 54afb22..1af9adb 100644 --- a/src/lib/LookupManager.py +++ b/src/lib/LookupManager.py @@ -161,7 +161,7 @@ if _have_adns: cbwrapper.cb(ip) def poll(self, timeout=0.1): - self.queryengine.run(timeout) + self.queryengine.run(0) # Blocking version: use normal lookups, no threads class BlockingLookupManager: diff --git a/src/lib/MainloopManager.py b/src/lib/MainloopManager.py index 63fc8d0..f66f32a 100644 --- a/src/lib/MainloopManager.py +++ b/src/lib/MainloopManager.py @@ -23,6 +23,23 @@ import gtk import gobject import error +def schedule(function, *args, **kw): + """ + Schedules a function call in the main loop thread. + The given arguments and keywords are passed along if any. + Threads may not access Straw and GTK elsewhere. 
+ XXX occasional "Trying re-entry!" warnings + """ + + assert function.__call__ + + def callback(): + function(*args, **kw) + return False # call only once + + gobject.timeout_add(0, callback) # call as soon as possible + + class MainloopManager: instance = None diff --git a/src/lib/NetworkConstants.py b/src/lib/NetworkConstants.py index 1fb287f..2fc1906 100644 --- a/src/lib/NetworkConstants.py +++ b/src/lib/NetworkConstants.py @@ -3,7 +3,7 @@ PRIORITY_RSS = 2 PRIORITY_DEFAULT = 2^32 MAX_CONNECTIONS = 40 -POLL_INTERVAL = 500 +POLL_INTERVAL = 10 POLL_TIMEOUT = 0.05 MAX_DOWNLOAD_SIZE = 2**20 diff --git a/src/lib/PollManager.py b/src/lib/PollManager.py index 66a9c91..3f48369 100644 --- a/src/lib/PollManager.py +++ b/src/lib/PollManager.py @@ -119,7 +119,7 @@ class PollManager: timediff > config_pf > 0))], context, False) except: error.log_exc("Caught an exception while polling") - URLFetch.connection_manager.poll(NetworkConstants.POLL_TIMEOUT) + URLFetch.get_instance().poll(NetworkConstants.POLL_TIMEOUT) return True def poll(self, obj, pollcontext = None): @@ -189,7 +189,7 @@ class FeedPoller: ps = None try: try: - stopper = URLFetch.connection_manager.request( + stopper = URLFetch.get_instance().request( url, self, headers, user, pw, priority=NetworkConstants.PRIORITY_RSS) ps = PollStopper(stopper, self._feed) @@ -281,7 +281,7 @@ class CategoryPoller: ps = None try: try: - stopper = URLFetch.connection_manager.request( + stopper = URLFetch.get_instance().request( sub.location, self, headers, sub.username, sub.password, priority=NetworkConstants.PRIORITY_RSS) ps = PollStopper(stopper, self._category) diff --git a/src/lib/URLFetch.py b/src/lib/URLFetch.py index d8dbe57..0b1ac80 100644 --- a/src/lib/URLFetch.py +++ b/src/lib/URLFetch.py @@ -163,11 +163,8 @@ class ConnectionManager: adapter.set_proxy((proxy_config.ip, proxy_config.port)) adapter.start() # keep the network running - now = time.time() lookup_manager.poll(timeout) - timeout -= (time.time() - now) - if 
timeout > 0.0: - asyncore.poll(timeout) + asyncore.poll(timeout) # time out stuck consumers self.time_out_consumers() # return non-zero if we should keep polling @@ -180,7 +177,21 @@ class ConnectionManager: if now - pc.start_time > NetworkConstants.MAX_DOWNLOAD_TIME: pc.time_exceeded() -connection_manager = ConnectionManager() +def create_instance(): + if Config.get_instance().use_threads: + print "Using threaded IO" + import URLFetch_threaded + return URLFetch_threaded.get_instance() + else: + print "Using asyncore IO" + return ConnectionManager() + +connection_manager_instance = None +def get_instance(): + global connection_manager_instance + if connection_manager_instance is None: + connection_manager_instance = create_instance() + return connection_manager_instance class ConsumerAdapter(object): CREATED = 0 @@ -264,7 +275,7 @@ class ConsumerAdapter(object): if permanent: self.consumer.http_permanent_redirect(location) - connection_manager.request( + get_instance().request( location, self.consumer, self.request.headers, self.request.user, self.request.password) self._close_connection() diff --git a/src/lib/URLFetch_threaded.py b/src/lib/URLFetch_threaded.py new file mode 100644 index 0000000..93b4786 --- /dev/null +++ b/src/lib/URLFetch_threaded.py @@ -0,0 +1,212 @@ +""" URLFetch_threaded.py + +Module for retrieving data from a URL (a threaded version using urllib2). + +""" +__copyright__ = "Copyright (c) 2006 Straw developers" +__license__ = """ +Straw is free software; you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free Software +Foundation; either version 2 of the License, or (at your option) any later +version. + +Straw is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +A PARTICULAR PURPOSE. See the GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., 59 Temple +Place - Suite 330, Boston, MA 02111-1307, USA. """ + +from MainloopManager import schedule +import NetworkConstants +try: + import constants +except: + class constants: + VERSION = "URLFetch_threaded" + +import Config + +import threading +import urllib2 +from sys import exc_info + +BUFSIZE = 8*1024 + +class CancelledException(Exception): + """Operation is cancelled.""" + pass + +class ConnectionThread(threading.Thread): + """A thread that fetches a URL XXX fetch several URLs""" + def __init__(self, uri, consumer, headers, handlers, group=None, target=None, name=None, *args, **kwargs): + threading.Thread.__init__(self, group, target, name, args, kwargs) + self.setDaemon(True) + + self._uri = uri + self._consumer = consumer + self._headers = headers + self._handlers = handlers + + self._cancelled = threading.Event() + + def run(self): + """The main loop of the thread""" + print "Fetching %s..." % self._uri + try: + self._handle_request() + finally: + schedule(get_instance().request_done, self) + print "Finished with %s" % self._uri + + def cooperate(self): + """ + This should be called periodically in the thread execution. + The method checks whether cancellation has been requested + and if so, raises CancelledException. + """ + if self._cancelled.isSet(): + raise CancelledException + + def _handle_request(self): + try: + self.cooperate() + request = urllib2.Request(self._uri, headers=self._headers) + + # DNS lookup, connect, headers: ? 
+ opener = urllib2.build_opener(*self._handlers) + self.cooperate() + try: + f = opener.open(request) + except urllib2.HTTPError, exception: + for name in 'read','code','msg','info': + if not hasattr(exception, name): + raise + f = exception # HTTP error document can be loaded + + try: + self.cooperate() + data = "" + while True: + block = f.read(BUFSIZE) + if not block: + break + + data += block # XXX option to save directly as a file + self.cooperate() + + finally: + f.close() + except CancelledException: + schedule(self._consumer.operation_stopped) + except: + try: + self.cooperate() # last chance to notice cancellation + except CancelledException: + schedule(self._consumer.operation_stopped) + else: + schedule(self._consumer.http_failed, exc_info()[1]) + else: + if not hasattr(f, 'code'): # fake for non-http + f.code = 200 + f.msg = "OK" + schedule(self._consumer.http_results, (None, f.code, f.msg), f.info(), data) + + def cancel(self): + """ + This can be called to cancel the request. + inter-thread safe but not instant + XXX network operations can take a long time to timeout + XXX call operation_stopped instantly? + """ + self._cancelled.set() + +class ConnectionManager: + """A manager for threads that fetch URLs""" + def __init__(self): + self._starting = [] # requests waiting to be started + self._active = [] # requests running right now + + def request(self, uri, consumer, headers={}, user=None, password=None, priority=NetworkConstants.PRIORITY_DEFAULT): + + handlers = [] + handlers += [urllib2.HTTPHandler(debuglevel=1)] # XXX + + if user and password: + mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + mgr.add_password(None, uri, user, password) # XXX redirects? + handlers += [urllib2.HTTPBasicAuthHandler(mgr), + urllib2.HTTPDigestAuthHandler(mgr)] # XXX right ordering? 
+ + pc = Config.get_instance().proxy_config + if pc.use: + proxy_uri = 'http://%s:%s' % (pc.host, pc.port) + proxies = {'http': proxy_uri} + print proxies + handlers.append(urllib2.ProxyHandler(proxies)) + if pc.use_authentication: # FIXME doesn't work + mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + mgr.add_password(None, proxy_uri, pc.user, pc.password) + handlers += [urllib2.ProxyBasicAuthHandler(mgr),] + # interferes: urllib2.ProxyDigestAuthHandler(mgr),] # XXX right ordering? + + # XXX no decompression yet: headers['Accept-encoding'] = 'gzip' + headers = dict(headers, **{'User-agent': 'Straw/%s' % constants.VERSION}) # per-request copy: never mutate the caller's dict or the shared headers={} default + + thread = ConnectionThread(uri, consumer, headers, handlers) + + if len(self._active) < NetworkConstants.MAX_CONNECTIONS: + self._active.append(thread) + thread.start() + else: + self._starting.append(thread) + + return thread.cancel # inter-thread safe + + def poll(self, timeout=0.1): + """Supports LookupManager polling XXX for now""" + import LookupManager + LookupManager.get_instance().poll(timeout) + + def request_done(self, request): + """Called by the request when it is finished.""" + self._active.remove(request) + if self._starting and len(self._active) < NetworkConstants.MAX_CONNECTIONS: + thread = self._starting.pop(0) + self._active.append(thread) + thread.start() + +def create_instance(): + return ConnectionManager() + +connection_manager_instance = None +def get_instance(): + global connection_manager_instance + if connection_manager_instance is None: + connection_manager_instance = create_instance() + return connection_manager_instance + +if __name__ == '__main__': + import sys, gobject + gobject.threads_init() + + class Stub: + def http_results(self, status, info, data): + print status + print info + print "%s bytes of content" % len(data) + def http_permanent_redirect(self, location): + print "Redirected to %s" % location + def http_failed(self, e): + print str(e) + def operation_stopped(self): + print "Operation stopped" + + for uri in 
sys.argv[1:]: + get_instance().request(uri, Stub()) + + try: + gobject.MainLoop().run() + finally: + print get_instance()._starting # requests still waiting to start (ConnectionManager has no _queue attribute) diff --git a/src/lib/subscribe.py b/src/lib/subscribe.py index 5d1ca54..fcf9f71 100644 --- a/src/lib/subscribe.py +++ b/src/lib/subscribe.py @@ -78,14 +78,7 @@ class FeedLocationView(MVP.GladeView): return config.offline = not config.offline self._window.present() - try: - url, uname, pword = self._presenter.split_url(url) - except TypeError: # bad feed url - dialogs.report_error(_("Bad Feed URL"), - _("Straw wasn't able to parse '%s'")%url, - parent=self._window) - return - + url, uname, pword = self._presenter.split_url(url) self._presenter.find_feed(url, uname, pword) def report_error(self, scheme): @@ -482,7 +475,7 @@ class Poller(Event.SignalEmitter): AuthNeededSignal, PollDoneSignal, PollFailedSignal) - self._request_handler = URLFetch.connection_manager + self._request_handler = URLFetch.get_instance() self._request_priority = NetworkConstants.PRIORITY_RSS self._url = None self._username = None -- 2.11.4.GIT