1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 """Implements Auth0 Device Code flow and Lando try submission.
7 See https://auth0.com/blog/securing-a-python-cli-application-with-auth0/ for more.
10 from __future__
import annotations
18 from dataclasses
import (
22 from pathlib
import Path
31 from mach
.util
import get_state_dir
32 from mozbuild
.base
import MozbuildObject
33 from mozversioncontrol
import (
38 from .task_config
import (
43 Path(get_state_dir(specific_to_topsrcdir
=False)) / "lando_auth0_user_token.json"
# The supported variants of `Repository` for this workflow.
# Used in this file as the annotation for the `vcs` parameters.
SupportedVcsRepository = Union[GitRepository, HgRepository]

# Absolute path of the directory that contains this module.
here = os.path.abspath(os.path.dirname(__file__))
# Module-level build object, created at import time with `cwd=here`.
# Functions in this file call `build.notify(...)` on it.
build = MozbuildObject.from_environment(cwd=here)
def convert_bytes_patch_to_base64(patch_bytes: bytes) -> str:
    """Encode a raw patch as base64 text.

    Takes the patch content as `bytes` and returns its base64 encoding
    as an ASCII `str`.
    """
    encoded = base64.b64encode(patch_bytes)
    return encoded.decode("ascii")
58 def load_token_from_disk() -> Optional
[dict]:
59 """Load and validate an existing Auth0 token from disk.
61 Return the token as a `dict` if it can be validated, or return `None`
62 if any error was encountered.
64 if not TOKEN_FILE
.exists():
65 print("No existing Auth0 token found.")
69 user_token
= json
.loads(TOKEN_FILE
.read_bytes())
70 except json
.JSONDecodeError
:
71 print("Existing Auth0 token could not be decoded as JSON.")
77 def get_stack_info(vcs
: SupportedVcsRepository
) -> Tuple
[str, List
[str]]:
78 """Retrieve information about the current stack for submission via Lando.
80 Returns a tuple of the current public base commit as a Mercurial SHA,
81 and a list of ordered base64 encoded patches.
83 base_commit
= vcs
.base_ref_as_hg()
86 "Could not determine base Mercurial commit hash for submission."
88 print("Using", base_commit
, "as the hg base commit.")
90 # Reuse the base revision when on Mercurial to avoid multiple calls to `hg log`.
91 branch_nodes_kwargs
= {}
92 if isinstance(vcs
, HgRepository
):
93 branch_nodes_kwargs
["base_ref"] = base_commit
95 nodes
= vcs
.get_branch_nodes(**branch_nodes_kwargs
)
97 raise ValueError("Could not find any commit hashes for submission.")
99 print("Submitting a single try config commit.")
100 elif len(nodes
) == 2:
101 print("Submitting 1 node and the try commit.")
103 print("Submitting stack of", len(nodes
) - 1, "nodes and the try commit.")
105 patches
= vcs
.get_commit_patches(nodes
)
107 convert_bytes_patch_to_base64(patch_bytes
) for patch_bytes
in patches
109 print("Patches gathered for submission.")
111 return base_commit
, base64_patches
116 """Helper class to interact with Auth0."""
122 algorithms
: list[str] = field(default_factory
=lambda: ["RS256"])
def base_url(self) -> str:
    """Return the `https://` URL for `self.domain`."""
    domain = self.domain
    return f"https://{domain}"
def device_code_url(self) -> str:
    """Return the Device Code API endpoint URL under `self.base_url`."""
    root = self.base_url
    return f"{root}/oauth/device/code"
def issuer(self) -> str:
    """Return the token issuer URL: `self.base_url` with a trailing slash."""
    root = self.base_url
    return f"{root}/"
def jwks_url(self) -> str:
    """Return the URL of the JWKS file under `self.base_url`."""
    root = self.base_url
    return f"{root}/.well-known/jwks.json"
def oauth_token_url(self) -> str:
    """Return the OAuth Token endpoint URL under `self.base_url`."""
    root = self.base_url
    return f"{root}/oauth/token"
149 def request_device_code(self
) -> dict:
150 """Request authorization from Auth0 using the Device Code Flow.
152 See https://auth0.com/docs/api/authentication#get-device-code for more.
154 response
= requests
.post(
155 self
.device_code_url
,
156 headers
={"Content-Type": "application/x-www-form-urlencoded"},
158 "audience": self
.audience
,
159 "client_id": self
.client_id
,
164 response
.raise_for_status()
166 return response
.json()
168 def validate_token(self
, user_token
: dict) -> Optional
[dict]:
169 """Verify the given user token is valid.
171 Validate the ID token, and validate the access token's expiration claim.
173 # Import `auth0-python` here to avoid `ImportError` in tests, since
174 # the `python-test` site won't have `auth0-python` installed.
176 from auth0
.authentication
.token_verifier
import (
177 AsymmetricSignatureVerifier
,
180 from auth0
.exceptions
import (
181 TokenValidationError
,
184 signature_verifier
= AsymmetricSignatureVerifier(self
.jwks_url
)
185 token_verifier
= TokenVerifier(
186 audience
=self
.client_id
,
188 signature_verifier
=signature_verifier
,
192 token_verifier
.verify(user_token
["id_token"])
193 except TokenValidationError
as e
:
194 print("Could not validate existing Auth0 ID token:", str(e
))
197 decoded_access_token
= jwt
.decode(
198 user_token
["access_token"],
199 algorithms
=self
.algorithms
,
200 options
={"verify_signature": False},
203 access_token_expiration
= decoded_access_token
["exp"]
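205 # Assert that the access token isn't expired or expiring within a minute.
# NOTE(review): the comparison below uses `exp + 60`, so it only rejects tokens that expired more than a minute ago; `exp - 60` would reject tokens expiring within the next minute. Confirm the intended grace period.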
206 if time
.time() > access_token_expiration
+ 60:
207 print("Access token is expired.")
212 user_token
["id_token"],
213 algorithms
=self
.algorithms
,
214 options
={"verify_signature": False},
217 print("Auth0 token validated.")
220 def device_authorization_flow(self
) -> dict:
221 """Perform the Device Authorization Flow.
223 See https://auth0.com/docs/get-started/authentication-and-authorization-flow/device-authorization-flow
226 start
= time
.perf_counter()
228 device_code_data
= self
.request_device_code()
230 "1. On your computer or mobile device navigate to:",
231 device_code_data
["verification_uri_complete"],
233 print("2. Enter the following code:", device_code_data
["user_code"])
235 auth_msg
= f
"Auth0 token validation required at: {device_code_data['verification_uri_complete']}"
236 build
.notify(auth_msg
)
239 webbrowser
.open(device_code_data
["verification_uri_complete"])
240 except webbrowser
.Error
:
241 print("Could not automatically open the web browser.")
243 device_code_lifetime_s
= device_code_data
["expires_in"]
245 # Print successive periods on the same line to avoid moving the link
246 # while the user is trying to click it.
247 print("Waiting...", end
="", flush
=True)
248 while time
.perf_counter() - start
< device_code_lifetime_s
:
249 response
= requests
.post(
250 self
.oauth_token_url
,
252 "client_id": self
.client_id
,
253 "device_code": device_code_data
["device_code"],
254 "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
258 response_data
= response
.json()
260 if response
.status_code
== 200:
261 print("\nLogin successful.")
264 if response_data
["error"] not in ("authorization_pending", "slow_down"):
265 raise RuntimeError(response_data
["error_description"])
267 time
.sleep(device_code_data
["interval"])
268 print(".", end
="", flush
=True)
270 raise ValueError("Timed out waiting for Auth0 device code authentication!")
272 def get_token(self
) -> dict:
273 """Retrieve an access token for authentication.
275 If a cached token is found and can be confirmed to be valid, return it.
276 Otherwise, perform the Device Code Flow authorization to request a new
277 token, validate it and save it to disk.
279 # Load a cached token and validate it if one is available.
280 cached_token
= load_token_from_disk()
281 user_token
= self
.validate_token(cached_token
) if cached_token
else None
283 # Login with the Device Authorization Flow if an existing token isn't found.
285 new_token
= self
.device_authorization_flow()
286 user_token
= self
.validate_token(new_token
)
289 raise ValueError("Could not get an Auth0 token.")
291 # Save token to disk.
292 with TOKEN_FILE
.open("w") as f
:
293 json
.dump(user_token
, f
, indent
=2, sort_keys
=True)
class LandoAPIException(Exception):
    """Error type for failures reported by Lando.

    The exception message is `detail`, or an empty string when `detail`
    is `None` or empty.
    """

    def __init__(self, detail: Optional[str] = None):
        message = detail if detail else ""
        super().__init__(message)
307 """Helper class to interact with Lando-API."""
def lando_try_api_url(self) -> str:
    """Return the Lando Try endpoint URL for the host in `self.api_url`."""
    host = self.api_url
    return f"https://{host}/try/patches"
318 def api_headers(self
) -> dict[str, str]:
319 """Headers for use accessing and authenticating against the API."""
321 "Authorization": f
"Bearer {self.access_token}",
322 "Content-Type": "application/json",
326 def from_lando_config_file(cls
, config_path
: Path
, section
: str) -> LandoAPI
:
327 """Build a `LandoConfig` from `section` in the file at `config_path`."""
328 if not config_path
.exists():
329 raise ValueError(f
"Could not find a Lando config file at `{config_path}`.")
331 lando_ini_contents
= config_path
.read_text()
333 parser
= configparser
.ConfigParser(delimiters
="=")
334 parser
.read_string(lando_ini_contents
)
336 if not parser
.has_section(section
):
337 raise ValueError(f
"Lando config file does not have a {section} section.")
340 domain
=parser
.get(section
, "auth0_domain"),
341 client_id
=parser
.get(section
, "auth0_client_id"),
342 audience
=parser
.get(section
, "auth0_audience"),
343 scope
=parser
.get(section
, "auth0_scope"),
346 token
= auth0
.get_token()
349 api_url
=parser
.get(section
, "api_domain"),
350 access_token
=token
["access_token"],
353 def post(self
, url
: str, body
: dict) -> dict:
354 """Make a POST request to Lando."""
355 response
= requests
.post(url
, headers
=self
.api_headers
, json
=body
)
358 response_json
= response
.json()
359 except json
.JSONDecodeError
:
360 # If the server didn't send back a valid JSON object, raise a stack
361 # trace to the terminal which includes error details.
362 response
.raise_for_status()
364 # Raise `LandoAPIException` if the response wasn't JSON and we didn't raise
365 # from an invalid status.
366 raise LandoAPIException(
367 detail
="Response was not valid JSON yet status was valid."
370 if response
.status_code
>= 400:
371 raise LandoAPIException(detail
=response_json
["detail"])
375 def post_try_push_patches(
381 """Send try push contents to Lando.
383 Send the list of base64-encoded `patches` in `patch_format` to Lando, to be applied to
384 the Mercurial `base_commit`, using the Auth0 `access_token` for authorization.
386 request_json_body
= {
387 "base_commit": base_commit
,
388 "patch_format": patch_format
,
392 print("Submitting patches to Lando.")
393 response_json
= self
.post(self
.lando_try_api_url
, request_json_body
)
398 def push_to_lando_try(vcs
: SupportedVcsRepository
, commit_message
: str):
399 """Push a set of patches to Lando's try endpoint."""
400 # Map `Repository` subclasses to the `patch_format` value Lando expects.
401 PATCH_FORMAT_STRING_MAPPING
= {
402 GitRepository
: "git-format-patch",
403 HgRepository
: "hgexport",
405 patch_format
= PATCH_FORMAT_STRING_MAPPING
.get(type(vcs
))
407 # Other VCS types (namely `src`) are unsupported.
408 raise ValueError(f
"Try push via Lando is not supported for `{vcs.name}`.")
410 # Use Lando Prod unless the `LANDO_TRY_USE_DEV` environment variable is defined.
411 lando_config_section
= (
412 "lando-prod" if not os
.getenv("LANDO_TRY_USE_DEV") else "lando-dev"
415 # Load Auth0 config from `.lando.ini`.
416 lando_ini_path
= Path(vcs
.path
) / ".lando.ini"
417 lando_api
= LandoAPI
.from_lando_config_file(lando_ini_path
, lando_config_section
)
419 # Get the time when the push was initiated, not including Auth0 login time.
420 push_start_time
= time
.perf_counter()
422 with
try_config_commit(vcs
, commit_message
):
424 base_commit
, patches
= get_stack_info(vcs
)
425 except ValueError as exc
:
426 error_msg
= "abort: error gathering patches for submission."
429 build
.notify(error_msg
)
433 # Make the try request to Lando.
434 response_json
= lando_api
.post_try_push_patches(
435 patches
, patch_format
, base_commit
437 except LandoAPIException
as exc
:
438 error_msg
= "abort: error submitting patches to Lando."
441 build
.notify(error_msg
)
444 duration
= round(time
.perf_counter() - push_start_time
, ndigits
=2)
446 job_id
= response_json
["id"]
448 f
"Lando try submission success, took {duration} seconds. "
449 f
"Landing job id: {job_id}."
452 build
.notify(success_msg
)