1 """ URLFetch_threaded.py
3 Module for retrieving data from a URL (a threaded version using urllib2).
6 __copyright__
= "Copyright (c) 2007 Straw developers"
8 Straw is free software; you can redistribute it and/or modify it under the
9 terms of the GNU General Public License as published by the Free Software
10 Foundation; either version 2 of the License, or (at your option) any later
13 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
14 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License along with
18 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
19 Place - Suite 330, Boston, MA 02111-1307, USA. """
# NOTE(review): this chunk is a mangled paste; the import lines for
# 'threading' and 'urllib2' (used throughout below) fell into a gap of
# missing lines, so they are re-added here -- confirm against the full file.
import threading
import urllib2
from sys import exc_info

from MainloopManager import schedule
import NetworkConstants

# NOTE(review): BUFSIZE, Config, constants, LookupManager and gobject are
# also referenced later but their imports/definitions are in lines missing
# from this chunk; they are intentionally NOT guessed at here.

# Identifier of this fetcher implementation.
VERSION = "URLFetch_threaded"
class CancelledException(Exception):
    """Operation is cancelled.

    Raised from ConnectionThread.cooperate() once cancel() has set the
    thread's cancellation event.
    """
41 class ConnectionThread(threading
.Thread
):
42 """A thread that fetches a URL XXX fetch several URLs"""
43 def __init__(self
, uri
, consumer
, headers
, handlers
, group
=None, target
=None, name
=None, *args
, **kwargs
):
44 threading
.Thread
.__init
__(self
, group
, target
, name
, args
, kwargs
)
48 self
._consumer
= consumer
49 self
._headers
= headers
50 self
._handlers
= handlers
52 self
._cancelled
= threading
.Event()
55 """The main loop of the thread"""
56 print "Fetching %s..." % self
._uri
58 self
._handle
_request
()
60 schedule(get_instance().request_done
, self
)
61 print "Finished with %s" % self
._uri
65 This should be called periodically in the thread execution.
66 The method checks whether cancellation has been requested
67 and if so, raises CancelledException.
69 if self
._cancelled
.isSet():
70 raise CancelledException
72 def _handle_request(self
):
75 request
= urllib2
.Request(self
._uri
, headers
=self
._headers
)
77 # DNS lookup, connect, headers: ?
78 opener
= urllib2
.build_opener(*self
._handlers
)
81 f
= opener
.open(request
)
82 except urllib2
.HTTPError
, exception
:
83 for name
in 'read','code','msg','info':
84 if not hasattr(exception
, name
):
86 f
= exception
# HTTP error document can be loaded
92 block
= f
.read(BUFSIZE
)
96 data
+= block
# XXX option to save directly as a file
101 except CancelledException
:
102 schedule(self
._consumer
.operation_stopped
)
105 self
.cooperate() # last chance to notice cancellation
106 except CancelledException
:
107 schedule(self
._consumer
.operation_stopped
)
109 schedule(self
._consumer
.http_failed
, exc_info()[1])
111 if not hasattr(f
, 'code'): # fake for non-http
114 schedule(self
._consumer
.http_results
, (None, f
.code
, f
.msg
), f
.info(), data
)
118 This can be called to cancel the request.
119 inter-thread safe but not instant
120 XXX network operations can take a long time to timeout
121 XXX call operation_stopped instantly?
123 self
._cancelled
.set()
class ConnectionManager:
    """A manager for threads that fetch URLs.

    At most NetworkConstants.MAX_CONNECTIONS threads run at once; further
    requests wait in a FIFO queue until a slot frees up (see request_done).
    """

    def __init__(self):
        # NOTE(review): '__init__' header reconstructed; source was mangled.
        self._starting = []  # requests waiting to be started
        self._active = []    # requests running right now

    def request(self, uri, consumer, headers=None, user=None, password=None,
                priority=NetworkConstants.PRIORITY_DEFAULT):
        """Queue a fetch of *uri*; outcomes go to *consumer*'s callbacks.

        Returns the new thread's cancel callable (inter-thread safe).

        FIX: the original default 'headers={}' was a shared mutable dict
        that this method mutates ('User-agent' below), leaking the header
        into every later call; use None and build a fresh dict per call.
        """
        if headers is None:
            headers = {}
        handlers = [urllib2.HTTPHandler(debuglevel=1)]  # XXX
        if user and password:
            mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            mgr.add_password(None, uri, user, password)  # XXX redirects?
            handlers += [urllib2.HTTPBasicAuthHandler(mgr),
                         urllib2.HTTPDigestAuthHandler(mgr)]  # XXX right ordering?
        pc = Config.get_instance().proxy_config
        if pc.use_proxy:  # NOTE(review): condition reconstructed; mangled source
            proxy_uri = 'http://%s:%s' % (pc.host, pc.port)
            proxies = {'http': proxy_uri}
            handlers.append(urllib2.ProxyHandler(proxies))
            if pc.use_authentication:  # FIXME doesn't work
                mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
                mgr.add_password(None, proxy_uri, pc.user, pc.password)
                handlers += [urllib2.ProxyBasicAuthHandler(mgr), ]
                # interferes: urllib2.ProxyDigestAuthHandler(mgr),] # XXX right ordering?
        # XXX no decompression yet: headers['Accept-encoding'] = 'gzip'
        headers['User-agent'] = 'Straw/%s' % constants.VERSION
        thread = ConnectionThread(uri, consumer, headers, handlers)
        if len(self._active) < NetworkConstants.MAX_CONNECTIONS:
            self._active.append(thread)
            thread.start()  # NOTE(review): reconstructed, mirrors request_done()
        else:
            self._starting.append(thread)
        return thread.cancel  # inter-thread safe

    def poll(self, timeout=0.1):
        """Supports LookupManager polling XXX for now"""
        LookupManager.get_instance().poll(timeout)

    def request_done(self, request):
        """Called by the request when it is finished.

        Frees the slot and starts the next queued thread, if any.
        """
        self._active.remove(request)
        if self._starting and \
                len(self._active) < NetworkConstants.MAX_CONNECTIONS:
            thread = self._starting.pop(0)
            self._active.append(thread)
            thread.start()  # NOTE(review): reconstructed; mangled source
def create_instance():
    """Factory used by get_instance(); returns a fresh ConnectionManager."""
    return ConnectionManager()
# Module-level singleton, created lazily by get_instance().
connection_manager_instance = None

def get_instance():
    """Return the shared ConnectionManager, creating it on first use."""
    # NOTE(review): the 'def get_instance():' header was lost in the mangled
    # source; reconstructed from the callers and the 'global' statement.
    global connection_manager_instance
    if connection_manager_instance is None:
        connection_manager_instance = create_instance()
    return connection_manager_instance
190 if __name__
== '__main__':
192 gobject
.threads_init()
195 def http_results(self
, status
, info
, data
):
198 print "%s bytes of content" % len(data
)
199 def http_permanent_redirect(self
, location
):
200 print "Redirected to %s" % location
201 def http_failed(self
, e
):
203 def operation_stopped(self
):
204 print "Operation stopped"
206 for uri
in sys
.argv
[1:]:
207 get_instance().request(uri
, Stub())
210 gobject
.MainLoop().run()
212 print get_instance()._queue