Only process each <package-implementation> once, even if several distributions match...
[zeroinstall/solver.git] / zeroinstall / injector / scheduler.py
blob a2f2c4bee4a9dc4beea3c422d1f97b58eb1e025d
1 """
2 Manage pools of connections so that we can limit the number of requests per site and reuse
3 connections.
4 @since: 1.6
5 """
7 # Copyright (C) 2011, Thomas Leonard
8 # See the README file for details, or visit http://0install.net.
10 import urlparse
11 from collections import defaultdict
12 import threading, gobject
14 from zeroinstall.support import tasks
15 from zeroinstall.injector import download
# Port implied by each URL scheme when the URL does not give one explicitly.
# DownloadScheduler uses it when building a site key, so that e.g.
# "http://example.com/" and "http://example.com:80/" are handled by the same Site.
default_port = {
    'http': 80,
    'https': 443,
}
class DownloadStep:
    """A single attempt to fetch one URL on behalf of a Download.

    DownloadScheduler creates one step for the original URL and a new one for
    each redirect target. The outcome (status/redirect) is filled in by the
    completion callback of the download thread."""
    dl = None        # the Download this step is fetching for
    url = None       # the URL requested by this step
    status = None    # result code reported by the download thread (compared with download.RESULT_REDIRECT)
    redirect = None  # new location if the server redirected us, otherwise None
27 class DownloadScheduler:
28 """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
29 @since: 1.6"""
30 def __init__(self):
31 self._sites = defaultdict(lambda: Site()) # (scheme://host:port) -> Site
33 @tasks.async
34 def download(self, dl):
35 # (changed if we get redirected)
36 current_url = dl.url
38 redirections_remaining = 10
40 # Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
41 # reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
42 # and limit by the target site, not treat everything as going to a single site (the proxy).
43 while True:
44 location_parts = urlparse.urlparse(current_url)
46 site_key = (location_parts.scheme,
47 location_parts.hostname,
48 location_parts.port or default_port.get(location_parts.scheme, None))
50 step = DownloadStep()
51 step.dl = dl
52 step.url = current_url
53 blocker = self._sites[site_key].download(step)
54 yield blocker
55 tasks.check(blocker)
57 if not step.redirect:
58 break
60 current_url = step.redirect
62 if redirections_remaining == 0:
63 raise download.DownloadError("Too many redirections {url} -> {current}".format(
64 url = dl.url,
65 current = current_url))
66 redirections_remaining -= 1
67 # (else go around the loop again)
# Maximum number of downloads Site.download lets run at once for a single site;
# further requests for that site wait in its queue until a slot frees up.
MAX_DOWNLOADS_PER_SITE = 5
def _spawn_thread(step):
    """Start a daemon thread running download_in_thread for step.url.

    @return: a tasks.Blocker. It is triggered, with any exception the worker
    reported, from the main loop after the worker has called notify_done and has
    then been joined. By that point step.status and step.redirect hold the result."""
    from ._download_child import download_in_thread

    done = tasks.Blocker("wait for thread " + step.url)

    def notify_done(status, ex = None, redirect = None):
        # Presumably invoked from the worker thread by download_in_thread.
        # Record the outcome, then let the main loop do the rest.
        step.status = status
        step.redirect = redirect

        def wake_up_main():
            worker.join()
            done.trigger(ex)
            return False  # one-shot: returning False removes this idle callback

        gobject.idle_add(wake_up_main)

    def run():
        # The step's attributes are read here, inside the worker thread.
        download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done)

    worker = threading.Thread(target = run)
    worker.daemon = True
    worker.start()

    return done
89 class Site:
90 """Represents a service accepting download requests. All requests with the same scheme, host and port are
91 handled by the same Site object, allowing it to do connection pooling and queuing, although the current
92 implementation doesn't do either."""
93 def __init__(self):
94 self.queue = []
95 self.active = 0
97 @tasks.async
98 def download(self, step):
99 if self.active == MAX_DOWNLOADS_PER_SITE:
100 # Too busy to start a new download now. Queue this one and wait.
101 ticket = tasks.Blocker('queued download for ' + step.url)
102 self.queue.append(ticket)
103 yield ticket, step.dl._aborted
104 if step.dl._aborted.happened:
105 raise download.DownloadAborted()
107 # Start a new thread for the download
108 thread_blocker = _spawn_thread(step)
110 self.active += 1
112 # Wait for thread to complete download.
113 yield thread_blocker, step.dl._aborted
115 self.active -= 1
116 if self.active < MAX_DOWNLOADS_PER_SITE:
117 self.process_next() # Start next queued download, if any
119 if step.dl._aborted.happened:
120 # Don't wait for child to finish (might be stuck doing IO)
121 raise download.DownloadAborted()
123 tasks.check(thread_blocker)
125 if step.status == download.RESULT_REDIRECT:
126 assert step.redirect
127 return # DownloadScheduler will handle it
129 assert not step.redirect, step.redirect
131 step.dl._finish(step.status)
133 def process_next(self):
134 assert self.active < MAX_DOWNLOADS_PER_SITE
136 if self.queue:
137 nxt = self.queue.pop()
138 nxt.trigger()