Removed gobject signal code from wget
[zeroinstall.git] / zeroinstall / injector / wget.py
blobeb5c78b278e5b32facd3c046e422d106c53ca48a
1 # Copyright (C) 2011, Aleksey Lim
2 # See the README file for details, or visit http://0install.net.
4 import os
5 import sys
6 import json
7 import atexit
8 import thread
9 import socket
10 import urllib2
11 import urlparse
12 import threading
13 from select import select
14 from httplib import HTTPConnection
# Read/write chunk size used when streaming a response body to a file
PAGE_SIZE = 4096
# Upper bound both on the number of worker threads and on the number of
# simultaneous connections per domain (the two are deliberately equal,
# see _ConnectionsPool.push); 15 is considered polite netiquette.
MAX_RUN_WORKERS_AND_POOL = 15

# Lazily created _RequestsQueue singleton (see _init() / shutdown())
_queue = None
# HTTP proxy detected from the environment by _init(); None when unset
_http_proxy_host = None
_http_proxy_port = None
# hostname -> resolved IP address, filled by _resolve()
_resolve_cache = {}
def start(url, modification_time, fd, receiver):
    """Queue url to be downloaded, writing the contents to fd.
    When done, emit the signal "done(sender, status, reason, exception)" on receiver
    (the workers invoke receiver._done_cb).
    If modification_time is not None, and the resource hasn't been modified since then,
    the status may be 304 (Not Modified) and the file is not downloaded."""
    _init()
    # NOTE(review): the scraped source truncated this call; the closing
    # "})" is restored here — confirm no further keys were elided.
    _queue.push({'requested_url': url,
                 'modification_time': modification_time,
                 'fd': fd,
                 'receiver': receiver,
                 })
def abort(url):
    """Cancel the download of url.

    If url is still pending it is simply dropped from the queue; if a
    worker is already downloading it, its connection is closed."""
    _init()
    _queue.abort(url)
def shutdown():
    """Cancel all pending downloads and discard the global queue.

    Safe to call more than once; registered with atexit by _init()."""
    global _queue
    queue = _queue
    if queue is None:
        return
    queue.clear()
    _queue = None
def _init():
    """Lazily create the global _RequestsQueue and detect the HTTP proxy.

    Idempotent: returns immediately once _queue exists. Registers
    shutdown() with atexit so workers are stopped on interpreter exit."""
    global _queue, _http_proxy_host, _http_proxy_port

    if _queue is not None:
        return

    proxy_detector = urllib2.ProxyHandler()
    if 'http' in proxy_detector.proxies:
        proxy = proxy_detector.proxies['http']
        # BUG FIX: proxy values may carry a scheme ("http://host:3128") or
        # not ("host:3128"); the old str.split(':') approach returned
        # 'http' as the host and crashed on int('//host') for the schemed
        # form. Normalise and let urlparse extract host and port.
        if '//' not in proxy:
            proxy = 'http://' + proxy
        parts = urlparse.urlparse(proxy)
        _http_proxy_host = parts.hostname
        _http_proxy_port = parts.port or 80

    _queue = _RequestsQueue()
    atexit.register(shutdown)
class _RequestsQueue(object):
    """Thread-safe queue of pending download requests.

    Spawns up to MAX_RUN_WORKERS_AND_POOL _Worker threads on demand; each
    worker repeatedly calls _pop() to obtain the next request together
    with a (possibly reused) keep-alive connection from the pool.
    """

    def __init__(self):
        self._mutex = threading.Lock()
        self._condition = threading.Condition(self._mutex)
        # Worker threads created so far; never removed until shutdown
        self._workers = []
        # requested_url -> request dict, waiting to be picked by a worker
        self._requests = {}
        # requested_url -> live connection, for downloads in progress
        self._requests_in_process = {}
        # NOTE(review): "workders" is a typo for "workers"; kept as-is
        self._workders_in_wait = 0
        self._pool = _ConnectionsPool()
        self._exiting = False

    def push(self, request):
        """Queue *request* (a dict, see start()) and wake or spawn a worker."""
        worker = None

        self._mutex.acquire()
        try:
            self._requests[request['requested_url']] = request
            if self._workders_in_wait:
                # An idle worker exists; wake it rather than spawning one
                self._condition.notify()
            if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
                worker = _Worker(self._pop)
                self._workers.append(worker)
        finally:
            self._mutex.release()

        # Start the thread outside the lock
        if worker is not None:
            worker.start()

    def abort(self, url):
        """Drop *url* from the queue; close its connection if already active."""
        self._mutex.acquire()
        try:
            if url in self._requests:
                del self._requests[url]
            if url in self._requests_in_process:
                # Closing the socket makes the worker's read fail promptly
                self._requests_in_process[url].close()
        finally:
            self._mutex.release()

    def clear(self):
        """Cancel everything and tell all waiting workers to exit."""
        self._mutex.acquire()
        try:
            self._exiting = True
            self._requests.clear()
            for connection in self._requests_in_process.values():
                connection.close()
            self._condition.notify_all()
        finally:
            self._mutex.release()

    def _pop(self, prev_connection):
        """Return (request, connection, openner) for a worker to process.

        Called by a worker with the connection it just finished using
        (None on the first call). Returns (None, None, None) when the
        worker should exit; blocks while the queue is empty.
        """
        self._mutex.acquire()
        try:
            if prev_connection is not None:
                del self._requests_in_process[
                        prev_connection.requested['requested_url']]
                # Recycle the keep-alive connection for later requests
                self._pool.push(prev_connection)

            if hasattr(prev_connection, 'redirect'):
                # Previous request ended in an HTTP redirect; follow it
                # immediately on this worker instead of re-queueing.
                location_url, request = prev_connection.redirect
                delattr(prev_connection, 'redirect')
            else:
                while not self._requests:
                    if self._exiting:
                        return None, None, None
                    self._workders_in_wait += 1
                    self._condition.wait()
                    self._workders_in_wait -= 1
                # Arbitrary pending request (dict popitem), not FIFO
                location_url, request = self._requests.popitem()

            location_parts = urlparse.urlparse(location_url)
            if _http_proxy_host and location_parts.scheme == 'http':
                connection_url = (location_parts.scheme,
                        _http_proxy_host, _http_proxy_port)
            else:
                # NOTE(review): '80' here is a *string* while the proxy
                # port above is an int, so pool keys and the port handed
                # to HTTPConnection have mixed types — looks unintended;
                # verify before changing.
                connection_url = (location_parts.scheme,
                        location_parts.hostname, location_parts.port or '80')
            connection = self._pool.pop(connection_url)
        finally:
            self._mutex.release()

        request['location_url'] = location_url
        request['connection_url'] = connection_url

        scheme, host, port = connection_url
        if connection is None and scheme == 'http':
            connection = HTTPConnection(_resolve(host), port)

        if connection is None:
            # Non-http schemes (https, ftp, ...) fall back to urllib2
            openner = _urllib_openner
        else:
            connection.requested = request
            self._requests_in_process[request['requested_url']] = connection
            openner = _http_openner

        return request, connection, openner
