# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals

import argparse
import gzip
import io
import logging
import os
import sys
import tarfile
import time
from contextlib import contextmanager
from threading import Event, Thread

import requests
from requests.packages.urllib3.util.retry import Retry
from six.moves.queue import Queue

from mozbuild.base import MozbuildObject
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
# Number of concurrent upload threads. Arbitrary; should probably measure this.
NUM_WORKER_THREADS = 10

# Module-level logger for this tool; level is forced to INFO so progress
# messages are visible even without caller-side logging configuration.
log = logging.getLogger("upload-generated-sources")
log.setLevel(logging.INFO)
@contextmanager
def timed():
    """
    Yield a function that provides the elapsed time in seconds since this
    context manager was entered.
    """
    start = time.time()

    def elapsed():
        # Elapsed wall-clock time since entering the context.
        return time.time() - start

    yield elapsed
def gzip_compress(data):
    """
    Apply gzip compression to `data` and return the result as a `BytesIO`.

    The returned buffer is positioned at the start so it can be handed
    directly to an upload API that reads file-like objects.
    """
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode="w") as f:
        f.write(data)
    # Rewind so consumers read the compressed bytes from the beginning.
    b.seek(0)
    return b
def upload_worker(queue, event, bucket, session_args):
    """
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event` so other threads (and the producer) know to stop.

    :param queue: Queue of `(name, contents)` tuples to upload.
    :param event: Event set by any worker that hits an exception.
    :param bucket: Name of the destination S3 bucket.
    :param session_args: Keyword arguments for `boto3.session.Session`.
    """
    try:
        # boto3 is pip-installed into the build virtualenv at runtime,
        # so import it lazily here rather than at module level.
        import boto3

        session = boto3.session.Session(**session_args)
        s3 = session.client("s3")
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                "ContentEncoding": "gzip",
                "ContentType": "text/plain",
            }
            log.info(
                'Uploading "{}" ({} bytes)'.format(pathname, len(compressed.getvalue()))
            )
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket, pathname, ExtraArgs=extra_args)
                log.info(
                    'Finished uploading "{}" in {:0.3f}s'.format(pathname, elapsed())
                )
            queue.task_done()
    except Exception:
        log.exception("Thread encountered exception:")
        # Signal every other thread (and the main loop) to bail out.
        event.set()
def do_work(artifact, region, bucket):
    """
    Fetch the generated-sources `artifact` over HTTP and upload every file
    it contains to the S3 `bucket` in `region` using a pool of worker threads.

    Exits the process with status 1 if any worker thread hits an exception.
    """
    session_args = {"region_name": region}
    session = requests.Session()
    # Retry transient server errors when talking to taskcluster / S3 endpoints.
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount("https://", http_adapter)
    session.mount("http://", http_adapter)

    if "TASK_ID" in os.environ:
        # Running in automation: fetch AWS credentials from the taskcluster
        # secrets service, scoped by SCM level.
        level = os.environ.get("MOZ_SCM_LEVEL", "1")
        secrets_url = "http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload".format(  # noqa
            level
        )
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url)
        )
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret["secret"]["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=secret["secret"]["AWS_SECRET_ACCESS_KEY"],
        )
    else:
        # Local run: fall back to ambient AWS credentials (env/config).
        log.info("Trying to use your AWS credentials..")

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info(
            "Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s".format(
                res.status_code, len(res.content), elapsed()
            )
        )
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info("Creating {} worker threads".format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        # Daemon threads so a sys.exit() below doesn't hang on join.
        t.daemon = True
        t.start()
    with tarfile.open(fileobj=io.BytesIO(res.content), mode="r|gz") as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                # A worker already failed; stop queueing more work.
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error("Worker thread encountered exception, exiting...")
            sys.exit(1)
def main(argv):
    """
    Command-line entry point: upload the generated-sources artifact named
    in `argv` to the configured S3 bucket.

    :param argv: list of command-line arguments (excluding the program name).
    """
    logging.basicConfig(format="%(levelname)s - %(threadName)s - %(message)s")
    parser = argparse.ArgumentParser(
        description="Upload generated source files in ARTIFACT to BUCKET in S3."
    )
    parser.add_argument("artifact", help="generated-sources artifact from build task")
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    config = MozbuildObject.from_environment()
    config.activate_virtualenv()
    # boto3 is not a mach dependency; install it into the build virtualenv
    # at runtime so upload_worker's lazy import succeeds.
    config.virtualenv_manager.install_pip_package("boto3==1.4.4")

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info("Finished in {:.03f}s".format(elapsed()))
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main(sys.argv[1:]))