Source code for honeybee.radiance.runmanager.task

"""Radiance Task.

A task is a series of command that will be executed one after each other.
"""
import re
import subprocess
import os
import time


[docs]class Task(object): """A collection of subtasks to be executed one after each other.""" def __init__(self, title, subtasks): self._cpu_demand = 1 # will be overwritten based on input tasks. self._title = title self._last_task_executed = None self.subtasks = subtasks
[docs] @classmethod def from_json(cls, task_json): """Create a task from a dictionary. {'title': self._title, 'subtasks': [{}, {}]} """ return cls(task_json['title'], SubTask.from_json(st for st in task_json['subtasks']))
@property def title(self): """SubTask title.""" return self._title @property def cpu_demand(self): """Number of cpus that this task will be using. This number will always be 1 on Windows as radiance doesn't support multi-processor calculation on Windows. """ return self._cpu_demand @property def subtasks(self): """List of subtasks.""" return self._subtasks @subtasks.setter def subtasks(self, input): for task in input: assert isinstance(task, SubTask), 'Expected SubTask not {}.'.format(task) self._cpu_demand = max(task.cpu_demand, self._cpu_demand) self._subtasks = input @property def count(self): """Length of subtasks.""" return len(self.subtasks) @property def is_running(self): for subtask in self.subtasks: if subtask.is_running: return True return False @property def is_finished(self): """Check if the execution of this task is finished.""" for subtask in self.subtasks: if not subtask.is_finished: return False return True
[docs] def is_succeed(self): """True if all subtasks are executed successfully.""" for subtask in self.subtasks: if not subtask.is_succeed: return False return True
@property def progress_report(self): """Return human readable progress of subtasks.""" for task in self.subtasks: print('....%s' % task.progress_report)
[docs] def execute(self, cwd=None, env=None, verbose=True, update_freq=5): """Execute this taskself. This method is blocking and will wait until the execution is finished. """ for count in range(self.count): task = self.execute_subtask(count, cwd, env, verbose) while task.is_running: if verbose: # replace eith progress print('....[{} of {}]: {}'.format( count + 1, self.count, task.progress_report)) time.sleep(update_freq) # report success or failure if not task.is_succeed: print('....[{} of {}]: {}'.format(count + 1, self.count, task.progress_report)) break
[docs] def execute_subtask(self, task_index, cwd=None, env=None, verbose=True): task = self.subtasks[task_index] if verbose: print('...Starting subtask {} of {}: {}...'.format( task_index + 1, self.count, task.title)) task.execute(cwd, env) self._last_task_executed = task_index return task
[docs] def execute_next(self, cwd=None, env=None, verbose=True): """Execute next task in line.""" if self._last_task_executed and self._last_task_executed + 1 == self.count: return if self._last_task_executed is None: task_index = 0 else: task_index = self._last_task_executed + 1 task = self.subtasks[task_index] if verbose: print('...Starting subtask {} of {}: {}...'.format( task_index + 1, self.count, task.title)) task.execute(cwd, env) self._last_task_executed = task_index return task
[docs] def terminate(self): """Terminate subtask.""" for subtask in self.subtasks: if subtask.is_running: subtask.terminate()
[docs] def to_json(self): """Return a Task as a dictionary.""" return {'title': self.title, 'subtasks': [task.to_json() for task in self.subtasks]}
[docs] def ToString(self): """Overwrite .NET ToString method.""" return self.__repr__()
def __repr__(self): """Task representation.""" return '\n'.join('{}: {}'.format(count + 1, task.title) for count, task in enumerate(self.subtasks))
[docs]class SubTask(object): def __init__(self, title, command, output_file=None, expected_output_size=None): """Task Args: title: Human readable title for this command. command: A radinace command as strings. output_file: Relative path to output_file. This file will be used to report completion process. expected_output_size: Expected output size for output file. This number will be used to report completion process. """ self._title = title self._command = command self._cpu_demand = self._get_cpu_count(command) self._process = None self._output_file = output_file self._expected_size = expected_output_size self._execution_started_at = None
[docs] @classmethod def from_json(cls, task_json): """Create a task from a dictionary. {'title': self._title, 'command': self.command} """ if 'output_file' in task_json: output_file = task_json['output_file'] else: output_file = None if 'output_file' in task_json: expected_size = task_json['expected_size'] else: expected_size = None return cls(task_json['title'], task_json['command'], output_file, expected_size)
@property def title(self): """SubTask title.""" return self._title @property def cpu_demand(self): """Number of cpus that this task will be using. This number will always be 1 on Windows as radiance doesn't support multi-processor calculation on Windows. """ return self._cpu_demand @property def is_started(self): if self._process: return True else: return False @property def is_running(self): if self._process: return self._process.poll() is None else: return False @property def is_finished(self): if self._process: return not self._process.poll() is None else: return False @property def is_succeed(self): if self._process: return self._process.returncode == 0 else: return False @property def command(self): """List of command for this task.""" return self._command @property def progress(self): """Progress as a percentage.""" if not self.is_started: return 0 elif not self._output_file or not self._expected_size: return -1 elif not os.path.isfile(self._output_file): return -1 else: count = os.path.getsize(self._output_file) percent = min(100, round(100.0 * count / float(self._expected_size), 1)) return percent @property def progress_report(self): """Human readable progress report.""" if not self.is_started: return '...{} is not started!'.format(self.title) elif self.is_finished: if self.is_succeed: return '...Finished {} successfully!'.format(self.title) else: # the task failed for some reason return '...{} failed:\n\n\tError message:\n\t{}\n\tCommand:\n\t{}' \ .format(self.title, '\t'.join(self.stderr), self.command) else: # it is still running progress = self.progress if progress == -1: return '....{0} is still running ({1:.2f}s).'.format( self.title, time.time() - self._execution_started_at ) else: return '....{0} is {1}% complete in {2:.2f}s.'.format( self.title, self.progress, time.time() - self._execution_started_at ) @property def stderr(self): """Return standard errors if any.""" if self._process: return self._process.stderr else: return () @property def stdout(self): """Return standard output if any.""" if self._process: return self._process.stdout else: return () @staticmethod def _get_cpu_count(command): """Get number of cpus from command. This method tries to find the digit after -n in command and return the max number. """ n = 1 result = re.findall(r'-n\s*(\d)', command) for num_cpu in result: if int(num_cpu) > n: n = num_cpu return n
[docs] def execute(self, cwd=None, env=None): """Execute command one after each other.""" self._execution_started_at = time.time() if cwd and self._output_file and cwd not in self._output_file: self._output_file = os.path.join(cwd, self._output_file) self._process = subprocess.Popen( self.command, cwd=cwd, env=env, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True )
[docs] def terminate(self): """Terminate subtask.""" if not self.is_started: return elif self.is_running: self._process.terminate()
[docs] def to_json(self): """Return a Task as a dictionary.""" return {'title': self.title, 'command': self.command, 'output_file': self._output_file, 'expected_size': self._expected_size}
[docs] def ToString(self): """Overwrite .NET ToString method.""" return self.__repr__()
def __repr__(self): """Task representation.""" return 'SubTask: %s' % self.command