# zeroinstall/injector/scheduler.py
1 """
2 Manage pools of connections so that we can limit the number of requests per site and reuse
3 connections.
4 @since: 1.6
5 """
7 # Copyright (C) 2011, Thomas Leonard
8 # See the README file for details, or visit http://0install.net.
10 import sys
12 if sys.version_info[0] > 2:
13 from urllib import parse as urlparse # Python 3
14 else:
15 import urlparse
17 from collections import defaultdict
18 import threading
20 from zeroinstall import gobject
21 from zeroinstall.support import tasks
22 from zeroinstall.injector import download
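
# Usage sketch (illustration only): a caller such as the fetcher is expected to drive the
# scheduler from a tasks coroutine, with `dl` being a download.Download whose url, tempfile
# and modification_time are already set, roughly:
#
#	scheduler = DownloadScheduler()
#	blocker = scheduler.download(dl)	# download() is @tasks.async, so it returns a Blocker
#	yield blocker
#	tasks.check(blocker)			# re-raises any error from the download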

default_port = {
	'http': 80,
	'https': 443,
}

class DownloadStep:
	url = None
	status = None
	redirect = None

class DownloadScheduler:
	"""Assigns (and re-assigns on redirect) Downloads to Sites, allowing per-site limits and connection pooling.
	@since: 1.6"""
	def __init__(self):
		self._sites = defaultdict(lambda: Site())	# (scheme://host:port) -> Site

	@tasks.async
	def download(self, dl):
		# (changed if we get redirected)
		current_url = dl.url

		redirections_remaining = 10

		# Assign the Download to a Site based on its scheme, host and port. If the result is a redirect,
		# reassign it to the appropriate new site. Note that proxy handling happens later; we want to group
		# and limit by the target site, not treat everything as going to a single site (the proxy).
		while True:
			location_parts = urlparse.urlparse(current_url)

			site_key = (location_parts.scheme,
					location_parts.hostname,
					location_parts.port or default_port.get(location_parts.scheme, None))
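			# For example, "http://example.com/a" and "http://example.com:80/b" both give the key
			# ('http', 'example.com', 80), so they share one Site (and its per-site limit), while
			# "https://example.com/c" gives ('https', 'example.com', 443) and gets its own Site.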

			step = DownloadStep()
			step.dl = dl
			step.url = current_url
			blocker = self._sites[site_key].download(step)
			yield blocker
			tasks.check(blocker)

			if not step.redirect:
				break

			current_url = step.redirect

			if redirections_remaining == 0:
				raise download.DownloadError("Too many redirections {url} -> {current}".format(
						url = dl.url,
						current = current_url))
			redirections_remaining -= 1
			# (else go around the loop again)

MAX_DOWNLOADS_PER_SITE = 5
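
# _spawn_thread() runs a single download in a background daemon thread. download_in_thread()
# (in _download_child) is expected to call notify_done() from that worker thread when it has
# finished; notify_done() records the outcome on the DownloadStep and then uses
# gobject.idle_add() so that wake_up_main() runs from the main loop, where the (already
# finished) thread is joined and the Blocker is triggered, rather than triggering it from
# the worker thread.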

def _spawn_thread(step):
	from ._download_child import download_in_thread

	thread_blocker = tasks.Blocker("wait for thread " + step.url)
	def notify_done(status, ex = None, redirect = None):
		step.status = status
		step.redirect = redirect
		def wake_up_main():
			child.join()
			thread_blocker.trigger(ex)
			return False
		gobject.idle_add(wake_up_main)
	child = threading.Thread(target = lambda: download_in_thread(step.url, step.dl.tempfile, step.dl.modification_time, notify_done))
	child.daemon = True
	child.start()

	return thread_blocker

class Site:
	"""Represents a service accepting download requests. All requests with the same scheme, host and port are
	handled by the same Site object, allowing it to do per-site queuing and connection pooling, although the
	current implementation only does the queuing (there is no connection pooling yet)."""
	def __init__(self):
		self.queue = []		# Blocker tickets for downloads waiting for a free slot
		self.active = 0		# number of downloads currently running for this site

	@tasks.async
	def download(self, step):
		if self.active == MAX_DOWNLOADS_PER_SITE:
			# Too busy to start a new download now. Queue this one and wait.
			ticket = tasks.Blocker('queued download for ' + step.url)
			self.queue.append(ticket)
			yield ticket, step.dl._aborted
			if step.dl._aborted.happened:
				raise download.DownloadAborted()

		# Start a new thread for the download
		thread_blocker = _spawn_thread(step)

		self.active += 1

		# Wait for thread to complete download.
		yield thread_blocker, step.dl._aborted

		self.active -= 1
		if self.active < MAX_DOWNLOADS_PER_SITE:
			self.process_next()		# Start next queued download, if any

		if step.dl._aborted.happened:
			# Don't wait for child to finish (might be stuck doing IO)
			raise download.DownloadAborted()

		tasks.check(thread_blocker)

		if step.status == download.RESULT_REDIRECT:
			assert step.redirect
			return		# DownloadScheduler will handle it

		assert not step.redirect, step.redirect

		step.dl._finish(step.status)

	def process_next(self):
		assert self.active < MAX_DOWNLOADS_PER_SITE

		if self.queue:
			nxt = self.queue.pop()	# note: pop() wakes the most recently queued download
			nxt.trigger()