# coding=utf-8
"""Collection of useful methods."""
from __future__ import print_function
import os
import sys
import collections
from collections import OrderedDict, namedtuple
from subprocess import Popen, PIPE
import gzip
[docs]def list_files(folder, fullpath=False):
"""list files in a folder."""
if not os.path.isdir(folder):
yield None
for f in os.listdir(folder):
if os.path.isfile(os.path.join(folder, f)):
if fullpath:
yield os.path.join(folder, f)
else:
yield f
else:
yield
[docs]def load_case_files(folder, fullpath=False):
"""load openfoam files from a folder."""
files = []
for p in ('0', 'constant', 'system', 'constant/triSurface'):
fp = os.path.join(folder, p)
files.append(tuple(list_files(fp, fullpath)))
Files = namedtuple('Files', 'zero constant system stl')
return Files(*files)
[docs]def mkdir(directory, overwrite=True):
"""Make a directory.
Args:
directory: directory as a string.
overwrite: A boolean to overwrite the folder if already exists.
"""
if not os.path.isdir(directory):
try:
os.mkdir(directory)
except Exception as e:
raise ValueError('Failed to create %s:\n%s' % (directory, e))
return directory
[docs]def write_to_file(full_path, content):
"""write string content to a file."""
try:
with open(full_path, 'wb') as outf:
outf.write(content)
except Exception as e:
raise ValueError('Failed to create %s:\n%s' % (full_path, e))
return full_path
[docs]def run_batch_file(filepath, wait=True):
"""run an executable .bat file.
args:
wait: Wait for analysis to finish (default: True).
returns:
Popen process.
"""
if not os.path.isfile(filepath):
raise ValueError('Cannot find %s' % filepath)
sys.stdout.flush()
p = Popen(filepath, shell=False, stdin=PIPE)
if wait:
(output, err) = p.communicate(input='Y\n')
return p
[docs]def tail(file_path, lines=20):
"""Get tail of the file."""
with open(file_path, 'rb') as f:
total_lines_wanted = lines
BLOCK_SIZE = 1024
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
# blocks of size BLOCK_SIZE, in reverse order starting from the end of
# the file
blocks = []
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
# read the last block we haven't yet read
f.seek(block_number * BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
# file too small, start from begining
f.seek(0, 0)
# only read what was not read
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count('\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = ''.join(reversed(blocks))
return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
[docs]def read_last_line(filepath, block_size=1024):
"""Read the last line of a file.
Modified from: http://www.manugarg.com/2007/04/tailing-in-python.html
Args:
filepath: path to file
block_size: data is read in chunks of this size (optional, default=1024)
Raises:
IOError if file cannot be processed.
"""
# rU is to open it with Universal newline support
f = open(filepath, 'rU')
offset = block_size
try:
f.seek(0, 2)
file_size = f.tell()
while True:
if file_size < offset:
offset = file_size
f.seek(-1 * offset, 2)
read_str = f.read(offset)
# Remove newline at the end
if read_str[offset - 1] == '\n':
read_str = read_str[0:-1]
lines = read_str.split('\n')
if len(lines) > 1: # Got a line
return lines[len(lines) - 1]
if offset == file_size: # Reached the beginning
return read_str
offset += block_size
except Exception as e:
raise Exception(str(e))
finally:
f.close()
[docs]def update_dict(d, u):
"""Update a dictionary witout overwriting the currect values.
source: http://stackoverflow.com/a/3233356/4394669
Args:
d: original dictionary.
u: new dictionary.
"""
for k, v in u.iteritems():
if isinstance(v, collections.Mapping):
r = update_dict(d.get(k, {}), v)
d[k] = r
else:
d[k] = u[k]
return d
[docs]def get_snappyHexMesh_geometry_feild(project_name, bf_geometries,
meshing_type='triSurfaceMesh',
stl_file=None):
"""Get data for Geometry as a dictionary.
Args:
project_name: Name of OpenFOAM case.
bf_geometries: List of Butterfly geometries.
meshing_type: Meshing type. (Default: triSurfaceMesh)
stl_file: Name of .stl file if it is different from project_name.stl
Returns:
A dictionary of data that can be passed to snappyHexMeshDict.
"""
stl_file = '{}.stl'.format(project_name) if not stl_file else stl_file
_geo = {stl_file: OrderedDict()}
_geo[stl_file]['type'] = meshing_type
_geo[stl_file]['name'] = project_name
_geo[stl_file]['regions'] = {}
for bfgeo in bf_geometries:
if bfgeo.name not in _geo[stl_file]['regions']:
_geo[stl_file]['regions'][bfgeo.name] = {'name': bfgeo.name}
return _geo
[docs]def get_snappyHexMesh_refinement_surfaces(
project_name, bf_geometries, global_levels=None):
"""Get data for MeshRefinementSurfaces as a dictionary.
Args:
project_name: Name of OpenFOAM case.
bf_geometries: List of Butterfly geometries.
global_levels: Default Min, max level of geometry mesh refinement.
Returns:
A dictionary of data that can be passed to snappyHexMeshDict.
"""
global_levels = (0, 0) if not global_levels else tuple(global_levels)
_ref = {project_name: OrderedDict()}
_ref[project_name]['level'] = '({} {})'.format(*(int(v) for v in global_levels))
_ref[project_name]['regions'] = {}
for bfgeo in bf_geometries:
if not bfgeo.refinementLevels:
continue
if bfgeo.name not in _ref[project_name]['regions']:
_ref[project_name]['regions'][bfgeo.name] = \
{'level': '({} {})'.format(int(bfgeo.refinementLevels[0]),
int(bfgeo.refinementLevels[1]))}
return _ref
[docs]def get_snappyHexMesh_surface_layers(bf_geometries):
"""Get data for n_surface_layers as a dictionary.
Args:
bf_geometries: List of Butterfly geometries.
Returns:
A dictionary of data that can be passed to snappyHexMeshDict.
"""
_ref = OrderedDict()
for bfgeo in bf_geometries:
if not bfgeo.nSurfaceLayers:
continue
if bfgeo.name not in _ref:
_ref[bfgeo.name] = {'nSurfaceLayers': str(bfgeo.nSurfaceLayers)}
return _ref
[docs]def get_boundary_field_from_geometries(bf_geometries, field='U'):
"""Get data for boundaryField as a dictionary.
Args:
bf_geometries: List of Butterfly geometries.
parameter: One of the fileds as a string (U , p, k , epsilon, nut)
Returns:
A dictionary of data that can be passed to snappyHexMeshDict.
"""
_bou = {}
for bfgeo in bf_geometries:
if bfgeo.name not in _bou:
if hasattr(bfgeo.boundary_condition, 'isBoundingBoxBoundaryCondition'):
# bounding box for meshing. should not be included in files
continue
_bc = getattr(bfgeo.boundary_condition, field)
_bou[bfgeo.name] = _bc.value_dict
return _bou
[docs]def load_skipped_probes(log_file):
"""Return list of skipped points as tuples."""
assert os.path.isfile(log_file), "Can't find {}.".format(log_file)
_pts = []
with open(log_file, 'rb') as inf:
line = inf.readline()
while line and not line.startswith('Time = '):
if line.startswith(' Did not find location'):
_pts.append(
tuple(float(i) for i in line.split("(")[1].split(")")[0].split()))
line = inf.readline()
return _pts
[docs]def load_probes_from_postProcessing_file(probes_folder, field):
"""Return a generator of probes as tuples.
Args:
probes_folder: full path to probes folder.
field: Probes field (e.g. U, p, T).
"""
if not os.path.isdir(probes_folder):
raise ValueError(
'Failed to find probes folder folder at {}'.format(probes_folder))
folders = [os.path.join(probes_folder, f) for f in os.listdir(probes_folder)
if (os.path.isdir(os.path.join(probes_folder, f)) and
f.isdigit())]
# sort based on last modified
folders = sorted(folders, key=lambda folder:
max(tuple(os.stat(os.path.join(folder, f)).st_mtime
for f in os.listdir(folder))))
# load the last line in the file
_f = os.path.join(probes_folder, str(folders[-1]), field)
assert os.path.isfile(_f), 'Cannot find {}!'.format(_f)
with open(_f, 'rb') as inf:
for line in inf:
if line.startswith('# Probe'):
break
yield tuple(float(v) for v in line.strip().split('(')[-1][:-1].split())
[docs]def load_probe_values_from_folder(probes_folder, field):
"""Return OpenFOAM probe values for a field for the last timestep.
Args:
field: Probes field (e.g. U, p, T).
"""
if not os.path.isdir(probes_folder):
raise ValueError(
'Failed to find probes folder folder at {}'.format(probes_folder))
folders = [os.path.join(probes_folder, f) for f in os.listdir(probes_folder)
if (os.path.isdir(os.path.join(probes_folder, f)) and
f.isdigit())]
# sort based on last modified
folders = sorted(folders, key=lambda folder:
max(tuple(os.stat(os.path.join(folder, f)).st_mtime
for f in os.listdir(folder))))
# load the last line in the file
_f = os.path.join(probes_folder, str(folders[-1]), field)
assert os.path.isfile(_f), 'Cannot find {}!'.format(_f)
_res = read_last_line(_f).split()[1:]
# convert values to tuple or number
_rawres = tuple(d.strip() for d in read_last_line(_f).split()
if d.strip())[1:]
try:
# it's a number
_res = tuple(float(r) for r in _rawres)
except ValueError:
try:
# it's a vector
_res = tuple(eval(','.join(_rawres[3 * i: 3 * (i + 1)]))
for i in range(int(len(_rawres) / 3)))
except Exception as e:
raise Exception('\nFailed to load probes:\n{}'.format(e))
return _res
[docs]def load_probes_and_values_from_sample_file(fp):
"""Load probes and respected values from the results of a sample file."""
with open(fp, 'rb') as inf:
for line in inf:
res = (float(v) for v in line.split())
x = res.next()
y = res.next()
z = res.next()
v = tuple(res)
if len(v) == 1:
yield (x, y, z), float(v[0])
else:
yield (x, y, z), v
[docs]def load_of_points_file(path_to_file):
"""Return points as a generator of tuples."""
assert os.path.isfile(path_to_file), \
'Failed to find points file at {}'.format(path_to_file)
if path_to_file.endswith('.gz'):
pfile = gzip.open(path_to_file, 'rb')
else:
pfile = open(path_to_file, 'rb')
try:
for l in pfile:
if l.strip().startswith('(') and l.strip().endswith(')'):
yield eval(','.join(l.split()))
finally:
pfile.close()
[docs]def load_of_faces_file(path_to_file, inner_mesh=True):
"""Return faces indecies as a generator of tuples."""
assert os.path.isfile(path_to_file), \
'Failed to find faces file: {}'.format(path_to_file)
if path_to_file.endswith('.gz'):
ffile = gzip.open(path_to_file, 'rb')
else:
ffile = open(path_to_file, 'rb')
if not inner_mesh:
p, f = os.path.split(path_to_file)
fins = load_of_boundary_file(os.path.join(p, f.replace('faces', 'boundary')))
try:
fin = -1
for l in ffile:
tl = l.strip()
try:
if tl[1] == '(' and tl.endswith(')'):
fin += 1 # add to face number
if fin in fins:
yield eval(','.join(tl[1:].split()))
except IndexError:
continue
finally:
ffile.close()
else:
try:
for l in ffile:
tl = l.strip()
try:
if tl[1] == '(' and tl.endswith(')'):
yield eval(','.join(tl[1:].split()))
except IndexError:
continue
finally:
ffile.close()
[docs]def load_of_boundary_file(path_to_file):
"""Return Face indecies for boundary faces as a set."""
assert os.path.isfile(path_to_file), \
'Failed to find boundary file: {}'.format(path_to_file)
ind = []
with open(path_to_file, 'rb') as bf:
for line in bf:
line = line.strip()
if line.startswith('nFaces'):
count = int(line.split()[-1][:-1])
nl = next(bf)
st = int(nl.split()[-1][:-1])
ind.append(xrange(st, st + count))
return {i for rng in ind for i in rng}