1 # Copyright (C) 2011, Aleksey Lim
2 # See the README file for details, or visit http://0install.net.
13 from select
import select
14 from httplib
import HTTPConnection
# Convenient way to set maximum number of workers and maximum number
# of simultaneous connections per domain at the same time
# 15 is a Nettiquete..
MAX_RUN_WORKERS_AND_POOL = 15

# Detected system HTTP proxy endpoint; filled in by init() from
# urllib2.ProxyHandler().  None means "connect directly, no proxy".
_http_proxy_host = None
_http_proxy_port = None
def start(url, modification_time, fd, receiver):
    """Queue url to be downloaded, writing the contents to fd.
    When done, emit the signal "done(sender, status, reason, exception)" on receiver.
    If modification_time is not None, and the resource hasn't been modified since then,
    the status may be 304 (Not Modified) and the file is not downloaded."""
    # Hand the work over to the module-level queue as a plain dict;
    # workers pull it back out via _RequestsQueue._pop().
    # NOTE(review): the remaining keys of this dict (presumably 'fd' and
    # 'receiver') and the closing brace are not visible in this chunk.
    _queue.push({'requested_url': url,
            'modification_time': modification_time,
    # (fragment) Docstring of the module-level abort(url) helper; its
    # 'def' line and body are not visible in this chunk.
    """Stop downloading url (or remove it from the queue if still pending)."""
    # (fragment of shutdown()) Only tear down if init() was ever called;
    # the statements under this guard are not visible in this chunk.
    if _queue is not None:
    # (fragment of init(); its 'def' line and the body of the guard below
    # are not visible in this chunk.)
    global _queue, _http_proxy_host, _http_proxy_port

    # init() is idempotent: bail out if the queue already exists.
    if _queue is not None:

    # Detect the system-wide HTTP proxy via urllib2's standard machinery.
    proxy_detector = urllib2.ProxyHandler()
    if 'http' in proxy_detector.proxies:
        # proxies['http'] is expected to be 'host:port'; the appended 80
        # provides a default port when no ':' is present.
        # NOTE(review): if the value carries a scheme prefix such as
        # 'http://host:port', split(':') yields the wrong fields and
        # int() below raises — verify against real proxy settings.
        proxy = proxy_detector.proxies['http'].split(':') + [80]
        _http_proxy_host = proxy[0]
        _http_proxy_port = int(proxy[1])

    _queue = _RequestsQueue()
    # Make sure pending downloads are stopped at interpreter exit.
    atexit.register(shutdown)
class _RequestsQueue(object):
    # Dispatches queued download requests to a bounded set of worker
    # threads, reusing keep-alive connections from a shared pool.

        # (__init__ fragment: the 'def __init__(self)' line and the
        # initialisation of '_requests' / '_workers' are not visible in
        # this chunk.)
        # One lock guards all queue state; the condition lets idle
        # workers sleep until push() hands them new work.
        self._mutex = threading.Lock()
        self._condition = threading.Condition(self._mutex)

        # requested_url -> connection, for requests a worker is
        # currently downloading (lets abort() close them mid-flight).
        self._requests_in_process = {}
        # Number of workers currently blocked in _condition.wait().
        # NOTE(review): '_workders' is misspelled, but the same spelling
        # is used consistently by push() and _pop(), so it is functional.
        self._workders_in_wait = 0
        # Shared pool of idle keep-alive connections, keyed by
        # (scheme, host, port).
        self._pool = _ConnectionsPool()
    def push(self, request):
        # Queue 'request' (a dict keyed by 'requested_url') and make sure
        # a worker will pick it up.
        # (fragment: lines between the signature and the first statement,
        # and several lines around the tail, are not visible in this
        # chunk — presumably mutex acquire/release and worker start-up.)

        # Index by URL; a duplicate URL replaces the pending entry.
        self._requests[request['requested_url']] = request
        # Wake one sleeping worker, if any ...
        if self._workders_in_wait:
            self._condition.notify()
        # ... and spawn a fresh worker while under the global cap.
        if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
            worker = _Worker(self._pop)
            self._workers.append(worker)

        # NOTE(review): 'worker' is only bound inside the branch above in
        # the visible lines; an initialisation must exist in the missing
        # lines for this guard to be safe.
        if worker is not None:
    def abort(self, url):
        # Cancel a download: drop it from the pending queue, and if a
        # worker already picked it up, close its connection to interrupt
        # the transfer.
        self._mutex.acquire()
        # (fragment: the matching release() below suggests a try/finally
        # in the original; those lines are not visible in this chunk.)
        if url in self._requests:
            del self._requests[url]
        if url in self._requests_in_process:
            # Closing the socket makes the worker's read fail promptly.
            self._requests_in_process[url].close()
        self._mutex.release()
        # (Fragment of a shutdown/abort-all method; its 'def' line, the
        # loop body below, and the try/finally scaffolding are not
        # visible in this chunk.)
        self._mutex.acquire()
        # Drop everything still pending ...
        self._requests.clear()
        # ... and close every in-flight connection (the close() call
        # itself is in a line missing from this chunk).
        for connection in self._requests_in_process.values():
        # Wake all sleeping workers so they notice the empty queue.
        self._condition.notify_all()
        self._mutex.release()
    def _pop(self, prev_connection):
        """Worker callback: recycle prev_connection, hand out the next job.

        Returns a (request, connection, openner) triple, or
        (None, None, None) when the worker should exit.
        (Fragment — several structural lines of this method, including
        'else:' branches and guard conditions, are not visible in this
        chunk; the literal indentation below is therefore approximate.)
        """
        self._mutex.acquire()
        if prev_connection is not None:
            # The worker finished its previous request: unregister it and
            # return the keep-alive connection to the pool.
            del self._requests_in_process[
                prev_connection.requested['requested_url']]
            self._pool.push(prev_connection)
        # A redirect bypasses the queue: reuse this worker immediately
        # for the new location (set by _Worker.run on _Redirect).
        if hasattr(prev_connection, 'redirect'):
            location_url, request = prev_connection.redirect
            delattr(prev_connection, 'redirect')
        # Sleep until push() provides work.
        # NOTE(review): a condition line between 'while' and the 'return'
        # (presumably a shutdown check) is missing from this chunk; as
        # shown, the wait statements would be unreachable.
        while not self._requests:
            return None, None, None
            self._workders_in_wait += 1
            self._condition.wait()
            self._workders_in_wait -= 1
        location_url, request = self._requests.popitem()

        location_parts = urlparse.urlparse(location_url)
        # Plain http goes through the detected proxy, when there is one.
        # (The 'else:' between the two assignments is in a missing line.)
        if _http_proxy_host and location_parts.scheme == 'http':
            connection_url = (location_parts.scheme,
                    _http_proxy_host, _http_proxy_port)
            connection_url = (location_parts.scheme,
                    location_parts.hostname, location_parts.port or '80')
        connection = self._pool.pop(connection_url)
        self._mutex.release()

        request['location_url'] = location_url
        request['connection_url'] = connection_url

        scheme, host, port = connection_url
        # NOTE(review): the default port above is the string '80', so
        # HTTPConnection receives an int or a str depending on the URL —
        # verify this is accepted by the httplib in use.
        if connection is None and scheme == 'http':
            # Fresh keep-alive connection; _resolve caches DNS lookups.
            connection = HTTPConnection(_resolve(host), port)
        # Non-http schemes fall back to the urllib2-based opener.
        # (The 'else:' between the two branches is in a missing line.)
        if connection is None:
            openner = _urllib_openner
            connection.requested = request
            self._requests_in_process[request['requested_url']] = connection
            openner = _http_openner

        return request, connection, openner
169 class _Redirect(Exception):
171 def __init__(self
, location
):
172 self
.location
= location
class _ConnectionsPool(object):
    # Pool of idle keep-alive connections, keyed by connection URL.

        # (__init__ fragment: the 'def __init__(self)' line is not
        # visible in this chunk.)
        # connection_url -> list of idle connections for that endpoint.
        self._connections = {}
        # (Fragment of a cleanup method: iterates every per-endpoint
        # pool; the loop body — presumably closing each connection — is
        # not visible in this chunk.)
        for i in self._connections.values():
    def __getitem__(self, connection_url):
        # Return the per-endpoint list of idle connections, creating it
        # lazily on first access.
        pool = self._connections.get(connection_url)
        # (fragment: the 'if pool is None:' guard above this assignment
        # and the final 'return pool' are not visible in this chunk.)
            pool = self._connections[connection_url] = []
    def push(self, connection):
        # Return a connection to its endpoint's pool after use.
        # Nothing to recycle, e.g. the urllib2 opener was used.
        # (The 'return' under this guard is in a missing line.)
        if connection is None:
        pool = self[connection.requested['connection_url']]
        # That should not happen because max number of workers is equal to
        # max number of simultaneous connections per domain
        assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
        # Insert at the head so pop() (which takes from the tail) reuses
        # the most recently idle connection first.
        pool.insert(0, connection)
    def pop(self, connection_url):
        # Take an idle connection for this endpoint, discarding stale
        # ones.  (Fragment: the non-empty-pool guard and the method's
        # tail — discard/return — are not visible in this chunk.)
        pool = self[connection_url]
        connection = pool.pop()
        # A zero-timeout select() on an idle socket reporting it readable
        # means either buffered data we did not consume or a peer close;
        # in both cases the connection must not be reused.
        if isinstance(connection, HTTPConnection) and \
                connection.sock is not None and \
                select([connection.sock], [], [], 0.0)[0]:
            # Either data is buffered (bad), or the connection is dropped
class _Worker(threading.Thread):
    # Thread that repeatedly asks the queue for work via the supplied
    # callback and runs the chosen opener on each request.

    def __init__(self, pop_request_cb):
        threading.Thread.__init__(self)
        # To not wait for the thread on process exit
        # (fragment: the daemon-flag assignment itself is in a line not
        # visible in this chunk.)
        # Callback is _RequestsQueue._pop; called with the previous
        # connection so the queue can recycle it.
        self._pop_request = pop_request_cb
            # (run() fragment: the 'def run(self)' line, the worker loop,
            # and the try/finally scaffolding around these statements are
            # not visible in this chunk; indentation below is therefore
            # approximate.)
            # Ask the queue for the next job, handing back the previous
            # connection for recycling.
            request, connection, openner = self._pop_request(connection)
                status, reason = openner(connection, request)
            except _Redirect, redirect:
                # Stash the redirect on the connection so _pop() reuses
                # this worker for the new location immediately.
                connection.redirect = (redirect.location, request)
            except Exception, error:
                if isinstance(error, urllib2.HTTPError):
                    # NOTE(review): urllib2.HTTPError exposes the HTTP
                    # status as '.code'; '.status' may not exist on this
                    # Python — verify.
                    status = error.status
                reason = '%s %r' % (error, request)
                __, ex, tb = sys.exc_info()
                # Report completion back to the caller's receiver object.
                # NOTE(review): 'exception' is not bound in the visible
                # lines; presumably assembled from ex/tb in a missing one.
                request['receiver']._done_cb(status, reason, exception)
        except KeyboardInterrupt, e:
            # Forward Ctrl-C from this worker thread to the main thread.
            thread.interrupt_main()
            thread.interrupt_main()
def _http_openner(connection, request):
    """Fetch request over an existing httplib connection.

    Streams a 200 body to the request's fd, raises _Redirect for
    3xx-with-Location responses, and returns (status, reason).
    (Fragment — a few original lines between request() and the response
    handling are not visible in this chunk.)
    """
    headers = {'connection': 'keep-alive'}
    if request.get('modification_time'):
        headers['If-Modified-Since'] = request['modification_time']
    # NOTE(review): 'headers' is built but never passed to request(), so
    # neither keep-alive nor If-Modified-Since reaches the server as
    # written — looks like a bug; verify against upstream before fixing.
    connection.request('GET', request['location_url'])

    response = connection.getresponse()
    # Follow redirects by signalling the worker via _Redirect.
    if response.status in [301, 302, 303, 307] and \
            response.getheader('location'):
        raise _Redirect(response.getheader('location'))
    if response.status == 200:
        _read_file(request, response)
    return response.status, response.reason
def _urllib_openner(connection, request):
    """Fallback opener using urllib2, for requests with no pooled
    connection (non-http schemes).

    'connection' is unused here; the parameter exists so both openers
    share one call signature.  (Fragment — the method's tail, including
    its return statement, is not visible in this chunk.)
    """
    url_request = urllib2.Request(request['location_url'])
    # urllib2 only honours If-Modified-Since meaningfully over HTTP.
    if request['location_url'].startswith('http:') and \
            request.get('modification_time'):
        url_request.add_header(
                'If-Modified-Since', request['modification_time'])

    response = urllib2.urlopen(url_request)
    _read_file(request, response)
def _read_file(request, response):
    # Stream the response body to the request's file descriptor in
    # PAGE_SIZE chunks.  (Fragment — the surrounding read loop is not
    # visible in this chunk; PAGE_SIZE is defined elsewhere in the file.)
    data = response.read(PAGE_SIZE)
    os.write(request['fd'], data)  # XXX: return value ignored
def _resolve(hostname):
    """Resolve hostname to an address once and cache the result, so
    repeated keep-alive reconnects skip DNS.

    (Fragment — the cache-miss guard, presumably 'if addr is None:', and
    the final 'return addr' are not visible in this chunk; _resolve_cache
    is defined elsewhere in the file.)
    """
    addr = _resolve_cache.get(hostname)
    # getaddrinfo()[0] is (family, type, proto, canonname, sockaddr);
    # sockaddr[0] is the numeric address.
    addrinfo = socket.getaddrinfo(hostname, 0)[0]
    addr = _resolve_cache[hostname] = addrinfo[4][0]