Merge branch 'stable' into devel
[tails.git] / bin / needed-package-updates
blob84be3543b3782cadaa667313f900d7fe9c3aea47
1 #!/usr/bin/env python3
2 """
3 Generate a list of packages that should be updated from Debian.
4 It uses the the Ultimate Debian Database (UDD) to get a
5 list of version in the Debian repository.
7 By default it will get the last successful build manifest file from testing or stable from here:
8 https://nightly.tails.boum.org/build_Tails_ISO_testing/lastSuccessful/archive/build-artifacts/
9 https://nightly.tails.boum.org/build_Tails_ISO_stable/lastSuccessful/archive/build-artifacts/
10 """
12 import argparse
13 import collections
14 from dataclasses import dataclass
15 import itertools
16 import logging
17 import operator
18 import re
19 import sys
20 from typing import Iterator, Sequence
21 import urllib
23 import psycopg2
24 import requests
25 import yaml
27 import distro_info  # type: ignore[import]
28 from debian.debian_support import Version
30 logger = logging.getLogger()
33 def partition(pred, iterable):
34     "Use a predicate to partition entries into false entries and true entries"
35     # partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
37     match = []
38     non_match = []
40     for i in iterable:
41         if pred(i)():
42             match.append(i)
43         else:
44             non_match.append(i)
46     return non_match, match
49 class NoBuildManifest(Exception):
50     pass
53 def add_metadata(yml, name):
54     yml["file_name"] = name
55     match = re.match(r".*@([0-9a-f]+)-([0-9T]+Z).build-manifest", name)
56     if match:
57         yml["git_hash"] = match.group(1)
58         yml["timestamp"] = match.group(2)
61 def get_build_manifest(suite: str) -> dict:
62     base_url = f"https://nightly.tails.boum.org/build_Tails_ISO_{suite}/lastSuccessful/archive/build-artifacts/"
64     shasum_response = requests.get(
65         urllib.parse.urljoin(base_url, "tails-build-artifacts.shasum")
66     )
68     try:
69         shasum_response.raise_for_status()
70     except requests.HTTPError as e:
71         raise NoBuildManifest(f"build-manifest file for {suite} not found!") from e
73     for i in shasum_response.text.splitlines():
74         _, name = i.split()
76         if name.endswith(".build-manifest"):
77             url = urllib.parse.urljoin(base_url, name)
78             bm_response = requests.get(url, timeout=60)
79             ret = yaml.safe_load(bm_response.text)  # type: dict
80             ret["url"] = url
81             add_metadata(ret, name)
82             return ret
84     raise NoBuildManifest(f"build-manifest file for {suite} not found!")
87 class NotFoundError(Exception):
88     pass
91 @dataclass
92 class UDDPackageRow:
93     package: str
94     version: Version
95     release: str
96     component: str
97     distribution: str
98     source: str
99     source_version: Version
102 class UDD:
103     """to manage output of the Ultimate Debian Database (UDD).
104     Normally you give a list of packages you want to check and get the versions on different suites.
105     """
107     def __init__(self, packages: Sequence[str], suites: Sequence[str]):
108         self.suites = suites
109         self.packages = self._request(packages)
111     def _request(self, packages: Sequence[str]) -> dict[str, dict[str, UDDPackageRow]]:
112         ret: dict[str, dict[str, UDDPackageRow]] = collections.defaultdict(dict)
113         with psycopg2.connect(
114             "postgresql://udd-mirror:udd-mirror@udd-mirror.debian.net/udd"
115         ) as conn:
116             with conn.cursor() as curs:
117                 archs = ("all", "amd64")
118                 components = ("main", "contrib", "non-free", "non-free-firmware")
119                 pkgs = tuple(packages)
120                 fields = ", ".join(UDDPackageRow.__dataclass_fields__.keys())
122                 curs.execute(
123                     f"SELECT {fields}"
124                     " FROM packages"
125                     " WHERE distribution = 'debian'"
126                     " and release in %s"
127                     " and architecture in %s"
128                     " and component in %s"
129                     " and package in %s;",
130                     (self.suites, archs, components, pkgs),
131                 )
132                 for r in curs:
133                     row = UDDPackageRow(*r)
134                     row.version = Version(row.version)
135                     row.source_version = Version(row.source_version)
136                     if row.release in ret[row.package]:
137                         if ret[row.package][row.release].version > row.version:
138                             continue
139                     ret[row.package][row.release] = row
140         return ret
142     def package(self, name: str) -> dict[str, UDDPackageRow]:
143         return self.packages[name]
145     def source(self, name: str, suite: str) -> str:
146         package = self.package(name)
148         if suite not in package:
149             raise NotFoundError(f"{name} not found in {suite}")
151         return package[suite].source
153     def source_version(self, name: str, suite: str) -> Version:
154         package = self.package(name)
156         if suite not in package:
157             raise NotFoundError(f"{name} not found in {suite}")
159         return package[suite].source_version
161     def version(self, name: str, suite: str) -> Version:
162         package = self.package(name)
164         if suite not in package:
165             raise NotFoundError(f"{name} not found in {suite}")
167         return package[suite].version
169     def get_debian_version(self, name: str, version: Version) -> tuple:
170         for suite in self.suites:
171             try:
172                 suite_version = self.version(name, suite)
173             except NotFoundError:
174                 continue
175             if version <= suite_version:
176                 return (suite, suite_version)
177         else:
178             raise NotFoundError(
179                 f"{name}: the package version({version}) is higher than the version on {suite} ({suite_version})"
180             )
182     def packages_by_source(self, source: str, suite: str) -> set[str]:
183         ret = set()
184         for name, pkg in self.packages.items():
185             if suite not in pkg:
186                 continue
187             p = pkg[suite]
188             if p.source == source:
189                 ret.add(name)
190         return ret
193 def strip_tails_version(version: str) -> Version:
194     """if we have a Tails own fork get the Debian version."""
195     match = re.match(r"^(.*)(\.0tails[0-9]+)$", version)
196     if match:
197         return Version(match[1])
198     else:
199         return Version(version)
202 @dataclass
203 class NewVersionIssue:
204     name: str
205     source: str
206     version: Version
207     suite: str
208     suite_version: Version
209     suite_source_version: Version
211     def __str__(self):
212         binaries = getattr(self, "log_binaries", None)
213         if binaries:
214             binaries = ", ".join(binaries)
215             return f"{self.source}[{binaries}] ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
216         return f"{self.source} ({self.version}) to Debian {self.suite} ({self.suite_source_version})"
218     def tails_fork(self):
219         return re.search(".0tails[0-9]+$", str(self.version)) is not None
222 def get_udd(package_dict: dict[str, Version], suites: tuple[str]) -> UDD:
223     return UDD(package_dict.keys(), suites)  # type: ignore[arg-type]
226 def get_issues(udd: UDD, package_dict: dict[str, Version]) -> Iterator[NewVersionIssue]:
227     """Get a issue list of updateable packages."""
228     for package, version in package_dict.items():
229         striped_version = strip_tails_version(str(version))
230         try:
231             suite, suite_version = udd.get_debian_version(package, striped_version)
232         except NotFoundError as e:
233             logger.error(e)
234             continue
236         if striped_version < suite_version:
237             issue = NewVersionIssue(
238                 package,
239                 udd.source(package, suite),
240                 version,
241                 suite,
242                 suite_version,
243                 udd.source_version(package, suite),
244             )
245             if issue.tails_fork() and striped_version >= issue.suite_source_version:
246                 continue
247             yield issue
250 def check_build_manifest(
251     build_manifest: dict, config: dict, suites: tuple[str], verbose: bool
252 ) -> bool:
253     ignore = config.get("ignore", {})
254     general_ignore = ignore.get("general", [])
255     tmp_ignore = ignore.get("temporary", {})
257     pkg_dict: dict[str, Version] = dict()
258     for pkg in build_manifest["packages"]["binary"]:
259         p = pkg["package"]
260         v = Version(pkg["version"])
261         if pkg_dict.get(p):
262             logger.debug(f"multiple entries for {p}: select max({v}, {pkg_dict[p]}).")
263         if pkg_dict.get(p) > v:
264             continue
265         pkg_dict[p] = v
267     udd = get_udd(pkg_dict, suites)
269     issues = list(get_issues(udd, pkg_dict))
271     def _is_ignored(issue):
272         if issue.source in general_ignore:
273             return True
274         if (
275             str(issue.suite_source_version)
276             == tmp_ignore.get(issue.source, {"version": None})["version"]
277         ):
278             return True
279         return False
281     if not verbose:
282         issues = list(itertools.filterfalse(_is_ignored, issues))
284     non_forked, forked = partition(
285         operator.attrgetter("tails_fork"),
286         issues,
287     )
289     def _log_issue(issue):
290         if _is_ignored(issue):
291             if issue.source in general_ignore:
292                 return f"(always ignored) {issue}"
293             return f"(known) {issue}"
294         else:
295             return str(issue)
297     def log_group(source, issues):
298         issue = issues[0]
299         suite = issue.suite
300         names = set(i.name for i in issues)
301         if names != udd.packages_by_source(source, suite):
302             issue.log_binaries = names
304         return _log_issue(issue)
306     def _log(issues):
307         for source, i in itertools.groupby(
308             issues, key=operator.attrgetter("source")
309         ):
310             yield log_group(source, list(i))
312     if forked:
313         l = "\n  - ".join(sorted(_log(forked)))
314         logger.info(f"Need to upgrade our own forked package:\n  - {l}")
316     if non_forked:
317         l = "\n  - ".join(sorted(_log(non_forked)))
318         logger.info(f"Need to upgrade to a new APT snapshot:\n  - {l}")
320     # Check if we have at least one non ignored issue
321     try:
322         next(itertools.filterfalse(_is_ignored, issues))
323         return True
324     except StopIteration:
325         pass
327     return False
330 def get_suites(min_codename: str) -> list:
331     suites = []
332     ddi = distro_info.DebianDistroInfo()
333     ddi_s = ddi.supported()
334     codename_pos = ddi_s.index(min_codename)
335     testing = ddi.testing()
336     for suite in ddi_s[codename_pos:]:
337         if suite in (testing, "sid", "experimental"):
338             suites.append(suite)
339         else:
340             # We always want to make sure that we have the stable-security
341             # version installed, if available.
342             # The rest of the list follows the Debian package flow.
343             suites.extend(
344                 [
345                     f"{suite}-security",
346                     suite,
347                     f"{suite}-updates",
348                     f"{suite}-proposed-updates",
349                     f"{suite}-backports",
350                 ]
351             )
353     return suites
356 def main():
357     logging.basicConfig(
358         level=logging.DEBUG,
359         format="%(levelname)s: %(message)s",
360     )
361     logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
363     parser = argparse.ArgumentParser(description="list all packages that ")
364     parser.add_argument("-v", "--verbose", action="store_true", help="Give more infos")
365     parser.add_argument("--debug", action="store_true", help="Show all debug messages")
366     parser.add_argument(
367         "-c",
368         "--config",
369         type=argparse.FileType("r"),
370         default="config/ci/needed-package-updates.yml",
371         help="Config file",
372     )
373     group = parser.add_mutually_exclusive_group()
374     group.add_argument("--suite", help="build manifest suite name.")
375     group.add_argument("--file", type=argparse.FileType("r"), help="local file name.")
377     args = parser.parse_args()
379     logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
381     if args.debug:
382         args.verbose = True
384     config = yaml.safe_load(args.config)
386     suites = tuple(get_suites(config.get("distribution")))
388     if args.file:
389         build_manifest = yaml.safe_load(args.file)
390         add_metadata(build_manifest, args.file.name)
391         logger.info("Check local file %s", build_manifest["file_name"])
392     elif args.suite:
393         build_manifest = get_build_manifest(args.suite)
394         logger.info("Check %s", build_manifest["file_name"])
395     else:
396         err = None
397         for suite in ("testing", "stable"):
398             try:
399                 build_manifest = get_build_manifest(suite)
400                 logger.info("Check %s", build_manifest["file_name"])
401                 break
402             except NoBuildManifest as e:
403                 logger.debug("No build manifest found for %s.", suite)
404                 err = e
405         else:
406             raise err
408     propose_update = check_build_manifest(build_manifest, config, suites, args.verbose)
410     if propose_update:
411         sys.exit(1)
412     else:
413         logger.debug("Nothing to do.")
416 if __name__ == "__main__":
417     main()