Fixed batching of PackageKit requests
[zeroinstall/solver.git] / zeroinstall / injector / packagekit.py
blob0e76d1a978f112c970e9c2cc3aadc617c54599e7
1 """
2 PackageKit integration.
3 """
5 # Copyright (C) 2010, Aleksey Lim
6 # See the README file for details, or visit http://0install.net.
8 import os, sys
9 import locale
10 import logging
11 from zeroinstall import _, SafeException
13 from zeroinstall.support import tasks, unicode
14 from zeroinstall.injector import download, model
# Logger shared by everything in this module.
_logger_pk = logging.getLogger('0install.packagekit')
#_logger_pk.setLevel(logging.DEBUG)

# D-Bus support is optional. If importing it or installing the glib main loop
# as the default fails for any reason, `dbus` is left as None and the reason
# is logged.
try:
    import dbus
    import dbus.mainloop.glib
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
except Exception as err:
    _logger_pk.info("D-BUS not available: %s", err)
    dbus = None

# Largest number of package names sent to PackageKit in a single request.
MAX_PACKAGE_KIT_TRANSACTION_SIZE = 100
class PackageKit(object):
    """Client for the PackageKit D-Bus service.

    Looks up distribution packages by name and caches what PackageKit reports
    about them, so that they can later be added to a feed as implementations
    (see L{get_candidates}).
    """

    def __init__(self):
        # False means "not looked up yet". The pk property replaces it with the
        # D-Bus interface, or with None if PackageKit cannot be reached.
        self._pk = False

        # { package_name : [ candidate info dict ] | Blocker }
        # A Blocker is stored when the query for a package is started; it is
        # replaced by a list once details for that package arrive.
        self._candidates = {}

        # PackageKit is really slow at handling separate queries, so we use this to
        # batch them up.
        self._next_batch = set()

    @property
    def available(self):
        """True if the PackageKit D-Bus interface could be obtained."""
        return self.pk is not None

    @property
    def pk(self):
        """The PackageKit D-Bus interface, or None if it is not available.

        The lookup is done on first access and the result (including a
        failure) is cached in C{self._pk}.
        """
        if self._pk is False:
            if dbus is None:
                self._pk = None
            else:
                try:
                    self._pk = dbus.Interface(dbus.SystemBus().get_object(
                            'org.freedesktop.PackageKit',
                            '/org/freedesktop/PackageKit', False),
                            'org.freedesktop.PackageKit')
                    _logger_pk.info(_('PackageKit dbus service found'))
                except Exception as ex:
                    _logger_pk.info(_('PackageKit dbus service not found: %s'), ex)
                    self._pk = None
        return self._pk

    def get_candidates(self, package_name, factory, prefix):
        """Add any cached candidates.
        The candidates are those discovered by a previous call to L{fetch_candidates}.
        @param package_name: the distribution's name for the package
        @param factory: a function to add a new implementation to the feed
        @param prefix: the prefix for the implementation's ID
        """
        candidates = self._candidates.get(package_name, None)
        if candidates is None:
            return

        if isinstance(candidates, tasks.Blocker):
            return        # Fetch still in progress

        for candidate in candidates:
            impl_name = '%s:%s:%s:%s' % (prefix, package_name, candidate['version'], candidate['arch'])

            impl = factory(impl_name, only_if_missing = True, installed = candidate['installed'])
            if impl is None:
                # (checking this way because the cached candidate['installed'] may be stale)
                # NOTE(review): this returns rather than continuing, so any
                # remaining candidates are not offered - confirm this is intended.
                return        # Already installed

            impl.version = model.parse_version(candidate['version'])
            if candidate['arch'] != '*':
                impl.machine = candidate['arch']

            # candidate and impl are bound as default arguments so that each
            # closure keeps the values from its own loop iteration.
            def install(handler, candidate = candidate, impl = impl):
                packagekit_id = candidate['packagekit_id']
                dl = PackageKitDownload('packagekit:' + packagekit_id, hint = impl, pk = self.pk, packagekit_id = packagekit_id, expected_size = candidate['size'])
                handler.monitor_download(dl)
                return dl.downloaded
            impl.download_sources.append(model.DistributionSource(package_name, candidate['size'], install))

    def fetch_candidates(self, package_names):
        """Query PackageKit about these packages and wait for the queries to finish.
        Names requested by several calls are merged into a shared batch before
        anything is sent (see L{_fetch_batch}).
        @param package_names: the distribution's names for the packages to look up
        """
        assert self.pk

        # Batch requests up
        self._next_batch |= set(package_names)
        # (presumably this bare yield lets other pending fetch_candidates calls
        # add their names to the batch before we continue - confirm in tasks)
        yield
        batched_package_names = self._next_batch
        self._next_batch = set()
        # The first fetch_candidates instance will now have all the packages.
        # For the others, batched_package_names will now be empty.
        # Fetch any we're missing.
        self._fetch_batch(list(batched_package_names))

        results = [self._candidates[p] for p in package_names]

        # (use set because a single Blocker may be checking multiple
        # packages and we need to avoid duplicates).
        in_progress = list(set(b for b in results if isinstance(b, tasks.Blocker)))
        _logger_pk.debug('Currently querying PackageKit for: %s', in_progress)

        while in_progress:
            yield in_progress
            in_progress = [b for b in in_progress if not b.happened]

    # Equivalent to decorating with "@tasks.async". Written with getattr because
    # "async" is a reserved word from Python 3.7 on, where the attribute syntax
    # no longer parses.
    fetch_candidates = getattr(tasks, 'async')(fetch_candidates)

    def _fetch_batch(self, package_names):
        """Ensure that each of these packages is in self._candidates.
        Start a new fetch if necessary. Ignore packages that are already downloaded or
        in the process of being downloaded.
        @param package_names: list of the distribution's package names
        """
        # (do we need a 'force' argument here?)

        package_names = [n for n in package_names if n not in self._candidates]

        def do_batch(package_names):
            """Send one Resolve request for package_names, then GetDetails for the results."""
            #_logger_pk.info("sending %d packages in batch", len(package_names))
            versions = {}        # { packagekit_id : info dict from the Resolve step }

            blocker = None

            def error_cb(sender):
                # Note: probably just means the package wasn't found
                _logger_pk.info(_('Transaction failed: %s(%s)'), sender.error_code, sender.error_details)
                blocker.trigger()

            def details_cb(sender):
                # The key can be a dbus.String sometimes, so convert to a Python
                # string to be sure we get a match.
                details = {}
                for packagekit_id, d in sender.details.items():
                    details[unicode(packagekit_id)] = d

                for packagekit_id in details:
                    if packagekit_id not in versions:
                        _logger_pk.info("Unexpected package info for '%s'; was expecting one of %r", packagekit_id, list(versions.keys()))

                for packagekit_id, info in versions.items():
                    if packagekit_id in details:
                        info.update(details[packagekit_id])
                        info['packagekit_id'] = packagekit_id
                        # The first result for a name replaces the placeholder
                        # Blocker; later ones are appended to the list.
                        if (info['name'] not in self._candidates or
                            isinstance(self._candidates[info['name']], tasks.Blocker)):
                            self._candidates[info['name']] = [info]
                        else:
                            self._candidates[info['name']].append(info)
                    else:
                        _logger_pk.info(_('Empty details for %s'), packagekit_id)
                blocker.trigger()

            def resolve_cb(sender):
                if sender.package:
                    _logger_pk.debug(_('Resolved %r'), sender.package)
                    for packagekit_id, info in sender.package.items():
                        packagekit_id = unicode(packagekit_id)        # Can be a dbus.String sometimes
                        # Drop anything after a ':' in the fourth ';'-separated field.
                        parts = packagekit_id.split(';', 3)
                        if ':' in parts[3]:
                            parts[3] = parts[3].split(':', 1)[0]
                            packagekit_id = ';'.join(parts)
                        versions[packagekit_id] = info
                    tran = _PackageKitTransaction(self.pk, details_cb, error_cb)
                    tran.proxy.GetDetails(list(versions.keys()))
                else:
                    _logger_pk.info(_('Empty resolve for %s'), package_names)
                    blocker.trigger()

            # Send queries
            blocker = tasks.Blocker('PackageKit %s' % package_names)
            for package in package_names:
                self._candidates[package] = blocker

            try:
                _logger_pk.debug(_('Ask for %s'), package_names)
                tran = _PackageKitTransaction(self.pk, resolve_cb, error_cb)
                tran.proxy.Resolve('none', package_names)
            except:
                # Hand the failure to anyone waiting on the blocker, then
                # re-raise it unchanged for our caller.
                __, ex, tb = sys.exc_info()
                blocker.trigger((ex, tb))
                raise

        # Now we've collected all the requests together, split them up into chunks
        # that PackageKit can handle ( < 100 per batch )
        #_logger_pk.info("sending %d packages", len(package_names))
        while package_names:
            next_batch = package_names[:MAX_PACKAGE_KIT_TRANSACTION_SIZE]
            package_names = package_names[MAX_PACKAGE_KIT_TRANSACTION_SIZE:]
            do_batch(next_batch)
