# Bug 1494162 - Part 6: Lazy load the modules in the ElementContainer. r=pbro
# [gecko.git] / taskcluster / taskgraph / create.py
# blob 0dae7f469780365a2e8a04ba3fe2852082974bc8
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import json
import logging
import os
import sys

import requests
import requests.adapters
from slugid import nice as slugid

from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.time import current_json_time
logger = logging.getLogger(__name__)

# the maximum number of parallel createTask calls to make
CONCURRENCY = 50

# this is set to true for `mach taskgraph action-callback --test`;
# when true, tasks are dumped to stdout instead of submitted, and the
# thread-pool concurrency is forced down to 1
testing = False
def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
    """Submit every task in ``taskgraph`` to the Taskcluster queue.

    Tasks are submitted in dependency order: a task is only PUT to the queue
    once all of its in-graph dependencies have been submitted.  Submission is
    parallelized across up to ``CONCURRENCY`` threads (forced to 1 when the
    module-level ``testing`` flag is set).

    :param taskgraph: a TaskGraph; ``taskgraph.tasks`` maps taskIds to task
        objects carrying a ``task`` definition dict and ``attributes``.
    :param label_to_taskid: mapping from task label to its assigned taskId.
    :param params: decision parameters; only ``level`` is read here, to build
        the schedulerId.
    :param decision_task_id: taskId of the running decision task; defaults to
        the ``$TASK_ID`` environment variable.
    :raises: re-raises the first exception from any failed createTask call.
    """
    # .items() (not the py2-only .iteritems()) keeps this py2/py3 compatible,
    # consistent with the __future__ imports at the top of the file.
    taskid_to_label = {t: l for l, t in label_to_taskid.items()}

    session = requests.Session()

    # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
    # that limit. Connections are established as needed, so using a large value
    # should not negatively impact performance.
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
                                                 pool_maxsize=CONCURRENCY)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)

    decision_task_id = decision_task_id or os.environ.get('TASK_ID')

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId. The process that created the decision task
    # helpfully placed it in this same taskGroup. If there is no $TASK_ID,
    # fall back to a slugid
    task_group_id = decision_task_id or slugid()
    scheduler_id = 'gecko-level-{}'.format(params['level'])

    # Add the taskGroupId, schedulerId and optionally the decision task
    # dependency
    for task_id in taskgraph.graph.nodes:
        task_def = taskgraph.tasks[task_id].task

        # if this task has no dependencies *within* this taskgraph, make it
        # depend on this decision task. If it has another dependency within
        # the taskgraph, then it already implicitly depends on the decision
        # task. The result is that tasks do not start immediately. if this
        # loop fails halfway through, none of the already-created tasks run.
        if decision_task_id:
            if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
                task_def.setdefault('dependencies', []).append(decision_task_id)

        task_def['taskGroupId'] = task_group_id
        task_def['schedulerId'] = scheduler_id

    # If `testing` is True, then run without parallelization
    concurrency = CONCURRENCY if not testing else 1
    with futures.ThreadPoolExecutor(concurrency) as e:
        # fs maps taskId -> Future for its createTask call
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        tasklist = set(taskgraph.graph.visit_postorder())
        alltasks = tasklist.copy()

        def schedule_tasks():
            # bail out early if any futures have failed
            if any(f.done() and f.exception() for f in fs.values()):
                return

            to_remove = set()
            new = set()

            def submit(task_id, label, task_def):
                fut = e.submit(create_task, session, task_id, label, task_def)
                new.add(fut)
                fs[task_id] = fut

            for task_id in tasklist:
                task_def = taskgraph.tasks[task_id].task
                # If we haven't finished submitting all our dependencies yet,
                # come back to this later.
                # Some dependencies aren't in our graph, so make sure to filter
                # those out
                deps = set(task_def.get('dependencies', [])) & alltasks
                if any((d not in fs or not fs[d].done()) for d in deps):
                    continue

                submit(task_id, taskid_to_label[task_id], task_def)
                to_remove.add(task_id)

                # Schedule tasks as many times as task_duplicates indicates
                attributes = taskgraph.tasks[task_id].attributes
                for i in range(1, attributes.get('task_duplicates', 1)):
                    # We use slugid() since we want a distinct task id
                    submit(slugid(), taskid_to_label[task_id], task_def)
            tasklist.difference_update(to_remove)

            # as each of those futures complete, try to schedule more tasks
            for f in futures.as_completed(new):
                schedule_tasks()

        # start scheduling tasks and run until everything is scheduled
        schedule_tasks()

        # check the result of each future, raising an exception if it failed
        for f in futures.as_completed(fs.values()):
            f.result()
def create_task(session, task_id, label, task_def):
    """PUT a single task definition to the Taskcluster queue.

    The queue is reached via 'http://taskcluster/queue', which is proxied to
    the queue service with credentials appropriate to this job.  When the
    module-level ``testing`` flag is set, the definition is dumped to stdout
    instead of being submitted.

    :param session: a requests.Session used for the HTTP call.
    :param task_id: taskId under which to create the task.
    :param label: task label, used only for the debug log message.
    :param task_def: task definition dict; relative timestamps in it are
        resolved against the current time before submission.
    :raises requests.HTTPError: if the queue rejects the request.
    """
    # Resolve relative timestamps in the definition against "now".
    timestamp = current_json_time(datetime_format=True)
    resolved = resolve_timestamps(timestamp, task_def)

    if testing:
        json.dump([task_id, resolved], sys.stdout,
                  sort_keys=True, indent=4, separators=(',', ': '))
        # add a newline
        print("")
        return

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    response = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                           data=json.dumps(resolved))
    if response.status_code == 200:
        return

    # Log the queue's error message (JSON body if parseable, raw text
    # otherwise) before raising for the failed status.
    try:
        message = response.json()['message']
    except Exception:
        message = response.text
    logger.error(message)
    response.raise_for_status()