Allow mirroring both archives and implementations
[zeroinstall.git] / zeroinstall / injector / fetch.py
blob2c637d7b8b519f705edd2138dfcaa84b7bd70938
1 """
2 Downloads feeds, keys, packages and icons.
3 """
5 # Copyright (C) 2009, Thomas Leonard
6 # See the README file for details, or visit http://0install.net.
8 from zeroinstall import _, NeedDownload
9 import os, sys
10 from logging import info, debug, warn
12 from zeroinstall import support
13 from zeroinstall.support import tasks, basedir, portable_rename
14 from zeroinstall.injector.namespaces import XMLNS_IFACE, config_site
15 from zeroinstall.injector import model
16 from zeroinstall.injector.model import DownloadSource, Recipe, SafeException, escape, DistributionSource
17 from zeroinstall.injector.iface_cache import PendingFeed, ReplayAttack
18 from zeroinstall.injector.handler import NoTrustedKeys
19 from zeroinstall.injector import download
21 def _escape_slashes(path):
22 return path.replace('/', '%23')
def _get_feed_dir(feed):
	"""Map a feed URL to its directory on the mirror (the algorithm from 0mirror).
	@param feed: the feed's URL
	@type feed: str
	@return: the relative path 'feeds/<scheme>/<domain>/<escaped-rest>'
	@rtype: str
	@raise SafeException: if the URL is malformed"""
	if '#' in feed:
		raise SafeException(_("Invalid URL '%s'") % feed)
	# Reject URLs without a scheme explicitly: str.split would otherwise
	# raise a bare ValueError here.
	if '://' not in feed:
		raise SafeException(_("Invalid URL '%s'") % feed)
	scheme, rest = feed.split('://', 1)
	# This was previously an assert, which disappears under "python -O";
	# a malformed URL must always be rejected.
	if '/' not in rest:
		raise SafeException(_("Missing / in %s") % feed)
	domain, rest = rest.split('/', 1)
	for x in [scheme, domain, rest]:
		if not x or x.startswith('.'):
			raise SafeException(_("Invalid URL '%s'") % feed)
	return '/'.join(['feeds', scheme, domain, _escape_slashes(rest)])
class KeyInfoFetcher:
	"""Fetches information about a GPG key from a key-info server.
	See L{Fetcher.fetch_key_info} for details.
	@since: 0.42

	Example:

	>>> kf = KeyInfoFetcher(fetcher, 'https://server', fingerprint)
	>>> while True:
		print kf.info
		if kf.blocker is None: break
		print kf.status
		yield kf.blocker
	"""
	def __init__(self, fetcher, server, fingerprint):
		# fingerprint: the GPG key fingerprint being queried
		self.fingerprint = fingerprint
		# info: DOM nodes describing the key, appended as results arrive
		self.info = []
		# blocker: non-None while a query is still in progress
		self.blocker = None

		# No key-info server configured: leave info empty and blocker None.
		if server is None: return

		self.status = _('Fetching key information from %s...') % server

		dl = fetcher.download_url(server + '/key/' + fingerprint)

		from xml.dom import minidom

		@tasks.async
		def fetch_key_info():
			try:
				tempfile = dl.tempfile
				yield dl.downloaded
				# Query has finished (successfully or not); clear before checking.
				self.blocker = None
				tasks.check(dl.downloaded)
				tempfile.seek(0)
				doc = minidom.parse(tempfile)
				if doc.documentElement.localName != 'key-lookup':
					raise SafeException(_('Expected <key-lookup>, not <%s>') % doc.documentElement.localName)
				self.info += doc.documentElement.childNodes
			except Exception as ex:
				# Report the failure as a synthetic "bad" vote so callers
				# (e.g. the GUI) can display it like any other result.
				doc = minidom.parseString('<item vote="bad"/>')
				root = doc.documentElement
				root.appendChild(doc.createTextNode(_('Error getting key information: %s') % ex))
				self.info.append(root)

		self.blocker = fetch_key_info()
class Fetcher(object):
	"""Downloads and stores various things.
	@ivar config: used to get handler, iface_cache and stores
	@type config: L{config.Config}
	@ivar key_info: caches information about GPG keys
	@type key_info: {str: L{KeyInfoFetcher}}
	"""
	# external_store: external command (taken from $ZEROINSTALL_EXTERNAL_STORE)
	# used to add implementations to the store, or None to use the built-in code.
	__slots__ = ['config', 'key_info', '_scheduler', 'external_store']