169 class _Redirect(Exception):
171 def __init__(self, location):
172 self.location = location
175 class _ConnectionsPool(object):
177 def __init__(self):
178 self._connections = {}
180 def __iter__(self):
181 for i in self._connections.values():
182 yield i
184 def __getitem__(self, connection_url):
185 pool = self._connections.get(connection_url)
186 if pool is None:
187 pool = self._connections[connection_url] = []
188 return pool
190 def push(self, connection):
191 if connection is None:
192 return
193 pool = self[connection.requested['connection_url']]
194 # That should not happen because max number of workers is equal to
195 # max number of simultaneous connections per domain
196 assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
197 pool.insert(0, connection)
199 def pop(self, connection_url):
200 pool = self[connection_url]
201 if pool:
202 connection = pool.pop()
203 if isinstance(connection, HTTPConnection) and \
204 connection.sock is not None and \
205 select([connection.sock], [], [], 0.0)[0]:
206 # Either data is buffered (bad), or the connection is dropped
207 connection.close()
208 return connection
211 class _Worker(threading.Thread):
213 def __init__(self, pop_request_cb):
214 threading.Thread.__init__(self)
215 # To not wait for the thread on process exit
216 self.daemon = True
217 self._pop_request = pop_request_cb
219 def run(self):
220 try:
221 connection = None
222 while True:
223 request, connection, openner = self._pop_request(connection)
224 if openner is None:
225 break
227 try:
228 status, reason = openner(connection, request)
229 exception = None
230 except _Redirect, redirect:
231 connection.redirect = (redirect.location, request)
232 continue
233 except Exception, error:
234 if isinstance(error, urllib2.HTTPError):
235 status = error.status
236 else:
237 status = None
238 reason = '%s %r' % (error, request)
239 __, ex, tb = sys.exc_info()
240 exception = (ex, tb)
242 request['receiver']._done_cb(status, reason, exception)
243 except KeyboardInterrupt, e:
244 thread.interrupt_main()
245 raise
246 except Exception, e:
247 thread.interrupt_main()
248 raise
251 def _http_openner(connection, request):
252 headers = {'connection': 'keep-alive'}
253 if request.get('modification_time'):
254 headers['If-Modified-Since'] = request['modification_time']
255 connection.request('GET', request['location_url'])
257 response = connection.getresponse()
258 try:
259 # Handle redirection
260 if response.status in [301, 302, 303, 307] and \
261 response.getheader('location'):
262 raise _Redirect(response.getheader('location'))
263 if response.status == 200:
264 _read_file(request, response)
265 return response.status, response.reason
266 finally:
267 response.close()
def _urllib_openner(connection, request):
    """Download request['location_url'] through urllib2 (non-keep-alive path).

    *connection* is unused here (always None for this openner); the
    parameter exists so both openners share one signature. The body is
    streamed into request['fd']; urllib2 raises on HTTP errors, so a
    normal return always means (200, None)."""
    target = request['location_url']
    url_request = urllib2.Request(target)
    mtime = request.get('modification_time')
    if mtime and target.startswith('http:'):
        url_request.add_header('If-Modified-Since', mtime)

    response = urllib2.urlopen(url_request)
    _read_file(request, response)

    return 200, None
def _read_file(request, response):
    """Copy the whole response body into request['fd'] in PAGE_SIZE chunks.

    Loops on os.write until each chunk is fully written — the original
    ignored os.write's return value (its own XXX note), silently dropping
    data on short writes (e.g. into a pipe)."""
    while True:
        data = response.read(PAGE_SIZE)
        if not data:
            break
        while data:
            written = os.write(request['fd'], data)
            data = data[written:]
def _resolve(hostname):
    """Resolve *hostname* to an IP address, memoized in _resolve_cache."""
    cached = _resolve_cache.get(hostname)
    if cached:
        return cached
    # First lookup: take the address of the first getaddrinfo result
    address = socket.getaddrinfo(hostname, 0)[0][4][0]
    _resolve_cache[hostname] = address
    return address