dsdb repl_meta_data: Use dsdb_request_add_controls()
[Samba.git] / buildtools / wafadmin / Runner.py
blob94db0fbd1ba4d5f0879ef27da6f7250f36b400b6
1 #!/usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2005-2008 (ita)
5 "Execute the tasks"
7 import os, sys, random, time, threading, traceback
8 try: from Queue import Queue
9 except ImportError: from queue import Queue
10 import Build, Utils, Logs, Options
11 from Logs import debug, error
12 from Constants import *
14 GAP = 15
16 run_old = threading.Thread.run
17 def run(*args, **kwargs):
18 try:
19 run_old(*args, **kwargs)
20 except (KeyboardInterrupt, SystemExit):
21 raise
22 except:
23 sys.excepthook(*sys.exc_info())
24 threading.Thread.run = run
26 def process_task(tsk):
28 m = tsk.master
29 if m.stop:
30 m.out.put(tsk)
31 return
33 try:
34 tsk.generator.bld.printout(tsk.display())
35 if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
36 # actual call to task's run() function
37 else: ret = tsk.call_run()
38 except Exception, e:
39 tsk.err_msg = Utils.ex_stack()
40 tsk.hasrun = EXCEPTION
42 # TODO cleanup
43 m.error_handler(tsk)
44 m.out.put(tsk)
45 return
47 if ret:
48 tsk.err_code = ret
49 tsk.hasrun = CRASHED
50 else:
51 try:
52 tsk.post_run()
53 except Utils.WafError:
54 pass
55 except Exception:
56 tsk.err_msg = Utils.ex_stack()
57 tsk.hasrun = EXCEPTION
58 else:
59 tsk.hasrun = SUCCESS
60 if tsk.hasrun != SUCCESS:
61 m.error_handler(tsk)
63 m.out.put(tsk)
65 class TaskConsumer(threading.Thread):
66 ready = Queue(0)
67 consumers = []
69 def __init__(self):
70 threading.Thread.__init__(self)
71 self.setDaemon(1)
72 self.start()
74 def run(self):
75 try:
76 self.loop()
77 except:
78 pass
80 def loop(self):
81 while 1:
82 tsk = TaskConsumer.ready.get()
83 process_task(tsk)
85 class Parallel(object):
86 """
87 keep the consumer threads busy, and avoid consuming cpu cycles
88 when no more tasks can be added (end of the build, etc)
89 """
90 def __init__(self, bld, j=2):
92 # number of consumers
93 self.numjobs = j
95 self.manager = bld.task_manager
96 self.manager.current_group = 0
98 self.total = self.manager.total()
100 # tasks waiting to be processed - IMPORTANT
101 self.outstanding = []
102 self.maxjobs = MAXJOBS
104 # tasks that are awaiting for another task to complete
105 self.frozen = []
107 # tasks returned by the consumers
108 self.out = Queue(0)
110 self.count = 0 # tasks not in the producer area
112 self.processed = 1 # progress indicator
114 self.stop = False # error condition to stop the build
115 self.error = False # error flag
117 def get_next(self):
118 "override this method to schedule the tasks in a particular order"
119 if not self.outstanding:
120 return None
121 return self.outstanding.pop(0)
123 def postpone(self, tsk):
124 "override this method to schedule the tasks in a particular order"
125 # TODO consider using a deque instead
126 if random.randint(0, 1):
127 self.frozen.insert(0, tsk)
128 else:
129 self.frozen.append(tsk)
131 def refill_task_list(self):
132 "called to set the next group of tasks"
134 while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
135 self.get_out()
137 while not self.outstanding:
138 if self.count:
139 self.get_out()
141 if self.frozen:
142 self.outstanding += self.frozen
143 self.frozen = []
144 elif not self.count:
145 (jobs, tmp) = self.manager.get_next_set()
146 if jobs != None: self.maxjobs = jobs
147 if tmp: self.outstanding += tmp
148 break
150 def get_out(self):
151 "the tasks that are put to execute are all collected using get_out"
152 ret = self.out.get()
153 self.manager.add_finished(ret)
154 if not self.stop and getattr(ret, 'more_tasks', None):
155 self.outstanding += ret.more_tasks
156 self.total += len(ret.more_tasks)
157 self.count -= 1
159 def error_handler(self, tsk):
160 "by default, errors make the build stop (not thread safe so be careful)"
161 if not Options.options.keep:
162 self.stop = True
163 self.error = True
165 def start(self):
166 "execute the tasks"
168 if TaskConsumer.consumers:
169 # the worker pool is usually loaded lazily (see below)
170 # in case it is re-used with a different value of numjobs:
171 while len(TaskConsumer.consumers) < self.numjobs:
172 TaskConsumer.consumers.append(TaskConsumer())
174 while not self.stop:
176 self.refill_task_list()
178 # consider the next task
179 tsk = self.get_next()
180 if not tsk:
181 if self.count:
182 # tasks may add new ones after they are run
183 continue
184 else:
185 # no tasks to run, no tasks running, time to exit
186 break
188 if tsk.hasrun:
189 # if the task is marked as "run", just skip it
190 self.processed += 1
191 self.manager.add_finished(tsk)
192 continue
194 try:
195 st = tsk.runnable_status()
196 except Exception, e:
197 self.processed += 1
198 if self.stop and not Options.options.keep:
199 tsk.hasrun = SKIPPED
200 self.manager.add_finished(tsk)
201 continue
202 self.error_handler(tsk)
203 self.manager.add_finished(tsk)
204 tsk.hasrun = EXCEPTION
205 tsk.err_msg = Utils.ex_stack()
206 continue
208 if st == ASK_LATER:
209 self.postpone(tsk)
210 elif st == SKIP_ME:
211 self.processed += 1
212 tsk.hasrun = SKIPPED
213 self.manager.add_finished(tsk)
214 else:
215 # run me: put the task in ready queue
216 tsk.position = (self.processed, self.total)
217 self.count += 1
218 tsk.master = self
219 self.processed += 1
221 if self.numjobs == 1:
222 process_task(tsk)
223 else:
224 TaskConsumer.ready.put(tsk)
225 # create the consumer threads only if there is something to consume
226 if not TaskConsumer.consumers:
227 TaskConsumer.consumers = [TaskConsumer() for i in xrange(self.numjobs)]
229 # self.count represents the tasks that have been made available to the consumer threads
230 # collect all the tasks after an error else the message may be incomplete
231 while self.error and self.count:
232 self.get_out()
234 #print loop
235 assert (self.count == 0 or self.stop)