Auto-throw exceptions when resuming tasks
[zeroinstall.git] / zeroinstall / injector / fetch.py
blobfd4642d0d7afa4cc0e475651acf55da30dee7297
1 """
2 Downloads feeds, keys, packages and icons.
3 """
5 # Copyright (C) 2009, Thomas Leonard
6 # See the README file for details, or visit http://0install.net.
8 from zeroinstall import _, NeedDownload
9 import os
10 from logging import info, debug, warn
12 from zeroinstall.support import tasks, basedir
13 from zeroinstall.injector.namespaces import XMLNS_IFACE, config_site
14 from zeroinstall.injector.model import DownloadSource, Recipe, SafeException, escape, DistributionSource
15 from zeroinstall.injector.iface_cache import PendingFeed, ReplayAttack
16 from zeroinstall.injector.handler import NoTrustedKeys
17 from zeroinstall.injector import download
19 def _escape_slashes(path):
20 return path.replace('/', '%23')
22 def _get_feed_dir(feed):
23 """The algorithm from 0mirror."""
24 if '#' in feed:
25 raise SafeException(_("Invalid URL '%s'") % feed)
26 scheme, rest = feed.split('://', 1)
27 assert '/' in rest, "Missing / in %s" % feed
28 domain, rest = rest.split('/', 1)
29 for x in [scheme, domain, rest]:
30 if not x or x.startswith(','):
31 raise SafeException(_("Invalid URL '%s'") % feed)
32 return os.path.join('feeds', scheme, domain, _escape_slashes(rest))
34 class KeyInfoFetcher:
35 """Fetches information about a GPG key from a key-info server.
36 See L{Fetcher.fetch_key_info} for details.
37 @since: 0.42
39 Example:
41 >>> kf = KeyInfoFetcher(fetcher, 'https://server', fingerprint)
42 >>> while True:
43 print kf.info
44 if kf.blocker is None: break
45 print kf.status
46 yield kf.blocker
47 """
48 def __init__(self, fetcher, server, fingerprint):
49 self.fingerprint = fingerprint
50 self.info = []
51 self.blocker = None
53 if server is None: return
55 self.status = _('Fetching key information from %s...') % server
57 dl = fetcher.download_url(server + '/key/' + fingerprint)
59 from xml.dom import minidom
61 @tasks.async
62 def fetch_key_info():
63 try:
64 tempfile = dl.tempfile
65 try:
66 yield dl.downloaded
67 finally:
68 self.blocker = None
69 tempfile.seek(0)
70 doc = minidom.parse(tempfile)
71 if doc.documentElement.localName != 'key-lookup':
72 raise SafeException(_('Expected <key-lookup>, not <%s>') % doc.documentElement.localName)
73 self.info += doc.documentElement.childNodes
74 except Exception as ex:
75 doc = minidom.parseString('<item vote="bad"/>')
76 root = doc.documentElement
77 root.appendChild(doc.createTextNode(_('Error getting key information: %s') % ex))
78 self.info.append(root)
80 self.blocker = fetch_key_info()
class Fetcher(object):
	"""Downloads and stores various things.
	@ivar config: used to get handler, iface_cache and stores
	@type config: L{config.Config}
	@ivar key_info: caches information about GPG keys
	@type key_info: {str: L{KeyInfoFetcher}}
	"""
	__slots__ = ['config', 'key_info', '_scheduler']

	def __init__(self, config):
		"""@param config: provides the handler, iface_cache, stores, etc.
		@type config: L{config.Config}"""
		assert config.handler, "API change!"
		self.config = config
		self.key_info = {}		# fingerprint -> KeyInfoFetcher cache
		self._scheduler = None	# created on demand by the scheduler property
