1 """ URLFetch_threaded.py
Module for retrieving data from a URL (a threaded version using httplib2).
6 __copyright__
= "Copyright (c) 2007 Straw developers"
8 Straw is free software; you can redistribute it and/or modify it under the
9 terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version.
13 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License along with
18 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
19 Place - Suite 330, Boston, MA 02111-1307, USA. """
21 from MainloopManager
import schedule
22 import NetworkConstants
31 from sys
import exc_info
class CancelledException(Exception):
    """Signals that the running operation was cancelled on request."""

    pass
38 class ConnectionThread(threading
.Thread
):
39 """A thread that fetches a URL XXX fetch several URLs"""
def __init__(self, uri, consumer, headers, client_object, group=None, target=None, name=None, *args, **kwargs):
    """Prepare a fetch thread for a single request.

    uri           -- presumably the address to fetch (see the review note
                     below; it is not stored by the statements shown here)
    consumer      -- object whose operation_stopped / http_failed /
                     http_results callbacks are scheduled with the outcome
    headers       -- passed as the ``headers`` keyword of the client's
                     request() call
    client_object -- HTTP client; its request() method performs the fetch
    group, target, name, *args, **kwargs
                  -- handed on to threading.Thread.__init__ unchanged
    """
    # args and kwargs are forwarded as the Thread's own args/kwargs values
    # (a tuple and a dict), not unpacked.
    threading.Thread.__init__(self, group, target, name, args, kwargs)
    # NOTE(review): other methods of this class read self._uri, but no
    # assignment to it appears among the statements visible here -- confirm
    # it is set before the thread runs.
    self._consumer = consumer
    self._headers = headers
    self._httpclient = client_object
    # Cancellation flag: set by the cancel path, polled by the worker.
    self._cancelled = threading.Event()
52 """The main loop of the thread"""
53 print "Fetching %s..." % self
._uri
55 self
._handle
_request
()
57 schedule(get_instance().request_done
, self
)
58 print "Finished with %s" % self
._uri
62 This should be called periodically in the thread execution.
63 The method checks whether cancellation has been requested
64 and if so, raises CancelledException.
66 if self
._cancelled
.isSet():
67 raise CancelledException
69 def _handle_request(self
):
74 (response
, data
) = self
._httpclient
.request(self
._uri
, headers
=self
._headers
)
75 except httplib2
.HttpLib2Error
, exception
:
76 if hasattr(exception
, 'response'):
77 response
= exception
.response
78 elif hasattr(exception
, 'content'):
79 data
= exception
.content
82 except CancelledException
:
83 schedule(self
._consumer
.operation_stopped
)
86 self
.cooperate() # last chance to notice cancellation
87 except CancelledException
:
88 schedule(self
._consumer
.operation_stopped
)
90 schedule(self
._consumer
.http_failed
, exc_info()[1])
92 if not hasattr(response
, 'status'): # fake for non-http
95 schedule(self
._consumer
.http_results
, (None, response
.status
, response
.reason
), data
)
99 This can be called to cancel the request.
100 inter-thread safe but not instant
101 XXX network operations can take a long time to timeout
102 XXX call operation_stopped instantly?
104 self
._cancelled
.set()
106 class ConnectionManager
:
107 """A manager for threads that fetch URLs"""
109 CACHE_DIR
= os
.path
.join(Config
.straw_home(), 'cache')
112 self
._starting
= [] # requests waiting to be started
113 self
._active
= [] # requests running right now
115 def request(self
, uri
, consumer
, headers
={}, user
=None, password
=None, priority
=NetworkConstants
.PRIORITY_DEFAULT
):
117 httpclient
= httplib2
.Http(ConnectionManager
.CACHE_DIR
)
119 if user
and password
:
120 httpclient
.add_credentials(user
,password
)
122 config
= Config
.get_instance()
125 httpclient
.set_proxy(proxy
.host
, proxy
.port
)
129 headers
['user-agent'] = 'Straw/%s' % constants
.VERSION
131 thread
= ConnectionThread(uri
, consumer
, headers
, httpclient
)
133 if len(self
._active
) < NetworkConstants
.MAX_CONNECTIONS
:
134 self
._active
.append(thread
)
137 self
._starting
.append(thread
)
139 return thread
.cancel
# inter-thread safe
def request_done(self, request):
    """Called by the request when it is finished.

    Removes ``request`` from the active list. If requests are still waiting
    and the active list is below NetworkConstants.MAX_CONNECTIONS, the
    request that has waited longest is moved onto the active list.
    """
    # list.remove raises ValueError when request is not currently active.
    self._active.remove(request)
    if self._starting and len(self._active) < NetworkConstants.MAX_CONNECTIONS:
        # Waiting requests are appended at the end, so index 0 is the oldest.
        thread = self._starting.pop(0)
        self._active.append(thread)
        # NOTE(review): no thread.start() call is visible after the promotion
        # in this excerpt -- confirm the promoted thread actually gets started.
def create_instance():
    """Construct a new ConnectionManager and hand it back to the caller."""
    manager = ConnectionManager()
    return manager
# Shared module-level ConnectionManager. Starts out as None; the accessor code
# that follows fills it in via create_instance() the first time it finds None.
connection_manager_instance = None
154 global connection_manager_instance
155 if connection_manager_instance
is None:
156 connection_manager_instance
= create_instance()
157 return connection_manager_instance
159 if __name__
== '__main__':
161 gobject
.threads_init()
164 def http_results(self
, status
, data
):
167 print "%s bytes of content" % len(data
)
168 def http_permanent_redirect(self
, location
):
169 print "Redirected to %s" % location
170 def http_failed(self
, e
):
172 def operation_stopped(self
):
173 print "Operation stopped"
175 for uri
in sys
.argv
[1:]:
176 get_instance().request(uri
, Stub())
179 gobject
.MainLoop().run()
181 print get_instance()._queue