Generalise mirror fallback code.
[zeroinstall/zeroinstall-mseaborn.git] / zeroinstall / injector / fetch.py
blob34df27ed995f04058543baf3a97270133f0e6d23
1 """
2 Downloads feeds, keys, packages and icons.
3 """
5 # Copyright (C) 2008, Thomas Leonard
6 # See the README file for details, or visit http://0install.net.
8 import os, sys
9 from logging import info, debug, warn
11 from zeroinstall.support import tasks, basedir
12 from zeroinstall.injector.namespaces import XMLNS_IFACE, config_site
13 from zeroinstall.injector.model import DownloadSource, Recipe, SafeException, network_offline, escape
14 from zeroinstall.injector.iface_cache import PendingFeed
16 def _escape_slashes(path):
17 return path.replace('/', '#')
19 def _get_feed_dir(feed):
20 """The algorithm from 0mirror."""
21 if '#' in feed:
22 raise SafeException("Invalid URL '%s'" % feed)
23 scheme, rest = feed.split('://', 1)
24 domain, rest = rest.split('/', 1)
25 for x in [scheme, domain, rest]:
26 if not x or x.startswith(','):
27 raise SafeException("Invalid URL '%s'" % feed)
28 return os.path.join('feeds', scheme, domain, _escape_slashes(rest))
30 class Fetcher(object):
31 """Downloads and stores various things.
32 @ivar handler: handler to use for user-interaction
33 @type handler: L{handler.Handler}
34 @ivar feed_mirror: the base URL of a mirror site for keys and feeds
35 @type feed_mirror: str
36 """
37 __slots__ = ['handler', 'feed_mirror']
39 def __init__(self, handler):
40 self.handler = handler
41 self.feed_mirror = "http://roscidus.com/0mirror"
43 @tasks.async
44 def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
45 """Follow a Recipe.
46 @param impl_hint: the Implementation this is for (if any) as a hint for the GUI
47 @see: L{download_impl} uses this method when appropriate"""
48 # Maybe we're taking this metaphor too far?
50 # Start downloading all the ingredients.
51 downloads = {} # Downloads that are not yet successful
52 streams = {} # Streams collected from successful downloads
54 # Start a download for each ingredient
55 blockers = []
56 for step in recipe.steps:
57 blocker, stream = self.download_archive(step, force = force, impl_hint = impl_hint)
58 assert stream
59 blockers.append(blocker)
60 streams[step] = stream
62 while blockers:
63 yield blockers
64 tasks.check(blockers)
65 blockers = [b for b in blockers if not b.happened]
67 from zeroinstall.zerostore import unpack
69 # Create an empty directory for the new implementation
70 store = stores.stores[0]
71 tmpdir = store.get_tmp_dir_for(required_digest)
72 try:
73 # Unpack each of the downloaded archives into it in turn
74 for step in recipe.steps:
75 stream = streams[step]
76 stream.seek(0)
77 unpack.unpack_archive_over(step.url, stream, tmpdir, step.extract)
78 # Check that the result is correct and store it in the cache
79 store.check_manifest_and_rename(required_digest, tmpdir)
80 tmpdir = None
81 finally:
82 # If unpacking fails, remove the temporary directory
83 if tmpdir is not None:
84 from zeroinstall import support
85 support.ro_rmtree(tmpdir)
87 def get_feed_mirror(self, url):
88 """Return the URL of a mirror for this feed."""
89 return '%s/%s/latest.xml' % (self.feed_mirror, _get_feed_dir(url))
91 def download_with_fallback(self, dl, fallback_url, force, results):
92 """Wait for dl to finish successfully and return dl.stream.
93 If it takes too long, or fails, try downloading from fallback_url.
94 Note this is a generator.
95 @param dl: the first download to try
96 @param fallback_url: URL of fallback download
97 @param results: an empty list to contain the results
98 @return: generates a series of blocker to be yielded. When exhausted, results contains (successful_stream, is_from_fallback)"""
99 dl_stream = dl.tempfile
100 try:
101 yield dl.downloaded
102 tasks.check(dl.downloaded)
103 results += [dl_stream, False]
104 return
105 except Exception, ex:
106 ex = sys.exc_info()
108 warn("Download failed (will try mirror): %s", ex[1])
109 info("Trying mirror URL %s", fallback_url)
110 mirror_dl = self.handler.get_download(fallback_url, force = force, hint = dl.url)
111 mirror_stream = mirror_dl.tempfile
112 yield mirror_dl.downloaded
113 try:
114 tasks.check(mirror_dl.downloaded)
115 except Exception, mirror_ex:
116 # Mirror didn't work either. Log the mirror problem and report the original error.
117 info("Download from mirror failed: %s", mirror_ex)
118 raise ex[0], ex[1], ex[2]
120 results += [mirror_stream, True]
121 return
123 def download_and_import_feed(self, feed_url, iface_cache, force = False):
124 """Download the feed, download any required keys, confirm trust if needed and import.
125 @param feed_url: the feed to be downloaded
126 @type feed_url: str
127 @param iface_cache: cache in which to store the feed
128 @type iface_cache: L{iface_cache.IfaceCache}
129 @param force: whether to abort and restart an existing download"""
131 debug("download_and_import_feed %s (force = %d)", feed_url, force)
132 assert not feed_url.startswith('/')
134 dl = self.handler.get_download(feed_url, force = force, hint = feed_url)
136 @tasks.named_async("fetch_feed " + feed_url)
137 def fetch_feed():
138 results = []
139 for x in self.download_with_fallback(dl, self.get_feed_mirror(feed_url), force = force, results = results):
140 yield x
141 stream, using_mirror = results
143 # TODO: ignore old timestamps if using_mirror
145 pending = PendingFeed(feed_url, stream)
146 iface_cache.add_pending(pending)
148 keys_downloaded = tasks.Task(pending.download_keys(self.handler, feed_hint = feed_url), "download keys for " + feed_url)
149 yield keys_downloaded.finished
150 tasks.check(keys_downloaded.finished)
152 iface = iface_cache.get_interface(pending.url)
153 if not iface_cache.update_interface_if_trusted(iface, pending.sigs, pending.new_xml):
154 blocker = self.handler.confirm_trust_keys(iface, pending.sigs, pending.new_xml)
155 if blocker:
156 yield blocker
157 tasks.check(blocker)
158 if not iface_cache.update_interface_if_trusted(iface, pending.sigs, pending.new_xml):
159 raise SafeException("No signing keys trusted; not importing")
161 return fetch_feed()
163 def download_impl(self, impl, retrieval_method, stores, force = False):
164 """Download an implementation.
165 @param impl: the selected implementation
166 @type impl: L{model.ZeroInstallImplementation}
167 @param retrieval_method: a way of getting the implementation (e.g. an Archive or a Recipe)
168 @type retrieval_method: L{model.RetrievalMethod}
169 @param stores: where to store the downloaded implementation
170 @type stores: L{zerostore.Stores}
171 @param force: whether to abort and restart an existing download
172 @rtype: L{tasks.Blocker}"""
173 assert impl
174 assert retrieval_method
176 from zeroinstall.zerostore import manifest
177 alg = impl.id.split('=', 1)[0]
178 if alg not in manifest.algorithms:
179 raise SafeException("Unknown digest algorithm '%s' for '%s' version %s" %
180 (alg, impl.feed.get_name(), impl.get_version()))
182 @tasks.async
183 def download_impl():
184 if isinstance(retrieval_method, DownloadSource):
185 blocker, stream = self.download_archive(retrieval_method, force = force, impl_hint = impl)
186 yield blocker
187 tasks.check(blocker)
189 stream.seek(0)
190 self._add_to_cache(stores, retrieval_method, stream)
191 elif isinstance(retrieval_method, Recipe):
192 blocker = self.cook(impl.id, retrieval_method, stores, force, impl_hint = impl)
193 yield blocker
194 tasks.check(blocker)
195 else:
196 raise Exception("Unknown download type for '%s'" % retrieval_method)
198 self.handler.impl_added_to_store(impl)
199 return download_impl()
201 def _add_to_cache(self, stores, retrieval_method, stream):
202 assert isinstance(retrieval_method, DownloadSource)
203 required_digest = retrieval_method.implementation.id
204 url = retrieval_method.url
205 stores.add_archive_to_cache(required_digest, stream, retrieval_method.url, retrieval_method.extract,
206 type = retrieval_method.type, start_offset = retrieval_method.start_offset or 0)
208 def download_archive(self, download_source, force = False, impl_hint = None):
209 """Fetch an archive. You should normally call L{download_impl}
210 instead, since it handles other kinds of retrieval method too."""
211 from zeroinstall.zerostore import unpack
212 mime_type = download_source.type
213 if not mime_type:
214 mime_type = unpack.type_from_url(download_source.url)
215 if not mime_type:
216 raise SafeException("No 'type' attribute on archive, and I can't guess from the name (%s)" % download_source.url)
217 unpack.check_type_ok(mime_type)
218 dl = self.handler.get_download(download_source.url, force = force, hint = impl_hint)
219 dl.expected_size = download_source.size + (download_source.start_offset or 0)
220 return (dl.downloaded, dl.tempfile)
222 def download_icon(self, interface, force = False):
223 """Download an icon for this interface and add it to the
224 icon cache. If the interface has no icon or we are offline, do nothing.
225 @return: the task doing the import, or None
226 @rtype: L{tasks.Task}"""
227 debug("download_icon %s (force = %d)", interface, force)
229 # Find a suitable icon to download
230 for icon in interface.get_metadata(XMLNS_IFACE, 'icon'):
231 type = icon.getAttribute('type')
232 if type != 'image/png':
233 debug('Skipping non-PNG icon')
234 continue
235 source = icon.getAttribute('href')
236 if source:
237 break
238 warn('Missing "href" attribute on <icon> in %s', interface)
239 else:
240 info('No PNG icons found in %s', interface)
241 return
243 dl = self.handler.get_download(source, force = force, hint = interface)
245 @tasks.async
246 def download_and_add_icon():
247 stream = dl.tempfile
248 yield dl.downloaded
249 try:
250 tasks.check(dl.downloaded)
251 stream.seek(0)
253 import shutil
254 icons_cache = basedir.save_cache_path(config_site, 'interface_icons')
255 icon_file = file(os.path.join(icons_cache, escape(interface.uri)), 'w')
256 shutil.copyfileobj(stream, icon_file)
257 except Exception, ex:
258 self.handler.report_error(ex)
260 return download_and_add_icon()
262 def download_impls(self, implementations, stores):
263 """Download the given implementations, choosing a suitable retrieval method for each."""
264 blockers = []
266 to_download = []
267 for impl in implementations:
268 debug("start_downloading_impls: for %s get %s", impl.feed, impl)
269 source = self.get_best_source(impl)
270 if not source:
271 raise SafeException("Implementation " + impl.id + " of "
272 "interface " + impl.feed.get_name() + " cannot be "
273 "downloaded (no download locations given in "
274 "interface!)")
275 to_download.append((impl, source))
277 for impl, source in to_download:
278 blockers.append(self.download_impl(impl, source, stores))
280 if not blockers:
281 return None
283 @tasks.async
284 def download_impls(blockers):
285 # Record the first error log the rest
286 error = []
287 def dl_error(ex, tb = None):
288 if error:
289 self.handler.report_error(ex)
290 else:
291 error.append(ex)
292 while blockers:
293 yield blockers
294 tasks.check(blockers, dl_error)
296 blockers = [b for b in blockers if not b.happened]
297 if error:
298 raise error[0]
300 return download_impls(blockers)
302 def get_best_source(self, impl):
303 """Return the best download source for this implementation.
304 @rtype: L{model.RetrievalMethod}"""
305 if impl.download_sources:
306 return impl.download_sources[0]
307 return None