#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals
import argparse
from contextlib import contextmanager
import gzip
import io
import logging
from mozbuild.base import MozbuildObject
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
import os
from six.moves.queue import Queue
import requests
import sys
import tarfile
from requests.packages.urllib3.util.retry import Retry
from threading import Event, Thread
import time

# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)


@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    """
    start = time.time()

    def elapsed():
        return time.time() - start

    yield elapsed
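

# A minimal usage sketch for timed(): the yielded callable only closes over
# the start timestamp, so it can be invoked both inside and after the `with`
# block (main() below relies on the latter). Illustrative only;
# do_something_slow() is a hypothetical function.
#
#   with timed() as elapsed:
#       do_something_slow()
#       log.info("so far: {:0.3f}s".format(elapsed()))
#   log.info("total: {:0.3f}s".format(elapsed()))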


def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`.
    """
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    b.flush()
    b.seek(0)
    return b
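

# Round-trip sketch for gzip_compress(): the returned BytesIO is rewound to
# offset 0, so it can be handed straight to upload_fileobj() below, or
# decompressed again as a sanity check. Illustrative only.
#
#   with gzip.GzipFile(fileobj=gzip_compress(b"hello")) as f:
#       assert f.read() == b"hello"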


def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
    """
    try:
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            log.info(
                'Uploading "{}" ({} bytes)'.format(pathname, len(compressed.getvalue()))
            )
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        event.set()
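

# Sketch of driving a single worker directly, mirroring the wiring in
# do_work() below; `bucket` and `session_args` are assumed to be configured
# the same way, and the queued file name is a hypothetical example.
#
#   q, failed = Queue(), Event()
#   t = Thread(target=upload_worker, args=(q, failed, bucket, session_args))
#   t.daemon = True
#   t.start()
#   q.put(("dom/example.h", b"// generated contents"))
#   q.join()  # returns once the worker calls task_done()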


def do_work(artifact, region, bucket):
    session_args = {"region_name": region}
    session = requests.Session()
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)

    if "TASK_ID" in os.environ:
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
            level
        )
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
        )
        res = session.get(secrets_url)
        res.raise_for_status()
        # The secret payload is expected to look like
        # {"secret": {"AWS_ACCESS_KEY_ID": ..., "AWS_SECRET_ACCESS_KEY": ...}}.
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        log.info("Trying to use your AWS credentials...")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
        t.start()
    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            break
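

# Sketch of calling do_work() directly; the URL and bucket below are
# hypothetical stand-ins. Normally the artifact URL comes from argv and the
# region/bucket pair from get_s3_region_and_bucket(), as in main() below.
#
#   do_work(
#       artifact="https://example.com/target.generated-files.tar.gz",
#       region="us-west-2",
#       bucket="example-generated-sources-bucket",
#   )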


def main(argv):
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    config = MozbuildObject.from_environment()
    config.activate_virtualenv()
    config.virtualenv_manager.install_pip_package("boto3==1.4.4")

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
    log.info("Finished in {:.03f}s".format(elapsed()))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))