s3:lib/events: make use of tevent_common_loop_timer_delay()
[Samba/gebeck_regimport.git] / buildtools / wafsamba / nothreads.py
blob075dcd326b78ea93588e1c0ac8f5dded6b634760
1 # encoding: utf-8
2 # Thomas Nagy, 2005-2008 (ita)
4 # this replaces the core of Runner.py in waf with a varient that works
5 # on systems with completely broken threading (such as Python 2.5.x on
6 # AIX). For simplicity we enable this when JOBS=1, which is triggered
7 # by the compatibility makefile used for the waf build. That also ensures
8 # this code is tested, as it means it is used in the build farm, and by
9 # anyone using 'make' to build Samba with waf
11 "Execute the tasks"
13 import sys, random, time, threading, traceback, os
14 try: from Queue import Queue
15 except ImportError: from queue import Queue
16 import Build, Utils, Logs, Options
17 from Logs import debug, error
18 from Constants import *
20 GAP = 15
22 run_old = threading.Thread.run
23 def run(*args, **kwargs):
24 try:
25 run_old(*args, **kwargs)
26 except (KeyboardInterrupt, SystemExit):
27 raise
28 except:
29 sys.excepthook(*sys.exc_info())
30 threading.Thread.run = run
33 class TaskConsumer(object):
34 consumers = 1
36 def process(tsk):
37 m = tsk.master
38 if m.stop:
39 m.out.put(tsk)
40 return
42 try:
43 tsk.generator.bld.printout(tsk.display())
44 if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
45 # actual call to task's run() function
46 else: ret = tsk.call_run()
47 except Exception, e:
48 tsk.err_msg = Utils.ex_stack()
49 tsk.hasrun = EXCEPTION
51 # TODO cleanup
52 m.error_handler(tsk)
53 m.out.put(tsk)
54 return
56 if ret:
57 tsk.err_code = ret
58 tsk.hasrun = CRASHED
59 else:
60 try:
61 tsk.post_run()
62 except Utils.WafError:
63 pass
64 except Exception:
65 tsk.err_msg = Utils.ex_stack()
66 tsk.hasrun = EXCEPTION
67 else:
68 tsk.hasrun = SUCCESS
69 if tsk.hasrun != SUCCESS:
70 m.error_handler(tsk)
72 m.out.put(tsk)
74 class Parallel(object):
75 """
76 keep the consumer threads busy, and avoid consuming cpu cycles
77 when no more tasks can be added (end of the build, etc)
78 """
79 def __init__(self, bld, j=2):
81 # number of consumers
82 self.numjobs = j
84 self.manager = bld.task_manager
85 self.manager.current_group = 0
87 self.total = self.manager.total()
89 # tasks waiting to be processed - IMPORTANT
90 self.outstanding = []
91 self.maxjobs = MAXJOBS
93 # tasks that are awaiting for another task to complete
94 self.frozen = []
96 # tasks returned by the consumers
97 self.out = Queue(0)
99 self.count = 0 # tasks not in the producer area
101 self.processed = 1 # progress indicator
103 self.stop = False # error condition to stop the build
104 self.error = False # error flag
106 def get_next(self):
107 "override this method to schedule the tasks in a particular order"
108 if not self.outstanding:
109 return None
110 return self.outstanding.pop(0)
112 def postpone(self, tsk):
113 "override this method to schedule the tasks in a particular order"
114 # TODO consider using a deque instead
115 if random.randint(0, 1):
116 self.frozen.insert(0, tsk)
117 else:
118 self.frozen.append(tsk)
120 def refill_task_list(self):
121 "called to set the next group of tasks"
123 while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
124 self.get_out()
126 while not self.outstanding:
127 if self.count:
128 self.get_out()
130 if self.frozen:
131 self.outstanding += self.frozen
132 self.frozen = []
133 elif not self.count:
134 (jobs, tmp) = self.manager.get_next_set()
135 if jobs is not None:
136 self.maxjobs = jobs
137 if tmp:
138 self.outstanding += tmp
139 break
141 def get_out(self):
142 "the tasks that are put to execute are all collected using get_out"
143 ret = self.out.get()
144 self.manager.add_finished(ret)
145 if not self.stop and getattr(ret, 'more_tasks', None):
146 self.outstanding += ret.more_tasks
147 self.total += len(ret.more_tasks)
148 self.count -= 1
150 def error_handler(self, tsk):
151 "by default, errors make the build stop (not thread safe so be careful)"
152 if not Options.options.keep:
153 self.stop = True
154 self.error = True
156 def start(self):
157 "execute the tasks"
159 while not self.stop:
161 self.refill_task_list()
163 # consider the next task
164 tsk = self.get_next()
165 if not tsk:
166 if self.count:
167 # tasks may add new ones after they are run
168 continue
169 else:
170 # no tasks to run, no tasks running, time to exit
171 break
173 if tsk.hasrun:
174 # if the task is marked as "run", just skip it
175 self.processed += 1
176 self.manager.add_finished(tsk)
177 continue
179 try:
180 st = tsk.runnable_status()
181 except Exception, e:
182 self.processed += 1
183 if self.stop and not Options.options.keep:
184 tsk.hasrun = SKIPPED
185 self.manager.add_finished(tsk)
186 continue
187 self.error_handler(tsk)
188 self.manager.add_finished(tsk)
189 tsk.hasrun = EXCEPTION
190 tsk.err_msg = Utils.ex_stack()
191 continue
193 if st == ASK_LATER:
194 self.postpone(tsk)
195 elif st == SKIP_ME:
196 self.processed += 1
197 tsk.hasrun = SKIPPED
198 self.manager.add_finished(tsk)
199 else:
200 # run me: put the task in ready queue
201 tsk.position = (self.processed, self.total)
202 self.count += 1
203 self.processed += 1
204 tsk.master = self
206 process(tsk)
208 # self.count represents the tasks that have been made available to the consumer threads
209 # collect all the tasks after an error else the message may be incomplete
210 while self.error and self.count:
211 self.get_out()
213 #print loop
214 assert (self.count == 0 or self.stop)
217 # enable nothreads
218 import Runner
219 Runner.process = process
220 Runner.Parallel = Parallel