1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 """Implements Auth0 Device Code flow and Lando try submission.
7 See https://auth0.com/blog/securing-a-python-cli-application-with-auth0/ for more.
10 from __future__
import annotations
18 from dataclasses
import (
22 from pathlib
import Path
31 from mach
.util
import get_state_dir
32 from mozbuild
.base
import MozbuildObject
33 from mozversioncontrol
import (
39 Path(get_state_dir(specific_to_topsrcdir
=False)) / "lando_auth0_user_token.json"
42 # The supported variants of `Repository` for this workflow.
43 SupportedVcsRepository
= Union
[GitRepository
, HgRepository
]
45 here
= os
.path
.abspath(os
.path
.dirname(__file__
))
46 build
= MozbuildObject
.from_environment(cwd
=here
)
def convert_bytes_patch_to_base64(patch_bytes: bytes) -> str:
    """Encode a raw patch as base64 text.

    Takes the patch content as `bytes` and returns its base64 encoding
    as an ASCII `str`.
    """
    encoded = base64.b64encode(patch_bytes)
    # Base64 output is pure ASCII, so decoding with "ascii" is lossless.
    return str(encoded, "ascii")
54 def load_token_from_disk() -> Optional
[dict]:
55 """Load and validate an existing Auth0 token from disk.
57 Return the token as a `dict` if it can be validated, or return `None`
58 if any error was encountered.
60 if not TOKEN_FILE
.exists():
61 print("No existing Auth0 token found.")
65 user_token
= json
.loads(TOKEN_FILE
.read_bytes())
66 except json
.JSONDecodeError
:
67 print("Existing Auth0 token could not be decoded as JSON.")
74 vcs
: SupportedVcsRepository
, head
: Optional
[str]
75 ) -> Tuple
[str, List
[str]]:
76 """Retrieve information about the current stack for submission via Lando.
78 Returns a tuple of the current public base commit as a Mercurial SHA,
79 and a list of ordered base64 encoded patches.
81 base_commit
= vcs
.base_ref_as_hg()
84 "Could not determine base Mercurial commit hash for submission."
86 print("Using", base_commit
, "as the hg base commit.")
88 # Reuse the base revision when on Mercurial to avoid multiple calls to `hg log`.
89 branch_nodes_kwargs
= {}
90 if isinstance(vcs
, HgRepository
):
91 branch_nodes_kwargs
["base_ref"] = base_commit
93 nodes
= vcs
.get_branch_nodes(head
, **branch_nodes_kwargs
)
95 raise ValueError("Could not find any commit hashes for submission.")
97 print("Submitting a single try config commit.")
99 print("Submitting 1 node and the try commit.")
101 print("Submitting stack of", len(nodes
) - 1, "nodes and the try commit.")
103 patches
= vcs
.get_commit_patches(nodes
)
105 convert_bytes_patch_to_base64(patch_bytes
) for patch_bytes
in patches
107 print("Patches gathered for submission.")
109 return base_commit
, base64_patches
114 """Helper class to interact with Auth0."""
120 algorithms
: list[str] = field(default_factory
=lambda: ["RS256"])
def base_url(self) -> str:
    """Return the Auth0 base URL.

    The URL is the `https://` scheme followed by this object's `domain`.
    """
    # NOTE(review): sibling methods read `self.base_url` as an attribute, so
    # this is presumably wrapped in `@property` outside this view - confirm.
    return "https://{}".format(self.domain)
def device_code_url(self) -> str:
    """Return the URL of the Device Code API endpoint.

    Built by appending `/oauth/device/code` to `self.base_url`.
    """
    root = self.base_url
    return f"{root}/oauth/device/code"
def issuer(self) -> str:
    """Return the token issuer URL: `self.base_url` plus a trailing slash."""
    return "{}/".format(self.base_url)
def jwks_url(self) -> str:
    """Return the URL of the JWKS file.

    The file lives at `/.well-known/jwks.json` under `self.base_url`.
    """
    well_known_path = "/.well-known/jwks.json"
    return f"{self.base_url}{well_known_path}"
def oauth_token_url(self) -> str:
    """Return the URL of the OAuth Token endpoint.

    Built by appending `/oauth/token` to `self.base_url`.
    """
    return "{}/oauth/token".format(self.base_url)
147 def request_device_code(self
) -> dict:
148 """Request authorization from Auth0 using the Device Code Flow.
150 See https://auth0.com/docs/api/authentication#get-device-code for more.
152 response
= requests
.post(
153 self
.device_code_url
,
154 headers
={"Content-Type": "application/x-www-form-urlencoded"},
156 "audience": self
.audience
,
157 "client_id": self
.client_id
,
162 response
.raise_for_status()
164 return response
.json()
166 def validate_token(self
, user_token
: dict) -> Optional
[dict]:
167 """Verify the given user token is valid.
169 Validate the ID token, and validate the access token's expiration claim.
171 # Import `auth0-python` here to avoid `ImportError` in tests, since
172 # the `python-test` site won't have `auth0-python` installed.
174 from auth0
.authentication
.token_verifier
import (
175 AsymmetricSignatureVerifier
,
178 from auth0
.exceptions
import (
179 TokenValidationError
,
182 signature_verifier
= AsymmetricSignatureVerifier(self
.jwks_url
)
183 token_verifier
= TokenVerifier(
184 audience
=self
.client_id
,
186 signature_verifier
=signature_verifier
,
190 token_verifier
.verify(user_token
["id_token"])
191 except TokenValidationError
as e
:
192 print("Could not validate existing Auth0 ID token:", str(e
))
195 decoded_access_token
= jwt
.decode(
196 user_token
["access_token"],
197 algorithms
=self
.algorithms
,
198 options
={"verify_signature": False},
201 access_token_expiration
= decoded_access_token
["exp"]
203 # Assert that the access token isn't expired or expiring within a minute.
204 if time
.time() > access_token_expiration
+ 60:
205 print("Access token is expired.")
210 user_token
["id_token"],
211 algorithms
=self
.algorithms
,
212 options
={"verify_signature": False},
215 print("Auth0 token validated.")
218 def device_authorization_flow(self
) -> dict:
219 """Perform the Device Authorization Flow.
221 See https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow
224 start
= time
.perf_counter()
226 device_code_data
= self
.request_device_code()
228 "1. On your computer or mobile device navigate to:",
229 device_code_data
["verification_uri_complete"],
231 print("2. Enter the following code:", device_code_data
["user_code"])
233 auth_msg
= f
"Auth0 token validation required at: {device_code_data['verification_uri_complete']}"
234 build
.notify(auth_msg
)
237 webbrowser
.open(device_code_data
["verification_uri_complete"])
238 except webbrowser
.Error
:
239 print("Could not automatically open the web browser.")
241 device_code_lifetime_s
= device_code_data
["expires_in"]
243 # Print successive periods on the same line to avoid moving the link
244 # while the user is trying to click it.
245 print("Waiting...", end
="", flush
=True)
246 while time
.perf_counter() - start
< device_code_lifetime_s
:
247 response
= requests
.post(
248 self
.oauth_token_url
,
250 "client_id": self
.client_id
,
251 "device_code": device_code_data
["device_code"],
252 "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
256 response_data
= response
.json()
258 if response
.status_code
== 200:
259 print("\nLogin successful.")
262 if response_data
["error"] not in ("authorization_pending", "slow_down"):
263 raise RuntimeError(response_data
["error_description"])
265 time
.sleep(device_code_data
["interval"])
266 print(".", end
="", flush
=True)
268 raise ValueError("Timed out waiting for Auth0 device code authentication!")
270 def get_token(self
) -> dict:
271 """Retrieve an access token for authentication.
273 If a cached token is found and can be confirmed to be valid, return it.
274 Otherwise, perform the Device Code Flow authorization to request a new
275 token, validate it and save it to disk.
277 # Load a cached token and validate it if one is available.
278 cached_token
= load_token_from_disk()
279 user_token
= self
.validate_token(cached_token
) if cached_token
else None
281 # Login with the Device Authorization Flow if an existing token isn't found.
283 new_token
= self
.device_authorization_flow()
284 user_token
= self
.validate_token(new_token
)
287 raise ValueError("Could not get an Auth0 token.")
289 # Save token to disk.
290 with TOKEN_FILE
.open("w") as f
:
291 json
.dump(user_token
, f
, indent
=2, sort_keys
=True)
class LandoAPIException(Exception):
    """Raised when Lando throws an exception.

    The exception message is the given `detail` text, or an empty string
    when `detail` is `None` or otherwise falsy.
    """

    def __init__(self, detail: Optional[str] = None):
        # Always hand the base class a `str` so `str(exc)` is never "None".
        message = detail if detail else ""
        super().__init__(message)
305 """Helper class to interact with Lando-API."""
def lando_try_api_url(self) -> str:
    """Return the URL of the Lando Try endpoint.

    `self.api_url` is placed directly after the `https://` scheme and is
    followed by the `/try/patches` path.
    """
    host = self.api_url
    return f"https://{host}/try/patches"
316 def api_headers(self
) -> dict[str, str]:
317 """Headers for use accessing and authenticating against the API."""
319 "Authorization": f
"Bearer {self.access_token}",
320 "Content-Type": "application/json",
324 def from_lando_config_file(cls
, config_path
: Path
, section
: str) -> LandoAPI
:
"""Build a `LandoAPI` from `section` in the file at `config_path`."""
326 if not config_path
.exists():
327 raise ValueError(f
"Could not find a Lando config file at `{config_path}`.")
329 lando_ini_contents
= config_path
.read_text()
331 parser
= configparser
.ConfigParser(delimiters
="=")
332 parser
.read_string(lando_ini_contents
)
334 if not parser
.has_section(section
):
335 raise ValueError(f
"Lando config file does not have a {section} section.")
338 domain
=parser
.get(section
, "auth0_domain"),
339 client_id
=parser
.get(section
, "auth0_client_id"),
340 audience
=parser
.get(section
, "auth0_audience"),
341 scope
=parser
.get(section
, "auth0_scope"),
344 token
= auth0
.get_token()
347 api_url
=parser
.get(section
, "api_domain"),
348 access_token
=token
["access_token"],
351 def post(self
, url
: str, body
: dict) -> dict:
352 """Make a POST request to Lando."""
353 response
= requests
.post(url
, headers
=self
.api_headers
, json
=body
)
356 response_json
= response
.json()
357 except json
.JSONDecodeError
:
358 # If the server didn't send back a valid JSON object, raise a stack
359 # trace to the terminal which includes error details.
360 response
.raise_for_status()
# Raise `LandoAPIException` if the response wasn't JSON and we didn't raise
# from an invalid status.
364 raise LandoAPIException(
365 detail
="Response was not valid JSON yet status was valid."
368 if response
.status_code
>= 400:
369 raise LandoAPIException(detail
=response_json
["detail"])
373 def post_try_push_patches(
379 """Send try push contents to Lando.
381 Send the list of base64-encoded `patches` in `patch_format` to Lando, to be applied to
382 the Mercurial `base_commit`, using the Auth0 `access_token` for authorization.
384 request_json_body
= {
385 "base_commit": base_commit
,
386 "patch_format": patch_format
,
390 print("Submitting patches to Lando.")
391 response_json
= self
.post(self
.lando_try_api_url
, request_json_body
)
396 def push_to_lando_try(
397 vcs
: SupportedVcsRepository
, commit_message
: str, changed_files
: dict
399 """Push a set of patches to Lando's try endpoint."""
400 # Map `Repository` subclasses to the `patch_format` value Lando expects.
401 PATCH_FORMAT_STRING_MAPPING
= {
402 GitRepository
: "git-format-patch",
403 HgRepository
: "hgexport",
405 patch_format
= PATCH_FORMAT_STRING_MAPPING
.get(type(vcs
))
407 # Other VCS types (namely `src`) are unsupported.
408 raise ValueError(f
"Try push via Lando is not supported for `{vcs.name}`.")
410 # Use Lando Prod unless the `LANDO_TRY_USE_DEV` environment variable is defined.
411 lando_config_section
= (
412 "lando-prod" if not os
.getenv("LANDO_TRY_USE_DEV") else "lando-dev"
415 # Load Auth0 config from `.lando.ini`.
416 lando_ini_path
= Path(vcs
.path
) / ".lando.ini"
417 lando_api
= LandoAPI
.from_lando_config_file(lando_ini_path
, lando_config_section
)
419 # Get the time when the push was initiated, not including Auth0 login time.
420 push_start_time
= time
.perf_counter()
422 with vcs
.try_commit(commit_message
, changed_files
) as head
:
424 base_commit
, patches
= get_stack_info(vcs
, head
)
425 except ValueError as exc
:
426 error_msg
= "abort: error gathering patches for submission."
429 build
.notify(error_msg
)
433 # Make the try request to Lando.
434 response_json
= lando_api
.post_try_push_patches(
435 patches
, patch_format
, base_commit
437 except LandoAPIException
as exc
:
438 error_msg
= "abort: error submitting patches to Lando."
441 build
.notify(error_msg
)
444 duration
= round(time
.perf_counter() - push_start_time
, ndigits
=2)
446 job_id
= response_json
["id"]
448 f
"Lando try submission success, took {duration} seconds. "
449 f
"Landing job id: {job_id}."
452 build
.notify(success_msg
)