Tell the user when a background update completes, not when it starts
[zeroinstall/solver.git] / zeroinstall / injector / scheduler.py
blobe9b4c0c6e4b9d8c2cb5f8697b01083312578e83a
1 """
2 Manage pools of connections so that we can limit the number of requests per site and reuse
3 connections.
4 @since: 1.6
5 """
7 # Copyright (C) 2011, Thomas Leonard
8 # See the README file for details, or visit http://0install.net.
10 import sys
12 if sys.version_info[0] > 2:
13 from urllib import parse as urlparse # Python 3
14 else:
15 import urlparse
17 from collections import defaultdict
18 import threading, gobject
20 from zeroinstall.support import tasks
21 from zeroinstall.injector import download
# Default port for each URL scheme we know about. Used when building a Site
# key for a URL that does not name a port explicitly; schemes missing from
# this table get a port of None.
default_port = {
    'http': 80,
    'https': 443,
}
class DownloadStep:
    """One attempt at fetching a single URL on behalf of a Download.

    A fresh step is created by DownloadScheduler.download for the original
    URL and again for every redirect that is followed."""
    # The Download this step belongs to (assigned by DownloadScheduler.download).
    dl = None
    # The URL to fetch in this step.
    url = None
    # Result code reported by the download thread when it finishes.
    status = None
    # Redirect target reported by the download thread, or None if there was none.
    redirect = None
class DownloadScheduler:
    """Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
    @since: 1.6"""
    def __init__(self):
        # (scheme, host, port) -> Site. Sites are created on first use.
        self._sites = defaultdict(lambda: Site())

    # "async" is a reserved word from Python 3.7 on, so "@tasks.async" no
    # longer parses there. Looking the decorator up by name is equivalent and
    # is valid syntax on every Python version.
    # NOTE(review): if zeroinstall.support.tasks has renamed this decorator,
    # update the name here to match.
    @getattr(tasks, 'async')
    def download(self, dl):
        """Fetch dl.url, following redirects from site to site.
        @param dl: the download to perform; its url attribute is the starting point
        @raise download.DownloadError: if more than 10 redirections are needed"""
        # (changed if we get redirected)
        current_url = dl.url

        redirections_remaining = 10

        # Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
        # reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
        # and limit by the target site, not treat everything as going to a single site (the proxy).
        while True:
            location_parts = urlparse.urlparse(current_url)

            site_key = (location_parts.scheme,
                        location_parts.hostname,
                        location_parts.port or default_port.get(location_parts.scheme, None))

            step = DownloadStep()
            step.dl = dl
            step.url = current_url
            blocker = self._sites[site_key].download(step)
            yield blocker
            # presumably re-raises any error the site's download task ended with
            tasks.check(blocker)

            if not step.redirect:
                break

            current_url = step.redirect

            if redirections_remaining == 0:
                raise download.DownloadError("Too many redirections {url} -> {current}".format(
                        url = dl.url,
                        current = current_url))
            redirections_remaining -= 1
            # (else go around the loop again)
# Maximum number of downloads a single Site runs at once. Site.download
# queues further requests until one of the active ones finishes.
MAX_DOWNLOADS_PER_SITE = 5
def _spawn_thread(step):
    """Start a daemon thread that downloads step.url.
    @param step: the DownloadStep to perform; its status and redirect fields are filled in when the download reports back
    @return: a blocker which is triggered from a gobject idle callback once the download has reported its result"""
    from ._download_child import download_in_thread

    thread_blocker = tasks.Blocker("wait for thread " + step.url)

    def notify_done(status, ex = None, redirect = None):
        # Completion callback handed to download_in_thread.
        step.status = status
        step.redirect = redirect

        def wake_up_main():
            # Collect the finished thread, then release whoever is waiting,
            # passing along the exception (if any).
            worker.join()
            thread_blocker.trigger(ex)
            # One-shot: a False result stops the idle handler being called again.
            return False

        gobject.idle_add(wake_up_main)

    def run_download():
        download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done)

    worker = threading.Thread(target = run_download)
    worker.daemon = True
    worker.start()

    return thread_blocker
class Site:
    """Represents a service accepting download requests. All requests with the same scheme, host and port are
    handled by the same Site object, allowing it to do connection pooling and queuing. The current
    implementation queues requests beyond MAX_DOWNLOADS_PER_SITE but does no connection pooling."""
    def __init__(self):
        # Blockers for downloads waiting for a free slot, oldest first.
        self.queue = []
        # Number of downloads currently holding a slot.
        self.active = 0

    # "async" is a reserved word from Python 3.7 on, so "@tasks.async" no
    # longer parses there. Looking the decorator up by name is equivalent and
    # is valid syntax on every Python version.
    # NOTE(review): if zeroinstall.support.tasks has renamed this decorator,
    # update the name here to match.
    @getattr(tasks, 'async')
    def download(self, step):
        """Perform one download step, waiting for a free slot first if the site is busy.
        @param step: the DownloadStep to perform
        @raise download.DownloadAborted: if step.dl was aborted while queued or downloading"""
        # ">=" rather than "==": if the count ever overshoots the limit we
        # must still queue instead of starting yet more downloads.
        if self.active >= MAX_DOWNLOADS_PER_SITE:
            # Too busy to start a new download now. Queue this one and wait.
            ticket = tasks.Blocker('queued download for ' + step.url)
            self.queue.append(ticket)
            yield ticket, step.dl._aborted
            if step.dl._aborted.happened:
                try:
                    # Withdraw our ticket so that process_next doesn't later
                    # waste a free slot on a download that will never start.
                    self.queue.remove(ticket)
                except ValueError:
                    # Our ticket was already taken off the queue, i.e. we had
                    # been offered a slot. Hand it on to the next in line.
                    if self.active < MAX_DOWNLOADS_PER_SITE:
                        self.process_next()
                raise download.DownloadAborted()

        # Start a new thread for the download
        thread_blocker = _spawn_thread(step)

        self.active += 1

        # Wait for thread to complete download.
        yield thread_blocker, step.dl._aborted

        self.active -= 1
        if self.active < MAX_DOWNLOADS_PER_SITE:
            self.process_next()        # Start next queued download, if any

        if step.dl._aborted.happened:
            # Don't wait for child to finish (might be stuck doing IO)
            raise download.DownloadAborted()

        tasks.check(thread_blocker)

        if step.status == download.RESULT_REDIRECT:
            assert step.redirect
            return                # DownloadScheduler will handle it

        assert not step.redirect, step.redirect

        step.dl._finish(step.status)

    def process_next(self):
        """Wake the longest-waiting queued download, if any. Only valid while a slot is free."""
        assert self.active < MAX_DOWNLOADS_PER_SITE

        if self.queue:
            # First come, first served.
            nxt = self.queue.pop(0)
            nxt.trigger()