Fixed proxy handling
[zeroinstall.git] / zeroinstall / injector / wget.py
blob 0b701d460b01d4d853f6a9cebc2ff024cb20b76b
# Copyright (C) 2011, Aleksey Lim
# See the README file for details, or visit http://0install.net.
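"""Asynchronous downloader used by the injector.

URLs queued with start() are fetched by a pool of daemon worker threads.
Plain HTTP requests share keep-alive connections, pooled per
(scheme, host, port) tuple; everything else falls back to a one-shot
urllib2 request.  Results are reported through the receiver passed to
start().
"""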

import os
import sys
import json
import atexit
import thread
import socket
import urllib2
import urlparse
import threading
from select import select
from httplib import HTTPConnection

PAGE_SIZE = 4096
# Convenient way to set the maximum number of workers and the maximum
# number of simultaneous connections per domain at the same time;
# 15 is polite netiquette.
MAX_RUN_WORKERS_AND_POOL = 15
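
# Shared module state, created lazily by _init(): the singleton request
# queue, the parsed $http_proxy setting (None when no proxy is
# configured), and a cache of hostname -> address lookups done by
# _resolve().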
_queue = None
_http_proxy = None
_resolve_cache = {}


def start(url, modification_time, fd, receiver):
    """Queue url to be downloaded, writing the contents to fd.

    When done, emit the signal "done(sender, status, reason, exception)" on
    receiver.  If modification_time is not None, and the resource hasn't
    been modified since then, the status may be 304 (Not Modified) and the
    file is not downloaded.
    """
    _init()
    _queue.push({'requested_url': url,
                 'modification_time': modification_time,
                 'fd': fd,
                 'receiver': receiver,
                 })


def abort(url):
    """Stop downloading url (or remove it from the queue if still pending)."""
    _init()
    _queue.abort(url)


def shutdown():
    global _queue
    if _queue is not None:
        _queue.clear()
        _queue = None


def _init():
    global _queue, _http_proxy

    if _queue is not None:
        return

    proxy_detector = urllib2.ProxyHandler()
    if 'http' in proxy_detector.proxies:
        # XXX: https_proxy?
        proxy = proxy_detector.proxies['http']
        if not proxy.startswith('http://'):
            proxy = 'http://' + proxy
        _http_proxy = urlparse.urlparse(proxy)

    _queue = _RequestsQueue()
    atexit.register(shutdown)
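

# Coordinates pending requests, worker threads and the keep-alive
# connection pool; all shared state is guarded by a single lock.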
class _RequestsQueue(object):

    def __init__(self):
        self._mutex = threading.Lock()
        self._condition = threading.Condition(self._mutex)
        # All _Worker threads ever started; capped by
        # MAX_RUN_WORKERS_AND_POOL in push()
        self._workers = []
        # requested_url -> request dict, waiting to be picked up
        self._requests = {}
        # requested_url -> live connection, for abort() and clear()
        self._requests_in_process = {}
        self._workers_in_wait = 0
        self._pool = _ConnectionsPool()
        self._exiting = False

    def push(self, request):
        worker = None

        self._mutex.acquire()
        try:
            self._requests[request['requested_url']] = request
            if self._workers_in_wait:
                self._condition.notify()
            if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
                worker = _Worker(self._pop)
                self._workers.append(worker)
        finally:
            self._mutex.release()

        if worker is not None:
            worker.start()

    def abort(self, url):
        self._mutex.acquire()
        try:
            if url in self._requests:
                del self._requests[url]
            if url in self._requests_in_process:
                self._requests_in_process[url].close()
        finally:
            self._mutex.release()

    def clear(self):
        self._mutex.acquire()
        try:
            self._exiting = True
            self._requests.clear()
            for connection in self._requests_in_process.values():
                connection.close()
            self._condition.notify_all()
        finally:
            self._mutex.release()
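
    # Called by each worker to exchange its previous connection for the
    # next unit of work; blocks while the queue is empty and returns
    # (None, None, None) when the queue is shutting down.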
    def _pop(self, prev_connection):
        self._mutex.acquire()
        try:
            if prev_connection is not None:
                del self._requests_in_process[
                        prev_connection.requested['requested_url']]
                self._pool.push(prev_connection)

            if hasattr(prev_connection, 'redirect'):
                location_url, request = prev_connection.redirect
                delattr(prev_connection, 'redirect')
            else:
                while not self._requests:
                    if self._exiting:
                        return None, None, None
                    self._workers_in_wait += 1
                    self._condition.wait()
                    self._workers_in_wait -= 1
                location_url, request = self._requests.popitem()

            location_parts = urlparse.urlparse(location_url)
            if _http_proxy is not None and location_parts.scheme == 'http':
                connection_url = (location_parts.scheme,
                        _http_proxy.hostname, _http_proxy.port)
            else:
                connection_url = (location_parts.scheme,
                        location_parts.hostname, location_parts.port or 80)
            connection = self._pool.pop(connection_url)
        finally:
            self._mutex.release()

        request['location_url'] = location_url
        request['connection_url'] = connection_url

        scheme, host, port = connection_url
        if connection is None and scheme == 'http':
            connection = HTTPConnection(_resolve(host), port)

        if connection is None:
            opener = _urllib_opener
        else:
            connection.requested = request
            self._requests_in_process[request['requested_url']] = connection
            opener = _http_opener

        return request, connection, opener
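

# Raised by _http_opener on 3xx responses; _Worker.run() stashes the new
# location on the connection so that _pop() retries it on the same worker.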
class _Redirect(Exception):

    def __init__(self, location):
        self.location = location


class _ConnectionsPool(object):

    def __init__(self):
        self._connections = {}

    def __iter__(self):
        for i in self._connections.values():
            yield i

    def __getitem__(self, connection_url):
        pool = self._connections.get(connection_url)
        if pool is None:
            pool = self._connections[connection_url] = []
        return pool
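
    # Connections are inserted at the front and popped from the back, so
    # the connection returned to the pool earliest is reused first.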
    def push(self, connection):
        if connection is None:
            return
        pool = self[connection.requested['connection_url']]
        # This should not happen, because the maximum number of workers
        # equals the maximum number of simultaneous connections per domain
        assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
        pool.insert(0, connection)
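
    # A zero-timeout select() on an idle keep-alive socket should report
    # nothing to read; readability therefore means EOF (the peer closed
    # the connection) or unexpected buffered data, and the connection
    # must not be reused as-is.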
    def pop(self, connection_url):
        pool = self[connection_url]
        if pool:
            connection = pool.pop()
            if isinstance(connection, HTTPConnection) and \
                    connection.sock is not None and \
                    select([connection.sock], [], [], 0.0)[0]:
                # Either data is buffered (bad), or the connection was
                # dropped; close it (httplib reconnects on the next request)
                connection.close()
            return connection
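

# Daemon thread that repeatedly trades its finished connection for new
# work via _RequestsQueue._pop() and reports every result through the
# request's receiver._done_cb().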
class _Worker(threading.Thread):

    def __init__(self, pop_request_cb):
        threading.Thread.__init__(self)
        # Daemonize to avoid waiting for the thread on process exit
        self.daemon = True
        self._pop_request = pop_request_cb

    def run(self):
        try:
            connection = None
            while True:
                request, connection, opener = self._pop_request(connection)
                if opener is None:
                    break

                try:
                    status, reason = opener(connection, request)
                    exception = None
                except _Redirect as redirect:
                    connection.redirect = (redirect.location, request)
                    continue
                except Exception as error:
                    if isinstance(error, urllib2.HTTPError):
                        status = error.code
                    else:
                        status = None
                    reason = '%s %r' % (error, request)
                    __, ex, tb = sys.exc_info()
                    exception = (ex, tb)

                request['receiver']._done_cb(status, reason, exception)
        except KeyboardInterrupt:
            thread.interrupt_main()
            raise
        except Exception:
            thread.interrupt_main()
            raise


def _http_opener(connection, request):
    headers = {'connection': 'keep-alive'}
    if request.get('modification_time'):
        headers['If-Modified-Since'] = request['modification_time']
    connection.request('GET', request['location_url'], headers=headers)

    response = connection.getresponse()
    try:
        # Handle redirection
        if response.status in [301, 302, 303, 307] and \
                response.getheader('location'):
            raise _Redirect(response.getheader('location'))
        if response.status == 200:
            _read_file(request, response)
        return response.status, response.reason
    finally:
        response.close()
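

# Fallback for requests that did not get a pooled HTTP connection
# (e.g. non-HTTP schemes): a one-shot urllib2 fetch without keep-alive.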
def _urllib_opener(connection, request):
    url_request = urllib2.Request(request['location_url'])
    if request['location_url'].startswith('http:') and \
            request.get('modification_time'):
        url_request.add_header(
                'If-Modified-Since', request['modification_time'])

    response = urllib2.urlopen(url_request)
    _read_file(request, response)

    return 200, None


def _read_file(request, response):
    while True:
        data = response.read(PAGE_SIZE)
        if not data:
            break
        while data:
            # os.write() may write less than requested; loop until the
            # whole chunk is flushed to the target fd
            written = os.write(request['fd'], data)
            data = data[written:]


def _resolve(hostname):
    addr = _resolve_cache.get(hostname)
    if not addr:
        addrinfo = socket.getaddrinfo(hostname, 0)[0]
        addr = _resolve_cache[hostname] = addrinfo[4][0]
    return addr
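

if __name__ == '__main__':
    # Minimal smoke test: a sketch, not part of the module's API.  It
    # assumes only what _Worker.run() requires of a receiver, namely a
    # _done_cb(status, reason, exception) method; the URL and the
    # _ExampleReceiver name are illustrative.
    import tempfile

    finished = threading.Event()

    class _ExampleReceiver(object):

        def _done_cb(self, status, reason, exception):
            print('done: status=%r reason=%r' % (status, reason))
            finished.set()

    fd, path = tempfile.mkstemp()
    try:
        start('http://0install.net/', None, fd, _ExampleReceiver())
        finished.wait(30)
    finally:
        shutdown()
        os.close(fd)
        os.remove(path)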