97 @property
98 def handler(self):
99 return self.config.handler
101 @property
102 def scheduler(self):
103 if self._scheduler is None:
104 from . import scheduler
105 self._scheduler = scheduler.DownloadScheduler()
106 return self._scheduler
    @tasks.async
    def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
        """Follow a Recipe: download each ingredient archive, then unpack them
        all over a fresh temporary directory and store the verified result.
        @param required_digest: digest the finished directory must have
        @param recipe: the recipe whose steps to fetch and unpack
        @type recipe: L{Recipe}
        @param stores: where to store the cooked implementation
        @param force: passed on to L{download_archive} (deprecated and ignored there)
        @param impl_hint: the Implementation this is for (if any) as a hint for the GUI
        @see: L{download_impl} uses this method when appropriate"""
        # Maybe we're taking this metaphor too far?

        # Start downloading all the ingredients.
        streams = {}	# Streams collected from successful downloads

        # Start a download for each ingredient
        blockers = []
        for step in recipe.steps:
            blocker, stream = self.download_archive(step, force = force, impl_hint = impl_hint)
            assert stream
            blockers.append(blocker)
            streams[step] = stream

        # Wait for all the downloads to finish (a failed download raises
        # when we resume from the yield).
        while blockers:
            yield blockers
            blockers = [b for b in blockers if not b.happened]

        from zeroinstall.zerostore import unpack

        # Create an empty directory for the new implementation
        store = stores.stores[0]
        tmpdir = store.get_tmp_dir_for(required_digest)
        try:
            # Unpack each of the downloaded archives into it in turn
            for step in recipe.steps:
                stream = streams[step]
                stream.seek(0)
                unpack.unpack_archive_over(step.url, stream, tmpdir,
                    extract = step.extract,
                    type = step.type,
                    start_offset = step.start_offset or 0)
            # Check that the result is correct and store it in the cache
            store.check_manifest_and_rename(required_digest, tmpdir)
            tmpdir = None	# stored successfully; nothing to clean up
        finally:
            # If unpacking fails, remove the temporary directory
            if tmpdir is not None:
                from zeroinstall import support
                support.ro_rmtree(tmpdir)
153 def get_feed_mirror(self, url):
154 """Return the URL of a mirror for this feed."""
155 if self.config.feed_mirror is None:
156 return None
157 import urlparse
158 if urlparse.urlparse(url).hostname == 'localhost':
159 return None
160 return '%s/%s/latest.xml' % (self.config.feed_mirror, _get_feed_dir(url))
    @tasks.async
    def get_packagekit_feed(self, feed_url):
        """Send a query to PackageKit (if available) for information about this package.
        On success, the result is added to iface_cache.
        @param feed_url: a 'distribution:...' URL naming the master feed"""
        assert feed_url.startswith('distribution:'), feed_url
        # The part after 'distribution:' is the master feed's own URL.
        master_feed = self.config.iface_cache.get_feed(feed_url.split(':', 1)[1])
        if master_feed:
            fetch = self.config.iface_cache.distro.fetch_candidates(master_feed)
            if fetch:
                yield fetch

            # Force feed to be regenerated with the new information
            self.config.iface_cache.get_feed(feed_url, force = True)
