Workaround for temp files reuse
[ci.git] / hbuild / scheduler.py
blobfff2321e48462ed19c48ebb499d7fc2c2c6702a9
1 #!/usr/bin/env python3
4 # Copyright (c) 2017 Vojtech Horky
5 # All rights reserved.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
11 # - Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # - Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
16 # - The name of the author may not be used to endorse or promote products
17 # derived from this software without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 import subprocess
32 import os
33 import time
34 import datetime
35 import shutil
36 import sys
38 from threading import Lock, Condition
class Task:
    """
    Base class of a single build step.

    Subclasses override run(). Before execute() is called the scheduler
    assigns a TaskController to self.ctl. The report dictionary is turned
    into one XML element of the build report: 'name' is the element name
    and 'attrs' its attributes.
    """

    def __init__(self, report_tag, **report_args):
        self.report = {
            'name': report_tag,
            'result': 'unknown',
            'attrs': dict(report_args),
        }
        self.ctl = None

    def execute(self):
        """
        Run the task, record result/duration/files in the report and close
        the controller.

        Returns {'status': 'ok', 'data': ...} where data is what run()
        returned (None and True are normalized to an empty dict so that
        dependent tasks can always look keys up in it).

        Raises whatever run() raised; a False return value from run() is
        turned into an Exception.
        """
        # Monotonic clock: the duration must not be affected by wall-clock
        # adjustments during a long build.
        start_time = time.monotonic()
        try:
            res = self.run()
            if res == False:
                raise Exception('run() returned False')
        except Exception:
            self._finalize_report('fail', start_time)
            # Bare raise keeps the original traceback.
            raise

        self._finalize_report('ok', start_time)

        # True would otherwise end up as the dependency data and break
        # key lookups (`key in True`) in dependent tasks.
        if (res is None) or (res is True):
            res = {}

        return {
            'status': 'ok',
            'data': res
        }

    def _finalize_report(self, result, start_time):
        """Store result, duration (in ms) and published files; close ctl."""
        self.report['result'] = result
        self.report['attrs']['duration'] = (time.monotonic() - start_time) * 1000
        self.report['files'] = self.ctl.get_files()
        self.ctl.done()

    def run(self):
        """Perform the actual work; overridden by subclasses."""
        pass

    def get_report(self):
        return self.report
class TaskException(Exception):
    """Error reported by a task or its controller; msg is the message."""

    def __init__(self, msg):
        super().__init__(msg)
class RunCommandException(TaskException):
    """
    Raised by TaskController.run_command when a command exits with
    a non-zero code.

    Attributes:
        rc -- exit code of the command
        output -- captured output lines (empty unless the caller asked for
                  the output)
    """

    def __init__(self, msg, rc, output):
        super().__init__(msg)
        self.rc = rc
        self.output = output
