1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
The objective of optimization is to remove as many tasks from the graph as
possible, as efficiently as possible, thereby delivering useful results as
quickly as possible. For example, ideally if only a test script is modified in
a push, then the resulting graph contains only the corresponding test suite
task.

See ``taskcluster/docs/optimization.rst`` for more information.
"""
from __future__ import absolute_import, print_function, unicode_literals

import logging
import os
from collections import defaultdict

from mozbuild.base import MozbuildObject
from mozbuild.util import memoize
from slugid import nice as slugid

from . import files_changed
from .graph import Graph
from .taskgraph import TaskGraph
from .util.parameterization import resolve_task_references
from .util.perfile import perfile_number_of_chunks
from .util.seta import is_low_value_task
from .util.taskcluster import find_task_id
# Module-level logger used by the optimization phases and strategies below.
logger = logging.getLogger(__name__)

# Absolute path two directories above the directory containing this module:
# abspath() collapses the trailing '../../../' against __file__ itself.
# NOTE(review): presumably the top of the source tree; not used in this chunk.
TOPSRCDIR = os.path.abspath(os.path.join(__file__, '../../../'))
def optimize_task_graph(target_task_graph, params, do_not_optimize,
                        existing_tasks=None, strategies=None):
    """
    Perform task optimization, returning a taskgraph and a map from label to
    assigned taskId, including replacement tasks.

    Optimization runs in two phases: ``remove_tasks`` drops tasks whose
    strategy says they are unnecessary, then ``replace_tasks`` substitutes
    existing or strategy-provided taskIds for what remains. ``get_subgraph``
    finally builds the graph keyed by taskId.

    :param target_task_graph: the TaskGraph to optimize.
    :param params: decision parameters, passed through to every strategy.
    :param do_not_optimize: labels that must never be removed or replaced.
    :param existing_tasks: optional mapping of label -> taskId of tasks that
        already exist and can be reused as-is.
    :param strategies: optional mapping of optimization name -> strategy
        instance; defaults to ``_make_default_strategies()``.
    :returns: a ``(task_graph, label_to_taskid)`` tuple.
    """
    label_to_taskid = {}
    if not existing_tasks:
        existing_tasks = {}

    # instantiate the strategies for this optimization process
    if not strategies:
        strategies = _make_default_strategies()

    optimizations = _get_optimizations(target_task_graph, strategies)

    removed_tasks = remove_tasks(
        target_task_graph=target_task_graph,
        optimizations=optimizations,
        params=params,
        do_not_optimize=do_not_optimize)

    replaced_tasks = replace_tasks(
        target_task_graph=target_task_graph,
        optimizations=optimizations,
        params=params,
        do_not_optimize=do_not_optimize,
        label_to_taskid=label_to_taskid,
        existing_tasks=existing_tasks,
        removed_tasks=removed_tasks)

    optimized_graph = get_subgraph(
        target_task_graph, removed_tasks, replaced_tasks, label_to_taskid)
    return optimized_graph, label_to_taskid
def _make_default_strategies():
    """Return a fresh mapping of optimization name -> strategy instance.

    The keys are the names that a task's ``optimization`` entry selects.
    """
    return {
        'never': OptimizationStrategy(),  # "never" is the default behavior
        'index-search': IndexSearch(),
        'seta': SETA(),
        'skip-unless-changed': SkipUnlessChanged(),
        'skip-unless-schedules': SkipUnlessSchedules(),
        'skip-unless-schedules-or-seta': Either(SkipUnlessSchedules(), SETA()),
        'only-if-dependencies-run': OnlyIfDependenciesRun(),
    }
84 def _get_optimizations(target_task_graph
, strategies
):
85 def optimizations(label
):
86 task
= target_task_graph
.tasks
[label
]
88 opt_by
, arg
= task
.optimization
.items()[0]
89 return (opt_by
, strategies
[opt_by
], arg
)
91 return ('never', strategies
['never'], None)
def _log_optimization(verb, opt_counts):
    """Log a one-line summary of how many tasks each strategy optimized.

    :param verb: past-tense action such as ``'removed'`` or ``'replaced'``.
    :param opt_counts: mapping of strategy name -> number of tasks affected.
    """
    if opt_counts:
        # items() (rather than iteritems()) works on both Python 2 and 3.
        summary = ', '.join(
            '{} tasks by {}'.format(count, opt_by)
            for opt_by, count in sorted(opt_counts.items()))
        logger.info('{} {} during optimization.'.format(verb.title(), summary))
    else:
        logger.info('No tasks {} during optimization'.format(verb))
def remove_tasks(target_task_graph, params, optimizations, do_not_optimize):
    """
    Implement the "Removing Tasks" phase, returning a set of task labels of all removed tasks.

    A task is only considered for removal once every task that depends on it
    (per ``reverse_links_dict``) has itself been removed.

    :param target_task_graph: the TaskGraph being optimized.
    :param params: decision parameters, passed to each strategy.
    :param optimizations: function mapping a label to ``(opt_by, strategy, arg)``.
    :param do_not_optimize: labels that must never be removed.
    """
    opt_counts = defaultdict(int)
    removed = set()
    reverse_links_dict = target_task_graph.graph.reverse_links_dict()

    for label in target_task_graph.graph.visit_preorder():
        # if we're not allowed to optimize, that's easy..
        if label in do_not_optimize:
            continue

        # if there are remaining tasks depending on this one, do not remove..
        if any(dependent not in removed for dependent in reverse_links_dict[label]):
            continue

        # call the optimization strategy
        task = target_task_graph.tasks[label]
        opt_by, opt, arg = optimizations(label)
        if opt.should_remove_task(task, params, arg):
            removed.add(label)
            opt_counts[opt_by] += 1

    _log_optimization('removed', opt_counts)
    return removed
def replace_tasks(target_task_graph, params, optimizations, do_not_optimize,
                  label_to_taskid, removed_tasks, existing_tasks):
    """
    Implement the "Replacing Tasks" phase, returning a set of task labels of
    all replaced tasks. The replacement taskIds are added to label_to_taskid as
    a side-effect.

    A strategy returning ``True`` from ``should_replace_task`` means "replace
    with nothing": the label is then added to ``removed_tasks`` (mutated in
    place) rather than to the returned set.

    :param target_task_graph: the TaskGraph being optimized.
    :param params: decision parameters, passed to each strategy.
    :param optimizations: function mapping a label to ``(opt_by, strategy, arg)``.
    :param do_not_optimize: labels that must never be replaced.
    :param label_to_taskid: dict updated with label -> replacement taskId.
    :param removed_tasks: set of labels removed so far; may grow.
    :param existing_tasks: mapping of label -> taskId of reusable tasks.
    """
    opt_counts = defaultdict(int)
    replaced = set()
    links_dict = target_task_graph.graph.links_dict()

    for label in target_task_graph.graph.visit_postorder():
        # if we're not allowed to optimize, that's easy..
        if label in do_not_optimize:
            continue

        # if this task depends on un-replaced, un-removed tasks, do not replace
        if any(dep not in replaced and dep not in removed_tasks
               for dep in links_dict[label]):
            continue

        # if the task already exists, that's an easy replacement
        repl = existing_tasks.get(label)
        if repl:
            label_to_taskid[label] = repl
            replaced.add(label)
            opt_counts['existing_tasks'] += 1
            continue

        # call the optimization strategy
        task = target_task_graph.tasks[label]
        opt_by, opt, arg = optimizations(label)
        repl = opt.should_replace_task(task, params, arg)
        if not repl:
            continue
        if repl is True:
            # True means remove this task; get_subgraph will catch any
            # problems with removed tasks being depended on
            removed_tasks.add(label)
        else:
            label_to_taskid[label] = repl
            replaced.add(label)
        opt_counts[opt_by] += 1

    _log_optimization('replaced', opt_counts)
    return replaced
def get_subgraph(target_task_graph, removed_tasks, replaced_tasks, label_to_taskid):
    """
    Return the subgraph of target_task_graph consisting only of
    non-optimized tasks and edges between them.

    To avoid losing track of taskIds for tasks optimized away, this method
    simultaneously substitutes real taskIds for task labels in the graph, and
    populates each task definition's `dependencies` key with the appropriate
    taskIds. Task references are resolved in the process.

    Side effects: new taskIds are added to ``label_to_taskid`` for every kept
    task, and each kept task's ``task_id`` and ``task`` attributes are updated.

    :raises Exception: if a kept task depends on a removed task.
    """

    # check for any dependency edges from included to removed tasks
    bad_edges = [(left, right, name)
                 for left, right, name in target_task_graph.graph.edges
                 if left not in removed_tasks and right in removed_tasks]
    if bad_edges:
        probs = ', '.join('{} depends on {} as {} but it has been removed'.format(left, right, name)
                          for left, right, name in bad_edges)
        raise Exception("Optimization error: " + probs)

    # fill in label_to_taskid for anything not removed or replaced
    assert replaced_tasks <= set(label_to_taskid)
    for label in sorted(target_task_graph.graph.nodes - removed_tasks - set(label_to_taskid)):
        label_to_taskid[label] = slugid()

    # resolve labels to taskIds and populate task['dependencies']
    tasks_by_taskid = {}
    named_links_dict = target_task_graph.graph.named_links_dict()
    omit = removed_tasks | replaced_tasks
    for label, task in target_task_graph.tasks.items():
        if label in omit:
            continue
        task.task_id = label_to_taskid[label]
        # dep_label is kept distinct from the outer `label` for readability
        named_task_dependencies = {
            name: label_to_taskid[dep_label]
            for name, dep_label in named_links_dict.get(label, {}).items()}
        task.task = resolve_task_references(task.label, task.task, named_task_dependencies)
        deps = task.task.setdefault('dependencies', [])
        deps.extend(sorted(named_task_dependencies.values()))
        tasks_by_taskid[task.task_id] = task

    # resolve edges to taskIds, and drop edges that are no longer entirely in
    # the task graph (note that this omits edges to replaced tasks, but they
    # are still in task.dependencies)
    edges_by_taskid = set()
    for left, right, name in target_task_graph.graph.edges:
        left_id = label_to_taskid.get(left)
        right_id = label_to_taskid.get(right)
        if left_id in tasks_by_taskid and right_id in tasks_by_taskid:
            edges_by_taskid.add((left_id, right_id, name))

    return TaskGraph(
        tasks_by_taskid,
        Graph(set(tasks_by_taskid), edges_by_taskid))
class OptimizationStrategy(object):
    """Base optimization strategy; by default it never optimizes a task.

    Each hook receives the task, the decision parameters, and the
    strategy-specific ``arg`` taken from the task's ``optimization`` entry.
    """

    def should_remove_task(self, task, params, arg):
        """Determine whether to optimize this task by removing it. Returns
        True to remove."""
        return False

    def should_replace_task(self, task, params, arg):
        """Determine whether to optimize this task by replacing it. Returns a
        taskId to replace this task, True to replace with nothing, or False to
        keep the task."""
        return False


class Either(OptimizationStrategy):
    """Given one or more optimization strategies, remove a task if any of them
    says to, and replace with a task if any finds a replacement (preferring the
    earliest). By default, each substrategy gets the same arg, but split_args
    can return a list of args for each strategy, if desired."""
    def __init__(self, *substrategies, **kwargs):
        self.substrategies = substrategies
        self.split_args = kwargs.pop('split_args', None)
        if not self.split_args:
            self.split_args = lambda arg: [arg] * len(substrategies)
        if kwargs:
            raise TypeError("unexpected keyword args")

    def _for_substrategies(self, arg, fn):
        """Return the first truthy ``fn(sub, sub_arg)`` over the substrategies,
        in order, or False if none is truthy."""
        for sub, sub_arg in zip(self.substrategies, self.split_args(arg)):
            rv = fn(sub, sub_arg)
            if rv:
                return rv
        return False

    def should_remove_task(self, task, params, arg):
        return self._for_substrategies(
            arg,
            lambda sub, sub_arg: sub.should_remove_task(task, params, sub_arg))

    def should_replace_task(self, task, params, arg):
        return self._for_substrategies(
            arg,
            lambda sub, sub_arg: sub.should_replace_task(task, params, sub_arg))


class OnlyIfDependenciesRun(OptimizationStrategy):
    """Run this task only if its dependencies run."""

    # This takes advantage of the behavior of the second phase of optimization:
    # a task can only be replaced if it has no un-optimized dependencies. So if
    # should_replace_task is called, then a task has no un-optimized
    # dependencies and can be removed (indicated by returning True)

    def should_replace_task(self, task, params, arg):
        return True
class IndexSearch(OptimizationStrategy):
    """Replace a task with an already-indexed task found under one of its
    index paths."""

    # A task with no dependencies remaining after optimization will be replaced
    # if artifacts exist for the corresponding index_paths.
    # Otherwise, we're in one of the following cases:
    # - the task has un-optimized dependencies
    # - the artifacts have expired
    # - some changes altered the index_paths and new artifacts need to be
    #   created.
    # In every one of those cases, we need to run the task to create or refresh
    # artifacts.

    def should_replace_task(self, task, params, index_paths):
        "Look for a task with one of the given index paths"
        # loop-invariant: whether to go through the proxy depends only on
        # TASK_ID being set in the environment
        use_proxy = bool(os.environ.get('TASK_ID'))
        for index_path in index_paths:
            try:
                return find_task_id(index_path, use_proxy=use_proxy)
            except KeyError:
                # 404 will end up here and go on to the next index path
                # NOTE(review): relies on find_task_id raising KeyError for a
                # missing index path.
                continue
        return False
class SETA(OptimizationStrategy):
    """Remove tasks that SETA classifies as low-value for this push."""

    def should_remove_task(self, task, params, _):
        label = task.label

        # Always optimize away low-value tasks; anything else is kept.
        return bool(is_low_value_task(label,
                                      params.get('project'),
                                      params.get('pushlog_id'),
                                      params.get('pushdate')))
class SkipUnlessChanged(OptimizationStrategy):
    """Remove a task unless the push changed a file matching its patterns."""

    def should_remove_task(self, task, params, file_patterns):
        # pushlog_id == -1 - this is the case when run from a cron.yml job
        if params.get('pushlog_id') == -1:
            return False

        changed = files_changed.check(params, file_patterns)
        if not changed:
            # lazy %-args: the message is only built when debug logging is on
            logger.debug('no files found matching a pattern in `skip-unless-changed` for %s',
                         task.label)
            return True
        return False
class SkipUnlessSchedules(OptimizationStrategy):
    """Remove a task unless the push schedules one of its components."""

    @memoize
    def scheduled_by_push(self, repository, revision):
        """Return the set of SCHEDULES components of the files changed by
        ``revision`` in ``repository`` (cached via mozbuild's ``memoize``)."""
        changed_files = files_changed.get_changed_files(repository, revision)

        mbo = MozbuildObject.from_environment()
        # the decision task has a sparse checkout, so, mozbuild_reader will use
        # a MercurialRevisionFinder with revision '.', which should be the same
        # as `revision`; in other circumstances, it will use a default reader
        rdr = mbo.mozbuild_reader(config_mode='empty')

        components = set()
        for _path, metadata in rdr.files_info(changed_files).items():
            components |= set(metadata['SCHEDULES'].components)

        return components

    def should_remove_task(self, task, params, conditions):
        # pushlog_id == -1 - this is the case when run from a cron.yml job
        if params.get('pushlog_id') == -1:
            return False

        scheduled = self.scheduled_by_push(params['head_repository'], params['head_rev'])
        conditions = set(conditions)
        # if *any* of the condition components are scheduled, do not optimize
        if conditions & scheduled:
            return False

        return True
385 class TestVerify(OptimizationStrategy
):
386 def should_remove_task(self
, task
, params
, _
):
387 # we would like to return 'False, None' while it's high_value_task
388 # and we wouldn't optimize it. Otherwise, it will return 'True, None'
389 env
= params
.get('try_task_config', {}) or {}
390 env
= env
.get('templates', {}).get('env', {})
391 if perfile_number_of_chunks(params
.is_try(),
392 env
.get('MOZHARNESS_TEST_PATHS', ''),
393 params
.get('head_repository', ''),
394 params
.get('head_rev', ''),