2 Manage pools of connections so that we can limit the number of requests per site and reuse
7 # Copyright (C) 2011, Thomas Leonard
8 # See the README file for details, or visit http://0install.net.
12 if sys
.version_info
[0] > 2:
13 from urllib
import parse
as urlparse
# Python 3
17 from collections
import defaultdict
20 from zeroinstall
import gobject
, logger
21 from zeroinstall
.support
import tasks
22 from zeroinstall
.injector
import download
class DownloadScheduler:
	"""Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling."""

	# NOTE(review): this excerpt omits the surrounding lines; the assignment below
	# presumably sits inside `def __init__(self):` — confirm against the full file.
	self._sites = defaultdict(lambda: Site())	# (scheme://host:port) -> Site

	def download(self, dl):
		"""Fetch `dl`, re-trying via mirrors on failure and following redirects.

		NOTE(review): the enclosing retry loop and its `try:` line are missing
		from this excerpt, so the control flow shown below is incomplete.
		"""
		# (changed if we get redirected)
		redirections_remaining = 10

		# First failure seen; re-raised if every mirror also fails, so the user
		# sees the original error rather than the last mirror's.
		original_exception = None

		# Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
		# reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
		# and limit by the target site, not treat everything as going to a single site (the proxy).
		location_parts = urlparse.urlparse(current_url)

		# Normalise the port so e.g. "http://host" and "http://host:80" map to the
		# same Site (default_port presumably maps scheme -> well-known port —
		# its definition is not visible in this excerpt).
		site_key = (location_parts.scheme,
			    location_parts.hostname,
			    location_parts.port or default_port.get(location_parts.scheme, None))

		step.url = current_url
		blocker = self._sites[site_key].download(step)

		# NOTE(review): the matching `try:` block is on lines missing from this excerpt.
		except download.DownloadError as ex:
			# Keep only the first error; later mirror failures are logged, not raised.
			if original_exception is None:
				original_exception = ex
			logger.warn("%s (while trying mirror)", ex)
			mirror_url = step.dl.get_next_mirror_url()
			if mirror_url is None:
				# Out of mirrors — report the original failure.
				raise original_exception

			# There are actually two places where we try to use the mirror: this one
			# looks to see if we have an exact copy of same file somewhere else. If this
			# fails, Fetcher will also look for a different archive that would generate
			# the required implementation.
			logger.warn("%s: trying archive mirror at %s", ex, mirror_url)
			step.redirect = mirror_url
			# Switching to a mirror gets a fresh redirection budget.
			redirections_remaining = 10

		current_url = step.redirect
		if redirections_remaining == 0:
			# NOTE(review): the keyword argument supplying {url} is on a line
			# missing from this excerpt (presumably `url = dl.url`).
			raise download.DownloadError("Too many redirections {url} -> {current}".format(
				current = current_url))
		redirections_remaining -= 1
		# (else go around the loop again)
# Hard cap on concurrent downloads per (scheme, host, port) Site; requests
# beyond this are queued by Site.download and released by process_next.
MAX_DOWNLOADS_PER_SITE = 5
def _spawn_thread(step):
	"""Download `step.url` into `step.dl.tempfile` in a background thread.

	Returns a tasks.Blocker which is triggered once the thread reports
	completion (the trigger carries the thread's exception, if any).
	NOTE(review): several lines are missing from this excerpt — e.g. the
	thread created below is never started here, and `wake_up_main` is not
	defined in the visible code; consult the full file.
	"""
	from ._download_child import download_in_thread
	thread_blocker = tasks.Blocker("wait for thread " + step.url)

	# Completion callback, invoked from the download thread.
	def notify_done(status, ex = None, redirect = None):
		step.redirect = redirect
		# NOTE(review): in the full file this trigger presumably runs inside a
		# `wake_up_main` helper (defined on missing lines) so the Blocker is
		# triggered from the main loop rather than the worker thread — confirm.
		thread_blocker.trigger(ex)
		# Schedule wake_up_main on the GLib main loop.
		gobject.idle_add(wake_up_main)

	child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done))

	return thread_blocker
118 """Represents a service accepting download requests. All requests with the same scheme, host and port are
119 handled by the same Site object, allowing it to do connection pooling and queuing, although the current
120 implementation doesn't do either."""
def download(self, step):
	"""Run download `step` against this site, queuing if the site is busy.

	A tasks coroutine: yields Blockers and is resumed by the main loop.
	Raises download.DownloadAborted if the download is cancelled while
	queued or while the worker thread is running.
	NOTE(review): a few original lines are missing from this excerpt —
	notably the bookkeeping that increments `self.active` around the
	worker-thread wait (presumably a try/finally).
	"""
	if self.active == MAX_DOWNLOADS_PER_SITE:
		# Too busy to start a new download now. Queue this one and wait.
		ticket = tasks.Blocker('queued download for ' + step.url)
		self.queue.append(ticket)
		# Resumed when process_next() triggers our ticket, or on abort.
		yield ticket, step.dl._aborted
		if step.dl._aborted.happened:
			raise download.DownloadAborted()

	# Start a new thread for the download
	thread_blocker = _spawn_thread(step)

	# Wait for thread to complete download.
	yield thread_blocker, step.dl._aborted

	if self.active < MAX_DOWNLOADS_PER_SITE:
		self.process_next()	# Start next queued download, if any

	if step.dl._aborted.happened:
		# Don't wait for child to finish (might be stuck doing IO)
		raise download.DownloadAborted()

	# Presumably re-raises any exception passed to thread_blocker.trigger()
	# by the worker thread — tasks.check is defined outside this excerpt.
	tasks.check(thread_blocker)

	if step.status == download.RESULT_REDIRECT:
		# NOTE(review): one original line between the `if` and this `return`
		# is missing from this excerpt.
		return	# DownloadScheduler will handle it

	# A redirect status is the only case where step.redirect may be set.
	assert not step.redirect, step.redirect

	step.dl._finish(step.status)
161 def process_next(self
):
162 assert self
.active
< MAX_DOWNLOADS_PER_SITE
165 nxt
= self
.queue
.pop()