Refactoring: better method name
[ci.git] / hbuild / scheduler.py
#!/usr/bin/env python3

#
# Copyright (c) 2017 Vojtech Horky
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# - Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
# - The name of the author may not be used to endorse or promote products
#   derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

import subprocess
import os
import time
import datetime
import shutil
import sys

from threading import Lock, Condition

class Task:
    def __init__(self, report_tag, **report_args):
        self.report = {
            'name': report_tag,
            'result': 'unknown',
            'attrs': {},
        }
        for k in report_args:
            self.report['attrs'][k] = report_args[k]
        self.ctl = None

    def execute(self):
        start_time = time.time()
        try:
            res = self.run()
            if res == False:
                raise Exception('run() returned False')
            self.report['result'] = 'ok'

            self.report['files'] = self.ctl.get_files()

            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            if res is None:
                res = {}

            self.ctl.done()

            return {
                'status': 'ok',
                'data': res
            }
        except Exception as e:
            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            self.report['result'] = 'fail'
            self.report['files'] = self.ctl.get_files()
            self.ctl.done()
            raise e

    def run(self):
        pass

    def get_report(self):
        return self.report

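
# Illustrative sketch, not part of the original module: a minimal Task
# subclass. The class and its 'echo' report tag are hypothetical. run()
# returns a dict of data made available to dependent tasks (or None for
# "no data"); returning False or raising an exception marks the task as
# failed. self.ctl is a TaskController injected by the scheduler before
# execute() is called.
class ExampleEchoTask(Task):
    def __init__(self, message):
        Task.__init__(self, 'echo', message=message)
        self.message = message

    def run(self):
        # run_command() logs the output and raises RunCommandException on a
        # non-zero exit code, which execute() converts into a failed report.
        self.ctl.run_command(['echo', self.message])
        return {'message': self.message}
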
class TaskException(Exception):
    def __init__(self, msg):
        Exception.__init__(self, msg)

class RunCommandException(TaskException):
    def __init__(self, msg, rc, output):
        self.rc = rc
        self.output = output
        Exception.__init__(self, msg)

class TaskController:
    def __init__(self, name, data, build_directory, artefact_directory, printer, kept_log_lines, print_debug = False):
        self.name = name
        self.data = data
        self.files = []
        self.log = None
        self.log_tail = []
        self.build_directory = build_directory
        self.artefact_directory = artefact_directory
        self.printer = printer
        self.kept_log_lines = kept_log_lines
        self.print_debug_messages = print_debug

    def derive(self, name, data):
        return TaskController(name, data, self.build_directory, self.artefact_directory,
            self.printer, self.kept_log_lines, self.print_debug_messages)

    def dprint(self, str, *args):
        if self.print_debug_messages:
            self.printer.print_debug(self.name, str % args)

    def get_dependency_data(self, dep, key=None):
        if key is None:
            return self.get_data(dep)
        return self.data[dep][key]

    def get_data(self, key):
        for dep in self.data:
            if key in self.data[dep]:
                return self.data[dep][key]
        raise TaskException("WARN: unknown key %s" % key)

    def run_command(self, cmd, cwd=None, needs_output=False):
        self.dprint("Running `%s'..." % ' '.join(cmd))
        output = []
        last_line = ""
        rc = 0

        # FIXME: can we keep stdout and stderr separated?
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) as proc:
            for line in proc.stdout:
                line = line.decode('utf-8').strip('\n')
                self.append_line_to_log_file(line)
                if needs_output:
                    output.append(line)
                last_line = line
            proc.wait()
            rc = proc.returncode
            if rc != 0:
                #self.dprint('stderr=%s' % stderr.strip('\n'))
                raise RunCommandException(
                    "`%s' failed: %s" % (' '.join(cmd), last_line),
                    rc, output)

        return {
            'output': output,
            'stdout': '\n'.join(output),
            'stderr': '\n'.join(output),
            'rc': rc,
            'failed': not rc == 0
        }

    def make_temp_dir(self, name):
        dname = '%s/%s' % ( self.build_directory, name )
        os.makedirs(dname, exist_ok=True)
        return os.path.abspath(dname)

    def recursive_copy(self, src_dir, dest_dir):
        os.makedirs(dest_dir, exist_ok=True)
        self.run_command([ 'rsync', '-a', src_dir + '/', dest_dir ])

    def remove_silently(self, path):
        try:
            os.remove(path)
        except OSError as ex:
            pass

    def set_log_file(self, log_filename):
        # TODO: open the log lazily
        # TODO: propagate the information to XML report
        self.log = self.open_downloadable_file('logs/' + log_filename, 'w')

    def append_line_to_log_file(self, line):
        if not self.log is None:
            self.log.write(line + '\n')
        self.log_tail.append(line)
        self.log_tail = self.log_tail[-self.kept_log_lines:]

    def get_artefact_absolute_path(self, relative_name, create_dirs=False):
        base = os.path.dirname(relative_name)
        name = os.path.basename(relative_name)
        dname = '%s/%s/' % ( self.artefact_directory, base )

        if create_dirs:
            os.makedirs(dname, exist_ok=True)

        return os.path.abspath(dname + name)

    def add_downloadable_file(self, title, download_name, current_filename):
        self.dprint("Downloadable `%s' at %s", title, download_name)

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.copy(current_filename, target)

        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        return target

    def move_dir_to_downloadable(self, title, download_name, current_dirname):
        self.dprint("Downloadable `%s' at %s", title, download_name)

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.move(current_dirname, target)

        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        return target

    def open_downloadable_file(self, download_name, mode):
        return open(self.get_artefact_absolute_path(download_name, True), mode)

    def done(self):
        if not self.log is None:
            self.log.close()

    def get_files(self):
        return self.files

    def ret(self, *args, **kwargs):
        status = 'ok'
        if len(args) == 1:
            status = args[0]
            if status == True:
                status = 'ok'
            elif status == False:
                status = 'fail'

        result = {
            'status': status,
            'data': {}
        }

        for k in kwargs:
            result['data'][k] = kwargs[k]

        return result

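
# Illustrative sketch, not part of the original module: how a task's run()
# might combine the TaskController helpers above. The task, its report tag
# and the artefact names are hypothetical.
class ExampleListingTask(Task):
    def __init__(self):
        Task.__init__(self, 'listing')

    def run(self):
        work_dir = self.ctl.make_temp_dir('listing')

        # needs_output=True collects the command output into the result dict;
        # 'failed' is always False here because a non-zero exit code raises
        # RunCommandException before run_command() returns.
        res = self.ctl.run_command(['ls', '-l'], cwd=work_dir, needs_output=True)

        listing_path = os.path.join(work_dir, 'listing.txt')
        with open(listing_path, 'w') as f:
            f.write(res['stdout'] + '\n')
        self.ctl.add_downloadable_file('Directory listing', 'misc/listing.txt', listing_path)

        return {'line_count': len(res['output'])}
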
class TaskWrapper:
    def __init__(self, id, task, description, deps, mutexes):
        self.id = id
        self.dependencies = deps
        self.description = description
        self.task = task
        self.status = 'n/a'
        self.completed = False
        self.data = {}
        self.lock = Lock()
        self.mutexes = mutexes

    def has_completed_okay(self):
        with self.lock:
            return self.completed and (self.status == 'ok')

    def has_finished(self):
        with self.lock:
            return self.completed

    def get_status(self):
        with self.lock:
            return self.status

    def get_data(self):
        with self.lock:
            return self.data

    def set_status(self, status, reason):
        with self.lock:
            self.status = status
            self.reason = reason

    def set_done(self, data):
        with self.lock:
            self.data = data
            self.completed = True

    def set_skipped(self, reason):
        self.set_status('skip', reason)

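
# Illustrative sketch, not part of the original module: the life cycle of a
# TaskWrapper as the scheduler below drives it (the ids and data are
# hypothetical). All state queries and updates go through the wrapper's lock.
def _example_wrapper_lifecycle():
    wrapper = TaskWrapper('demo', Task('demo'), 'Demo task', deps=[], mutexes=[])
    assert not wrapper.has_finished()

    # On success the scheduler records the status first and then the data;
    # has_completed_okay() only becomes True once set_done() was called.
    wrapper.set_status('ok', None)
    wrapper.set_done({'value': 42})
    assert wrapper.has_completed_okay()
    return wrapper.get_data()
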
class BuildScheduler:
    def __init__(self, max_workers, build, artefact, build_id, printer, inline_log_lines = 10, debug = False):
        self.config = {
            'build-directory': build,
            'artefact-directory': artefact,
        }

        self.printer = printer

        self.start_timestamp = time.time()
        self.start_date = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).astimezone().isoformat(' ')

        # Parent task controller
        self.ctl = TaskController('scheduler', {}, build, artefact, self.printer, inline_log_lines, debug)

        # Start the log file
        self.report_file = self.ctl.open_downloadable_file('report.xml', 'w')
        self.report_file.write("<?xml version=\"1.0\"?>\n")
        self.report_file.write("<build number=\"%s\">\n" % build_id)

        # The following attributes (up to the self.guard declaration) are
        # guarded by the self.guard mutex, which is also used to notify about
        # changes in any of them.
        # Lower granularity of the locking would be possible but would
        # complicate too much the conditions inside queue processing where we
        # need to react to multiple events (new task added vs. some task
        # terminated vs. selecting the right task to be executed).

        # Known tasks (regardless of their state). The mapping is between
        # a task id and a TaskWrapper instance.
        self.tasks = {}

        # Queue of tasks not yet run (uses task ids only). We insert mutex
        # tasks at the queue beginning (instead of appending) as a heuristic
        # to prevent accumulation of mutex tasks at the end of the run where
        # it could hurt concurrent execution.
        self.queue = []

        # Number of currently running (executing) tasks. Used solely to
        # control the number of concurrently running tasks.
        self.running_tasks_count = 0

        # Flag telling the queue processing whether to terminate the loop to
        # allow clean termination of the executor.
        self.terminate = False

        # Here we record which mutexes are held by executing tasks. Mutexes
        # are identified by their (string) name that is used as the index.
        # When the value is True, the mutex is held (i.e. do not run any other
        # task claiming the same mutex); the mutex is not held when the value
        # is False or when the key is not present at all.
        self.task_mutexes = {}

        # Condition variable guarding the above attributes.
        # We initialize the CV without attaching a lock as it creates one
        # automatically and the CV serves as a lock too.
        #
        # Always use notify_all as we are waiting in multiple functions
        # for it (e.g. while processing the queue or in the barrier).
        self.guard = Condition()

        # Lock guarding output synchronization
        self.output_lock = Lock()

        # Executor for running individual tasks
        from concurrent.futures import ThreadPoolExecutor
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers + 2)

        # Start the queue processor
        self.executor.submit(BuildScheduler.process_queue_wrapper, self)

    def process_queue_wrapper(self):
        """
        Wrap process_queue() so that exceptions are printed (to allow
        debugging of the queue processor).
        """
        try:
            self.process_queue()
        except:
            import traceback
            traceback.print_exc()

    def submit(self, description, task_id, task, deps = [], mutexes = []):
        with self.guard:
            #print("Submitting {} ({}, {}, {})".format(description, task_id, deps, mutexes))
            # Check that dependencies are known
            for d in deps:
                if not d in self.tasks:
                    raise Exception('Dependency %s is not known.' % d)

            # Add the wrapper
            wrapper = TaskWrapper(task_id, task, description, deps, mutexes)
            self.tasks[task_id] = wrapper

            # Append to the queue
            # We use a simple heuristic: if the task has no mutexes, we
            # append to the end of the queue. Otherwise we prioritize the
            # task a little bit to prevent ending with serialized execution
            # of the mutually excluded tasks. (We add before first non-mutexed
            # task.)
            if len(mutexes) > 0:
                new_queue = []
                inserted = False
                for q in self.queue:
                    if (len(self.tasks[q].mutexes) == 0) and (not inserted):
                        new_queue.append(task_id)
                        inserted = True
                    new_queue.append(q)
                if not inserted:
                    new_queue.append(task_id)
                self.queue = new_queue
            else:
                self.queue.append(task_id)

            self.guard.notify_all()

    def task_run_wrapper(self, wrapper, task_id, can_be_run):
        try:
            self.task_run_inner(wrapper, task_id, can_be_run)
        except:
            import traceback
            traceback.print_exc()

    def xml_escape_line(self, s):
        from xml.sax.saxutils import escape
        import re

        s_without_ctrl = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
        s_escaped = escape(s_without_ctrl)
        s_all_entities_encoded = s_escaped.encode('ascii', 'xmlcharrefreplace')

        return s_all_entities_encoded.decode('utf8')

    def task_run_inner(self, wrapper, task_id, can_be_run):
        data = {}

        if can_be_run:
            for task_dep_id in wrapper.dependencies:
                task_dep = self.tasks[task_dep_id]
                data[task_dep_id] = task_dep.get_data()

        wrapper.task.ctl = self.ctl.derive(task_id, data)
        wrapper.task.ctl.set_log_file('%s.log' % task_id)

        if can_be_run:
            self.announce_task_started_(wrapper)

            try:
                res = wrapper.task.execute()
                if (res == True) or (res is None):
                    res = {
                        'status': 'ok',
                        'data': {}
                    }
                elif res == False:
                    res = {
                        'status': 'fail',
                        'data': {}
                    }
                reason = None
            except Exception as e:
                import traceback
                res = {
                    'status': 'fail',
                    'data': {}
                }
                #traceback.print_exc()
                reason = '%s' % e
        else:
            for task_dep_id in wrapper.dependencies:
                task_dep = self.tasks[task_dep_id]
                if task_dep.has_finished() and (not task_dep.has_completed_okay()):
                    reason = 'dependency %s failed (or was also skipped).' % task_dep_id
            res = {
                'status': 'skip',
                'data': {}
            }
            wrapper.task.ctl.append_line_to_log_file('Skipped: %s' % reason)

        status = res['status']
        report = wrapper.task.get_report()

        if (not report['name'] is None) and (not self.report_file is None):
            report_xml = '<' + report['name']
            report['attrs']['result'] = status
            for key in report['attrs']:
                report_xml = report_xml + ' %s="%s"' % (key, report['attrs'][key])
            report_xml = report_xml + ' log="logs/%s.log"' % wrapper.id
            report_xml = report_xml + ">\n"

            if 'files' in report:
                for f in report['files']:
                    file = '<file title="%s" filename="%s" />\n' % ( f['title'], f['filename'] )
                    report_xml = report_xml + file

            if (not wrapper.task.ctl is None) and (len(wrapper.task.ctl.log_tail) > 0):
                report_xml = report_xml + ' <log>\n'
                for line in wrapper.task.ctl.log_tail:
                    report_xml = report_xml + ' <logline>' + self.xml_escape_line(line) + '</logline>\n'
                report_xml = report_xml + ' </log>\n'

            report_xml = report_xml + '</' + report['name'] + ">\n"

            self.report_file.write(report_xml)

        wrapper.set_status(status, reason)
        self.announce_task_finished_(wrapper)
        wrapper.set_done(res['data'])

        with self.guard:
            self.running_tasks_count = self.running_tasks_count - 1

            if can_be_run:
                for m in wrapper.mutexes:
                    self.task_mutexes[m] = False

            #print("Task finished, waking up (running now {})".format(self.running_tasks_count))
            self.guard.notify_all()

    def process_queue(self):
        while True:
            with self.guard:
                #print("Process queue running, tasks {}".format(len(self.queue)))
                # Break inside the loop
                while True:
                    slot_available = self.running_tasks_count < self.max_workers
                    task_available = len(self.queue) > 0
                    #print("Queue: {} (running {})".format(len(self.queue), self.running_tasks_count))
                    if slot_available and task_available:
                        break
                    if self.terminate and (not task_available):
                        return

                    #print("Queue waiting for free slots (running {}) or tasks (have {})".format(self.running_tasks_count, len(self.queue)))
                    self.guard.wait()
                    #print("Guard woken-up after waiting for free slots.")

                # We have some tasks in the queue and we can run at
                # least one of them
                ( ready_task_id, can_be_run ) = self.get_first_ready_task_id_()
                #print("Ready task is {}".format(ready_task_id))

                if ready_task_id is None:
                    #print("Queue waiting for new tasks to appear (have {})".format(len(self.queue)))
                    self.guard.wait()
                    #print("Guard woken-up after no ready task.")
                else:
                    # Remove the task from the queue
                    self.queue.remove(ready_task_id)

                    ready_task = self.tasks[ready_task_id]

                    # Need to update number of running tasks here and now
                    # because the executor might start the execution later
                    # and we would evaluate incorrectly the condition above
                    # that we can start another task.
                    self.running_tasks_count = self.running_tasks_count + 1

                    #print("Ready is {}".format(ready_task))
                    if can_be_run:
                        for m in ready_task.mutexes:
                            self.task_mutexes[m] = True

                    #print("Actually starting task {}".format(ready_task_id))
                    self.executor.submit(BuildScheduler.task_run_wrapper,
                        self, ready_task, ready_task_id, can_be_run)

    def get_first_ready_task_id_(self):
        """
        Return a tuple of the first task that can be run (or that failed
        immediately) together with a flag whether the result is predetermined.
        Returns ( None, None ) when no task can be run.
        """
        # We assume self.guard was already acquired.
        # We use here the for ... else construct of Python (recall that else
        # is taken when break is not used).
        for task_id in self.queue:
            task = self.tasks[task_id]
            for task_dep_id in task.dependencies:
                task_dep = self.tasks[task_dep_id]
                if not task_dep.has_finished():
                    break
                # Failed dependency means we can return now
                if task_dep.get_status() != 'ok':
                    return ( task_id, False )
            else:
                for task_mutex in task.mutexes:
                    if (task_mutex in self.task_mutexes) and self.task_mutexes[task_mutex]:
                        break
                else:
                    return ( task_id, True )
        return ( None, None )

    def announce_task_started_(self, task):
        self.printer.print_starting(task.description + " ...")

    def announce_task_finished_(self, task):
        description = task.description
        if task.status == 'ok':
            msg = 'done'
            msg_color = self.printer.GREEN
            description = description + '.'
        elif task.status == 'skip':
            msg = 'skip'
            msg_color = self.printer.CYAN
            description = description + ': ' + task.reason
        else:
            msg = 'fail'
            msg_color = self.printer.RED
            description = description + ': ' + task.reason

        self.printer.print_finished(msg_color, msg, description)

    def barrier(self):
        with self.guard:
            #print("Barrier ({}, {})...".format(self.running_tasks_count, len(self.queue)))
            while (self.running_tasks_count > 0) or (len(self.queue) > 0):
                #print("Barrier waiting ({}, {})...".format(self.running_tasks_count, len(self.queue)))
                self.guard.wait()

    def done(self):
        with self.guard:
            self.terminate = True
            self.guard.notify_all()
        self.barrier()
        self.close_report()
        self.executor.shutdown(True)

    def close_report(self):
        if not self.report_file is None:
            end_time = time.time()
            self.report_file.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
                self.start_date, ( end_time - self.start_timestamp ) * 1000,
                self.max_workers
            ))
            self.report_file.write("</build>\n")
            self.report_file.close()
            self.report_file = None
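
# Illustrative sketch, not part of the original module: wiring the scheduler
# together. The printer is assumed to provide print_starting(),
# print_finished(), print_debug() and the GREEN/CYAN/RED attributes used
# above (in hbuild it comes from a separate module); the task classes are the
# hypothetical examples sketched earlier in this file, and the directories,
# build id and mutex name are made up.
def example_build(printer):
    scheduler = BuildScheduler(max_workers=4,
        build='/tmp/build', artefact='/tmp/artefacts',
        build_id=1, printer=printer)

    # 'listing' waits for 'echo' and holds the 'disk' mutex, so no other task
    # claiming the same mutex would run concurrently with it.
    scheduler.submit('Echo a greeting', 'echo', ExampleEchoTask('hello'))
    scheduler.submit('Create a directory listing', 'listing',
        ExampleListingTask(), deps=['echo'], mutexes=['disk'])

    # Wait for all submitted tasks, then close report.xml and the executor.
    scheduler.barrier()
    scheduler.done()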