class TaskController:
    """
    Helper object handed to every task (as Task.ctl) by the scheduler.

    It runs external commands while logging their output, gives access to
    data returned by dependency tasks and publishes files into the artefact
    directory so that they are listed in the build report.
    """

    def __init__(self, name, data, build_directory, artefact_directory, printer, kept_log_lines, print_debug = False):
        self.name = name
        # Mapping dependency task id -> data returned by that task
        # (expected to be dict-like, see get_data()).
        self.data = data
        # Published files as dicts with 'filename' and 'title' keys.
        self.files = []
        # Open log file (see set_log_file()) or None.
        self.log = None
        # The last kept_log_lines logged lines.
        self.log_tail = []
        self.build_directory = build_directory
        self.artefact_directory = artefact_directory
        self.printer = printer
        self.kept_log_lines = kept_log_lines
        self.print_debug_messages = print_debug

    def derive(self, name, data):
        """Create a controller for another task, sharing this one's settings."""
        return TaskController(name, data, self.build_directory, self.artefact_directory,
            self.printer, self.kept_log_lines, self.print_debug_messages)

    def dprint(self, str, *args):
        """Print a printf-style debug message when debugging is enabled."""
        if self.print_debug_messages:
            # Interpolate only when arguments were given: a literal '%' in
            # an argument-less message must not raise.
            msg = str % args if args else str
            self.printer.print_debug(self.name, msg)

    def get_dependency_data(self, dep, key=None):
        """
        Return item key from the data of dependency dep.

        When key is None, dep is used as the key and searched for in all
        dependencies (see get_data()).
        """
        if key is None:
            return self.get_data(dep)
        return self.data[dep][key]

    def get_data(self, key):
        """
        Return the value of key from the first dependency providing it.

        Raises TaskException when no dependency provides key.
        """
        for dep_data in self.data.values():
            if key in dep_data:
                return dep_data[key]
        raise TaskException("WARN: unknown key %s" % key)

    def run_command(self, cmd, cwd=None, needs_output=False):
        """
        Run cmd (argument list), appending every output line to the log.

        stderr is merged into stdout. Output lines are collected only when
        needs_output is set. Raises RunCommandException on a non-zero exit
        code (message contains the last output line).
        """
        cmd_text = ' '.join(cmd)
        # Pass the command as an argument so that '%' inside it is not
        # interpreted as a format directive.
        self.dprint("Running `%s'...", cmd_text)
        output = []
        last_line = ""

        # FIXME: can we keep stdout and stderr separated?
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) as proc:
            for raw_line in proc.stdout:
                # Build tools may emit bytes that are not valid UTF-8; that
                # must not abort the whole task.
                line = raw_line.decode('utf-8', errors='replace').strip('\n')
                self.append_line_to_log_file(line)
                if needs_output:
                    output.append(line)
                last_line = line
            rc = proc.wait()

        if rc != 0:
            raise RunCommandException(
                "`%s' failed: %s" % (cmd_text, last_line),
                rc, output)

        joined = '\n'.join(output)
        return {
            'output': output,
            'stdout': joined,
            # stderr is merged into stdout (see the FIXME above).
            'stderr': joined,
            'rc': rc,
            'failed': rc != 0
        }

    def make_temp_dir(self, name):
        """Create (or reuse) directory name inside the build directory."""
        dname = '%s/%s' % (self.build_directory, name)
        os.makedirs(dname, exist_ok=True)
        return os.path.abspath(dname)

    def recursive_copy(self, src_dir, dest_dir):
        """Copy contents of src_dir into dest_dir using rsync."""
        os.makedirs(dest_dir, exist_ok=True)
        self.run_command(['rsync', '-a', src_dir + '/', dest_dir])

    def remove_silently(self, path):
        """Remove a file, ignoring OS errors (e.g. when it does not exist)."""
        try:
            os.remove(path)
        except OSError:
            pass

    def remove_silently_recursive(self, path):
        """Remove a directory tree, ignoring errors."""
        try:
            shutil.rmtree(path, ignore_errors=True)
        except OSError:
            pass

    def set_log_file(self, log_filename):
        """Open logs/<log_filename> in the artefact directory as the log."""
        # TODO: open the log lazily
        # TODO: propagate the information to XML report
        if self.log is not None:
            # Do not leak a previously opened log.
            self.log.close()
        self.log = self.open_downloadable_file('logs/' + log_filename, 'w')

    def append_line_to_log_file(self, line):
        """Write line to the log (if open) and remember it in log_tail."""
        if self.log is not None:
            self.log.write(line + '\n')
        self.log_tail.append(line)
        if self.kept_log_lines > 0:
            del self.log_tail[:-self.kept_log_lines]
        else:
            # A slice with [-0:] would keep everything, so handle
            # "keep no lines" explicitly.
            self.log_tail.clear()

    def get_artefact_absolute_path(self, relative_name, create_dirs=False):
        """Return absolute path of relative_name inside the artefact directory."""
        base = os.path.dirname(relative_name)
        name = os.path.basename(relative_name)
        dname = '%s/%s/' % (self.artefact_directory, base)

        if create_dirs:
            os.makedirs(dname, exist_ok=True)

        return os.path.abspath(dname + name)

    def add_downloadable_file(self, title, download_name, current_filename):
        """Copy current_filename to the artefacts and list it in the report."""
        self.dprint("Downloadable `%s' at %s", title, download_name)

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.copy(current_filename, target)

        self.files.append({
            'filename': download_name,
            'title': title
        })

        return target

    def move_dir_to_downloadable(self, title, download_name, current_dirname):
        """Move current_dirname to the artefacts and list it in the report."""
        self.dprint("Downloadable `%s' at %s", title, download_name)

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.move(current_dirname, target)

        self.files.append({
            'filename': download_name,
            'title': title
        })

        return target

    def open_downloadable_file(self, download_name, mode):
        """
        Open download_name inside the artefact directory.

        Text modes use UTF-8 explicitly so that output does not depend on
        the locale of the machine running the build.
        """
        path = self.get_artefact_absolute_path(download_name, True)
        if 'b' in mode:
            return open(path, mode)
        return open(path, mode, encoding='utf-8')

    def done(self):
        """Close the log file; safe to call more than once."""
        if self.log is not None:
            self.log.close()
            self.log = None

    def get_files(self):
        return self.files

    def ret(self, *args, **kwargs):
        """
        Build a result dictionary {'status': ..., 'data': {...}}.

        An optional single positional argument is the status (True/False
        are translated to 'ok'/'fail'); keyword arguments form the data.
        """
        status = 'ok'
        if len(args) == 1:
            status = args[0]
        if status == True:
            status = 'ok'
        elif status == False:
            status = 'fail'
        return {
            'status': status,
            'data': dict(kwargs)
        }
