3 # Thomas Nagy, 2005-2018 (ita)
6 Runner.py: Task scheduling and execution
9 import heapq
, traceback
11 from queue
import Queue
, PriorityQueue
13 from Queue
import Queue
15 from Queue
import PriorityQueue
class PriorityQueue(Queue):
	"""
	Fallback priority queue for Python versions whose Queue module lacks
	PriorityQueue: orders items with a binary heap instead of a plain list.
	Only the Queue subclassing hooks are overridden; locking is inherited.
	"""
	def _init(self, maxsize):
		# Hook called by Queue.__init__; replace the default deque with a heap list
		self.maxsize = maxsize
		self.queue = []
	def _put(self, item):
		# Smallest item is served first (heap invariant)
		heapq.heappush(self.queue, item)
	def _get(self):
		return heapq.heappop(self.queue)
26 from waflib
import Utils
, Task
, Errors
, Logs
30 Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
class PriorityTasks(object):
	"""
	Container of tasks kept as a binary heap so that the highest-priority
	task (smallest by the task ordering) is popped first.
	"""
	def __init__(self):
		self.lst = []
	def __len__(self):
		return len(self.lst)
	def __iter__(self):
		return iter(self.lst)
	def __str__(self):
		return 'PriorityTasks: [%s]' % '\n  '.join(str(x) for x in self.lst)
	def clear(self):
		self.lst = []
	def append(self, task):
		heapq.heappush(self.lst, task)
	def appendleft(self, task):
		"Deprecated, do not use"
		heapq.heappush(self.lst, task)
	def pop(self):
		# Returns the smallest (highest-priority) element
		return heapq.heappop(self.lst)
	def extend(self, lst):
		if self.lst:
			# Existing heap: push one by one to keep the invariant
			for x in lst:
				self.append(x)
		else:
			if isinstance(lst, list):
				# Adopt the list and heapify it in place
				self.lst = lst
				heapq.heapify(lst)
			else:
				# NOTE(review): assumes lst is another PriorityTasks whose
				# internal list already satisfies the heap invariant
				self.lst = lst.lst
class Consumer(Utils.threading.Thread):
	"""
	Daemon thread object that executes a task. It shares a semaphore with
	the coordinator :py:class:`waflib.Runner.Spawner`. There is one
	instance per task to consume.
	"""
	def __init__(self, spawner, task):
		Utils.threading.Thread.__init__(self)
		self.task = task
		"""Task to execute"""
		self.spawner = spawner
		"""Coordinator object"""
		self.setDaemon(1)
		self.start()
	def run(self):
		"""
		Processes a single task
		"""
		try:
			if not self.spawner.master.stop:
				self.spawner.master.process_task(self.task)
		finally:
			# Always release the spawner slot and hand the task back,
			# even when process_task raises, else the producer deadlocks
			self.spawner.sem.release()
			self.spawner.master.out.put(self.task)
			# Drop references to help garbage collection
			self.task = None
			self.spawner = None
class Spawner(Utils.threading.Thread):
	"""
	Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
	spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
	:py:class:`waflib.Task.Task` instance.
	"""
	def __init__(self, master):
		Utils.threading.Thread.__init__(self)
		self.master = master
		""":py:class:`waflib.Runner.Parallel` producer instance"""
		self.sem = Utils.threading.Semaphore(master.numjobs)
		"""Bounded semaphore that prevents spawning more than *n* concurrent consumers"""
		self.setDaemon(1)
		self.start()
	def run(self):
		"""
		Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
		"""
		try:
			self.loop()
		except Exception:
			# Python 2 prints unnecessary messages when shutting down
			# we also want to stop the thread properly
			pass
	def loop(self):
		"""
		Consumes task objects from the producer; ends when the producer has no more
		tasks to provide
		"""
		master = self.master
		while 1:
			task = master.ready.get()
			# Block until a consumer slot is free
			self.sem.acquire()
			if not master.stop:
				task.log_display(task.generator.bld)
			Consumer(self, task)
