Bug 1867190 - Add prefs for PHC probablities r=glandium
[gecko.git] / build / upload_generated_sources.py
blob3e4b4acd599bb2e5762c06a668ec3449e2cfdf86
#!/usr/bin/env python
2 # This Source Code Form is subject to the terms of the Mozilla Public
3 # License, v. 2.0. If a copy of the MPL was not distributed with this
4 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import gzip
import io
import logging
import os
import sys
import tarfile
import time
from contextlib import contextmanager
from threading import Event, Thread

import requests
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
from requests.packages.urllib3.util.retry import Retry
from six.moves.queue import Queue
# Number of concurrent S3 upload threads.
# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10

# Module-level logger; handlers/format are configured in main().
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)
@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    """
    # Use time.monotonic() rather than time.time(): the monotonic clock is
    # immune to system clock adjustments (NTP steps, DST), so the measured
    # interval can never be negative or wildly wrong.
    start = time.monotonic()

    def elapsed():
        return time.monotonic() - start

    yield elapsed
def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`.

    The returned buffer is rewound to position 0, ready for reading.
    """
    buf = io.BytesIO()
    writer = gzip.GzipFile(fileobj=buf, mode="w")
    try:
        writer.write(data)
    finally:
        # Closing the GzipFile flushes the trailing gzip checksum/size
        # fields into the underlying buffer.
        writer.close()
    buf.flush()
    buf.seek(0)
    return buf
def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
    """
    try:
        # Imported lazily so the module can be loaded without boto3 present.
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        # Keep pulling work until some other thread hits an exception.
        while not event.is_set():
            name, contents = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            size = len(compressed.getvalue())
            log.info('Uploading "{}" ({} bytes)'.format(pathname, size))
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            queue.task_done()
    except Exception:
        # Signal the main thread (and the other workers) to shut down.
        log.exception("Thread encountered exception:")
        event.set()
def do_work(artifact, region, bucket):
    """
    Download the generated-sources archive at `artifact` and upload every
    file it contains to the S3 `bucket` in `region`, fanning the uploads out
    over NUM_WORKER_THREADS daemon threads. Exits the process with status 1
    if any worker thread fails.
    """
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors when talking over HTTP.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    for scheme in ("https://", "http://"):
        session.mount(scheme, adapter)

    if "TASK_ID" in os.environ:
        # Running in automation: fetch AWS credentials from the Taskcluster
        # secrets service for the appropriate SCM level.
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
            level
        )
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
        )
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        log.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()

    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for _ in range(NUM_WORKER_THREADS):
        worker = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        worker.daemon = True
        worker.start()

    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))

    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            sys.exit(1)
def main(argv):
    """
    Parse `argv`, look up the destination S3 region and bucket, and upload
    the given generated-sources artifact. Returns 0 on success.
    """
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
    log.info("Finished in {:.03f}s".format(elapsed()))
    return 0
if __name__ == "__main__":
    # Drop argv[0] (the script path); main() receives only the real arguments.
    sys.exit(main(sys.argv[1:]))