Moved URLFetch to threaded by default - removed nonthreaded versions
[straw.git] / src / lib / URLFetch.py
blobba728154b30c8a93930815e00c74ec38836e51fa
1 """ URLFetch_threaded.py
3 Module for retrieving data from a URL (a threaded version using urllib2).
5 """
6 __copyright__ = "Copyright (c) 2007 Straw developers"
7 __license__ = """
8 Straw is free software; you can redistribute it and/or modify it under the
9 terms of the GNU General Public License as published by the Free Software
10 Foundation; either version 2 of the License, or (at your option) any later
11 version.
13 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License along with
18 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
19 Place - Suite 330, Boston, MA 02111-1307, USA. """
21 from MainloopManager import schedule
22 import NetworkConstants
try:
    import constants
except ImportError:
    # Standalone fallback (e.g. running this module directly via the
    # __main__ block below): provide the one attribute this module reads
    # from the real Straw constants module.  The original bare `except:`
    # also hid genuine errors raised while importing constants.
    class constants:
        VERSION = "URLFetch_threaded"
29 import Config
31 import threading
32 import urllib2
33 from sys import exc_info
# Size of the chunks in which a response body is read (8 KiB).
BUFSIZE = 8*1024
class CancelledException(Exception):
    """Raised inside a fetch thread once cancellation has been requested."""
    pass
41 class ConnectionThread(threading.Thread):
42 """A thread that fetches a URL XXX fetch several URLs"""
43 def __init__(self, uri, consumer, headers, handlers, group=None, target=None, name=None, *args, **kwargs):
44 threading.Thread.__init__(self, group, target, name, args, kwargs)
45 self.setDaemon(True)
47 self._uri = uri
48 self._consumer = consumer
49 self._headers = headers
50 self._handlers = handlers
52 self._cancelled = threading.Event()
54 def run(self):
55 """The main loop of the thread"""
56 print "Fetching %s..." % self._uri
57 try:
58 self._handle_request()
59 finally:
60 schedule(get_instance().request_done, self)
61 print "Finished with %s" % self._uri
63 def cooperate(self):
64 """
65 This should be called periodically in the thread execution.
66 The method checks whether cancellation has been requested
67 and if so, raises CancelledException.
68 """
69 if self._cancelled.isSet():
70 raise CancelledException
72 def _handle_request(self):
73 try:
74 self.cooperate()
75 request = urllib2.Request(self._uri, headers=self._headers)
77 # DNS lookup, connect, headers: ?
78 opener = urllib2.build_opener(*self._handlers)
79 self.cooperate()
80 try:
81 f = opener.open(request)
82 except urllib2.HTTPError, exception:
83 for name in 'read','code','msg','info':
84 if not hasattr(exception, name):
85 raise
86 f = exception # HTTP error document can be loaded
88 try:
89 self.cooperate()
90 data = ""
91 while True:
92 block = f.read(BUFSIZE)
93 if not block:
94 break
96 data += block # XXX option to save directly as a file
97 self.cooperate()
99 finally:
100 f.close()
101 except CancelledException:
102 schedule(self._consumer.operation_stopped)
103 except:
104 try:
105 self.cooperate() # last chance to notice cancellation
106 except CancelledException:
107 schedule(self._consumer.operation_stopped)
108 else:
109 schedule(self._consumer.http_failed, exc_info()[1])
110 else:
111 if not hasattr(f, 'code'): # fake for non-http
112 f.code = 200
113 f.msg = "OK"
114 schedule(self._consumer.http_results, (None, f.code, f.msg), f.info(), data)
116 def cancel(self):
118 This can be called to cancel the request.
119 inter-thread safe but not instant
120 XXX network operations can take a long time to timeout
121 XXX call operation_stopped instantly?
123 self._cancelled.set()
125 class ConnectionManager:
126 """A manager for threads that fetch URLs"""
127 def __init__(self):
128 self._starting = [] # requests waiting to be started
129 self._active = [] # requests running right now
131 def request(self, uri, consumer, headers={}, user=None, password=None, priority=NetworkConstants.PRIORITY_DEFAULT):
133 handlers = []
134 handlers += [urllib2.HTTPHandler(debuglevel=1)] # XXX
136 if user and password:
137 mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
138 mgr.add_password(None, uri, user, password) # XXX redirects?
139 handlers += [urllib2.HTTPBasicAuthHandler(mgr),
140 urllib2.HTTPDigestAuthHandler(mgr)] # XXX right ordering?
142 pc = Config.get_instance().proxy_config
143 if pc.use:
144 proxy_uri = 'http://%s:%s' % (pc.host, pc.port)
145 proxies = {'http': proxy_uri}
146 print proxies
147 handlers.append(urllib2.ProxyHandler(proxies))
148 if pc.use_authentication: # FIXME doesn't work
149 mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
150 mgr.add_password(None, proxy_uri, pc.user, pc.password)
151 handlers += [urllib2.ProxyBasicAuthHandler(mgr),]
152 # interferes: urllib2.ProxyDigestAuthHandler(mgr),] # XXX right ordering?
154 # XXX no decompression yet: headers['Accept-encoding'] = 'gzip'
155 headers['User-agent'] = 'Straw/%s' % constants.VERSION
157 thread = ConnectionThread(uri, consumer, headers, handlers)
159 if len(self._active) < NetworkConstants.MAX_CONNECTIONS:
160 self._active.append(thread)
161 thread.start()
162 else:
163 self._starting.append(thread)
165 return thread.cancel # inter-thread safe
167 def poll(self, timeout=0.1):
168 """Supports LookupManager polling XXX for now"""
169 import LookupManager
170 LookupManager.get_instance().poll(timeout)
172 def request_done(self, request):
173 """Called by the request when it is finished."""
174 self._active.remove(request)
175 if self._starting and len(self._active) < NetworkConstants.MAX_CONNECTIONS:
176 thread = self._starting.pop(0)
177 self._active.append(thread)
178 thread.start()
def create_instance():
    """Build a fresh ConnectionManager (factory used by get_instance())."""
    manager = ConnectionManager()
    return manager
# Module-level singleton, created lazily by get_instance().
connection_manager_instance = None
def get_instance():
    """Return the module-wide ConnectionManager, creating it on first use."""
    global connection_manager_instance
    if connection_manager_instance is not None:
        return connection_manager_instance
    connection_manager_instance = create_instance()
    return connection_manager_instance
190 if __name__ == '__main__':
191 import sys, gobject
192 gobject.threads_init()
194 class Stub:
195 def http_results(self, status, info, data):
196 print status
197 print info
198 print "%s bytes of content" % len(data)
199 def http_permanent_redirect(self, location):
200 print "Redirected to %s" % location
201 def http_failed(self, e):
202 print str(e)
203 def operation_stopped(self):
204 print "Operation stopped"
206 for uri in sys.argv[1:]:
207 get_instance().request(uri, Stub())
209 try:
210 gobject.MainLoop().run()
211 finally:
212 print get_instance()._queue