92 def __init__(self, config):
93 assert config.handler, "API change!"
94 self.config = config
95 self.key_info = {}
96 self._scheduler = None
97 self.external_store = os.environ.get('ZEROINSTALL_EXTERNAL_STORE')
99 @property
100 def handler(self):
101 return self.config.handler
103 @property
104 def scheduler(self):
105 if self._scheduler is None:
106 from . import scheduler
107 self._scheduler = scheduler.DownloadScheduler()
108 return self._scheduler
	# (force is deprecated and ignored)
	@tasks.async
	def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
		"""Follow a Recipe.
		@param required_digest: the expected digest of the assembled implementation
		@param recipe: the recipe whose steps should be followed
		@param stores: where to store the result (the first store is used)
		@param impl_hint: the Implementation this is for (if any) as a hint for the GUI
		@see: L{download_impl} uses this method when appropriate"""
		# Maybe we're taking this metaphor too far?

		# Start a download for each ingredient
		blockers = []
		steps = []
		for stepdata in recipe.steps:
			cls = StepRunner.class_for(stepdata)
			step = cls(stepdata, impl_hint=impl_hint)
			step.prepare(self, blockers)
			steps.append(step)

		# Wait until every download has completed.
		while blockers:
			yield blockers
			tasks.check(blockers)
			blockers = [b for b in blockers if not b.happened]

		if self.external_store:
			# Note: external_store will not yet work with non-<archive> steps.
			streams = [step.stream for step in steps]
			self._add_to_external_store(required_digest, recipe.steps, streams)
		else:
			# Create an empty directory for the new implementation
			store = stores.stores[0]
			tmpdir = store.get_tmp_dir_for(required_digest)
			try:
				# Unpack each of the downloaded archives into it in turn
				for step in steps:
					step.apply(tmpdir)
				# Check that the result is correct and store it in the cache
				store.check_manifest_and_rename(required_digest, tmpdir)
				# Success: the store now owns the directory.
				tmpdir = None
			finally:
				# If unpacking fails, remove the temporary directory
				if tmpdir is not None:
					support.ro_rmtree(tmpdir)
