# zeroinstall/injector/scheduler.py
"""
Manage pools of connections so that we can limit the number of requests per site and reuse
connections.
@since: 1.6
"""

# Copyright (C) 2011, Thomas Leonard
# See the README file for details, or visit http://0install.net.
import sys

if sys.version_info[0] > 2:
	from urllib import parse as urlparse	# Python 3
else:
	import urlparse

from collections import defaultdict
import threading

from zeroinstall import gobject, logger
from zeroinstall.support import tasks
from zeroinstall.injector import download
default_port = {
	'http': 80,
	'https': 443,
}
class DownloadStep:
	url = None
	status = None
	redirect = None

class DownloadScheduler:
	"""Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
	@since: 1.6"""
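	# A minimal usage sketch (hypothetical driver code, not taken from the real caller):
	#
	#     scheduler = DownloadScheduler()
	#     blocker = scheduler.download(dl)  # dl: a download.Download with its tempfile set up
	#     yield blocker                     # from inside a tasks coroutine
	#     tasks.check(blocker)              # re-raise any DownloadError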
	def __init__(self):
		self._sites = defaultdict(lambda: Site())	# (scheme://host:port) -> Site

	@tasks.async
	def download(self, dl):
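		"""Download dl to its tempfile, reassigning it to a new Site on each redirect and
		falling back to its mirror URLs if the download fails."""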
		# (changed if we get redirected)
		current_url = dl.url

		redirections_remaining = 10

		original_exception = None

		# Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
		# reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
		# and limit by the target site, not treat everything as going to a single site (the proxy).
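		# For example, "https://example.com/foo" and "https://example.com:443/bar" both map to
		# the site key ('https', 'example.com', 443).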
		while True:
			location_parts = urlparse.urlparse(current_url)

			site_key = (location_parts.scheme,
					location_parts.hostname,
					location_parts.port or default_port.get(location_parts.scheme, None))

			step = DownloadStep()
			step.dl = dl
			step.url = current_url
			blocker = self._sites[site_key].download(step)
			yield blocker

			try:
				tasks.check(blocker)
			except download.DownloadError as ex:
				if original_exception is None:
					original_exception = ex
				else:
					logger.warn("%s (while trying mirror)", ex)
				mirror_url = step.dl.get_next_mirror_url()
				if mirror_url is None:
					raise original_exception

				# Try the mirror.
				# There are actually two places where we try to use the mirror: this one
				# looks to see if we have an exact copy of the same file somewhere else. If this
				# fails, the Fetcher will also look for a different archive that would generate
				# the required implementation.
				logger.warn("%s: trying archive mirror at %s", ex, mirror_url)
				step.redirect = mirror_url
				redirections_remaining = 10

			if not step.redirect:
				break

			current_url = step.redirect

			if redirections_remaining == 0:
				raise download.DownloadError("Too many redirections {url} -> {current}".format(
						url = dl.url,
						current = current_url))
			redirections_remaining -= 1
			# (else go around the loop again)
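
# Maximum number of downloads that one Site (one scheme/host/port combination) will run at the same time.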
MAX_DOWNLOADS_PER_SITE = 5

def _spawn_thread(step):
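	"""Start a daemon thread running download_in_thread() for step and return a Blocker.
	notify_done() records the result on step, then wake_up_main() (scheduled with gobject.idle_add)
	joins the thread and triggers the Blocker from the main loop, passing on any exception."""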
	from ._download_child import download_in_thread

	thread_blocker = tasks.Blocker("wait for thread " + step.url)
	def notify_done(status, ex = None, redirect = None):
		step.status = status
		step.redirect = redirect
		def wake_up_main():
			child.join()
			thread_blocker.trigger(ex)
			return False
		gobject.idle_add(wake_up_main)
	child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done))
	child.daemon = True
	child.start()

	return thread_blocker

class Site:
	"""Represents a service accepting download requests. All requests with the same scheme, host and port are
	handled by the same Site object, allowing it to do connection pooling and queuing. The current
	implementation queues requests beyond MAX_DOWNLOADS_PER_SITE, but does not yet pool connections."""
	def __init__(self):
		self.queue = []
		self.active = 0

	@tasks.async
	def download(self, step):
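		"""Download step.url in a worker thread, queuing first if this site already has
		MAX_DOWNLOADS_PER_SITE downloads running. Raises DownloadAborted if the download is
		aborted while queued or in progress. On a redirect, returns with step.redirect set for
		the DownloadScheduler to handle; otherwise finishes the Download with the final status."""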
		if self.active == MAX_DOWNLOADS_PER_SITE:
			# Too busy to start a new download now. Queue this one and wait.
			ticket = tasks.Blocker('queued download for ' + step.url)
			self.queue.append(ticket)
			yield ticket, step.dl._aborted
			if step.dl._aborted.happened:
				raise download.DownloadAborted()

		# Start a new thread for the download
		thread_blocker = _spawn_thread(step)

		self.active += 1

		# Wait for thread to complete download.
		yield thread_blocker, step.dl._aborted

		self.active -= 1
		if self.active < MAX_DOWNLOADS_PER_SITE:
			self.process_next()	# Start next queued download, if any

		if step.dl._aborted.happened:
			# Don't wait for child to finish (might be stuck doing IO)
			raise download.DownloadAborted()

		tasks.check(thread_blocker)

		if step.status == download.RESULT_REDIRECT:
			assert step.redirect
			return	# DownloadScheduler will handle it

		assert not step.redirect, step.redirect

		step.dl._finish(step.status)

	def process_next(self):
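		"""Wake one queued download, if any, now that a download slot is free."""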
		assert self.active < MAX_DOWNLOADS_PER_SITE

		if self.queue:
			nxt = self.queue.pop()
			nxt.trigger()