Bug 1890689 accumulate input in LargerReceiverBlockSizeThanDesiredBuffering GTest...
[gecko.git] / taskcluster / gecko_taskgraph / morph.py
blob42fe4597fac9343ef5c2da089cacfde70c3aee5f
1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 """
6 Graph morphs are modifications to task-graphs that take place *after* the
7 optimization phase.
9 These graph morphs are largely invisible to developers running `./mach`
10 locally, so they should be limited to changes that do not modify the meaning of
11 the graph.
12 """
14 # Note that the translation of `{'task-reference': '..'}` and
15 # `artifact-reference` are handled in the optimization phase (since
16 # optimization involves dealing with taskIds directly). Similarly,
17 # `{'relative-datestamp': '..'}` is handled at the last possible moment during
18 # task creation.
21 import copy
22 import logging
23 import os
24 import re
26 from slugid import nice as slugid
27 from taskgraph.graph import Graph
28 from taskgraph.morph import register_morph
29 from taskgraph.task import Task
30 from taskgraph.taskgraph import TaskGraph
32 from .util.workertypes import get_worker_type
logger = logging.getLogger(__name__)

# Absolute path of the directory that contains this module.
here = os.path.abspath(os.path.dirname(__file__))

# The Taskcluster queue only allows this many routes on a single task; tasks
# with more have their `index.*` routes moved to a separate index task (see
# add_index_tasks below).
MAX_ROUTES = 10
def amend_taskgraph(taskgraph, label_to_taskid, to_add):
    """Return a new taskgraph that also contains the tasks in ``to_add``.

    Every added task contributes one graph edge per entry of its
    ``dependencies`` mapping, shaped ``(task_id, dependency_task_id, name)``.
    ``label_to_taskid`` is updated in place with each added task's label and
    is returned together with the new graph.

    Raises AssertionError if an added task's label is already present in
    ``label_to_taskid``.
    """
    tasks_by_id = taskgraph.tasks.copy()
    edges = set(taskgraph.graph.edges)
    for extra in to_add:
        tasks_by_id[extra.task_id] = extra
        assert extra.label not in label_to_taskid
        label_to_taskid[extra.label] = extra.task_id
        edges.update(
            (extra.task_id, dep_id, dep_name)
            for dep_name, dep_id in extra.dependencies.items()
        )

    amended = TaskGraph(tasks_by_id, Graph(set(tasks_by_id), edges))
    return amended, label_to_taskid
def derive_misc_task(
    target_task,
    purpose,
    image,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    dependencies,
):
    """Build the skeleton of a ``misc``-kind task derived from ``target_task``.

    The new task is labelled ``"{purpose}-{target_task.label}"``. It runs in
    the image produced by the ``docker-image-{image}`` task and depends on
    every task id in ``dependencies`` plus that image task. Its deadline
    (also used as its expiry), owner and source come from ``target_task``.
    The returned task has empty ``scopes`` and no ``command`` in its payload;
    callers fill those in.
    """
    label = f"{purpose}-{target_task.label}"

    # This lookup is the reason every docker-image task is kept in the target
    # task graph: it must be found here even when nothing else requires it.
    image_taskid = label_to_taskid["docker-image-" + image]

    provisioner_id, worker_type = get_worker_type(graph_config, parameters, "misc")

    deps = copy.copy(dependencies)
    deps["docker-image"] = image_taskid

    deadline = target_task.task["deadline"]
    metadata = target_task.task["metadata"]
    task_def = {
        "provisionerId": provisioner_id,
        "workerType": worker_type,
        "dependencies": list(deps.values()),
        "created": {"relative-datestamp": "0 seconds"},
        "deadline": deadline,
        # no point existing past the parent task's deadline
        "expires": deadline,
        "metadata": {
            "name": label,
            "description": f"{purpose} for {target_task.description}",
            "owner": metadata["owner"],
            "source": metadata["source"],
        },
        "scopes": [],
        "payload": {
            "image": {
                "path": "public/image.tar.zst",
                "taskId": image_taskid,
                "type": "task-image",
            },
            "features": {"taskclusterProxy": True},
            "maxRunTime": 600,
        },
    }

    # If the image task is not part of this graph (it was replaced), the task
    # definition above still lists it as a dependency, but the Task's
    # `dependencies` (which become graph edges) must not point at it.
    if image_taskid not in taskgraph.tasks:
        del deps["docker-image"]

    task = Task(
        kind="misc",
        label=label,
        attributes={},
        task=task_def,
        dependencies=deps,
    )
    task.task_id = slugid()
    return task
