Merged changes from master
[zeroinstall/solver.git] / zeroinstall / injector / fetch.py
blobcc222f4cd9b7f064587bdc1ca49915b3269e62d6
1 """
2 Downloads feeds, keys, packages and icons.
3 """
5 # Copyright (C) 2009, Thomas Leonard
6 # See the README file for details, or visit http://0install.net.
8 from zeroinstall import _, NeedDownload, logger
9 import os, sys
11 from zeroinstall import support
12 from zeroinstall.support import tasks, basedir, portable_rename
13 from zeroinstall.injector.namespaces import XMLNS_IFACE, config_site
14 from zeroinstall.injector import model
15 from zeroinstall.injector.model import DownloadSource, Recipe, SafeException, escape, DistributionSource
16 from zeroinstall.injector.iface_cache import PendingFeed, ReplayAttack
17 from zeroinstall.injector.handler import NoTrustedKeys
18 from zeroinstall.injector import download
def _escape_slashes(path):
    """Escape each '/' in *path* using the 0mirror convention ('%23')."""
    return '%23'.join(path.split('/'))
def _get_feed_dir(feed):
    """Return the relative directory used by a mirror for this feed
    (the algorithm from 0mirror)."""
    if '#' in feed:
        raise SafeException(_("Invalid URL '%s'") % feed)
    scheme, rest = feed.split('://', 1)
    assert '/' in rest, "Missing / in %s" % feed
    domain, remainder = rest.split('/', 1)
    # Each component must be non-empty and must not start with '.'
    for part in (scheme, domain, remainder):
        if not part or part.startswith('.'):
            raise SafeException(_("Invalid URL '%s'") % feed)
    return 'feeds/' + scheme + '/' + domain + '/' + _escape_slashes(remainder)
class KeyInfoFetcher:
    """Fetches information about a GPG key from a key-info server.
    See L{Fetcher.fetch_key_info} for details.
    @since: 0.42

    Example:

    >>> kf = KeyInfoFetcher(fetcher, 'https://server', fingerprint)
    >>> while True:
    ...     print kf.info
    ...     if kf.blocker is None: break
    ...     print kf.status
    ...     yield kf.blocker
    """
    def __init__(self, fetcher, server, fingerprint):
        # The hex fingerprint of the key being looked up.
        self.fingerprint = fingerprint
        # List of XML DOM nodes describing the key; grows as results arrive.
        self.info = []
        # Blocker that triggers when the query finishes; None once done (or if
        # no query was started).
        self.blocker = None

        # No key-info server configured: leave self.info empty, no blocker.
        if server is None: return

        self.status = _('Fetching key information from %s...') % server

        dl = fetcher.download_url(server + '/key/' + fingerprint)

        from xml.dom import minidom

        @tasks.async
        def fetch_key_info():
            # dl.tempfile holds the server's response; closed in 'finally' below.
            tempfile = dl.tempfile
            try:
                yield dl.downloaded
                # The query is over: clear the blocker so callers polling it
                # (see class docstring example) stop waiting.
                self.blocker = None
                tasks.check(dl.downloaded)
                tempfile.seek(0)
                doc = minidom.parse(tempfile)
                if doc.documentElement.localName != 'key-lookup':
                    raise SafeException(_('Expected <key-lookup>, not <%s>') % doc.documentElement.localName)
                self.info += doc.documentElement.childNodes
            except Exception as ex:
                # Turn any failure into a synthetic 'bad' vote so the caller
                # still gets something to display.
                doc = minidom.parseString('<item vote="bad"/>')
                root = doc.documentElement
                root.appendChild(doc.createTextNode(_('Error getting key information: %s') % ex))
                self.info.append(root)
            finally:
                tempfile.close()

        self.blocker = fetch_key_info()
