1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import json
import logging
import os
import sys

import requests.adapters

from slugid import nice as slugid
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.time import current_json_time
from taskgraph.util.taskcluster import get_session, CONCURRENCY
# Module-level logger named after this module, per the logging convention.
logger = logging.getLogger(__name__)
# this is set to true for `mach taskgraph action-callback --test`; when True,
# task definitions are dumped to stdout instead of being submitted, and
# submission runs without parallelization.
testing = False
def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
    """Submit every task in ``taskgraph`` to the TaskCluster queue.

    Tasks are submitted in dependency order: a task is only created once all
    of its dependencies *within this graph* have been created.  If any
    submission fails, no further tasks are submitted and the failure is
    raised at the end.

    Args:
        taskgraph: graph of tasks; ``graph.nodes`` yields taskIds,
            ``tasks[task_id].task`` is the task definition, and
            ``tasks[task_id].attributes`` may carry ``task_duplicates``.
        label_to_taskid: mapping of task label -> taskId.
        params: decision parameters; only ``params['level']`` is read here,
            to build the schedulerId.
        decision_task_id: taskId of the decision task; defaults to the
            ``$TASK_ID`` environment variable.

    Raises:
        Any exception raised by ``create_task`` for a failed submission.
    """
    # .items() is correct on both Python 2 and 3; .iteritems() is 2-only and
    # the file already opts into 2/3 compatibility via __future__ imports.
    taskid_to_label = {t: l for l, t in label_to_taskid.items()}

    decision_task_id = decision_task_id or os.environ.get('TASK_ID')

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId. The process that created the decision task
    # helpfully placed it in this same taskGroup. If there is no $TASK_ID,
    # fall back to a slugid
    task_group_id = decision_task_id or slugid()
    scheduler_id = 'gecko-level-{}'.format(params['level'])

    # Add the taskGroupId, schedulerId and optionally the decision task
    # dependency to every task definition before anything is submitted.
    for task_id in taskgraph.graph.nodes:
        task_def = taskgraph.tasks[task_id].task

        # if this task has no dependencies *within* this taskgraph, make it
        # depend on this decision task. If it has another dependency within
        # the taskgraph, then it already implicitly depends on the decision
        # task. The result is that tasks do not start immediately. if this
        # loop fails halfway through, none of the already-created tasks run.
        if not any(t in taskgraph.tasks
                   for t in task_def.get('dependencies', [])):
            task_def.setdefault('dependencies', []).append(decision_task_id)

        task_def['taskGroupId'] = task_group_id
        task_def['schedulerId'] = scheduler_id

    # If `testing` is True, then run without parallelization
    concurrency = CONCURRENCY if not testing else 1
    session = get_session()
    with futures.ThreadPoolExecutor(concurrency) as e:
        # taskId -> future for its submission; consulted both for the
        # dependency check and for error detection.
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        tasklist = set(taskgraph.graph.visit_postorder())
        alltasks = tasklist.copy()

        def schedule_tasks():
            # bail out early if any futures have failed
            if any(f.done() and f.exception() for f in fs.values()):
                return

            to_remove = set()
            new = set()

            def submit(task_id, label, task_def):
                fut = e.submit(create_task, session, task_id, label, task_def)
                new.add(fut)
                fs[task_id] = fut

            for task_id in tasklist:
                task_def = taskgraph.tasks[task_id].task
                # If we haven't finished submitting all our dependencies yet,
                # come back to this later.
                # Some dependencies aren't in our graph, so make sure to filter
                # those out before checking.
                deps = set(task_def.get('dependencies', [])) & alltasks
                if any((d not in fs or not fs[d].done()) for d in deps):
                    continue

                submit(task_id, taskid_to_label[task_id], task_def)
                to_remove.add(task_id)

                # Schedule tasks as many times as task_duplicates indicates
                attributes = taskgraph.tasks[task_id].attributes
                for i in range(1, attributes.get('task_duplicates', 1)):
                    # We use slugid() since we want a distinct task id
                    submit(slugid(), taskid_to_label[task_id], task_def)
            tasklist.difference_update(to_remove)

            # as each of those futures complete, try to schedule more tasks
            for f in futures.as_completed(new):
                schedule_tasks()

        # start scheduling tasks and run until everything is scheduled
        schedule_tasks()

    # check the result of each future, raising an exception if it failed
    for f in futures.as_completed(fs.values()):
        f.result()
def create_task(session, task_id, label, task_def):
    """Create a single task in the TaskCluster queue.

    Args:
        session: a requests session to use for the PUT call.
        task_id: the taskId under which to create the task.
        label: the task's label, used only for logging.
        task_def: the task definition (JSON-serializable dict).

    Raises:
        requests.HTTPError: if the queue rejects the task (non-200 status).
    """
    # create the task using 'http://taskcluster/queue', which is proxied to
    # the queue service with credentials appropriate to this job.

    # Resolve relative timestamps in the task definition against the
    # current time.
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    if testing:
        # In test mode, dump the task to stdout instead of submitting it.
        json.dump([task_id, task_def], sys.stdout,
                  sort_keys=True, indent=4, separators=(',', ': '))
        # add a newline after the dumped JSON
        print("")
        return

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                      data=json.dumps(task_def))
    if res.status_code != 200:
        # prefer the queue's structured error message, but fall back to the
        # raw response body if it isn't valid JSON
        try:
            logger.error(res.json()['message'])
        except Exception:
            logger.error(res.text)
        res.raise_for_status()