1 """ URLFetch_threaded.py
Module for retrieving data from a URL (a threaded version using httplib2).
6 __copyright__
= "Copyright (c) 2007 Straw developers"
8 Straw is free software; you can redistribute it and/or modify it under the
9 terms of the GNU General Public License as published by the Free Software
10 Foundation; either version 2 of the License, or (at your option) any later
13 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License along with
18 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
19 Place - Suite 330, Boston, MA 02111-1307, USA. """
26 from sys
import exc_info
29 from straw
import MainloopManager
30 from MainloopManager
import schedule
31 from straw
import NetworkConstants
32 from straw
import Config
33 from straw
import defs
class CancelledException(Exception):
    """Operation is cancelled.

    Raised inside a fetch thread when its periodic cancellation check
    finds that cancellation has been requested.
    """
39 class ConnectionThread(threading
.Thread
):
40 """A thread that fetches a URL XXX fetch several URLs"""
def __init__(self, uri, consumer, headers, client_object,
             group=None, target=None, name=None, *args, **kwargs):
    """Prepare a fetch thread for a single URI.

    uri -- address to fetch; read back as self._uri when the request is made
    consumer -- object whose callbacks are scheduled with the outcome
    headers -- passed as ``headers=`` to the client's request() call
    client_object -- HTTP client object exposing request(uri, headers=...)
    group, target, name, args, kwargs -- forwarded to threading.Thread
    """
    threading.Thread.__init__(self, group, target, name, args, kwargs)

    # The request code reads self._uri, so the address has to be kept on
    # the instance; without this the fetch fails with AttributeError.
    self._uri = uri
    self._consumer = consumer
    self._headers = headers
    self._httpclient = client_object

    # Set from another thread to ask this one to stop at its next check.
    self._cancelled = threading.Event()
53 """The main loop of the thread"""
54 #print "Fetching %s..." % self._uri
56 self
._handle
_request
()
58 schedule(get_instance().request_done
, self
)
59 #print "Finished with %s" % self._uri
def cooperate(self):
    """Check for a pending cancellation request.

    This should be called periodically in the thread execution.
    The method checks whether cancellation has been requested
    and if so, raises CancelledException; otherwise it returns None.
    """
    if self._cancelled.isSet():
        raise CancelledException
70 def _handle_request(self
):
77 (response
, data
) = self
._httpclient
.request(self
._uri
, headers
=self
._headers
)
78 except httplib2
.HttpLib2Error
, exception
:
79 if hasattr(exception
, 'response'):
80 response
= exception
.response
81 elif hasattr(exception
, 'content'):
82 data
= exception
.content
85 except CancelledException
:
86 schedule(self
._consumer
.operation_stopped
)
89 self
.cooperate() # last chance to notice cancellation
90 except CancelledException
:
91 schedule(self
._consumer
.operation_stopped
)
93 schedule(self
._consumer
.http_failed
, exc_info()[1])
95 if not hasattr(response
, 'status'): # fake for non-http
100 schedule(self
._consumer
.http_results
, (None, response
.status
, response
.reason
), data
)
104 This can be called to cancel the request.
105 inter-thread safe but not instant
106 XXX network operations can take a long time to timeout
107 XXX call operation_stopped instantly?
109 self
._cancelled
.set()
111 class ConnectionManager
:
112 """A manager for threads that fetch URLs"""
114 CACHE_DIR
= os
.path
.join(Config
.straw_home(), 'cache')
117 self
._starting
= [] # requests waiting to be started
118 self
._active
= [] # requests running right now
120 '''def request(self, uri, consumer, headers={}, user=None, password=None, priority=NetworkConstants.PRIORITY_DEFAULT):
123 httpclient = httplib2.Http(ConnectionManager.CACHE_DIR)
125 if user and password:
126 httpclient.add_credentials(user,password)
128 config = Config.get_instance()
131 httpclient.set_proxy(proxy.host, proxy.port)
135 headers['user-agent'] = 'Straw/%s' % defs.VERSION
137 thread = ConnectionThread(uri, consumer, headers, httpclient)
139 if len(self._active) < NetworkConstants.MAX_CONNECTIONS:
140 self._active.append(thread)
143 self._starting.append(thread)
145 return thread.cancel''' # inter-thread safe
def request_done(self, request):
    """Called by the request when it is finished.

    Removes ``request`` from the active list. If another request is
    waiting and the number of active requests is below
    NetworkConstants.MAX_CONNECTIONS, the oldest waiting request is moved
    to the active list and started.

    Raises ValueError (from list.remove) if ``request`` is not active.
    """
    self._active.remove(request)
    if self._starting and len(self._active) < NetworkConstants.MAX_CONNECTIONS:
        thread = self._starting.pop(0)
        self._active.append(thread)
        # A promoted request has to be started here; otherwise it would
        # occupy an active slot forever and never report completion.
        thread.start()
def create_instance():
    """Build and return a new ConnectionManager."""
    return ConnectionManager()
# Module-wide manager; stays None until the first get_instance() call.
connection_manager_instance = None


def get_instance():
    """Return the shared ConnectionManager, creating it on first use."""
    global connection_manager_instance
    if connection_manager_instance is None:
        connection_manager_instance = create_instance()
    return connection_manager_instance
165 if __name__
== '__main__':
167 gobject
.threads_init()
170 def http_results(self
, status
, data
):
173 print "%s bytes of content" % len(data
)
174 def http_permanent_redirect(self
, location
):
175 print "Redirected to %s" % location
176 def http_failed(self
, e
):
178 def operation_stopped(self
):
179 print "Operation stopped"
181 for uri
in sys
.argv
[1:]:
182 get_instance().request(uri
, Stub())
185 gobject
.MainLoop().run()
187 print get_instance()._queue