# Route prefixes for which a star scope exists, so individual
# `index:insert-task:...` scopes under them can be collapsed ("summarized")
# into one prefix scope. Each root listed here should correspond to a star
# scope in each Gecko `assume:repo:hg.mozilla.org/...` role. Group 1 of each
# pattern captures the namespace root plus the next dot-separated component,
# including the trailing dot.
SCOPE_SUMMARY_REGEXPS = [
    re.compile(rf"(index:insert-task:{re.escape(root)}\.[^.]*\.).*")
    for root in ("docker.images.v1", "gecko.v2", "comm.v2")
]
def make_index_task(
    parent_task,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    index_paths,
    index_rank,
    purpose,
    dependencies,
):
    """Create a task that inserts ``parent_task`` into the index.

    The task is derived via ``derive_misc_task`` using the ``index-task``
    image. Its command is ``insert-indexes.js`` followed by every entry of
    ``index_paths``. ``TARGET_TASKID`` (the parent's task id) and
    ``INDEX_RANK`` (``index_rank``) are passed in the environment.
    """
    task = derive_misc_task(
        parent_task,
        purpose,
        "index-task",
        taskgraph,
        label_to_taskid,
        parameters,
        graph_config,
        dependencies,
    )

    def summarize(path):
        # Map an index path to its insert-task scope, collapsing it to a star
        # scope when it falls under one of SCOPE_SUMMARY_REGEXPS.
        scope = f"index:insert-task:{path}"
        for pattern in SCOPE_SUMMARY_REGEXPS:
            found = pattern.match(scope)
            if found:
                return found.group(1) + "*"
        return scope

    # Summarizing matters: a namespace-heavy index task could otherwise need
    # more scopes than fit in a temporary credential.
    task.task["scopes"] = sorted({summarize(path) for path in index_paths})

    payload = task.task["payload"]
    payload["command"] = ["insert-indexes.js"] + index_paths
    payload["env"] = {
        "TARGET_TASKID": parent_task.task_id,
        "INDEX_RANK": index_rank,
    }
    return task
@register_morph
def add_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
    """
    The TaskCluster queue only allows 10 routes on a task, but we have tasks
    with many more routes, for purposes of indexing. This graph morph adds
    "index tasks" that depend on such tasks and do the index insertions
    directly, avoiding the limits on task.routes.

    Only tasks with more than ``MAX_ROUTES`` routes are affected. Their
    ``index.*`` routes are removed from the task and passed, without the
    ``index.`` prefix, to a new index task that depends on it. A task over the
    limit with no ``index.*`` routes is left alone, because an index task with
    nothing to insert would be a no-op.

    Returns the (possibly amended) taskgraph and ``label_to_taskid``.
    """
    logger.debug("Morphing: adding index tasks")

    # Add indexes for tasks that exceed MAX_ROUTES.
    added = []
    for task in taskgraph.tasks.values():
        routes = task.task.get("routes", [])
        if len(routes) <= MAX_ROUTES:
            continue

        index_paths = [r.split(".", 1)[1] for r in routes if r.startswith("index.")]
        if not index_paths:
            # Nothing to offload: an index task would have no paths and no
            # scopes, so creating it would only add a useless task.
            continue

        task.task["routes"] = [r for r in routes if not r.startswith("index.")]
        added.append(
            make_index_task(
                task,
                taskgraph,
                label_to_taskid,
                parameters,
                graph_config,
                index_paths=index_paths,
                index_rank=task.task.get("extra", {}).get("index", {}).get("rank", 0),
                purpose="index-task",
                dependencies={"parent": task.task_id},
            )
        )

    if added:
        taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, added)
        logger.info("Added %d index tasks", len(added))

    return taskgraph, label_to_taskid
