3 # Thomas Nagy, 2005-2008 (ita)
7 import os
, sys
, random
, time
, threading
, traceback
8 try: from Queue
import Queue
9 except ImportError: from queue
import Queue
10 import Build
, Utils
, Logs
, Options
11 from Logs
import debug
, error
12 from Constants
import *
16 run_old
= threading
.Thread
.run
17 def run(*args
, **kwargs
):
19 run_old(*args
, **kwargs
)
20 except (KeyboardInterrupt, SystemExit):
23 sys
.excepthook(*sys
.exc_info())
24 threading
.Thread
.run
= run
26 def process_task(tsk
):
34 tsk
.generator
.bld
.printout(tsk
.display())
35 if tsk
.__class
__.stat
: ret
= tsk
.__class
__.stat(tsk
)
36 # actual call to task's run() function
37 else: ret
= tsk
.call_run()
39 tsk
.err_msg
= Utils
.ex_stack()
40 tsk
.hasrun
= EXCEPTION
53 except Utils
.WafError
:
56 tsk
.err_msg
= Utils
.ex_stack()
57 tsk
.hasrun
= EXCEPTION
60 if tsk
.hasrun
!= SUCCESS
:
65 class TaskConsumer(threading
.Thread
):
70 threading
.Thread
.__init
__(self
)
82 tsk
= TaskConsumer
.ready
.get()
85 class Parallel(object):
87 Keep the consumer threads busy, and avoid consuming CPU cycles
88 when no more tasks can be added (end of the build, etc.)
90 def __init__(self
, bld
, j
=2):
95 self
.manager
= bld
.task_manager
96 self
.manager
.current_group
= 0
98 self
.total
= self
.manager
.total()
100 # tasks waiting to be processed - IMPORTANT
101 self
.outstanding
= []
102 self
.maxjobs
= MAXJOBS
104 # tasks that are waiting for another task to complete
107 # tasks returned by the consumers
110 self
.count
= 0 # tasks not in the producer area
112 self
.processed
= 1 # progress indicator
114 self
.stop
= False # error condition to stop the build
115 self
.error
= False # error flag
118 "override this method to schedule the tasks in a particular order"
119 if not self
.outstanding
:
121 return self
.outstanding
.pop(0)
123 def postpone(self
, tsk
):
124 "override this method to schedule the tasks in a particular order"
125 # TODO consider using a deque instead
126 if random
.randint(0, 1):
127 self
.frozen
.insert(0, tsk
)
129 self
.frozen
.append(tsk
)
131 def refill_task_list(self
):
132 "called to set the next group of tasks"
134 while self
.count
> self
.numjobs
+ GAP
or self
.count
>= self
.maxjobs
:
137 while not self
.outstanding
:
142 self
.outstanding
+= self
.frozen
145 (jobs
, tmp
) = self
.manager
.get_next_set()
146 if jobs
!= None: self
.maxjobs
= jobs
147 if tmp
: self
.outstanding
+= tmp
151 "the tasks that are put to execute are all collected using get_out"
153 self
.manager
.add_finished(ret
)
154 if not self
.stop
and getattr(ret
, 'more_tasks', None):
155 self
.outstanding
+= ret
.more_tasks
156 self
.total
+= len(ret
.more_tasks
)
159 def error_handler(self
, tsk
):
160 "by default, errors make the build stop (not thread safe so be careful)"
161 if not Options
.options
.keep
:
168 if TaskConsumer
.consumers
:
169 # the worker pool is usually loaded lazily (see below)
170 # in case it is re-used with a different value of numjobs:
171 while len(TaskConsumer
.consumers
) < self
.numjobs
:
172 TaskConsumer
.consumers
.append(TaskConsumer())
176 self
.refill_task_list()
178 # consider the next task
179 tsk
= self
.get_next()
182 # tasks may add new ones after they are run
185 # no tasks to run, no tasks running, time to exit
189 # if the task is marked as "run", just skip it
191 self
.manager
.add_finished(tsk
)
195 st
= tsk
.runnable_status()
198 if self
.stop
and not Options
.options
.keep
:
200 self
.manager
.add_finished(tsk
)
202 self
.error_handler(tsk
)
203 self
.manager
.add_finished(tsk
)
204 tsk
.hasrun
= EXCEPTION
205 tsk
.err_msg
= Utils
.ex_stack()
213 self
.manager
.add_finished(tsk
)
215 # run me: put the task in the ready queue
216 tsk
.position
= (self
.processed
, self
.total
)
221 if self
.numjobs
== 1:
224 TaskConsumer
.ready
.put(tsk
)
225 # create the consumer threads only if there is something to consume
226 if not TaskConsumer
.consumers
:
227 TaskConsumer
.consumers
= [TaskConsumer() for i
in xrange(self
.numjobs
)]
229 # self.count represents the tasks that have been made available to the consumer threads
230 # collect all the tasks after an error, otherwise the message may be incomplete
231 while self
.error
and self
.count
:
235 assert (self
.count
== 0 or self
.stop
)