# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import argparse
from contextlib import contextmanager
import gzip
import io
import logging
from mozbuild.base import MozbuildObject
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
import os
from Queue import Queue
import requests
import sys
import tarfile
from requests.packages.urllib3.util.retry import Retry
from threading import Event, Thread
import time

# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger('upload-generated-sources')
log.setLevel(logging.INFO)


@contextmanager
def timed():
    '''
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    '''
    start = time.time()

    def elapsed():
        return time.time() - start
    yield elapsed
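
# Illustrative usage of `timed` (comment-only sketch; `do_something` is a
# placeholder, not a function defined in this module):
#
#     with timed() as elapsed:
#         do_something()
#         log.info('took {:0.3f}s'.format(elapsed()))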


def gzip_compress(data):
    '''
    Apply gzip compression to `data` and return the result as a `BytesIO`.
    '''
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode='w') as f:
        f.write(data)
    b.flush()
    b.seek(0)
    return b
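
# Note: the returned `BytesIO` is flushed and rewound to offset 0, so it can
# be passed directly to `s3.upload_fileobj` (as `upload_worker` does below).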


def upload_worker(queue, event, bucket, session_args):
    '''
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
    '''
    try:
        import boto3
        session = boto3.session.Session(**session_args)
        s3 = session.client('s3')
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                'ContentEncoding': 'gzip',
                'ContentType': 'text/plain',
            }
            log.info('Uploading "{}" ({} bytes)'.format(
                pathname, len(compressed.getvalue())))
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket,
                                  pathname, ExtraArgs=extra_args)
                log.info('Finished uploading "{}" in {:0.3f}s'.format(
                    pathname, elapsed()))
            queue.task_done()
    except Exception:
        log.exception('Thread encountered exception:')
        event.set()


def do_work(artifact, region, bucket):
    session_args = {'region_name': region}
    session = requests.Session()
    # Retry transient server errors with a short backoff.
    retry = Retry(total=5, backoff_factor=0.1,
                  status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)
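
    # Choose AWS credentials: in automation, TASK_ID is set by TaskCluster and
    # scoped credentials are fetched from its secrets service; otherwise boto3
    # falls back to whatever credentials it finds in the local environment.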
    if 'TASK_ID' in os.environ:
        level = os.environ.get('MOZ_SCM_LEVEL', '1')
        secrets_url = 'http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload'.format(  # noqa
            level)
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url))
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret['secret']['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=secret['secret']['AWS_SECRET_ACCESS_KEY'],
        )
    else:
        log.info('Trying to use your AWS credentials..')

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info('Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s'.format(
            res.status_code, len(res.content), elapsed()))
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info('Creating {} worker threads'.format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
        t.start()
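
    # Mode 'r|gz' treats the downloaded artifact as a non-seekable gzip
    # stream, so tar entries are read strictly in archive order.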
    with tarfile.open(fileobj=io.BytesIO(res.content), mode='r|gz') as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error('Worker thread encountered exception, exiting...')
            break


def main(argv):
    logging.basicConfig(format='%(levelname)s - %(threadName)s - %(message)s')
    parser = argparse.ArgumentParser(
        description='Upload generated source files in ARTIFACT to BUCKET in S3.')
    parser.add_argument('artifact',
                        help='generated-sources artifact from build task')
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    config = MozbuildObject.from_environment()
    config._activate_virtualenv()
    config.virtualenv_manager.install_pip_package('boto3==1.4.4')
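
    # boto3 is installed into the build virtualenv at runtime (just above),
    # which is why `upload_worker` defers `import boto3` until it runs.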
    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info('Finished in {:.03f}s'.format(elapsed()))
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))