4 # Copyright (c) 2017 Vojtech Horky
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
11 # - Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # - Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
16 # - The name of the author may not be used to endorse or promote products
17 # derived from this software without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 from threading
import Lock
, Condition
41 def __init__(self
, report_tag
, **report_args
):
48 self
.report
['attrs'][k
] = report_args
[k
]
52 start_time
= time
.time()
56 raise Exception('run() returned False')
57 self
.report
['result'] = 'ok'
59 self
.report
['files'] = self
.ctl
.get_files()
61 end_time
= time
.time()
62 self
.report
['attrs']['duration'] = (end_time
- start_time
) * 1000
73 except Exception as e
:
74 end_time
= time
.time()
75 self
.report
['attrs']['duration'] = (end_time
- start_time
) * 1000
77 self
.report
['result'] = 'fail'
78 self
.report
['files'] = self
.ctl
.get_files()
class TaskException(Exception):
    """
    Exception type for task errors that are described by a single message.
    """

    def __init__(self, msg):
        # Hand the message to the base class so that str(exc) and exc.args
        # expose it in the usual way.
        super().__init__(msg)
93 class RunCommandException(TaskException
):
94 def __init__(self
, msg
, rc
, output
):
97 Exception.__init
__(self
, msg
)
100 def __init__(self
, name
, data
, build_directory
, artefact_directory
, printer
, kept_log_lines
, print_debug
= False):
106 self
.build_directory
= build_directory
107 self
.artefact_directory
= artefact_directory
108 self
.printer
= printer
109 self
.kept_log_lines
= kept_log_lines
110 self
.print_debug_messages
= print_debug
def derive(self, name, data):
    """
    Create a new TaskController that shares the settings of this one.

    The new controller receives its own `name` and `data`; the build
    directory, artefact directory, printer, number of kept log lines and
    the debug flag are taken over from this instance.
    """
    inherited_settings = (
        self.build_directory,
        self.artefact_directory,
        self.printer,
        self.kept_log_lines,
        self.print_debug_messages,
    )
    return TaskController(name, data, *inherited_settings)
def dprint(self, str, *args):
    """
    Print a debugging message when debug output is enabled.

    `str` is a printf-style template that is formatted with `args`.
    When no `args` are given the text is printed verbatim, so callers may
    pass an already formatted message containing a literal '%' (for
    example a command line) without triggering a formatting error.
    Nothing is printed when debug messages are disabled.
    """
    if not self.print_debug_messages:
        return

    # The parameter name shadows the builtin `str`; it is kept as-is so
    # that existing callers are not affected.
    message = str % args if args else str
    self.printer.print_debug(self.name, message)
121 def get_dependency_data(self
, dep
, key
=None):
123 return self
.get_data(dep
)
124 return self
.data
[dep
][key
]
def get_data(self, key):
    """
    Find `key` in the data provided by the dependencies.

    The entries of `self.data` are searched in iteration order and the
    value from the first entry that contains `key` is returned.
    Raises TaskException when no entry contains the key.
    """
    for dep_id in self.data:
        dep_values = self.data[dep_id]
        if key not in dep_values:
            continue
        return dep_values[key]

    # Every entry was inspected without finding the key.
    raise TaskException("WARN: unknown key %s" % key)