177 def download_and_import_feed(self, feed_url, iface_cache = None):
178 """Download the feed, download any required keys, confirm trust if needed and import.
179 @param feed_url: the feed to be downloaded
180 @type feed_url: str
181 @param iface_cache: (deprecated)"""
182 from .download import DownloadAborted
184 assert iface_cache is None or iface_cache is self.config.iface_cache
186 self.config.iface_cache.mark_as_checking(feed_url)
188 debug(_("download_and_import_feed %(url)s"), {'url': feed_url})
189 assert not os.path.isabs(feed_url)
191 if feed_url.startswith('distribution:'):
192 return self.get_packagekit_feed(feed_url)
194 primary = self._download_and_import_feed(feed_url, use_mirror = False)
196 @tasks.named_async("monitor feed downloads for " + feed_url)
197 def wait_for_downloads(primary):
198 # Download just the upstream feed, unless it takes too long...
199 timeout = tasks.TimeoutBlocker(5, 'Mirror timeout') # 5 seconds
201 try:
202 yield primary, timeout
203 if primary.happened:
204 return # OK, primary succeeded!
205 # OK, maybe it's just being slow...
206 info("Feed download from %s is taking a long time.", feed_url)
207 primary_ex = None
208 except NoTrustedKeys as ex:
209 raise # Don't bother trying the mirror if we have a trust problem
210 except ReplayAttack as ex:
211 raise # Don't bother trying the mirror if we have a replay attack
212 except DownloadAborted as ex:
213 raise # Don't bother trying the mirror if the user cancelled
214 except SafeException as ex:
215 # Primary failed
216 primary = None
217 primary_ex = ex
218 warn(_("Feed download from %(url)s failed: %(exception)s"), {'url': feed_url, 'exception': ex})
220 # Start downloading from mirror...
221 mirror = self._download_and_import_feed(feed_url, use_mirror = True)
223 # Wait until both mirror and primary tasks are complete...
224 while True:
225 blockers = filter(None, [primary, mirror])
226 if not blockers:
227 break
228 try:
229 yield blockers
230 except:
231 pass
232 if primary:
233 try:
234 tasks.check(primary)
235 if primary.happened:
236 primary = None
237 # No point carrying on with the mirror once the primary has succeeded
238 if mirror:
239 info(_("Primary feed download succeeded; aborting mirror download for %s") % feed_url)
240 mirror.dl.abort()
241 except SafeException as ex:
242 primary = None
243 primary_ex = ex
244 info(_("Feed download from %(url)s failed; still trying mirror: %(exception)s"), {'url': feed_url, 'exception': ex})
246 if mirror:
247 try:
248 tasks.check(mirror)
249 if mirror.happened:
250 mirror = None
251 if primary_ex:
252 # We already warned; no need to raise an exception too,
253 # as the mirror download succeeded.
254 primary_ex = None
255 except ReplayAttack as ex:
256 info(_("Version from mirror is older than cached version; ignoring it: %s"), ex)
257 mirror = None
258 primary_ex = None
259 except SafeException as ex:
260 info(_("Mirror download failed: %s"), ex)
261 mirror = None
263 if primary_ex:
264 raise primary_ex
266 return wait_for_downloads(primary)
    def _download_and_import_feed(self, feed_url, use_mirror):
        """Download and import a feed.
        @param use_mirror: False to use primary location; True to use mirror.
        @return: the import task (with a .dl attribute for the underlying
        download), or None if use_mirror is True but no mirror is available"""
        if use_mirror:
            url = self.get_feed_mirror(feed_url)
            if url is None: return None
            info(_("Trying mirror server for feed %s") % feed_url)
        else:
            url = feed_url

        dl = self.download_url(url, hint = feed_url)
        stream = dl.tempfile

        @tasks.named_async("fetch_feed " + url)
        def fetch_feed():
            yield dl.downloaded

            pending = PendingFeed(feed_url, stream)

            if use_mirror:
                # If we got the feed from a mirror, get the key from there too
                key_mirror = self.config.feed_mirror + '/keys/'
            else:
                key_mirror = None

            keys_downloaded = tasks.Task(pending.download_keys(self, feed_hint = feed_url, key_mirror = key_mirror), _("download keys for %s") % feed_url)
            yield keys_downloaded.finished

            # Try importing with the keys we already trust; if that fails,
            # ask the trust manager to confirm the new keys and retry.
            if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
                blocker = self.config.trust_mgr.confirm_keys(pending)
                if blocker:
                    yield blocker
                if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
                    raise NoTrustedKeys(_("No signing keys trusted; not importing"))

        task = fetch_feed()
        task.dl = dl
        return task