class TaskWrapper:
    """
    Scheduler-side record of one submitted task.

    Holds the task's dependencies and mutexes, its final status and reason,
    and the data it produced. Status and data accessors take self.lock.
    """

    def __init__(self, id, task, description, deps, mutexes):
        self.id = id
        self.dependencies = deps
        self.description = description
        self.task = task
        # 'n/a' until set_status() is called (e.g. with 'ok', 'fail', 'skip').
        self.status = 'n/a'
        # Explanation accompanying the status; initialized here so that it
        # can be read before set_status() without raising AttributeError.
        self.reason = None
        self.completed = False
        self.data = {}
        self.lock = Lock()
        self.mutexes = mutexes

    def has_completed_okay(self):
        with self.lock:
            return self.completed and (self.status == 'ok')

    def has_finished(self):
        with self.lock:
            return self.completed

    def get_status(self):
        with self.lock:
            return self.status

    def get_data(self):
        with self.lock:
            return self.data

    def set_status(self, status, reason):
        with self.lock:
            self.status = status
            self.reason = reason

    def set_done(self, data):
        """Store the task's data and mark it as finished."""
        with self.lock:
            self.data = data
            self.completed = True

    def set_skipped(self, reason):
        self.set_status('skip', reason)
class BuildScheduler:
    """
    Executor of build tasks respecting dependencies and named mutexes.

    Tasks are registered with submit(). A queue processor running on the
    thread pool starts a task once all its dependencies finished and none of
    its mutexes is held, running at most max_workers tasks at a time. A task
    with a failed (or skipped) dependency is marked as skipped instead of
    being run. Every finished task is appended to report.xml in the artefact
    directory.
    """

    def __init__(self, max_workers, build, artefact, build_id, printer, inline_log_lines = 10, debug = False):
        self.config = {
            'build-directory': build,
            'artefact-directory': artefact,
        }

        self.printer = printer

        self.start_timestamp = time.time()
        self.start_date = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).astimezone().isoformat(' ')

        # Parent task controller
        self.ctl = TaskController('scheduler', {}, build, artefact, self.printer, inline_log_lines, debug)

        # Start the report file
        self.report_file = self.ctl.open_downloadable_file('report.xml', 'w')
        self.report_file.write("<?xml version=\"1.0\"?>\n")
        self.report_file.write("<build number=\"%s\">\n" % self.xml_escape_attr(build_id))

        # The following attributes (up to self.guard declaration) are guarded
        # by the self.guard mutex and use self.cond to notify about changes
        # in any of them.
        # Lower granularity of the locking would be possible but would
        # complicate too much the conditions inside queue processing where we
        # need to react to multiple events (new task added vs some task
        # terminated vs selecting the right task to be executed).

        # Known tasks (regardless of their state). The mapping is between
        # a task id and TaskWrapper class.
        self.tasks = {}

        # Queue of tasks not yet run (uses task ids only). We insert mutex
        # tasks at queue beginning (instead of appending) as a heuristic to
        # prevent accumulation of mutex tasks at the end of run where it could
        # hurt concurrent execution.
        self.queue = []

        # Number of currently running (executing) tasks. Used solely to
        # control number of concurrently running tasks.
        self.running_tasks_count = 0

        # Flag for the queue processing whether to terminate the loop to allow
        # clean termination of the executor.
        self.terminate = False

        # Here we record which mutexes are held by executing tasks. Mutexes are
        # identified by their (string) name that is used as index. When the
        # value is True, the mutex is held (i.e. do not run any other task
        # claiming the same mutex), mutex is not held when the value is False
        # or when the key is not present at all.
        self.task_mutexes = {}

        # Condition variable guarding the above attributes.
        # We initialize CV only without attaching a lock as it creates one
        # automatically and CV serves as a lock too.
        #
        # Always use notify_all as we are waiting in multiple functions
        # for it (e.g. while processing the queue or in barrier).
        self.guard = Condition()

        # Lock serializing output (report file writes and printer calls)
        # coming from concurrently running worker threads.
        self.output_lock = Lock()

        # Executor for running of individual tasks; the extra threads leave
        # room for the long-running queue processor next to max_workers tasks.
        from concurrent.futures import ThreadPoolExecutor
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers + 2)

        # Start the queue processor
        self.executor.submit(BuildScheduler.process_queue_wrapper, self)

    def process_queue_wrapper(self):
        """
        Run process_queue() and print any exception it raises.

        The Future returned by executor.submit() is discarded, so without
        this the exception would never be shown. To allow debugging of the
        queue processor.
        """
        try:
            self.process_queue()
        except Exception:
            import traceback
            traceback.print_exc()

    def submit(self, description, task_id, task, deps = None, mutexes = None):
        """
        Register task under task_id and put it into the run queue.

        deps are ids of already submitted tasks that must finish successfully
        first; mutexes are names of mutexes held while the task runs.
        Raises Exception when a dependency is not known.
        """
        # None defaults instead of shared mutable [] defaults; copies protect
        # the scheduler from later modification of caller's lists.
        deps = [] if deps is None else list(deps)
        mutexes = [] if mutexes is None else list(mutexes)

        with self.guard:
            # Check that dependencies are known
            for d in deps:
                if d not in self.tasks:
                    raise Exception('Dependency %s is not known.' % d)

            self.tasks[task_id] = TaskWrapper(task_id, task, description, deps, mutexes)

            # We use a simple heuristic: if the task has no mutexes, we
            # append to the end of the queue. Otherwise we prioritize the
            # task a little bit to prevent ending with serialized execution
            # of the mutually excluded tasks. (We add before first non-mutexed
            # task.)
            if mutexes:
                position = next(
                    (i for i, queued_id in enumerate(self.queue)
                        if not self.tasks[queued_id].mutexes),
                    len(self.queue))
                self.queue.insert(position, task_id)
            else:
                self.queue.append(task_id)

            self.guard.notify_all()

    def task_run_wrapper(self, wrapper, task_id, can_be_run):
        """Run task_run_inner() and print any exception (see process_queue_wrapper)."""
        try:
            self.task_run_inner(wrapper, task_id, can_be_run)
        except Exception:
            import traceback
            traceback.print_exc()

    def xml_escape_line(self, s):
        """
        Make s safe as XML element text.

        Control characters other than TAB are dropped, markup characters are
        escaped and non-ASCII characters become character references.
        """
        from xml.sax.saxutils import escape
        import re

        s_without_ctrl = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
        s_escaped = escape(s_without_ctrl)
        s_all_entities_encoded = s_escaped.encode('ascii', 'xmlcharrefreplace')

        return s_all_entities_encoded.decode('utf8')

    def xml_escape_attr(self, value):
        """
        Make value (converted with str()) safe inside a double-quoted XML
        attribute.

        Characters not allowed in XML are dropped; TAB/LF/CR are encoded as
        character references so that attribute normalization keeps them;
        non-ASCII characters become character references.
        """
        from xml.sax.saxutils import escape
        import re

        text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', str(value))
        text = escape(text, {'"': '&quot;', '\t': '&#9;', '\n': '&#10;', '\r': '&#13;'})
        return text.encode('ascii', 'xmlcharrefreplace').decode('ascii')

    def task_run_inner(self, wrapper, task_id, can_be_run):
        """
        Execute (or skip) a task dequeued by process_queue() and record it.

        Whatever happens, the task is marked as finished and its slot and
        mutexes are released; otherwise dependent tasks and barrier() would
        wait forever.
        """
        # Pessimistic defaults, used only when something below raises.
        status, data, reason = 'fail', {}, 'internal scheduler error'
        try:
            status, data, reason = self.run_or_skip_task_(wrapper, task_id, can_be_run)
            self.write_task_report_(wrapper, status)
        finally:
            self.finish_task_(wrapper, can_be_run, status, data, reason)

    def run_or_skip_task_(self, wrapper, task_id, can_be_run):
        """
        Prepare the task controller, then execute the task (can_be_run) or
        mark it skipped. Returns tuple (status, data, reason).
        """
        data = {}
        if can_be_run:
            for task_dep_id in wrapper.dependencies:
                data[task_dep_id] = self.tasks[task_dep_id].get_data()

        wrapper.task.ctl = self.ctl.derive(task_id, data)
        wrapper.task.ctl.set_log_file('%s.log' % task_id)

        if not can_be_run:
            return self.skip_task_(wrapper)

        self.announce_task_started_(wrapper)
        try:
            res = wrapper.task.execute()
        except Exception as e:
            return ('fail', {}, '%s' % e)

        if (res is None) or (res == True):
            res = {
                'status': 'ok',
                'data': {}
            }
        elif res == False:
            res = {
                'status': 'fail',
                'data': {}
            }
        return (res['status'], res['data'], None)

    def skip_task_(self, wrapper):
        """Log why the task is skipped and return (status, data, reason)."""
        ctl = wrapper.task.ctl
        reason = None
        for task_dep_id in wrapper.dependencies:
            task_dep = self.tasks[task_dep_id]
            if task_dep.has_finished() and (not task_dep.has_completed_okay()):
                reason = 'dependency %s failed (or also skipped).' % task_dep_id
                ctl.append_line_to_log_file('Skipped: %s' % reason)
        if reason is None:
            # Not expected: the queue only skips tasks with a failed
            # dependency. Avoid an unbound reason anyway.
            reason = 'dependencies not satisfied.'
        # Skipped tasks never reach Task.execute(), which is what closes
        # the log file of executed tasks.
        ctl.done()
        return ('skip', {}, reason)

    def write_task_report_(self, wrapper, status):
        """Append the XML element describing the finished task to the report."""
        report = wrapper.task.get_report()
        if (report['name'] is None) or (self.report_file is None):
            return

        report['attrs']['result'] = status
        parts = ['<' + report['name']]
        for key, value in report['attrs'].items():
            parts.append(' %s="%s"' % (key, self.xml_escape_attr(value)))
        parts.append(' log="logs/%s.log"' % self.xml_escape_attr(wrapper.id))
        parts.append(">\n")

        if 'files' in report:
            for f in report['files']:
                parts.append('<file title="%s" filename="%s" />\n' % (
                    self.xml_escape_attr(f['title']), self.xml_escape_attr(f['filename'])))

        ctl = wrapper.task.ctl
        if (ctl is not None) and (len(ctl.log_tail) > 0):
            parts.append(' <log>\n')
            for line in ctl.log_tail:
                parts.append(' <logline>' + self.xml_escape_line(line) + '</logline>\n')
            parts.append(' </log>\n')

        parts.append('</' + report['name'] + ">\n")

        # Several worker threads may finish at the same time.
        with self.output_lock:
            if self.report_file is not None:
                self.report_file.write(''.join(parts))

    def finish_task_(self, wrapper, can_be_run, status, data, reason):
        """Publish the task's outcome and release its slot and mutexes."""
        try:
            wrapper.set_status(status, reason)
            self.announce_task_finished_(wrapper)
        finally:
            wrapper.set_done(data)
            with self.guard:
                self.running_tasks_count = self.running_tasks_count - 1
                if can_be_run:
                    for m in wrapper.mutexes:
                        self.task_mutexes[m] = False
                self.guard.notify_all()

    def process_queue(self):
        """
        Queue processor: start tasks while slots are free; return once
        termination was requested and the queue is empty.
        """
        while True:
            with self.guard:
                # Wait until a task can be started (or we should terminate).
                while True:
                    slot_available = self.running_tasks_count < self.max_workers
                    task_available = len(self.queue) > 0
                    if slot_available and task_available:
                        break
                    if self.terminate and (not task_available):
                        return
                    self.guard.wait()

                # We have some tasks in the queue and we can run at
                # least one of them
                (ready_task_id, can_be_run) = self.get_first_ready_task_id_()

                if ready_task_id is None:
                    # Everything queued waits for dependencies or mutexes.
                    self.guard.wait()
                    continue

                self.queue.remove(ready_task_id)
                ready_task = self.tasks[ready_task_id]

                # Need to update number of running tasks here and now
                # because the executor might start the execution later
                # and we would evaluate incorrectly the condition above
                # that we can start another task.
                self.running_tasks_count = self.running_tasks_count + 1

                if can_be_run:
                    for m in ready_task.mutexes:
                        self.task_mutexes[m] = True
                self.executor.submit(BuildScheduler.task_run_wrapper,
                    self, ready_task, ready_task_id, can_be_run)

    def get_first_ready_task_id_(self):
        """
        Return tuple (task_id, can_be_run) for the first queued task that
        can be started: can_be_run is True when all dependencies succeeded
        and no mutex of the task is held, False when some dependency already
        finished unsuccessfully (the task will be skipped).
        Returns (None, None) when no task can be started now.
        """
        # We assume self.guard was already acquired
        # We use here the for ... else construct of Python (recall that else
        # is taken when break is not used)
        for task_id in self.queue:
            task = self.tasks[task_id]
            for task_dep_id in task.dependencies:
                task_dep = self.tasks[task_dep_id]
                if not task_dep.has_finished():
                    break
                # Failed dependency means we can return now
                if task_dep.get_status() != 'ok':
                    return (task_id, False)
            else:
                for task_mutex in task.mutexes:
                    if (task_mutex in self.task_mutexes) and self.task_mutexes[task_mutex]:
                        break
                else:
                    return (task_id, True)
        return (None, None)

    def announce_task_started_(self, task):
        with self.output_lock:
            self.printer.print_starting(task.description + " ...")

    def announce_task_finished_(self, task):
        description = task.description
        if task.status == 'ok':
            msg = 'done'
            msg_color = self.printer.GREEN
            description = description + '.'
        elif task.status == 'skip':
            msg = 'skip'
            msg_color = self.printer.CYAN
            description = '%s: %s' % (description, task.reason)
        else:
            msg = 'fail'
            msg_color = self.printer.RED
            # %s formatting: reason may be None when a task reports failure
            # without raising.
            description = '%s: %s' % (description, task.reason)

        with self.output_lock:
            self.printer.print_finished(msg_color, msg, description)

    def barrier(self):
        """Block until the queue is empty and no task is running."""
        with self.guard:
            while (self.running_tasks_count > 0) or (len(self.queue) > 0):
                self.guard.wait()

    def done(self):
        """Wait for all tasks, finish the report and shut the executor down."""
        with self.guard:
            self.terminate = True
            self.guard.notify_all()
        self.barrier()
        self.close_report()
        self.executor.shutdown(True)

    def close_report(self):
        if self.report_file is not None:
            end_time = time.time()
            self.report_file.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
                self.start_date, (end_time - self.start_timestamp) * 1000,
                self.max_workers
            ))
            self.report_file.write("</build>\n")
            self.report_file.close()
            self.report_file = None