Made DownloadSource and UnpackArchive stateless using a 'StepCommand'
[zeroinstall/zeroinstall-limyreth.git] zeroinstall/injector/fetch.py
"""
Downloads feeds, keys, packages and icons.
"""

# Copyright (C) 2009, Thomas Leonard
# See the README file for details, or visit http://0install.net.

from zeroinstall import _
import os
from logging import info, debug, warn

from zeroinstall.support import tasks, basedir
from zeroinstall.injector.namespaces import XMLNS_IFACE, config_site
from zeroinstall.injector.model import DownloadSource, Recipe, SafeException, escape, DistributionSource
from zeroinstall.injector.iface_cache import PendingFeed, ReplayAttack
from zeroinstall.injector.handler import NoTrustedKeys
from zeroinstall.injector import download

def _escape_slashes(path):
	return path.replace('/', '%23')

def _get_feed_dir(feed):
	"""The algorithm from 0mirror."""
	if '#' in feed:
		raise SafeException(_("Invalid URL '%s'") % feed)
	scheme, rest = feed.split('://', 1)
	assert '/' in rest, "Missing / in %s" % feed
	domain, rest = rest.split('/', 1)
	for x in [scheme, domain, rest]:
		if not x or x.startswith(','):
			raise SafeException(_("Invalid URL '%s'") % feed)
	return os.path.join('feeds', scheme, domain, _escape_slashes(rest))
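
# Illustrative example of the layout produced by _get_feed_dir(); the feed URL is
# hypothetical, and the '%23' comes from _escape_slashes above:
#
#	_get_feed_dir('http://example.com/feeds/hello.xml')
#	=> 'feeds/http/example.com/feeds%23hello.xml'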

class KeyInfoFetcher:
	"""Fetches information about a GPG key from a key-info server.
	See L{Fetcher.fetch_key_info} for details.
	@since: 0.42

	Example:

	>>> kf = KeyInfoFetcher(handler, 'https://server', fingerprint)
	>>> while True:
		print kf.info
		if kf.blocker is None: break
		print kf.status
		yield kf.blocker
	"""
	def __init__(self, handler, server, fingerprint):
		self.fingerprint = fingerprint
		self.info = []
		self.blocker = None

		if server is None: return

		self.status = _('Fetching key information from %s...') % server

		dl = handler.get_download(server + '/key/' + fingerprint)

		from xml.dom import minidom

		@tasks.async
		def fetch_key_info():
			try:
				tempfile = dl.tempfile
				yield dl.downloaded
				self.blocker = None
				tasks.check(dl.downloaded)
				tempfile.seek(0)
				doc = minidom.parse(tempfile)
				if doc.documentElement.localName != 'key-lookup':
					raise SafeException(_('Expected <key-lookup>, not <%s>') % doc.documentElement.localName)
				self.info += doc.documentElement.childNodes
			except Exception, ex:
				doc = minidom.parseString('<item vote="bad"/>')
				root = doc.documentElement
				root.appendChild(doc.createTextNode(_('Error getting key information: %s') % ex))
				self.info.append(root)

		self.blocker = fetch_key_info()

class Fetcher(object):
	"""Downloads and stores various things.
	@ivar config: used to get handler, iface_cache and stores
	@type config: L{config.Config}
	@ivar key_info: caches information about GPG keys
	@type key_info: {str: L{KeyInfoFetcher}}
	"""
	__slots__ = ['config', 'key_info']

	def __init__(self, config):
		assert config.handler, "API change!"
		self.config = config
		self.key_info = {}

	@property
	def handler(self):
		return self.config.handler

	@tasks.async
	def cook(self, required_digest, recipe, stores, force = False, impl_hint = None):
		"""Follow a Recipe.
		@param impl_hint: the Implementation this is for (if any) as a hint for the GUI
		@see: L{download_impl} uses this method when appropriate"""
		# Maybe we're taking this metaphor too far?

		# Start preparing all steps
		step_commands = [step.prepare(self, force, impl_hint) for step in recipe.steps]

		# Create an empty directory for the new implementation
		store = stores.stores[0]
		tmpdir = store.get_tmp_dir_for(required_digest)

		try:
			# Run steps
			valid_blockers = [s.blocker for s in step_commands if s.blocker is not None]
			for step_command in step_commands:
				if step_command.blocker:
					while not step_command.blocker.happened:
						yield valid_blockers
						tasks.check(valid_blockers)
				step_command.run(tmpdir)

			# Check that the result is correct and store it in the cache
			store.check_manifest_and_rename(required_digest, tmpdir)
			tmpdir = None
		finally:
			# If unpacking fails, remove the temporary directory
			if tmpdir is not None:
				from zeroinstall import support
				support.ro_rmtree(tmpdir)
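
	# Note: the objects returned by step.prepare() above (the 'StepCommand' objects
	# mentioned in the commit message) only need to provide what cook() uses: a
	# 'blocker' attribute (a tasks.Blocker, or None) and a run(tmpdir) method.
	# A minimal sketch of that implied interface (illustrative only; the class name
	# is hypothetical and not part of this module):
	#
	#	class ExampleStepCommand(object):
	#		blocker = None			# or a Blocker while data is still being fetched
	#
	#		def run(self, tmpdir):
	#			pass			# unpack/copy this step's data into tmpdir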

	def get_feed_mirror(self, url):
		"""Return the URL of a mirror for this feed."""
		if self.config.feed_mirror is None:
			return None
		import urlparse
		if urlparse.urlparse(url).hostname == 'localhost':
			return None
		return '%s/%s/latest.xml' % (self.config.feed_mirror, _get_feed_dir(url))
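
	# For example (illustrative values): with config.feed_mirror set to
	# 'http://mirror.example.org/0mirror', the feed 'http://example.com/feeds/hello.xml'
	# maps to 'http://mirror.example.org/0mirror/feeds/http/example.com/feeds%23hello.xml/latest.xml'.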

	@tasks.async
	def get_packagekit_feed(self, feed_url):
		"""Send a query to PackageKit (if available) for information about this package.
		On success, the result is added to iface_cache.
		"""
		assert feed_url.startswith('distribution:'), feed_url
		master_feed = self.config.iface_cache.get_feed(feed_url.split(':', 1)[1])
		if master_feed:
			fetch = self.config.iface_cache.distro.fetch_candidates(master_feed)
			if fetch:
				yield fetch
				tasks.check(fetch)

			# Force feed to be regenerated with the new information
			self.config.iface_cache.get_feed(feed_url, force = True)

	def download_and_import_feed(self, feed_url, iface_cache = None, force = False):
		"""Download the feed, download any required keys, confirm trust if needed and import.
		@param feed_url: the feed to be downloaded
		@type feed_url: str
		@param iface_cache: (deprecated)
		@param force: whether to abort and restart an existing download"""
		from .download import DownloadAborted

		assert iface_cache is None or iface_cache is self.config.iface_cache

		debug(_("download_and_import_feed %(url)s (force = %(force)d)"), {'url': feed_url, 'force': force})
		assert not os.path.isabs(feed_url)

		if feed_url.startswith('distribution:'):
			return self.get_packagekit_feed(feed_url)

		primary = self._download_and_import_feed(feed_url, force, use_mirror = False)

		@tasks.named_async("monitor feed downloads for " + feed_url)
		def wait_for_downloads(primary):
			# Download just the upstream feed, unless it takes too long...
			timeout = tasks.TimeoutBlocker(5, 'Mirror timeout')	# 5 seconds

			yield primary, timeout
			tasks.check(timeout)

			try:
				tasks.check(primary)
				if primary.happened:
					return		# OK, primary succeeded!
				# OK, maybe it's just being slow...
				info("Feed download from %s is taking a long time.", feed_url)
				primary_ex = None
			except NoTrustedKeys, ex:
				raise	# Don't bother trying the mirror if we have a trust problem
			except ReplayAttack, ex:
				raise	# Don't bother trying the mirror if we have a replay attack
			except DownloadAborted, ex:
				raise	# Don't bother trying the mirror if the user cancelled
			except SafeException, ex:
				# Primary failed
				primary = None
				primary_ex = ex
				warn(_("Feed download from %(url)s failed: %(exception)s"), {'url': feed_url, 'exception': ex})

			# Start downloading from mirror...
			mirror = self._download_and_import_feed(feed_url, force, use_mirror = True)

			# Wait until both mirror and primary tasks are complete...
			while True:
				blockers = filter(None, [primary, mirror])
				if not blockers:
					break
				yield blockers

				if primary:
					try:
						tasks.check(primary)
						if primary.happened:
							primary = None
							# No point carrying on with the mirror once the primary has succeeded
							if mirror:
								info(_("Primary feed download succeeded; aborting mirror download for %s") % feed_url)
								mirror.dl.abort()
					except SafeException, ex:
						primary = None
						primary_ex = ex
						info(_("Feed download from %(url)s failed; still trying mirror: %(exception)s"), {'url': feed_url, 'exception': ex})

				if mirror:
					try:
						tasks.check(mirror)
						if mirror.happened:
							mirror = None
							if primary_ex:
								# We already warned; no need to raise an exception too,
								# as the mirror download succeeded.
								primary_ex = None
					except ReplayAttack, ex:
						info(_("Version from mirror is older than cached version; ignoring it: %s"), ex)
						mirror = None
						primary_ex = None
					except SafeException, ex:
						info(_("Mirror download failed: %s"), ex)
						mirror = None

			if primary_ex:
				raise primary_ex

		return wait_for_downloads(primary)
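
	# Typical use from another @tasks.async coroutine (illustrative; 'fetcher' and
	# 'feed_url' are assumed to exist in the caller's scope):
	#
	#	blocker = fetcher.download_and_import_feed(feed_url)
	#	yield blocker
	#	tasks.check(blocker)	# raises if neither the primary nor the mirror succeeded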

	def _download_and_import_feed(self, feed_url, force, use_mirror):
		"""Download and import a feed.
		@param use_mirror: False to use primary location; True to use mirror."""
		if use_mirror:
			url = self.get_feed_mirror(feed_url)
			if url is None: return None
			info(_("Trying mirror server for feed %s") % feed_url)
		else:
			url = feed_url

		dl = self.handler.get_download(url, force = force, hint = feed_url)
		stream = dl.tempfile

		@tasks.named_async("fetch_feed " + url)
		def fetch_feed():
			yield dl.downloaded
			tasks.check(dl.downloaded)

			pending = PendingFeed(feed_url, stream)

			if use_mirror:
				# If we got the feed from a mirror, get the key from there too
				key_mirror = self.config.feed_mirror + '/keys/'
			else:
				key_mirror = None

			keys_downloaded = tasks.Task(pending.download_keys(self.handler, feed_hint = feed_url, key_mirror = key_mirror), _("download keys for %s") % feed_url)
			yield keys_downloaded.finished
			tasks.check(keys_downloaded.finished)

			if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
				blocker = self.config.trust_mgr.confirm_keys(pending)
				if blocker:
					yield blocker
					tasks.check(blocker)
				if not self.config.iface_cache.update_feed_if_trusted(pending.url, pending.sigs, pending.new_xml):
					raise NoTrustedKeys(_("No signing keys trusted; not importing"))

		task = fetch_feed()
		task.dl = dl
		return task

	def fetch_key_info(self, fingerprint):
		try:
			return self.key_info[fingerprint]
		except KeyError:
			self.key_info[fingerprint] = key_info = KeyInfoFetcher(self.handler,
					self.config.key_info_server, fingerprint)
			return key_info

	def download_impl(self, impl, retrieval_method, stores, force = False):
		"""Download an implementation.
		@param impl: the selected implementation
		@type impl: L{model.ZeroInstallImplementation}
		@param retrieval_method: a way of getting the implementation (e.g. an Archive or a Recipe)
		@type retrieval_method: L{model.RetrievalMethod}
		@param stores: where to store the downloaded implementation
		@type stores: L{zerostore.Stores}
		@param force: whether to abort and restart an existing download
		@rtype: L{tasks.Blocker}"""
		assert impl
		assert retrieval_method

		if isinstance(retrieval_method, DistributionSource):
			return retrieval_method.install(self.handler)

		from zeroinstall.zerostore import manifest
		best = None
		for digest in impl.digests:
			alg_name = digest.split('=', 1)[0]
			alg = manifest.algorithms.get(alg_name, None)
			if alg and (best is None or best.rating < alg.rating):
				best = alg
				required_digest = digest

		if best is None:
			if not impl.digests:
				raise SafeException(_("No <manifest-digest> given for '%(implementation)s' version %(version)s") %
						{'implementation': impl.feed.get_name(), 'version': impl.get_version()})
			raise SafeException(_("Unknown digest algorithms '%(algorithms)s' for '%(implementation)s' version %(version)s") %
					{'algorithms': impl.digests, 'implementation': impl.feed.get_name(), 'version': impl.get_version()})

		@tasks.async
		def download_impl():
			if isinstance(retrieval_method, DownloadSource):
				blocker, stream = self.download_archive(retrieval_method, force = force, impl_hint = impl)
				yield blocker
				tasks.check(blocker)

				stream.seek(0)
				self._add_to_cache(required_digest, stores, retrieval_method, stream)
			elif isinstance(retrieval_method, Recipe):
				blocker = self.cook(required_digest, retrieval_method, stores, force, impl_hint = impl)
				yield blocker
				tasks.check(blocker)
			else:
				raise Exception(_("Unknown download type for '%s'") % retrieval_method)

			self.handler.impl_added_to_store(impl)
		return download_impl()

	def _add_to_cache(self, required_digest, stores, retrieval_method, stream):
		assert isinstance(retrieval_method, DownloadSource)
		stores.add_archive_to_cache(required_digest, stream, retrieval_method.url, retrieval_method.extract,
				type = retrieval_method.type, start_offset = retrieval_method.start_offset or 0)
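
	# The DownloadSource fields used above (url, size, extract, type, start_offset)
	# mirror the attributes of an <archive> element in a feed, e.g. (illustrative values):
	#
	#	<archive href='http://example.com/foo-1.0.tar.gz' size='123456'
	#		 extract='foo-1.0' type='application/x-compressed-tar'/>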

	def download_archive(self, download_source, force = False, impl_hint = None):
		"""Fetch an archive. You should normally call L{download_impl}
		instead, since it handles other kinds of retrieval method too."""
		from zeroinstall.zerostore import unpack

		url = download_source.url
		if not (url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:')):
			raise SafeException(_("Unknown scheme in download URL '%s'") % url)

		mime_type = download_source.type
		if not mime_type:
			mime_type = unpack.type_from_url(download_source.url)
		if not mime_type:
			raise SafeException(_("No 'type' attribute on archive, and I can't guess from the name (%s)") % download_source.url)
		unpack.check_type_ok(mime_type)
		dl = self.handler.get_download(download_source.url, force = force, hint = impl_hint)
		dl.expected_size = download_source.size + (download_source.start_offset or 0)
		return (dl.downloaded, dl.tempfile)

	def download_icon(self, interface, force = False):
		"""Download an icon for this interface and add it to the
		icon cache. If the interface has no icon do nothing.
		@return: the task doing the import, or None
		@rtype: L{tasks.Task}"""
		debug("download_icon %(interface)s (force = %(force)d)", {'interface': interface, 'force': force})

		modification_time = None
		existing_icon = self.config.iface_cache.get_icon_path(interface)
		if existing_icon:
			file_mtime = os.stat(existing_icon).st_mtime
			from email.utils import formatdate
			modification_time = formatdate(timeval = file_mtime, localtime = False, usegmt = True)

		# Find a suitable icon to download
		for icon in interface.get_metadata(XMLNS_IFACE, 'icon'):
			type = icon.getAttribute('type')
			if type != 'image/png':
				debug(_('Skipping non-PNG icon'))
				continue
			source = icon.getAttribute('href')
			if source:
				break
			warn(_('Missing "href" attribute on <icon> in %s'), interface)
		else:
			info(_('No PNG icons found in %s'), interface)
			return

		try:
			dl = self.handler.monitored_downloads[source]
			if dl and force:
				dl.abort()
				raise KeyError
		except KeyError:
			dl = download.Download(source, hint = interface, modification_time = modification_time)
			self.handler.monitor_download(dl)

		@tasks.async
		def download_and_add_icon():
			stream = dl.tempfile
			yield dl.downloaded
			try:
				tasks.check(dl.downloaded)
				if dl.unmodified: return
				stream.seek(0)

				import shutil
				icons_cache = basedir.save_cache_path(config_site, 'interface_icons')
				icon_file = file(os.path.join(icons_cache, escape(interface.uri)), 'w')
				shutil.copyfileobj(stream, icon_file)
			except Exception, ex:
				self.handler.report_error(ex)

		return download_and_add_icon()
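
	# Typical use (illustrative; 'fetcher' and 'iface' are assumed to exist in the
	# caller's scope, which must itself be a @tasks.async coroutine):
	#
	#	blocker = fetcher.download_icon(iface)
	#	if blocker:
	#		yield blocker
	#		tasks.check(blocker)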

	def download_impls(self, implementations, stores):
		"""Download the given implementations, choosing a suitable retrieval method for each.
		If any of the retrieval methods are DistributionSources and
		need confirmation, handler.confirm is called to check that the
		installation should proceed.
		"""
		unsafe_impls = []

		to_download = []
		for impl in implementations:
			debug(_("start_downloading_impls: for %(feed)s get %(implementation)s"), {'feed': impl.feed, 'implementation': impl})
			source = self.get_best_source(impl)
			if not source:
				raise SafeException(_("Implementation %(implementation_id)s of interface %(interface)s"
					" cannot be downloaded (no download locations given in "
					"interface!)") % {'implementation_id': impl.id, 'interface': impl.feed.get_name()})
			to_download.append((impl, source))

			if isinstance(source, DistributionSource) and source.needs_confirmation:
				unsafe_impls.append(source.package_id)

		@tasks.async
		def download_impls():
			if unsafe_impls:
				confirm = self.handler.confirm_install(_('The following components need to be installed using native packages. '
					'These come from your distribution, and should therefore be trustworthy, but they also '
					'run with extra privileges. In particular, installing them may run extra services on your '
					'computer or affect other users. You may be asked to enter a password to confirm. The '
					'packages are:\n\n') + ('\n'.join('- ' + x for x in unsafe_impls)))
				yield confirm
				tasks.check(confirm)

			blockers = []

			for impl, source in to_download:
				blockers.append(self.download_impl(impl, source, stores))

			# Record the first error; log the rest
			error = []
			def dl_error(ex, tb = None):
				if error:
					self.handler.report_error(ex)
				else:
					error.append(ex)
			while blockers:
				yield blockers
				tasks.check(blockers, dl_error)

				blockers = [b for b in blockers if not b.happened]
			if error:
				raise error[0]

		if not to_download:
			return None

		return download_impls()

	def get_best_source(self, impl):
		"""Return the best download source for this implementation.
		@rtype: L{model.RetrievalMethod}"""
		if impl.download_sources:
			return impl.download_sources[0]
		return None
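
	# Typical use of get_best_source() together with download_impl(), mirroring the
	# loop in download_impls() above (illustrative; 'fetcher', 'impl' and 'stores'
	# are assumed to exist in a @tasks.async caller):
	#
	#	source = fetcher.get_best_source(impl)
	#	if source:
	#		blocker = fetcher.download_impl(impl, source, stores)
	#		yield blocker
	#		tasks.check(blocker)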