132 def run_command(self
, cmd
, cwd
=None, needs_output
=False):
133 self
.dprint("Running `%s'..." % ' '.join(cmd
))
138 # FIXME: can we keep stdout and stderr separated?
139 with subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, cwd
=cwd
) as proc
:
140 for line
in proc
.stdout
:
141 line
= line
.decode('utf-8').strip('\n')
142 self
.append_line_to_log_file(line
)
149 #self.dprint('stderr=%s' % stderr.strip('\n'))
151 raise RunCommandException(
152 "`%s' failed: %s" % (' '.join(cmd
), last_line
),
157 'stdout': '\n'.join(output
),
158 'stderr': '\n'.join(output
),
160 'failed': not rc
== 0
def make_temp_dir(self, name):
    """
    Make sure directory `name` exists below the build directory.

    Missing intermediate directories are created too and an already
    existing directory is not an error.
    Returns the absolute path of the directory.
    """
    target = '{}/{}'.format(self.build_directory, name)
    os.makedirs(target, exist_ok=True)
    absolute_target = os.path.abspath(target)
    return absolute_target
def recursive_copy(self, src_dir, dest_dir):
    """
    Copy `src_dir` into `dest_dir` by running `rsync -a`.

    `dest_dir` is created first when it does not exist yet. A slash is
    appended to the source path before it is handed to rsync (rsync
    then transfers the directory contents rather than the directory).
    """
    os.makedirs(dest_dir, exist_ok=True)
    rsync_cmd = ['rsync', '-a', src_dir + '/', dest_dir]
    self.run_command(rsync_cmd)
172 def remove_silently(self
, path
):
175 except OSError as ex
:
def set_log_file(self, log_filename):
    """
    Open the log file `log_filename` and remember it in `self.log`.

    The file is opened for writing as a downloadable file under the
    `logs/` prefix.
    """
    # TODO: open the log lazily
    # TODO: propagate the information to XML report
    log_download_name = 'logs/' + log_filename
    self.log = self.open_downloadable_file(log_download_name, 'w')
def append_line_to_log_file(self, line):
    """
    Record one line of output.

    The line is written (followed by a newline) to the log file when one
    is open. It is also appended to `self.log_tail`, which is then trimmed
    to the last `self.kept_log_lines` entries; a non-positive limit keeps
    no lines at all.
    """
    if self.log is not None:
        self.log.write(line + '\n')

    self.log_tail.append(line)
    # A plain [-limit:] slice is not enough here: with a limit of zero it
    # becomes [-0:], which selects the whole list instead of nothing.
    if self.kept_log_lines > 0:
        self.log_tail = self.log_tail[-self.kept_log_lines:]
    else:
        self.log_tail = []
189 def get_artefact_absolute_path(self
, relative_name
, create_dirs
=False):
190 base
= os
.path
.dirname(relative_name
)
191 name
= os
.path
.basename(relative_name
)
192 dname
= '%s/%s/' % ( self
.artefact_directory
, base
)
195 os
.makedirs(dname
, exist_ok
=True)
197 return os
.path
.abspath(dname
+ name
)
199 def add_downloadable_file(self
, title
, download_name
, current_filename
):
200 self
.dprint("Downloadable `%s' at %s", title
, download_name
)
202 target
= self
.get_artefact_absolute_path(download_name
, True)
203 shutil
.copy(current_filename
, target
)
206 'filename' : download_name
,
212 def move_dir_to_downloadable(self
, title
, download_name
, current_dirname
):
213 self
.dprint("Downloadable `%s' at %s", title
, download_name
)
215 target
= self
.get_artefact_absolute_path(download_name
, True)
216 shutil
.move(current_dirname
, target
)
219 'filename' : download_name
,
def open_downloadable_file(self, download_name, mode):
    """
    Open a file identified by its download name.

    The name is resolved via get_artefact_absolute_path() with creation
    of directories requested, and the resulting path is opened in `mode`.
    Returns the opened file object; closing it is up to the caller.
    """
    artefact_path = self.get_artefact_absolute_path(download_name, True)
    return open(artefact_path, mode)
229 if not self
.log
is None:
235 def ret(self
, *args
, **kwargs
):
241 elif status
== False:
249 result
['data'][k
] = kwargs
[k
]
256 def __init__(self
, id, task
, description
, deps
, mutexes
):
258 self
.dependencies
= deps
259 self
.description
= description
262 self
.completed
= False
265 self
.mutexes
= mutexes
267 def has_completed_okay(self
):
269 return self
.completed
and (self
.status
== 'ok')
271 def has_finished(self
):
273 return self
.completed
275 def get_status(self
):
283 def set_status(self
, status
, reason
):
288 def set_done(self
, data
):
291 self
.completed
= True
def set_skipped(self, reason):
    """
    Set the status to 'skip' together with the given `reason`.
    """
    skip_status = 'skip'
    self.set_status(skip_status, reason)
297 class BuildScheduler
:
298 def __init__(self
, max_workers
, build
, artefact
, build_id
, printer
, inline_log_lines
= 10, debug
= False):
300 'build-directory': build
,
301 'artefact-directory': artefact
,
304 self
.printer
= printer
306 self
.start_timestamp
= time
.time()
307 self
.start_date
= datetime
.datetime
.now(datetime
.timezone
.utc
).replace(microsecond
=0).astimezone().isoformat(' ')
309 # Parent task controller
310 self
.ctl
= TaskController('scheduler', {}, build
, artefact
, self
.printer
, inline_log_lines
, debug
)
313 self
.report_file
= self
.ctl
.open_downloadable_file('report.xml', 'w')
314 self
.report_file
.write("<?xml version=\"1.0\"?>\n")
315 self
.report_file
.write("<build number=\"%s\">\n" % build_id
)
317 # The following attributes (up to self.guard declaration) are guarded
318 # by the self.guard mutex and use self.cond to notify about changes
320 # Finer-grained locking would be possible but would
321 # overly complicate the conditions inside queue processing where we
322 # need to react to multiple events (new task added vs some task
323 # terminated vs selecting the right task to be executed).
325 # Known tasks (regardless of their state). The mapping is between
326 # a task id and TaskWrapper class.
329 # Queue of tasks not yet run (uses task ids only). We insert mutex
330 # tasks at queue beginning (instead of appending) as a heuristic to
331 # prevent accumulation of mutex tasks at the end of run where it could
332 # hurt concurrent execution.
335 # Number of currently running (executing) tasks. Used solely to
336 # control number of concurrently running tasks.
337 self
.running_tasks_count
= 0
339 # Flag for the queue processing whether to terminate the loop to allow
340 # clean termination of the executor.
341 self
.terminate
= False
343 # Here we record which mutexes are held by executing tasks. Mutexes are
344 # identified by their (string) name that is used as index. When the
345 # value is True, the mutex is held (i.e. do not run any other task
346 # claiming the same mutex), mutex is not held when the value is False
347 # or when the key is not present at all.
348 self
.task_mutexes
= {}
350 # Condition variable guarding the above attributes.
351 # We initialize CV only without attaching a lock as it creates one
352 # automatically and CV serves as a lock too.
354 # Always use notify_all as we are waiting in multiple functions
355 # for it (e.g. while processing the queue or in barrier).
356 self
.guard
= Condition()
358 # Lock guarding output synchronization
359 self
.output_lock
= Lock()
361 # Executor for running of individual tasks
362 from concurrent
.futures
import ThreadPoolExecutor
363 self
.max_workers
= max_workers
364 self
.executor
= ThreadPoolExecutor(max_workers
=max_workers
+ 2)
366 # Start the queue processor
367 self
.executor
.submit(BuildScheduler
.process_queue_wrapper
, self
)
369 def process_queue_wrapper(self
):
371 To allow debugging of the queue processor.
377 traceback
.print_exc()
379 def submit(self
, description
, task_id
, task
, deps
= [], mutexes
= []):
381 #print("Submitting {} ({}, {}, {})".format(description, task_id, deps, mutexes))
382 # Check that dependencies are known
384 if not d
in self
.tasks
:
385 raise Exception('Dependency %s is not known.' % d
)
387 wrapper
= TaskWrapper(task_id
, task
, description
, deps
, mutexes
)
388 self
.tasks
[task_id
] = wrapper
390 # Append to the queue
391 # We use a simple heuristic: if the task has no mutexes, we
392 # append to the end of the queue. Otherwise we prioritize the
393 # task a little bit to prevent ending with serialized execution
394 # of the mutually excluded tasks. (We add before first non-mutexed
400 if (len(self
.tasks
[q
].mutexes
) == 0) and (not inserted
):
401 new_queue
.append(task_id
)
405 new_queue
.append(task_id
)
406 self
.queue
= new_queue
408 self
.queue
.append(task_id
)
410 self
.guard
.notify_all()
412 def task_run_wrapper(self
, wrapper
, task_id
, can_be_run
):
414 self
.task_run_inner(wrapper
, task_id
, can_be_run
)
417 traceback
.print_exc()
419 def xml_escape_line(self
, s
):
420 from xml
.sax
.saxutils
import escape
423 s_without_ctrl
= re
.sub(r
'[\x00-\x08\x0A-\x1F]', '', s
)
424 s_escaped
= escape(s_without_ctrl
)
425 s_all_entities_encoded
= s_escaped
.encode('ascii', 'xmlcharrefreplace')
427 return s_all_entities_encoded
.decode('utf8')
429 def task_run_inner(self
, wrapper
, task_id
, can_be_run
):
433 for task_dep_id
in wrapper
.dependencies
:
434 task_dep
= self
.tasks
[task_dep_id
]
435 data
[task_dep_id
] = task_dep
.get_data()
437 wrapper
.task
.ctl
= self
.ctl
.derive(task_id
, data
)
438 wrapper
.task
.ctl
.set_log_file('%s.log' % task_id
)
441 self
.announce_task_started_(wrapper
)
444 res
= wrapper
.task
.execute()
445 if (res
== True) or (res
is None):
456 except Exception as e
:
462 #traceback.print_exc()
465 for task_dep_id
in wrapper
.dependencies
:
466 task_dep
= self
.tasks
[task_dep_id
]
467 if task_dep
.has_finished() and (not task_dep
.has_completed_okay()):
468 reason
= 'dependency %s failed (or also skipped).' % task_dep_id
473 wrapper
.task
.ctl
.append_line_to_log_file('Skipped: %s' % reason
)
475 status
= res
['status']
476 report
= wrapper
.task
.get_report()
478 if (not report
['name'] is None) and (not self
.report_file
is None):
479 report_xml
= '<' + report
['name']
480 report
['attrs']['result'] = status
481 for key
in report
['attrs']:
482 report_xml
= report_xml
+ ' %s="%s"' % (key
, report
['attrs'][key
] )
483 report_xml
= report_xml
+ ' log="logs/%s.log"' % wrapper
.id
484 report_xml
= report_xml
+ ">\n"
486 if 'files' in report
:
487 for f
in report
['files']:
488 file = '<file title="%s" filename="%s" />\n' % ( f
['title'], f
['filename'])
489 report_xml
= report_xml
+ file
491 if (not wrapper
.task
.ctl
is None) and (len(wrapper
.task
.ctl
.log_tail
) > 0):
492 report_xml
= report_xml
+ ' <log>\n'
493 for line
in wrapper
.task
.ctl
.log_tail
:
494 report_xml
= report_xml
+ ' <logline>' + self
.xml_escape_line(line
) + '</logline>\n'
495 report_xml
= report_xml
+ ' </log>\n'
497 report_xml
= report_xml
+ '</' + report
['name'] + ">\n"
500 self
.report_file
.write(report_xml
)
502 wrapper
.set_status(status
, reason
)
503 self
.announce_task_finished_(wrapper
)
504 wrapper
.set_done(res
['data'])
507 self
.running_tasks_count
= self
.running_tasks_count
- 1
510 for m
in wrapper
.mutexes
:
511 self
.task_mutexes
[ m
] = False
513 #print("Task finished, waking up (running now {})".format(self.running_tasks_count))
514 self
.guard
.notify_all()
517 def process_queue(self
):
520 #print("Process queue running, tasks {}".format(len(self.queue)))
521 # Break inside the loop
523 slot_available
= self
.running_tasks_count
< self
.max_workers
524 task_available
= len(self
.queue
) > 0
525 #print("Queue: {} (running {})".format(len(self.queue), self.running_tasks_count))
526 if slot_available
and task_available
:
528 if self
.terminate
and (not task_available
):
531 #print("Queue waiting for free slots (running {}) or tasks (have {})".format(self.running_tasks_count, len(self.queue)))
533 #print("Guard woken-up after waiting for free slots.")
535 # We have some tasks in the queue and we can run at
537 ( ready_task_id
, can_be_run
) = self
.get_first_ready_task_id_()
538 #print("Ready task is {}".format(ready_task_id))
540 if ready_task_id
is None:
541 #print("Queue waiting for new tasks to appear (have {})".format(len(self.queue)))
543 #print("Guard woken-up after no ready task.")
545 # Remove the task from the queue
546 self
.queue
.remove(ready_task_id
)
548 ready_task
= self
.tasks
[ready_task_id
]
550 # Need to update number of running tasks here and now
551 # because the executor might start the execution later
552 # and we would evaluate incorrectly the condition above
553 # that we can start another task.
554 self
.running_tasks_count
= self
.running_tasks_count
+ 1
556 #print("Ready is {}".format(ready_task))
558 for m
in ready_task
.mutexes
:
559 self
.task_mutexes
[ m
] = True
560 #print("Actually starting task {}".format(ready_task_id))
561 self
.executor
.submit(BuildScheduler
.task_run_wrapper
,
562 self
, ready_task
, ready_task_id
, can_be_run
)
564 def get_first_ready_task_id_(self
):
566 Return tuple of first task that can be run (or failed immediately)
567 with note whether the result is predetermined.
568 Returns None when no task can be run.
570 # We assume self.guard was already acquired
571 # We use here the for ... else construct of Python (recall that else
572 # is taken when break is not used)
573 for task_id
in self
.queue
:
574 task
= self
.tasks
[task_id
]
575 for task_dep_id
in task
.dependencies
:
576 task_dep
= self
.tasks
[ task_dep_id
]
577 if not task_dep
.has_finished():
579 # Failed dependency means we can return now
580 if task_dep
.get_status() != 'ok':
581 return ( task_id
, False )
583 for task_mutex
in task
.mutexes
:
584 if (task_mutex
in self
.task_mutexes
) and self
.task_mutexes
[ task_mutex
]:
587 return ( task_id
, True )
588 return ( None, None )
def announce_task_started_(self, task):
    """
    Report through the printer that `task` is starting.

    The printed text is the task description followed by " ...".
    """
    banner = task.description + " ..."
    self.printer.print_starting(banner)
593 def announce_task_finished_(self
, task
):
594 description
= task
.description
595 if task
.status
== 'ok':
597 msg_color
= self
.printer
.GREEN
598 description
= description
+ '.'
599 elif task
.status
== 'skip':
601 msg_color
= self
.printer
.CYAN
602 description
= description
+ ': ' + task
.reason
605 msg_color
= self
.printer
.RED
606 description
= description
+ ': ' + task
.reason
608 self
.printer
.print_finished(msg_color
, msg
, description
)
613 #print("Barrier ({}, {})...".format(self.running_tasks_count, len(self.queue)))
614 while (self
.running_tasks_count
> 0) or (len(self
.queue
) > 0):
615 #print("Barrier waiting ({}, {})...".format(self.running_tasks_count, len(self.queue)))
620 self
.terminate
= True
621 self
.guard
.notify_all()
624 self
.executor
.shutdown(True)
626 def close_report(self
):
627 if not self
.report_file
is None:
628 end_time
= time
.time()
629 self
.report_file
.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
630 self
.start_date
, ( end_time
- self
.start_timestamp
) * 1000,
633 self
.report_file
.write("</build>\n")
634 self
.report_file
.close()
635 self
.report_file
= None