2 Manage pools of connections so that we can limit the number of requests per site and reuse
7 # Copyright (C) 2011, Thomas Leonard
8 # See the README file for details, or visit http://0install.net.
11 from collections
import defaultdict
12 import threading
, gobject
14 from zeroinstall
.support
import tasks
15 from zeroinstall
.injector
import download
27 class DownloadScheduler
:
28 """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
31 self
._sites
= defaultdict(lambda: Site()) # (scheme://host:port) -> Site
34 def download(self
, dl
):
35 # (changed if we get redirected)
38 redirections_remaining
= 10
40 # Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
41 # reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
42 # and limit by the target site, not treat everything as going to a single site (the proxy).
44 location_parts
= urlparse
.urlparse(current_url
)
46 site_key
= (location_parts
.scheme
,
47 location_parts
.hostname
,
48 location_parts
.port
or default_port
.get(location_parts
.scheme
, None))
52 step
.url
= current_url
53 blocker
= self
._sites
[site_key
].download(step
)
60 current_url
= step
.redirect
62 if redirections_remaining
== 0:
63 raise download
.DownloadError("Too many redirections {url} -> {current}".format(
65 current
= current_url
))
66 redirections_remaining
-= 1
67 # (else go around the loop again)
70 """Represents a service accepting download requests. All requests with the same scheme, host and port are
71 handled by the same Site object, allowing it to do connection pooling and queuing, although the current
72 implementation doesn't do either."""
74 def download(self
, step
):
75 from ._download
_child
import download_in_thread
77 thread_blocker
= tasks
.Blocker("wait for thread " + step
.url
)
78 def notify_done(status
, ex
= None, redirect
= None):
80 step
.redirect
= redirect
82 thread_blocker
.trigger(ex
)
84 gobject
.idle_add(wake_up_main
)
85 child
= threading
.Thread(target
= lambda: download_in_thread(step
.url
, step
.dl
.tempfile
, step
.dl
.modification_time
, notify_done
))
89 # Wait for child to complete download.
90 yield thread_blocker
, step
.dl
._aborted
92 if step
.dl
._aborted
.happened
:
93 # Don't wait for child to finish (might be stuck doing IO)
94 raise download
.DownloadAborted()
96 # Download is complete...
99 tasks
.check(thread_blocker
)
101 if step
.status
== download
.RESULT_REDIRECT
:
103 return # DownloadScheduler will handle it
105 assert not step
.redirect
, step
.redirect
107 step
.dl
._finish
(step
.status
)