307 def fetch_key_info(self, fingerprint):
308 try:
309 return self.key_info[fingerprint]
310 except KeyError:
311 self.key_info[fingerprint] = key_info = KeyInfoFetcher(self,
312 self.config.key_info_server, fingerprint)
313 return key_info
    def download_impl(self, impl, retrieval_method, stores, force = False):
        """Download an implementation.
        @param impl: the selected implementation
        @type impl: L{model.ZeroInstallImplementation}
        @param retrieval_method: a way of getting the implementation (e.g. an Archive or a Recipe)
        @type retrieval_method: L{model.RetrievalMethod}
        @param stores: where to store the downloaded implementation
        @type stores: L{zerostore.Stores}
        @param force: whether to abort and restart an existing download
        @rtype: L{tasks.Blocker}"""
        assert impl
        assert retrieval_method

        if isinstance(retrieval_method, DistributionSource):
            # Native packages are installed by the distribution, not us.
            return retrieval_method.install(self.handler)

        # Pick the strongest digest algorithm we know how to verify.
        from zeroinstall.zerostore import manifest
        best = None
        for digest in impl.digests:
            alg_name = digest.split('=', 1)[0]
            alg = manifest.algorithms.get(alg_name, None)
            if alg and (best is None or best.rating < alg.rating):
                best = alg
                required_digest = digest

        if best is None:
            if not impl.digests:
                raise SafeException(_("No <manifest-digest> given for '%(implementation)s' version %(version)s") %
                        {'implementation': impl.feed.get_name(), 'version': impl.get_version()})
            raise SafeException(_("Unknown digest algorithms '%(algorithms)s' for '%(implementation)s' version %(version)s") %
                    {'algorithms': impl.digests, 'implementation': impl.feed.get_name(), 'version': impl.get_version()})

        @tasks.async
        def download_impl():
            if isinstance(retrieval_method, DownloadSource):
                # Single archive: download it, then unpack into the cache.
                blocker, stream = self.download_archive(retrieval_method, force = force, impl_hint = impl)
                yield blocker

                stream.seek(0)
                self._add_to_cache(required_digest, stores, retrieval_method, stream)
            elif isinstance(retrieval_method, Recipe):
                # Multiple steps: cook() downloads and combines them.
                blocker = self.cook(required_digest, retrieval_method, stores, force, impl_hint = impl)
                yield blocker
            else:
                raise Exception(_("Unknown download type for '%s'") % retrieval_method)

            self.handler.impl_added_to_store(impl)
        return download_impl()