class PackageKitDownload:
    """A running PackageKit installation of one package, presented as a download.

    Constructing an instance starts the InstallPackages transaction.
    C{self.downloaded} is a Blocker that is triggered when the transaction
    finishes, fails or is aborted.
    """

    def __init__(self, url, hint, pk, packagekit_id, expected_size):
        """Start installing the package.
        @param url: identifier stored as C{self.url}
        @param hint: the implementation being installed; marked as installed on success
        @param pk: the PackageKit D-Bus interface used to create the transaction
        @param packagekit_id: PackageKit's ID for the package to install
        @param expected_size: expected size used for progress reporting, or None
        """
        self.url = url
        self.status = download.download_fetching
        self.hint = hint
        self.aborted_by_user = False

        self.expected_size = expected_size

        self.packagekit_id = packagekit_id
        self._impl = hint
        self._transaction = None
        self.pk = pk

        # Created before the transaction is started so that the callbacks
        # below can never find it unset.
        self.downloaded = tasks.Blocker('PackageKit install %s' % self.packagekit_id)

        def error_cb(sender):
            self.status = download.download_failed
            ex = SafeException('PackageKit install failed: %s' % (sender.error_details or sender.error_code))
            self.downloaded.trigger(exception = (ex, None))

        def installed_cb(sender):
            # (the message used to reference an undefined name, which turned a
            # failed assertion into a NameError)
            assert not self._impl.installed, self._impl
            self._impl.installed = True
            self._impl.distro.installed_fixup(self._impl)

            self.status = download.download_complete
            self.downloaded.trigger()

        def install_packages():
            package_name = self.packagekit_id
            self._transaction = _PackageKitTransaction(self.pk, installed_cb, error_cb)
            # Two call signatures are tried in order; see compat_call.
            self._transaction.compat_call([
                    ('InstallPackages', False, [package_name]),
                    ('InstallPackages', [package_name]),
                    ])

        _auth_wrapper(install_packages)

    def abort(self):
        """Cancel the transaction and mark the download as failed."""
        _logger_pk.debug(_('Cancel transaction'))
        self.aborted_by_user = True
        self._transaction.proxy.Cancel()
        self.status = download.download_failed
        self.downloaded.trigger()

    def get_current_fraction(self):
        """Return progress as a fraction of 1, or None if it is not known.

        None is returned when there is no transaction yet or when the
        reported percentage is above 100.
        """
        if self._transaction is None:
            return None
        percentage = self._transaction.getPercentage()
        if percentage > 100:
            return None
        else:
            return float(percentage) / 100.

    def get_bytes_downloaded_so_far(self):
        """Estimate the bytes fetched so far from the fraction and the expected size.

        Returns 0 when either the fraction or the expected size is unknown.
        """
        fraction = self.get_current_fraction()
        if fraction is None:
            return 0
        else:
            if self.expected_size is None:
                return 0
            return int(self.expected_size * fraction)
