# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
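
"""Create tasks from a task graph, submitting each task to the TaskCluster
queue in dependency order and attaching them all to the decision task's
task group."""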

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import json
import logging
import os
import sys

import requests
import requests.adapters

from slugid import nice as slugid
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.time import current_json_time

logger = logging.getLogger(__name__)

# the maximum number of parallel createTask calls to make
CONCURRENCY = 50

# this is set to true for `mach taskgraph action-callback --test`
testing = False


def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
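    """Submit the tasks in `taskgraph` to the queue.

    `label_to_taskid` maps each task label to its pre-assigned taskId, and
    `params['level']` determines the schedulerId. If `decision_task_id` is
    not given, it is taken from the $TASK_ID environment variable."""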
    taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}

    session = requests.Session()

    # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
    # that limit. Connections are established as needed, so using a large value
    # should not negatively impact performance.
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
                                                 pool_maxsize=CONCURRENCY)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)

    decision_task_id = decision_task_id or os.environ.get('TASK_ID')

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId. The process that created the decision task
    # helpfully placed it in this same taskGroup. If there is no $TASK_ID,
    # fall back to a slugid
    task_group_id = decision_task_id or slugid()
    scheduler_id = 'gecko-level-{}'.format(params['level'])

    # Add the taskGroupId, schedulerId and optionally the decision task
    # dependency to each task
    for task_id in taskgraph.graph.nodes:
        task_def = taskgraph.tasks[task_id].task

        # if this task has no dependencies *within* this taskgraph, make it
        # depend on this decision task. If it has another dependency within
        # the taskgraph, then it already implicitly depends on the decision
        # task. The result is that tasks do not start immediately: if this
        # loop fails halfway through, none of the already-created tasks run.
        if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
            task_def.setdefault('dependencies', []).append(decision_task_id)

        task_def['taskGroupId'] = task_group_id
        task_def['schedulerId'] = scheduler_id

    # If `testing` is True, then run without parallelization
    concurrency = CONCURRENCY if not testing else 1
    with futures.ThreadPoolExecutor(concurrency) as e:
        # mapping from taskId to the future that is creating that task
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        tasklist = set(taskgraph.graph.visit_postorder())
        alltasks = tasklist.copy()
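
        # schedule_tasks makes one pass over the remaining tasklist, submits
        # every task whose dependencies have all been created, and re-runs
        # itself as each newly submitted future completes.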
        def schedule_tasks():
            # bail out early if any futures have failed
            if any(f.done() and f.exception() for f in fs.values()):
                return

            to_remove = set()
            new = set()

            def submit(task_id, label, task_def):
                fut = e.submit(create_task, session, task_id, label, task_def)
                new.add(fut)
                fs[task_id] = fut

            for task_id in tasklist:
                task_def = taskgraph.tasks[task_id].task

                # If we haven't finished submitting all our dependencies yet,
                # come back to this later.
                # Some dependencies aren't in our graph, so make sure to filter
                # those out
                deps = set(task_def.get('dependencies', [])) & alltasks
                if any((d not in fs or not fs[d].done()) for d in deps):
                    continue

                submit(task_id, taskid_to_label[task_id], task_def)
                to_remove.add(task_id)

                # Schedule tasks as many times as task_duplicates indicates
                attributes = taskgraph.tasks[task_id].attributes
                for i in range(1, attributes.get('task_duplicates', 1)):
                    # We use slugid() since we want a distinct task id
                    submit(slugid(), taskid_to_label[task_id], task_def)
            tasklist.difference_update(to_remove)

            # as each of those futures completes, try to schedule more tasks
            for f in futures.as_completed(new):
                schedule_tasks()

        # start scheduling tasks and run until everything is scheduled
        schedule_tasks()

        # check the result of each future, raising an exception if it failed
        for f in futures.as_completed(fs.values()):
            f.result()


def create_task(session, task_id, label, task_def):
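    """Submit `task_def` to the queue under `task_id`; `label` is used only
    for logging. Timestamps in the definition are resolved relative to the
    current time before submission."""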
    # create the task using 'http://taskcluster/queue', which is proxied to the queue service
    # with credentials appropriate to this job.

    # resolve timestamps relative to the current time
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    if testing:
        json.dump([task_id, task_def], sys.stdout,
                  sort_keys=True, indent=4, separators=(',', ': '))
        # add a newline
        print("")
        return

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                      data=json.dumps(task_def))
    if res.status_code != 200:
        # try to log a pretty message from the response body, but fall back
        # to the raw text if the body isn't JSON
        try:
            logger.error(res.json()['message'])
        except Exception:
            logger.error(res.text)
    res.raise_for_status()