class Parallel(object):
	"""
	Schedule the tasks obtained from the build context for execution.
	"""
	def __init__(self, bld, j=2):
		"""
		The initialization requires a build context reference
		for computing the total number of jobs.
		"""

		self.numjobs = j
		"""
		Amount of parallel consumers to use
		"""

		self.bld = bld
		"""
		Instance of :py:class:`waflib.Build.BuildContext`
		"""

		self.outstanding = PriorityTasks()
		"""Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""

		self.postponed = PriorityTasks()
		"""Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""

		self.incomplete = set()
		"""List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""

		self.ready = PriorityQueue(0)
		"""List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""

		self.out = Queue(0)
		"""List of :py:class:`waflib.Task.Task` returned by the task consumers"""

		self.count = 0
		"""Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""

		self.processed = 0
		"""Amount of tasks processed"""

		self.stop = False
		"""Error flag to stop the build"""

		self.error = []
		"""Tasks that could not be executed"""

		self.biter = None
		"""Task iterator which must give groups of parallelizable tasks when calling ``next()``"""

		self.dirty = False
		"""
		Flag that indicates that the build cache must be saved when a task was executed
		(calls :py:meth:`waflib.Build.BuildContext.store`)"""

		self.revdeps = Utils.defaultdict(set)
		"""
		The reverse dependency graph of dependencies obtained from Task.run_after
		"""

		self.spawner = Spawner(self)
		"""
		Coordinating daemon thread that spawns thread consumers
		"""
189 def get_next_task(self
):
191 Obtains the next Task instance to run
193 :rtype: :py:class:`waflib.Task.Task`
195 if not self
.outstanding
:
197 return self
.outstanding
.pop()
199 def postpone(self
, tsk
):
201 Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`.
202 The order is scrambled so as to consume as many tasks in parallel as possible.
204 :param tsk: task instance
205 :type tsk: :py:class:`waflib.Task.Task`
207 self
.postponed
.append(tsk
)
209 def refill_task_list(self
):
211 Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
212 Ensures that all tasks in the current build group are complete before processing the next one.
214 while self
.count
> self
.numjobs
* GAP
:
217 while not self
.outstanding
:
224 cond
= self
.deadlock
== self
.processed
225 except AttributeError:
229 # The most common reason is conflicting build order declaration
230 # for example: "X run_after Y" and "Y run_after X"
231 # Another can be changing "run_after" dependencies while the build is running
232 # for example: updating "tsk.run_after" in the "runnable_status" method
234 for tsk
in self
.postponed
:
235 deps
= [id(x
) for x
in tsk
.run_after
if not x
.hasrun
]
236 lst
.append('%s\t-> %r' % (repr(tsk
), deps
))
238 lst
.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk
))
239 raise Errors
.WafError('Deadlock detected: check the task build order%s' % ''.join(lst
))
240 self
.deadlock
= self
.processed
243 self
.outstanding
.extend(self
.postponed
)
244 self
.postponed
.clear()
247 for x
in self
.incomplete
:
248 for k
in x
.run_after
:
252 # dependency added after the build started without updating revdeps
253 self
.incomplete
.remove(x
)
254 self
.outstanding
.append(x
)
257 raise Errors
.WafError('Broken revdeps detected on %r' % self
.incomplete
)
259 tasks
= next(self
.biter
)
260 ready
, waiting
= self
.prio_and_split(tasks
)
261 self
.outstanding
.extend(ready
)
262 self
.incomplete
.update(waiting
)
263 self
.total
= self
.bld
.total()
266 def add_more_tasks(self
, tsk
):
268 If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
269 in that list are added to the current build and will be processed before the next build group.
271 The priorities for dependent tasks are not re-calculated globally
273 :param tsk: task instance
274 :type tsk: :py:attr:`waflib.Task.Task`
276 if getattr(tsk
, 'more_tasks', None):
277 more
= set(tsk
.more_tasks
)
285 # Update the dependency tree
286 # this assumes that task.run_after values were updated
287 for x
in iteri(self
.outstanding
, self
.incomplete
):
288 for k
in x
.run_after
:
289 if isinstance(k
, Task
.TaskGroup
):
290 if k
not in groups_done
:
292 for j
in k
.prev
& more
:
293 self
.revdeps
[j
].add(k
)
295 self
.revdeps
[k
].add(x
)
297 ready
, waiting
= self
.prio_and_split(tsk
.more_tasks
)
298 self
.outstanding
.extend(ready
)
299 self
.incomplete
.update(waiting
)
300 self
.total
+= len(tsk
.more_tasks
)
302 def mark_finished(self
, tsk
):
304 # DAG ancestors are likely to be in the incomplete set
305 # This assumes that the run_after contents have not changed
306 # after the build starts, else a deadlock may occur
307 if x
in self
.incomplete
:
308 # TODO remove dependencies to free some memory?
309 # x.run_after.remove(tsk)
310 for k
in x
.run_after
:
314 self
.incomplete
.remove(x
)
315 self
.outstanding
.append(x
)
317 if tsk
in self
.revdeps
:
318 for x
in self
.revdeps
[tsk
]:
319 if isinstance(x
, Task
.TaskGroup
):
323 # TODO necessary optimization?
324 k
.run_after
.remove(x
)
326 # TODO necessary optimization?
330 del self
.revdeps
[tsk
]
332 if hasattr(tsk
, 'semaphore'):
335 while sem
.waiting
and not sem
.is_locked():
336 # take a frozen task, make it ready to run
337 x
= sem
.waiting
.pop()
342 Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
343 Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
345 :rtype: :py:attr:`waflib.Task.Task`
349 self
.add_more_tasks(tsk
)
350 self
.mark_finished(tsk
)
356 def add_task(self
, tsk
):
358 Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
360 :param tsk: task instance
361 :type tsk: :py:attr:`waflib.Task.Task`
363 # TODO change in waf 2.1
366 def _add_task(self
, tsk
):
367 if hasattr(tsk
, 'semaphore'):
377 if self
.numjobs
== 1:
378 tsk
.log_display(tsk
.generator
.bld
)
380 self
.process_task(tsk
)
386 def process_task(self
, tsk
):
388 Processes a task and attempts to stop the build in case of errors
391 if tsk
.hasrun
!= Task
.SUCCESS
:
392 self
.error_handler(tsk
)
396 Mark a task as skipped/up-to-date
398 tsk
.hasrun
= Task
.SKIPPED
399 self
.mark_finished(tsk
)
401 def cancel(self
, tsk
):
403 Mark a task as failed because of unsatisfiable dependencies
405 tsk
.hasrun
= Task
.CANCELED
406 self
.mark_finished(tsk
)
408 def error_handler(self
, tsk
):
410 Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
411 unless the build is executed with::
415 :param tsk: task instance
416 :type tsk: :py:attr:`waflib.Task.Task`
418 if not self
.bld
.keep
:
420 self
.error
.append(tsk
)
422 def task_status(self
, tsk
):
424 Obtains the task status to decide whether to run it immediately or not.
426 :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
430 return tsk
.runnable_status()
433 tsk
.err_msg
= traceback
.format_exc()
434 if not self
.stop
and self
.bld
.keep
:
436 if self
.bld
.keep
== 1:
437 # if -k stop on the first exception, if -kk try to go as far as possible
438 if Logs
.verbose
> 1 or not self
.error
:
439 self
.error
.append(tsk
)
443 self
.error
.append(tsk
)
444 return Task
.EXCEPTION
446 tsk
.hasrun
= Task
.EXCEPTION
447 self
.error_handler(tsk
)
449 return Task
.EXCEPTION
453 Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
454 :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
455 has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
456 and marks the build as failed by setting the ``stop`` flag.
457 If only one job is used, then executes the tasks one by one, without consumers.
459 self
.total
= self
.bld
.total()
463 self
.refill_task_list()
465 # consider the next task
466 tsk
= self
.get_next_task()
469 # tasks may add new ones after they are run
472 # no tasks to run, no tasks running, time to exit
476 # if the task is marked as "run", just skip it
480 if self
.stop
: # stop immediately after a failure is detected
483 st
= self
.task_status(tsk
)
484 if st
== Task
.RUN_ME
:
486 elif st
== Task
.ASK_LATER
:
488 elif st
== Task
.SKIP_ME
:
491 self
.add_more_tasks(tsk
)
492 elif st
== Task
.CANCEL_ME
:
493 # A dependency problem has occurred, and the
494 # build is most likely run with `waf -k`
496 self
.error
.append(tsk
)
500 # self.count represents the tasks that have been made available to the consumer threads
501 # collect all the tasks after an error else the message may be incomplete
502 while self
.error
and self
.count
:
507 assert not self
.count
508 assert not self
.postponed
509 assert not self
.incomplete
511 def prio_and_split(self
, tasks
):
513 Label input tasks with priority values, and return a pair containing
514 the tasks that are ready to run and the tasks that are necessarily
515 waiting for other tasks to complete.
517 The priority system is really meant as an optional layer for optimization:
518 dependency cycles are found quickly, and builds should be more efficient.
519 A high priority number means that a task is processed first.
521 This method can be overridden to disable the priority system::
523 def prio_and_split(self, tasks):
526 :return: A pair of task lists
534 reverse
= self
.revdeps
538 for k
in x
.run_after
:
539 if isinstance(k
, Task
.TaskGroup
):
540 if k
not in groups_done
:
547 # the priority number is not the tree depth
549 if isinstance(n
, Task
.TaskGroup
):
550 return sum(visit(k
) for k
in n
.next
)
557 n
.prio_order
= n
.tree_weight
+ len(rev
) + sum(visit(k
) for k
in rev
)
559 n
.prio_order
= n
.tree_weight
563 raise Errors
.WafError('Dependency cycle found!')
568 # must visit all to detect cycles
572 except Errors
.WafError
:
573 self
.debug_cycles(tasks
, reverse
)
578 for k
in x
.run_after
:
584 return (ready
, waiting
)
586 def debug_cycles(self
, tasks
, reverse
):
592 if isinstance(n
, Task
.TaskGroup
):
598 for k
in reverse
.get(n
, []):
604 lst
.append(repr(tsk
))
606 # exclude prior nodes, we want the minimum cycle
608 raise Errors
.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst
))