# tooltool is a lookaside cache implemented in Python
# Copyright (C) 2011 John H. Ford <john@johnford.info>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation version 2
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

# A manifest file specifies files in that directory that are stored
# elsewhere. This file should only list files in the same directory
# in which the manifest file resides and it should be called
# 'manifest.tt'

from __future__ import print_function
from __future__ import absolute_import

import base64
import calendar
import hashlib
import hmac
import json
import logging
import math
import optparse
import os
import pprint
import random
import re
import shutil
import sys
import tarfile
import tempfile
import threading
import time
import zipfile

from contextlib import contextmanager, closing
from functools import wraps
from io import BytesIO
from subprocess import PIPE
from subprocess import Popen

# Allowed request header characters:
# !#$%&'()*+,-./:;<=>?@[]^_`{|}~ and space, a-z, A-Z, 0-9, \, "
REQUEST_HEADER_ATTRIBUTE_CHARS = re.compile(
    r"^[ a-zA-Z0-9_\!#\$%&'\(\)\*\+,\-\./\:;<\=>\?@\[\]\^`\{\|\}~]*$")
DEFAULT_MANIFEST_NAME = 'manifest.tt'
TOOLTOOL_PACKAGE_SUFFIX = '.TOOLTOOL-PACKAGE'
HAWK_VER = 1

PY3 = sys.version_info[0] == 3

if PY3:
    six_binary_type = bytes
    unicode = str  # Silence `pyflakes` from reporting `undefined name 'unicode'` in Python 3.
    import urllib.request as urllib2
    from http.client import HTTPSConnection, HTTPConnection
    from urllib.parse import urlparse, urljoin
    from urllib.request import Request
    from urllib.error import HTTPError, URLError
else:
    six_binary_type = str
    import urllib2
    from httplib import HTTPSConnection, HTTPConnection
    from urllib2 import Request, HTTPError, URLError
    from urlparse import urlparse, urljoin

log = logging.getLogger(__name__)


# Vendored code from `redo` module
def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1):
    """
    This function originates from redo 2.0.3 https://github.com/mozilla-releng/redo
    A generator function that sleeps between retries, handles exponential
    backoff and jitter. The action you are retrying is meant to run after
    retrier yields.
    """
    jitter = jitter or 0  # py35 barfs on the next line if jitter is None
    if jitter > sleeptime:
        # To prevent negative sleep times
        raise Exception("jitter ({}) must be less than sleep time ({})".format(jitter, sleeptime))

    sleeptime_real = sleeptime
    for _ in range(attempts):
        log.debug("attempt %i/%i", _ + 1, attempts)

        yield sleeptime_real

        if jitter:
            sleeptime_real = sleeptime + random.uniform(-jitter, jitter)
            # our jitter should scale along with the sleeptime
            jitter = jitter * sleepscale
        else:
            sleeptime_real = sleeptime

        sleeptime *= sleepscale

        if sleeptime_real > max_sleeptime:
            sleeptime_real = max_sleeptime

        # Don't need to sleep the last time
        if _ < attempts - 1:
            log.debug("sleeping for %.2fs (attempt %i/%i)", sleeptime_real, _ + 1, attempts)
            time.sleep(sleeptime_real)


def retry(
    action,
    attempts=5,
    sleeptime=60,
    max_sleeptime=5 * 60,
    sleepscale=1.5,
    jitter=1,
    retry_exceptions=(Exception,),
    cleanup=None,
    args=(),
    kwargs={},
    log_args=True,
):
    """
    This function originates from redo 2.0.3 https://github.com/mozilla-releng/redo
    Calls an action function until it succeeds, or we give up.
    """
    assert callable(action)
    assert not cleanup or callable(cleanup)

    action_name = getattr(action, "__name__", action)
    if log_args and (args or kwargs):
        log_attempt_args = ("retry: calling %s with args: %s," " kwargs: %s, attempt #%d",
                            action_name, args, kwargs)
    else:
        log_attempt_args = ("retry: calling %s, attempt #%d", action_name)

    if max_sleeptime < sleeptime:
        log.debug("max_sleeptime %d less than sleeptime %d", max_sleeptime, sleeptime)

    n = 1
    for _ in retrier(
        attempts=attempts,
        sleeptime=sleeptime,
        max_sleeptime=max_sleeptime,
        sleepscale=sleepscale,
        jitter=jitter,
    ):
        try:
            logfn = log.info if n != 1 else log.debug
            logfn_args = log_attempt_args + (n,)
            logfn(*logfn_args)
            return action(*args, **kwargs)
        except retry_exceptions:
            log.debug("retry: Caught exception: ", exc_info=True)
            if cleanup:
                cleanup()
            if n == attempts:
                log.info("retry: Giving up on %s", action_name)
                raise
            continue
        finally:
            n += 1


def retriable(*retry_args, **retry_kwargs):
    """
    This function originates from redo 2.0.3 https://github.com/mozilla-releng/redo
    A decorator factory for retry(). Wrap your function in @retriable(...) to
    give it retry powers!
    """

    def _retriable_factory(func):
        @wraps(func)
        def _retriable_wrapper(*args, **kwargs):
            return retry(func, args=args, kwargs=kwargs, *retry_args, **retry_kwargs)

        return _retriable_wrapper

    return _retriable_factory


# end of vendored code from redo module
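

# Illustrative sketch (not part of tooltool): wrapping a function with the
# vendored decorator gives it retry-with-backoff behaviour; `download_once`
# is a hypothetical callable.
#
#   @retriable(attempts=3, sleeptime=2, retry_exceptions=(IOError,))
#   def download_once(url):
#       ...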


def request_has_data(req):
    if PY3:
        return req.data is not None
    return req.has_data()


def get_hexdigest(val):
    return hashlib.sha512(val).hexdigest()


class FileRecordJSONEncoderException(Exception):
    pass


class InvalidManifest(Exception):
    pass


class ExceptionWithFilename(Exception):

    def __init__(self, filename):
        Exception.__init__(self)
        self.filename = filename


class BadFilenameException(ExceptionWithFilename):
    pass


class DigestMismatchException(ExceptionWithFilename):
    pass


class MissingFileException(ExceptionWithFilename):
    pass


class InvalidCredentials(Exception):
    pass


class BadHeaderValue(Exception):
    pass


def parse_url(url):
    url_parts = urlparse(url)
    url_dict = {
        'scheme': url_parts.scheme,
        'hostname': url_parts.hostname,
        'port': url_parts.port,
        'path': url_parts.path,
        'resource': url_parts.path,
        'query': url_parts.query,
    }
    if len(url_dict['query']) > 0:
        url_dict['resource'] = '%s?%s' % (url_dict['resource'],  # pragma: no cover
                                          url_dict['query'])

    if url_parts.port is None:
        if url_parts.scheme == 'http':
            url_dict['port'] = 80
        elif url_parts.scheme == 'https':  # pragma: no cover
            url_dict['port'] = 443
    return url_dict


def utc_now(offset_in_seconds=0.0):
    return int(math.floor(calendar.timegm(time.gmtime()) + float(offset_in_seconds)))


def random_string(length):
    return base64.urlsafe_b64encode(os.urandom(length))[:length]


def prepare_header_val(val):
    if isinstance(val, six_binary_type):
        val = val.decode('utf-8')

    if not REQUEST_HEADER_ATTRIBUTE_CHARS.match(val):
        raise BadHeaderValue(  # pragma: no cover
            'header value value={val} contained an illegal character'.format(val=repr(val)))

    return val


def parse_content_type(content_type):  # pragma: no cover
    if content_type:
        return content_type.split(';')[0].strip().lower()
    else:
        return ''


def calculate_payload_hash(algorithm, payload, content_type):  # pragma: no cover
    parts = [
        part if isinstance(part, six_binary_type) else part.encode('utf8')
        for part in ['hawk.' + str(HAWK_VER) + '.payload\n',
                     parse_content_type(content_type) + '\n',
                     payload or '',
                     '\n',
                     ]
    ]

    p_hash = hashlib.new(algorithm)
    for p in parts:
        p_hash.update(p)

    log.debug('calculating payload hash from:\n{parts}'.format(parts=pprint.pformat(parts)))

    return base64.b64encode(p_hash.digest())


def validate_taskcluster_credentials(credentials):
    if not hasattr(credentials, '__getitem__'):
        raise InvalidCredentials('credentials must be a dict-like object')  # pragma: no cover
    try:
        credentials['clientId']
        credentials['accessToken']
    except KeyError:  # pragma: no cover
        etype, val, tb = sys.exc_info()
        raise InvalidCredentials('{etype}: {val}'.format(etype=etype, val=val))


def normalize_header_attr(val):
    if isinstance(val, six_binary_type):
        return val.decode('utf-8')
    return val  # pragma: no cover


def normalize_string(mac_type,
                     timestamp,
                     nonce,
                     method,
                     name,
                     host,
                     port,
                     content_hash,
                     ):
    return '\n'.join([
        normalize_header_attr(header)
        # The blank lines are important. They follow what the Node Hawk lib does.
        for header in ['hawk.' + str(HAWK_VER) + '.' + mac_type,
                       timestamp,
                       nonce,
                       method or '',
                       name or '',
                       host,
                       port,
                       content_hash or '',
                       '',  # for ext which is empty in this case
                       '',  # Add trailing new line.
                       ]
    ])


def calculate_mac(mac_type,
                  access_token,
                  algorithm,
                  timestamp,
                  nonce,
                  method,
                  name,
                  host,
                  port,
                  content_hash,
                  ):
    normalized = normalize_string(mac_type,
                                  timestamp,
                                  nonce,
                                  method,
                                  name,
                                  host,
                                  port,
                                  content_hash)
    log.debug(u'normalized resource for mac calc: {norm}'.format(norm=normalized))
    digestmod = getattr(hashlib, algorithm)

    if not isinstance(normalized, six_binary_type):
        normalized = normalized.encode('utf8')

    if not isinstance(access_token, six_binary_type):
        access_token = access_token.encode('ascii')

    result = hmac.new(access_token, normalized, digestmod)
    return base64.b64encode(result.digest())


def make_taskcluster_header(credentials, req):
    validate_taskcluster_credentials(credentials)

    url = req.get_full_url()
    method = req.get_method()

    timestamp = str(utc_now())
    nonce = random_string(6)
    url_parts = parse_url(url)

    content_hash = None
    if request_has_data(req):
        if PY3:
            data = req.data
        else:
            data = req.get_data()
        content_hash = calculate_payload_hash(  # pragma: no cover
            'sha256',
            data,
            # maybe we should detect this from req.headers but we anyway expect json
            content_type='application/json',
        )

    mac = calculate_mac('header',
                        credentials['accessToken'],
                        'sha256',
                        timestamp,
                        nonce,
                        method,
                        url_parts['resource'],
                        url_parts['hostname'],
                        str(url_parts['port']),
                        content_hash,
                        )

    header = u'Hawk mac="{}"'.format(prepare_header_val(mac))

    if content_hash:  # pragma: no cover
        header = u'{}, hash="{}"'.format(header, prepare_header_val(content_hash))

    header = u'{header}, id="{id}", ts="{ts}", nonce="{nonce}"'.format(
        header=header,
        id=prepare_header_val(credentials['clientId']),
        ts=prepare_header_val(timestamp),
        nonce=prepare_header_val(nonce),
    )

    log.debug('Hawk header for URL={} method={}: {}'.format(url, method, header))

    return header
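

# For reference, the assembled Authorization header has this shape (the
# values below are made up):
#
#   Hawk mac="4Zz...=", id="my-client-id", ts="1568892411", nonce="AbC123"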


class FileRecord(object):

    def __init__(self, filename, size, digest, algorithm, unpack=False,
                 version=None, visibility=None):
        object.__init__(self)
        if '/' in filename or '\\' in filename:
            log.error(
                "The filename provided contains path information and is, therefore, invalid.")
            raise BadFilenameException(filename=filename)
        self.filename = filename
        self.size = size
        self.digest = digest
        self.algorithm = algorithm
        self.unpack = unpack
        self.version = version
        self.visibility = visibility
, other
):
454 if self
.filename
== other
.filename
and \
455 self
.size
== other
.size
and \
456 self
.digest
== other
.digest
and \
457 self
.algorithm
== other
.algorithm
and \
458 self
.version
== other
.version
and \
459 self
.visibility
== other
.visibility
:
464 def __ne__(self
, other
):
465 return not self
.__eq
__(other
)

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return "%s.%s(filename='%s', size=%s, digest='%s', algorithm='%s', visibility=%r)" % (
            __name__, self.__class__.__name__, self.filename, self.size,
            self.digest, self.algorithm, self.visibility)

    def present(self):
        # Doesn't check validity
        return os.path.exists(self.filename)

    def validate_size(self):
        if self.present():
            return self.size == os.path.getsize(self.filename)
        else:
            log.debug(
                "trying to validate size on a missing file, %s", self.filename)
            raise MissingFileException(filename=self.filename)

    def validate_digest(self):
        if self.present():
            with open(self.filename, 'rb') as f:
                return self.digest == digest_file(f, self.algorithm)
        else:
            log.debug(
                "trying to validate digest on a missing file, %s", self.filename)
            raise MissingFileException(filename=self.filename)

    def validate(self):
        if self.size is None or self.validate_size():
            if self.validate_digest():
                return True
        return False

    def describe(self):
        if self.present() and self.validate():
            return "'%s' is present and valid" % self.filename
        elif self.present():
            return "'%s' is present and invalid" % self.filename
        else:
            return "'%s' is absent" % self.filename
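

# Illustrative sketch (not part of tooltool): a FileRecord pairs a bare
# filename with the expected size and digest; the digest below is truncated
# and made up.
#
#   fr = FileRecord('clang.tar.xz', 1048576, 'e3b0c442...', 'sha512')
#   fr.describe()  # e.g. "'clang.tar.xz' is absent"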


def create_file_record(filename, algorithm):
    fo = open(filename, 'rb')
    stored_filename = os.path.split(filename)[1]
    fr = FileRecord(stored_filename, os.path.getsize(
        filename), digest_file(fo, algorithm), algorithm)
    fo.close()
    return fr


class FileRecordJSONEncoder(json.JSONEncoder):

    def encode_file_record(self, obj):
        if not issubclass(type(obj), FileRecord):
            err = "FileRecordJSONEncoder is only for FileRecord and lists of FileRecords, " \
                  "not %s" % obj.__class__.__name__
            log.warn(err)
            raise FileRecordJSONEncoderException(err)
        else:
            rv = {
                'filename': obj.filename,
                'size': obj.size,
                'algorithm': obj.algorithm,
                'digest': obj.digest,
            }
            if obj.unpack:
                rv['unpack'] = True
            if obj.version:
                rv['version'] = obj.version
            if obj.visibility is not None:
                rv['visibility'] = obj.visibility
            return rv

    def default(self, f):
        if issubclass(type(f), list):
            record_list = []
            for i in f:
                record_list.append(self.encode_file_record(i))
            return record_list
        else:
            return self.encode_file_record(f)


class FileRecordJSONDecoder(json.JSONDecoder):

    """I help the json module materialize a FileRecord from
    a JSON file. I understand FileRecords and lists of
    FileRecords. I ignore things that I don't expect for now"""
    # TODO: make this more explicit in what it's looking for
    # and error out on unexpected things

    def process_file_records(self, obj):
        if isinstance(obj, list):
            record_list = []
            for i in obj:
                record = self.process_file_records(i)
                if issubclass(type(record), FileRecord):
                    record_list.append(record)
            return record_list
        required_fields = [
            'filename',
            'size',
            'algorithm',
            'digest',
        ]
        if isinstance(obj, dict):
            missing = False
            for req in required_fields:
                if req not in obj:
                    missing = True
                    break

            if not missing:
                unpack = obj.get('unpack', False)
                version = obj.get('version', None)
                visibility = obj.get('visibility', None)
                rv = FileRecord(
                    obj['filename'], obj['size'], obj['digest'], obj['algorithm'],
                    unpack, version, visibility)
                log.debug("materialized %s" % rv)
                return rv
        return obj

    def decode(self, s):
        decoded = json.JSONDecoder.decode(self, s)
        rv = self.process_file_records(decoded)
        return rv
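

# Illustrative sketch (not part of tooltool): the encoder/decoder pair
# round-trips records through plain JSON; `fr` is a FileRecord instance.
#
#   text = json.dumps([fr], cls=FileRecordJSONEncoder)
#   records = json.loads(text, cls=FileRecordJSONDecoder)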


class Manifest(object):

    valid_formats = ('json',)

    def __init__(self, file_records=None):
        self.file_records = file_records or []

    def __eq__(self, other):
        if self is other:
            return True
        if len(self.file_records) != len(other.file_records):
            log.debug('Manifests differ in number of files')
            return False
        # sort the file records by filename before comparing
        mine = sorted((fr.filename, fr) for fr in self.file_records)
        theirs = sorted((fr.filename, fr) for fr in other.file_records)
        return mine == theirs

    def __ne__(self, other):
        return not self.__eq__(other)

    def __deepcopy__(self, memo):
        # This is required for a deep copy
        return Manifest(self.file_records[:])

    def __copy__(self):
        return Manifest(self.file_records)

    def copy(self):
        return Manifest(self.file_records[:])

    def present(self):
        return all(i.present() for i in self.file_records)

    def validate_sizes(self):
        return all(i.validate_size() for i in self.file_records)

    def validate_digests(self):
        return all(i.validate_digest() for i in self.file_records)

    def validate(self):
        return all(i.validate() for i in self.file_records)

    def load(self, data_file, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            try:
                self.file_records.extend(
                    json.load(data_file, cls=FileRecordJSONDecoder))
            except ValueError:
                raise InvalidManifest("trying to read invalid manifest file")

    def loads(self, data_string, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            try:
                self.file_records.extend(
                    json.loads(data_string, cls=FileRecordJSONDecoder))
            except ValueError:
                raise InvalidManifest("trying to read invalid manifest file")

    def dump(self, output_file, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            return json.dump(
                self.file_records, output_file,
                indent=2, separators=(',', ': '),
                cls=FileRecordJSONEncoder,
            )

    def dumps(self, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            return json.dumps(
                self.file_records,
                indent=2, separators=(',', ': '),
                cls=FileRecordJSONEncoder,
            )
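

# For reference, a dumped manifest is a JSON list of record objects along
# these lines (digest truncated, values made up):
#
#   [
#     {
#       "filename": "clang.tar.xz",
#       "size": 1048576,
#       "algorithm": "sha512",
#       "digest": "e3b0c442...",
#       "unpack": true
#     }
#   ]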


def digest_file(f, a):
    """I take a file like object 'f' and return a hex-string containing
    the result of the algorithm 'a' applied to 'f'."""
    h = hashlib.new(a)
    chunk_size = 1024 * 10
    data = f.read(chunk_size)
    while data:
        h.update(data)
        data = f.read(chunk_size)
    name = repr(f.name) if hasattr(f, 'name') else 'a file'
    log.debug('hashed %s with %s to be %s', name, a, h.hexdigest())
    return h.hexdigest()
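

# Illustrative sketch (not part of tooltool): hashing a local file in the
# 10 KiB chunks used above; the filename is hypothetical.
#
#   with open('clang.tar.xz', 'rb') as f:
#       print(digest_file(f, 'sha512'))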


def execute(cmd):
    """Execute CMD, logging its stdout at the info level"""
    process = Popen(cmd, shell=True, stdout=PIPE)
    while True:
        line = process.stdout.readline()
        if not line:
            break
        log.info(line.replace('\n', ' '))
    return process.wait() == 0


def open_manifest(manifest_file):
    """I know how to take a filename and load it into a Manifest object"""
    if os.path.exists(manifest_file):
        manifest = Manifest()
        with open(manifest_file, "rb") as f:
            manifest.load(f)
            log.debug("loaded manifest from file '%s'" % manifest_file)
        return manifest
    else:
        log.debug("tried to load absent file '%s' as manifest" % manifest_file)
        raise InvalidManifest(
            "manifest file '%s' does not exist" % manifest_file)


def list_manifest(manifest_file):
    """I know how to print all the files in a location"""
    try:
        manifest = open_manifest(manifest_file)
    except InvalidManifest as e:
        log.error("failed to load manifest file at '%s': %s" % (
            manifest_file,
            str(e),
        ))
        return False
    for f in manifest.file_records:
        print("{}\t{}\t{}".format("P" if f.present() else "-",
                                  "V" if f.present() and f.validate() else "-",
                                  f.filename))
    return True


def validate_manifest(manifest_file):
    """I validate that all files in a manifest are present and valid but
    don't fetch or delete them if they aren't"""
    try:
        manifest = open_manifest(manifest_file)
    except InvalidManifest as e:
        log.error("failed to load manifest file at '%s': %s" % (
            manifest_file,
            str(e),
        ))
        return False
    invalid_files = []
    absent_files = []
    for f in manifest.file_records:
        if not f.present():
            absent_files.append(f)
        elif not f.validate():
            invalid_files.append(f)
    if len(invalid_files + absent_files) == 0:
        return True
    else:
        return False


def add_files(manifest_file, algorithm, filenames, version, visibility, unpack):
    # returns True if all files successfully added, False if not
    # and doesn't catch library Exceptions. If any files are already
    # tracked in the manifest, return will be False because they weren't
    # added
    all_files_added = True
    # Create an old_manifest object to add to
    if os.path.exists(manifest_file):
        old_manifest = open_manifest(manifest_file)
    else:
        old_manifest = Manifest()
        log.debug("creating a new manifest file")
    new_manifest = Manifest()  # use a different manifest for the output
    for filename in filenames:
        log.debug("adding %s" % filename)
        path, name = os.path.split(filename)
        new_fr = create_file_record(filename, algorithm)
        new_fr.version = version
        new_fr.visibility = visibility
        new_fr.unpack = unpack
        log.debug("appending a new file record to manifest file")
        add = True
        for fr in old_manifest.file_records:
            log.debug("manifest file has '%s'" % "', ".join(
                [x.filename for x in old_manifest.file_records]))
            if new_fr == fr:
                log.info("file already in old_manifest")
                add = False
            elif filename == fr.filename:
                log.error("manifest already contains a different file named %s" % filename)
                add = False
        if add:
            new_manifest.file_records.append(new_fr)
            log.debug("added '%s' to manifest" % filename)
        else:
            all_files_added = False
    # copy any files in the old manifest that aren't in the new one
    new_filenames = set(fr.filename for fr in new_manifest.file_records)
    for old_fr in old_manifest.file_records:
        if old_fr.filename not in new_filenames:
            new_manifest.file_records.append(old_fr)
    if PY3:
        with open(manifest_file, mode="w") as output:
            new_manifest.dump(output, fmt='json')
    else:
        with open(manifest_file, mode="wb") as output:
            new_manifest.dump(output, fmt='json')
    return all_files_added


def touch(f):
    """Used to modify mtime in cached files;
    mtime is used by the purge command"""
    try:
        os.utime(f, None)
    except OSError:
        log.warn('impossible to update utime of file %s' % f)


@contextmanager
@retriable(sleeptime=2)
def request(url, auth_file=None):
    req = Request(url)
    _authorize(req, auth_file)
    with closing(urllib2.urlopen(req)) as f:
        log.debug("opened %s for reading" % url)
        yield f
, file_record
, grabchunk
=1024 * 4, auth_file
=None, region
=None):
830 # A file which is requested to be fetched that exists locally will be
831 # overwritten by this function
832 fd
, temp_path
= tempfile
.mkstemp(dir=os
.getcwd())
835 for base_url
in base_urls
:
836 # Generate the URL for the file on the server side
837 url
= urljoin(base_url
,
838 '%s/%s' % (file_record
.algorithm
, file_record
.digest
))
839 if region
is not None:
840 url
+= '?region=' + region
842 log
.info("Attempting to fetch from '%s'..." % base_url
)
844 # Well, the file doesn't exist locally. Let's fetch it.
846 with
request(url
, auth_file
) as f
, open(temp_path
, mode
='wb') as out
:
850 # TODO: print statistics as file transfers happen both for info and to stop
852 indata
= f
.read(grabchunk
)
857 log
.info("File %s fetched from %s as %s" %
858 (file_record
.filename
, base_url
, temp_path
))
859 fetched_path
= temp_path
861 except (URLError
, HTTPError
, ValueError):
862 log
.info("...failed to fetch '%s' from %s" %
863 (file_record
.filename
, base_url
), exc_info
=True)
864 except IOError: # pragma: no cover
865 log
.info("failed to write to temporary file for '%s'" %
866 file_record
.filename
, exc_info
=True)
868 # cleanup temp file in case of issues
870 return os
.path
.split(fetched_path
)[1]
874 except OSError: # pragma: no cover


def clean_path(dirname):
    """Remove a subtree if it exists. Helper for unpack_file()."""
    if os.path.exists(dirname):
        log.info('rm tree: %s' % dirname)
        shutil.rmtree(dirname)


CHECKSUM_SUFFIX = ".checksum"


def unpack_file(filename):
    """Untar `filename`, assuming it is uncompressed or compressed with bzip2,
    xz, gzip, or unzip a zip file. The file is assumed to contain a single
    directory with a name matching the base of the given filename.
    Xz support is handled by shelling out to 'xz'."""
    if os.path.isfile(filename) and tarfile.is_tarfile(filename):
        tar_file, zip_ext = os.path.splitext(filename)
        base_file, tar_ext = os.path.splitext(tar_file)
        clean_path(base_file)
        log.info('untarring "%s"' % filename)
        tar = tarfile.open(filename)
        tar.extractall()
        tar.close()
    elif os.path.isfile(filename) and filename.endswith('.tar.xz'):
        base_file = filename.replace('.tar.xz', '')
        clean_path(base_file)
        log.info('untarring "%s"' % filename)
        # Not using tar -Jxf because it fails on Windows for some reason.
        process = Popen(['xz', '-d', '-c', filename], stdout=PIPE)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            return False
        fileobj = BytesIO()
        fileobj.write(stdout)
        fileobj.seek(0)
        tar = tarfile.open(fileobj=fileobj, mode='r|')
        tar.extractall()
        tar.close()
    elif os.path.isfile(filename) and zipfile.is_zipfile(filename):
        base_file = filename.replace('.zip', '')
        clean_path(base_file)
        log.info('unzipping "%s"' % filename)
        z = zipfile.ZipFile(filename)
        z.extractall()
        z.close()
    else:
        log.error("Unknown archive extension for filename '%s'" % filename)
        return False
    return True
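

# Illustrative sketch (not part of tooltool): unpack_file() derives the
# directory to clean from the archive name, so for a hypothetical
# 'clang.tar.xz' it removes ./clang (if present) and then extracts into the
# current directory.
#
#   unpack_file('clang.tar.xz')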


def fetch_files(manifest_file, base_urls, filenames=[], cache_folder=None,
                auth_file=None, region=None):
    # Lets load the manifest file
    try:
        manifest = open_manifest(manifest_file)
    except InvalidManifest as e:
        log.error("failed to load manifest file at '%s': %s" % (
            manifest_file,
            str(e),
        ))
        return False

    # we want to track files already in current working directory AND valid
    # we will not need to fetch these
    present_files = []

    # We want to track files that fail to be fetched as well as
    # files that are fetched
    failed_files = []
    fetched_files = []

    # Files that we want to unpack.
    unpack_files = []

    # Lets go through the manifest and fetch the files that we want
    for f in manifest.file_records:
        # case 1: files are already present
        if f.present():
            if f.validate():
                present_files.append(f.filename)
                if f.unpack:
                    unpack_files.append(f.filename)
            else:
                # we have an invalid file here, better to cleanup!
                # this invalid file needs to be replaced with a good one
                # from the local cache or fetched from a tooltool server
                log.info("File %s is present locally but it is invalid, so I will remove it "
                         "and try to fetch it" % f.filename)
                os.remove(os.path.join(os.getcwd(), f.filename))

        # check if file is already in cache
        if cache_folder and f.filename not in present_files:
            try:
                shutil.copy(os.path.join(cache_folder, f.digest),
                            os.path.join(os.getcwd(), f.filename))
                log.info("File %s retrieved from local cache %s" %
                         (f.filename, cache_folder))
                touch(os.path.join(cache_folder, f.digest))

                filerecord_for_validation = FileRecord(
                    f.filename, f.size, f.digest, f.algorithm)
                if filerecord_for_validation.validate():
                    present_files.append(f.filename)
                    if f.unpack:
                        unpack_files.append(f.filename)
                else:
                    # the file copied from the cache is invalid, better to
                    # clean up the cache version itself as well
                    log.warn("File %s retrieved from cache is invalid! I am deleting it from the "
                             "cache as well" % f.filename)
                    os.remove(os.path.join(os.getcwd(), f.filename))
                    os.remove(os.path.join(cache_folder, f.digest))
            except IOError:
                log.info("File %s not present in local cache folder %s" %
                         (f.filename, cache_folder))

        # now I will try to fetch all files which are not already present and
        # valid, appending a suffix to avoid race conditions
        temp_file_name = None
        # 'filenames' is the list of filenames to be managed, if this variable
        # is a non empty list it can be used to filter if filename is in
        # present_files, it means that I have it already because it was already
        # either in the working dir or in the cache
        if (f.filename in filenames or len(filenames) == 0) and f.filename not in present_files:
            log.debug("fetching %s" % f.filename)
            temp_file_name = fetch_file(base_urls, f, auth_file=auth_file, region=region)
            if temp_file_name:
                fetched_files.append((f, temp_file_name))
            else:
                failed_files.append(f.filename)
        else:
            log.debug("skipping %s" % f.filename)

    # lets ensure that fetched files match what the manifest specified
    for localfile, temp_file_name in fetched_files:
        # since I downloaded to a temp file, I need to perform all validations on the temp file
        # this is why filerecord_for_validation is created
        filerecord_for_validation = FileRecord(
            temp_file_name, localfile.size, localfile.digest, localfile.algorithm)

        if filerecord_for_validation.validate():
            # great!
            # I can rename the temp file
            log.info("File integrity verified, renaming %s to %s" %
                     (temp_file_name, localfile.filename))
            os.rename(os.path.join(os.getcwd(), temp_file_name),
                      os.path.join(os.getcwd(), localfile.filename))

            if localfile.unpack:
                unpack_files.append(localfile.filename)

            # if I am using a cache and a new file has just been retrieved from a
            # remote location, I need to update the cache as well
            if cache_folder:
                log.info("Updating local cache %s..." % cache_folder)
                try:
                    if not os.path.exists(cache_folder):
                        log.info("Creating cache in %s..." % cache_folder)
                        os.makedirs(cache_folder, 0o0700)
                    shutil.copy(os.path.join(os.getcwd(), localfile.filename),
                                os.path.join(cache_folder, localfile.digest))
                    log.info("Local cache %s updated with %s" % (cache_folder,
                                                                 localfile.filename))
                    touch(os.path.join(cache_folder, localfile.digest))
                except (OSError, IOError):
                    log.warning('Impossible to add file %s to cache folder %s' %
                                (localfile.filename, cache_folder), exc_info=True)
        else:
            failed_files.append(localfile.filename)
            log.error("'%s'" % filerecord_for_validation.describe())
            os.remove(temp_file_name)

    # Unpack files that need to be unpacked.
    for filename in unpack_files:
        if not unpack_file(filename):
            failed_files.append(filename)

    # If we failed to fetch or validate a file, we need to fail
    if len(failed_files) > 0:
        log.error("The following files failed: '%s'" %
                  "', ".join(failed_files))
        return False
    return True


def freespace(p):
    "Returns the number of bytes free under directory `p`"
    if sys.platform == 'win32':  # pragma: no cover
        # os.statvfs doesn't work on Windows
        import win32file

        secsPerClus, bytesPerSec, nFreeClus, totClus = win32file.GetDiskFreeSpace(
            p)
        return secsPerClus * bytesPerSec * nFreeClus
    else:
        r = os.statvfs(p)
        return r.f_frsize * r.f_bavail


def purge(folder, gigs):
    """If gigs is non 0, it deletes files in `folder` until `gigs` GB are free,
    starting from older files. If gigs is 0, a full purge will be performed.
    No recursive deletion of files in subfolder is performed."""

    full_purge = bool(gigs == 0)
    gigs *= 1024 * 1024 * 1024

    if not full_purge and freespace(folder) >= gigs:
        log.info("No need to cleanup")
        return

    files = []
    for f in os.listdir(folder):
        p = os.path.join(folder, f)
        # it deletes files in folder without going into subfolders,
        # assuming the cache has a flat structure
        if not os.path.isfile(p):
            continue
        mtime = os.path.getmtime(p)
        files.append((mtime, p))

    # iterate files sorted by mtime
    for _, f in sorted(files):
        log.info("removing %s to free up space" % f)
        try:
            os.remove(f)
        except OSError:
            log.info("Impossible to remove %s" % f, exc_info=True)
        if not full_purge and freespace(folder) >= gigs:
            break


def _log_api_error(e):
    if hasattr(e, 'hdrs') and e.hdrs['content-type'] == 'application/json':
        json_resp = json.load(e.fp)
        log.error("%s: %s" % (json_resp['error']['name'],
                              json_resp['error']['description']))
    else:
        log.exception("Error making RelengAPI request:")


def _authorize(req, auth_file):
    if not auth_file:
        return

    is_taskcluster_auth = False
    with open(auth_file) as f:
        auth_file_content = f.read().strip()
    try:
        auth_file_content = json.loads(auth_file_content)
        is_taskcluster_auth = True
    except Exception:
        pass

    if is_taskcluster_auth:
        taskcluster_header = make_taskcluster_header(auth_file_content, req)
        log.debug("Using taskcluster credentials in %s" % auth_file)
        req.add_unredirected_header('Authorization', taskcluster_header)
    else:
        log.debug("Using Bearer token in %s" % auth_file)
        req.add_unredirected_header('Authorization', 'Bearer %s' % auth_file_content)


def _send_batch(base_url, auth_file, batch, region):
    url = urljoin(base_url, 'upload')
    if region is not None:
        url += "?region=" + region
    data = json.dumps(batch)
    if PY3:
        data = data.encode("utf-8")
    req = Request(url, data, {'Content-Type': 'application/json'})
    _authorize(req, auth_file)
    try:
        resp = urllib2.urlopen(req)
    except (URLError, HTTPError) as e:
        _log_api_error(e)
        return None
    return json.load(resp)['result']


def _s3_upload(filename, file):
    # urllib2 does not support streaming, so we fall back to good old httplib
    url = urlparse(file['put_url'])
    cls = HTTPSConnection if url.scheme == 'https' else HTTPConnection
    host, port = url.netloc.split(':') if ':' in url.netloc else (url.netloc, 443)
    port = int(port)
    conn = cls(host, port)
    try:
        req_path = "%s?%s" % (url.path, url.query) if url.query else url.path
        with open(filename, 'rb') as f:
            content = f.read()
            content_length = len(content)
            f.seek(0)
            conn.request(
                'PUT',
                req_path,
                f,
                {
                    'Content-Type': 'application/octet-stream',
                    'Content-Length': str(content_length),
                },
            )
        resp = conn.getresponse()
        resp_body = resp.read()
        conn.close()
        if resp.status != 200:
            raise RuntimeError("Non-200 return from AWS: %s %s\n%s" %
                               (resp.status, resp.reason, resp_body))
    except Exception:
        file['upload_exception'] = sys.exc_info()
        file['upload_ok'] = False
    else:
        file['upload_ok'] = True


def _notify_upload_complete(base_url, auth_file, file):
    req = Request(
        urljoin(
            base_url,
            'upload/complete/%(algorithm)s/%(digest)s' % file))
    _authorize(req, auth_file)
    try:
        urllib2.urlopen(req)
    except HTTPError as e:
        if e.code != 409:
            _log_api_error(e)
            return
        # 409 indicates that the upload URL hasn't expired yet and we
        # should retry after a delay
        to_wait = int(e.headers.get('X-Retry-After', 60))
        log.warning("Waiting %d seconds for upload URLs to expire" % to_wait)
        time.sleep(to_wait)
        _notify_upload_complete(base_url, auth_file, file)
    except Exception:
        log.exception("While notifying server of upload completion:")


def upload(manifest, message, base_urls, auth_file, region):
    try:
        manifest = open_manifest(manifest)
    except InvalidManifest:
        log.exception("failed to load manifest file at '%s'" % manifest)
        return False

    # verify the manifest, since we'll need the files present to upload
    if not manifest.validate():
        log.error('manifest is invalid')
        return False

    if any(fr.visibility is None for fr in manifest.file_records):
        log.error('All files in a manifest for upload must have a visibility set')
        return False

    # convert the manifest to an upload batch
    batch = {
        'message': message,
        'files': {},
    }
    for fr in manifest.file_records:
        batch['files'][fr.filename] = {
            'size': fr.size,
            'digest': fr.digest,
            'algorithm': fr.algorithm,
            'visibility': fr.visibility,
        }

    # make the upload request
    resp = _send_batch(base_urls[0], auth_file, batch, region)
    if not resp:
        return None
    files = resp['files']

    # Upload the files, each in a thread. This allows us to start all of the
    # uploads before any of the URLs expire.
    threads = {}
    for filename, file in files.items():
        if 'put_url' in file:
            log.info("%s: starting upload" % (filename,))
            thd = threading.Thread(target=_s3_upload,
                                   args=(filename, file))
            thd.daemon = 1
            thd.start()
            threads[filename] = thd
        else:
            log.info("%s: already exists on server" % (filename,))

    # re-join all of those threads as they exit
    success = True
    while threads:
        for filename, thread in list(threads.items()):
            if not thread.is_alive():
                # _s3_upload has annotated file with result information
                file = files[filename]
                thread.join()
                if file['upload_ok']:
                    log.info("%s: uploaded" % filename)
                else:
                    log.error("%s: failed" % filename,
                              exc_info=file['upload_exception'])
                    success = False
                del threads[filename]

    # notify the server that the uploads are completed. If the notification
    # fails, we don't consider that an error (the server will notice
    # eventually)
    for filename, file in files.items():
        if 'put_url' in file and file['upload_ok']:
            log.info("notifying server of upload completion for %s" % (filename,))
            _notify_upload_complete(base_urls[0], auth_file, file)

    return success


def send_operation_on_file(data, base_urls, digest, auth_file):
    url = base_urls[0]
    url = urljoin(url, 'file/sha512/' + digest)

    data = json.dumps(data)

    req = Request(url, data, {'Content-Type': 'application/json'})
    req.get_method = lambda: 'PATCH'

    _authorize(req, auth_file)

    try:
        urllib2.urlopen(req)
    except (URLError, HTTPError) as e:
        _log_api_error(e)
        return False
    return True


def change_visibility(base_urls, digest, visibility, auth_file):
    data = [{
        "op": "set_visibility",
        "visibility": visibility,
    }]
    return send_operation_on_file(data, base_urls, digest, auth_file)


def delete_instances(base_urls, digest, auth_file):
    data = [{
        "op": "delete_instances",
    }]
    return send_operation_on_file(data, base_urls, digest, auth_file)
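

# For reference, both helpers PATCH a JSON list of operations to
# file/sha512/<digest>; e.g. change_visibility sends:
#
#   [{"op": "set_visibility", "visibility": "public"}]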


def process_command(options, args):
    """ I know how to take a list of program arguments and
    start doing the right thing with them"""
    cmd = args[0]
    cmd_args = args[1:]
    log.debug("processing '%s' command with args '%s'" %
              (cmd, '", "'.join(cmd_args)))
    log.debug("using options: %s" % options)

    if cmd == 'list':
        return list_manifest(options['manifest'])
    if cmd == 'validate':
        return validate_manifest(options['manifest'])
    elif cmd == 'add':
        return add_files(options['manifest'], options['algorithm'], cmd_args,
                         options['version'], options['visibility'],
                         options['unpack'])
    elif cmd == 'purge':
        if options['cache_folder']:
            purge(folder=options['cache_folder'], gigs=options['size'])
        else:
            log.critical('please specify the cache folder to be purged')
            return False
    elif cmd == 'fetch':
        return fetch_files(
            options['manifest'],
            options['base_url'],
            cmd_args,
            cache_folder=options['cache_folder'],
            auth_file=options.get("auth_file"),
            region=options.get('region'))
    elif cmd == 'upload':
        if not options.get('message'):
            log.critical('upload command requires a message')
            return False
        return upload(
            options.get('manifest'),
            options.get('message'),
            options.get('base_url'),
            options.get('auth_file'),
            options.get('region'))
    elif cmd == 'change-visibility':
        if not options.get('digest'):
            log.critical('change-visibility command requires a digest option')
            return False
        if not options.get('visibility'):
            log.critical('change-visibility command requires a visibility option')
            return False
        return change_visibility(
            options.get('base_url'),
            options.get('digest'),
            options.get('visibility'),
            options.get('auth_file'),
        )
    elif cmd == 'delete':
        if not options.get('digest'):
            log.critical('delete command requires a digest option')
            return False
        return delete_instances(
            options.get('base_url'),
            options.get('digest'),
            options.get('auth_file'),
        )
    else:
        log.critical('command "%s" is not implemented' % cmd)
        return False


def main(argv, _skip_logging=False):
    # Set up option parsing
    parser = optparse.OptionParser()
    parser.add_option('-q', '--quiet', default=logging.INFO,
                      dest='loglevel', action='store_const', const=logging.ERROR)
    parser.add_option('-v', '--verbose',
                      dest='loglevel', action='store_const', const=logging.DEBUG)
    parser.add_option('-m', '--manifest', default=DEFAULT_MANIFEST_NAME,
                      dest='manifest', action='store',
                      help='specify the manifest file to be operated on')
    parser.add_option('-d', '--algorithm', default='sha512',
                      dest='algorithm', action='store',
                      help='hashing algorithm to use (only sha512 is allowed)')
    parser.add_option('--digest', default=None,
                      dest='digest', action='store',
                      help='digest hash to change visibility for')
    parser.add_option('--visibility', default=None,
                      dest='visibility', choices=['internal', 'public'],
                      help='Visibility level of this file; "internal" is for '
                           'files that cannot be distributed out of the company '
                           'but not for secrets; "public" files are available to '
                           'anyone without restriction')
    parser.add_option('--unpack', default=False,
                      dest='unpack', action='store_true',
                      help='Request unpacking this file after fetch.'
                           ' This is helpful with tarballs.')
    parser.add_option('--version', default=None,
                      dest='version', action='store',
                      help='Version string for this file. This annotates the '
                           'manifest entry with a version string to help '
                           'identify the contents.')
    parser.add_option('-o', '--overwrite', default=False,
                      dest='overwrite', action='store_true',
                      help='UNUSED; present for backward compatibility')
    parser.add_option('--url', dest='base_url', action='append',
                      help='RelengAPI URL ending with /tooltool/; default '
                           'is appropriate for Mozilla')
    parser.add_option('-c', '--cache-folder', dest='cache_folder',
                      help='Local cache folder')
    parser.add_option('-s', '--size',
                      help='free space required (in GB)', dest='size',
                      type='float', default=0.)
    parser.add_option('-r', '--region', help='Preferred AWS region for upload or fetch; '
                                             'example: --region=us-west-2')
    parser.add_option('--message',
                      help='The "commit message" for an upload; format with a bug number '
                           'and brief comment',
                      dest='message')
    parser.add_option('--authentication-file',
                      help='Use the RelengAPI token found in the given file to '
                           'authenticate to the RelengAPI server.',
                      dest='auth_file')

    (options_obj, args) = parser.parse_args(argv[1:])

    if not options_obj.base_url:
        tooltool_host = os.environ.get('TOOLTOOL_HOST', 'tooltool.mozilla-releng.net')
        taskcluster_proxy_url = os.environ.get('TASKCLUSTER_PROXY_URL')
        if taskcluster_proxy_url:
            tooltool_url = '{}/{}'.format(taskcluster_proxy_url, tooltool_host)
        else:
            tooltool_url = 'https://{}'.format(tooltool_host)

        options_obj.base_url = [tooltool_url]

    # ensure all URLs have a trailing slash
    def add_slash(url):
        return url if url.endswith('/') else (url + '/')
    options_obj.base_url = [add_slash(u) for u in options_obj.base_url]

    # expand ~ in --authentication-file
    if options_obj.auth_file:
        options_obj.auth_file = os.path.expanduser(options_obj.auth_file)

    # Dictionaries are easier to work with
    options = vars(options_obj)

    log.setLevel(options['loglevel'])

    # Set up logging, for now just to the console
    if not _skip_logging:  # pragma: no cover
        ch = logging.StreamHandler()
        cf = logging.Formatter("%(levelname)s - %(message)s")
        ch.setFormatter(cf)
        logging.getLogger().addHandler(ch)

    if options['algorithm'] != 'sha512':
        parser.error('only --algorithm sha512 is supported')

    if len(args) < 1:
        parser.error('You must specify a command')

    return 0 if process_command(options, args) else 1


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main(sys.argv))