@register_morph
def add_eager_cache_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
    """
    Graph morph: index selected tasks before they have run.

    Every task carrying an ``eager_indexes`` attribute gets an extra
    "eager-index" task that inserts it at those index paths. The eager task is
    created with no dependencies of its own (only the docker image that
    ``derive_misc_task`` adds), so it does not wait for the task it points at
    to complete. The current use is letting later pushes depend on a cached
    task that has not finished yet.

    Returns the (possibly amended) taskgraph and ``label_to_taskid``.
    """
    logger.debug("Morphing: Adding eager cached index's")

    eager_tasks = [
        make_index_task(
            task,
            taskgraph,
            label_to_taskid,
            parameters,
            graph_config,
            index_paths=task.attributes["eager_indexes"],
            index_rank=0,  # Be sure complete tasks get priority
            purpose="eager-index",
            dependencies={},
        )
        for task in taskgraph.tasks.values()
        if "eager_indexes" in task.attributes
    ]

    if eager_tasks:
        taskgraph, label_to_taskid = amend_taskgraph(
            taskgraph, label_to_taskid, eager_tasks
        )
        logger.info(f"Added {len(eager_tasks)} eager index tasks")
    return taskgraph, label_to_taskid
@register_morph
def add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
    """Registered graph morph that sets try ``task_duplicates`` attributes.

    This is a thin wrapper. The work is done by ``_add_try_task_duplicates``,
    which is kept separate so unit tests can call it directly.
    """
    morph_args = (taskgraph, label_to_taskid, parameters, graph_config)
    return _add_try_task_duplicates(*morph_args)
# this shim function exists so we can call it from the unittests.
# this works around an issue with
# third_party/python/taskcluster_taskgraph/taskgraph/morph.py#40
def _add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
    """Set ``task_duplicates`` on tasks selected in ``try_task_config``.

    When ``try_task_config["rebuild"]`` is truthy, matching tasks get
    ``attributes["task_duplicates"] = rebuild``. A task matches when either:

    * its label is listed verbatim in ``try_task_config["tasks"]``, or
    * an entry ``"name-*"`` is listed and the label is ``name-<N>`` or
      ``name-<N>-cf`` for a numeric chunk ``N``.

    Tasks are modified in place. The graph and ``label_to_taskid`` are
    returned unchanged.
    """
    try_config = parameters.get("try_task_config", {})
    selected = try_config.get("tasks", [])
    glob_suffix = "-*"

    # "name-*" -> "name". Slice the suffix off rather than calling
    # str.strip("-*"), which would also remove any other leading/trailing
    # '-' or '*' characters.
    glob_tasks = {
        x[: -len(glob_suffix)] for x in selected if x.endswith(glob_suffix)
    }
    # Exact labels are the non-glob entries. Subtracting the glob *bases*
    # instead would wrongly drop an exact "name" when "name-*" is also selected.
    tasks = {x for x in selected if not x.endswith(glob_suffix)}

    rebuild = try_config.get("rebuild")
    if not rebuild:
        return taskgraph, label_to_taskid

    for task in taskgraph.tasks.values():
        # The chunk number is the last dash-separated component, or the one
        # before it when the label ends in "-cf".
        chunk_index = -2 if task.label.endswith("-cf") else -1
        label_parts = task.label.split("-")
        label_no_chunk = "-".join(label_parts[:chunk_index])

        if label_parts[chunk_index].isnumeric() and label_no_chunk in glob_tasks:
            task.attributes["task_duplicates"] = rebuild
        elif task.label in tasks:
            task.attributes["task_duplicates"] = rebuild
    return taskgraph, label_to_taskid