# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import gzip
import io
import logging
import os
import sys
import tarfile
import time
from contextlib import contextmanager
from queue import Queue
from threading import Event, Thread

import boto3
import requests
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
from requests.packages.urllib3.util.retry import Retry
# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)


@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    """
    start = time.time()

    def elapsed():
        return time.time() - start

    yield elapsed
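
# Usage sketch for `timed` (illustrative; `do_expensive_work` is a
# hypothetical stand-in). The bound value can be called repeatedly to
# take intermediate readings:
#
#     with timed() as elapsed:
#         do_expensive_work()
#         log.info("took {:0.3f}s".format(elapsed()))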


def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`.
    """
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    b.flush()
    b.seek(0)
    return b
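
# Illustrative round-trip check (not part of the upload flow): the returned
# stream should decompress back to the original bytes with the standard
# library's decompressor.
#
#     assert gzip.decompress(gzip_compress(b"hello").getvalue()) == b"hello"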


def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
    """
    try:
        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            log.info(
                'Uploading "{}" ({} bytes)'.format(
                    pathname, len(compressed.getvalue())
                )
            )
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        event.set()
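
# Each worker builds its own boto3 Session above: boto3 Session objects are
# not thread-safe, and boto3's multithreading guidance recommends one Session
# per thread rather than sharing a single one across workers.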


def do_work(artifact, region, bucket):
    session_args = {"region_name": region}
    session = requests.Session()
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)
102 if "TASK_ID" in os
.environ
:
103 level
= os
.environ
.get("MOZ_SCM_LEVEL", "1")
104 secrets_url
= "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format( # noqa
108 'Using AWS credentials from the secrets service: "{}"'.format(secrets_url
)
110 res
= session
.get(secrets_url
)
111 res
.raise_for_status()
114 aws_access_key_id
=secret
["secret"]["AWS_ACCESS_KEY_ID"],
115 aws_secret_access_key
=secret
["secret"]["AWS_SECRET_ACCESS_KEY"],
118 log
.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        # Daemonize the workers so a hung upload can't keep the process alive.
        t.daemon = True
        t.start()
    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                # A worker failed; stop queueing more work.
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            break
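    # Event.wait(0.1) serves as both the error check and the polling interval:
    # it returns True as soon as a worker sets the event, so a failure is
    # noticed within ~100ms instead of blocking indefinitely in q.join().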


def main(argv):
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()
    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info("Finished in {:.03f}s".format(elapsed()))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))