def _auth_wrapper(method, *args):
    """Call method(*args), asking for authorization once if PackageKit refuses.

    If the call fails with a RefusedByPolicy D-Bus error whose message names
    an 'auth_' action, authorization is requested through the session bus
    and the call is retried. Any other D-Bus error, an error message that does
    not have the expected form, or a refused authorization re-raises the
    original exception.
    @param method: the callable to invoke
    @param args: positional arguments for method
    @return: whatever method returns
    """
    try:
        return method(*args)
    except dbus.exceptions.DBusException as e:
        if e.get_dbus_name() != \
                'org.freedesktop.PackageKit.Transaction.RefusedByPolicy':
            raise

        # The message is expected to be two words: "<iface> <auth action>".
        # Anything else is re-raised as the original D-Bus error instead of
        # failing here with an unpacking ValueError.
        parts = e.get_dbus_message().split()
        if len(parts) != 2 or not parts[1].startswith('auth_'):
            raise
        iface, auth = parts

        _logger_pk.debug(_('Authentication required for %s'), auth)

        # NOTE(review): in dbus-python the third positional argument of
        # get_object appears to be `introspect`, not an interface name - confirm.
        pk_auth = dbus.SessionBus().get_object(
                'org.freedesktop.PolicyKit.AuthenticationAgent', '/',
                'org.gnome.PolicyKit.AuthorizationManager.SingleInstance')

        if not pk_auth.ObtainAuthorization(iface, dbus.UInt32(0),
                dbus.UInt32(os.getpid()), timeout=300):
            raise

        return method(*args)
