Bug 1890689 accumulate input in LargerReceiverBlockSizeThanDesiredBuffering GTest...
[gecko.git] / taskcluster / scripts / run-task
blob2f3f6460db2735163fe1dd4f7a346e20a42ea93c
1 #!/usr/bin/python3 -u
2 # This Source Code Form is subject to the terms of the Mozilla Public
3 # License, v. 2.0. If a copy of the MPL was not distributed with this
4 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 """Run a task after performing common actions.
8 This script is meant to be the "driver" for TaskCluster based tasks.
9 It receives some common arguments to control the run-time environment.
11 It performs actions as requested from the arguments. Then it executes
12 the requested process and prints its output, prefixing it with the
13 current time to improve log usefulness.
14 """
16 import sys
19 if sys.version_info[0:2] < (3, 5):
20     print('run-task requires Python 3.5+')
21     sys.exit(1)
24 import argparse
25 import datetime
26 import errno
27 import io
28 import json
29 import os
30 import random
31 import re
32 import shutil
33 import signal
34 import socket
35 import stat
36 import subprocess
38 import urllib.error
39 import urllib.request
41 from threading import Thread
43 FINGERPRINT_URL = 'http://taskcluster/secrets/v1/secret/project/taskcluster/gecko/hgfingerprint'
44 FALLBACK_FINGERPRINT = {
45     'fingerprints':
46         "sha256:4D:EB:21:6E:35:2F:99:C6:8F:C3:47:9B:57:B8:6C:17:15:8F:86:09:D4:6C:17:1D:87:B0:DE:F9:0E:51:70:FC,"
47         "sha256:90:85:39:A8:4F:47:20:58:98:0D:48:4D:8A:AC:71:DB:5C:AF:76:44:F1:B1:3E:56:92:FF:21:8C:C9:A9:F7:11"
50 HGMOINTERNAL_CONFIG_URL = 'http://taskcluster/secrets/v1/secret/project/taskcluster/gecko/hgmointernal'
52 CACHE_UID_GID_MISMATCH = '''
53 There is a UID/GID mismatch on the cache. This likely means:
55 a) different tasks are running as a different user/group
56 b) different Docker images have different UID/GID for the same user/group
58 Our cache policy is that the UID/GID for ALL tasks must be consistent
59 for the lifetime of the cache. This eliminates permissions problems due
60 to file/directory user/group ownership.
62 To make this error go away, ensure that all Docker images are use
63 a consistent UID/GID and that all tasks using this cache are running as
64 the same user/group.
65 '''
68 NON_EMPTY_VOLUME = '''
69 error: volume %s is not empty
71 Our Docker image policy requires volumes to be empty.
73 The volume was likely populated as part of building the Docker image.
74 Change the Dockerfile and anything run from it to not create files in
75 any VOLUME.
77 A lesser possibility is that you stumbled upon a TaskCluster platform bug
78 where it fails to use new volumes for tasks.
79 '''
82 FETCH_CONTENT_NOT_FOUND = '''
83 error: fetch-content script not found
85 The script at `taskcluster/scripts/misc/fetch-content` could not be
86 detected in the current environment.
88 If this task clones gecko, make sure the GECKO_PATH environment variable
89 is set to proper location. Otherwise, the script may need to be mounted
90 or added to the task's docker image then added to the PATH.
91 '''
93 # The exit code to use when caches should be purged and the task retried.
94 # This is EX_OSFILE (from sysexits.h):
95 #     Some system file  does not exist, cannot be opened, or has some
96 #     sort of error (e.g., syntax error).
97 EXIT_PURGE_CACHE = 72
100 IS_MACOSX = sys.platform == 'darwin'
101 IS_POSIX = os.name == 'posix'
102 IS_WINDOWS = os.name == 'nt'
105 def print_line(prefix, m):
106     now = datetime.datetime.utcnow().isoformat().encode('utf-8')
107     # slice microseconds to 3 decimals.
108     now = now[:-3] if now[-7:-6] == b'.' else now
109     bytes = b'[%s %sZ] %s' % (prefix, now, m)
110     written = 0
111     while written < len(bytes):
112         written += (sys.stdout.buffer.write(bytes[written:]) or 0)
113     sys.stdout.buffer.flush()
116 def run_and_prefix_output(prefix, args, *, extra_env=None, cwd=None):
117     """Runs a process and prefixes its output with the time.
119     Returns the process exit code.
120     """
121     print_line(
122         prefix,
123         b"executing %r%s\n" % (args, b"in %s" % (cwd.encode("utf-8"),) if cwd else b""),
124     )
126     env = dict(os.environ)
127     env.update(extra_env or {})
129     # Note: TaskCluster's stdin is a TTY. This attribute is lost
130     # when we pass sys.stdin to the invoked process. If we cared
131     # to preserve stdin as a TTY, we could make this work. But until
132     # someone needs it, don't bother.
134     # We want stdout to be bytes on Python 3. That means we can't use
135     # universal_newlines=True (because it implies text mode). But
136     # p.stdout.readline() won't work for bytes text streams. So, on Python 3,
137     # we manually install a latin1 stream wrapper. This allows us to readline()
138     # and preserves bytes, without losing any data.
140     p = subprocess.Popen(args,
141                          # Disable buffering because we want to receive output
142                          # as it is generated so timestamps in logs are
143                          # accurate.
144                          bufsize=0,
145                          stdout=subprocess.PIPE,
146                          stderr=subprocess.STDOUT,
147                          stdin=sys.stdin.fileno(),
148                          env=env,
149                          cwd=cwd)
151     stdout = io.TextIOWrapper(p.stdout, encoding='latin1')
153     while True:
154         data = stdout.readline().encode('latin1')
156         if data == b'':
157             break
159         print_line(prefix, data)
161     return p.wait()
164 def get_posix_user_group(user, group):
165     import grp
166     import pwd
168     try:
169         user_record = pwd.getpwnam(user)
170     except KeyError:
171         print('could not find user %s; specify a valid user with --user' % user)
172         sys.exit(1)
174     try:
175         group_record = grp.getgrnam(group)
176     except KeyError:
177         print('could not find group %s; specify a valid group with --group' %
178               group)
179         sys.exit(1)
181     # Most tasks use worker:worker. We require they have a specific numeric ID
182     # because otherwise it is too easy for files written to caches to have
183     # mismatched numeric IDs, which results in permissions errors.
184     if user_record.pw_name == 'worker' and user_record.pw_uid != 1000:
185         print('user `worker` must have uid=1000; got %d' % user_record.pw_uid)
186         sys.exit(1)
188     if group_record.gr_name == 'worker' and group_record.gr_gid != 1000:
189         print('group `worker` must have gid=1000; got %d' % group_record.gr_gid)
190         sys.exit(1)
192     # Find all groups to which this user is a member.
193     gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem]
195     return user_record, group_record, gids
198 def write_audit_entry(path, msg):
199     now = datetime.datetime.utcnow().isoformat().encode('utf-8')
200     with open(path, 'ab') as fh:
201         fh.write(b'[%sZ %s] %s\n' % (
202                  now, os.environb.get(b'TASK_ID', b'UNKNOWN'), msg))
205 WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR
208 def set_dir_permissions(path, uid, gid):
209     st = os.lstat(path)
211     if st.st_uid != uid or st.st_gid != gid:
212         os.chown(path, uid, gid)
214     # Also make sure dirs are writable in case we need to delete
215     # them.
216     if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE:
217         os.chmod(path, st.st_mode | WANTED_DIR_MODE)
220 def chown_recursive(path, user, group, uid, gid):
221     print_line(b'chown',
222                b'recursively changing ownership of %s to %s:%s\n' %
223                (path.encode('utf-8'), user.encode('utf-8'), group.encode(
224                    'utf-8')))
226     set_dir_permissions(path, uid, gid)
228     for root, dirs, files in os.walk(path):
229         for d in dirs:
230             set_dir_permissions(os.path.join(root, d), uid, gid)
232         for f in files:
233             # File may be a symlink that points to nowhere. In which case
234             # os.chown() would fail because it attempts to follow the
235             # symlink. We only care about directory entries, not what
236             # they point to. So setting the owner of the symlink should
237             # be sufficient.
238             os.lchown(os.path.join(root, f), uid, gid)
241 def configure_cache_posix(cache, user, group,
242                           untrusted_caches, running_as_root):
243     """Configure a cache path on POSIX platforms.
245     For each cache, we write out a special file denoting attributes and
246     capabilities of run-task and the task being executed. These attributes
247     are used by subsequent run-task invocations to validate that use of
248     the cache is acceptable.
250     We /could/ blow away the cache data on requirements mismatch.
251     While this would be convenient, this could result in "competing" tasks
252     effectively undoing the other's work. This would slow down task
253     execution in aggregate. Without monitoring for this, people may not notice
254     the problem and tasks would be slower than they could be. We follow the
255     principle of "fail fast" to ensure optimal task execution.
257     We also write an audit log of who used the caches. This log is printed
258     during failures to help aid debugging.
259     """
261     our_requirements = {
262         # Include a version string that we can bump whenever to trigger
263         # fresh caches. The actual value is not relevant and doesn't need
264         # to follow any explicit order. Since taskgraph bakes this file's
265         # hash into cache names, any change to this file/version is sufficient
266         # to force the use of a new cache.
267         b'version=1',
268         # Include the UID and GID the task will run as to ensure that tasks
269         # with different UID and GID don't share the same cache.
270         b'uid=%d' % user.pw_uid,
271         b'gid=%d' % group.gr_gid,
272     }
274     requires_path = os.path.join(cache, '.cacherequires')
275     audit_path = os.path.join(cache, '.cachelog')
277     # The cache is empty. Configure it.
278     if not os.listdir(cache):
279         print_line(b'cache', b'cache %s is empty; writing requirements: '
280                              b'%s\n' % (
281                                  cache.encode('utf-8'), b' '.join(sorted(our_requirements))))
283         # We write a requirements file so future invocations know what the
284         # requirements are.
285         with open(requires_path, 'wb') as fh:
286             fh.write(b'\n'.join(sorted(our_requirements)))
288         # And make it read-only as a precaution against deletion.
289         os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
291         write_audit_entry(audit_path,
292                           b'created; requirements: %s' %
293                           b', '.join(sorted(our_requirements)))
295         set_dir_permissions(cache, user.pw_uid, group.gr_gid)
296         return
298     # The cache has content and we have a requirements file. Validate
299     # requirements alignment.
300     if os.path.exists(requires_path):
301         with open(requires_path, 'rb') as fh:
302             wanted_requirements = set(fh.read().splitlines())
304         print_line(b'cache', b'cache %s exists; requirements: %s\n' % (
305             cache.encode('utf-8'), b' '.join(sorted(wanted_requirements))))
307         missing = wanted_requirements - our_requirements
309         # Allow requirements mismatch for uid/gid if and only if caches
310         # are untrusted. This allows cache behavior on Try to be
311         # reasonable. Otherwise, random tasks could "poison" cache
312         # usability by introducing uid/gid mismatches. For untrusted
313         # environments like Try, this is a perfectly reasonable thing to
314         # allow.
315         if missing and untrusted_caches and running_as_root and \
316                 all(s.startswith((b'uid=', b'gid=')) for s in missing):
317             print_line(b'cache',
318                        b'cache %s uid/gid mismatch; this is acceptable '
319                        b'because caches for this task are untrusted; '
320                        b'changing ownership to facilitate cache use\n' %
321                        cache.encode('utf-8'))
322             chown_recursive(cache, user.pw_name, group.gr_name, user.pw_uid,
323                             group.gr_gid)
325             # And write out the updated reality.
326             with open(requires_path, 'wb') as fh:
327                 fh.write(b'\n'.join(sorted(our_requirements)))
329             write_audit_entry(audit_path,
330                               b'chown; requirements: %s' %
331                               b', '.join(sorted(our_requirements)))
333         elif missing:
334             print('error: requirements for populated cache %s differ from '
335                   'this task' % cache)
336             print('cache requirements: %s' % ' '.join(sorted(
337                 s.decode('utf-8') for s in wanted_requirements)))
338             print('our requirements:   %s' % ' '.join(sorted(
339                 s.decode('utf-8') for s in our_requirements)))
340             if any(s.startswith((b'uid=', b'gid=')) for s in missing):
341                 print(CACHE_UID_GID_MISMATCH)
343             write_audit_entry(audit_path,
344                               b'requirements mismatch; wanted: %s' %
345                               b', '.join(sorted(our_requirements)))
347             print('')
348             print('audit log:')
349             with open(audit_path, 'r') as fh:
350                 print(fh.read())
352             return True
353         else:
354             write_audit_entry(audit_path, b'used')
356         # We don't need to adjust permissions here because the cache is
357         # associated with a uid/gid and the first task should have set
358         # a proper owner/group.
360         return
362     # The cache has content and no requirements file. This shouldn't
363     # happen because run-task should be the first thing that touches a
364     # cache.
365     print('error: cache %s is not empty and is missing a '
366           '.cacherequires file; the cache names for this task are '
367           'likely mis-configured or TASKCLUSTER_CACHES is not set '
368           'properly' % cache)
370     write_audit_entry(audit_path, b'missing .cacherequires')
371     return True
374 def configure_volume_posix(volume, user, group, running_as_root):
375     # The only time we should see files in the volume is if the Docker
376     # image build put files there.
377     #
378     # For the sake of simplicity, our policy is that volumes should be
379     # empty. This also has the advantage that an empty volume looks
380     # a lot like an empty cache. Tasks can rely on caches being
381     # swapped in and out on any volume without any noticeable change
382     # of behavior.
383     volume_files = os.listdir(volume)
384     if volume_files:
385         print(NON_EMPTY_VOLUME % volume)
386         print('entries in root directory: %s' %
387               ' '.join(sorted(volume_files)))
388         sys.exit(1)
390     # The volume is almost certainly owned by root:root. Chown it so it
391     # is writable.
393     if running_as_root:
394         print_line(b'volume', b'changing ownership of volume %s '
395                               b'to %d:%d\n' % (volume.encode('utf-8'),
396                                                user.pw_uid,
397                                                group.gr_gid))
398         set_dir_permissions(volume, user.pw_uid, group.gr_gid)
401 def vcs_checkout(source_repo, dest, store_path,
402                  base_repo=None, revision=None, branch=None,
403                  fetch_hgfingerprint=False, sparse_profile=None):
404     # Specify method to checkout a revision. This defaults to revisions as
405     # SHA-1 strings, but also supports symbolic revisions like `tip` via the
406     # branch flag.
407     if revision:
408         revision_flag = '--revision'
409         revision_value = revision
410     elif branch:
411         revision_flag = '--branch'
412         revision_value = branch
413     else:
414         print('revision is not specified for checkout')
415         sys.exit(1)
417     if IS_MACOSX or IS_POSIX:
418         hg_bin = 'hg'
419     elif IS_WINDOWS:
420         # This is where OCC installs it in the AMIs.
421         hg_bin = r'C:\Program Files\Mercurial\hg.exe'
422         if not os.path.exists(hg_bin):
423             print('could not find Mercurial executable: %s' % hg_bin)
424             sys.exit(1)
426     store_path = os.path.abspath(store_path)
427     args = [
428         hg_bin,
429         'robustcheckout',
430         '--sharebase', store_path,
431         '--purge',
432     ]
434     # Obtain certificate fingerprints.  Without this, the checkout will use the fingerprint
435     # on the system, which is managed some other way (such as puppet)
436     if fetch_hgfingerprint:
437         try:
438             print_line(b'vcs', b'fetching hg.mozilla.org fingerprint from %s\n' %
439                        FINGERPRINT_URL.encode('utf-8'))
440             res = urllib.request.urlopen(FINGERPRINT_URL, timeout=10)
441             secret = res.read()
442             try:
443                 secret = json.loads(secret.decode('utf-8'))
444             except ValueError:
445                 print_line(b'vcs', b'invalid JSON in hg fingerprint secret')
446                 sys.exit(1)
447         except (urllib.error.URLError, socket.timeout):
448             print_line(b'vcs', b'Unable to retrieve current hg.mozilla.org fingerprint'
449                                b'using the secret service, using fallback instead.')
450             # XXX This fingerprint will not be accurate if running on an old
451             #     revision after the server fingerprint has changed.
452             secret = {'secret': FALLBACK_FINGERPRINT}
454         hgmo_fingerprint = secret['secret']['fingerprints']
455         args.extend([
456             '--config', 'hostsecurity.hg.mozilla.org:fingerprints=%s' % hgmo_fingerprint,
457         ])
459     if base_repo:
460         args.extend(['--upstream', base_repo])
461     if sparse_profile:
462         args.extend(['--sparseprofile', sparse_profile])
464     dest = os.path.abspath(dest)
465     args.extend([
466         revision_flag, revision_value,
467         source_repo, dest,
468     ])
470     res = run_and_prefix_output(b'vcs', args,
471                                 extra_env={'PYTHONUNBUFFERED': '1'})
472     if res:
473         # Mitigation for bug 1539681: if for some reason the clone failed,
474         # we just remove it, so that its possible incomplete state doesn't
475         # interfere with cloning in subsequent tasks.
476         shutil.rmtree(dest, ignore_errors=True)
477         sys.exit(res)
479     # Update the current revision hash and ensure that it is well formed.
480     revision = subprocess.check_output(
481         [hg_bin, 'log',
482          '--rev', '.',
483          '--template', '{node}'],
484         cwd=dest,
485         # Triggers text mode on Python 3.
486         universal_newlines=True)
488     assert re.match('^[a-f0-9]{40}$', revision)
490     msg = ("TinderboxPrint:<a href={source_repo}/rev/{revision} "
491            "title='Built from {repo_name} revision {revision}'>"
492            "{revision}</a>\n".format(revision=revision,
493                                      source_repo=source_repo,
494                                      repo_name=source_repo.split('/')[-1]))
496     print_line(b'vcs', msg.encode('utf-8'))
498     return revision
501 def fetch_artifacts():
502     print_line(b'fetches', b'fetching artifacts\n')
504     fetch_content = shutil.which('fetch-content')
505     if not fetch_content and os.environ.get('GECKO_PATH'):
506         fetch_content = os.path.join(os.environ['GECKO_PATH'], 'taskcluster',
507                                      'scripts', 'misc', 'fetch-content')
509     if not fetch_content or not os.path.isfile(fetch_content):
510         fetch_content = os.path.join(os.path.dirname(__file__),
511                                      'fetch-content')
513     if not os.path.isfile(fetch_content):
514         print(FETCH_CONTENT_NOT_FOUND)
515         sys.exit(1)
517     cmd = [sys.executable, '-u', fetch_content, 'task-artifacts']
518     res = run_and_prefix_output(b'fetches', cmd)
519     if res:
520         sys.exit(res)
522     print_line(b'fetches', b'finished fetching artifacts\n')
525 def add_vcs_arguments(parser, project, name):
526     """Adds arguments to ArgumentParser to control VCS options for a project."""
528     parser.add_argument('--%s-checkout' % project,
529                         help='Directory where %s checkout should be created' %
530                              name)
531     parser.add_argument('--%s-sparse-profile' % project,
532                         help='Path to sparse profile for %s checkout' % name)
535 def resolve_checkout_url(base_repo, head_repo):
536     """Resolve the Mercurial URL to perform a checkout against, either the
537     public hg.mozilla.org service or a CI-only regional mirror.
539     The config will be of the form:
540         {
541             "aws/us-west-2": {  # key built from `TASKCLUSTER_WORKER_LOCATION` variable
542                 "rate": 0.5,
543                 "domain": "us-west-2.hgmointernal.net"
544             },
545             "google/us-central1": {...}
546         }
547     """
548     worker_location = os.getenv('TASKCLUSTER_WORKER_LOCATION')
549     if not worker_location:
550         print_line(b'vcs', b'TASKCLUSTER_WORKER_LOCATION environment variable not set; '
551                            b'using public hg.mozilla.org service\n')
552         return base_repo, head_repo
554     try:
555         worker_location = json.loads(worker_location)
556     except json.JSONDecodeError:
557         print_line(b'vcs', b'Could not decode TASKCLUSTER_WORKER_LOCATION environment variable '
558                            b'as JSON. Content: %s\n' % worker_location.encode('utf-8'))
559         print_line(b'vcs', b'using public hg.mozilla.org service\n')
560         return base_repo, head_repo
562     if 'cloud' not in worker_location or 'region' not in worker_location:
563         print_line(b'vcs', b'TASKCLUSTER_WORKER_LOCATION missing required keys; '
564                            b'using public hg.mozilla.org service\n')
565         return base_repo, head_repo
567     config_key = '%(cloud)s/%(region)s' % worker_location
569     try:
570         print_line(b'vcs', b'fetching hgmointernal config from %s\n' %
571                    HGMOINTERNAL_CONFIG_URL.encode('utf-8'))
573         # Get the hgmointernal config Taskcluster secret
574         res = urllib.request.urlopen(HGMOINTERNAL_CONFIG_URL, timeout=10)
575         hgmointernal_config = json.loads(res.read().decode('utf-8'))['secret']
577         # Use public hg service if region not yet supported
578         if config_key not in hgmointernal_config:
579             print_line(b'vcs', b'region %s not yet supported; using public '
580                                b'hg.mozilla.org service\n' % config_key.encode('utf-8'))
582             return base_repo, head_repo
584         # Only send a percentage of traffic to the internal mirror
585         rate = float(hgmointernal_config[config_key]['rate'])
587         if random.random() > rate:
588             print_line(b'vcs', b'hgmointernal rate miss; using '
589                                b'public hg.mozilla.org service\n')
590             return base_repo, head_repo
592         print_line(b'vcs', b'hgmointernal rate hit; cloning from '
593                            b'private hgweb mirror\n')
595         mirror_domain = hgmointernal_config[config_key]['domain']
597         if base_repo and base_repo.startswith('https://hg.mozilla.org'):
598             base_repo = base_repo.replace('hg.mozilla.org', mirror_domain, 1)
600         if head_repo and head_repo.startswith('https://hg.mozilla.org'):
601             head_repo = head_repo.replace('hg.mozilla.org', mirror_domain, 1)
603         return base_repo, head_repo
605     except (KeyError, ValueError):
606         print_line(b'vcs', b'invalid JSON in hgmointernal config; '
607                            b'falling back to public hg.mozilla.org service\n')
609     except (urllib.error.URLError, socket.timeout):
610         print_line(b'vcs', b'Unable to retrieve hgmointernal config using '
611                            b'the secret service; falling back to public hg.mozilla.org '
612                            b'service\n')
614     return base_repo, head_repo
617 def collect_vcs_options(args, project):
618     checkout = getattr(args, '%s_checkout' % project)
619     sparse_profile = getattr(args, '%s_sparse_profile' % project)
621     env_prefix = project.upper()
623     base_repo = os.environ.get('%s_BASE_REPOSITORY' % env_prefix)
624     head_repo = os.environ.get('%s_HEAD_REPOSITORY' % env_prefix)
625     revision = os.environ.get('%s_HEAD_REV' % env_prefix)
626     branch = os.environ.get('%s_HEAD_REF' % env_prefix)
628     store_path = os.environ.get('HG_STORE_PATH')
630     # Expand ~ in some paths.
631     if checkout:
632         checkout = os.path.expanduser(checkout)
633     if store_path:
634         store_path = os.path.expanduser(store_path)
636     # Some callers set the base repository to mozilla-central for historical
637     # reasons. Switch to mozilla-unified because robustcheckout works best
638     # with it.
639     if base_repo == 'https://hg.mozilla.org/mozilla-central':
640         base_repo = 'https://hg.mozilla.org/mozilla-unified'
642     # No need to check the hgmointernal config if we aren't performing
643     # a checkout.
644     if checkout:
645         base_repo, head_repo = resolve_checkout_url(base_repo, head_repo)
647     return {
648         'store-path': store_path,
649         'project': project,
650         'env-prefix': env_prefix,
651         'checkout': checkout,
652         'sparse-profile': sparse_profile,
653         'base-repo': base_repo,
654         'head-repo': head_repo,
655         'revision': revision,
656         'branch': branch,
657     }
660 def vcs_checkout_from_args(args, project):
661     options = collect_vcs_options(args, project)
663     if not options['checkout']:
664         if options['branch'] and not options['revision']:
665             print('task should be defined in terms of non-symbolic revision')
666             sys.exit(1)
667         return
669     os.environ['%s_HEAD_REV' % options['env-prefix']] = vcs_checkout(
670         options['head-repo'],
671         options['checkout'],
672         options['store-path'],
673         base_repo=options['base-repo'],
674         revision=options['revision'],
675         fetch_hgfingerprint=args.fetch_hgfingerprint,
676         branch=options['branch'],
677         sparse_profile=options['sparse-profile'])
680 def maybe_run_resource_monitoring():
681     """Run the resource monitor if available.
683     Discussion in https://github.com/taskcluster/taskcluster-rfcs/pull/160
684     and https://bugzil.la/1648051
685     """
686     if 'MOZ_FETCHES' not in os.environ:
687         return
688     if 'RESOURCE_MONITOR_OUTPUT' not in os.environ:
689         return
691     prefix = b'resource_monitor'
693     executable = '{}/resource-monitor/resource-monitor{}'.format(
694         os.environ.get('MOZ_FETCHES_DIR'), '.exe' if IS_WINDOWS else '')
696     if not os.path.exists(executable) or not os.access(executable, os.X_OK):
697         print_line(prefix, b"%s not executable\n" % executable.encode('utf-8'))
698         return
699     args = [
700         executable,
701         '-process',
702         str(os.getpid()),
703         '-output',
704         os.environ["RESOURCE_MONITOR_OUTPUT"],
705     ]
706     print_line(prefix, b"Resource monitor starting: %s\n" % str(args).encode('utf-8'))
707     # Avoid environment variables the payload doesn't need.
708     del os.environ['RESOURCE_MONITOR_OUTPUT']
710     # Without CREATE_NEW_PROCESS_GROUP Windows signals will attempt to kill run-task, too.
711     process = subprocess.Popen(args,
712                                bufsize=0,
713                                stdout=subprocess.PIPE,
714                                stderr=subprocess.STDOUT,
715                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if IS_WINDOWS else 0,
716                                cwd=os.getcwd())
718     def capture_output():
719         fh = io.TextIOWrapper(process.stdout, encoding='latin1')
720         while True:
721             data = fh.readline().encode('latin1')
722             if data == b'':
723                 break
724             print_line(prefix, data)
726     monitor_process = Thread(target=capture_output)
727     monitor_process.start()
728     return process
731 def main(args):
732     print_line(b'setup', b'run-task started in %s\n' % os.getcwd().encode('utf-8'))
733     running_as_root = IS_POSIX and os.getuid() == 0
735     # Set a reasonable limit to the number of open files.
736     # Running under docker inherits the system defaults, which are not subject
737     # to the "standard" limits set by pam_limits.so, and while they work well
738     # for servers that may receive a lot of connections, they cause performance
739     # problems for things that close file descriptors before forking (for good
740     # reasons), like python's `subprocess.Popen(..., close_fds=True)` (and while
741     # the default was close_fds=False in python2, that changed in python3).
742     # In some cases, Firefox does the same thing when spawning subprocesses.
743     # Processes spawned by this one will inherit the limit set here.
744     try:
745         import resource
746         # Keep the hard limit the same, though, allowing processes to change their
747         # soft limit if they need to (Firefox does, for instance).
748         (soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE)
749         limit = os.environ.get('MOZ_LIMIT_NOFILE')
750         if limit:
751             limit = int(limit)
752         else:
753             # If no explicit limit is given, use 1024 if it's less than the current
754             # soft limit. For instance, the default on macOS is 256, so we'd pick
755             # that rather than 1024.
756             limit = min(soft, 1024)
757         # Now apply the limit, if it's different from the original one.
758         if limit != soft:
759             resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard))
760     except ImportError:
761         # The resource module is UNIX only.
762         pass
764     # Arguments up to '--' are ours. After are for the main task
765     # to be executed.
766     try:
767         i = args.index('--')
768         our_args = args[0:i]
769         task_args = args[i + 1:]
770     except ValueError:
771         our_args = args
772         task_args = []
774     parser = argparse.ArgumentParser()
775     parser.add_argument('--user', default='worker', help='user to run as')
776     parser.add_argument('--group', default='worker', help='group to run as')
777     parser.add_argument('--task-cwd', help='directory to run the provided command in')
779     add_vcs_arguments(parser, 'gecko', 'Firefox')
780     add_vcs_arguments(parser, 'comm', 'Comm')
782     parser.add_argument('--fetch-hgfingerprint', action='store_true',
783                         help='Fetch the latest hgfingerprint from the secrets store, '
784                         'using the taskclsuerProxy')
786     args = parser.parse_args(our_args)
788     uid = gid = gids = None
789     if IS_POSIX and running_as_root:
790         user, group, gids = get_posix_user_group(args.user, args.group)
791         uid = user.pw_uid
792         gid = group.gr_gid
794     if running_as_root and os.path.exists("/dev/kvm"):
795         # Ensure kvm permissions for worker, required for Android x86
796         st = os.stat("/dev/kvm")
797         os.chmod("/dev/kvm", st.st_mode | 0o666)
799     # Validate caches.
800     #
801     # Taskgraph should pass in a list of paths that are caches via an
802     # environment variable (which we don't want to pass down to child
803     # processes).
805     if 'TASKCLUSTER_CACHES' in os.environ:
806         caches = os.environ['TASKCLUSTER_CACHES'].split(';')
807         del os.environ['TASKCLUSTER_CACHES']
808     else:
809         caches = []
811     if 'TASKCLUSTER_UNTRUSTED_CACHES' in os.environ:
812         untrusted_caches = True
813         del os.environ['TASKCLUSTER_UNTRUSTED_CACHES']
814     else:
815         untrusted_caches = False
817     for cache in caches:
818         if not os.path.isdir(cache):
819             print('error: cache %s is not a directory; this should never '
820                   'happen' % cache)
821             return 1
823         if running_as_root:
824                 purge = configure_cache_posix(cache, user, group, untrusted_caches,
825                                           running_as_root)
827                 if purge:
828                     return EXIT_PURGE_CACHE
830     if 'TASKCLUSTER_VOLUMES' in os.environ:
831         volumes = os.environ['TASKCLUSTER_VOLUMES'].split(';')
832         del os.environ['TASKCLUSTER_VOLUMES']
833     else:
834         volumes = []
836     if volumes and not IS_POSIX:
837         print('assertion failed: volumes not expected on Windows')
838         return 1
840     # Sanitize volumes.
841     for volume in volumes:
842         # If a volume is a cache, it was dealt with above.
843         if volume in caches:
844             print_line(b'volume', b'volume %s is a cache\n' %
845                        volume.encode('utf-8'))
846             continue
848         if running_as_root:
849             configure_volume_posix(volume, user, group, running_as_root)
851     all_caches_and_volumes = set(map(os.path.normpath, caches))
852     all_caches_and_volumes |= set(map(os.path.normpath, volumes))
854     def path_in_cache_or_volume(path):
855         path = os.path.normpath(path)
857         while path:
858             if path in all_caches_and_volumes:
859                 return True
861             path, child = os.path.split(path)
862             if not child:
863                 break
865         return False
867     def prepare_checkout_dir(checkout):
868         if not checkout:
869             return
871         # The checkout path becomes the working directory. Since there are
872         # special cache files in the cache's root directory and working
873         # directory purging could blow them away, disallow this scenario.
874         if os.path.exists(os.path.join(checkout, '.cacherequires')):
875             print('error: cannot perform vcs checkout into cache root: %s' %
876                   checkout)
877             sys.exit(1)
879         # TODO given the performance implications, consider making this a fatal
880         # error.
881         if not path_in_cache_or_volume(checkout):
882             print_line(b'vcs', b'WARNING: vcs checkout path (%s) not in cache '
883                                b'or volume; performance will likely suffer\n' %
884                                checkout.encode('utf-8'))
886         # Ensure the directory for the source checkout exists.
887         try:
888             os.makedirs(os.path.dirname(checkout))
889         except OSError as e:
890             if e.errno != errno.EEXIST:
891                 raise
893         # And that it is owned by the appropriate user/group.
894         if running_as_root:
895             os.chown(os.path.dirname(checkout), uid, gid)
897     def prepare_hg_store_path():
898         # And ensure the shared store path exists and has proper permissions.
899         if 'HG_STORE_PATH' not in os.environ:
900             print('error: HG_STORE_PATH environment variable not set')
901             sys.exit(1)
903         store_path = os.environ['HG_STORE_PATH']
905         if not path_in_cache_or_volume(store_path):
906             print_line(b'vcs', b'WARNING: HG_STORE_PATH (%s) not in cache or '
907                                b'volume; performance will likely suffer\n' %
908                                store_path.encode('utf-8'))
910         try:
911             os.makedirs(store_path)
912         except OSError as e:
913             if e.errno != errno.EEXIST:
914                 raise
916         if running_as_root:
917             os.chown(store_path, uid, gid)
919     prepare_checkout_dir(args.gecko_checkout)
920     if args.gecko_checkout or args.comm_checkout:
921         prepare_hg_store_path()
923     if IS_POSIX and running_as_root:
924         # Drop permissions to requested user.
925         # This code is modeled after what `sudo` was observed to do in a Docker
926         # container. We do not bother calling setrlimit() because containers have
927         # their own limits.
928         print_line(b'setup', b'running as %s:%s\n' % (
929             args.user.encode('utf-8'), args.group.encode('utf-8')))
931         os.setgroups(gids)
932         os.umask(0o22)
933         os.setresgid(gid, gid, gid)
934         os.setresuid(uid, uid, uid)
936     vcs_checkout_from_args(args, 'gecko')
937     vcs_checkout_from_args(args, 'comm')
939     resource_process = None
941     try:
942         for k in ('GECKO_PATH', 'MOZ_FETCHES_DIR', 'UPLOAD_DIR', 'MOZ_PYTHON_HOME'):
943             if k in os.environ:
944                 # Normalize paths to use forward slashes. Some shell scripts
945                 # tolerate that better on Windows.
946                 os.environ[k] = os.path.abspath(os.environ[k]).replace(os.sep, '/')
947                 print_line(b'setup', b'%s is %s\n' % (
948                     k.encode('utf-8'),
949                     os.environ[k].encode('utf-8')))
951         if 'MOZ_FETCHES' in os.environ:
952             fetch_artifacts()
954             # If Python is a fetch dependency, add it to the PATH and setting
955             # the mozilla-specific MOZ_PYTHON_HOME to relocate binaries.
956             if 'MOZ_PYTHON_HOME' in os.environ:
958                 print_line(b'setup',
959                            b'Setting up local python environment\n')
960                 prev = [os.environ['PATH']] if 'PATH' in os.environ else []
962                 moz_python_home = os.environ['MOZ_PYTHON_HOME']
963                 if IS_WINDOWS:
964                     ext = '.exe'
965                     moz_python_bindir = moz_python_home
966                 else:
967                     ext = ''
968                     moz_python_bindir = moz_python_home + '/bin'
971                 new = os.environ['PATH'] = os.pathsep.join([moz_python_bindir]
972                                                            + prev)
974                 # Relocate the python binary. Standard way uses PYTHONHOME, but
975                 # this conflicts with system python (e.g. used by hg) so we
976                 # maintain a small patch to use MOZPYTHONHOME instead.
977                 os.environ['MOZPYTHONHOME'] = moz_python_home
979                 pyinterp = os.path.join(moz_python_bindir, f'python3{ext}')
980                 # just a sanity check
981                 if not os.path.exists(pyinterp):
982                     raise RuntimeError("Inconsistent Python installation: "
983                                        "archive found, but no python3 binary "
984                                        "detected")
986                 if IS_MACOSX:
987                     # On OSX, we may not have access to the system certificate,
988                     # so use the certifi ones.
989                     certifi_cert_file = subprocess.check_output(
990                     [pyinterp, '-c',
991                      'import certifi; print(certifi.where())'],
992                     text=True
993                     )
994                     os.environ['SSL_CERT_FILE'] = certifi_cert_file.strip()
995                     print_line(b'setup',
996                                b'patching ssl certificate\n')
998                 print_line(b'setup',
999                            b'updated PATH with python artifact: '
1000                            + new.encode() + b'\n')
1003         resource_process = maybe_run_resource_monitoring()
1005         return run_and_prefix_output(b'task', task_args, cwd=args.task_cwd)
1006     finally:
1007         if resource_process:
1008             print_line(b'resource_monitor', b'terminating\n')
1009             if IS_WINDOWS:
1010                 # .terminate() on Windows is not a graceful shutdown, due to
1011                 # differences in signals. CTRL_BREAK_EVENT will work provided
1012                 # the subprocess is in a different process group, so this script
1013                 # isn't also killed.
1014                 os.kill(resource_process.pid, signal.CTRL_BREAK_EVENT)
1015             else:
1016                 resource_process.terminate()
1017             resource_process.wait()
1020 if __name__ == '__main__':
1021     sys.exit(main(sys.argv[1:]))