1 # Copyright (C) 2011, Aleksey Lim
2 # See the README file for details, or visit http://0install.net.
13 from select
import select
14 from httplib
import HTTPConnection
# Convenient way to set maximum number of workers and maximum number
# of simultaneous connections per domain at the same time.
# 15 is netiquette: keep the per-domain connection count polite.
MAX_RUN_WORKERS_AND_POOL = 15
def start(url, modification_time, fd, receiver):
    """Queue url to be downloaded, writing the contents to fd.

    When done, emit the signal "done(sender, status, reason, exception)" on receiver.
    If modification_time is not None, and the resource hasn't been modified since then,
    the status may be 304 (Not Modified) and the file is not downloaded.
    """
    # Hand the request to the module-level worker queue as a plain dict.
    # NOTE(review): this chunk appears truncated here -- the dict presumably
    # also carries 'fd' and 'receiver' entries plus the closing brackets of
    # the push() call; confirm against the full file.
    _queue.push({'requested_url': url,
                 'modification_time': modification_time,
# NOTE(review): the `def abort(url):` header of this module-level function
# appears to be elided from this chunk; only its docstring and the first
# guard are visible.
"""Stop downloading url (or remove it from the queue if still pending)."""
# Only forward to the queue when the module has been initialized.
if _queue is not None:
# NOTE(review): the enclosing `def` header of this start-up routine is
# elided from this chunk; the body below initializes the module singletons.
global _queue, _http_proxy

# Already initialized -- presumably returns early here (guard body elided).
if _queue is not None:

# Let urllib2 discover the environment/system HTTP proxy settings.
proxy_detector = urllib2.ProxyHandler()
if 'http' in proxy_detector.proxies:
    proxy = proxy_detector.proxies['http']
    # Normalize bare "host:port" proxy values to a full URL so that
    # urlparse() fills in hostname/port correctly.
    if not proxy.startswith('http://'):
        proxy = 'http://' + proxy
    _http_proxy = urlparse.urlparse(proxy)

_queue = _RequestsQueue()
# Make sure pending downloads are stopped when the process exits.
atexit.register(shutdown)
class _RequestsQueue(object):
    """Schedules download requests across a bounded pool of _Worker threads.

    Mutable state is guarded by self._mutex; idle workers block on
    self._condition while the pending-request dict is empty.
    NOTE(review): several lines of this class are elided in this chunk
    (method headers, try/finally lines, else branches); elision points are
    marked inline -- confirm against the full file.
    """

    # NOTE(review): the `def __init__(self):` header and the initializers
    # for self._requests and self._workers appear to be elided here.
    self._mutex = threading.Lock()
    self._condition = threading.Condition(self._mutex)
    # Maps requested_url -> connection currently being processed.
    self._requests_in_process = {}
    # NOTE(review): "_workders" reads like a typo for "_workers", but the
    # same spelling is used consistently throughout this class.
    self._workders_in_wait = 0
    self._pool = _ConnectionsPool()

    def push(self, request):
        # Register the request keyed by its original URL.
        self._requests[request['requested_url']] = request
        # Wake one idle worker, if any is blocked in _pop().
        if self._workders_in_wait:
            self._condition.notify()
        # Otherwise spawn a new worker, up to the shared cap.
        if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
            worker = _Worker(self._pop)
            self._workers.append(worker)
        # NOTE(review): lines elided above; `worker` may be unbound here
        # when the cap was already reached -- confirm original control flow.
        if worker is not None:

    def abort(self, url):
        self._mutex.acquire()
        # NOTE(review): a `try:` line appears elided here -- the release()
        # below reads like a `finally:` clause.
        if url in self._requests:
            del self._requests[url]
        if url in self._requests_in_process:
            # Closing the live connection makes the worker's I/O fail fast.
            self._requests_in_process[url].close()
        self._mutex.release()

    # NOTE(review): the method header of the shutdown/clear routine below
    # appears elided from this chunk.
    self._mutex.acquire()
    self._requests.clear()
    # Presumably closes every in-flight connection (loop body elided).
    for connection in self._requests_in_process.values():
    # Wake all blocked workers so they can observe shutdown and exit.
    self._condition.notify_all()
    self._mutex.release()

    def _pop(self, prev_connection):
        # Called by a worker thread: recycle its previous connection (if
        # any) and hand back the next (request, connection, openner) triple.
        self._mutex.acquire()

        if prev_connection is not None:
            del self._requests_in_process[
                    prev_connection.requested['requested_url']]
            # Return the connection to the keep-alive pool for reuse.
            self._pool.push(prev_connection)

        # A redirect recorded by the worker is taken up before new work.
        if hasattr(prev_connection, 'redirect'):
            location_url, request = prev_connection.redirect
            delattr(prev_connection, 'redirect')
        # NOTE(review): an `else:` branch is likely elided here -- the wait
        # loop and popitem() below belong to the no-redirect path.
        while not self._requests:
            # NOTE(review): the exit condition guarding this return is
            # elided; (None, None, None) tells the worker to stop.
            return None, None, None
            self._workders_in_wait += 1
            self._condition.wait()
            self._workders_in_wait -= 1
        location_url, request = self._requests.popitem()

        location_parts = urlparse.urlparse(location_url)
        # Route plain http through the detected proxy, if one exists.
        if _http_proxy.hostname and location_parts.scheme == 'http':
            connection_url = (location_parts.scheme,
                    _http_proxy.hostname, _http_proxy.port)
        # NOTE(review): an `else:` line appears elided before the
        # direct-connection case below.
            connection_url = (location_parts.scheme,
                    location_parts.hostname, location_parts.port or '80')
        connection = self._pool.pop(connection_url)

        self._mutex.release()

        request['location_url'] = location_url
        request['connection_url'] = connection_url

        scheme, host, port = connection_url
        # Only plain http gets a pooled keep-alive connection; everything
        # else (e.g. https) falls back to the urllib2 openner.
        if connection is None and scheme == 'http':
            connection = HTTPConnection(_resolve(host), port)

        if connection is None:
            openner = _urllib_openner
        # NOTE(review): an `else:` line appears elided before the
        # keep-alive branch below.
        connection.requested = request
        self._requests_in_process[request['requested_url']] = connection
        openner = _http_openner

        return request, connection, openner
170 class _Redirect(Exception):
172 def __init__(self
, location
):
173 self
.location
= location
class _ConnectionsPool(object):
    """Keeps idle keep-alive connections, keyed by (scheme, host, port).

    NOTE(review): several lines are elided in this chunk (constructor and
    close-style method headers, guard and return lines); elision points
    are marked inline -- confirm against the full file.
    """

    # NOTE(review): the `def __init__(self):` header appears elided here.
    self._connections = {}

    # NOTE(review): a close()-style method header appears elided; the loop
    # below presumably closes every pooled connection (body elided).
    for i in self._connections.values():

    def __getitem__(self, connection_url):
        # Fetch (or lazily create) the per-destination connection list.
        pool = self._connections.get(connection_url)
        # NOTE(review): the `if pool is None:` guard and a trailing
        # `return pool` appear elided around the line below.
        pool = self._connections[connection_url] = []

    def push(self, connection):
        if connection is None:
        # NOTE(review): the `return` body of the guard above is elided.
        pool = self[connection.requested['connection_url']]
        # That should not happen because max number of workers is equal to
        # max number of simultaneous connections per domain
        assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
        # Parked at index 0 while pop() takes from the other end.
        pool.insert(0, connection)

    def pop(self, connection_url):
        pool = self[connection_url]
        # NOTE(review): an emptiness guard appears elided before pop().
        connection = pool.pop()
        # An idle socket that selects readable either has stale buffered
        # data or was dropped by the peer -- it must not be reused.
        if isinstance(connection, HTTPConnection) and \
                connection.sock is not None and \
                select([connection.sock], [], [], 0.0)[0]:
            # Either data is buffered (bad), or the connection is dropped
            # NOTE(review): the discard/replace handling here is elided.
class _Worker(threading.Thread):
    """Download thread: repeatedly pops a request from the queue and opens
    it with the openner chosen by _RequestsQueue._pop().

    NOTE(review): large parts of this class are elided in this chunk
    (the run() header, the worker loop and its try: lines); elision
    points are marked inline -- confirm against the full file.
    """

    def __init__(self, pop_request_cb):
        threading.Thread.__init__(self)
        # To not wait for the thread on process exit
        # NOTE(review): the daemon-flag assignment itself appears elided.
        self._pop_request = pop_request_cb

    # NOTE(review): `def run(self):`, the worker loop header and its
    # `try:` appear elided before the lines below.
    request, connection, openner = self._pop_request(connection)
    status, reason = openner(connection, request)
    # Record the redirect on the connection; _pop() picks it up next turn.
    except _Redirect, redirect:
        connection.redirect = (redirect.location, request)
    except Exception, error:
        if isinstance(error, urllib2.HTTPError):
            # NOTE(review): urllib2.HTTPError exposes `.code`, not
            # `.status` -- this line likely raises AttributeError; confirm.
            status = error.status
        reason = '%s %r' % (error, request)
        # Keep the exception and traceback for delivery to the receiver.
        __, ex, tb = sys.exc_info()
    # Deliver the outcome back to the requester's callback.
    request['receiver']._done_cb(status, reason, exception)
    except KeyboardInterrupt, e:
        thread.interrupt_main()
    # NOTE(review): the surrounding except/finally structure is elided;
    # the second interrupt_main() presumably lives in another handler.
    thread.interrupt_main()
252 def _http_openner(connection
, request
):
253 headers
= {'connection': 'keep-alive'}
254 if request
.get('modification_time'):
255 headers
['If-Modified-Since'] = request
['modification_time']
256 connection
.request('GET', request
['location_url'])
258 response
= connection
.getresponse()
261 if response
.status
in [301, 302, 303, 307] and \
262 response
.getheader('location'):
263 raise _Redirect(response
.getheader('location'))
264 if response
.status
== 200:
265 _read_file(request
, response
)
266 return response
.status
, response
.reason
def _urllib_openner(connection, request):
    """Fallback openner for non-poolable URLs (e.g. https), via urllib2.

    `connection` is unused on this path; the parameter exists only for
    signature parity with _http_openner.
    NOTE(review): this chunk appears truncated -- the worker expects a
    (status, reason) return value, and the trailing return statement is
    not visible here; confirm against the full file.
    """
    url_request = urllib2.Request(request['location_url'])
    # The conditional-GET header is only attached for plain http URLs.
    if request['location_url'].startswith('http:') and \
            request.get('modification_time'):
        url_request.add_header(
                'If-Modified-Since', request['modification_time'])

    response = urllib2.urlopen(url_request)
    # NOTE(review): a line is elided here (possibly a status check).
    _read_file(request, response)
def _read_file(request, response):
    """Stream the response body to request['fd'] in PAGE_SIZE chunks.

    NOTE(review): the loop lines are elided in this chunk -- presumably a
    `while data:` re-reading until EOF; PAGE_SIZE is defined outside the
    visible portion of this file. Confirm against the full file.
    """
    data = response.read(PAGE_SIZE)
    os.write(request['fd'], data)  # XXX: return value ignored
def _resolve(hostname):
    """Resolve hostname to an IP address, memoized in _resolve_cache.

    NOTE(review): the cache-miss guard (likely `if addr is None:`) and the
    final `return addr` appear elided from this chunk; the getaddrinfo()
    call below belongs to the miss path. _resolve_cache is defined outside
    the visible portion of this file.
    """
    addr = _resolve_cache.get(hostname)
    # First addrinfo entry; [4][0] is the IP string from the sockaddr.
    addrinfo = socket.getaddrinfo(hostname, 0)[0]
    addr = _resolve_cache[hostname] = addrinfo[4][0]