from __future__ import annotations
import h5py
import numpy as np
from lxml import etree
from NekUpload.metadata import extractor
from .exceptions import SessionFileException
from NekUpload.NekData.composite import CompositeDefinition
import NekUpload.NekData.expansions as nd_exp
from NekUpload.NekData.data_type import SolverType,Elements,IntegrationPoint,BasisType
from NekUpload.NekData.config import EQUATION_TYPE_TO_SOLVER
from NekUpload.NekData.filters import NekFilter,NekFilterFactory
from NekUpload.NekData.refinements import Refinements
from NekUpload.utils.notification import warn_with_logging
from NekUpload.utils import parsing
###############################################################################
# This module contains file abstractions for the expected Nektar files
# Responsible for returning relevant data from their associated files
# This should be used as an internal representation of files
###############################################################################
[docs]
class NekGeometryFile:
def __init__(self,geometry_path: str):
self.geometry_path: str = geometry_path
self.geometry_file: h5py.File | None = None
def __enter__(self) -> NekGeometryFile:
"""Opens the files and loads data"""
self.geometry_file = h5py.File(self.geometry_path,"r")
return self
def __exit__(self,exc_type: type=None, exec_value: Exception=None, traceback: object=None):
"""Ensures the file is closed on exit"""
if self.geometry_file:
self.geometry_file.close()
[docs]
def get_composite_info(self) -> dict[int,CompositeDefinition]:
"""Get dictionary of composite information, mapping composite id to its string definition, as per Nektar++ definition
Returns:
dict[int,CompositeDefinition]: COMPOSITE ID -> DEFINITION OF COMPOSITE
"""
composite_maps: np.ndarray = self.geometry_file["NEKTAR/GEOMETRY/MAPS/COMPOSITE"][:]
composite_mesh: np.ndarray = self.geometry_file["NEKTAR/GEOMETRY/MESH/COMPOSITE"][:]
composite_info: dict[int,CompositeDefinition] = {comp_id: CompositeDefinition(definition.decode("utf-8").strip())
for comp_id,definition in zip(composite_maps,composite_mesh)}
return composite_info
[docs]
def get_composite_edge_info(self) -> dict[int,CompositeDefinition]:
"""Get dictionary of composite information, mapping composite id to its string definition, as per Nektar++ definition.
Only returns edge information.
Returns:
dict[int,CompositeDefinition]: COMPOSITE ID -> DEFINITION OF COMPOSITE (ONLY EDGES)
"""
composite_info: dict[int,CompositeDefinition] = self.get_composite_info()
#E[...] is an edge
edges:dict[int,CompositeDefinition] = {comp_id: definition for comp_id,definition in composite_info.items()
if definition.element == Elements.EDGE}
return edges
[docs]
def get_composite_face_info(self) -> dict[int,CompositeDefinition]:
"""Get dictionary of composite information, mapping composite id to its string definition, as per Nektar++ definition.
Only returns face information.
Returns:
dict[int,CompositeDefinition]: COMPOSITE ID -> DEFINITION OF COMPOSITE (ONLY FACES)
"""
composite_info: dict[int,CompositeDefinition] = self.get_composite_info()
#F[...] is a face
faces:dict[int,CompositeDefinition] = {comp_id: definition for comp_id,definition in composite_info.items()
if definition.element == Elements.FACE}
return faces
[docs]
class NekSessionFile:
def __init__(self,session_path: str):
self.session_path = session_path
self.xml_tree: etree._ElementTree | None = None
self.root_nektar: etree._Element | None = None
def __enter__(self) -> NekSessionFile:
self.xml_tree = etree.parse(self.session_path)
self.root_nektar = self.xml_tree.getroot()
return self
def __exit__(self,exc_type: type | None, exec_value: Exception | None, traceback: object | None) -> None:
self.xml_tree = None
self.root_nektar = None
[docs]
def get_solver(self) -> SolverType | None:
if equation_type := self.get_equation_type():
return EQUATION_TYPE_TO_SOLVER[equation_type]
return None
def get_equation_type(self) -> SolverType | None:
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
SOLVER_INFO: etree._Element = CONDITIONS.find("SOLVERINFO")
for I in SOLVER_INFO:
property:str = I.get("PROPERTY")
if property.upper() == "EQTYPE":
equation_type: str = I.get("VALUE")
return equation_type
return None
[docs]
def get_all_defined_boundary_regions(self) -> dict[int,CompositeDefinition]:
"""Gets all the defined boundary regions defined in session file. If
these boundary regions are not defined in boundary conditions, raise an error.
Otherwise, return a mapping from the session region reference id -> composite definition.
Raises:
SessionFileException: Regions are referenced, but no boundary condition is imposed
Returns:
dict[int,CompositeDefinition]: REGION REFERENCE ID -> COMPOSITE IDS
"""
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
boundary_regions = CONDITIONS.find("BOUNDARYREGIONS")
boundary_conditions = CONDITIONS.find("BOUNDARYCONDITIONS")
#check what boundary regions are defined
boundary_region_map: dict[int,CompositeDefinition] = {
int(b.get("ID")): CompositeDefinition(ensure_composite_format(b.text))
for b in boundary_regions.findall("B")
}
boundary_region_conditions_defined: set[int] = {int(region.get("REF")) for region in boundary_conditions.findall("REGION")}
#first check that all boundary regions references are defined in boundary conditions
if not set(boundary_region_map.keys()).issubset(boundary_region_conditions_defined):
raise SessionFileException(self.session_path,(f"<BOUNDARYREGION> contains references to {sorted(list(boundary_region_map.keys()))} "
f" but only the following REFS are defined in <BOUNDARYCONDITION>: "
f"{sorted(list(boundary_region_conditions_defined))}"))
return boundary_region_map
[docs]
def get_geometry_info(self) -> tuple[int,int,str]:
"""Return all info in the GEOMETRY tag of the session file
Returns:
tuple[int,int,str]: Max dimension of elements, max dimension of the space, hdf5 geometry file name to be used
"""
GEOMETRY: etree._Element = self.root_nektar.find("GEOMETRY")
elmt_dimension = int(GEOMETRY.get("DIM"))
space_dimension = int(GEOMETRY.get("SPACE"))
file = str(GEOMETRY.get("HDF5FILE")).strip()
return elmt_dimension,space_dimension,file
[docs]
def get_all_boundary_condition_files(self) -> list[str]:
"""Get all filenames referenced by the BOUNDARYCONDITIONS
Returns:
list[str]: List of file names
"""
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
BOUNDARYCONDITIONS: etree._Element = CONDITIONS.find("BOUNDARYCONDITIONS")
referenced_boundary_condition_files: set[str] = set() #remove repeated references to same file
all_regions = BOUNDARYCONDITIONS.findall("REGION")
for region in all_regions:
definition = region.findall("D") + region.findall("N") + region.findall("R")#only 3 tyeps of BCs, Dirichlet, Neumann, Robin
for d in definition:
if filename := d.get("FILE",None):
referenced_boundary_condition_files.add(str(filename).strip())
return list(referenced_boundary_condition_files)
[docs]
def get_all_function_files(self) -> list[str]:
"""Get all filenames referenced by all stated CONDITIONS/FUNCTION
Returns:
list[str]: List of file names
"""
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
FUNCTIONS: list[etree._Element] = CONDITIONS.findall("FUNCTION")
referenced_function_files: set[str] = set() #can have repeated references to same file
for function in FUNCTIONS:
for f in function:
if filename := f.get("FILE"):
referenced_function_files.add(str(filename).strip())
return list(referenced_function_files)
[docs]
def get_parameters(self) -> dict[str,int|float]:
"""Parse and evaluate all <P> param = value </P> found under NEKTAR/CONDITIONS/PARAMETERS
Returns:
dict[str,int|float]: Parameter name -> numeric value
"""
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
PARAMETERS: etree._Element = CONDITIONS.find("PARAMETERS")
raw_params: dict[str,str] = {}
#of form <P> param = value<P/>
for p in PARAMETERS.findall("P"):
lhs,rhs = parsing.get_both_sides_of_equals(p.text)
raw_params[lhs] = rhs
params: dict[str,int | float] = parsing.evaluate_parameters(raw_params)
return params
[docs]
def get_filters(self,filter_type:str=None,exclude_chkpoints:bool=False) -> list[NekFilter]:
"""Get list of available filters from file. Note that currently supports only AEROFORCE and CHECKPOINT and History.
Ignores all other filters. #TODO Implement all other filters.
Params:
filter_type(str): Optional, if only want a specific filter type #TODO could make this an enum
exclude_chkpoints(bool): If True, excludes checkpoint files from list of filters
Returns:
list[NekFilter]: List of available filters
"""
FILTERS: etree._Element = self.root_nektar.find("FILTERS")
#if no filters defined, return empty list
if FILTERS is None:
return []
filter_list: list[NekFilter] = []
for filter in FILTERS.findall("FILTER"):
filter_name = filter.get("TYPE").strip()
#if only want certain type of filter, and not satisfied, skip
if filter_type and filter_name != filter_type:
continue
#exclude checkpoints if bool flag is set
if exclude_chkpoints and filter_name == "Checkpoint":
continue
params: dict[str,str] = {param.get("NAME").strip(): param.text.strip() for param in filter.findall("PARAM")}
if nek_filter := NekFilterFactory.get_filter(filter_name,params,self.session_path):
filter_list.append(nek_filter)
return filter_list
[docs]
def get_equation_type(self) -> str:
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
SOLVER_INFO: etree._Element = CONDITIONS.find("SOLVERINFO")
for I in SOLVER_INFO:
if I.get("PROPERTY").strip().lower() == "eqtype":
return I.get("VALUE")
return None
[docs]
def get_refinements(self) -> dict[int,Refinements]:
REFINEMENTS: etree._Element = self.root_nektar.find("REFINEMENTS")
if REFINEMENTS is None:
return {}
ref_id_to_refinement_map: dict[int,Refinements] = {}
for R in REFINEMENTS.findall("R"):
ref_id: int = int(R.get("REF"))
type: str = R.get("TYPE").strip()
radius: float = float(R.get("RADIUS"))
coord1:tuple[str,str,str] = R.get("COORDINATE1").split(",")
coord1: np.ndarray = np.array([float(x) for x in coord1])
if coord2 := R.get("COORDINATE2"):
coord2:tuple[str,str,str] = coord2.split(",")
coord2: np.ndarray = np.array([float(x) for x in coord2])
nummodes = R.get("NUMMODES")
numpoints = R.get("NUMPOINTS")
refinement = Refinements(ref_id=ref_id,
type=type,
radius=radius,
coordinate1=coord1,
coordinate2=coord2,
nummodes=nummodes,
numpoints=numpoints)
ref_id_to_refinement_map[ref_id] = refinement
return ref_id_to_refinement_map
[docs]
def get_variable_list(self) -> list[str]:
CONDITIONS: etree._Element = self.root_nektar.find("CONDITIONS")
VARIABLES: etree._Element = CONDITIONS.find("VARIABLES")
var_list: list[str] = []
for V in VARIABLES.findall("V"):
var_list.append(V.text.strip())
return var_list
[docs]
def is_forcing_defined(self) -> bool:
"""placeholder function for deterining if forcing is defined
Returns:
bool: _description_
"""
FORCING: etree._Element = self.root_nektar.find("FORCING")
return True if FORCING is not None else False
[docs]
def get_expansions(self,geometry_file:str) -> dict[int,list[nd_exp.ExpansionDefinition]]:
"""Get expansion list from expansion definitions. Extracts composite info from geometry file,
and uses it to generate expansion informaiton.
Args:
geometry_file (str): Geometry nekg HDF5 file
Returns:
dict[int,list[nd_exp.ExpansionDefinition]]: Mapping of composite ID -> list of expansion definitions with this id
"""
homogeneous_property: str | None = self.get_homogeneous_property()
with NekGeometryFile(geometry_file) as f:
COMPOSITE_INFO_MAP: dict[int,CompositeDefinition] = f.get_composite_info()
EXPANSIONS: etree._Element = self.root_nektar.find("EXPANSIONS")
#list as can have case where custom type defines u,v and p with different expansion for same element
expansion_objs: dict[int,list[nd_exp.ExpansionDefinition]] = {}
for expansion in EXPANSIONS.findall("E"):
composite_info: CompositeDefinition = CompositeDefinition(expansion.get("COMPOSITE"))
for composite_id in composite_info.composite_ids:
#so previously got C[...] so id is that of composite
#want to get actual composite element definition Q[...] or T[...] etc
composite_element: CompositeDefinition = COMPOSITE_INFO_MAP[composite_id]
shape: Elements = composite_element.element
#TODO, set default here based on solver type
fields: tuple[str,...] = tuple(expansion.get("FIELDS",None).split(","))
if not expansion_objs.get(composite_id,None):
expansion_objs[composite_id] = []
expansion_def: nd_exp.ExpansionDefinition = None
if expansion.get("TYPE",None):
expansion_def = self._get_default_expansions(expansion,shape,fields)
else:
expansion_def = self._get_custom_expansions(expansion,shape,fields)
if refinement_ids := expansion.get("REFIDS"):
refinement_ids: tuple[str,...] = refinement_ids.split(",")
refinement_ids: list[int] = [int(ref_id) for ref_id in refinement_ids]
expansion_def.add_refinement_ids(refinement_ids)
expansion_objs[composite_id].append(expansion_def)
return expansion_objs
[docs]
def get_homogeneous_property(self) -> str | None:
"""Get HOMOGENEOUS property. If not specified, return None.
Returns:
str | None: _description_
"""
CONDITIONS = self.root_nektar.find("CONDITIONS")
SOLVER_INFO = CONDITIONS.find("SOLVERINFO")
for I in SOLVER_INFO:
if I.get("PROPERTY") == "HOMOGENEOUS":
return I.get("VALUE").strip()
return None
[docs]
def get_homogeneous_modes(self) -> tuple[int,int]:
"""Get number of homogenous modes. If none exist, returns unity.
Else, returns modes in y direction, and modes in z direction
Returns:
tuple[int,int]: _description_
"""
params = self.get_parameters()
#try and get modes
hom_modes_y = int(params.get("HomModesY",1))
hom_modes_z = int(params.get("HomModesZ",1))
return hom_modes_y,hom_modes_z
#allows for lazy loading of factories
_EXPANSION_INSTANCES = {}
#make assumption on what it looks like
BASIS_TYPE = {"Modified_A": BasisType.MODIFIED_A,
"Modified_B": BasisType.MODIFIED_B,
"Modified_C": BasisType.MODIFIED_C,
"Ortho_A": BasisType.ORTHO_A,
"Ortho_B": BasisType.ORTHO_B,
"Ortho_C": BasisType.ORTHO_C,
"Ortho_Pyr_C": BasisType.ORTHO_PYR_C,
"Modified_Pyr_C": BasisType.MODIFIED_PYR_C,
"Fourier": BasisType.FOURIER,
"GLL_Lagrange": BasisType.GLL_LAGRANGE,
"Gauss_Lagrange": BasisType.GAUSS_LAGRANGE,
"Legendre": BasisType.LEGENDRE,
"Chebyshev": BasisType.CHEBYSHEV,
"Monomial": BasisType.MONONMIAL,
"Fourier_Single_Mode": BasisType.FOURIER_SINGLE_MODE,
"Fourier_Half_Mode_Re": BasisType.FOURIER_HALF_MODE_RE,
"Fourier_Half_Mode_Im": BasisType.FOURIER_HALF_MODE_IM,
}
#also make assumptions
INTEGRATION_POINTS_TYPE = {
"GaussGaussLegendre": IntegrationPoint.GAUSS_GAUSS_LEGENDRE,
"GaussRadauMLegendre": IntegrationPoint.GAUSS_RADAU_M_LEGENDRE,
"GaussRadauPLegendre": IntegrationPoint.GAUSS_RADAU_P_LEGENDRE,
"GaussLobattoLegendre": IntegrationPoint.GAUSS_LOBATTO_LEGENDRE,
"GaussGaussChebyshev": IntegrationPoint.GAUSS_GAUSS_CHEBYSHEV,
"GaussRadauMChebyshev": IntegrationPoint.GAUSS_RADAU_M_CHEBYSHEV,
"GaussRadauPChebyshev": IntegrationPoint.GAUSS_RADAU_P_CHEBYSHEV,
"GaussLobattoChebyshev": IntegrationPoint.GAUSS_LOBATTO_CHEBYSHEV,
"GaussRadauMAlpha0Beta1": IntegrationPoint.GAUSS_RADAU_M_ALPHA0_BETA1,
"GaussRadauMAlpha0Beta2": IntegrationPoint.GAUSS_RADAU_M_ALPHA0_BETA2,
"GaussRadauMAlpha1Beta0": IntegrationPoint.GAUSS_RADAU_M_ALPHA1_BETA0,
"GaussRadauMAlpha2Beta0": IntegrationPoint.GAUSS_RADAU_M_ALPHA2_BETA0,
"GaussKronrodLegendre": IntegrationPoint.GAUSS_KRONROD_LEGENDRE,
"GaussRadauKronrodMLegendre": IntegrationPoint.GAUSS_RADAU_KRONROD_M_LEGENDRE,
"GaussRadauKronrodMAlpha1Beta0": IntegrationPoint.GAUSS_RADAU_KRONROD_M_ALPHA1_BETA0,
"GaussLobattoKronrodLegendre": IntegrationPoint.GAUSS_LOBATTO_KRONROD_LEGENDRE,
"PolyEvenlySpaced": IntegrationPoint.POLY_EVENLY_SPACED,
"FourierEvenlySpaced": IntegrationPoint.FOURIER_EVENLY_SPACED,
"FourierSingleModeSpaced": IntegrationPoint.FOURIER_SINGLE_MODE_SPACED,
"BoundaryLayerPoints": IntegrationPoint.BOUNDARY_LAYER_POINTS,
"BoundaryLayerPointsRev": IntegrationPoint.BOUNDARY_LAYER_POINTS_REV,
"NodalTriElec": IntegrationPoint.NODAL_TRI_ELEC,
"NodalTriFekete": IntegrationPoint.NODAL_TRI_FEKETE,
"NodalTriEvenlySpaced": IntegrationPoint.NODAL_TRI_EVENLY_SPACED,
"NodalTetEvenlySpaced": IntegrationPoint.NODAL_TET_EVENLY_SPACED,
"NodalTetElec": IntegrationPoint.NODAL_TET_ELEC,
"NodalPrismEvenlySpaced": IntegrationPoint.NODAL_PRISM_EVENLY_SPACED,
"NodalPrismElec": IntegrationPoint.NODAL_PRISM_ELEC,
"NodalTriSPI": IntegrationPoint.NODAL_TRI_SPI,
"NodalTetSPI": IntegrationPoint.NODAL_TET_SPI,
"NodalPrismSPI": IntegrationPoint.NODAL_PRISM_SPI,
"NodalQuadElec": IntegrationPoint.NODAL_QUAD_ELEC,
"NodalHexElec": IntegrationPoint.NODAL_HEX_ELEC,
"SizePointsType": IntegrationPoint.SIZE_POINTS_TYPE,
}
@staticmethod
def _get_expansion_factory(name:str) -> nd_exp.ExpansionFactory:
"""Method to lazily load in expansion factory on demand to reduce overhead.
Returns the correct expansion factory type. #TODO Check names
Args:
name (str): String denoting expansion type, as found in session files
Returns:
nd_exp.ExpansionFactory: Expansion Factory
"""
#making some assumptions here about XML input, can't find docs anywhere
factories = {
"MODIFIED": nd_exp.ModifiedExpansionFactory,
"MODIFIEDQUADPLUS1": nd_exp.ModifiedQuadPlus1ExpansionFactory,
"MODIFIEDQUADPLUS2": nd_exp.ModifiedQuadPlus2ExpansionFactory,
"ModifiedGLLRadau10": nd_exp.ModifiedGLLRadau10ExpansionFactory,
"GLL_LAGRANGE": nd_exp.GLLLagranageExpansionFactory,
"GAUSS_LAGRANGE": nd_exp.GaussLagrangeExpansionFactory,
"ORTHOGONAL": nd_exp.OrthogonalExpansionFactory,
"GLL_LAGRANGE_SEM": nd_exp.GLLLagrangeSEMExpansionFactory,
"FOURIER": nd_exp.FourierExpansionFactory,
"FOURIERSINGLEMODE": nd_exp.FourierSingleModeExpansionFactory,
"FOURIERHALFMODERE": nd_exp.FourierHalfModeReExpansionFactory,
"FOURIERHALFMODEIM": nd_exp.FourierHalfModeImExpansionFactory,
"CHEBYSHEV": nd_exp.ChebyshevExpansionFactory,
"FOURIERCHEBYSHEV": nd_exp.FourierChebyshevExpansionFactory,
"CHEBYSHEVFOURIER": nd_exp.ChebyshevFourierExpansionFactory,
"FOURIERMODIFIED": nd_exp.ModifiedFourierExpansionFactory,
}
if name in factories:
instance = factories[name]()
NekSessionFile._EXPANSION_INSTANCES[name] = instance
return instance
else:
return None
def _get_default_expansions(self,expansion: etree._Element,shape: Elements,fields: tuple[str,...]=None) -> nd_exp.ExpansionDefinition:
"""Gets default expansion if <E ... TYPE=""/>
Args:
expansion (etree._Element): Expansion element
shape (Elements): Element shape
fields (tuple[str,...], optional): Fields with this definition. Defaults to None.
Returns:
ExpansionDefinition: Expansion definition
"""
num_modes = int(expansion.get("NUMMODES"))
type: str = expansion.get("TYPE")
return self._get_expansion_factory(type).get_expansion(shape,num_modes,fields)
def _get_custom_expansions(self,expansion: etree._Element,shape: Elements,fields: tuple[str,...]=None) -> nd_exp.ExpansionDefinition:
"""If not using predefined element, and instead <E ... /> with custom definition, construct expansion
Args:
expansion (etree._Element): Expansion element
shape (Elements): Element shape
fields (tuple[str,...], optional): Fields with this definition. Defaults to None.
Returns:
ExpansionDefinition: Expansion definition
"""
numpoints_str = tuple(item for item in expansion.get("NUMPOINTS").split(","))
nummodes_str = tuple(expansion.get("NUMMODES", "").split(","))
basistype_str = tuple(expansion.get("BASISTYPE", "").split(","))
pointstype_str = tuple(expansion.get("POINTSTYPE", "").split(","))
#convert ot format for ExpansionDefinition
numpoints = tuple(int(val) for val in numpoints_str if val)
nummodes = tuple(int(val) for val in nummodes_str if val)
basistype = tuple(NekSessionFile.BASIS_TYPE[basis] for basis in basistype_str)
pointstype = tuple(NekSessionFile.INTEGRATION_POINTS_TYPE[point] for point in pointstype_str)
return nd_exp.ExpansionDefinition(shape,basistype,nummodes,pointstype,numpoints,fields)
[docs]
def is_movement(self) -> bool:
#checks existence of MOVEMENTS
return self.root_nektar.find("MOVEMENT") is not None
[docs]
class NekOutputFile:
def __init__(self,output_path: str):
self.output_path: str = output_path
self.output_file: h5py.File | None = None
def __enter__(self) -> NekOutputFile:
"""Opens the files and loads data"""
self.output_file = h5py.File(self.output_path,"r")
return self
def __exit__(self,exc_type: type=None, exec_value: Exception=None, traceback: object=None):
"""Ensures the file is closed on exit"""
if self.output_file:
self.output_file.close()
[docs]
def get_gitsha(self) -> str | None:
return extractor.HDF5Extractor.extract_attribute(self.output_file,"NEKTAR/Metadata/Provenance","GitSHA1")
[docs]
def get_total_coefficients(self) -> int:
return int(self.output_file["NEKTAR/DATA"].shape[0]) #should be 1d anyway
[docs]
def get_decomposition(self) -> list[int]:
return list(self.output_file["NEKTAR/DECOMPOSITION"][:])