#!/usr/bin/env python

# tooltool is a lookaside cache implemented in Python
# Copyright (C) 2011 John H. Ford <john@johnford.info>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation version 2
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

# A manifest file specifies files in that directory that are stored
# elsewhere. This file should only list files in the same directory
# in which the manifest file resides and it should be called
# 'manifest.tt'

from __future__ import print_function
from __future__ import absolute_import
import base64
import calendar
import hashlib
import hmac
import json
import logging
import math
import optparse
import os
import pprint
import re
import shutil
import sys
import tarfile
import tempfile
import threading
import time
import zipfile
from contextlib import contextmanager, closing
from functools import wraps

from io import open
from io import BytesIO
import random  # the vendored retrier() below calls random.uniform(), so import the module, not the function
from subprocess import PIPE
from subprocess import Popen

__version__ = '1'

# Allowed request header characters:
# !#$%&'()*+,-./:;<=>?@[]^_`{|}~ and space, a-z, A-Z, 0-9, \, "
REQUEST_HEADER_ATTRIBUTE_CHARS = re.compile(
    r"^[ a-zA-Z0-9_\!#\$%&'\(\)\*\+,\-\./\:;<\=>\?@\[\]\^`\{\|\}~]*$")
DEFAULT_MANIFEST_NAME = 'manifest.tt'
TOOLTOOL_PACKAGE_SUFFIX = '.TOOLTOOL-PACKAGE'
HAWK_VER = 1
PY3 = sys.version_info[0] == 3

if PY3:
    six_binary_type = bytes
    unicode = str  # Silence `pyflakes` from reporting `undefined name 'unicode'` in Python 3.
    import urllib.request as urllib2
    from http.client import HTTPSConnection, HTTPConnection
    from urllib.parse import urlparse, urljoin
    from urllib.request import Request
    from urllib.error import HTTPError, URLError
else:
    six_binary_type = str
    import urllib2
    from httplib import HTTPSConnection, HTTPConnection
    from urllib2 import Request, HTTPError, URLError
    from urlparse import urlparse, urljoin

log = logging.getLogger(__name__)


# Vendored code from `redo` module
def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1):
    """
    This function originates from redo 2.0.3 https://github.com/mozilla-releng/redo
    A generator function that sleeps between retries, handles exponential
    backoff and jitter. The action you are retrying is meant to run after
    retrier yields.
    """
    jitter = jitter or 0  # py35 barfs on the next line if jitter is None
    if jitter > sleeptime:
        # To prevent negative sleep times
        raise Exception("jitter ({}) must be less than sleep time ({})".format(jitter, sleeptime))

    sleeptime_real = sleeptime
    for _ in range(attempts):
        log.debug("attempt %i/%i", _ + 1, attempts)

        yield sleeptime_real

        if jitter:
            sleeptime_real = sleeptime + random.uniform(-jitter, jitter)
            # our jitter should scale along with the sleeptime
            jitter = jitter * sleepscale
        else:
            sleeptime_real = sleeptime

        sleeptime *= sleepscale

        if sleeptime_real > max_sleeptime:
            sleeptime_real = max_sleeptime

        # Don't need to sleep the last time
        if _ < attempts - 1:
            log.debug("sleeping for %.2fs (attempt %i/%i)", sleeptime_real, _ + 1, attempts)
            time.sleep(sleeptime_real)
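
# Illustrative use of retrier() (not part of the original module; the function
# name in the loop body is hypothetical). The generator sleeps between
# iterations, so the caller just retries and breaks on success:
#
#     for _ in retrier(attempts=3, sleeptime=1):
#         try:
#             do_network_thing()
#             break
#         except IOError:
#             continue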

def retry(
    action,
    attempts=5,
    sleeptime=60,
    max_sleeptime=5 * 60,
    sleepscale=1.5,
    jitter=1,
    retry_exceptions=(Exception,),
    cleanup=None,
    args=(),
    kwargs={},
    log_args=True,
):
    """
    This function originates from redo 2.0.3 https://github.com/mozilla-releng/redo
    Calls an action function until it succeeds, or we give up.
    """
    assert callable(action)
    assert not cleanup or callable(cleanup)

    action_name = getattr(action, "__name__", action)
    if log_args and (args or kwargs):
        log_attempt_args = ("retry: calling %s with args: %s," " kwargs: %s, attempt #%d",
                            action_name, args, kwargs)
    else:
        log_attempt_args = ("retry: calling %s, attempt #%d", action_name)

    if max_sleeptime < sleeptime:
        log.debug("max_sleeptime %d less than sleeptime %d", max_sleeptime, sleeptime)

    n = 1
    for _ in retrier(
            attempts=attempts,
            sleeptime=sleeptime,
            max_sleeptime=max_sleeptime,
            sleepscale=sleepscale,
            jitter=jitter):
        try:
            logfn = log.info if n != 1 else log.debug
            logfn_args = log_attempt_args + (n,)
            logfn(*logfn_args)
            return action(*args, **kwargs)
        except retry_exceptions:
            log.debug("retry: Caught exception: ", exc_info=True)
            if cleanup:
                cleanup()
            if n == attempts:
                log.info("retry: Giving up on %s", action_name)
                raise
            continue
        finally:
            n += 1

def retriable(*retry_args, **retry_kwargs):
    """
    This function originates from redo 2.0.3 https://github.com/mozilla-releng/redo
    A decorator factory for retry(). Wrap your function in @retriable(...) to
    give it retry powers!
    """
    def _retriable_factory(func):
        @wraps(func)
        def _retriable_wrapper(*args, **kwargs):
            return retry(func, args=args, kwargs=kwargs, *retry_args, **retry_kwargs)

        return _retriable_wrapper

    return _retriable_factory
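
# Illustrative use of the decorator (the wrapped function name is
# hypothetical); the decorated function is retried transparently on any
# exception in retry_exceptions:
#
#     @retriable(attempts=3, sleeptime=1)
#     def flaky_download(url):
#         ...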

# end of vendored code from redo module


def request_has_data(req):
    if PY3:
        return req.data is not None
    return req.has_data()


def get_hexdigest(val):
    return hashlib.sha512(val).hexdigest()


class FileRecordJSONEncoderException(Exception):
    pass


class InvalidManifest(Exception):
    pass


class ExceptionWithFilename(Exception):

    def __init__(self, filename):
        Exception.__init__(self)
        self.filename = filename


class BadFilenameException(ExceptionWithFilename):
    pass


class DigestMismatchException(ExceptionWithFilename):
    pass


class MissingFileException(ExceptionWithFilename):
    pass


class InvalidCredentials(Exception):
    pass


class BadHeaderValue(Exception):
    pass

def parse_url(url):
    url_parts = urlparse(url)
    url_dict = {
        'scheme': url_parts.scheme,
        'hostname': url_parts.hostname,
        'port': url_parts.port,
        'path': url_parts.path,
        'resource': url_parts.path,
        'query': url_parts.query,
    }
    if len(url_dict['query']) > 0:
        url_dict['resource'] = '%s?%s' % (url_dict['resource'],  # pragma: no cover
                                          url_dict['query'])

    if url_parts.port is None:
        if url_parts.scheme == 'http':
            url_dict['port'] = 80
        elif url_parts.scheme == 'https':  # pragma: no cover
            url_dict['port'] = 443
    return url_dict

def utc_now(offset_in_seconds=0.0):
    return int(math.floor(calendar.timegm(time.gmtime()) + float(offset_in_seconds)))


def random_string(length):
    return base64.urlsafe_b64encode(os.urandom(length))[:length]

def prepare_header_val(val):
    if isinstance(val, six_binary_type):
        val = val.decode('utf-8')

    if not REQUEST_HEADER_ATTRIBUTE_CHARS.match(val):
        raise BadHeaderValue(  # pragma: no cover
            'header value={val} contained an illegal character'.format(val=repr(val)))

    return val

def parse_content_type(content_type):  # pragma: no cover
    if content_type:
        return content_type.split(';')[0].strip().lower()
    else:
        return ''

def calculate_payload_hash(algorithm, payload, content_type):  # pragma: no cover
    parts = [
        part if isinstance(part, six_binary_type) else part.encode('utf8')
        for part in ['hawk.' + str(HAWK_VER) + '.payload\n',
                     parse_content_type(content_type) + '\n',
                     payload or '',
                     '\n',
                     ]
    ]

    p_hash = hashlib.new(algorithm)
    for p in parts:
        p_hash.update(p)

    log.debug('calculating payload hash from:\n{parts}'.format(parts=pprint.pformat(parts)))

    return base64.b64encode(p_hash.digest())
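
# For reference, the concatenated string hashed above has the shape
# (payload value illustrative):
#
#     hawk.1.payload\napplication/json\n{"message": ...}\n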

def validate_taskcluster_credentials(credentials):
    if not hasattr(credentials, '__getitem__'):
        raise InvalidCredentials('credentials must be a dict-like object')  # pragma: no cover
    try:
        credentials['clientId']
        credentials['accessToken']
    except KeyError:  # pragma: no cover
        etype, val, tb = sys.exc_info()
        raise InvalidCredentials('{etype}: {val}'.format(etype=etype, val=val))


def normalize_header_attr(val):
    if isinstance(val, six_binary_type):
        return val.decode('utf-8')
    return val  # pragma: no cover

def normalize_string(mac_type,
                     timestamp,
                     nonce,
                     method,
                     name,
                     host,
                     port,
                     content_hash,
                     ):
    return '\n'.join([
        normalize_header_attr(header)
        # The blank lines are important. They follow what the Node Hawk lib does.
        for header in ['hawk.' + str(HAWK_VER) + '.' + mac_type,
                       timestamp,
                       nonce,
                       method or '',
                       name or '',
                       host,
                       port,
                       content_hash or '',
                       '',  # for ext which is empty in this case
                       '',  # Add trailing new line.
                       ]
    ])
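
# The normalized string is one attribute per line, e.g. (all values
# illustrative):
#
#     hawk.1.header
#     1589302800
#     Fe4cRn
#     GET
#     /sha512/<digest>
#     tooltool.mozilla-releng.net
#     443
#     <payload hash, or blank>
#
# followed by the blank ext line and the trailing newline.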

def calculate_mac(mac_type,
                  access_token,
                  algorithm,
                  timestamp,
                  nonce,
                  method,
                  name,
                  host,
                  port,
                  content_hash,
                  ):
    normalized = normalize_string(mac_type,
                                  timestamp,
                                  nonce,
                                  method,
                                  name,
                                  host,
                                  port,
                                  content_hash)
    log.debug(u'normalized resource for mac calc: {norm}'.format(norm=normalized))
    digestmod = getattr(hashlib, algorithm)

    if not isinstance(normalized, six_binary_type):
        normalized = normalized.encode('utf8')

    if not isinstance(access_token, six_binary_type):
        access_token = access_token.encode('ascii')

    result = hmac.new(access_token, normalized, digestmod)
    return base64.b64encode(result.digest())

def make_taskcluster_header(credentials, req):
    validate_taskcluster_credentials(credentials)

    url = req.get_full_url()
    method = req.get_method()
    algorithm = 'sha256'
    timestamp = str(utc_now())
    nonce = random_string(6)
    url_parts = parse_url(url)

    content_hash = None
    if request_has_data(req):
        if PY3:
            data = req.data
        else:
            data = req.get_data()
        content_hash = calculate_payload_hash(  # pragma: no cover
            algorithm,
            data,
            # maybe we should detect this from req.headers but we anyway expect json
            content_type='application/json',
        )

    mac = calculate_mac('header',
                        credentials['accessToken'],
                        algorithm,
                        timestamp,
                        nonce,
                        method,
                        url_parts['resource'],
                        url_parts['hostname'],
                        str(url_parts['port']),
                        content_hash,
                        )

    header = u'Hawk mac="{}"'.format(prepare_header_val(mac))

    if content_hash:  # pragma: no cover
        header = u'{}, hash="{}"'.format(header, prepare_header_val(content_hash))

    header = u'{header}, id="{id}", ts="{ts}", nonce="{nonce}"'.format(
        header=header,
        id=prepare_header_val(credentials['clientId']),
        ts=prepare_header_val(timestamp),
        nonce=prepare_header_val(nonce),
    )

    log.debug('Hawk header for URL={} method={}: {}'.format(url, method, header))

    return header
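
# The resulting Authorization header has the shape (values illustrative):
#
#     Hawk mac="...", hash="...", id="...", ts="...", nonce="..."
#
# where the hash field is only present when the request carries a body.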

class FileRecord(object):

    def __init__(self, filename, size, digest, algorithm, unpack=False,
                 version=None, visibility=None):
        object.__init__(self)
        if '/' in filename or '\\' in filename:
            log.error(
                "The filename provided contains path information and is, therefore, invalid.")
            raise BadFilenameException(filename=filename)
        self.filename = filename
        self.size = size
        self.digest = digest
        self.algorithm = algorithm
        self.unpack = unpack
        self.version = version
        self.visibility = visibility

    def __eq__(self, other):
        if self is other:
            return True
        if self.filename == other.filename and \
                self.size == other.size and \
                self.digest == other.digest and \
                self.algorithm == other.algorithm and \
                self.version == other.version and \
                self.visibility == other.visibility:
            return True
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return "%s.%s(filename='%s', size=%s, digest='%s', algorithm='%s', visibility=%r)" % (
            __name__, self.__class__.__name__, self.filename, self.size,
            self.digest, self.algorithm, self.visibility)

    def present(self):
        # Doesn't check validity
        return os.path.exists(self.filename)

    def validate_size(self):
        if self.present():
            return self.size == os.path.getsize(self.filename)
        else:
            log.debug(
                "trying to validate size on a missing file, %s", self.filename)
            raise MissingFileException(filename=self.filename)

    def validate_digest(self):
        if self.present():
            with open(self.filename, 'rb') as f:
                return self.digest == digest_file(f, self.algorithm)
        else:
            log.debug(
                "trying to validate digest on a missing file, %s", self.filename)
            raise MissingFileException(filename=self.filename)

    def validate(self):
        if self.size is None or self.validate_size():
            if self.validate_digest():
                return True
        return False

    def describe(self):
        if self.present() and self.validate():
            return "'%s' is present and valid" % self.filename
        elif self.present():
            return "'%s' is present and invalid" % self.filename
        else:
            return "'%s' is absent" % self.filename

def create_file_record(filename, algorithm):
    stored_filename = os.path.split(filename)[1]
    with open(filename, 'rb') as fo:
        fr = FileRecord(stored_filename, os.path.getsize(
            filename), digest_file(fo, algorithm), algorithm)
    return fr

class FileRecordJSONEncoder(json.JSONEncoder):

    def encode_file_record(self, obj):
        if not issubclass(type(obj), FileRecord):
            err = "FileRecordJSONEncoder is only for FileRecord and lists of FileRecords, " \
                  "not %s" % obj.__class__.__name__
            log.warn(err)
            raise FileRecordJSONEncoderException(err)
        else:
            rv = {
                'filename': obj.filename,
                'size': obj.size,
                'algorithm': obj.algorithm,
                'digest': obj.digest,
            }
            if obj.unpack:
                rv['unpack'] = True
            if obj.version:
                rv['version'] = obj.version
            if obj.visibility is not None:
                rv['visibility'] = obj.visibility
            return rv

    def default(self, f):
        if issubclass(type(f), list):
            record_list = []
            for i in f:
                record_list.append(self.encode_file_record(i))
            return record_list
        else:
            return self.encode_file_record(f)

class FileRecordJSONDecoder(json.JSONDecoder):

    """I help the json module materialize a FileRecord from
    a JSON file. I understand FileRecords and lists of
    FileRecords. I ignore things that I don't expect for now"""
    # TODO: make this more explicit in what it's looking for
    # and error out on unexpected things

    def process_file_records(self, obj):
        if isinstance(obj, list):
            record_list = []
            for i in obj:
                record = self.process_file_records(i)
                if issubclass(type(record), FileRecord):
                    record_list.append(record)
            return record_list
        required_fields = [
            'filename',
            'size',
            'algorithm',
            'digest',
        ]
        if isinstance(obj, dict):
            missing = False
            for req in required_fields:
                if req not in obj:
                    missing = True
                    break

            if not missing:
                unpack = obj.get('unpack', False)
                version = obj.get('version', None)
                visibility = obj.get('visibility', None)
                rv = FileRecord(
                    obj['filename'], obj['size'], obj['digest'], obj['algorithm'],
                    unpack, version, visibility)
                log.debug("materialized %s" % rv)
                return rv
        return obj

    def decode(self, s):
        decoded = json.JSONDecoder.decode(self, s)
        rv = self.process_file_records(decoded)
        return rv
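
# A manifest this decoder accepts is a JSON list of objects carrying at least
# the four required fields above, e.g. (record contents illustrative):
#
#     [
#       {
#         "filename": "clang.tar.xz",
#         "size": 12345,
#         "algorithm": "sha512",
#         "digest": "ab12...",
#         "unpack": true,
#         "visibility": "public"
#       }
#     ]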

class Manifest(object):

    valid_formats = ('json',)

    def __init__(self, file_records=None):
        self.file_records = file_records or []

    def __eq__(self, other):
        if self is other:
            return True
        if len(self.file_records) != len(other.file_records):
            log.debug('Manifests differ in number of files')
            return False
        # sort the file records by filename before comparing
        mine = sorted((fr.filename, fr) for fr in self.file_records)
        theirs = sorted((fr.filename, fr) for fr in other.file_records)
        return mine == theirs

    def __ne__(self, other):
        return not self.__eq__(other)

    def __deepcopy__(self, memo):
        # This is required for a deep copy
        return Manifest(self.file_records[:])

    def __copy__(self):
        return Manifest(self.file_records)

    def copy(self):
        return Manifest(self.file_records[:])

    def present(self):
        return all(i.present() for i in self.file_records)

    def validate_sizes(self):
        return all(i.validate_size() for i in self.file_records)

    def validate_digests(self):
        return all(i.validate_digest() for i in self.file_records)

    def validate(self):
        return all(i.validate() for i in self.file_records)

    def load(self, data_file, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            try:
                self.file_records.extend(
                    json.load(data_file, cls=FileRecordJSONDecoder))
            except ValueError:
                raise InvalidManifest("trying to read invalid manifest file")

    def loads(self, data_string, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            try:
                self.file_records.extend(
                    json.loads(data_string, cls=FileRecordJSONDecoder))
            except ValueError:
                raise InvalidManifest("trying to read invalid manifest file")

    def dump(self, output_file, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            return json.dump(
                self.file_records, output_file,
                indent=2, separators=(',', ': '),
                cls=FileRecordJSONEncoder,
            )

    def dumps(self, fmt='json'):
        assert fmt in self.valid_formats
        if fmt == 'json':
            return json.dumps(
                self.file_records,
                indent=2, separators=(',', ': '),
                cls=FileRecordJSONEncoder,
            )

def digest_file(f, a):
    """I take a file like object 'f' and return a hex-string containing
    the result of the algorithm 'a' applied to 'f'."""
    h = hashlib.new(a)
    chunk_size = 1024 * 10
    data = f.read(chunk_size)
    while data:
        h.update(data)
        data = f.read(chunk_size)
    name = repr(f.name) if hasattr(f, 'name') else 'a file'
    log.debug('hashed %s with %s to be %s', name, a, h.hexdigest())
    return h.hexdigest()
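
# e.g. (illustrative filename):
#
#     with open('clang.tar.xz', 'rb') as f:
#         digest = digest_file(f, 'sha512')  # 128-character hex string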

def execute(cmd):
    """Execute CMD, logging its stdout at the info level"""
    process = Popen(cmd, shell=True, stdout=PIPE)
    while True:
        line = process.stdout.readline()
        if not line:
            break
        if PY3:
            # the pipe yields bytes on Python 3
            line = line.decode('utf-8', 'replace')
        log.info(line.replace('\n', ' '))
    return process.wait() == 0

def open_manifest(manifest_file):
    """I know how to take a filename and load it into a Manifest object"""
    if os.path.exists(manifest_file):
        manifest = Manifest()
        with open(manifest_file, "rb") as f:
            manifest.load(f)
            log.debug("loaded manifest from file '%s'" % manifest_file)
        return manifest
    else:
        log.debug("tried to load absent file '%s' as manifest" % manifest_file)
        raise InvalidManifest(
            "manifest file '%s' does not exist" % manifest_file)

def list_manifest(manifest_file):
    """I know how to print all the files in a manifest"""
    try:
        manifest = open_manifest(manifest_file)
    except InvalidManifest as e:
        log.error("failed to load manifest file at '%s': %s" % (
            manifest_file,
            str(e),
        ))
        return False
    for f in manifest.file_records:
        print("{}\t{}\t{}".format("P" if f.present() else "-",
                                  "V" if f.present() and f.validate() else "-",
                                  f.filename))
    return True
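
# Output is one tab-separated line per record, e.g. (filenames illustrative):
#
#     P	V	clang.tar.xz      <- present and valid
#     -	-	rustc.tar.xz      <- absent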

def validate_manifest(manifest_file):
    """I validate that all files in a manifest are present and valid but
    don't fetch or delete them if they aren't"""
    try:
        manifest = open_manifest(manifest_file)
    except InvalidManifest as e:
        log.error("failed to load manifest file at '%s': %s" % (
            manifest_file,
            str(e),
        ))
        return False
    invalid_files = []
    absent_files = []
    for f in manifest.file_records:
        if not f.present():
            absent_files.append(f)
        else:
            if not f.validate():
                invalid_files.append(f)
    if len(invalid_files + absent_files) == 0:
        return True
    else:
        return False

def add_files(manifest_file, algorithm, filenames, version, visibility, unpack):
    # returns True if all files successfully added, False if not
    # and doesn't catch library Exceptions. If any files are already
    # tracked in the manifest, return will be False because they weren't
    # added
    all_files_added = True
    # Create a old_manifest object to add to
    if os.path.exists(manifest_file):
        old_manifest = open_manifest(manifest_file)
    else:
        old_manifest = Manifest()
        log.debug("creating a new manifest file")
    new_manifest = Manifest()  # use a different manifest for the output
    for filename in filenames:
        log.debug("adding %s" % filename)
        path, name = os.path.split(filename)
        new_fr = create_file_record(filename, algorithm)
        new_fr.version = version
        new_fr.visibility = visibility
        new_fr.unpack = unpack
        log.debug("appending a new file record to manifest file")
        add = True
        for fr in old_manifest.file_records:
            log.debug("manifest file has '%s'" % "', ".join(
                [x.filename for x in old_manifest.file_records]))
            if new_fr == fr:
                log.info("file already in old_manifest")
                add = False
            elif filename == fr.filename:
                log.error("manifest already contains a different file named %s" % filename)
                add = False
        if add:
            new_manifest.file_records.append(new_fr)
            log.debug("added '%s' to manifest" % filename)
        else:
            all_files_added = False
    # copy any files in the old manifest that aren't in the new one
    new_filenames = set(fr.filename for fr in new_manifest.file_records)
    for old_fr in old_manifest.file_records:
        if old_fr.filename not in new_filenames:
            new_manifest.file_records.append(old_fr)
    if PY3:
        with open(manifest_file, mode="w") as output:
            new_manifest.dump(output, fmt='json')
    else:
        with open(manifest_file, mode="wb") as output:
            new_manifest.dump(output, fmt='json')
    return all_files_added

def touch(f):
    """Used to modify mtime in cached files;
    mtime is used by the purge command"""
    try:
        os.utime(f, None)
    except OSError:
        log.warn('impossible to update utime of file %s' % f)


@contextmanager
@retriable(sleeptime=2)
def request(url, auth_file=None):
    req = Request(url)
    _authorize(req, auth_file)
    with closing(urllib2.urlopen(req)) as f:
        log.debug("opened %s for reading" % url)
        yield f

def fetch_file(base_urls, file_record, grabchunk=1024 * 4, auth_file=None, region=None):
    # A file which is requested to be fetched that exists locally will be
    # overwritten by this function
    fd, temp_path = tempfile.mkstemp(dir=os.getcwd())
    os.close(fd)
    fetched_path = None
    for base_url in base_urls:
        # Generate the URL for the file on the server side
        url = urljoin(base_url,
                      '%s/%s' % (file_record.algorithm, file_record.digest))
        if region is not None:
            url += '?region=' + region

        log.info("Attempting to fetch from '%s'..." % base_url)

        # Well, the file doesn't exist locally. Let's fetch it.
        try:
            with request(url, auth_file) as f, open(temp_path, mode='wb') as out:
                k = True
                size = 0
                while k:
                    # TODO: print statistics as file transfers happen both for info and to stop
                    # buildbot timeouts
                    indata = f.read(grabchunk)
                    out.write(indata)
                    size += len(indata)
                    if len(indata) == 0:
                        k = False
                log.info("File %s fetched from %s as %s" %
                         (file_record.filename, base_url, temp_path))
                fetched_path = temp_path
                break
        except (URLError, HTTPError, ValueError):
            log.info("...failed to fetch '%s' from %s" %
                     (file_record.filename, base_url), exc_info=True)
        except IOError:  # pragma: no cover
            log.info("failed to write to temporary file for '%s'" %
                     file_record.filename, exc_info=True)

    # cleanup temp file in case of issues
    if fetched_path:
        return os.path.split(fetched_path)[1]
    else:
        try:
            os.remove(temp_path)
        except OSError:  # pragma: no cover
            pass
        return None

def clean_path(dirname):
    """Remove a subtree if it exists. Helper for unpack_file()."""
    if os.path.exists(dirname):
        log.info('rm tree: %s' % dirname)
        shutil.rmtree(dirname)


CHECKSUM_SUFFIX = ".checksum"

def unpack_file(filename):
    """Untar `filename`, assuming it is uncompressed or compressed with bzip2,
    xz, gzip, or unzip a zip file. The file is assumed to contain a single
    directory with a name matching the base of the given filename.
    Xz support is handled by shelling out to 'xz'."""
    if os.path.isfile(filename) and tarfile.is_tarfile(filename):
        tar_file, zip_ext = os.path.splitext(filename)
        base_file, tar_ext = os.path.splitext(tar_file)
        clean_path(base_file)
        log.info('untarring "%s"' % filename)
        tar = tarfile.open(filename)
        tar.extractall()
        tar.close()
    elif os.path.isfile(filename) and filename.endswith('.tar.xz'):
        base_file = filename.replace('.tar.xz', '')
        clean_path(base_file)
        log.info('untarring "%s"' % filename)
        # Not using tar -Jxf because it fails on Windows for some reason.
        process = Popen(['xz', '-d', '-c', filename], stdout=PIPE)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            return False
        fileobj = BytesIO()
        fileobj.write(stdout)
        fileobj.seek(0)
        tar = tarfile.open(fileobj=fileobj, mode='r|')
        tar.extractall()
        tar.close()
    elif os.path.isfile(filename) and zipfile.is_zipfile(filename):
        base_file = filename.replace('.zip', '')
        clean_path(base_file)
        log.info('unzipping "%s"' % filename)
        z = zipfile.ZipFile(filename)
        z.extractall()
        z.close()
    else:
        log.error("Unknown archive extension for filename '%s'" % filename)
        return False
    return True

def fetch_files(manifest_file, base_urls, filenames=[], cache_folder=None,
                auth_file=None, region=None):
    # Lets load the manifest file
    try:
        manifest = open_manifest(manifest_file)
    except InvalidManifest as e:
        log.error("failed to load manifest file at '%s': %s" % (
            manifest_file,
            str(e),
        ))
        return False

    # we want to track files already in current working directory AND valid
    # we will not need to fetch these
    present_files = []

    # We want to track files that fail to be fetched as well as
    # files that are fetched
    failed_files = []
    fetched_files = []

    # Files that we want to unpack.
    unpack_files = []

    # Lets go through the manifest and fetch the files that we want
    for f in manifest.file_records:
        # case 1: files are already present
        if f.present():
            if f.validate():
                present_files.append(f.filename)
                if f.unpack:
                    unpack_files.append(f.filename)
            else:
                # we have an invalid file here, better to cleanup!
                # this invalid file needs to be replaced with a good one
                # from the local cache or fetched from a tooltool server
                log.info("File %s is present locally but it is invalid, so I will remove it "
                         "and try to fetch it" % f.filename)
                os.remove(os.path.join(os.getcwd(), f.filename))

        # check if file is already in cache
        if cache_folder and f.filename not in present_files:
            try:
                shutil.copy(os.path.join(cache_folder, f.digest),
                            os.path.join(os.getcwd(), f.filename))
                log.info("File %s retrieved from local cache %s" %
                         (f.filename, cache_folder))
                touch(os.path.join(cache_folder, f.digest))

                filerecord_for_validation = FileRecord(
                    f.filename, f.size, f.digest, f.algorithm)
                if filerecord_for_validation.validate():
                    present_files.append(f.filename)
                    if f.unpack:
                        unpack_files.append(f.filename)
                else:
                    # the file copied from the cache is invalid, better to
                    # clean up the cache version itself as well
                    log.warn("File %s retrieved from cache is invalid! I am deleting it from the "
                             "cache as well" % f.filename)
                    os.remove(os.path.join(os.getcwd(), f.filename))
                    os.remove(os.path.join(cache_folder, f.digest))
            except IOError:
                log.info("File %s not present in local cache folder %s" %
                         (f.filename, cache_folder))

        # now I will try to fetch all files which are not already present and
        # valid, appending a suffix to avoid race conditions
        temp_file_name = None
        # 'filenames' is the list of filenames to be managed; when it is a
        # non-empty list, it is used as a filter. If a filename is in
        # present_files, I already have it (either from the working dir or
        # from the cache), so there is nothing to fetch.
        if (f.filename in filenames or len(filenames) == 0) and f.filename not in present_files:
            log.debug("fetching %s" % f.filename)
            temp_file_name = fetch_file(base_urls, f, auth_file=auth_file, region=region)
            if temp_file_name:
                fetched_files.append((f, temp_file_name))
            else:
                failed_files.append(f.filename)
        else:
            log.debug("skipping %s" % f.filename)

    # lets ensure that fetched files match what the manifest specified
    for localfile, temp_file_name in fetched_files:
        # since I downloaded to a temp file, I need to perform all validations on the temp file
        # this is why filerecord_for_validation is created

        filerecord_for_validation = FileRecord(
            temp_file_name, localfile.size, localfile.digest, localfile.algorithm)

        if filerecord_for_validation.validate():
            # great!
            # I can rename the temp file
            log.info("File integrity verified, renaming %s to %s" %
                     (temp_file_name, localfile.filename))
            os.rename(os.path.join(os.getcwd(), temp_file_name),
                      os.path.join(os.getcwd(), localfile.filename))

            if localfile.unpack:
                unpack_files.append(localfile.filename)

            # if I am using a cache and a new file has just been retrieved from a
            # remote location, I need to update the cache as well
            if cache_folder:
                log.info("Updating local cache %s..." % cache_folder)
                try:
                    if not os.path.exists(cache_folder):
                        log.info("Creating cache in %s..." % cache_folder)
                        os.makedirs(cache_folder, 0o0700)
                    shutil.copy(os.path.join(os.getcwd(), localfile.filename),
                                os.path.join(cache_folder, localfile.digest))
                    log.info("Local cache %s updated with %s" % (cache_folder,
                                                                 localfile.filename))
                    touch(os.path.join(cache_folder, localfile.digest))
                except (OSError, IOError):
                    log.warning('Impossible to add file %s to cache folder %s' %
                                (localfile.filename, cache_folder), exc_info=True)
        else:
            failed_files.append(localfile.filename)
            log.error("'%s'" % filerecord_for_validation.describe())
            os.remove(temp_file_name)

    # Unpack files that need to be unpacked.
    for filename in unpack_files:
        if not unpack_file(filename):
            failed_files.append(filename)

    # If we failed to fetch or validate a file, we need to fail
    if len(failed_files) > 0:
        log.error("The following files failed: '%s'" %
                  "', ".join(failed_files))
        return False
    return True

def freespace(p):
    "Returns the number of bytes free under directory `p`"
    if sys.platform == 'win32':  # pragma: no cover
        # os.statvfs doesn't work on Windows
        import win32file

        secsPerClus, bytesPerSec, nFreeClus, totClus = win32file.GetDiskFreeSpace(p)
        return secsPerClus * bytesPerSec * nFreeClus
    else:
        r = os.statvfs(p)
        return r.f_frsize * r.f_bavail

def purge(folder, gigs):
    """If gigs is non 0, it deletes files in `folder` until `gigs` GB are free,
    starting from older files. If gigs is 0, a full purge will be performed.
    No recursive deletion of files in subfolder is performed."""

    full_purge = bool(gigs == 0)
    gigs *= 1024 * 1024 * 1024

    if not full_purge and freespace(folder) >= gigs:
        log.info("No need to cleanup")
        return

    files = []
    for f in os.listdir(folder):
        p = os.path.join(folder, f)
        # it deletes files in folder without going into subfolders,
        # assuming the cache has a flat structure
        if not os.path.isfile(p):
            continue
        mtime = os.path.getmtime(p)
        files.append((mtime, p))

    # iterate files sorted by mtime
    for _, f in sorted(files):
        log.info("removing %s to free up space" % f)
        try:
            os.remove(f)
        except OSError:
            log.info("Impossible to remove %s" % f, exc_info=True)
        if not full_purge and freespace(folder) >= gigs:
            break

def _log_api_error(e):
    if hasattr(e, 'hdrs') and e.hdrs['content-type'] == 'application/json':
        json_resp = json.load(e.fp)
        log.error("%s: %s" % (json_resp['error']['name'],
                              json_resp['error']['description']))
    else:
        log.exception("Error making RelengAPI request:")

def _authorize(req, auth_file):
    if not auth_file:
        return

    is_taskcluster_auth = False
    with open(auth_file) as f:
        auth_file_content = f.read().strip()
    try:
        auth_file_content = json.loads(auth_file_content)
        is_taskcluster_auth = True
    except Exception:
        pass

    if is_taskcluster_auth:
        taskcluster_header = make_taskcluster_header(auth_file_content, req)
        log.debug("Using taskcluster credentials in %s" % auth_file)
        req.add_unredirected_header('Authorization', taskcluster_header)
    else:
        log.debug("Using Bearer token in %s" % auth_file)
        req.add_unredirected_header('Authorization', 'Bearer %s' % auth_file_content)
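
# The auth file handled above may contain either a raw RelengAPI Bearer token
# or a JSON object with taskcluster credentials, e.g. (values illustrative):
#
#     {"clientId": "project/releng/tooltool", "accessToken": "..."}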

def _send_batch(base_url, auth_file, batch, region):
    url = urljoin(base_url, 'upload')
    if region is not None:
        url += "?region=" + region
    data = json.dumps(batch)
    if PY3:
        data = data.encode("utf-8")
    req = Request(url, data, {'Content-Type': 'application/json'})
    _authorize(req, auth_file)
    try:
        resp = urllib2.urlopen(req)
    except (URLError, HTTPError) as e:
        _log_api_error(e)
        return None
    return json.load(resp)['result']

def _s3_upload(filename, file):
    # urllib2 does not support streaming, so we fall back to good old httplib
    url = urlparse(file['put_url'])
    cls = HTTPSConnection if url.scheme == 'https' else HTTPConnection
    host, port = url.netloc.split(':') if ':' in url.netloc else (url.netloc, 443)
    port = int(port)
    conn = cls(host, port)
    try:
        req_path = "%s?%s" % (url.path, url.query) if url.query else url.path
        with open(filename, 'rb') as f:
            content = f.read()
            content_length = len(content)
            f.seek(0)
            conn.request(
                'PUT',
                req_path,
                f,
                {
                    'Content-Type': 'application/octet-stream',
                    'Content-Length': str(content_length),
                },
            )
        resp = conn.getresponse()
        resp_body = resp.read()
        conn.close()
        if resp.status != 200:
            raise RuntimeError("Non-200 return from AWS: %s %s\n%s" %
                               (resp.status, resp.reason, resp_body))
    except Exception:
        file['upload_exception'] = sys.exc_info()
        file['upload_ok'] = False
    else:
        file['upload_ok'] = True

def _notify_upload_complete(base_url, auth_file, file):
    req = Request(
        urljoin(
            base_url,
            'upload/complete/%(algorithm)s/%(digest)s' % file))
    _authorize(req, auth_file)
    try:
        urllib2.urlopen(req)
    except HTTPError as e:
        if e.code != 409:
            _log_api_error(e)
            return
        # 409 indicates that the upload URL hasn't expired yet and we
        # should retry after a delay
        to_wait = int(e.headers.get('X-Retry-After', 60))
        log.warning("Waiting %d seconds for upload URLs to expire" % to_wait)
        time.sleep(to_wait)
        _notify_upload_complete(base_url, auth_file, file)
    except Exception:
        log.exception("While notifying server of upload completion:")

def upload(manifest, message, base_urls, auth_file, region):
    try:
        manifest = open_manifest(manifest)
    except InvalidManifest:
        # `manifest` is still the filename here, since open_manifest raised
        log.exception("failed to load manifest file at '%s'" % manifest)
        return False

    # verify the manifest, since we'll need the files present to upload
    if not manifest.validate():
        log.error('manifest is invalid')
        return False

    if any(fr.visibility is None for fr in manifest.file_records):
        log.error('All files in a manifest for upload must have a visibility set')
        return False

    # convert the manifest to an upload batch
    batch = {
        'message': message,
        'files': {},
    }
    for fr in manifest.file_records:
        batch['files'][fr.filename] = {
            'size': fr.size,
            'digest': fr.digest,
            'algorithm': fr.algorithm,
            'visibility': fr.visibility,
        }

    # make the upload request
    resp = _send_batch(base_urls[0], auth_file, batch, region)
    if not resp:
        return None
    files = resp['files']

    # Upload the files, each in a thread. This allows us to start all of the
    # uploads before any of the URLs expire.
    threads = {}
    for filename, file in files.items():
        if 'put_url' in file:
            log.info("%s: starting upload" % (filename,))
            thd = threading.Thread(target=_s3_upload,
                                   args=(filename, file))
            thd.daemon = True
            thd.start()
            threads[filename] = thd
        else:
            log.info("%s: already exists on server" % (filename,))

    # re-join all of those threads as they exit
    success = True
    while threads:
        for filename, thread in list(threads.items()):
            if not thread.is_alive():
                # _s3_upload has annotated file with result information
                file = files[filename]
                thread.join()
                if file['upload_ok']:
                    log.info("%s: uploaded" % filename)
                else:
                    log.error("%s: failed" % filename,
                              exc_info=file['upload_exception'])
                    success = False
                del threads[filename]

    # notify the server that the uploads are completed. If the notification
    # fails, we don't consider that an error (the server will notice
    # eventually)
    for filename, file in files.items():
        if 'put_url' in file and file['upload_ok']:
            log.info("notifying server of upload completion for %s" % (filename,))
            _notify_upload_complete(base_urls[0], auth_file, file)

    return success

def send_operation_on_file(data, base_urls, digest, auth_file):
    url = base_urls[0]
    url = urljoin(url, 'file/sha512/' + digest)

    data = json.dumps(data)
    if PY3:
        # urllib on Python 3 requires a bytes body, as in _send_batch() above
        data = data.encode("utf-8")

    req = Request(url, data, {'Content-Type': 'application/json'})
    req.get_method = lambda: 'PATCH'

    _authorize(req, auth_file)

    try:
        urllib2.urlopen(req)
    except (URLError, HTTPError) as e:
        _log_api_error(e)
        return False
    return True

def change_visibility(base_urls, digest, visibility, auth_file):
    data = [{
        "op": "set_visibility",
        "visibility": visibility,
    }]
    return send_operation_on_file(data, base_urls, digest, auth_file)

def delete_instances(base_urls, digest, auth_file):
    data = [{
        "op": "delete_instances",
    }]
    return send_operation_on_file(data, base_urls, digest, auth_file)

def process_command(options, args):
    """ I know how to take a list of program arguments and
    start doing the right thing with them"""
    cmd = args[0]
    cmd_args = args[1:]
    log.debug("processing '%s' command with args '%s'" %
              (cmd, '", "'.join(cmd_args)))
    log.debug("using options: %s" % options)

    if cmd == 'list':
        return list_manifest(options['manifest'])
    if cmd == 'validate':
        return validate_manifest(options['manifest'])
    elif cmd == 'add':
        return add_files(options['manifest'], options['algorithm'], cmd_args,
                         options['version'], options['visibility'],
                         options['unpack'])
    elif cmd == 'purge':
        if options['cache_folder']:
            purge(folder=options['cache_folder'], gigs=options['size'])
        else:
            log.critical('please specify the cache folder to be purged')
            return False
    elif cmd == 'fetch':
        return fetch_files(
            options['manifest'],
            options['base_url'],
            cmd_args,
            cache_folder=options['cache_folder'],
            auth_file=options.get("auth_file"),
            region=options.get('region'))
    elif cmd == 'upload':
        if not options.get('message'):
            log.critical('upload command requires a message')
            return False
        return upload(
            options.get('manifest'),
            options.get('message'),
            options.get('base_url'),
            options.get('auth_file'),
            options.get('region'))
    elif cmd == 'change-visibility':
        if not options.get('digest'):
            log.critical('change-visibility command requires a digest option')
            return False
        if not options.get('visibility'):
            log.critical('change-visibility command requires a visibility option')
            return False
        return change_visibility(
            options.get('base_url'),
            options.get('digest'),
            options.get('visibility'),
            options.get('auth_file'),
        )
    elif cmd == 'delete':
        if not options.get('digest'):
            log.critical('delete command requires a digest option')
            return False
        return delete_instances(
            options.get('base_url'),
            options.get('digest'),
            options.get('auth_file'),
        )
    else:
        log.critical('command "%s" is not implemented' % cmd)
        return False

def main(argv, _skip_logging=False):
    # Set up option parsing
    parser = optparse.OptionParser()
    parser.add_option('-q', '--quiet', default=logging.INFO,
                      dest='loglevel', action='store_const', const=logging.ERROR)
    parser.add_option('-v', '--verbose',
                      dest='loglevel', action='store_const', const=logging.DEBUG)
    parser.add_option('-m', '--manifest', default=DEFAULT_MANIFEST_NAME,
                      dest='manifest', action='store',
                      help='specify the manifest file to be operated on')
    parser.add_option('-d', '--algorithm', default='sha512',
                      dest='algorithm', action='store',
                      help='hashing algorithm to use (only sha512 is allowed)')
    parser.add_option('--digest', default=None,
                      dest='digest', action='store',
                      help='digest hash to change visibility for')
    parser.add_option('--visibility', default=None,
                      dest='visibility', choices=['internal', 'public'],
                      help='Visibility level of this file; "internal" is for '
                           'files that cannot be distributed out of the company '
                           'but not for secrets; "public" files are available to '
                           'anyone without restriction')
    parser.add_option('--unpack', default=False,
                      dest='unpack', action='store_true',
                      help='Request unpacking this file after fetch.'
                           ' This is helpful with tarballs.')
    parser.add_option('--version', default=None,
                      dest='version', action='store',
                      help='Version string for this file. This annotates the '
                           'manifest entry with a version string to help '
                           'identify the contents.')
    parser.add_option('-o', '--overwrite', default=False,
                      dest='overwrite', action='store_true',
                      help='UNUSED; present for backward compatibility')
    parser.add_option('--url', dest='base_url', action='append',
                      help='RelengAPI URL ending with /tooltool/; default '
                           'is appropriate for Mozilla')
    parser.add_option('-c', '--cache-folder', dest='cache_folder',
                      help='Local cache folder')
    parser.add_option('-s', '--size',
                      help='free space required (in GB)', dest='size',
                      type='float', default=0.)
    parser.add_option('-r', '--region', help='Preferred AWS region for upload or fetch; '
                                             'example: --region=us-west-2')
    parser.add_option('--message',
                      help='The "commit message" for an upload; format with a bug number '
                           'and brief comment',
                      dest='message')
    parser.add_option('--authentication-file',
                      help='Use the RelengAPI token found in the given file to '
                           'authenticate to the RelengAPI server.',
                      dest='auth_file')

    (options_obj, args) = parser.parse_args(argv[1:])

    if not options_obj.base_url:
        tooltool_host = os.environ.get('TOOLTOOL_HOST', 'tooltool.mozilla-releng.net')
        taskcluster_proxy_url = os.environ.get('TASKCLUSTER_PROXY_URL')
        if taskcluster_proxy_url:
            tooltool_url = '{}/{}'.format(taskcluster_proxy_url, tooltool_host)
        else:
            tooltool_url = 'https://{}'.format(tooltool_host)

        options_obj.base_url = [tooltool_url]

    # ensure all URLs have a trailing slash
    def add_slash(url):
        return url if url.endswith('/') else (url + '/')
    options_obj.base_url = [add_slash(u) for u in options_obj.base_url]

    # expand ~ in --authentication-file
    if options_obj.auth_file:
        options_obj.auth_file = os.path.expanduser(options_obj.auth_file)

    # Dictionaries are easier to work with
    options = vars(options_obj)

    log.setLevel(options['loglevel'])

    # Set up logging, for now just to the console
    if not _skip_logging:  # pragma: no cover
        ch = logging.StreamHandler()
        cf = logging.Formatter("%(levelname)s - %(message)s")
        ch.setFormatter(cf)
        log.addHandler(ch)

    if options['algorithm'] != 'sha512':
        parser.error('only --algorithm sha512 is supported')

    if len(args) < 1:
        parser.error('You must specify a command')

    return 0 if process_command(options, args) else 1
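
# Typical invocations (illustrative; manifest, file, and token paths are
# examples, not defaults):
#
#     python tooltool.py fetch -m manifest.tt
#     python tooltool.py add -m manifest.tt --visibility public clang.tar.xz
#     python tooltool.py upload -m manifest.tt --message "Bug NNN: add toolchain" \
#         --authentication-file ~/.tooltool-token
#     python tooltool.py purge -c /builds/tooltool_cache -s 10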

if __name__ == "__main__":  # pragma: no cover
    sys.exit(main(sys.argv))