class _PackageKitTransaction(object):
    """One PackageKit transaction, with the data its D-Bus signals report.

    Package, Details and Files signals are collected into C{self.package},
    C{self.details} and C{self.files}. When the Finished signal arrives, the
    error callback is called if an ErrorCode signal was seen, otherwise the
    finished callback. Each callback receives this object.
    """

    def __init__(self, pk, finished_cb=None, error_cb=None):
        """Create a new transaction and connect to its signals.
        @param pk: the PackageKit D-Bus interface (used to get a transaction ID)
        @param finished_cb: called with this object when the transaction finishes without error
        @param error_cb: called with this object when the transaction finishes after an error
        """
        self._finished_cb = finished_cb
        self._error_cb = error_cb
        self.error_code = None
        self.error_details = None
        self.package = {}
        self.details = {}
        self.files = {}

        self.object = dbus.SystemBus().get_object(
                'org.freedesktop.PackageKit', pk.GetTid(), False)
        self.proxy = dbus.Interface(self.object,
                'org.freedesktop.PackageKit.Transaction')
        self._props = dbus.Interface(self.object, dbus.PROPERTIES_IFACE)

        # Signal connections are kept so they can be removed when the transaction ends.
        self._signals = []
        for signal, cb in [('Finished', self.__finished_cb),
                           ('ErrorCode', self.__error_code_cb),
                           ('StatusChanged', self.__status_changed_cb),
                           ('Package', self.__package_cb),
                           ('Details', self.__details_cb),
                           ('Files', self.__files_cb)]:
            self._signals.append(self.proxy.connect_to_signal(signal, cb))

        defaultlocale = locale.getdefaultlocale()[0]
        if defaultlocale is not None:
            self.compat_call([
                    ('SetHints', ['locale=%s' % defaultlocale]),
                    ('SetLocale', defaultlocale),
                    ])

    def getPercentage(self):
        """Return the transaction's percentage.

        The 'Percentage' property is tried first; if it cannot be read, the
        first value returned by the GetProgress method is used.
        """
        result = self.get_prop('Percentage')
        if result is None:
            result, __, __, __ = self.proxy.GetProgress()
        return result

    def get_prop(self, prop, default = None):
        """Read a property of the transaction, returning default if that fails.
        @param prop: the property name
        @param default: value to return when the property cannot be read
        """
        try:
            return self._props.Get('org.freedesktop.PackageKit.Transaction', prop)
        except Exception:
            # Best effort: callers fall back to the default. (Exception rather
            # than a bare except, so KeyboardInterrupt/SystemExit still propagate.)
            return default

    # note: Ubuntu's aptdaemon implementation of PackageKit crashes if passed the wrong
    # arguments (rather than returning InvalidArgs), so always try its API first.
    def compat_call(self, calls):
        """Try alternative D-Bus calls in order and return the first result.
        @param calls: sequence of tuples (method_name, arg1, arg2, ...)
        @return: the value returned by the first call that is accepted
        @raise Exception: if every call fails with UnknownMethod or InvalidArgs
            (any other D-Bus error is re-raised immediately)
        """
        for call in calls:
            method = call[0]
            args = call[1:]
            try:
                dbus_method = self.proxy.get_dbus_method(method)
                return dbus_method(*args)
            except dbus.exceptions.DBusException as e:
                if e.get_dbus_name() not in (
                        'org.freedesktop.DBus.Error.UnknownMethod',
                        'org.freedesktop.DBus.Error.InvalidArgs'):
                    raise
        raise Exception('Cannot call %r DBus method' % calls)

    def __finished_cb(self, exit, runtime):
        """Handle the Finished signal: disconnect signals and notify the owner."""
        _logger_pk.debug(_('Transaction finished: %s'), exit)

        for i in self._signals:
            i.remove()

        if self.error_code is not None:
            callback = self._error_cb
        else:
            callback = self._finished_cb
        # Both callbacks default to None in __init__, so skip a missing one.
        if callback is not None:
            callback(self)

    def __error_code_cb(self, code, details):
        """Handle the ErrorCode signal: remember the error for __finished_cb."""
        _logger_pk.info(_('Transaction failed: %s(%s)'), details, code)
        self.error_code = code
        self.error_details = details

    def __package_cb(self, status, id, summary):
        """Handle the Package signal: record the package under its ID in self.package.

        Packages whose version cannot be parsed are logged and skipped. Any
        exception is logged rather than propagated.
        """
        try:
            from zeroinstall.injector import distro

            package_name, version, arch, repo_ = id.split(';')
            clean_version = distro.try_cleanup_distro_version(version)
            if not clean_version:
                _logger_pk.info(_("Can't parse distribution version '%(version)s' for package '%(package)s'"), {'version': version, 'package': package_name})
                return
            clean_arch = distro.canonical_machine(arch)
            package = {'version': clean_version,
                       'name': package_name,
                       'arch': clean_arch,
                       'installed': (status == 'installed')}
            _logger_pk.debug(_('Package: %s %r'), id, package)
            self.package[str(id)] = package
        except Exception as ex:
            _logger_pk.warning("__package_cb(%s, %s, %s): %s", status, id, summary, ex)

    def __details_cb(self, id, licence, group, detail, url, size):
        """Handle the Details signal: record the details under the package ID."""
        details = {'licence': str(licence),
                   'group': str(group),
                   'detail': str(detail),
                   'url': str(url),
                   'size': int(size)}
        _logger_pk.debug(_('Details: %s %r'), id, details)
        self.details[id] = details

    def __files_cb(self, id, files):
        """Handle the Files signal: files is a ';'-separated string."""
        self.files[id] = files.split(';')

    def __status_changed_cb(self, status):
        """Handle the StatusChanged signal (currently ignored)."""
        pass