2 # This Source Code Form is subject to the terms of the Mozilla Public
3 # License, v. 2.0. If a copy of the MPL was not distributed with this
4 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 from __future__
import absolute_import
, print_function
, unicode_literals
9 from contextlib
import contextmanager
13 from mozbuild
.base
import MozbuildObject
14 from mozbuild
.generated_sources
import (
15 get_filename_with_digest
,
16 get_s3_region_and_bucket
,
19 from Queue
import Queue
23 from threading
import Event
, Thread
26 # Arbitrary, should probably measure this.
27 NUM_WORKER_THREADS
= 10
28 log
= logging
.getLogger('upload-generated-sources')
29 log
.setLevel(logging
.INFO
)
35 Yield a function that provides the elapsed time in seconds since this
41 return time
.time() - start
45 def gzip_compress(data
):
47 Apply gzip compression to `data` and return the result as a `BytesIO`.
50 with gzip
.GzipFile(fileobj
=b
, mode
='w') as f
:
57 def upload_worker(queue
, event
, bucket
, session_args
):
59 Get `(name, contents)` entries from `queue` and upload `contents`
60 to S3 with gzip compression using `name` as the key, prefixed with
61 the SHA-512 digest of `contents` as a hex string. If an exception occurs,
66 session
= boto3
.session
.Session(**session_args
)
67 s3
= session
.client('s3')
70 # Some other thread hit an exception.
72 (name
, contents
) = queue
.get()
73 pathname
= get_filename_with_digest(name
, contents
)
74 compressed
= gzip_compress(contents
)
76 'ContentEncoding': 'gzip',
77 'ContentType': 'text/plain',
79 log
.info('Uploading "{}" ({} bytes)'.format(
80 pathname
, len(compressed
.getvalue())))
81 with
timed() as elapsed
:
82 s3
.upload_fileobj(compressed
, bucket
,
83 pathname
, ExtraArgs
=extra_args
)
84 log
.info('Finished uploading "{}" in {:0.3f}s'.format(
88 log
.exception('Thread encountered exception:')
92 def do_work(artifact
, region
, bucket
):
93 session_args
= {'region_name': region
}
94 session
= requests
.Session()
95 if 'TASK_ID' in os
.environ
:
96 level
= os
.environ
.get('MOZ_SCM_LEVEL', '1')
97 secrets_url
= 'http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload'.format( # noqa
100 'Using AWS credentials from the secrets service: "{}"'.format(secrets_url
))
101 res
= session
.get(secrets_url
)
102 res
.raise_for_status()
105 aws_access_key_id
=secret
['secret']['AWS_ACCESS_KEY_ID'],
106 aws_secret_access_key
=secret
['secret']['AWS_SECRET_ACCESS_KEY'],
109 log
.info('Trying to use your AWS credentials..')
111 # First, fetch the artifact containing the sources.
112 log
.info('Fetching generated sources artifact: "{}"'.format(artifact
))
113 with
timed() as elapsed
:
114 res
= session
.get(artifact
)
115 log
.info('Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s'.format(
116 res
.status_code
, len(res
.content
), elapsed()))
117 res
.raise_for_status()
118 # Create a queue and worker threads for uploading.
121 log
.info('Creating {} worker threads'.format(NUM_WORKER_THREADS
))
122 for i
in range(NUM_WORKER_THREADS
):
123 t
= Thread(target
=upload_worker
, args
=(q
, event
, bucket
, session_args
))
126 with tarfile
.open(fileobj
=io
.BytesIO(res
.content
), mode
='r|gz') as tar
:
127 # Next, process each file.
131 log
.info('Queueing "{}"'.format(entry
.name
))
132 q
.put((entry
.name
, tar
.extractfile(entry
).read()))
133 # Wait until all uploads are finished.
134 # We don't use q.join() here because we want to also monitor event.
135 while q
.unfinished_tasks
:
137 log
.error('Worker thread encountered exception, exiting...')
142 logging
.basicConfig(format
='%(levelname)s - %(threadName)s - %(message)s')
143 parser
= argparse
.ArgumentParser(
144 description
='Upload generated source files in ARTIFACT to BUCKET in S3.')
145 parser
.add_argument('artifact',
146 help='generated-sources artifact from build task')
147 args
= parser
.parse_args(argv
)
148 region
, bucket
= get_s3_region_and_bucket()
150 config
= MozbuildObject
.from_environment()
151 config
._activate
_virtualenv
()
152 config
.virtualenv_manager
.install_pip_package('boto3==1.4.4')
154 with
timed() as elapsed
:
155 do_work(region
=region
, bucket
=bucket
, artifact
=args
.artifact
)
156 log
.info('Finished in {:.03f}s'.format(elapsed()))
160 if __name__
== '__main__':
161 sys
.exit(main(sys
.argv
[1:]))