Source code for butterfly.solution

# coding=utf-8
"""Butterfly Solution."""
from copy import deepcopy
from collections import namedtuple, OrderedDict
import os

from .utilities import tail, load_skipped_probes
from .parser import CppDictParser


[docs]class Solution(object): """Butterfly Solution. This class creates a solution from a recipe which is ready to run and monitor. Args: case: A butterfly case. recipe: A butterfly recipe. decomposeParDict: decomposeParDict for parallel run (default: None). solution_parameter: A SolutionParameter (default: None). remove_extra_foam_files: set to True if you want butterfly to remove all the extra files in 0 folder once you update the recipe (default: False). """ def __init__(self, case, recipe, decomposeParDict=None, solution_parameter=None, remove_extra_foam_files=False): """Init solution.""" self.__remove = remove_extra_foam_files assert hasattr(case, 'isCase'), \ 'ValueError:: {} is not a Butterfly.Case'.format(case) self.decomposeParDict = decomposeParDict self.__case = case self.case.decomposeParDict = self.decomposeParDict self.recipe = recipe self.update_solution_params(solution_parameter) # set internal properties for running the solution # place holder for residuals self.__residualValues = OrderedDict.fromkeys(self.residual_fields, 0) self.__isRunStarted = False self.__isRunFinished = False self.__process = None self.__log_files = None self.__errFiles = None @property def project_name(self): """Get porject name.""" return self.case.project_name @property def case(self): """Case.""" return self.__case @property def recipe(self): """Get recipe.""" return self.__recipe @recipe.setter def recipe(self, r): """set recipe.""" assert hasattr(r, 'isRecipe'), '{} is not a recipe.'.format(r) self.__recipe = r self.__recipe.prepare_case(self.case, overwrite=True, remove=self.__remove) @property def decomposeParDict(self): """DecomposeParDict.""" return self.__decomposeParDict @decomposeParDict.setter def decomposeParDict(self, dpd): """Set decomposeParDict.""" if dpd: assert hasattr(dpd, 'isDecomposeParDict'), \ '{} is not a DecomposeParDict.'.format(dpd) self.__decomposeParDict = dpd @property def remove_extra_foam_files(self): """If True, solution will remove extra files everytime recipe changes.""" return self.__remove @property def project_dir(self): """Get project directory.""" return self.case.project_dir @property def residual_fields(self): """Get list of residuals to be watched during the run.""" return self.recipe.residual_fields @property def controlDict(self): """Get controlDict.""" return self.__case.controlDict @property def residualControl(self): """Get residualControl values for this solution.""" return self.__case.fvSolution.residualControl @property def probes(self): """Get probes if any.""" return self.__case.probes @property def residual_file(self): """Return address of the residual file.""" return os.path.join(self.case.log_folder, self.recipe.log_file) @property def log_files(self): """Get full path to log files.""" return self.__log_files @property def log(self): """Get the log report.""" isContent, content = self.case.runmanager.check_file_contents(self.log_files) @property def err_files(self): """Get full path to error files.""" return self.__errFiles @property def is_running(self): """Check if the solution is still running.""" if not self.__isRunStarted and not self.__isRunFinished: return False elif self.__process.poll() is None: return True else: self.__isRunFinished = True self.case.rename_snappyHexMesh_folders() # load errors if any self.case.runmanager.check_file_contents(self.log_files) failed, err = self.case.runmanager.check_file_contents(self.err_files) assert not failed, err @property def timestep(self): """Get latest timestep for this solution.""" return self.__get_latestTime() @property def residual_values(self, latestTime=True): """Get timestep and residual values as a tuple.""" if latestTime: return self.__get_info().residual_values else: raise NotImplementedError() @property def info(self): """Get timestep and residual values as a tuple.""" return self.__get_info() def __get_info(self): i = namedtuple('Info', 'timestep residualValues') # get end of the log file if not os.path.isfile(self.residual_file): return i(0, self.__residualValues.values()) text = tail(self.residual_file).split("\nTime =")[-1].split('\n') # get timestep try: t = int(text[0]) except ValueError: t = 0 # read residual values for line in text: try: # quantity, Initial residual, Final residual, No Iterations q, ir, fr, ni = line.split(': Solving for ')[1].split(',') # use final residual if q in self.__residualValues: self.__residualValues[q] = ir.split('= ')[-1] except IndexError: pass return i(t, self.__residualValues.values()) def __get_latestTime(self): # get end of the log file if not os.path.isfile(self.residual_file): return 0 text = tail(self.residual_file).split("\nTime =")[-1].split('\n') # get timestep try: t = int(text[0]) except ValueError: t = 0 return t
[docs] def update_from_recipe(self, recipe): """Update solution from recipe inputs. This method creates a SolutionParameter from each recipe property, and uses updateSolutionParams to update the solution. """ tp = SolutionParameter.from_cpp_dictionary('turbulenceProperties', str(recipe.turbulenceProperties)) fv_sc = SolutionParameter.from_cpp_dictionary('fvSchemes', str(recipe.fvSchemes)) fv_sol = SolutionParameter.from_cpp_dictionary('fvSolution', str(recipe.fvSolution)) self.update_solution_params((tp, fv_sc, fv_sol))
[docs] def update_solution_params(self, sol_params, timestep=None): """Update parameters. Attributes: sol_params: A list of solution parameters. """ if not sol_params: # if input is None return return # check input with the current and update them if there has been changes for solPar in sol_params: assert hasattr(solPar, 'isSolutionParameter'), \ '{} is not a solution parameter.'.format(solPar) # check if this timestep in range of SolutionParameter time range if timestep is not None and not solPar.is_time_in_range(timestep): # not in time range! try the next one continue try: update = getattr(self.__case, solPar.filename) \ .update_values(solPar.values, solPar.replace) except AttributeError as e: # probes can be empty at start raise AttributeError(e) # check to remove functions if needed if solPar.filename == 'controlDict': cur_func = self.__case.controlDict.values['functions'].keys() new_func = solPar.values['functions'].keys() for k in cur_func: if k not in new_func: del(self.__case.controlDict.values['functions'][k]) update = True if update: print('Updating {}...'.format(solPar.filename)) ffile = getattr(self.__case, solPar.filename) ffile.save(self.project_dir) # just in case probes was not there and now should be included # in controlDict if solPar.filename == 'probes': if not self.controlDict.include: self.controlDict.include = solPar.filename self.controlDict.save(self.project_dir)
[docs] def run(self, wait=False): """Execute the solution.""" self.case.rename_snappyHexMesh_folders() log = self.case.command( cmd=self.recipe.application, args=None, decomposeParDict=self.__decomposeParDict, run=True, wait=wait) self.__process = log.process self.__errFiles = log.errorfiles self.__log_files = log.logfiles self.__isRunStarted = True if not wait: self.__isRunFinished = False else: self.__isRunFinished = True
[docs] def purge(self, remove_polyMesh_content=True, remove_snappyHexMesh_folders=True, remove_result_folders=False, remove_postProcessing_folder=False): """Purge solution's case folder.""" self.case.purge( remove_polyMesh_content, remove_snappyHexMesh_folders, remove_result_folders, remove_postProcessing_folder)
[docs] def terminate(self): """Cancel the solution.""" self.case.runmanager.terminate() if self.decomposeParDict: # remove processor folders if they haven't been removed already. self.case.remove_processor_folders()
[docs] def load_probe_values(self, field): """Return OpenFOAM probes results for a given field (e.g. U).""" return self.case.load_probe_values(field)
[docs] def load_probes(self, field): """Return OpenFOAM probes location for a given field (e.g. U).""" return self.case.load_probes(field)
[docs] def skipped_probes(self): """Get list of probes that are skipped from the solution.""" return load_skipped_probes(os.path.join(self.case.log_folder, self.recipe.log_file))
[docs] def sample(self, name, points, field, wait=True): """Sample the results for a certain field. Args: name: A unique name for this sample. points: List of points as (x, y, z). fields: List of fields (e.g. U, p). args: Command arguments. wait: Wait until command execution ends. Returns: namedtuple(probes, values). """ return self.case.sample(name, points, field, wait=True)
[docs] def duplicate(self): """Return a copy of this object.""" return deepcopy(self)
[docs] def ToString(self): """Overwrite .NET ToString method.""" return self.__repr__()
def __repr__(self): """Solution representation.""" return "{}::{}".format(self.project_name, self.recipe)
[docs]class SolutionParameter(object): """A solution parameter that can be changed during the solution. Add solution parameter to to solution settings. Attributes: filename: OpenFOAM filename that the values are belong to (e.g. blockMeshDict, fvSchemes). values: New values as a python dictionary. replace: Set to True if you want the original dictionary to be replaced by new values. Default is False which means the original dictionary will be only updated by new values. time_range: Time range that this SolutioParameter is valid as a tuple (default: (0, 1.0e+100)). """ _of_filenames = ('epsilon', 'k', 'nut', 'p', 'U', 'T', 'turbulenceProperties', 'transportProperties', 'blockMeshDict', 'controlDict', 'fvSchemes', 'fvSolution', 'snappyHexMeshDict', 'probes') def __init__(self, of_filename, values, replace=False, time_range=None): """Create solution parameter.""" self.filename = of_filename self.values = values self.replace = replace try: self.__t0 = int(time_range[0]) self.__t1 = int(time_range[1]) except TypeError: self.__t0, self.__t1 = 0, 1.0e+100 except ValueError: self.__t0, self.__t1 = 0, 1.0e+100
[docs] @classmethod def from_dictionary_file(cls, of_filename, filepath, replace=False, time_range=None): """Create from an OpenFOAM dictionary file.""" # convert values to python dictionary values = CppDictParser.from_file(filepath).values return cls(of_filename, values, replace, time_range)
[docs] @classmethod def from_cpp_dictionary(cls, of_filename, dictionary, replace=False, time_range=None, header=False): """Create from an OpenFOAM dictionary in text format.""" # convert values to python dictionary values = CppDictParser(text=dictionary).values if not header and 'FoamFile' in values: del(values['FoamFile']) return cls(of_filename, values, replace, time_range)
@property def isSolutionParameter(self): """Return True.""" return True @property def values(self): """Return OpenFOAM file name.""" return self.__values @values.setter def values(self, v): assert isinstance(v, dict), 'values should be a dictionary not a {}.' \ .format(type(v)) self.__values = v @property def filename(self): """Return OpenFOAM file name.""" return self.__filename @filename.setter def filename(self, f): assert f in self._of_filenames, '{} is not a valid OpenFOAM dictionary ' \ 'file. Try one of the files below:\n{}.' \ .format(f, '\n'.join(self._of_filenames)) self.__filename = f @property def time_range(self): """Get time range.""" return (self.__t0, self.__t1)
[docs] def is_time_in_range(self, time): """Check if time is in this SolutionParameter time range.""" return self.__t0 <= float(time) < self.__t1
[docs] def duplicate(self): """Return a copy of this object.""" return deepcopy(self)
[docs] def ToString(self): """Overwrite .NET ToString method.""" return self.__repr__()
def __repr__(self): """Class representation.""" kv = '\n'.join('{}: {};'.format(k, v) for k, v in self.values.iteritems()) return 'SolutionParameter::{}\n{}\n@{}'\ .format(self.filename, kv, self.time_range)