153 def _get_mirror_url(self, feed_url, resource):
154 """Return the URL of a mirror for this feed."""
155 if self.config.mirror is None:
156 return None
157 if support.urlparse(feed_url).hostname == 'localhost':
158 return None
159 return '%s/%s/%s' % (self.config.mirror, _get_feed_dir(feed_url), resource)
161 def get_feed_mirror(self, url):
162 """Return the URL of a mirror for this feed."""
163 return self._get_mirror_url(url, 'latest.xml')
165 def _get_archive_mirror(self, source):
166 if self.config.mirror is None:
167 return None
168 if support.urlparse(source.url).hostname == 'localhost':
169 return None
170 if sys.version_info[0] > 2:
171 from urllib.parse import quote
172 else:
173 from urllib import quote
174 return '{mirror}/archive/{archive}'.format(
175 mirror = self.config.mirror,
176 archive = quote(source.url.replace('/', '#'), safe = ''))
178 def _get_impl_mirror(self, impl):
179 return self._get_mirror_url(impl.feed.url, 'impl/' + _escape_slashes(impl.id))
	@tasks.async
	def get_packagekit_feed(self, feed_url):
		"""Send a query to PackageKit (if available) for information about this package.
		On success, the result is added to iface_cache."""
		assert feed_url.startswith('distribution:'), feed_url
		# The wrapped feed URL follows the 'distribution:' prefix.
		master_feed = self.config.iface_cache.get_feed(feed_url.split(':', 1)[1])
		if master_feed:
			fetch = self.config.iface_cache.distro.fetch_candidates(master_feed)
			if fetch:
				yield fetch
				tasks.check(fetch)

			# Force feed to be regenerated with the new information
			self.config.iface_cache.get_feed(feed_url, force = True)
	def download_and_import_feed(self, feed_url, iface_cache = None):
		"""Download the feed, download any required keys, confirm trust if needed and import.
		@param feed_url: the feed to be downloaded
		@type feed_url: str
		@param iface_cache: (deprecated)"""
		from .download import DownloadAborted

		assert iface_cache is None or iface_cache is self.config.iface_cache

		self.config.iface_cache.mark_as_checking(feed_url)

		debug(_("download_and_import_feed %(url)s"), {'url': feed_url})
		assert not os.path.isabs(feed_url)

		# Distribution feeds are handled by PackageKit, not downloaded.
		if feed_url.startswith('distribution:'):
			return self.get_packagekit_feed(feed_url)

		primary = self._download_and_import_feed(feed_url, use_mirror = False)

		@tasks.named_async("monitor feed downloads for " + feed_url)
		def wait_for_downloads(primary):
			# Download just the upstream feed, unless it takes too long...
			timeout = tasks.TimeoutBlocker(5, 'Mirror timeout')	# 5 seconds

			yield primary, timeout
			tasks.check(timeout)

			try:
				tasks.check(primary)
				if primary.happened:
					return		# OK, primary succeeded!
				# OK, maybe it's just being slow...
				info("Feed download from %s is taking a long time.", feed_url)
				primary_ex = None
			except NoTrustedKeys as ex:
				raise	# Don't bother trying the mirror if we have a trust problem
			except ReplayAttack as ex:
				raise	# Don't bother trying the mirror if we have a replay attack
			except DownloadAborted as ex:
				raise	# Don't bother trying the mirror if the user cancelled
			except SafeException as ex:
				# Primary failed
				primary = None
				primary_ex = ex
				warn(_("Feed download from %(url)s failed: %(exception)s"), {'url': feed_url, 'exception': ex})

			# Start downloading from mirror...
			mirror = self._download_and_import_feed(feed_url, use_mirror = True)

			# Wait until both mirror and primary tasks are complete...
			while True:
				blockers = list(filter(None, [primary, mirror]))
				if not blockers:
					break
				yield blockers

				if primary:
					try:
						tasks.check(primary)
						if primary.happened:
							primary = None
							# No point carrying on with the mirror once the primary has succeeded
							if mirror:
								info(_("Primary feed download succeeded; aborting mirror download for %s") % feed_url)
								mirror.dl.abort()
					except SafeException as ex:
						primary = None
						primary_ex = ex
						info(_("Feed download from %(url)s failed; still trying mirror: %(exception)s"), {'url': feed_url, 'exception': ex})

				if mirror:
					try:
						tasks.check(mirror)
						if mirror.happened:
							mirror = None
							if primary_ex:
								# We already warned; no need to raise an exception too,
								# as the mirror download succeeded.
								primary_ex = None
					except ReplayAttack as ex:
						info(_("Version from mirror is older than cached version; ignoring it: %s"), ex)
						mirror = None
						primary_ex = None
					except SafeException as ex:
						info(_("Mirror download failed: %s"), ex)
						mirror = None

			# Neither download succeeded: report the primary's failure.
			if primary_ex:
				raise primary_ex

		return wait_for_downloads(primary)
	def _download_and_import_feed(self, feed_url, use_mirror):
		"""Download and import a feed.
		@param use_mirror: False to use primary location; True to use mirror.
		@return: a blocker task for the download, or None if no mirror is available"""
		if use_mirror:
			url = self.get_feed_mirror(feed_url)
			if url is None: return None
			info(_("Trying mirror server for feed %s") % feed_url)
		else:
			url = feed_url

		dl = self.download_url(url, hint = feed_url)
		stream = dl.tempfile

		@tasks.named_async("fetch_feed " + url)
		def fetch_feed():
			yield dl.downloaded
			tasks.check(dl.downloaded)

			pending = PendingFeed(feed_url, stream)

			if use_mirror:
				# If we got the feed from a mirror, get the key from there too
				key_mirror = self.config.mirror + '/keys/'
			else:
				key_mirror = None

			keys_downloaded = tasks.Task(pending.download_keys(self, feed_hint = feed_url, key_mirror = key_mirror), _("download keys for %s") % feed_url)
			yield keys_downloaded.finished
			tasks.check(keys_downloaded.finished)

			# Try to import; if the keys aren't trusted yet, ask the user and retry.
			if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
				blocker = self.config.trust_mgr.confirm_keys(pending)
				if blocker:
					yield blocker
					tasks.check(blocker)
				if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
					raise NoTrustedKeys(_("No signing keys trusted; not importing"))

		task = fetch_feed()
		# Expose the underlying download so callers can abort it.
		task.dl = dl
		return task
