Source code for honeybee.radiance.runmanager.runmanager

"""Radiance Analysis workflows."""
import time
import os


[docs]class TaskGroup(object): """A group of radiance tasks which will be executed in parallel.""" def __init__(self, tasks=None): self._tasks = tasks or () @property def is_finished(self): for task in self._tasks: if not task.is_finished: return False return True @property def progress(self): for task in self._tasks: print('..%s' % task.progress_report)
[docs] def execute(self, cpus=1, cwd=None, env=None, update_freq=5, verbose=True): """Execute tasks in this task group in parallel.""" # execute first subtasks running = [None for task in self._tasks] for count, task in enumerate(self._tasks): if verbose: print('..Starting task {}'.format(task.title)) running[count] = task.execute_next(cwd, env, verbose) while not self.is_finished: for count in xrange(len(self._tasks)): if not running[count]: # no extra task is left in this task group continue if running[count].is_finished: # report success or failure print(running[count].progress_report) if not running[count].is_succeed: # kill all the other runs return -1 # execute next task read to go running[count] = self._tasks[count].execute_next(cwd, env, verbose) else: print('{}'.format(running[count].progress_report)) time.sleep(update_freq)
[docs] def terminate(self): """Terminate task group.""" for task in self._tasks: if task.is_running: task.terminate()
# TODO(mostapha): Keep track of number of cpus # TODO(mostapha): enhance reporting.
[docs]class Runner(object): """Run manager for Radiance tasks.""" def __init__(self, title, tasks): """Run manager for Radiance tasks. Args: tasks: List of lists of tasks to execute. Grouped tasks will be executed in parallel. Each group will be executed in serial. For isinstance for ((task_1, task_2), task_3) input Runner will execute task_1 and task_2 in parallel and executes task_3 only when both task_1 and task_2 is finished. """ self._title = title self._taskgroups = [TaskGroup(task) for task in tasks]
[docs] def execute(self, cpus=1, cwd=None, env=None, update_freq=5, verbose=True): """Execute all task groups.""" if verbose: print('Starting {}'.format(self._title)) for taskgroup in self._taskgroups: # this is a blocking call success = taskgroup.execute(cpus, cwd, env, update_freq, verbose) if success == -1: # task has failed print('Terminating running tasks...') taskgroup.terminate() return
def __repr__(self): """Represent Runner class.""" return 'RunManager: {}'.format(self._title)