1 # Copyright (C) 2011, Aleksey Lim
2 # See the README file for details, or visit http://0install.net.
13 from select
import select
14 from httplib
import HTTPConnection
, HTTPException
18 # Convenient way to set maximum number of workers and maximum number
19 # of simultaneous connections per domain at the same time
20 # 15 is a netiquette-friendly limit.
21 MAX_RUN_WORKERS_AND_POOL
= 15
def start(url, modification_time, outfile, receiver):
    """Queue a download of *url*.

    url -- URL to fetch
    modification_time -- If-Modified-Since value, or None (see the openners)
    outfile -- writable file object the body is streamed into (see _read_file)
    receiver -- object whose .emit('done', ...) is called on completion
                (see _Worker)

    NOTE(review): this excerpt is incomplete; the request dict below is
    truncated and presumably also carries 'outfile' and 'receiver' entries
    -- TODO confirm against the full file.
    """
    # [original line 29 not visible in this excerpt]
    _queue.push({'requested_url': url,
        'modification_time': modification_time,
        # [original line 32 onwards not visible in this excerpt]
37 def _split_hostport(host
):
39 j
= host
.rfind(']') # ipv6 addresses have [...]
42 port
= int(host
[i
+1:])
44 raise InvalidURL("nonnumeric port: '%s'" % host
[i
+1:]) # XXX
47 port
= self
.default_port
48 if host
and host
[0] == '[' and host
[-1] == ']':
# [original lines 49-59 not visible: the ``def`` line of the function that
#  owns the statement below is missing -- presumably ``def shutdown():``,
#  registered with atexit further down -- TODO confirm]
    if _queue is not None:
        # [original lines 61-65 not visible in this excerpt]

# [the ``def`` line of the following lazy-initialisation function is also
#  missing from this excerpt]
    global _queue, _proxy_support
    # [original line 67 not visible]
    # Already initialised -- presumably an early return follows
    if _queue is not None:
        # [original lines 69-70 not visible in this excerpt]
    # XXX: only works if the HTTP handler is itself HTTP.
    # We could subclass ProxyHandler and override open to avoid this.
    _proxy_support = urllib2.ProxyHandler()
    # [original line 74 not visible]
    _queue = _RequestsQueue()
    # Ensure pending workers/connections are torn down at interpreter exit
    atexit.register(shutdown)
class _RequestsQueue(object):
    """Serialises download requests across a bounded set of worker threads.

    Requests (dicts keyed by 'requested_url') are queued via push(); workers
    pull them through _pop() and recycle HTTP connections through a
    _ConnectionsPool.

    NOTE(review): this excerpt is incomplete -- elided original lines are
    marked below; do not assume the gaps were empty.
    """

    # [original lines 80-81 not visible: presumably ``def __init__(self):``
    #  plus creation of self._requests (dict) and self._workers (list),
    #  both used below -- TODO confirm]
        self._mutex = threading.Lock()
        # The condition shares the mutex so waiters release/reacquire it
        # atomically around wait()
        self._condition = threading.Condition(self._mutex)
        # [original lines 84-85 not visible in this excerpt]
        # requested_url -> connection currently downloading it (see abort())
        self._requests_in_process = {}
        # Number of workers blocked in _pop() waiting for work.
        # (sic: "_workders" is the spelling used consistently in this file)
        self._workders_in_wait = 0
        self._pool = _ConnectionsPool()

    def push(self, request):
        """Queue *request* and wake an idle worker or spawn a new one.

        request -- dict with at least a 'requested_url' key
        """
        # [original lines 92-95 not visible: presumably acquire self._mutex
        #  and initialise ``worker = None`` -- TODO confirm]
        self._requests[request['requested_url']] = request
        if self._workders_in_wait:
            # An idle worker is blocked in _pop(); hand the request over
            self._condition.notify()
        # Cap the worker count; the same constant bounds the pool size
        if len(self._workers) < MAX_RUN_WORKERS_AND_POOL:
            worker = _Worker(self._pop)
            self._workers.append(worker)
        # [original line 102 not visible]
        self._mutex.release()
        # [original line 104 not visible]
        # Presumably started outside the lock so the new thread cannot
        # immediately block on it
        if worker is not None:
            # [original lines 106-107 not visible: presumably worker.start()]

    def abort(self, url):
        """Drop any pending request for *url* and close its live connection."""
        self._mutex.acquire()
        # [original line 110 not visible in this excerpt]
        if url in self._requests:
            del self._requests[url]
        if url in self._requests_in_process:
            # Closing the socket makes the worker's blocking read fail fast
            self._requests_in_process[url].close()
        # [original line 115 not visible in this excerpt]
        self._mutex.release()

    # [original lines 117-118 not visible: the ``def`` line of this method is
    #  missing -- presumably the queue-wide shutdown/clear used by the
    #  module-level shutdown() -- TODO confirm]
        self._mutex.acquire()
        # [original lines 120-121 not visible in this excerpt]
        self._requests.clear()
        for connection in self._requests_in_process.values():
            # [original line 124 not visible: presumably connection.close()]
        # Wake every waiting worker so they can observe the empty queue
        self._condition.notify_all()
        # [original line 126 not visible in this excerpt]
        self._mutex.release()

    def _pop(self, prev_connection):
        """Worker callback: recycle *prev_connection*, then hand out work.

        Returns (request, connection, openner); (None, None, None) tells the
        worker to exit.  ``connection`` is a pooled HTTPConnection for plain
        http URLs, else None (urllib2 fallback).
        """
        self._mutex.acquire()
        # [original line 131 not visible in this excerpt]
        if prev_connection is not None:
            del self._requests_in_process[
                    prev_connection.requested['requested_url']]
            self._pool.push(prev_connection)
            # [original line 136 not visible]
            # A worker that hit a redirect re-queues the new location itself
            # (see _Worker: connection.redirect = (location, request))
            if hasattr(prev_connection, 'redirect'):
                location_url, request = prev_connection.redirect
                delattr(prev_connection, 'redirect')
        # [original line 140 not visible: presumably ``else:`` so the wait
        #  loop below only runs when no redirect was pending -- TODO confirm]
        while not self._requests:
            # [original line 142 not visible: presumably a shutdown/idle
            #  check guarding the return below; the nesting shown here is a
            #  reconstruction -- TODO confirm]
                return None, None, None
            self._workders_in_wait += 1
            self._condition.wait()
            self._workders_in_wait -= 1
        location_url, request = self._requests.popitem()
        # [original line 148 not visible in this excerpt]
        req = urllib2.Request(location_url)

        # XXX: Is this the right place to handle proxies? The connections-per-site
        # limit and pooling should apply to the final site (the proxy should not
        # be treated as a site in its own right).
        meth = req.get_type() + '_open'
        new_request = getattr(_proxy_support, meth)(req)
        # [original lines 156-158 not visible in this excerpt]
        # XXX: loses authn information
        host, port = _split_hostport(req.get_host())
        # Pool key: connections are shared per (scheme, host, port)
        connection_url = (req.get_type(), host, port)
        # [original line 162 not visible]
        connection = self._pool.pop(connection_url)
        # [original line 164 not visible]
        self._mutex.release()
        # [original line 166 not visible in this excerpt]
        request['location_url'] = location_url
        request['connection_url'] = connection_url
        # [original line 169 not visible]
        scheme, host, port = connection_url
        if connection is None and scheme == 'http':
            # _resolve caches the DNS lookup
            connection = HTTPConnection(_resolve(host), port)
        # [original line 173 not visible]
        if connection is None:
            # Non-http schemes fall back to urllib2
            openner = _urllib_openner
        # [original line 176 not visible: presumably ``else:`` -- the three
        #  statements below only make sense for a real connection]
            connection.requested = request
            self._requests_in_process[request['requested_url']] = connection
            openner = _http_openner
        # [original line 180 not visible in this excerpt]
        return request, connection, openner
184 class _Redirect(Exception):
186 def __init__(self
, location
):
187 self
.location
= location
class _ConnectionsPool(object):
    """Cache of idle connections keyed by (scheme, host, port) tuples.

    NOTE(review): this excerpt is incomplete -- elided original lines are
    marked below.
    """

    # [original lines 191-192 not visible: presumably ``def __init__(self):``]
        # connection_url -> list of idle connections for that site
        self._connections = {}

    # [original lines 194-195 not visible: the ``def`` line of this method is
    #  missing -- it walks every pooled connection list, presumably to close
    #  them on shutdown -- TODO confirm]
        for i in self._connections.values():
            # [original lines 197-198 not visible in this excerpt]

    def __getitem__(self, connection_url):
        """Return the idle-connection list for a key, creating it on demand."""
        pool = self._connections.get(connection_url)
        # [original line 201 not visible: presumably ``if pool is None:``,
        #  guarding the lazy creation below]
        pool = self._connections[connection_url] = []
        # [original lines 203-204 not visible: presumably ``return pool``]

    def push(self, connection):
        """Return *connection* to its site's pool; None is ignored."""
        if connection is None:
            # [original line 207 not visible: presumably ``return``]
        pool = self[connection.requested['connection_url']]
        # That should not happen because max number of workers is equal to
        # max number of simultaneous connections per domain
        assert len(pool) <= MAX_RUN_WORKERS_AND_POOL
        # Insert at the head; pop() takes from the tail
        pool.insert(0, connection)

    def pop(self, connection_url):
        """Take an idle connection for *connection_url*, discarding dead ones."""
        pool = self[connection_url]
        # [original line 216 not visible: presumably ``if pool:``]
        connection = pool.pop()
        # A socket that is readable while no request is outstanding means
        # the server either pushed data or dropped the connection; the
        # zero-timeout select() is a non-blocking liveness probe
        if isinstance(connection, HTTPConnection) and \
                connection.sock is not None and \
                select([connection.sock], [], [], 0.0)[0]:
            # Either data is buffered (bad), or the connection is dropped
            # [original line 222 onwards not visible in this excerpt --
            #  presumably the stale connection is discarded here]
class _Worker(threading.Thread):
    """Thread that repeatedly pulls requests from the queue and downloads
    them, recycling its connection between requests.

    NOTE(review): this excerpt is incomplete -- the ``run()`` header and much
    of the control flow are elided; nesting below is a reconstruction.
    """

    def __init__(self, pop_request_cb):
        """pop_request_cb -- callable(prev_connection) returning
        (request, connection, openner); see _RequestsQueue._pop."""
        threading.Thread.__init__(self)
        # To not wait for the thread on process exit
        # [original line 231 not visible: presumably sets the daemon flag]
        self._pop_request = pop_request_cb
        # [original line 233 not visible in this excerpt]

    # [original lines 234-237 not visible: the ``def run(self):`` line and
    #  its loop header are missing; the statements below are its body]
            # Hand back the previous connection and fetch the next job;
            # (None, None, None) presumably ends the loop
            request, connection, openner = self._pop_request(connection)
            # [original lines 239-242 not visible in this excerpt]
                status, reason = openner(connection, request)
                # [original line 244 not visible]
            except _Redirect, redirect:
                # Stash the redirect; _RequestsQueue._pop re-queues it
                connection.redirect = (redirect.location, request)
            # [original line 247 not visible in this excerpt]
            except (urllib2.HTTPError, urllib2.URLError, HTTPException,
                    socket.error) as ex:
                if isinstance(ex, urllib2.HTTPError):
                    # [original lines 250-252 not visible in this excerpt]
                reason = '%s %r' % (ex, request)
                __, ex, tb = sys.exc_info()
                import download  # XXX
                from zeroinstall import _  # XXX
                # Wrap the low-level error, keeping the traceback for re-raise
                exception = (download.DownloadError(
                        _('Error downloading {url}: {ex}').format(
                                url=request, ex=ex)), tb)
            except Exception, error:
                __, ex, tb = sys.exc_info()
                # [original lines 260-261 not visible in this excerpt]
                # Report the outcome back to the caller's receiver object
                request['receiver'].emit('done', status, reason, exception)
            except KeyboardInterrupt, e:
                # Propagate Ctrl-C to the main thread
                thread.interrupt_main()
                # [original lines 265-266 not visible: presumably another
                #  except clause owning the second interrupt below]
                thread.interrupt_main()
271 def _http_openner(connection
, request
):
272 headers
= {'connection': 'keep-alive'}
273 if request
.get('modification_time'):
274 headers
['If-Modified-Since'] = request
['modification_time']
275 connection
.request('GET', request
['location_url'])
277 response
= connection
.getresponse()
280 if response
.status
in [301, 302, 303, 307] and \
281 response
.getheader('location'):
282 raise _Redirect(response
.getheader('location'))
283 if response
.status
== 200:
284 _read_file(request
, response
)
285 return response
.status
, response
.reason
def _urllib_openner(connection, request):
    """Fallback openner for URLs without a pooled connection; uses urllib2.

    *connection* is unused here (the queue passes None for non-http
    schemes); kept for signature parity with _http_openner.

    NOTE(review): excerpt is incomplete -- elided lines marked below.
    """
    url_request = urllib2.Request(request['location_url'])
    # Only plain http requests get the conditional-GET header here
    if request['location_url'].startswith('http:') and \
            request.get('modification_time'):
        url_request.add_header(
                'If-Modified-Since', request['modification_time'])
    # [original line 296 not visible in this excerpt]
    response = urllib2.urlopen(url_request)
    # [original line 298 not visible in this excerpt]
    _read_file(request, response)
    # [original line 300 onwards not visible: presumably builds the
    #  (status, reason) return value -- TODO confirm]
def _read_file(request, response):
    """Copy the response body into request['outfile'] in PAGE_SIZE chunks.

    NOTE(review): excerpt is incomplete -- the loop and exit lines are
    elided; reconstruction suggests a ``while True`` read loop that flushes
    and breaks on EOF -- TODO confirm.
    """
    # [original line 307 not visible: presumably ``while True:``]
    data = response.read(PAGE_SIZE)
    # [original line 309 not visible: presumably ``if not data:`` (EOF)]
    request['outfile'].flush()
    # [original line 311 not visible: presumably ``break``]
    request['outfile'].write(data)
def _resolve(hostname):
    """Resolve *hostname* to an address string, memoising in _resolve_cache.

    NOTE(review): the cache is never invalidated, so a DNS change is not
    picked up for the life of the process -- TODO confirm this is intended.
    Excerpt is incomplete -- elided lines marked below.
    """
    addr = _resolve_cache.get(hostname)
    # [original line 317 not visible: presumably ``if addr is None:``]
    addrinfo = socket.getaddrinfo(hostname, 0)[0]
    # addrinfo[4] is the sockaddr tuple; [0] is the numeric address
    addr = _resolve_cache[hostname] = addrinfo[4][0]
    # [original line 320 not visible: presumably ``return addr``]