364 def _add_to_cache(self, required_digest, stores, retrieval_method, stream):
365 assert isinstance(retrieval_method, DownloadSource)
366 stores.add_archive_to_cache(required_digest, stream, retrieval_method.url, retrieval_method.extract,
367 type = retrieval_method.type, start_offset = retrieval_method.start_offset or 0)
369 # (force is deprecated and ignored)
370 def download_archive(self, download_source, force = False, impl_hint = None):
371 """Fetch an archive. You should normally call L{download_impl}
372 instead, since it handles other kinds of retrieval method too."""
373 from zeroinstall.zerostore import unpack
375 url = download_source.url
376 if not (url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:')):
377 raise SafeException(_("Unknown scheme in download URL '%s'") % url)
379 mime_type = download_source.type
380 if not mime_type:
381 mime_type = unpack.type_from_url(download_source.url)
382 if not mime_type:
383 raise SafeException(_("No 'type' attribute on archive, and I can't guess from the name (%s)") % download_source.url)
384 unpack.check_type_ok(mime_type)
385 dl = self.download_url(download_source.url, hint = impl_hint)
386 dl.expected_size = download_source.size + (download_source.start_offset or 0)
387 return (dl.downloaded, dl.tempfile)
389 # (force is deprecated and ignored)
390 def download_icon(self, interface, force = False):
391 """Download an icon for this interface and add it to the
392 icon cache. If the interface has no icon do nothing.
393 @return: the task doing the import, or None
394 @rtype: L{tasks.Task}"""
395 debug("download_icon %(interface)s", {'interface': interface})
397 modification_time = None
398 existing_icon = self.config.iface_cache.get_icon_path(interface)
399 if existing_icon:
400 file_mtime = os.stat(existing_icon).st_mtime
401 from email.utils import formatdate
402 modification_time = formatdate(timeval = file_mtime, localtime = False, usegmt = True)
404 # Find a suitable icon to download
405 for icon in interface.get_metadata(XMLNS_IFACE, 'icon'):
406 type = icon.getAttribute('type')
407 if type != 'image/png':
408 debug(_('Skipping non-PNG icon'))
409 continue
410 source = icon.getAttribute('href')
411 if source:
412 break
413 warn(_('Missing "href" attribute on <icon> in %s'), interface)
414 else:
415 info(_('No PNG icons found in %s'), interface)
416 return
418 dl = self.download_url(source, hint = interface, modification_time = modification_time)
420 @tasks.async
421 def download_and_add_icon():
422 stream = dl.tempfile
423 try:
424 yield dl.downloaded
425 if dl.unmodified: return
426 stream.seek(0)
428 import shutil
429 icons_cache = basedir.save_cache_path(config_site, 'interface_icons')
430 icon_file = open(os.path.join(icons_cache, escape(interface.uri)), 'w')
431 shutil.copyfileobj(stream, icon_file)
432 except Exception as ex:
433 self.handler.report_error(ex)
435 return download_and_add_icon()
    def download_impls(self, implementations, stores):
        """Download the given implementations, choosing a suitable retrieval method for each.
        If any of the retrieval methods are DistributionSources and
        need confirmation, handler.confirm is called to check that the
        installation should proceed.
        @return: the task performing the downloads, or None if there is nothing to download
        @raise SafeException: if an implementation has no download source"""
        unsafe_impls = []	# package ids of native packages needing user confirmation

        to_download = []	# (impl, source) pairs
        for impl in implementations:
            debug(_("start_downloading_impls: for %(feed)s get %(implementation)s"), {'feed': impl.feed, 'implementation': impl})
            source = self.get_best_source(impl)
            if not source:
                raise SafeException(_("Implementation %(implementation_id)s of interface %(interface)s"
                        " cannot be downloaded (no download locations given in "
                        "interface!)") % {'implementation_id': impl.id, 'interface': impl.feed.get_name()})
            to_download.append((impl, source))

            if isinstance(source, DistributionSource) and source.needs_confirmation:
                unsafe_impls.append(source.package_id)

        @tasks.async
        def download_impls():
            # Ask the user before installing any native packages.
            if unsafe_impls:
                confirm = self.handler.confirm_install(_('The following components need to be installed using native packages. '
                    'These come from your distribution, and should therefore be trustworthy, but they also '
                    'run with extra privileges. In particular, installing them may run extra services on your '
                    'computer or affect other users. You may be asked to enter a password to confirm. The '
                    'packages are:\n\n') + ('\n'.join('- ' + x for x in unsafe_impls)))
                yield confirm

            blockers = []

            for impl, source in to_download:
                blockers.append(self.download_impl(impl, source, stores))

            # Record the first error; log the rest
            error = []
            def dl_error(ex, tb = None):
                if error:
                    self.handler.report_error(ex)
                else:
                    error.append((ex, tb))
            while blockers:
                try:
                    yield blockers
                except:
                    # Collect the failure(s); dl_error keeps only the first.
                    tasks.check(blockers, dl_error)

                blockers = [b for b in blockers if not b.happened]
            if error:
                from zeroinstall import support
                support.raise_with_traceback(*error[0])

        if not to_download:
            return None

        return download_impls()
496 def get_best_source(self, impl):
497 """Return the best download source for this implementation.
498 @rtype: L{model.RetrievalMethod}"""
499 if impl.download_sources:
500 return impl.download_sources[0]
501 return None
503 def download_url(self, url, hint = None, modification_time = None, expected_size = None):
504 """The most low-level method here; just download a raw URL.
505 @param url: the location to download from
506 @param hint: user-defined data to store on the Download (e.g. used by the GUI)
507 @param modification_time: don't download unless newer than this
508 @rtype: L{download.Download}
509 @since: 1.5
511 if self.handler.dry_run:
512 raise NeedDownload(url)
514 dl = download.Download(url, hint = hint, modification_time = modification_time, expected_size = expected_size)
515 self.handler.monitor_download(dl)
516 dl.downloaded = self.scheduler.download(dl)
517 return dl