3 Generate a list of packages that should be updated from Debian.
4 It uses the the Ultimate Debian Database (UDD) to get a
5 list of version in the Debian repository.
7 By default it will get the last successful build manifest file from testing or stable from here:
8 https://nightly.tails.boum.org/build_Tails_ISO_testing/lastSuccessful/archive/build-artifacts/
9 https://nightly.tails.boum.org/build_Tails_ISO_stable/lastSuccessful/archive/build-artifacts/
14 from dataclasses import dataclass
20 from typing import Iterator, Sequence
27 import distro_info # type: ignore[import]
28 from debian.debian_support import Version
30 logger = logging.getLogger()
33 def partition(pred, iterable):
34 "Use a predicate to partition entries into false entries and true entries"
35 # partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
46 return non_match, match
49 class NoBuildManifest(Exception):
53 def add_metadata(yml, name):
54 yml["file_name"] = name
55 match = re.match(r".*@([0-9a-f]+)-([0-9T]+Z).build-manifest", name)
57 yml["git_hash"] = match.group(1)
58 yml["timestamp"] = match.group(2)
61 def get_build_manifest(suite: str) -> dict:
62 base_url = f"https://nightly.tails.boum.org/build_Tails_ISO_{suite}/lastSuccessful/archive/build-artifacts/"
64 shasum_response = requests.get(
65 urllib.parse.urljoin(base_url, "tails-build-artifacts.shasum")
69 shasum_response.raise_for_status()
70 except requests.HTTPError as e:
71 raise NoBuildManifest(f"build-manifest file for {suite} not found!") from e
73 for i in shasum_response.text.splitlines():
76 if name.endswith(".build-manifest"):
77 url = urllib.parse.urljoin(base_url, name)
78 bm_response = requests.get(url, timeout=60)
79 ret = yaml.safe_load(bm_response.text) # type: dict
81 add_metadata(ret, name)
84 raise NoBuildManifest(f"build-manifest file for {suite} not found!")
87 class NotFoundError(Exception):
99 source_version: Version
103 """to manage output of the Ultimate Debian Database (UDD).
104 Normally you give a list of packages you want to check and get the versions on different suites.
107 def __init__(self, packages: Sequence[str], suites: Sequence[str]):
109 self.packages = self._request(packages)
111 def _request(self, packages: Sequence[str]) -> dict[str, dict[str, UDDPackageRow]]:
112 ret: dict[str, dict[str, UDDPackageRow]] = collections.defaultdict(dict)
113 with psycopg2.connect(
114 "postgresql://udd-mirror:udd-mirror@udd-mirror.debian.net/udd"
116 with conn.cursor() as curs:
117 archs = ("all", "amd64")
118 components = ("main", "contrib", "non-free", "non-free-firmware")
119 pkgs = tuple(packages)
120 fields = ", ".join(UDDPackageRow.__dataclass_fields__.keys())
125 " WHERE distribution = 'debian'"
127 " and architecture in %s"
128 " and component in %s"
129 " and package in %s;",
130 (self.suites, archs, components, pkgs),
133 row = UDDPackageRow(*r)
134 row.version = Version(row.version)
135 row.source_version = Version(row.source_version)
136 if row.release in ret[row.package]:
137 if ret[row.package][row.release].version > row.version:
139 ret[row.package][row.release] = row
142 def package(self, name: str) -> dict[str, UDDPackageRow]:
143 return self.packages[name]
145 def source(self, name: str, suite: str) -> str:
146 package = self.package(name)
148 if suite not in package:
149 raise NotFoundError(f"{name} not found in {suite}")
151 return package[suite].source
153 def source_version(self, name: str, suite: str) -> Version:
154 package = self.package(name)
156 if suite not in package:
157 raise NotFoundError(f"{name} not found in {suite}")
159 return package[suite].source_version
161 def version(self, name: str, suite: str) -> Version:
162 package = self.package(name)
164 if suite not in package:
165 raise NotFoundError(f"{name} not found in {suite}")
167 return package[suite].version
169 def get_debian_version(self, name: str, version: Version) -> tuple:
170 for suite in self.suites:
172 suite_version = self.version(name, suite)
173 except NotFoundError:
175 if version <= suite_version:
176 return (suite, suite_version)
179 f"{name}: the package version({version}) is higher than the version on {suite} ({suite_version})"
182 def packages_by_source(self, source: str, suite: str) -> set[str]:
184 for name, pkg in self.packages.items():
188 if p.source == source:
193 def strip_tails_version(version: str) -> Version:
194 """if we have a Tails own fork get the Debian version."""
195 match = re.match(r"^(.*)(\.0tails[0-9]+)$", version)
197 return Version(match[1])
199 return Version(version)
203 class NewVersionIssue:
208 suite_version: Version
209 suite_source_version: Version
212 binaries = getattr(self, "log_binaries", None)
214 binaries = ", ".join(binaries)
215 return f"{self.source}[{binaries}] ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
216 return f"{self.source} ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
218 def tails_fork(self):
219 return re.search(".0tails[0-9]+$", str(self.version)) is not None
222 def get_udd(package_dict: dict[str, Version], suites: tuple[str]) -> UDD:
223 return UDD(package_dict.keys(), suites) # type: ignore[arg-type]
226 def get_issues(udd: UDD, package_dict: dict[str, Version]) -> Iterator[NewVersionIssue]:
227 """Get a issue list of updateable packages."""
228 for package, version in package_dict.items():
229 striped_version = strip_tails_version(str(version))
231 suite, suite_version = udd.get_debian_version(package, striped_version)
232 except NotFoundError as e:
236 if striped_version < suite_version:
237 issue = NewVersionIssue(
239 udd.source(package, suite),
243 udd.source_version(package, suite),
245 if issue.tails_fork() and striped_version >= issue.suite_source_version:
250 def check_build_manifest(
251 build_manifest: dict, config: dict, suites: tuple[str], verbose: bool
253 ignore = config.get("ignore", {})
254 general_ignore = ignore.get("general", [])
255 tmp_ignore = ignore.get("temporary", {})
257 pkg_dict: dict[str, Version] = dict()
258 for pkg in build_manifest["packages"]["binary"]:
260 v = Version(pkg["version"])
262 logger.debug(f"multiple entries for {p}: select max({v}, {pkg_dict[p]}).")
263 if pkg_dict.get(p) > v:
267 udd = get_udd(pkg_dict, suites)
269 issues = list(get_issues(udd, pkg_dict))
271 def _is_ignored(issue):
272 if issue.source in general_ignore:
275 str(issue.suite_source_version)
276 == tmp_ignore.get(issue.source, {"version": None})["version"]
282 issues = list(itertools.filterfalse(_is_ignored, issues))
284 non_forked, forked = partition(
285 operator.attrgetter("tails_fork"),
289 def _log_issue(issue):
290 if _is_ignored(issue):
291 if issue.source in general_ignore:
292 return f"(always ignored) {issue}"
293 return f"(known) {issue}"
297 def log_group(source, issues):
300 names = set(i.name for i in issues)
301 if names != udd.packages_by_source(source, suite):
302 issue.log_binaries = names
304 return _log_issue(issue)
307 for source, i in itertools.groupby(
308 issues, key=operator.attrgetter("source")
310 yield log_group(source, list(i))
313 l = "\n - ".join(sorted(_log(forked)))
314 logger.info(f"Need to upgrade our own forked package:\n - {l}")
317 l = "\n - ".join(sorted(_log(non_forked)))
318 logger.info(f"Need to upgrade to a new APT snapshot:\n - {l}")
320 # Check if we have at least one non ignored issue
322 next(itertools.filterfalse(_is_ignored, issues))
324 except StopIteration:
330 def get_suites(min_codename: str) -> list:
332 ddi = distro_info.DebianDistroInfo()
333 ddi_s = ddi.supported()
334 codename_pos = ddi_s.index(min_codename)
335 testing = ddi.testing()
336 for suite in ddi_s[codename_pos:]:
337 if suite in (testing, "sid", "experimental"):
340 # We always want to make sure that we have the stable-security
341 # version installed, if available.
342 # The rest of the list follows the Debian package flow.
348 f"{suite}-proposed-updates",
349 f"{suite}-backports",
359 format="%(levelname)s: %(message)s",
361 logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
363 parser = argparse.ArgumentParser(description="list all packages that ")
364 parser.add_argument("-v", "--verbose", action="store_true", help="Give more infos")
365 parser.add_argument("--debug", action="store_true", help="Show all debug messages")
369 type=argparse.FileType("r"),
370 default="config/ci/needed-package-updates.yml",
373 group = parser.add_mutually_exclusive_group()
374 group.add_argument("--suite", help="build manifest suite name.")
375 group.add_argument("--file", type=argparse.FileType("r"), help="local file name.")
377 args = parser.parse_args()
379 logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
384 config = yaml.safe_load(args.config)
386 suites = tuple(get_suites(config.get("distribution")))
389 build_manifest = yaml.safe_load(args.file)
390 add_metadata(build_manifest, args.file.name)
391 logger.info("Check local file %s", build_manifest["file_name"])
393 build_manifest = get_build_manifest(args.suite)
394 logger.info("Check %s", build_manifest["file_name"])
397 for suite in ("testing", "stable"):
399 build_manifest = get_build_manifest(suite)
400 logger.info("Check %s", build_manifest["file_name"])
402 except NoBuildManifest as e:
403 logger.debug("No build manifest found for %s.", suite)
408 propose_update = check_build_manifest(build_manifest, config, suites, args.verbose)
413 logger.debug("Nothing to do.")
416 if __name__ == "__main__":