84 class Fetcher(object):
85 """Downloads and stores various things.
86 @ivar config: used to get handler, iface_cache and stores
87 @type config: L{config.Config}
88 @ivar key_info: caches information about GPG keys
89 @type key_info: {str: L{KeyInfoFetcher}}
90 """
91 __slots__ = ['config', 'key_info', '_scheduler', 'external_store']
93 def __init__(self, config):
94 assert config.handler, "API change!"
95 self.config = config
96 self.key_info = {}
97 self._scheduler = None
98 self.external_store = os.environ.get('ZEROINSTALL_EXTERNAL_STORE')
100 @property
101 def handler(self):
102 return self.config.handler
104 @property
105 def scheduler(self):
106 if self._scheduler is None:
107 from . import scheduler
108 self._scheduler = scheduler.DownloadScheduler()
109 return self._scheduler
111 # (force is deprecated and ignored)
112 @tasks.async
113 def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
114 """Follow a Recipe.
115 @param impl_hint: the Implementation this is for (if any) as a hint for the GUI
116 @see: L{download_impl} uses this method when appropriate"""
117 # Maybe we're taking this metaphor too far?
119 # Start a download for each ingredient
120 blockers = []
121 steps = []
122 try:
123 for stepdata in recipe.steps:
124 cls = StepRunner.class_for(stepdata)
125 step = cls(stepdata, impl_hint=impl_hint)
126 step.prepare(self, blockers)
127 steps.append(step)
129 while blockers:
130 yield blockers
131 tasks.check(blockers)
132 blockers = [b for b in blockers if not b.happened]
135 if self.external_store:
136 # Note: external_store will not yet work with non-<archive> steps.
137 streams = [step.stream for step in steps]
138 self._add_to_external_store(required_digest, recipe.steps, streams)
139 else:
140 # Create an empty directory for the new implementation
141 store = stores.stores[0]
142 tmpdir = store.get_tmp_dir_for(required_digest)
143 try:
144 # Unpack each of the downloaded archives into it in turn
145 for step in steps:
146 step.apply(tmpdir)
147 # Check that the result is correct and store it in the cache
148 store.check_manifest_and_rename(required_digest, tmpdir)
149 tmpdir = None
150 finally:
151 # If unpacking fails, remove the temporary directory
152 if tmpdir is not None:
153 support.ro_rmtree(tmpdir)
154 finally:
155 for step in steps:
156 step.close()
158 def _get_mirror_url(self, feed_url, resource):
159 """Return the URL of a mirror for this feed."""
160 if self.config.mirror is None:
161 return None
162 if support.urlparse(feed_url).hostname == 'localhost':
163 return None
164 return '%s/%s/%s' % (self.config.mirror, _get_feed_dir(feed_url), resource)
166 def get_feed_mirror(self, url):
167 """Return the URL of a mirror for this feed."""
168 return self._get_mirror_url(url, 'latest.xml')
170 def _get_archive_mirror(self, source):
171 if self.config.mirror is None:
172 return None
173 if support.urlparse(source.url).hostname == 'localhost':
174 return None
175 if sys.version_info[0] > 2:
176 from urllib.parse import quote
177 else:
178 from urllib import quote
179 return '{mirror}/archive/{archive}'.format(
180 mirror = self.config.mirror,
181 archive = quote(source.url.replace('/', '#'), safe = ''))
183 def _get_impl_mirror(self, impl):
184 return self._get_mirror_url(impl.feed.url, 'impl/' + _escape_slashes(impl.id))
186 @tasks.async
187 def get_packagekit_feed(self, feed_url):
188 """Send a query to PackageKit (if available) for information about this package.
189 On success, the result is added to iface_cache.
191 assert feed_url.startswith('distribution:'), feed_url
192 master_feed = self.config.iface_cache.get_feed(feed_url.split(':', 1)[1])
193 if master_feed:
194 fetch = self.config.iface_cache.distro.fetch_candidates(master_feed)
195 if fetch:
196 yield fetch
197 tasks.check(fetch)
199 # Force feed to be regenerated with the new information
200 self.config.iface_cache.get_feed(feed_url, force = True)
202 def download_and_import_feed(self, feed_url, iface_cache = None):
203 """Download the feed, download any required keys, confirm trust if needed and import.
204 @param feed_url: the feed to be downloaded
205 @type feed_url: str
206 @param iface_cache: (deprecated)"""
207 from .download import DownloadAborted
209 assert iface_cache is None or iface_cache is self.config.iface_cache
211 self.config.iface_cache.mark_as_checking(feed_url)
213 logger.debug(_("download_and_import_feed %(url)s"), {'url': feed_url})
214 assert not os.path.isabs(feed_url)
216 if feed_url.startswith('distribution:'):
217 return self.get_packagekit_feed(feed_url)
219 primary = self._download_and_import_feed(feed_url, use_mirror = False)
221 @tasks.named_async("monitor feed downloads for " + feed_url)
222 def wait_for_downloads(primary):
223 # Download just the upstream feed, unless it takes too long...
224 timeout = tasks.TimeoutBlocker(5, 'Mirror timeout') # 5 seconds
226 yield primary, timeout
227 tasks.check(timeout)
229 try:
230 tasks.check(primary)
231 if primary.happened:
232 return # OK, primary succeeded!
233 # OK, maybe it's just being slow...
234 logger.info("Feed download from %s is taking a long time.", feed_url)
235 primary_ex = None
236 except NoTrustedKeys as ex:
237 raise # Don't bother trying the mirror if we have a trust problem
238 except ReplayAttack as ex:
239 raise # Don't bother trying the mirror if we have a replay attack
240 except DownloadAborted as ex:
241 raise # Don't bother trying the mirror if the user cancelled
242 except SafeException as ex:
243 # Primary failed
244 primary = None
245 primary_ex = ex
246 logger.warn(_("Feed download from %(url)s failed: %(exception)s"), {'url': feed_url, 'exception': ex})
248 # Start downloading from mirror...
249 mirror = self._download_and_import_feed(feed_url, use_mirror = True)
251 # Wait until both mirror and primary tasks are complete...
252 while True:
253 blockers = list(filter(None, [primary, mirror]))
254 if not blockers:
255 break
256 yield blockers
258 if primary:
259 try:
260 tasks.check(primary)
261 if primary.happened:
262 primary = None
263 # No point carrying on with the mirror once the primary has succeeded
264 if mirror:
265 logger.info(_("Primary feed download succeeded; aborting mirror download for %s") % feed_url)
266 mirror.dl.abort()
267 except SafeException as ex:
268 primary = None
269 primary_ex = ex
270 logger.info(_("Feed download from %(url)s failed; still trying mirror: %(exception)s"), {'url': feed_url, 'exception': ex})
272 if mirror:
273 try:
274 tasks.check(mirror)
275 if mirror.happened:
276 mirror = None
277 if primary_ex:
278 # We already warned; no need to raise an exception too,
279 # as the mirror download succeeded.
280 primary_ex = None
281 except ReplayAttack as ex:
282 logger.info(_("Version from mirror is older than cached version; ignoring it: %s"), ex)
283 mirror = None
284 primary_ex = None
285 except SafeException as ex:
286 logger.info(_("Mirror download failed: %s"), ex)
287 mirror = None
289 if primary_ex:
290 raise primary_ex
292 return wait_for_downloads(primary)
294 def _download_and_import_feed(self, feed_url, use_mirror):
295 """Download and import a feed.
296 @param use_mirror: False to use primary location; True to use mirror."""
297 if use_mirror:
298 url = self.get_feed_mirror(feed_url)
299 if url is None: return None
300 logger.info(_("Trying mirror server for feed %s") % feed_url)
301 else:
302 url = feed_url
304 dl = self.download_url(url, hint = feed_url)
305 stream = dl.tempfile
307 @tasks.named_async("fetch_feed " + url)
308 def fetch_feed():
309 try:
310 yield dl.downloaded
311 tasks.check(dl.downloaded)
313 pending = PendingFeed(feed_url, stream)
315 if use_mirror:
316 # If we got the feed from a mirror, get the key from there too
317 key_mirror = self.config.mirror + '/keys/'
318 else:
319 key_mirror = None
321 keys_downloaded = tasks.Task(pending.download_keys(self, feed_hint = feed_url, key_mirror = key_mirror), _("download keys for %s") % feed_url)
322 yield keys_downloaded.finished
323 tasks.check(keys_downloaded.finished)
325 if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
326 blocker = self.config.trust_mgr.confirm_keys(pending)
327 if blocker:
328 yield blocker
329 tasks.check(blocker)
330 if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
331 raise NoTrustedKeys(_("No signing keys trusted; not importing"))
332 finally:
333 stream.close()
335 task = fetch_feed()
336 task.dl = dl
337 return task
339 def fetch_key_info(self, fingerprint):
340 try:
341 return self.key_info[fingerprint]
342 except KeyError:
343 self.key_info[fingerprint] = key_info = KeyInfoFetcher(self,
344 self.config.key_info_server, fingerprint)
345 return key_info
347 # (force is deprecated and ignored)
348 def download_impl(self, impl, retrieval_method, stores, force = False):
349 """Download an implementation.
350 @param impl: the selected implementation
351 @type impl: L{model.ZeroInstallImplementation}
352 @param retrieval_method: a way of getting the implementation (e.g. an Archive or a Recipe)
353 @type retrieval_method: L{model.RetrievalMethod}
354 @param stores: where to store the downloaded implementation
355 @type stores: L{zerostore.Stores}
356 @rtype: L{tasks.Blocker}"""
357 assert impl
358 assert retrieval_method
360 if isinstance(retrieval_method, DistributionSource):
361 return retrieval_method.install(self.handler)
363 from zeroinstall.zerostore import manifest, parse_algorithm_digest_pair
364 best = None
365 for digest in impl.digests:
366 alg_name, digest_value = parse_algorithm_digest_pair(digest)
367 alg = manifest.algorithms.get(alg_name, None)
368 if alg and (best is None or best.rating < alg.rating):
369 best = alg
370 required_digest = digest
372 if best is None:
373 if not impl.digests:
374 raise SafeException(_("No <manifest-digest> given for '%(implementation)s' version %(version)s") %
375 {'implementation': impl.feed.get_name(), 'version': impl.get_version()})
376 raise SafeException(_("Unknown digest algorithms '%(algorithms)s' for '%(implementation)s' version %(version)s") %
377 {'algorithms': impl.digests, 'implementation': impl.feed.get_name(), 'version': impl.get_version()})
379 @tasks.async
380 def download_impl(method):
381 original_exception = None
382 while True:
383 try:
384 if isinstance(method, DownloadSource):
385 blocker, stream = self.download_archive(method, impl_hint = impl,
386 may_use_mirror = original_exception is None)
387 try:
388 yield blocker
389 tasks.check(blocker)
391 stream.seek(0)
392 if self.external_store:
393 self._add_to_external_store(required_digest, [method], [stream])
394 else:
395 self._add_to_cache(required_digest, stores, method, stream)
396 finally:
397 stream.close()
398 elif isinstance(method, Recipe):
399 blocker = self.cook(required_digest, method, stores, impl_hint = impl)
400 yield blocker
401 tasks.check(blocker)
402 else:
403 raise Exception(_("Unknown download type for '%s'") % method)
404 except download.DownloadError as ex:
405 if original_exception:
406 logger.info("Error from mirror: %s", ex)
407 raise original_exception
408 else:
409 original_exception = ex
410 mirror_url = self._get_impl_mirror(impl)
411 if mirror_url is not None:
412 logger.info("%s: trying implementation mirror at %s", ex, mirror_url)
413 method = model.DownloadSource(impl, mirror_url,
414 None, None, type = 'application/x-bzip-compressed-tar')
415 continue # Retry
416 raise
417 break
419 self.handler.impl_added_to_store(impl)
420 return download_impl(retrieval_method)
422 def _add_to_cache(self, required_digest, stores, retrieval_method, stream):
423 assert isinstance(retrieval_method, DownloadSource)
424 stores.add_archive_to_cache(required_digest, stream, retrieval_method.url, retrieval_method.extract,
425 type = retrieval_method.type, start_offset = retrieval_method.start_offset or 0)
427 def _add_to_external_store(self, required_digest, steps, streams):
428 from zeroinstall.zerostore.unpack import type_from_url
430 # combine archive path, extract directory and MIME type arguments in an alternating fashion
431 paths = map(lambda stream: stream.name, streams)
432 extracts = map(lambda step: step.extract or "", steps)
433 types = map(lambda step: step.type or type_from_url(step.url), steps)
434 args = [None]*(len(paths)+len(extracts)+len(types))
435 args[::3] = paths
436 args[1::3] = extracts
437 args[2::3] = types
439 # close file handles to allow external processes access
440 for stream in streams:
441 stream.close()
443 # delegate extracting archives to external tool
444 import subprocess
445 subprocess.call([self.external_store, "add", required_digest] + args)
447 # delete temp files
448 for path in paths:
449 os.remove(path)
451 # (force is deprecated and ignored)
452 def download_archive(self, download_source, force = False, impl_hint = None, may_use_mirror = False):
453 """Fetch an archive. You should normally call L{download_impl}
454 instead, since it handles other kinds of retrieval method too.
455 It is the caller's responsibility to ensure that the returned stream is closed.
457 from zeroinstall.zerostore import unpack
459 url = download_source.url
460 if not (url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:')):
461 raise SafeException(_("Unknown scheme in download URL '%s'") % url)
463 mime_type = download_source.type
464 if not mime_type:
465 mime_type = unpack.type_from_url(download_source.url)
466 if not mime_type:
467 raise SafeException(_("No 'type' attribute on archive, and I can't guess from the name (%s)") % download_source.url)
468 if not self.external_store:
469 unpack.check_type_ok(mime_type)
471 if may_use_mirror:
472 mirror = self._get_archive_mirror(download_source)
473 else:
474 mirror = None
476 dl = self.download_url(download_source.url, hint = impl_hint, mirror_url = mirror)
477 if download_source.size is not None:
478 dl.expected_size = download_source.size + (download_source.start_offset or 0)
479 # (else don't know sizes for mirrored archives)
480 return (dl.downloaded, dl.tempfile)
482 # (force is deprecated and ignored)
483 def download_icon(self, interface, force = False):
484 """Download an icon for this interface and add it to the
485 icon cache. If the interface has no icon do nothing.
486 @return: the task doing the import, or None
487 @rtype: L{tasks.Task}"""
488 logger.debug("download_icon %(interface)s", {'interface': interface})
490 modification_time = None
491 existing_icon = self.config.iface_cache.get_icon_path(interface)
492 if existing_icon:
493 file_mtime = os.stat(existing_icon).st_mtime
494 from email.utils import formatdate
495 modification_time = formatdate(timeval = file_mtime, localtime = False, usegmt = True)
497 feed = self.config.iface_cache.get_feed(interface.uri)
498 if feed is None:
499 return None
501 # Find a suitable icon to download
502 for icon in feed.get_metadata(XMLNS_IFACE, 'icon'):
503 type = icon.getAttribute('type')
504 if type != 'image/png':
505 logger.debug(_('Skipping non-PNG icon'))
506 continue
507 source = icon.getAttribute('href')
508 if source:
509 break
510 logger.warn(_('Missing "href" attribute on <icon> in %s'), interface)
511 else:
512 logger.info(_('No PNG icons found in %s'), interface)
513 return
515 dl = self.download_url(source, hint = interface, modification_time = modification_time)
517 @tasks.async
518 def download_and_add_icon():
519 stream = dl.tempfile
520 try:
521 yield dl.downloaded
522 tasks.check(dl.downloaded)
523 if dl.unmodified: return
524 stream.seek(0)
526 import shutil, tempfile
527 icons_cache = basedir.save_cache_path(config_site, 'interface_icons')
529 tmp_file = tempfile.NamedTemporaryFile(dir = icons_cache, delete = False)
530 shutil.copyfileobj(stream, tmp_file)
531 tmp_file.close()
533 icon_file = os.path.join(icons_cache, escape(interface.uri))
534 portable_rename(tmp_file.name, icon_file)
535 finally:
536 stream.close()
538 return download_and_add_icon()
540 def download_impls(self, implementations, stores):
541 """Download the given implementations, choosing a suitable retrieval method for each.
542 If any of the retrieval methods are DistributionSources and
543 need confirmation, handler.confirm is called to check that the
544 installation should proceed.
546 unsafe_impls = []
548 to_download = []
549 for impl in implementations:
550 logger.debug(_("start_downloading_impls: for %(feed)s get %(implementation)s"), {'feed': impl.feed, 'implementation': impl})
551 source = self.get_best_source(impl)
552 if not source:
553 raise SafeException(_("Implementation %(implementation_id)s of interface %(interface)s"
554 " cannot be downloaded (no download locations given in "
555 "interface!)") % {'implementation_id': impl.id, 'interface': impl.feed.get_name()})
556 to_download.append((impl, source))
558 if isinstance(source, DistributionSource) and source.needs_confirmation:
559 unsafe_impls.append(source.package_id)
561 @tasks.async
562 def download_impls():
563 if unsafe_impls:
564 confirm = self.handler.confirm_install(_('The following components need to be installed using native packages. '
565 'These come from your distribution, and should therefore be trustworthy, but they also '
566 'run with extra privileges. In particular, installing them may run extra services on your '
567 'computer or affect other users. You may be asked to enter a password to confirm. The '
568 'packages are:\n\n') + ('\n'.join('- ' + x for x in unsafe_impls)))
569 yield confirm
570 tasks.check(confirm)
572 blockers = []
574 for impl, source in to_download:
575 blockers.append(self.download_impl(impl, source, stores))
577 # Record the first error log the rest
578 error = []
579 def dl_error(ex, tb = None):
580 if error:
581 self.handler.report_error(ex)
582 else:
583 error.append((ex, tb))
584 while blockers:
585 yield blockers
586 tasks.check(blockers, dl_error)
588 blockers = [b for b in blockers if not b.happened]
589 if error:
590 from zeroinstall import support
591 support.raise_with_traceback(*error[0])
593 if not to_download:
594 return None
596 return download_impls()
598 def get_best_source(self, impl):
599 """Return the best download source for this implementation.
600 @rtype: L{model.RetrievalMethod}"""
601 if impl.download_sources:
602 return impl.download_sources[0]
603 return None
605 def download_url(self, url, hint = None, modification_time = None, expected_size = None, mirror_url = None):
606 """The most low-level method here; just download a raw URL.
607 It is the caller's responsibility to ensure that dl.stream is closed.
608 @param url: the location to download from
609 @param hint: user-defined data to store on the Download (e.g. used by the GUI)
610 @param modification_time: don't download unless newer than this
611 @param mirror_url: an altertive URL to try if this one fails
612 @type mirror_url: str
613 @rtype: L{download.Download}
614 @since: 1.5
616 if self.handler.dry_run:
617 raise NeedDownload(url)
619 dl = download.Download(url, hint = hint, modification_time = modification_time, expected_size = expected_size, auto_delete = not self.external_store)
620 dl.mirror = mirror_url
621 self.handler.monitor_download(dl)
622 dl.downloaded = self.scheduler.download(dl)
623 return dl
class StepRunner(object):
    """Abstract base for the runners that execute individual recipe steps.
    @since: 1.10"""

    def __init__(self, stepdata, impl_hint):
        self.stepdata = stepdata
        self.impl_hint = impl_hint

    def prepare(self, fetcher, blockers):
        # Default: nothing to download or set up before apply().
        pass

    @classmethod
    def class_for(cls, model):
        """Find the runner subclass whose model_type matches this step."""
        step_type = type(model)
        for candidate in cls.__subclasses__():
            if candidate.model_type == step_type:
                return candidate
        assert False, "Couldn't find step runner for %s" % (step_type,)

    def close(self):
        """Release any resources (called on success or failure)."""
        pass
class RenameStepRunner(StepRunner):
    """A step runner for the <rename> step.
    @since: 1.10"""

    model_type = model.RenameStep

    def apply(self, basedir):
        # Resolve both endpoints safely inside the implementation directory
        # before renaming.
        src = native_path_within_base(basedir, self.stepdata.source)
        dst = native_path_within_base(basedir, self.stepdata.dest)
        os.rename(src, dst)
class DownloadStepRunner(StepRunner):
    """A step runner for the <archive> step.
    @since: 1.10"""

    model_type = model.DownloadSource

    def prepare(self, fetcher, blockers):
        # Start the archive download now; apply() unpacks it later.
        blocker, stream = fetcher.download_archive(self.stepdata, impl_hint = self.impl_hint, may_use_mirror = True)
        assert stream
        self.blocker = blocker
        self.stream = stream
        blockers.append(blocker)

    def apply(self, basedir):
        from zeroinstall.zerostore import unpack
        assert self.blocker.happened
        step = self.stepdata
        unpack.unpack_archive_over(step.url, self.stream, basedir,
                extract = step.extract,
                type = step.type,
                start_offset = step.start_offset or 0)

    def close(self):
        self.stream.close()
def native_path_within_base(base, crossplatform_path):
    """Takes a cross-platform relative path (i.e. using forward slashes, even on windows)
    and returns the absolute, platform-native version of the path.
    If the path does not resolve to a location within `base`, a SafeException is raised.
    @since: 1.10"""
    assert os.path.isabs(base)
    # Absolute paths can never be within base.
    if crossplatform_path.startswith("/"):
        raise SafeException("path %r is not within the base directory" % (crossplatform_path,))
    segments = crossplatform_path.split("/")
    resolved = os.path.realpath(os.path.join(base, *segments))
    # Resolve symlinks in base too, then make sure the result is underneath it
    # (catches '..' components, which realpath has already collapsed).
    resolved_base = os.path.realpath(base)
    if not resolved.startswith(resolved_base + os.path.sep):
        raise SafeException("path %r is not within the base directory" % (crossplatform_path,))
    return resolved