1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 from taskgraph
.config
import GraphConfig
11 from taskgraph
.parameters
import parameters_loader
12 from taskgraph
.util
.yaml
import load_yaml
14 from . import filter_tasks
15 from .graph
import Graph
16 from .morph
import morph
17 from .optimize
import optimize_task_graph
18 from .task
import Task
19 from .taskgraph
import TaskGraph
20 from .transforms
.base
import TransformSequence
, TransformConfig
21 from .util
.python_path
import find_object
22 from .util
.verify
import (
26 from .config
import load_graph_config
28 logger
= logging
.getLogger(__name__
)
31 class KindNotFound(Exception):
33 Raised when trying to load kind from a directory without a kind.yml.
40 name
= attr
.ib(type=str)
41 path
= attr
.ib(type=str)
42 config
= attr
.ib(type=dict)
43 graph_config
= attr
.ib(type=GraphConfig
)
45 def _get_loader(self
):
47 loader
= self
.config
["loader"]
49 raise KeyError(f
"{self.path!r} does not define `loader`")
50 return find_object(loader
)
52 def load_tasks(self
, parameters
, loaded_tasks
, write_artifacts
):
53 loader
= self
._get
_loader
()
54 config
= copy
.deepcopy(self
.config
)
56 kind_dependencies
= config
.get("kind-dependencies", [])
57 kind_dependencies_tasks
= {
58 task
.label
: task
for task
in loaded_tasks
if task
.kind
in kind_dependencies
61 inputs
= loader(self
.name
, self
.path
, config
, parameters
, loaded_tasks
)
63 transforms
= TransformSequence()
64 for xform_path
in config
["transforms"]:
65 transform
= find_object(xform_path
)
66 transforms
.add(transform
)
68 # perform the transformations on the loaded inputs
69 trans_config
= TransformConfig(
74 kind_dependencies_tasks
,
76 write_artifacts
=write_artifacts
,
81 label
=task_dict
["label"],
82 description
=task_dict
["description"],
83 attributes
=task_dict
["attributes"],
84 task
=task_dict
["task"],
85 optimization
=task_dict
.get("optimization"),
86 dependencies
=task_dict
.get("dependencies"),
87 soft_dependencies
=task_dict
.get("soft-dependencies"),
88 if_dependencies
=task_dict
.get("if-dependencies"),
89 release_artifacts
=task_dict
.get("release-artifacts"),
91 for task_dict
in transforms(trans_config
, inputs
)
96 def load(cls
, root_dir
, graph_config
, kind_name
):
97 path
= os
.path
.join(root_dir
, kind_name
)
98 kind_yml
= os
.path
.join(path
, "kind.yml")
99 if not os
.path
.exists(kind_yml
):
100 raise KindNotFound(kind_yml
)
102 logger
.debug(f
"loading kind `{kind_name}` from `{path}`")
103 config
= load_yaml(kind_yml
)
105 return cls(kind_name
, path
, config
, graph_config
)
108 class TaskGraphGenerator
:
110 The central controller for taskgraph. This handles all phases of graph
111 generation. The task is generated from all of the kinds defined in
112 subdirectories of the generator's root directory.
114 Access to the results of this generation, as well as intermediate values at
115 various phases of generation, is available via properties. This encourages
116 the provision of all generation inputs at instance construction time.
119 # Task-graph generation is implemented as a Python generator that yields
120 # each "phase" of generation. This allows some mach subcommands to short-
121 # circuit generation of the entire graph by never completing the generator.
127 decision_task_id
="DECISION-TASK",
128 write_artifacts
=False,
131 @param root_dir: root directory, with subdirectories for each kind
132 @param paramaters: parameters for this task-graph generation, or callable
133 taking a `GraphConfig` and returning parameters
134 @type parameters: Union[Parameters, Callable[[GraphConfig], Parameters]]
137 root_dir
= "taskcluster/ci"
138 self
.root_dir
= root_dir
139 self
._parameters
= parameters
140 self
._decision
_task
_id
= decision_task_id
141 self
._write
_artifacts
= write_artifacts
143 # start the generator
144 self
._run
= self
._run
()
145 self
._run
_results
= {}
148 def parameters(self
):
150 The properties used for this graph.
154 return self
._run
_until
("parameters")
157 def full_task_set(self
):
159 The full task set: all tasks defined by any kind (a graph without edges)
163 return self
._run
_until
("full_task_set")
166 def full_task_graph(self
):
168 The full task graph: the full task set, with edges representing
173 return self
._run
_until
("full_task_graph")
176 def target_task_set(self
):
178 The set of targetted tasks (a graph without edges)
182 return self
._run
_until
("target_task_set")
185 def target_task_graph(self
):
187 The set of targetted tasks and all of their dependencies
191 return self
._run
_until
("target_task_graph")
194 def optimized_task_graph(self
):
196 The set of targetted tasks and all of their dependencies; tasks that
197 have been optimized out are either omitted or replaced with a Task
198 instance containing only a task_id.
202 return self
._run
_until
("optimized_task_graph")
205 def label_to_taskid(self
):
207 A dictionary mapping task label to assigned taskId. This property helps
208 in interpreting `optimized_task_graph`.
212 return self
._run
_until
("label_to_taskid")
215 def morphed_task_graph(self
):
217 The optimized task graph, with any subsequent morphs applied. This graph
218 will have the same meaning as the optimized task graph, but be in a form
219 more palatable to TaskCluster.
223 return self
._run
_until
("morphed_task_graph")
226 def graph_config(self
):
228 The configuration for this graph.
232 return self
._run
_until
("graph_config")
234 def _load_kinds(self
, graph_config
, target_kind
=None):
236 # docker-image is an implicit dependency that never appears in
238 queue
= [target_kind
, "docker-image"]
241 kind_name
= queue
.pop()
242 if kind_name
in seen_kinds
:
244 seen_kinds
.add(kind_name
)
245 kind
= Kind
.load(self
.root_dir
, graph_config
, kind_name
)
247 queue
.extend(kind
.config
.get("kind-dependencies", []))
249 for kind_name
in os
.listdir(self
.root_dir
):
251 yield Kind
.load(self
.root_dir
, graph_config
, kind_name
)
256 logger
.info("Loading graph configuration.")
257 graph_config
= load_graph_config(self
.root_dir
)
259 yield ("graph_config", graph_config
)
261 graph_config
.register()
263 if callable(self
._parameters
):
264 parameters
= self
._parameters
(graph_config
)
266 parameters
= self
._parameters
267 self
.verify_parameters(parameters
)
269 logger
.info("Using {}".format(parameters
))
270 logger
.debug("Dumping parameters:\n{}".format(repr(parameters
)))
272 filters
= parameters
.get("filters", [])
273 # Always add legacy target tasks method until we deprecate that API.
274 if "target_tasks_method" not in filters
:
275 filters
.insert(0, "target_tasks_method")
276 filters
= [filter_tasks
.filter_task_functions
[f
] for f
in filters
]
278 yield ("parameters", parameters
)
280 logger
.info("Loading kinds")
281 # put the kinds into a graph and sort topologically so that kinds are loaded
283 if parameters
.get("target-kind"):
284 target_kind
= parameters
["target-kind"]
286 "Limiting kinds to {target_kind} and dependencies".format(
287 target_kind
=target_kind
292 for kind
in self
._load
_kinds
(graph_config
, parameters
.get("target-kind"))
294 self
.verify_kinds(kinds
)
297 for kind
in kinds
.values():
298 for dep
in kind
.config
.get("kind-dependencies", []):
299 edges
.add((kind
.name
, dep
, "kind-dependency"))
300 kind_graph
= Graph(set(kinds
), edges
)
302 if parameters
.get("target-kind"):
303 kind_graph
= kind_graph
.transitive_closure({target_kind
, "docker-image"})
305 logger
.info("Generating full task set")
307 for kind_name
in kind_graph
.visit_postorder():
308 logger
.debug(f
"Loading tasks for kind {kind_name}")
309 kind
= kinds
[kind_name
]
311 new_tasks
= kind
.load_tasks(
313 list(all_tasks
.values()),
314 self
._write
_artifacts
,
317 logger
.exception(f
"Error loading tasks for kind {kind_name}:")
319 for task
in new_tasks
:
320 if task
.label
in all_tasks
:
321 raise Exception("duplicate tasks with label " + task
.label
)
322 all_tasks
[task
.label
] = task
323 logger
.info(f
"Generated {len(new_tasks)} tasks for kind {kind_name}")
324 full_task_set
= TaskGraph(all_tasks
, Graph(set(all_tasks
), set()))
325 self
.verify_attributes(all_tasks
)
326 self
.verify_run_using()
327 yield verifications("full_task_set", full_task_set
, graph_config
, parameters
)
329 logger
.info("Generating full task graph")
331 for t
in full_task_set
:
332 for depname
, dep
in t
.dependencies
.items():
333 edges
.add((t
.label
, dep
, depname
))
335 full_task_graph
= TaskGraph(all_tasks
, Graph(full_task_set
.graph
.nodes
, edges
))
337 "Full task graph contains %d tasks and %d dependencies"
338 % (len(full_task_set
.graph
.nodes
), len(edges
))
341 "full_task_graph", full_task_graph
, graph_config
, parameters
344 logger
.info("Generating target task set")
345 target_task_set
= TaskGraph(
346 dict(all_tasks
), Graph(set(all_tasks
.keys()), set())
349 old_len
= len(target_task_set
.graph
.nodes
)
350 target_tasks
= set(fltr(target_task_set
, parameters
, graph_config
))
351 target_task_set
= TaskGraph(
352 {l
: all_tasks
[l
] for l
in target_tasks
}, Graph(target_tasks
, set())
355 "Filter %s pruned %d tasks (%d remain)"
356 % (fltr
.__name
__, old_len
- len(target_tasks
), len(target_tasks
))
360 "target_task_set", target_task_set
, graph_config
, parameters
363 logger
.info("Generating target task graph")
364 # include all docker-image build tasks here, in case they are needed for a graph morph
365 docker_image_tasks
= {
367 for t
in full_task_graph
.tasks
.values()
368 if t
.attributes
["kind"] == "docker-image"
370 # include all tasks with `always_target` set
371 if parameters
["tasks_for"] == "hg-push":
372 always_target_tasks
= {
374 for t
in full_task_graph
.tasks
.values()
375 if t
.attributes
.get("always_target")
378 always_target_tasks
= set()
380 "Adding %d tasks with `always_target` attribute"
381 % (len(always_target_tasks
) - len(always_target_tasks
& target_tasks
))
383 requested_tasks
= target_tasks | docker_image_tasks | always_target_tasks
384 target_graph
= full_task_graph
.graph
.transitive_closure(requested_tasks
)
385 target_task_graph
= TaskGraph(
386 {l
: all_tasks
[l
] for l
in target_graph
.nodes
}, target_graph
389 "target_task_graph", target_task_graph
, graph_config
, parameters
392 logger
.info("Generating optimized task graph")
393 existing_tasks
= parameters
.get("existing_tasks")
394 do_not_optimize
= set(parameters
.get("do_not_optimize", []))
395 if not parameters
.get("optimize_target_tasks", True):
396 do_not_optimize
= set(target_task_set
.graph
.nodes
).union(do_not_optimize
)
398 # this is used for testing experimental optimization strategies
399 strategies
= os
.environ
.get(
400 "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters
.get("optimize_strategies")
403 strategies
= find_object(strategies
)
405 optimized_task_graph
, label_to_taskid
= optimize_task_graph(
410 self
._decision
_task
_id
,
411 existing_tasks
=existing_tasks
,
412 strategy_override
=strategies
,
416 "optimized_task_graph", optimized_task_graph
, graph_config
, parameters
419 morphed_task_graph
, label_to_taskid
= morph(
420 optimized_task_graph
,
424 self
._decision
_task
_id
,
427 yield "label_to_taskid", label_to_taskid
429 "morphed_task_graph", morphed_task_graph
, graph_config
, parameters
432 def _run_until(self
, name
):
433 while name
not in self
._run
_results
:
435 k
, v
= next(self
._run
)
436 except StopIteration:
437 raise AttributeError(f
"No such run result {name}")
438 self
._run
_results
[k
] = v
439 return self
._run
_results
[name
]
441 def verify_parameters(self
, parameters
):
442 if not parameters
.strict
:
445 parameters_dict
= dict(**parameters
)
447 filename
="parameters.rst",
448 identifiers
=list(parameters_dict
),
449 appearing_as
="inline-literal",
452 def verify_kinds(self
, kinds
):
454 filename
="kinds.rst", identifiers
=kinds
.keys(), appearing_as
="heading"
457 def verify_attributes(self
, all_tasks
):
458 attribute_set
= set()
459 for label
, task
in all_tasks
.items():
460 attribute_set
.update(task
.attributes
.keys())
462 filename
="attributes.rst",
463 identifiers
=list(attribute_set
),
464 appearing_as
="heading",
467 def verify_run_using(self
):
468 from .transforms
.job
import registry
471 filename
="transforms.rst",
472 identifiers
=registry
.keys(),
473 appearing_as
="inline-literal",
477 def load_tasks_for_kind(parameters
, kind
, root_dir
=None):
479 Get all the tasks of a given kind.
481 This function is designed to be called from outside of taskgraph.
483 # make parameters read-write
484 parameters
= dict(parameters
)
485 parameters
["target-kind"] = kind
486 parameters
= parameters_loader(spec
=None, strict
=False, overrides
=parameters
)
487 tgg
= TaskGraphGenerator(root_dir
=root_dir
, parameters
=parameters
)
489 task
.task
["metadata"]["name"]: task
490 for task
in tgg
.full_task_set