Backed out 2 changesets (bug 1900622) for causing Bug 1908553 and ktlint failure...
[gecko.git] / tools / tryselect / lando.py
blob74dc4a80cccf94d5fc29d647d7b320e8d124edd8
1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 """Implements Auth0 Device Code flow and Lando try submission.
7 See https://auth0.com/blog/securing-a-python-cli-application-with-auth0/ for more.
8 """
10 from __future__ import annotations
12 import base64
13 import configparser
14 import json
15 import os
16 import time
17 import webbrowser
18 from dataclasses import (
19 dataclass,
20 field,
22 from pathlib import Path
23 from typing import (
24 List,
25 Optional,
26 Tuple,
27 Union,
30 import requests
31 from mach.util import get_state_dir
32 from mozbuild.base import MozbuildObject
33 from mozversioncontrol import (
34 GitRepository,
35 HgRepository,
# File in which the Auth0 user token is cached. The state directory is
# requested with `specific_to_topsrcdir=False`.
TOKEN_FILE = Path(get_state_dir(specific_to_topsrcdir=False)).joinpath(
    "lando_auth0_user_token.json"
)

# The VCS repository types that can be used with this workflow.
SupportedVcsRepository = Union[GitRepository, HgRepository]

# Absolute path of the directory containing this file; it is the working
# directory handed to `MozbuildObject.from_environment`.
here = os.path.abspath(os.path.dirname(__file__))
build = MozbuildObject.from_environment(cwd=here)
def convert_bytes_patch_to_base64(patch_bytes: bytes) -> str:
    """Encode the raw `bytes` of a patch as a base64 `str`."""
    encoded_patch = base64.b64encode(patch_bytes)
    # Base64 output only contains ASCII characters.
    return str(encoded_patch, "ascii")
def load_token_from_disk() -> Optional[dict]:
    """Load an existing Auth0 token from disk.

    Read `TOKEN_FILE` and parse its contents as JSON.

    Return the token as a `dict` if it could be loaded, or return `None`
    if the file is missing, cannot be read, is not valid JSON, or does not
    contain a JSON object. A short explanation is printed in each failure
    case.
    """
    # Read first and handle failure afterwards, rather than checking for
    # existence up front: the file may disappear between the check and the read.
    try:
        raw_token = TOKEN_FILE.read_bytes()
    except FileNotFoundError:
        print("No existing Auth0 token found.")
        return None
    except OSError as exc:
        print("Existing Auth0 token could not be read:", str(exc))
        return None

    try:
        user_token = json.loads(raw_token)
    except ValueError:
        # Covers both `json.JSONDecodeError` and the `UnicodeDecodeError`
        # raised for bytes that are not valid text; both are `ValueError`s.
        print("Existing Auth0 token could not be decoded as JSON.")
        return None

    # Callers index the token by key, so anything other than a JSON object
    # (e.g. a list or a bare string) is unusable.
    if not isinstance(user_token, dict):
        print("Existing Auth0 token is not a JSON object.")
        return None

    return user_token
def get_stack_info(
    vcs: SupportedVcsRepository, head: Optional[str]
) -> Tuple[str, List[str]]:
    """Collect the data Lando needs to apply the current stack.

    Returns a 2-tuple: the base commit as reported by `vcs.base_ref_as_hg()`,
    followed by the patches from `vcs.get_commit_patches()`, each one base64
    encoded, in the order they were returned.

    Raises `ValueError` when no base commit or no commit nodes can be found.
    """
    hg_base = vcs.base_ref_as_hg()
    if not hg_base:
        raise ValueError(
            "Could not determine base Mercurial commit hash for submission."
        )
    print("Using", hg_base, "as the hg base commit.")

    # On Mercurial, hand the already-known base revision over so it does not
    # have to be looked up again with another `hg log` call.
    extra_kwargs = {"base_ref": hg_base} if isinstance(vcs, HgRepository) else {}
    commit_nodes = vcs.get_branch_nodes(head, **extra_kwargs)

    if not commit_nodes:
        raise ValueError("Could not find any commit hashes for submission.")

    node_count = len(commit_nodes)
    if node_count == 1:
        print("Submitting a single try config commit.")
    elif node_count == 2:
        print("Submitting 1 node and the try commit.")
    else:
        print("Submitting stack of", node_count - 1, "nodes and the try commit.")

    raw_patches = vcs.get_commit_patches(commit_nodes)
    encoded_patches = list(map(convert_bytes_patch_to_base64, raw_patches))
    print("Patches gathered for submission.")

    return hg_base, encoded_patches
@dataclass
class Auth0Config:
    """Helper class to interact with Auth0.

    Holds the Auth0 application settings and implements the Device
    Authorization Flow used to obtain, validate and cache a user token.
    """

    # Auth0 domain, without a scheme; every endpoint URL is built from it.
    domain: str
    # Sent as `client_id` to Auth0, and the audience expected of the ID token.
    client_id: str
    # Sent as `audience` when requesting a device code.
    audience: str
    # Sent as `scope` on the device code and token requests.
    scope: str
    # Algorithms passed to `jwt.decode` when reading token claims.
    algorithms: list[str] = field(default_factory=lambda: ["RS256"])

    @property
    def base_url(self) -> str:
        """Auth0 base URL."""
        return f"https://{self.domain}"

    @property
    def device_code_url(self) -> str:
        """URL of the Device Code API endpoint."""
        return f"{self.base_url}/oauth/device/code"

    @property
    def issuer(self) -> str:
        """Token issuer URL."""
        return f"{self.base_url}/"

    @property
    def jwks_url(self) -> str:
        """URL of the JWKS file."""
        return f"{self.base_url}/.well-known/jwks.json"

    @property
    def oauth_token_url(self) -> str:
        """URL of the OAuth Token endpoint."""
        return f"{self.base_url}/oauth/token"

    @staticmethod
    def _is_expiring(
        expiration: float, now: Optional[float] = None, leeway_s: float = 60
    ) -> bool:
        """Return `True` if `expiration` has passed or is within `leeway_s` of `now`.

        `expiration` and `now` are Unix timestamps in seconds; `now` defaults
        to the current time.
        """
        if now is None:
            now = time.time()
        return now + leeway_s > expiration

    def request_device_code(self) -> dict:
        """Request authorization from Auth0 using the Device Code Flow.

        Return the decoded JSON response. Raises `requests.HTTPError` if
        Auth0 answers with an error status.

        See https://auth0.com/docs/api/authentication#get-device-code for more.
        """
        response = requests.post(
            self.device_code_url,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "audience": self.audience,
                "client_id": self.client_id,
                "scope": self.scope,
            },
        )

        response.raise_for_status()

        return response.json()

    def validate_token(self, user_token: dict) -> Optional[dict]:
        """Verify the given user token is valid.

        Validate the ID token, and validate the access token's expiration claim.

        Return `user_token`, updated in place with the claims of the ID token,
        or `None` if the token is incomplete, fails validation, or the access
        token has expired or expires within a minute.
        """
        # Import `auth0-python` here to avoid `ImportError` in tests, since
        # the `python-test` site won't have `auth0-python` installed.
        import jwt
        from auth0.authentication.token_verifier import (
            AsymmetricSignatureVerifier,
            TokenVerifier,
        )
        from auth0.exceptions import (
            TokenValidationError,
        )

        # A truncated or hand-edited cached token must lead to a fresh login
        # rather than a `KeyError`.
        id_token = user_token.get("id_token")
        access_token = user_token.get("access_token")
        if not id_token or not access_token:
            print("Auth0 token is missing the ID token or the access token.")
            return None

        signature_verifier = AsymmetricSignatureVerifier(self.jwks_url)
        token_verifier = TokenVerifier(
            audience=self.client_id,
            issuer=self.issuer,
            signature_verifier=signature_verifier,
        )

        try:
            token_verifier.verify(id_token)
        except TokenValidationError as e:
            print("Could not validate existing Auth0 ID token:", str(e))
            return None

        # The access token signature is not verified here; it is only decoded
        # to read its `exp` claim.
        decode_kwargs = {
            "algorithms": self.algorithms,
            "options": {"verify_signature": False},
        }
        try:
            access_token_expiration = jwt.decode(access_token, **decode_kwargs)["exp"]
        except (jwt.InvalidTokenError, KeyError) as e:
            print("Could not read the expiration of the Auth0 access token:", str(e))
            return None

        # Reject access tokens that are expired or expiring within a minute.
        if self._is_expiring(access_token_expiration):
            print("Access token is expired.")
            return None

        # Merge the ID token claims into the token that is returned.
        user_token.update(jwt.decode(id_token, **decode_kwargs))

        print("Auth0 token validated.")
        return user_token

    def device_authorization_flow(self) -> dict:
        """Perform the Device Authorization Flow.

        Request a device code, direct the user to the verification page and
        poll the token endpoint until the login succeeds, fails, or the device
        code expires.

        Return the decoded token response. Raises `RuntimeError` if Auth0
        reports an error other than a request to keep waiting, and
        `ValueError` if the device code expires first.

        See https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow
        for more.
        """
        start = time.perf_counter()

        device_code_data = self.request_device_code()
        verification_uri = device_code_data["verification_uri_complete"]
        print("1. On your computer or mobile device navigate to:", verification_uri)
        print("2. Enter the following code:", device_code_data["user_code"])

        auth_msg = f"Auth0 token validation required at: {verification_uri}"
        build.notify(auth_msg)

        try:
            webbrowser.open(verification_uri)
        except webbrowser.Error:
            print("Could not automatically open the web browser.")

        device_code_lifetime_s = device_code_data["expires_in"]
        # RFC 8628 specifies a 5 second polling interval when none is given.
        poll_interval_s = device_code_data.get("interval", 5)

        # Print successive periods on the same line to avoid moving the link
        # while the user is trying to click it.
        print("Waiting...", end="", flush=True)
        while time.perf_counter() - start < device_code_lifetime_s:
            response = requests.post(
                self.oauth_token_url,
                data={
                    "client_id": self.client_id,
                    "device_code": device_code_data["device_code"],
                    "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                    "scope": self.scope,
                },
            )
            response_data = response.json()

            if response.status_code == 200:
                print("\nLogin successful.")
                return response_data

            error = response_data.get("error")
            if error == "slow_down":
                # RFC 8628: on `slow_down` the interval must be increased by
                # 5 seconds for this and all subsequent requests.
                poll_interval_s += 5
            elif error != "authorization_pending":
                raise RuntimeError(
                    response_data.get("error_description")
                    or f"Auth0 token request failed ({error or response.status_code})."
                )

            time.sleep(poll_interval_s)
            print(".", end="", flush=True)

        raise ValueError("Timed out waiting for Auth0 device code authentication!")

    def get_token(self) -> dict:
        """Retrieve an access token for authentication.

        If a cached token is found and can be confirmed to be valid, return it.
        Otherwise, perform the Device Code Flow authorization to request a new
        token, validate it and save it to disk.

        Raises `ValueError` if no valid token could be obtained.
        """
        # Load a cached token and validate it if one is available.
        cached_token = load_token_from_disk()
        user_token = self.validate_token(cached_token) if cached_token else None

        # Login with the Device Authorization Flow if an existing token isn't found.
        if not user_token:
            new_token = self.device_authorization_flow()
            user_token = self.validate_token(new_token)

        if not user_token:
            raise ValueError("Could not get an Auth0 token.")

        # Save token to disk.
        with TOKEN_FILE.open("w") as f:
            json.dump(user_token, f, indent=2, sort_keys=True)

        return user_token
class LandoAPIException(Exception):
    """Signals an error raised while talking to Lando."""

    def __init__(self, detail: Optional[str] = None):
        # A missing (or empty) detail is normalised to an empty message.
        message = detail if detail else ""
        super().__init__(message)
@dataclass
class LandoAPI:
    """Helper class to interact with Lando-API."""

    # Sent as the bearer token in the `Authorization` header.
    access_token: str
    # Host of the Lando API, without a scheme.
    api_url: str

    @property
    def lando_try_api_url(self) -> str:
        """URL of the Lando Try endpoint."""
        return f"https://{self.api_url}/try/patches"

    @property
    def api_headers(self) -> dict[str, str]:
        """Headers for use accessing and authenticating against the API."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }

    @classmethod
    def from_lando_config_file(cls, config_path: Path, section: str) -> "LandoAPI":
        """Build a `LandoAPI` from `section` in the file at `config_path`.

        The section provides the Auth0 settings used to obtain an access
        token, and the domain of the Lando API.

        Raises `ValueError` if the file or the section does not exist.
        """
        if not config_path.exists():
            raise ValueError(f"Could not find a Lando config file at `{config_path}`.")

        lando_ini_contents = config_path.read_text()

        parser = configparser.ConfigParser(delimiters="=")
        parser.read_string(lando_ini_contents)

        if not parser.has_section(section):
            raise ValueError(f"Lando config file does not have a {section} section.")

        # Read every option before logging in, so that an incomplete config
        # fails before the user is sent through the Auth0 login.
        api_domain = parser.get(section, "api_domain")
        auth0 = Auth0Config(
            domain=parser.get(section, "auth0_domain"),
            client_id=parser.get(section, "auth0_client_id"),
            audience=parser.get(section, "auth0_audience"),
            scope=parser.get(section, "auth0_scope"),
        )

        token = auth0.get_token()

        return cls(
            api_url=api_domain,
            access_token=token["access_token"],
        )

    @staticmethod
    def _error_detail(response_json: object, status_code: int) -> str:
        """Return a message describing an error response from Lando.

        Use the `detail` field of the response body when it has one, and fall
        back to a message naming the HTTP status otherwise.
        """
        if isinstance(response_json, dict) and response_json.get("detail"):
            return str(response_json["detail"])
        return f"Lando returned HTTP status {status_code}."

    def post(self, url: str, body: dict) -> dict:
        """Make a POST request to Lando.

        Send `body` as JSON to `url` and return the decoded JSON response.

        Raises `LandoAPIException` if Lando answers with an error status, or
        with a success status but a body that is not JSON. Raises
        `requests.HTTPError` if the body is not JSON and the status is an
        error.
        """
        response = requests.post(url, headers=self.api_headers, json=body)

        try:
            response_json = response.json()
        except ValueError:
            # Every JSON decoding error `requests` can raise is a `ValueError`,
            # whichever JSON library it is using.
            #
            # If the server didn't send back a valid JSON object, raise a stack
            # trace to the terminal which includes error details.
            response.raise_for_status()

            # Raise `LandoAPIException` if the response wasn't JSON and we
            # didn't raise from an invalid status.
            raise LandoAPIException(
                detail="Response was not valid JSON yet status was valid."
            )

        if response.status_code >= 400:
            raise LandoAPIException(
                detail=self._error_detail(response_json, response.status_code)
            )

        return response_json

    def post_try_push_patches(
        self,
        patches: List[str],
        patch_format: str,
        base_commit: str,
    ) -> dict:
        """Send try push contents to Lando.

        Send the list of base64-encoded `patches` in `patch_format` to Lando, to be applied to
        the Mercurial `base_commit`, using the Auth0 `access_token` for authorization.

        Return the decoded JSON response from Lando.
        """
        request_json_body = {
            "base_commit": base_commit,
            "patch_format": patch_format,
            "patches": patches,
        }

        print("Submitting patches to Lando.")
        response_json = self.post(self.lando_try_api_url, request_json_body)

        return response_json
def push_to_lando_try(
    vcs: SupportedVcsRepository, commit_message: str, changed_files: dict
):
    """Submit the current stack, plus a try commit, to Lando's try endpoint.

    Problems gathering the patches or submitting them are reported on the
    terminal and through `build.notify`, after which the function returns.
    Raises `ValueError` when the type of `vcs` has no known patch format.
    """
    # The `patch_format` value Lando expects for each `Repository` subclass.
    format_by_vcs_type = {
        GitRepository: "git-format-patch",
        HgRepository: "hgexport",
    }
    patch_format = format_by_vcs_type.get(type(vcs))
    if not patch_format:
        # Other VCS types (namely `src`) are unsupported.
        raise ValueError(f"Try push via Lando is not supported for `{vcs.name}`.")

    # Lando Dev is only used when the `LANDO_TRY_USE_DEV` environment variable
    # is set; Lando Prod is used otherwise.
    lando_config_section = (
        "lando-dev" if os.getenv("LANDO_TRY_USE_DEV") else "lando-prod"
    )

    # The Auth0 config lives in `.lando.ini` at the root of the repository.
    lando_api = LandoAPI.from_lando_config_file(
        Path(vcs.path) / ".lando.ini", lando_config_section
    )

    # Start timing only now, so that Auth0 login time is not counted.
    push_start_time = time.perf_counter()

    with vcs.try_commit(commit_message, changed_files) as head:
        try:
            base_commit, patches = get_stack_info(vcs, head)
        except ValueError as exc:
            gather_error = "abort: error gathering patches for submission."
            print(gather_error)
            print(str(exc))
            build.notify(gather_error)
            return

        try:
            # Make the try request to Lando.
            response_json = lando_api.post_try_push_patches(
                patches, patch_format, base_commit
            )
        except LandoAPIException as exc:
            submit_error = "abort: error submitting patches to Lando."
            print(submit_error)
            print(str(exc))
            build.notify(submit_error)
            return

    elapsed = time.perf_counter() - push_start_time
    duration = round(elapsed, 2)

    job_id = response_json["id"]
    success_msg = (
        f"Lando try submission success, took {duration} seconds. "
        f"Landing job id: {job_id}."
    )
    print(success_msg)
    build.notify(success_msg)