# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import argparse
import gzip
import io
import logging
import os
import sys
import tarfile
import time
from contextlib import contextmanager
from threading import Event, Thread

import requests
from requests.packages.urllib3.util.retry import Retry
from six.moves.queue import Queue

from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)

# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)


@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    """
    start = time.time()

    def elapsed():
        return time.time() - start

    yield elapsed
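

# A minimal usage sketch for `timed()` (illustrative only; `do_step` is a
# hypothetical helper): the yielded callable can be invoked repeatedly while
# the block is active, always measuring from the same start time.
#
#     with timed() as elapsed:
#         do_step()
#         log.info("step took {:0.3f}s".format(elapsed()))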


def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`.
    """
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    b.flush()
    b.seek(0)
    return b
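

# Round-trip sketch for `gzip_compress` (uses only the standard-library
# `gzip` module; Python 3's `gzip.decompress` is assumed for verification).
# The returned `BytesIO` is rewound, so `getvalue()` yields the whole
# compressed stream:
#
#     buf = gzip_compress(b"hello")
#     assert gzip.decompress(buf.getvalue()) == b"hello"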


def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
    """
    try:
        # Imported here so the module can be loaded without boto3 installed.
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            log.info(
                'Uploading "{}" ({} bytes)'.format(
                    pathname, len(compressed.getvalue())
                )
            )
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        event.set()
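

# Per the docstring above, object keys are `name` prefixed with the SHA-512
# hex digest of `contents`. A sketch of the equivalent computation, assuming
# a "/" separator (the real logic lives in
# mozbuild.generated_sources.get_filename_with_digest):
#
#     import hashlib
#     key = "{}/{}".format(hashlib.sha512(contents).hexdigest(), name)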


def do_work(artifact, region, bucket):
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors with a small backoff.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)
103 if "TASK_ID" in os
.environ
:
104 level
= os
.environ
.get("MOZ_SCM_LEVEL", "1")
105 secrets_url
= "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format( # noqa
109 'Using AWS credentials from the secrets service: "{}"'.format(secrets_url
)
111 res
= session
.get(secrets_url
)
112 res
.raise_for_status()
115 aws_access_key_id
=secret
["secret"]["AWS_ACCESS_KEY_ID"],
116 aws_secret_access_key
=secret
["secret"]["AWS_SECRET_ACCESS_KEY"],
119 log
.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
        t.start()

    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                # A worker failed; stop queueing new uploads.
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            break


def main(argv):
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()
    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info("Finished in {:.03f}s".format(elapsed()))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
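
# Example invocation (the artifact URL is hypothetical; in automation it
# comes from a build task's generated-sources artifact):
#
#     python upload_generated_sources.py \
#         https://example.com/path/to/target.generated-files.tar.gz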