# Need to check how proxies interact with pooling.
# [zeroinstall.git] / zeroinstall / injector / wget.py
# blob 1a3ff543db6b419e806ea8b4a8e8b74686ee38a1
1 # Copyright (C) 2011, Aleksey Lim
2 # See the README file for details, or visit http://0install.net.
import os
import sys
import json
import atexit
import thread
import socket
import urllib2
import urlparse
import threading
from select import select
from httplib import HTTPConnection, HTTPException, InvalidURL
17 PAGE_SIZE = 4096
18 # Convenient way to set maximum number of workers and maximum number
19 # of simultaneous connections per domain at the same time
20 # 15 is a Nettiquete..
21 MAX_RUN_WORKERS_AND_POOL = 15
23 _queue = None
24 _resolve_cache = {}
25 _proxy_support = None
28 def start(url, modification_time, outfile, receiver):
29 _init()
30 _queue.push({'requested_url': url,
31 'modification_time': modification_time,
32 'outfile': outfile,
33 'receiver': receiver,
37 def _split_hostport(host):
38 i = host.rfind(':')
39 j = host.rfind(']') # ipv6 addresses have [...]
40 if i > j:
41 try:
42 port = int(host[i+1:])
43 except ValueError:
44 raise InvalidURL("nonnumeric port: '%s'" % host[i+1:]) # XXX
45 host = host[:i]
46 else:
47 port = self.default_port
48 if host and host[0] == '[' and host[-1] == ']':
49 host = host[1:-1]
50 return host, port
53 def abort(url):
54 _init()
55 _queue.abort(url)
58 def shutdown():
59 global _queue
60 if _queue is not None:
61 _queue.clear()
62 _queue = None
65 def _init():
66 global _queue, _proxy_support
68 if _queue is not None:
69 return
71 # XXX: only works if the HTTP handler is itself HTTP.
72 # We could subclass ProxyHandler and override open to avoid this.
73 _proxy_support = urllib2.ProxyHandler()
75 _queue = _RequestsQueue()
76 atexit.register(shutdown)
79 class _RequestsQueue(object):
81 def __init__(self):
82 self._mutex = threading.Lock()
83 self._condition = threading.Condition(self._mutex)
84 self._workers = []
85 self._requests = {}
86 self._requests_in_process = {}
87 self._workders_in_wait = 0
88 self._pool = _ConnectionsPool()
89 self._exiting = False
91 def push(self, request):
92 worker = None
94 self._mutex.acquire()
95 try:
96 self._requests[request['requested_url']] = request
97 if self._workders_in_wait:
98 self._condition.notify()
99 if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
100 worker = _Worker(self._pop)
101 self._workers.append(worker)
102 finally:
103 self._mutex.release()
105 if worker is not None:
106 worker.start()
108 def abort(self, url):
109 self._mutex.acquire()
110 try:
111 if url in self._requests:
112 del self._requests[url]
113 if url in self._requests_in_process:
114 self._requests_in_process[url].close()
115 finally:
116 self._mutex.release()
118 def clear(self):
119 self._mutex.acquire()
120 try:
121 self._exiting = True
122 self._requests.clear()
123 for connection in self._requests_in_process.values():
124 connection.close()
125 self._condition.notify_all()
126 finally:
127 self._mutex.release()
129 def _pop(self, prev_connection):
130 self._mutex.acquire()
131 try:
132 if prev_connection is not None:
133 del self._requests_in_process[
134 prev_connection.requested['requested_url']]
135 self._pool.push(prev_connection)
137 if hasattr(prev_connection, 'redirect'):
138 location_url, request = prev_connection.redirect
139 delattr(prev_connection, 'redirect')
140 else:
141 while not self._requests:
142 if self._exiting:
143 return None, None, None
144 self._workders_in_wait += 1
145 self._condition.wait()
146 self._workders_in_wait -= 1
147 location_url, request = self._requests.popitem()
149 req = urllib2.Request(location_url)
151 # XXX: Is this the right place to handle proxies? The connections-per-site
152 # limit and pooling should apply to the final site (the proxy should not
153 # be treated as a site in it own right).
154 meth = req.get_type() + '_open'
155 new_request = getattr(_proxy_support, meth)(req)
156 if new_request:
157 req = new_request
159 # XXX: loses authn information
160 host, port = _split_hostport(req.get_host())
161 connection_url = (req.get_type(), host, port)
163 connection = self._pool.pop(connection_url)
164 finally:
165 self._mutex.release()
167 request['location_url'] = location_url
168 request['connection_url'] = connection_url
170 scheme, host, port = connection_url
171 if connection is None and scheme == 'http':
172 connection = HTTPConnection(_resolve(host), port)
174 if connection is None:
175 openner = _urllib_openner
176 else:
177 connection.requested = request
178 self._requests_in_process[request['requested_url']] = connection
179 openner = _http_openner
181 return request, connection, openner
184 class _Redirect(Exception):
186 def __init__(self, location):
187 self.location = location
190 class _ConnectionsPool(object):
192 def __init__(self):
193 self._connections = {}
195 def __iter__(self):
196 for i in self._connections.values():
197 yield i
199 def __getitem__(self, connection_url):
200 pool = self._connections.get(connection_url)
201 if pool is None:
202 pool = self._connections[connection_url] = []
203 return pool
205 def push(self, connection):
206 if connection is None:
207 return
208 pool = self[connection.requested['connection_url']]
209 # That should not happen because max number of workers is equal to
210 # max number of simultaneous connections per domain
211 assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
212 pool.insert(0, connection)
214 def pop(self, connection_url):
215 pool = self[connection_url]
216 if pool:
217 connection = pool.pop()
218 if isinstance(connection, HTTPConnection) and \
219 connection.sock is not None and \
220 select([connection.sock], [], [], 0.0)[0]:
221 # Either data is buffered (bad), or the connection is dropped
222 connection.close()
223 return connection
226 class _Worker(threading.Thread):
228 def __init__(self, pop_request_cb):
229 threading.Thread.__init__(self)
230 # To not wait for the thread on process exit
231 self.daemon = True
232 self._pop_request = pop_request_cb
234 def run(self):
235 try:
236 connection = None
237 while True:
238 request, connection, openner = self._pop_request(connection)
239 if openner is None:
240 break
242 try:
243 status, reason = openner(connection, request)
244 exception = None
245 except _Redirect, redirect:
246 connection.redirect = (redirect.location, request)
247 continue
248 except (urllib2.HTTPError, urllib2.URLError, HTTPException, socket.error) as ex:
249 if isinstance(ex, urllib2.HTTPError):
250 status = ex.status
251 else:
252 status = None
253 reason = '%s %r' % (ex, request)
254 __, ex, tb = sys.exc_info()
255 import download # XXX
256 from zeroinstall import _ # XXX
257 exception = (download.DownloadError(_('Error downloading {url}: {ex}').format(url = request, ex = ex)), tb)
258 except Exception, error:
259 __, ex, tb = sys.exc_info()
260 exception = (ex, tb)
262 request['receiver'].emit('done', status, reason, exception)
263 except KeyboardInterrupt, e:
264 thread.interrupt_main()
265 raise
266 except Exception, e:
267 thread.interrupt_main()
268 raise
271 def _http_openner(connection, request):
272 headers = {'connection': 'keep-alive'}
273 if request.get('modification_time'):
274 headers['If-Modified-Since'] = request['modification_time']
275 connection.request('GET', request['location_url'])
277 response = connection.getresponse()
278 try:
279 # Handle redirection
280 if response.status in [301, 302, 303, 307] and \
281 response.getheader('location'):
282 raise _Redirect(response.getheader('location'))
283 if response.status == 200:
284 _read_file(request, response)
285 return response.status, response.reason
286 finally:
287 response.close()
290 def _urllib_openner(connection, request):
291 url_request = urllib2.Request(request['location_url'])
292 if request['location_url'].startswith('http:') and \
293 request.get('modification_time'):
294 url_request.add_header(
295 'If-Modified-Since', request['modification_time'])
297 response = urllib2.urlopen(url_request)
298 try:
299 _read_file(request, response)
300 finally:
301 response.close()
303 return 200, None
306 def _read_file(request, response):
307 while True:
308 data = response.read(PAGE_SIZE)
309 if not data:
310 request['outfile'].flush()
311 break
312 request['outfile'].write(data)
315 def _resolve(hostname):
316 addr = _resolve_cache.get(hostname)
317 if not addr:
318 addrinfo = socket.getaddrinfo(hostname, 0)[0]
319 addr = _resolve_cache[hostname] = addrinfo[4][0]
320 return addr