331 def fetch_key_info(self, fingerprint):
332 try:
333 return self.key_info[fingerprint]
334 except KeyError:
335 self.key_info[fingerprint] = key_info = KeyInfoFetcher(self,
336 self.config.key_info_server, fingerprint)
337 return key_info
	# (force is deprecated and ignored)
	def download_impl(self, impl, retrieval_method, stores, force = False):
		"""Download an implementation.
		@param impl: the selected implementation
		@type impl: L{model.ZeroInstallImplementation}
		@param retrieval_method: a way of getting the implementation (e.g. an Archive or a Recipe)
		@type retrieval_method: L{model.RetrievalMethod}
		@param stores: where to store the downloaded implementation
		@type stores: L{zerostore.Stores}
		@rtype: L{tasks.Blocker}"""
		assert impl
		assert retrieval_method

		if isinstance(retrieval_method, DistributionSource):
			return retrieval_method.install(self.handler)

		from zeroinstall.zerostore import manifest, parse_algorithm_digest_pair
		# Pick the highest-rated digest algorithm we support.
		best = None
		for digest in impl.digests:
			alg_name, digest_value = parse_algorithm_digest_pair(digest)
			alg = manifest.algorithms.get(alg_name, None)
			if alg and (best is None or best.rating < alg.rating):
				best = alg
				required_digest = digest

		if best is None:
			if not impl.digests:
				raise SafeException(_("No <manifest-digest> given for '%(implementation)s' version %(version)s") %
						{'implementation': impl.feed.get_name(), 'version': impl.get_version()})
			raise SafeException(_("Unknown digest algorithms '%(algorithms)s' for '%(implementation)s' version %(version)s") %
					{'algorithms': impl.digests, 'implementation': impl.feed.get_name(), 'version': impl.get_version()})

		@tasks.async
		def download_impl(method):
			# original_exception is set after the first failure, so we only
			# fall back to the implementation mirror once.
			original_exception = None
			while True:
				try:
					if isinstance(method, DownloadSource):
						blocker, stream = self.download_archive(method, impl_hint = impl,
							may_use_mirror = original_exception is None)
						yield blocker
						tasks.check(blocker)

						stream.seek(0)
						if self.external_store:
							self._add_to_external_store(required_digest, [method], [stream])
						else:
							self._add_to_cache(required_digest, stores, method, stream)
					elif isinstance(method, Recipe):
						blocker = self.cook(required_digest, method, stores, impl_hint = impl)
						yield blocker
						tasks.check(blocker)
					else:
						raise Exception(_("Unknown download type for '%s'") % method)
				except download.DownloadError as ex:
					if original_exception:
						# The mirror failed too; report the original error.
						info("Error from mirror: %s", ex)
						raise original_exception
					else:
						original_exception = ex
						mirror_url = self._get_impl_mirror(impl)
						if mirror_url is not None:
							info("%s: trying implementation mirror at %s", ex, mirror_url)
							# Mirrored implementations are always bzip2-compressed tarballs.
							method = model.DownloadSource(impl, mirror_url,
									None, None, type = 'application/x-bzip-compressed-tar')
							continue		# Retry
					raise
				break

			self.handler.impl_added_to_store(impl)
		return download_impl(retrieval_method)
