Bug 1797755 - Part 5: Use a single initial mark stack size regardless of whether...
[gecko.git] / build / upload_generated_sources.py
blob15439565d91c175219722d9aba51bfbb3b770251
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import argparse
import gzip
import io
import logging
import os
import sys
import tarfile
import time
from contextlib import contextmanager
from threading import Event, Thread

import requests
from requests.packages.urllib3.util.retry import Retry
from six.moves.queue import Queue

from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)

# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)
@contextmanager
def timed():
    """
    Context manager that yields a zero-argument function returning the
    elapsed time in seconds since the context was entered.

    Uses a monotonic clock so the measurement cannot go backwards if the
    system wall clock is adjusted mid-operation.
    """
    start = time.monotonic()

    def elapsed():
        return time.monotonic() - start

    yield elapsed
def gzip_compress(data):
    """
    Compress `data` with gzip and return the result as a `BytesIO`
    positioned at the start, ready to be read.
    """
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="w") as gz:
        gz.write(data)
    buf.seek(0)
    return buf
def upload_worker(queue, event, bucket, session_args):
    """
    Worker thread body: get `(name, contents)` entries from `queue` and
    upload `contents` to S3 with gzip compression, using `name` prefixed
    with the SHA-512 hex digest of `contents` as the key.

    `session_args` are keyword arguments for `boto3.session.Session`.
    On any exception (including a failed boto3 import), the error is
    logged and `event` is set so the producer and sibling workers stop.
    """
    try:
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            # getbuffer().nbytes gives the size without copying the
            # whole compressed payload the way getvalue() would.
            log.info(
                'Uploading "{}" ({} bytes)'.format(
                    pathname, compressed.getbuffer().nbytes
                )
            )
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        event.set()
def do_work(artifact, region, bucket):
    """
    Download the generated-sources `artifact` (a gzipped tar) and upload
    every file it contains to the S3 `bucket` in `region` using a pool of
    worker threads.

    When running under Taskcluster (TASK_ID set), AWS credentials are
    fetched from the secrets service; otherwise ambient AWS credentials
    are used.
    """
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors when fetching secrets/artifacts.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)

    if "TASK_ID" in os.environ:
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
            level
        )
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
        )
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        log.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
        t.start()
    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        # NOTE(review): assumes every tar entry is a regular file;
        # extractfile() would return None for a directory — confirm the
        # artifact never contains directory entries.
        for entry in tar:
            if event.is_set():
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            break
def main(argv):
    """
    Entry point: parse `argv`, look up the S3 region/bucket, and upload
    the generated-sources artifact. Returns 0 on success; any error
    propagates as an exception.
    """
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info("Finished in {:.03f}s".format(elapsed()))
    return 0
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))