3 # Thomas Nagy, 2005-2008 (ita)
5 # this replaces the core of Runner.py in waf with a variant that works
6 # on systems with completely broken threading (such as Python 2.5.x on
7 # AIX). For simplicity we enable this when JOBS=1, which is triggered
8 # by the compatibility makefile used for the waf build. That also ensures
9 # this code is tested, as it means it is used in the build farm, and by
10 # anyone using 'make' to build Samba with waf
14 import sys
, random
, time
, threading
, traceback
, os
15 try: from Queue
import Queue
16 except ImportError: from queue
import Queue
17 import Build
, Utils
, Logs
, Options
18 from Logs
import debug
, error
19 from Constants
import *
23 run_old
= threading
.Thread
.run
24 def run(*args
, **kwargs
):
26 run_old(*args
, **kwargs
)
27 except (KeyboardInterrupt, SystemExit):
30 sys
.excepthook(*sys
.exc_info())
31 threading
.Thread
.run
= run
34 class TaskConsumer(object):
44 tsk
.generator
.bld
.printout(tsk
.display())
45 if tsk
.__class
__.stat
: ret
= tsk
.__class
__.stat(tsk
)
46 # actual call to task's run() function
47 else: ret
= tsk
.call_run()
49 tsk
.err_msg
= Utils
.ex_stack()
50 tsk
.hasrun
= EXCEPTION
63 except Utils
.WafError
:
66 tsk
.err_msg
= Utils
.ex_stack()
67 tsk
.hasrun
= EXCEPTION
70 if tsk
.hasrun
!= SUCCESS
:
75 class Parallel(object):
77 keep the consumer threads busy, and avoid consuming cpu cycles
78 when no more tasks can be added (end of the build, etc)
80 def __init__(self
, bld
, j
=2):
85 self
.manager
= bld
.task_manager
86 self
.manager
.current_group
= 0
88 self
.total
= self
.manager
.total()
90 # tasks waiting to be processed - IMPORTANT
92 self
.maxjobs
= MAXJOBS
94 # tasks that are waiting for another task to complete
97 # tasks returned by the consumers
100 self
.count
= 0 # tasks not in the producer area
102 self
.processed
= 1 # progress indicator
104 self
.stop
= False # error condition to stop the build
105 self
.error
= False # error flag
108 "override this method to schedule the tasks in a particular order"
109 if not self
.outstanding
:
111 return self
.outstanding
.pop(0)
113 def postpone(self
, tsk
):
114 "override this method to schedule the tasks in a particular order"
115 # TODO consider using a deque instead
116 if random
.randint(0, 1):
117 self
.frozen
.insert(0, tsk
)
119 self
.frozen
.append(tsk
)
121 def refill_task_list(self
):
122 "called to set the next group of tasks"
124 while self
.count
> self
.numjobs
+ GAP
or self
.count
>= self
.maxjobs
:
127 while not self
.outstanding
:
132 self
.outstanding
+= self
.frozen
135 (jobs
, tmp
) = self
.manager
.get_next_set()
136 if jobs
!= None: self
.maxjobs
= jobs
137 if tmp
: self
.outstanding
+= tmp
141 "the tasks that are put to execute are all collected using get_out"
143 self
.manager
.add_finished(ret
)
144 if not self
.stop
and getattr(ret
, 'more_tasks', None):
145 self
.outstanding
+= ret
.more_tasks
146 self
.total
+= len(ret
.more_tasks
)
149 def error_handler(self
, tsk
):
150 "by default, errors make the build stop (not thread safe so be careful)"
151 if not Options
.options
.keep
:
160 self
.refill_task_list()
162 # consider the next task
163 tsk
= self
.get_next()
166 # tasks may add new ones after they are run
169 # no tasks to run, no tasks running, time to exit
173 # if the task is marked as "run", just skip it
175 self
.manager
.add_finished(tsk
)
179 st
= tsk
.runnable_status()
182 if self
.stop
and not Options
.options
.keep
:
184 self
.manager
.add_finished(tsk
)
186 self
.error_handler(tsk
)
187 self
.manager
.add_finished(tsk
)
188 tsk
.hasrun
= EXCEPTION
189 tsk
.err_msg
= Utils
.ex_stack()
197 self
.manager
.add_finished(tsk
)
199 # run me: put the task in ready queue
200 tsk
.position
= (self
.processed
, self
.total
)
207 # self.count represents the tasks that have been made available to the consumer threads
208 # collect all the tasks after an error else the message may be incomplete
209 while self
.error
and self
.count
:
213 assert (self
.count
== 0 or self
.stop
)
# Install the replacements: rebind Runner.process and Runner.Parallel to the
# `process` and `Parallel` objects in scope here, so that anything looking
# these names up on the Runner module gets the variants from this file
# (this is the "replaces the core of Runner.py" step named in the file header).
218 Runner
.process
= process
219 Runner
.Parallel
= Parallel