411 def _add_to_cache(self, required_digest, stores, retrieval_method, stream):
412 assert isinstance(retrieval_method, DownloadSource)
413 stores.add_archive_to_cache(required_digest, stream, retrieval_method.url, retrieval_method.extract,
414 type = retrieval_method.type, start_offset = retrieval_method.start_offset or 0)
416 def _add_to_external_store(self, required_digest, steps, streams):
417 from zeroinstall.zerostore.unpack import type_from_url
419 # combine archive path, extract directory and MIME type arguments in an alternating fashion
420 paths = map(lambda stream: stream.name, streams)
421 extracts = map(lambda step: step.extract or "", steps)
422 types = map(lambda step: step.type or type_from_url(step.url), steps)
423 args = [None]*(len(paths)+len(extracts)+len(types))
424 args[::3] = paths
425 args[1::3] = extracts
426 args[2::3] = types
428 # close file handles to allow external processes access
429 for stream in streams:
430 stream.close()
432 # delegate extracting archives to external tool
433 import subprocess
434 subprocess.call([self.external_store, "add", required_digest] + args)
436 # delete temp files
437 for path in paths:
438 os.remove(path)
440 # (force is deprecated and ignored)
441 def download_archive(self, download_source, force = False, impl_hint = None, may_use_mirror = False):
442 """Fetch an archive. You should normally call L{download_impl}
443 instead, since it handles other kinds of retrieval method too."""
444 from zeroinstall.zerostore import unpack
446 url = download_source.url
447 if not (url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:')):
448 raise SafeException(_("Unknown scheme in download URL '%s'") % url)
450 mime_type = download_source.type
451 if not mime_type:
452 mime_type = unpack.type_from_url(download_source.url)
453 if not mime_type:
454 raise SafeException(_("No 'type' attribute on archive, and I can't guess from the name (%s)") % download_source.url)
455 if not self.external_store:
456 unpack.check_type_ok(mime_type)
458 if may_use_mirror:
459 mirror = self._get_archive_mirror(download_source)
460 else:
461 mirror = None
463 dl = self.download_url(download_source.url, hint = impl_hint, mirror_url = mirror)
464 if download_source.size is not None:
465 dl.expected_size = download_source.size + (download_source.start_offset or 0)
466 # (else don't know sizes for mirrored archives)
467 return (dl.downloaded, dl.tempfile)
	# (force is deprecated and ignored)
	def download_icon(self, interface, force = False):
		"""Download an icon for this interface and add it to the
		icon cache. If the interface has no icon do nothing.
		@return: the task doing the import, or None
		@rtype: L{tasks.Task}"""
		debug("download_icon %(interface)s", {'interface': interface})

		# If we already have an icon, only fetch a newer one (If-Modified-Since).
		modification_time = None
		existing_icon = self.config.iface_cache.get_icon_path(interface)
		if existing_icon:
			file_mtime = os.stat(existing_icon).st_mtime
			from email.utils import formatdate
			modification_time = formatdate(timeval = file_mtime, localtime = False, usegmt = True)

		# Find a suitable icon to download
		for icon in interface.get_metadata(XMLNS_IFACE, 'icon'):
			type = icon.getAttribute('type')
			if type != 'image/png':
				debug(_('Skipping non-PNG icon'))
				continue
			source = icon.getAttribute('href')
			if source:
				break
			warn(_('Missing "href" attribute on <icon> in %s'), interface)
		else:
			# No usable <icon> element found.
			info(_('No PNG icons found in %s'), interface)
			return

		dl = self.download_url(source, hint = interface, modification_time = modification_time)

		@tasks.async
		def download_and_add_icon():
			stream = dl.tempfile
			yield dl.downloaded
			try:
				tasks.check(dl.downloaded)
				# Server says our cached copy is still current.
				if dl.unmodified: return
				stream.seek(0)

				import shutil, tempfile
				icons_cache = basedir.save_cache_path(config_site, 'interface_icons')

				# Write to a temporary file first, then rename atomically.
				tmp_file = tempfile.NamedTemporaryFile(dir = icons_cache, delete = False)
				shutil.copyfileobj(stream, tmp_file)
				tmp_file.close()

				icon_file = os.path.join(icons_cache, escape(interface.uri))
				portable_rename(tmp_file.name, icon_file)
			except Exception as ex:
				self.handler.report_error(ex)
			finally:
				stream.close()

		return download_and_add_icon()
	def download_impls(self, implementations, stores):
		"""Download the given implementations, choosing a suitable retrieval method for each.
		If any of the retrieval methods are DistributionSources and
		need confirmation, handler.confirm is called to check that the
		installation should proceed.
		@return: the task doing the downloads, or None if there is nothing to do"""
		unsafe_impls = []

		# Pair each implementation with its chosen retrieval method first,
		# so we can ask for confirmation once, up front.
		to_download = []
		for impl in implementations:
			debug(_("start_downloading_impls: for %(feed)s get %(implementation)s"), {'feed': impl.feed, 'implementation': impl})
			source = self.get_best_source(impl)
			if not source:
				raise SafeException(_("Implementation %(implementation_id)s of interface %(interface)s"
					" cannot be downloaded (no download locations given in "
					"interface!)") % {'implementation_id': impl.id, 'interface': impl.feed.get_name()})
			to_download.append((impl, source))

			if isinstance(source, DistributionSource) and source.needs_confirmation:
				unsafe_impls.append(source.package_id)

		@tasks.async
		def download_impls():
			# Native packages run with extra privileges, so get confirmation first.
			if unsafe_impls:
				confirm = self.handler.confirm_install(_('The following components need to be installed using native packages. '
					'These come from your distribution, and should therefore be trustworthy, but they also '
					'run with extra privileges. In particular, installing them may run extra services on your '
					'computer or affect other users. You may be asked to enter a password to confirm. The '
					'packages are:\n\n') + ('\n'.join('- ' + x for x in unsafe_impls)))
				yield confirm
				tasks.check(confirm)

			blockers = []

			for impl, source in to_download:
				blockers.append(self.download_impl(impl, source, stores))

			# Record the first error log the rest
			error = []
			def dl_error(ex, tb = None):
				if error:
					self.handler.report_error(ex)
				else:
					error.append((ex, tb))
			while blockers:
				yield blockers
				tasks.check(blockers, dl_error)

				blockers = [b for b in blockers if not b.happened]
			if error:
				from zeroinstall import support
				support.raise_with_traceback(*error[0])

		if not to_download:
			return None

		return download_impls()
583 def get_best_source(self, impl):
584 """Return the best download source for this implementation.
585 @rtype: L{model.RetrievalMethod}"""
586 if impl.download_sources:
587 return impl.download_sources[0]
588 return None
590 def download_url(self, url, hint = None, modification_time = None, expected_size = None, mirror_url = None):
591 """The most low-level method here; just download a raw URL.
592 @param url: the location to download from
593 @param hint: user-defined data to store on the Download (e.g. used by the GUI)
594 @param modification_time: don't download unless newer than this
595 @param mirror_url: an altertive URL to try if this one fails
596 @type mirror_url: str
597 @rtype: L{download.Download}
598 @since: 1.5
600 if self.handler.dry_run:
601 raise NeedDownload(url)
603 dl = download.Download(url, hint = hint, modification_time = modification_time, expected_size = expected_size, auto_delete = not self.external_store)
604 dl.mirror = mirror_url
605 self.handler.monitor_download(dl)
606 dl.downloaded = self.scheduler.download(dl)
607 return dl
class StepRunner(object):
	"""The base class of all step runners.
	@since: 1.10"""

	def __init__(self, stepdata, impl_hint):
		"""Remember the model element and the GUI hint for later use."""
		self.impl_hint = impl_hint
		self.stepdata = stepdata

	def prepare(self, fetcher, blockers):
		"""Start any downloads this step needs (no-op by default)."""
		pass

	@classmethod
	def class_for(cls, model):
		"""Return the StepRunner subclass that handles this model element."""
		wanted = type(model)
		for subcls in cls.__subclasses__():
			if subcls.model_type == wanted:
				return subcls
		assert False, "Couldn't find step runner for %s" % (type(model),)
class RenameStepRunner(StepRunner):
	"""A step runner for the <rename> step.
	@since: 1.10"""

	model_type = model.RenameStep

	def apply(self, basedir):
		"""Rename the step's source to its dest, both resolved safely within basedir."""
		old_path = native_path_within_base(basedir, self.stepdata.source)
		new_path = native_path_within_base(basedir, self.stepdata.dest)
		os.rename(old_path, new_path)
class DownloadStepRunner(StepRunner):
	"""A step runner for the <archive> step.
	@since: 1.10"""

	model_type = model.DownloadSource

	def prepare(self, fetcher, blockers):
		"""Start downloading the archive and register its blocker."""
		self.blocker, self.stream = fetcher.download_archive(self.stepdata,
				impl_hint = self.impl_hint,
				may_use_mirror = True)
		assert self.stream
		blockers.append(self.blocker)

	def apply(self, basedir):
		"""Unpack the (completed) download over basedir."""
		from zeroinstall.zerostore import unpack
		assert self.blocker.happened
		unpack.unpack_archive_over(self.stepdata.url, self.stream, basedir,
				extract = self.stepdata.extract,
				type = self.stepdata.type,
				start_offset = self.stepdata.start_offset or 0)
def native_path_within_base(base, crossplatform_path):
	"""Convert a cross-platform relative path (forward slashes, even on Windows)
	to the absolute, platform-native equivalent inside `base`.
	If the path does not resolve to a location within `base`, a SafeException is raised.
	@since: 1.10"""
	assert os.path.isabs(base)
	if crossplatform_path.startswith("/"):
		raise SafeException("path %r is not within the base directory" % (crossplatform_path,))
	parts = crossplatform_path.split("/")
	resolved = os.path.realpath(os.path.join(base, *parts))
	real_base = os.path.realpath(base)
	# Symlinks and '..' components could escape base; realpath exposes that.
	if not resolved.startswith(real_base + os.path.sep):
		raise SafeException("path %r is not within the base directory" % (crossplatform_path,))
	return resolved