#!/usr/bin/python3 -u
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import argparse
import bz2
import concurrent.futures
import contextlib
import datetime
import gzip
import hashlib
import json
import lzma
import multiprocessing
import os
import pathlib
import random
import re
import stat
import subprocess
import sys
import tarfile
import tempfile
import time
import urllib.parse
import urllib.request
import zipfile

try:
    import zstandard
except ImportError:
    zstandard = None

try:
    import certifi
except ImportError:
    certifi = None


# Size the download thread pool to the machine's CPU count.
CONCURRENCY = multiprocessing.cpu_count()


def log(msg):
    print(msg, file=sys.stderr)
    sys.stderr.flush()


class IntegrityError(Exception):
    """Represents an integrity error when downloading a URL."""


def ZstdCompressor(*args, **kwargs):
    if not zstandard:
        raise ValueError("zstandard Python package not available")
    return zstandard.ZstdCompressor(*args, **kwargs)


def ZstdDecompressor(*args, **kwargs):
    if not zstandard:
        raise ValueError("zstandard Python package not available")
    return zstandard.ZstdDecompressor(*args, **kwargs)
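
# A minimal usage sketch of these lazy wrappers (illustrative, not part of
# this script's flow; the file name is hypothetical):
#
#     with open("out.tar.zst", "wb") as fh:
#         with ZstdCompressor().stream_writer(fh) as compressor:
#             compressor.write(b"data")
#
# stream_writer() is the python-zstandard API used by repack_archive() and
# git_checkout_archive() below.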


@contextlib.contextmanager
def rename_after_close(fname, *args, **kwargs):
    """
    Context manager that opens a temporary file to use as a writer,
    and closes the file on context exit, renaming it to the expected
    file name in case of success, or removing it in case of failure.

    Takes the same options as open(), but must be used as a context
    manager.
    """
    path = pathlib.Path(fname)
    tmp = path.with_name("%s.tmp" % path.name)
    try:
        with tmp.open(*args, **kwargs) as fh:
            yield fh
    except Exception:
        tmp.unlink()
        raise
    else:
        tmp.rename(fname)
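
# Illustrative usage, mirroring download_to_path() below (the file name is
# hypothetical):
#
#     with rename_after_close("artifact.tar.zst", "wb") as fh:
#         fh.write(b"...")  # "artifact.tar.zst.tmp" is renamed on success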


# The following is copied from
# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85
def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1):
    """
    A generator function that sleeps between retries, handles exponential
    backoff and jitter. The action you are retrying is meant to run after
    retrier yields.

    At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter).
    Afterwards sleeptime is multiplied by sleepscale for the next iteration.

    Args:
        attempts (int): maximum number of times to try; defaults to 5
        sleeptime (float): how many seconds to sleep between tries; defaults to
            10 seconds
        max_sleeptime (float): the longest we'll sleep, in seconds; defaults to
            300s (five minutes)
        sleepscale (float): how much to multiply the sleep time by each
            iteration; defaults to 1.5
        jitter (int): random jitter to introduce to sleep time each iteration.
            the amount is chosen at random between [-jitter, +jitter];
            defaults to 1

    Yields:
        None, a maximum of `attempts` number of times

    Example:
        >>> n = 0
        >>> for _ in retrier(sleeptime=0, jitter=0):
        ...     if n == 3:
        ...         # We did the thing!
        ...         break
        ...     n += 1
        >>> n
        3

        >>> n = 0
        >>> for _ in retrier(sleeptime=0, jitter=0):
        ...     if n == 6:
        ...         # We did the thing!
        ...         break
        ...     n += 1
        ... else:
        ...     print("max tries hit")
        max tries hit
    """
    jitter = jitter or 0  # py35 barfs on the next line if jitter is None
    if jitter > sleeptime:
        # To prevent negative sleep times
        raise Exception(
            "jitter ({}) must be less than sleep time ({})".format(jitter, sleeptime)
        )

    sleeptime_real = sleeptime
    for _ in range(attempts):
        log("attempt %i/%i" % (_ + 1, attempts))

        yield sleeptime_real

        if jitter:
            sleeptime_real = sleeptime + random.randint(-jitter, jitter)
            # our jitter should scale along with the sleeptime
            jitter = int(jitter * sleepscale)
        else:
            sleeptime_real = sleeptime

        sleeptime *= sleepscale

        if sleeptime_real > max_sleeptime:
            sleeptime_real = max_sleeptime

        # Don't need to sleep the last time
        if _ < attempts - 1:
            log(
                "sleeping for %.2fs (attempt %i/%i)" % (sleeptime_real, _ + 1, attempts)
            )
            time.sleep(sleeptime_real)
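
# With the arguments used by download_to_path() below (attempts=5,
# sleeptime=60) and the default sleepscale of 1.5, the sleeps between
# attempts are roughly 60s, 90s, 135s and 202.5s (within +/- jitter),
# capped at max_sleeptime.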


def stream_download(url, sha256=None, size=None):
    """Download a URL to a generator, optionally with content verification.

    If ``sha256`` or ``size`` are defined, the downloaded URL will be
    validated against those requirements and ``IntegrityError`` will be
    raised if expectations do not match.

    Because verification cannot occur until the file is completely downloaded
    it is recommended for consumers to not do anything meaningful with the
    data if content verification is being used. To securely handle retrieved
    content, it should be streamed to a file or memory and only operated
    on after the generator is exhausted without raising.
    """
    log("Downloading %s" % url)

    h = hashlib.sha256()
    length = 0

    t0 = time.time()
    with urllib.request.urlopen(
        url, timeout=60, cafile=certifi.where()
    ) if certifi else urllib.request.urlopen(url, timeout=60) as fh:
        if not url.endswith(".gz") and fh.info().get("Content-Encoding") == "gzip":
            fh = gzip.GzipFile(fileobj=fh)

        while True:
            chunk = fh.read(65536)
            if not chunk:
                break

            h.update(chunk)
            length += len(chunk)

            yield chunk

    duration = time.time() - t0
    digest = h.hexdigest()

    log(
        "%s resolved to %d bytes with sha256 %s in %.3fs"
        % (url, length, digest, duration)
    )

    if size:
        if size == length:
            log("Verified size of %s" % url)
        else:
            raise IntegrityError(
                "size mismatch on %s: wanted %d; got %d" % (url, size, length)
            )

    if sha256:
        if digest == sha256:
            log("Verified sha256 integrity of %s" % url)
        else:
            raise IntegrityError(
                "sha256 mismatch on %s: wanted %s; got %s" % (url, sha256, digest)
            )


def download_to_path(url, path, sha256=None, size=None):
    """Download a URL to a filesystem path, possibly with verification."""

    # We download to a temporary file and rename at the end so there's
    # no chance of the final file being partially written or containing
    # bad data.
    try:
        path.unlink()
    except FileNotFoundError:
        pass

    for _ in retrier(attempts=5, sleeptime=60):
        try:
            log("Downloading %s to %s" % (url, path))

            with rename_after_close(path, "wb") as fh:
                for chunk in stream_download(url, sha256=sha256, size=size):
                    fh.write(chunk)

            return
        except IntegrityError:
            raise
        except Exception as e:
            log("Download failed: {}".format(e))
            continue

    raise Exception("Download failed, no more retries!")
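
# Illustrative call (URL, path, digest and size are hypothetical):
#
#     download_to_path(
#         "https://example.com/pkg.tar.zst",
#         pathlib.Path("/tmp/pkg.tar.zst"),
#         sha256="...",
#         size=12345,
#     )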


def download_to_memory(url, sha256=None, size=None):
    """Download a URL to memory, possibly with verification."""

    for _ in retrier(attempts=5, sleeptime=60):
        # Reset the buffer on each attempt so a failed partial download
        # doesn't pollute the next one.
        data = b""
        try:
            log("Downloading %s" % (url))

            for chunk in stream_download(url, sha256=sha256, size=size):
                data += chunk

            return data
        except IntegrityError:
            raise
        except Exception as e:
            log("Download failed: {}".format(e))
            continue

    raise Exception("Download failed, no more retries!")


def gpg_verify_path(path: pathlib.Path, public_key_data: bytes, signature_data: bytes):
    """Verify that a filesystem path verifies using GPG.

    Takes a Path defining a file to verify. ``public_key_data`` contains
    bytes with GPG public key data. ``signature_data`` contains a signed
    GPG document to use with ``gpg --verify``.
    """
    log("Validating GPG signature of %s" % path)
    log("GPG key data:\n%s" % public_key_data.decode("ascii"))

    with tempfile.TemporaryDirectory() as td:
        try:
            # --batch since we're running unattended.
            gpg_args = ["gpg", "--homedir", td, "--batch"]

            log("Importing GPG key...")
            subprocess.run(gpg_args + ["--import"], input=public_key_data, check=True)

            log("Verifying GPG signature...")
            subprocess.run(
                gpg_args + ["--verify", "-", "%s" % path],
                input=signature_data,
                check=True,
            )

            log("GPG signature verified!")
        finally:
            # There is a race between the agent self-terminating and
            # shutil.rmtree() from the temporary directory cleanup that can
            # lead to exceptions. Kill the agent before cleanup to prevent this.
            env = dict(os.environ)
            env["GNUPGHOME"] = td
            subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=env)
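
# For reference, the two subprocess invocations above are roughly equivalent
# to (with the key and signature data supplied on stdin):
#
#     gpg --homedir <tmpdir> --batch --import
#     gpg --homedir <tmpdir> --batch --verify - <path>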


def open_tar_stream(path: pathlib.Path):
    """Return a file object yielding an uncompressed tar stream for ``path``."""
    if path.suffix == ".bz2":
        return bz2.open(str(path), "rb")
    elif path.suffix in (".gz", ".tgz"):
        return gzip.open(str(path), "rb")
    elif path.suffix == ".xz":
        return lzma.open(str(path), "rb")
    elif path.suffix == ".zst":
        dctx = ZstdDecompressor()
        return dctx.stream_reader(path.open("rb"))
    elif path.suffix == ".tar":
        return path.open("rb")
    else:
        raise ValueError("unknown archive format for tar file: %s" % path)
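
# The dispatch above is keyed on the final suffix only; e.g. "clang.tar.zst"
# is handled by the zstandard stream reader and "node.tar.xz" by lzma.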


def archive_type(path: pathlib.Path):
    """Attempt to identify a path as an extractable archive."""
    if path.suffixes[-2:-1] == [".tar"] or path.suffixes[-1:] == [".tgz"]:
        return "tar"
    elif path.suffix == ".zip":
        return "zip"
    else:
        return None
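
# Illustrative results: "foo.tar.zst" and "foo.tgz" -> "tar", "foo.zip" ->
# "zip", anything else -> None (the file is kept as-is, not extracted).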


def extract_archive(path, dest_dir, typ):
    """Extract an archive to a destination directory."""

    # Resolve paths to absolute variants.
    path = path.resolve()
    dest_dir = dest_dir.resolve()

    log("Extracting %s to %s" % (path, dest_dir))
    t0 = time.time()

    # We pipe input to the decompressor program so that we can apply
    # custom decompressors that the program may not know about.
    if typ == "tar":
        ifh = open_tar_stream(path)
        # On Windows, the tar program doesn't support things like symbolic
        # links, while Windows actually supports them. The tarfile module in
        # Python does. So use that. But since it's significantly slower than
        # the tar program on Linux, only use tarfile on Windows (tarfile is
        # also not much slower there, presumably because of Windows'
        # notoriously bad I/O).
        if sys.platform == "win32":
            tar = tarfile.open(fileobj=ifh, mode="r|")
            tar.extractall(str(dest_dir))
            args = []
        else:
            args = ["tar", "xf", "-"]
    elif typ == "zip":
        # unzip from stdin has wonky behavior. We don't use a pipe for it;
        # reading /dev/null makes the feed loop below a no-op.
        ifh = open(os.devnull, "rb")
        args = ["unzip", "-o", str(path)]
    else:
        raise ValueError("unknown archive format: %s" % path)

    if args:
        with ifh, subprocess.Popen(
            args, cwd=str(dest_dir), bufsize=0, stdin=subprocess.PIPE
        ) as p:
            while True:
                chunk = ifh.read(131072)
                if not chunk:
                    break

                p.stdin.write(chunk)

        if p.returncode:
            raise Exception("%r exited %d" % (args, p.returncode))

    log("%s extracted in %.3fs" % (path, time.time() - t0))


def repack_archive(
    orig: pathlib.Path, dest: pathlib.Path, strip_components=0, prefix=""
):
    assert orig != dest
    log("Repacking as %s" % dest)
    orig_typ = archive_type(orig)
    typ = archive_type(dest)
    if not orig_typ:
        raise Exception("Archive type not supported for %s" % orig.name)
    if not typ:
        raise Exception("Archive type not supported for %s" % dest.name)

    if dest.suffixes[-2:] != [".tar", ".zst"]:
        raise Exception("Only producing .tar.zst archives is supported.")

    if strip_components or prefix:

        def filter(name):
            if strip_components:
                stripped = "/".join(name.split("/")[strip_components:])
                if not stripped:
                    raise Exception(
                        "Stripping %d components would remove files" % strip_components
                    )
                name = stripped
            return prefix + name

    else:
        filter = None

    with rename_after_close(dest, "wb") as fh:
        ctx = ZstdCompressor()
        if orig_typ == "zip":
            assert typ == "tar"
            zip = zipfile.ZipFile(orig)
            # Convert the zip stream to a tar on the fly.
            with ctx.stream_writer(fh) as compressor, tarfile.open(
                fileobj=compressor, mode="w:"
            ) as tar:
                for zipinfo in zip.infolist():
                    if zipinfo.is_dir():
                        continue
                    tarinfo = tarfile.TarInfo()
                    filename = zipinfo.filename
                    tarinfo.name = filter(filename) if filter else filename
                    tarinfo.size = zipinfo.file_size
                    # Zip files don't have any knowledge of the timezone
                    # they were created in. Which is not really convenient to
                    # reliably convert to a timestamp. But we don't really
                    # care about accuracy, but rather about reproducibility,
                    # so we pick UTC.
                    time = datetime.datetime(
                        *zipinfo.date_time, tzinfo=datetime.timezone.utc
                    )
                    tarinfo.mtime = time.timestamp()
                    # 0 is MS-DOS, 3 is UNIX. Only in the latter case do we
                    # get anything useful for the tar file mode.
                    if zipinfo.create_system == 3:
                        mode = zipinfo.external_attr >> 16
                    else:
                        mode = 0o0644
                    tarinfo.mode = stat.S_IMODE(mode)
                    if stat.S_ISLNK(mode):
                        tarinfo.type = tarfile.SYMTYPE
                        tarinfo.linkname = zip.read(filename).decode()
                        tar.addfile(tarinfo, zip.open(filename))
                    elif stat.S_ISREG(mode) or stat.S_IFMT(mode) == 0:
                        tar.addfile(tarinfo, zip.open(filename))
                    else:
                        raise Exception("Unsupported file mode %o" % stat.S_IFMT(mode))

        elif orig_typ == "tar":
            if typ == "zip":
                raise Exception("Repacking a tar to zip is not supported")
            assert typ == "tar"

            ifh = open_tar_stream(orig)
            if filter:
                # To apply the filter, we need to open the tar stream and
                # rewrite each entry as we copy it.
                origtar = tarfile.open(fileobj=ifh, mode="r|")
                with ctx.stream_writer(fh) as compressor, tarfile.open(
                    fileobj=compressor,
                    mode="w:",
                    format=origtar.format,
                ) as tar:
                    for tarinfo in origtar:
                        if tarinfo.isdir():
                            continue
                        tarinfo.name = filter(tarinfo.name)
                        if "path" in tarinfo.pax_headers:
                            tarinfo.pax_headers["path"] = filter(
                                tarinfo.pax_headers["path"]
                            )
                        tar.addfile(tarinfo, origtar.extractfile(tarinfo))
            else:
                # We only change compression here. The tar stream is unchanged.
                ctx.copy_stream(ifh, fh)
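
# Illustrative repack: convert a downloaded zip into the canonical .tar.zst
# layout with a directory prefix (file names are hypothetical):
#
#     repack_archive(
#         pathlib.Path("grcov.zip"),
#         pathlib.Path("grcov.tar.zst"),
#         prefix="grcov/",
#     )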


def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None):
    """Fetch a URL and extract it to a destination path.

    If the downloaded URL is an archive, it is extracted automatically
    and the archive is deleted. Otherwise the file remains in place in
    the destination directory.
    """

    basename = urllib.parse.urlparse(url).path.split("/")[-1]
    dest_path = dest_dir / basename

    download_to_path(url, dest_path, sha256=sha256, size=size)

    if not extract:
        return

    typ = archive_type(dest_path)
    if typ:
        extract_archive(dest_path, dest_dir, typ)
        log("Removing %s" % dest_path)
        dest_path.unlink()


def fetch_urls(downloads):
    """Fetch multiple URLs to a pathlib.Path, concurrently."""
    with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = []

        for download in downloads:
            fs.append(e.submit(fetch_and_extract, *download))

        for f in fs:
            f.result()
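
# Each entry of `downloads` is unpacked into fetch_and_extract(), so the
# expected tuple shape is (url, dest_dir, extract, sha256), as built by
# command_task_artifacts() below.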


def _git_checkout_github_archive(
    dest_path: pathlib.Path, repo: str, commit: str, prefix: str
):
    """Use the GitHub archive generator to speed up cloning a GitHub repo."""
    repo = repo.rstrip("/")
    github_url = "{repo}/archive/{commit}.tar.gz".format(**locals())

    with tempfile.TemporaryDirectory() as td:
        temp_dir = pathlib.Path(td)
        dl_dest = temp_dir / "archive.tar.gz"
        download_to_path(github_url, dl_dest)
        repack_archive(dl_dest, dest_path, strip_components=1, prefix=prefix + "/")


def _github_submodule_required(repo: str, commit: str):
    """Use the GitHub API to check whether the repo uses submodules."""
    url = "{repo}/blob/{commit}/.gitmodules".format(**locals())
    try:
        status_code = urllib.request.urlopen(url).getcode()
        return status_code == 200
    except Exception:
        # Most commonly an HTTP 404: no .gitmodules means no submodules.
        return False


def git_checkout_archive(
    dest_path: pathlib.Path,
    repo: str,
    commit: str,
    prefix=None,
    ssh_key=None,
    include_dot_git=False,
):
    """Produce an archive of the files comprising a Git checkout."""
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    if not prefix:
        prefix = repo.rstrip("/").rsplit("/", 1)[-1]

    if dest_path.suffixes[-2:] != [".tar", ".zst"]:
        raise Exception("Only producing .tar.zst archives is supported.")

    if repo.startswith("https://github.com/"):
        if not include_dot_git and not _github_submodule_required(repo, commit):
            log("Using github archive service to speed up archive creation")
            # Always log sha1 info, either from commit or resolved from repo.
            if re.match(r"^[a-fA-F0-9]{40}$", commit):
                revision = commit
            else:
                ref_output = subprocess.check_output(
                    ["git", "ls-remote", repo, "refs/heads/" + commit]
                )
                revision, _ = ref_output.decode().split(maxsplit=1)
            log("Fetching revision {}".format(revision))
            return _git_checkout_github_archive(dest_path, repo, commit, prefix)

    with tempfile.TemporaryDirectory() as td:
        temp_dir = pathlib.Path(td)

        git_dir = temp_dir / prefix

        # This could be faster with a shallow clone. However, Git requires a ref
        # to initiate a clone. Since the commit-ish may not refer to a ref, we
        # simply perform a full clone followed by a checkout.
        print("cloning %s to %s" % (repo, git_dir))

        env = os.environ.copy()
        keypath = ""
        if ssh_key:
            taskcluster_secret_url = api(
                os.environ.get("TASKCLUSTER_PROXY_URL"),
                "secrets",
                "v1",
                "secret/{keypath}".format(keypath=ssh_key),
            )
            taskcluster_secret = b"".join(stream_download(taskcluster_secret_url))
            taskcluster_secret = json.loads(taskcluster_secret)
            sshkey = taskcluster_secret["secret"]["ssh_privkey"]

            keypath = temp_dir.joinpath("ssh-key")
            keypath.write_text(sshkey)
            keypath.chmod(0o600)

            # Set on the copied environment rather than replacing it, so git
            # keeps PATH, HOME, etc.
            env["GIT_SSH_COMMAND"] = "ssh -o 'StrictHostKeyChecking no' -i {keypath}".format(
                keypath=keypath
            )

        subprocess.run(["git", "clone", "-n", repo, str(git_dir)], check=True, env=env)

        # Always use a detached head so that git prints out what it checked out.
        subprocess.run(
            ["git", "checkout", "--detach", commit], cwd=str(git_dir), check=True
        )

        # When including the .git, we want --depth 1, but a direct clone would not
        # necessarily be able to give us the right commit.
        if include_dot_git:
            initial_clone = git_dir.with_name(git_dir.name + ".orig")
            git_dir.rename(initial_clone)
            subprocess.run(
                [
                    "git",
                    "clone",
                    "file://" + str(initial_clone),
                    str(git_dir),
                    "--depth",
                    "1",
                ],
                check=True,
            )
            subprocess.run(
                ["git", "remote", "set-url", "origin", repo],
                cwd=str(git_dir),
                check=True,
            )

        # --depth 1 can induce more work on the server side, so only use it for
        # submodule initialization when we want to keep the .git directory.
        depth = ["--depth", "1"] if include_dot_git else []
        subprocess.run(
            ["git", "submodule", "update", "--init"] + depth,
            cwd=str(git_dir),
            check=True,
        )

        if keypath:
            os.remove(keypath)

        print("creating archive %s of commit %s" % (dest_path, commit))
        exclude_dot_git = [] if include_dot_git else ["--exclude=.git"]
        proc = subprocess.Popen(
            ["tar", "cf", "-"] + exclude_dot_git + ["-C", str(temp_dir), prefix],
            stdout=subprocess.PIPE,
        )

        with rename_after_close(dest_path, "wb") as out:
            ctx = ZstdCompressor()
            ctx.copy_stream(proc.stdout, out)

        proc.wait()
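
# tar writes the archive to stdout, which is streamed through the zstd
# compressor into dest_path in a single pass, so no intermediate .tar file
# is ever written to disk.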


def command_git_checkout_archive(args):
    dest = pathlib.Path(args.dest)

    try:
        git_checkout_archive(
            dest,
            args.repo,
            args.commit,
            prefix=args.path_prefix,
            ssh_key=args.ssh_key_secret,
            include_dot_git=args.include_dot_git,
        )
    except Exception:
        try:
            dest.unlink()
        except FileNotFoundError:
            pass

        raise


def command_static_url(args):
    gpg_sig_url = args.gpg_sig_url
    gpg_env_key = args.gpg_key_env

    if bool(gpg_sig_url) != bool(gpg_env_key):
        print("--gpg-sig-url and --gpg-key-env must both be defined")
        return 1

    if gpg_sig_url:
        gpg_signature = b"".join(stream_download(gpg_sig_url))
        gpg_key = os.environb[gpg_env_key.encode("ascii")]

    dest = pathlib.Path(args.dest)
    dest.parent.mkdir(parents=True, exist_ok=True)

    basename = urllib.parse.urlparse(args.url).path.split("/")[-1]
    if basename.endswith("".join(dest.suffixes)):
        dl_dest = dest
    else:
        dl_dest = dest.parent / basename

    try:
        download_to_path(args.url, dl_dest, sha256=args.sha256, size=args.size)

        if gpg_sig_url:
            gpg_verify_path(dl_dest, gpg_key, gpg_signature)

        if dl_dest != dest or args.strip_components or args.add_prefix:
            repack_archive(dl_dest, dest, args.strip_components, args.add_prefix)
    except Exception:
        try:
            dl_dest.unlink()
        except FileNotFoundError:
            pass

        raise

    if dl_dest != dest:
        log("Removing %s" % dl_dest)
        dl_dest.unlink()


def api(root_url, service, version, path):
    # taskcluster-lib-urls is not available when this script runs, so
    # simulate its behavior:
    return "{root_url}/api/{service}/{version}/{path}".format(
        root_url=root_url, service=service, version=version, path=path
    )


def get_hash(fetch, root_url):
    path = "task/{task}/artifacts/{artifact}".format(
        task=fetch["task"], artifact="public/chain-of-trust.json"
    )
    url = api(root_url, "queue", "v1", path)
    cot = json.loads(download_to_memory(url))
    return cot["artifacts"][fetch["artifact"]]["sha256"]
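
# The chain-of-trust.json artifact is expected to contain a mapping like
# the following illustrative excerpt, which is what the indexing above reads:
#
#     {"artifacts": {"public/build/target.tar.zst": {"sha256": "..."}}}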


def command_task_artifacts(args):
    start = time.monotonic()
    fetches = json.loads(os.environ["MOZ_FETCHES"])
    downloads = []
    for fetch in fetches:
        extdir = pathlib.Path(args.dest)
        if "dest" in fetch:
            # Note: normpath doesn't like pathlib.Path in python 3.5
            extdir = pathlib.Path(os.path.normpath(str(extdir.joinpath(fetch["dest"]))))
        extdir.mkdir(parents=True, exist_ok=True)
        root_url = os.environ["TASKCLUSTER_ROOT_URL"]
        sha256 = None
        if fetch.get("verify-hash"):
            sha256 = get_hash(fetch, root_url)
        if fetch["artifact"].startswith("public/"):
            path = "task/{task}/artifacts/{artifact}".format(
                task=fetch["task"], artifact=fetch["artifact"]
            )
            url = api(root_url, "queue", "v1", path)
        else:
            url = "{proxy_url}/api/queue/v1/task/{task}/artifacts/{artifact}".format(
                proxy_url=os.environ["TASKCLUSTER_PROXY_URL"],
                task=fetch["task"],
                artifact=fetch["artifact"],
            )
        downloads.append((url, extdir, fetch["extract"], sha256))

    fetch_urls(downloads)
    end = time.monotonic()

    perfherder_data = {
        "framework": {"name": "build_metrics"},
        "suites": [
            {
                "name": "fetch_content",
                "value": end - start,
                "lowerIsBetter": True,
                "shouldAlert": False,
                "subtests": [],
            }
        ],
    }
    print("PERFHERDER_DATA: {}".format(json.dumps(perfherder_data)), file=sys.stderr)


def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title="sub commands")

    git_checkout = subparsers.add_parser(
        "git-checkout-archive",
        help="Obtain an archive of files from a Git repository checkout",
    )
    git_checkout.set_defaults(func=command_git_checkout_archive)
    git_checkout.add_argument(
        "--path-prefix", help="Prefix for paths in produced archive"
    )
    git_checkout.add_argument("repo", help="URL to Git repository to be cloned")
    git_checkout.add_argument("commit", help="Git commit to check out")
    git_checkout.add_argument("dest", help="Destination path of archive")
    git_checkout.add_argument(
        "--ssh-key-secret", help="The scope path of the ssh key to use for checkout"
    )
    git_checkout.add_argument(
        "--include-dot-git", action="store_true", help="Include the .git directory"
    )

    url = subparsers.add_parser("static-url", help="Download a static URL")
    url.set_defaults(func=command_static_url)
    url.add_argument("--sha256", required=True, help="SHA-256 of downloaded content")
    url.add_argument(
        "--size", required=True, type=int, help="Size of downloaded content, in bytes"
    )
    url.add_argument(
        "--gpg-sig-url",
        help="URL containing signed GPG document validating URL to fetch",
    )
    url.add_argument(
        "--gpg-key-env", help="Environment variable containing GPG key to validate"
    )
    url.add_argument(
        "--strip-components",
        type=int,
        default=0,
        help="Number of leading components to strip from file "
        "names in the downloaded archive",
    )
    url.add_argument(
        "--add-prefix",
        default="",
        help="Prefix to add to file names in the downloaded archive",
    )
    url.add_argument("url", help="URL to fetch")
    url.add_argument("dest", help="Destination path")

    artifacts = subparsers.add_parser("task-artifacts", help="Fetch task artifacts")
    artifacts.set_defaults(func=command_task_artifacts)
    artifacts.add_argument(
        "-d",
        "--dest",
        default=os.environ.get("MOZ_FETCHES_DIR"),
        help="Destination directory which will contain all "
        "artifacts (defaults to $MOZ_FETCHES_DIR)",
    )

    args = parser.parse_args()

    if not args.dest:
        parser.error(
            "no destination directory specified, either pass in --dest "
            "or set $MOZ_FETCHES_DIR"
        )

    return args.func(args)


if __name__ == "__main__":
    sys.exit(main())