Set "seq" alsa midi driver to maximum resolution possible
[jack2.git] / waflib / Runner.py
blob261084d27482f6d3d29325190982e4b52a31a1a3
1 #!/usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2005-2018 (ita)
5 """
6 Runner.py: Task scheduling and execution
7 """
9 import heapq, traceback
10 try:
11 from queue import Queue, PriorityQueue
12 except ImportError:
13 from Queue import Queue
14 try:
15 from Queue import PriorityQueue
16 except ImportError:
17 class PriorityQueue(Queue):
18 def _init(self, maxsize):
19 self.maxsize = maxsize
20 self.queue = []
21 def _put(self, item):
22 heapq.heappush(self.queue, item)
23 def _get(self):
24 return heapq.heappop(self.queue)
26 from waflib import Utils, Task, Errors, Logs
28 GAP = 5
29 """
30 Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
31 """
33 class PriorityTasks(object):
34 def __init__(self):
35 self.lst = []
36 def __len__(self):
37 return len(self.lst)
38 def __iter__(self):
39 return iter(self.lst)
40 def clear(self):
41 self.lst = []
42 def append(self, task):
43 heapq.heappush(self.lst, task)
44 def appendleft(self, task):
45 "Deprecated, do not use"
46 heapq.heappush(self.lst, task)
47 def pop(self):
48 return heapq.heappop(self.lst)
49 def extend(self, lst):
50 if self.lst:
51 for x in lst:
52 self.append(x)
53 else:
54 if isinstance(lst, list):
55 self.lst = lst
56 heapq.heapify(lst)
57 else:
58 self.lst = lst.lst
60 class Consumer(Utils.threading.Thread):
61 """
62 Daemon thread object that executes a task. It shares a semaphore with
63 the coordinator :py:class:`waflib.Runner.Spawner`. There is one
64 instance per task to consume.
65 """
66 def __init__(self, spawner, task):
67 Utils.threading.Thread.__init__(self)
68 self.task = task
69 """Task to execute"""
70 self.spawner = spawner
71 """Coordinator object"""
72 self.setDaemon(1)
73 self.start()
74 def run(self):
75 """
76 Processes a single task
77 """
78 try:
79 if not self.spawner.master.stop:
80 self.spawner.master.process_task(self.task)
81 finally:
82 self.spawner.sem.release()
83 self.spawner.master.out.put(self.task)
84 self.task = None
85 self.spawner = None
87 class Spawner(Utils.threading.Thread):
88 """
89 Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
90 spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
91 :py:class:`waflib.Task.Task` instance.
92 """
93 def __init__(self, master):
94 Utils.threading.Thread.__init__(self)
95 self.master = master
96 """:py:class:`waflib.Runner.Parallel` producer instance"""
97 self.sem = Utils.threading.Semaphore(master.numjobs)
98 """Bounded semaphore that prevents spawning more than *n* concurrent consumers"""
99 self.setDaemon(1)
100 self.start()
101 def run(self):
103 Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
105 try:
106 self.loop()
107 except Exception:
108 # Python 2 prints unnecessary messages when shutting down
109 # we also want to stop the thread properly
110 pass
111 def loop(self):
113 Consumes task objects from the producer; ends when the producer has no more
114 task to provide.
116 master = self.master
117 while 1:
118 task = master.ready.get()
119 self.sem.acquire()
120 if not master.stop:
121 task.log_display(task.generator.bld)
122 Consumer(self, task)
124 class Parallel(object):
126 Schedule the tasks obtained from the build context for execution.
128 def __init__(self, bld, j=2):
130 The initialization requires a build context reference
131 for computing the total number of jobs.
134 self.numjobs = j
136 Amount of parallel consumers to use
139 self.bld = bld
141 Instance of :py:class:`waflib.Build.BuildContext`
144 self.outstanding = PriorityTasks()
145 """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""
147 self.postponed = PriorityTasks()
148 """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
150 self.incomplete = set()
151 """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""
153 self.ready = PriorityQueue(0)
154 """List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
156 self.out = Queue(0)
157 """List of :py:class:`waflib.Task.Task` returned by the task consumers"""
159 self.count = 0
160 """Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""
162 self.processed = 0
163 """Amount of tasks processed"""
165 self.stop = False
166 """Error flag to stop the build"""
168 self.error = []
169 """Tasks that could not be executed"""
171 self.biter = None
172 """Task iterator which must give groups of parallelizable tasks when calling ``next()``"""
174 self.dirty = False
176 Flag that indicates that the build cache must be saved when a task was executed
177 (calls :py:meth:`waflib.Build.BuildContext.store`)"""
179 self.revdeps = Utils.defaultdict(set)
181 The reverse dependency graph of dependencies obtained from Task.run_after
184 self.spawner = Spawner(self)
186 Coordinating daemon thread that spawns thread consumers
189 def get_next_task(self):
191 Obtains the next Task instance to run
193 :rtype: :py:class:`waflib.Task.Task`
195 if not self.outstanding:
196 return None
197 return self.outstanding.pop()
199 def postpone(self, tsk):
201 Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`.
202 The order is scrambled so as to consume as many tasks in parallel as possible.
204 :param tsk: task instance
205 :type tsk: :py:class:`waflib.Task.Task`
207 self.postponed.append(tsk)
209 def refill_task_list(self):
211 Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
212 Ensures that all tasks in the current build group are complete before processing the next one.
214 while self.count > self.numjobs * GAP:
215 self.get_out()
217 while not self.outstanding:
218 if self.count:
219 self.get_out()
220 if self.outstanding:
221 break
222 elif self.postponed:
223 try:
224 cond = self.deadlock == self.processed
225 except AttributeError:
226 pass
227 else:
228 if cond:
229 # The most common reason is conflicting build order declaration
230 # for example: "X run_after Y" and "Y run_after X"
231 # Another can be changing "run_after" dependencies while the build is running
232 # for example: updating "tsk.run_after" in the "runnable_status" method
233 lst = []
234 for tsk in self.postponed:
235 deps = [id(x) for x in tsk.run_after if not x.hasrun]
236 lst.append('%s\t-> %r' % (repr(tsk), deps))
237 if not deps:
238 lst.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk))
239 raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
240 self.deadlock = self.processed
242 if self.postponed:
243 self.outstanding.extend(self.postponed)
244 self.postponed.clear()
245 elif not self.count:
246 if self.incomplete:
247 for x in self.incomplete:
248 for k in x.run_after:
249 if not k.hasrun:
250 break
251 else:
252 # dependency added after the build started without updating revdeps
253 self.incomplete.remove(x)
254 self.outstanding.append(x)
255 break
256 else:
257 raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
258 else:
259 tasks = next(self.biter)
260 ready, waiting = self.prio_and_split(tasks)
261 self.outstanding.extend(ready)
262 self.incomplete.update(waiting)
263 self.total = self.bld.total()
264 break
266 def add_more_tasks(self, tsk):
268 If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
269 in that list are added to the current build and will be processed before the next build group.
271 The priorities for dependent tasks are not re-calculated globally
273 :param tsk: task instance
274 :type tsk: :py:attr:`waflib.Task.Task`
276 if getattr(tsk, 'more_tasks', None):
277 more = set(tsk.more_tasks)
278 groups_done = set()
279 def iteri(a, b):
280 for x in a:
281 yield x
282 for x in b:
283 yield x
285 # Update the dependency tree
286 # this assumes that task.run_after values were updated
287 for x in iteri(self.outstanding, self.incomplete):
288 for k in x.run_after:
289 if isinstance(k, Task.TaskGroup):
290 if k not in groups_done:
291 groups_done.add(k)
292 for j in k.prev & more:
293 self.revdeps[j].add(k)
294 elif k in more:
295 self.revdeps[k].add(x)
297 ready, waiting = self.prio_and_split(tsk.more_tasks)
298 self.outstanding.extend(ready)
299 self.incomplete.update(waiting)
300 self.total += len(tsk.more_tasks)
302 def mark_finished(self, tsk):
303 def try_unfreeze(x):
304 # DAG ancestors are likely to be in the incomplete set
305 # This assumes that the run_after contents have not changed
306 # after the build starts, else a deadlock may occur
307 if x in self.incomplete:
308 # TODO remove dependencies to free some memory?
309 # x.run_after.remove(tsk)
310 for k in x.run_after:
311 if not k.hasrun:
312 break
313 else:
314 self.incomplete.remove(x)
315 self.outstanding.append(x)
317 if tsk in self.revdeps:
318 for x in self.revdeps[tsk]:
319 if isinstance(x, Task.TaskGroup):
320 x.prev.remove(tsk)
321 if not x.prev:
322 for k in x.next:
323 # TODO necessary optimization?
324 k.run_after.remove(x)
325 try_unfreeze(k)
326 # TODO necessary optimization?
327 x.next = []
328 else:
329 try_unfreeze(x)
330 del self.revdeps[tsk]
332 if hasattr(tsk, 'semaphore'):
333 sem = tsk.semaphore
334 sem.release(tsk)
335 while sem.waiting and not sem.is_locked():
336 # take a frozen task, make it ready to run
337 x = sem.waiting.pop()
338 self._add_task(x)
340 def get_out(self):
342 Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
343 Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
345 :rtype: :py:attr:`waflib.Task.Task`
347 tsk = self.out.get()
348 if not self.stop:
349 self.add_more_tasks(tsk)
350 self.mark_finished(tsk)
352 self.count -= 1
353 self.dirty = True
354 return tsk
356 def add_task(self, tsk):
358 Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
360 :param tsk: task instance
361 :type tsk: :py:attr:`waflib.Task.Task`
363 # TODO change in waf 2.1
364 self.ready.put(tsk)
366 def _add_task(self, tsk):
367 if hasattr(tsk, 'semaphore'):
368 sem = tsk.semaphore
369 try:
370 sem.acquire(tsk)
371 except IndexError:
372 sem.waiting.add(tsk)
373 return
375 self.count += 1
376 self.processed += 1
377 if self.numjobs == 1:
378 tsk.log_display(tsk.generator.bld)
379 try:
380 self.process_task(tsk)
381 finally:
382 self.out.put(tsk)
383 else:
384 self.add_task(tsk)
386 def process_task(self, tsk):
388 Processes a task and attempts to stop the build in case of errors
390 tsk.process()
391 if tsk.hasrun != Task.SUCCESS:
392 self.error_handler(tsk)
394 def skip(self, tsk):
396 Mark a task as skipped/up-to-date
398 tsk.hasrun = Task.SKIPPED
399 self.mark_finished(tsk)
401 def cancel(self, tsk):
403 Mark a task as failed because of unsatisfiable dependencies
405 tsk.hasrun = Task.CANCELED
406 self.mark_finished(tsk)
408 def error_handler(self, tsk):
410 Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
411 unless the build is executed with::
413 $ waf build -k
415 :param tsk: task instance
416 :type tsk: :py:attr:`waflib.Task.Task`
418 if not self.bld.keep:
419 self.stop = True
420 self.error.append(tsk)
422 def task_status(self, tsk):
424 Obtains the task status to decide whether to run it immediately or not.
426 :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
427 :rtype: integer
429 try:
430 return tsk.runnable_status()
431 except Exception:
432 self.processed += 1
433 tsk.err_msg = traceback.format_exc()
434 if not self.stop and self.bld.keep:
435 self.skip(tsk)
436 if self.bld.keep == 1:
437 # if -k stop on the first exception, if -kk try to go as far as possible
438 if Logs.verbose > 1 or not self.error:
439 self.error.append(tsk)
440 self.stop = True
441 else:
442 if Logs.verbose > 1:
443 self.error.append(tsk)
444 return Task.EXCEPTION
446 tsk.hasrun = Task.EXCEPTION
447 self.error_handler(tsk)
449 return Task.EXCEPTION
451 def start(self):
453 Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
454 :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
455 has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
456 and marks the build as failed by setting the ``stop`` flag.
457 If only one job is used, then executes the tasks one by one, without consumers.
459 self.total = self.bld.total()
461 while not self.stop:
463 self.refill_task_list()
465 # consider the next task
466 tsk = self.get_next_task()
467 if not tsk:
468 if self.count:
469 # tasks may add new ones after they are run
470 continue
471 else:
472 # no tasks to run, no tasks running, time to exit
473 break
475 if tsk.hasrun:
476 # if the task is marked as "run", just skip it
477 self.processed += 1
478 continue
480 if self.stop: # stop immediately after a failure is detected
481 break
483 st = self.task_status(tsk)
484 if st == Task.RUN_ME:
485 self._add_task(tsk)
486 elif st == Task.ASK_LATER:
487 self.postpone(tsk)
488 elif st == Task.SKIP_ME:
489 self.processed += 1
490 self.skip(tsk)
491 self.add_more_tasks(tsk)
492 elif st == Task.CANCEL_ME:
493 # A dependency problem has occurred, and the
494 # build is most likely run with `waf -k`
495 if Logs.verbose > 1:
496 self.error.append(tsk)
497 self.processed += 1
498 self.cancel(tsk)
500 # self.count represents the tasks that have been made available to the consumer threads
501 # collect all the tasks after an error else the message may be incomplete
502 while self.error and self.count:
503 self.get_out()
505 self.ready.put(None)
506 if not self.stop:
507 assert not self.count
508 assert not self.postponed
509 assert not self.incomplete
511 def prio_and_split(self, tasks):
513 Label input tasks with priority values, and return a pair containing
514 the tasks that are ready to run and the tasks that are necessarily
515 waiting for other tasks to complete.
517 The priority system is really meant as an optional layer for optimization:
518 dependency cycles are found quickly, and builds should be more efficient.
519 A high priority number means that a task is processed first.
521 This method can be overridden to disable the priority system::
523 def prio_and_split(self, tasks):
524 return tasks, []
526 :return: A pair of task lists
527 :rtype: tuple
529 # to disable:
530 #return tasks, []
531 for x in tasks:
532 x.visited = 0
534 reverse = self.revdeps
536 groups_done = set()
537 for x in tasks:
538 for k in x.run_after:
539 if isinstance(k, Task.TaskGroup):
540 if k not in groups_done:
541 groups_done.add(k)
542 for j in k.prev:
543 reverse[j].add(k)
544 else:
545 reverse[k].add(x)
547 # the priority number is not the tree depth
548 def visit(n):
549 if isinstance(n, Task.TaskGroup):
550 return sum(visit(k) for k in n.next)
552 if n.visited == 0:
553 n.visited = 1
555 if n in reverse:
556 rev = reverse[n]
557 n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
558 else:
559 n.prio_order = n.tree_weight
561 n.visited = 2
562 elif n.visited == 1:
563 raise Errors.WafError('Dependency cycle found!')
564 return n.prio_order
566 for x in tasks:
567 if x.visited != 0:
568 # must visit all to detect cycles
569 continue
570 try:
571 visit(x)
572 except Errors.WafError:
573 self.debug_cycles(tasks, reverse)
575 ready = []
576 waiting = []
577 for x in tasks:
578 for k in x.run_after:
579 if not k.hasrun:
580 waiting.append(x)
581 break
582 else:
583 ready.append(x)
584 return (ready, waiting)
586 def debug_cycles(self, tasks, reverse):
587 tmp = {}
588 for x in tasks:
589 tmp[x] = 0
591 def visit(n, acc):
592 if isinstance(n, Task.TaskGroup):
593 for k in n.next:
594 visit(k, acc)
595 return
596 if tmp[n] == 0:
597 tmp[n] = 1
598 for k in reverse.get(n, []):
599 visit(k, [n] + acc)
600 tmp[n] = 2
601 elif tmp[n] == 1:
602 lst = []
603 for tsk in acc:
604 lst.append(repr(tsk))
605 if tsk is n:
606 # exclude prior nodes, we want the minimum cycle
607 break
608 raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
609 for x in tasks:
610 visit(x, [])