from lxml import etree
import os
import numpy as np
from NekUpload.validate.exceptions import XMLSchemaException,MissingInputFileException,MissingOutputFileException,SessionFileException
from NekUpload.validate.files import NekGeometryFile,NekSessionFile,NekOutputFile
from NekUpload.NekData.composite import CompositeDefinition
from NekUpload.NekData.expansions import ExpansionDefinition
from NekUpload.NekData.data_type import Elements,SolverType
from NekUpload.NekData.solver import SolverInfo
from NekUpload.NekData.filters import CheckpointFilter,NekFilterFactory,NekFilter
from NekUpload.utils.notification import warn_with_logging
[docs]
class ValidateSession:
"""Class responsible for validation session files
"""
def __init__(self,file_path: str):
"""Class initialiser
Args:
file_path (str): File path to the session XML file
"""
self.file_path = file_path
self.xml_tree = self._load_DOM_tree(self.file_path)
def _load_DOM_tree(self, xml_file: str) -> etree._Element:
"""Loads the DOM tree of the XML file
Args:
xml_file (str): Path to XML file
Returns:
etree._Element: Internal representation of DOM tree
"""
with open(xml_file, "rb") as xml:
xml_tree = etree.XML(xml.read())
return xml_tree
[docs]
def is_valid_xml(self,xml_file: str,schema_file_path: str) -> bool:
"""Checks whether XML file conforms to a schema
Args:
xml_file (str): XML file path
schema_file_path (str): XSD schema file path
Raises:
XMLSchemaException: _description_
Returns:
bool: Passed
"""
xsd_file = schema_file_path
with open(xsd_file,"rb") as xsd:
schema_root = etree.XML(xsd.read())
schema = etree.XMLSchema(schema_root)
with open(xml_file,"rb") as xml:
xml_tree = etree.XML(xml.read())
if schema.validate(xml_tree):
return True
else:
raise XMLSchemaException(xml_file,schema.error_log)
[docs]
def check_schema(self) -> bool:
"""Check file conforms to XML session schema
Returns:
bool: Passed
"""
xsd_schema = os.path.join(os.path.dirname(__file__), 'schemas/nektar.xsd') #ensure path always correct
return self.is_valid_xml(self.file_path, xsd_schema)
[docs]
def check_boundary_condition_schema(self,boundary_condition_files: list[str]) -> bool:
"""Identifies boundary condition files, and checks against schema
Args:
boundary_condition_files (list[str]): INput files
Returns:
bool: Valid
"""
filenames: list[str] = [os.path.basename(file) for file in boundary_condition_files]
with NekSessionFile(self.file_path) as f:
bc_filenames: list[str] = f.get_all_boundary_condition_files()
xsd_schema = os.path.join(os.path.dirname(__file__),"schemas/nektarbc.xsd")
for idx,file in enumerate(filenames):
if file in bc_filenames:
file_path = boundary_condition_files[idx]
self.is_valid_xml(file_path,xsd_schema)
return True
[docs]
def check_boundary_definition(self,geometry_file: str) -> bool:
"""Check that all boundaries defined in session file are present in the geometry file
Args:
geometry_file (str): Geometry nekg file
Returns:
bool: Valid if all boundaries in geometry files are defined in nekg file
"""
with NekSessionFile(self.file_path) as f:
geometry_dim,*_ = f.get_geometry_info()
#thses composite definitions contain C[...] i.e. ids for composites
session_defined_composite: dict[int,CompositeDefinition] = f.get_all_defined_boundary_regions()
with NekGeometryFile(geometry_file) as f:
geometry_boundary_info: dict[int,CompositeDefinition] = None
if geometry_dim == 2:
#these composite definitions contain E[...] i.e. ids for edges
geometry_boundary_info = f.get_composite_edge_info()
elif geometry_dim == 3:
#these composite definitions contain F[...] i.e. ids for faces
geometry_boundary_info = f.get_composite_face_info()
#extract all composite ids from geometry file that should be defined
boundary_composite_ids_to_be_defined_in_geometry: set[int] = set(geometry_boundary_info.keys())
#extract all composites that have been defined by session file
boundary_composite_ids_defined_by_session: set[int] = set()
for composite in session_defined_composite.values():
boundary_composite_ids_defined_by_session.update(composite.composite_ids)
#make sure all boundary composites in geometry are defined by session file
if not boundary_composite_ids_defined_by_session <= boundary_composite_ids_to_be_defined_in_geometry:
raise SessionFileException(self.file_path,(f"Session file has defined dimension as {geometry_dim} "
f"and expects {Elements.EDGE if geometry_dim == 2 else Elements.FACE}."
f"Geometry {geometry_file} expects COMPOSITE_ID : COMPOSITE_DEFINITION of {geometry_boundary_info}, "
f"while session {self.file_path} defined the following COMPOSITE_ID: {boundary_composite_ids_defined_by_session}"))
return True
[docs]
def check_geometry_file_reference(self,geometry_file: str) -> bool:
"""Check whether the geometry file provided is the one referencedd by the session file
Args:
geometry_file (str): Geometry file name
Raises:
SessionFileException: _description_
Returns:
bool: Valid
"""
with NekSessionFile(self.file_path) as f:
_,_,referenced_geometry_filename = f.get_geometry_info()
geometry_filename: str = os.path.basename(geometry_file)
if geometry_filename != referenced_geometry_filename:
raise SessionFileException(self.file_path,(f"Session file makes reference to <GEOMETRY HDF5FILE={referenced_geometry_filename}. "
f"However, you are uploading the geometry file: {geometry_filename}."))
return True
[docs]
def check_boundary_conditions_reference(self,boundary_condition_files: list[str]) -> bool:
"""Checks whetehr all files referenced in BOUNDARYCONDITION in session file are
present in the input list
Args:
boundary_condition_files (list[str]): Input file list
Raises:
SessionFileException: _description_
Returns:
bool: Valid
"""
boundary_condition_file_set: set[str] = {os.path.basename(file) for file in boundary_condition_files}
with NekSessionFile(self.file_path) as f:
mandatory_bc_files: list[str] = f.get_all_boundary_condition_files()
#make sure all mandatory boundary condition files are present
for file in mandatory_bc_files:
if file not in boundary_condition_file_set:
raise SessionFileException(self.file_path,(f"Session file, under BOUNDARYCONDITIONS, makes reference to "
f" the following boundary condition files {mandatory_bc_files}. "
f"Did not find the file: {file}. "
f"You have provided the following files: {boundary_condition_files}"))
return True
[docs]
def check_function_reference(self,function_file_list: list[str]) -> bool:
"""Checks whetehr all files referenced in FUNCTION in session file are
present in the input list
Args:
function_file_list (list[str]): Input file list
Raises:
SessionFileException: _description_
Returns:
bool: Valid
"""
function_file_set: set[str] = {os.path.basename(file) for file in function_file_list}
with NekSessionFile(self.file_path) as f:
mandatory_function_files: list[str] = f.get_all_function_files()
#make sure all mandatory boundary condition files are present
for file in mandatory_function_files:
if file not in function_file_set:
raise SessionFileException(self.file_path,(f"Session file, under FUNCTION/F, makes reference to "
f" the following function files {mandatory_function_files}. "
f"Did not find the file: {file}. "
f"You have provided the following files: {function_file_list}"))
return True
[docs]
def check_filter_files_reference(self,file_list: list[str]) -> bool:
#nektar is messy, some append ext others don't, so handle both cases
#exclude .chk files as though they can come from filters, they are counted as a separate entity
file_set: set[str] = {os.path.basename(file) for file in file_list}
file_set_no_extension: set[str] = {os.path.splitext(os.path.basename(file))[0] for file in file_list}
with NekSessionFile(self.file_path) as f:
filter_list: list[NekFilter] = f.get_filters(exclude_chkpoints=True)
expected_file_names: list[str] = [os.path.basename(f.output_file) for f in filter_list]
#make sure all mandatory filter files are present
for file in expected_file_names:
if file not in file_set and file not in file_set_no_extension:
raise SessionFileException(self.file_path,(f"Session file, under FILTERS, makes reference to "
f" the following filters {expected_file_names}. "
f"Did not find the output file for filter: {file}. "
f"You have provided the following files: {file_set}"))
return True
[docs]
def get_checkpoint_filter_filenames(self) -> list[str]:
with NekSessionFile(self.file_path) as f:
filter_list: list[NekFilter] = f.get_filters("Checkpoint")
filename_list = [filter.output_file for filter in filter_list]
return filename_list
[docs]
def check_checkpoint_files(self,checkpoint_files:list[str]) -> bool:
#if checkpoints are defined in both parameters and filters, both are printed out
chk_file_num_from_parameters,chk_timesteps_from_parameters = self._compute_chk_file_requirements_from_filter()
chk_file_num_from_filter,chk_timesteps_from_filter = self._compute_chk_file_requirements_from_parameters()
chk_file_num = chk_file_num_from_parameters + chk_file_num_from_filter
#generate a list from the checkpoint_files, as may not all be checkpoint files
#should have .chk
chk_file_list: list[str] = [file for file in checkpoint_files if file.endswith(".chk")]
if len(chk_file_list) != chk_file_num:
raise SessionFileException(self.file_path,(f"Based on the session file provided, there should be {chk_file_num} checkpoint files (.chk)."
f"There are actually {len(chk_file_list)} files: {chk_file_list}"))
return True
def _compute_chk_file_requirements_from_filter(self) -> tuple[int,tuple[float,...]]:
"""Given PARAMETERS and FILTERS in session file, try and identify how many checkpoint files are generated.
Attempts to find Checkpoint FILTER in FILTERS. If does not exist, returns empty information.
Returns:
tuple[int,tuple[float,...]]: Number of checkpoint files and a tuple contianing the expected time step at each point
"""
with NekSessionFile(self.file_path) as f:
chk_filter: list[CheckpointFilter] = f.get_filters("Checkpoint")
params: dict[str,int | float] = f.get_parameters()
num_steps: int = int(params.get("NumSteps", 0))
time_step: float | None = params.get("TimeStep")
final_time: float | None = params.get("FinTime")
#TODO unknown implementation, will need to doublecheck
if len(chk_filter) > 2:
raise SessionFileException(self.file_path,(
f"While looking for checkpoint filters under FILTERS, found {len(chk_filter)} CHECKPOINT definitions."
f"Undefined behaviour."
))
#for no checkpoint filters found, return empty
if not chk_filter:
return (0,())
output_frequency:int = chk_filter[0].output_frequency
output_start_time: int = chk_filter[0].output_start_time
if output_frequency == 0:
return (0,())
num_steps,time_step,final_time = self._resolve_timestep_parameters(num_steps,time_step,final_time)
#make end inclusive, use deltat/2 to prevent accidental additions in other point
time_steps_at_chk_number: tuple[float] = tuple(np.arange(output_start_time,final_time + time_step / 2,output_frequency*time_step))
chk_file_num: int = len(time_steps_at_chk_number)
return (chk_file_num,time_steps_at_chk_number)
def _compute_chk_file_requirements_from_parameters(self) -> tuple[int,tuple[float,...]]:
"""Given PARAMETERS in session file, try and identify how many checkpoint files are generated.
If IO_CheckSteps is 0, function returns safely. If IO_CheckSteps is not defined, function returns safely.
If IO_CheckSteps is defined, but two of NumSteps,TimeStep,FinTime are not defined, then function will exit with exception.
Otherwise, the number of checkpoint files to be generated is returned.
Returns:
tuple[int,tuple[float,...]]: Number of checkpoint files and a tuple contianing the expected time step at each point
"""
with NekSessionFile(self.file_path) as f:
params: dict[str,int | float] = f.get_parameters()
checkpoint_steps: int = int(params.get("IO_CheckSteps", 0))
num_steps: int = int(params.get("NumSteps", 0))
time_step: float | None = params.get("TimeStep")
final_time: float | None = params.get("FinTime")
#if checkpoint_steps = 0 or is None, no information on checkpoint files from PARAMETERS
if checkpoint_steps == 0:
return (0,())
warn_with_logging((f"You have specified checkpoint files via IO_CHECKSTEPS in PARAMETERS. "
f"This may be deprecated in future. Recommendation is to set checkpoint file "
f"output under FILTERS using <FILTER NAME='Checkpoint'>. See user documentation for more details."
))
num_steps,time_step,final_time = self._resolve_timestep_parameters(num_steps,time_step,final_time)
INITIAL_STATE_CHK_FILE: int = 1
num_chk_files = num_steps // checkpoint_steps + INITIAL_STATE_CHK_FILE
time_steps_at_chk_number: tuple[float,...] = tuple([chk_num * checkpoint_steps * time_step for chk_num in range(0,num_chk_files)])
return (num_chk_files,time_steps_at_chk_number)
def _resolve_timestep_parameters(self,num_steps:int | None,time_step:float | None,final_time:float | None) -> tuple[int,float,float]:
"""These parameters may or may not be explicitly defined. They therefore need to be resolved.
If insufficient information to caclulate these values, throw an error.
Args:
num_steps (int): _description_
time_step (float): _description_
final_time (float): _description_
Raises:
SessionFileException: _description_
SessionFileException: _description_
Returns:
tuple[int,float,float]: num_steps,time_step,final_time
"""
#if total number of simulation steps not defined explicitly,
#then time_stpe and final_time must be defined
if num_steps is None or num_steps == 0:
if time_step is None or final_time is None:
raise SessionFileException(self.file_path,(
"While attempting to identify the number of checkpoint files generated based on provided parameters, "
f"identified the following: NumSteps=None, TimeStep={time_step}, "
f"FinTime: {final_time}. Insufficient information to identify NumSteps from TimeStep and FinTime."
))
else:
num_steps = int(round(final_time / time_step))
#if time step not defined explicitly, then num_steps and final_time should be defined
if time_step is None:
if num_steps is None or num_steps == 0 or final_time is None:
raise SessionFileException(self.file_path,(
"While attempting to identify the number of checkpoint files generated based on provided parameters, "
f"identified the following: NumSteps={num_steps}, TimeStep={time_step}, "
f"FinTime: {final_time}. Insufficient information to identify TimeStep from NumSteps and FinTime."
))
else:
time_step = final_time / num_steps
#if final time not defined explicitly, then num_steps and time_steps should be defined
#if got to this point in code, already guaranteed that num_steps and time_steps are defined
if not final_time:
final_time = time_step * num_steps
#finally, check that numbers are constistent
#particularly important for when all three parameters are explicitly defined
if abs(time_step * num_steps - final_time) > 1e-6:
raise SessionFileException(self.file_path,(
"While attempting to identify the number of checkpoint files generated based on provided parameters, "
f"identified the following: NumSteps={num_steps}, TimeStep={time_step}, "
f"FinTime: {final_time}. Inconsistency found between provided information. "
f"NumSteps * TimeStep = FinTime => {num_steps * time_step} != {final_time}"
))
return num_steps,time_step,final_time
[docs]
def check_expansion_definition(self,geometry_file:str) -> bool:
"""Check whether expansion definition reference the composite objects
defined in the geometry file
Args:
geometry_file (str): Path to geometry nekg file
Returns:
True: Valid
"""
with NekSessionFile(self.file_path) as f:
#this function indirectly checks that all expansions defined in session
# are defined correctly in geometry
f.get_expansions(geometry_file)
return True
#TODO Refactor, quite ugly atm
[docs]
def check_consistent_output_shape(self,
geometry_file:str,
output_file:str,
solver:SolverType,
field_count:int=0) -> tuple[bool,list[str]]:
with NekSessionFile(self.file_path) as f:
composite_id_to_expansion: dict[int,list[ExpansionDefinition]] = f.get_expansions(geometry_file)
eq_type: str = f.get_equation_type()
dim,*_ = f.get_geometry_info()
is_movement_exist: bool = f.is_movement()
homogeneous_property: str = f.get_homogeneous_property()
hom_y_modes,hom_z_modes = f.get_homogeneous_modes()
homogeneous_coeffs_multiplier: int = 1
#different flavours of the same thing, as specified in nektar/FieldUtils/Fields
if (homogeneous_property == "1D" or homogeneous_property == "HOMOGENEOUS1D"
or homogeneous_property == "Homogeneous1D" or homogeneous_property == "Homo1D"):
homogeneous_coeffs_multiplier *= hom_z_modes
dim = 3#recast as quasi-3D
elif (homogeneous_property == "2D" or homogeneous_property == "HOMOGENEOUS2D"
or homogeneous_property == "Homogeneous2D" or homogeneous_property == "Homo2D"):
homogeneous_coeffs_multiplier *= (hom_z_modes * hom_y_modes)
dim = 3#recast as quasi-3D
IS_REFINEMENT_DEFINED = bool(f.get_refinements())
IS_FORCING_DEFINED = f.is_forcing_defined()
with NekGeometryFile(geometry_file) as f:
composite_list: dict[int,CompositeDefinition] = f.get_composite_info()
solver_info = SolverInfo(solver,dim,eq_type)
try:
num_fields_default = solver_info.get_var_num() if field_count == 0 else field_count
except KeyError as e:
raise SessionFileException(self.file_path,e)
except NotImplementedError as e:
warn_with_logging(f"Unimplemented feature, validation will not fail but admins will be notified: {e}")
num_unkown_coefficients: int = 0
for composite_id,expansion_list in composite_id_to_expansion.items():
composite: CompositeDefinition = composite_list[composite_id]
#first find total number of fields and max element number for same composite
#take larger coefficients, as padding is used
num_fields_from_file = sum([len(expansion.fields) for expansion in expansion_list])
max_num_coeffs = max([expansion.get_num_coefficients() for expansion in expansion_list])
num_fields = 0
if solver == SolverType.ADR_SOLVER:
num_fields = num_fields_from_file
else:
num_fields = num_fields_default
if is_movement_exist:
num_fields += len(["gridVx","gridVy","gridVz"])
num_unkown_coefficients += max_num_coeffs * composite.count * num_fields * homogeneous_coeffs_multiplier
with NekOutputFile(output_file) as f:
total_coeffs:int = f.get_total_coefficients() #this should be 1D
try:
if total_coeffs != num_unkown_coefficients:
raise SessionFileException(self.file_path,(f"Based on session file, "
f"there should be {num_unkown_coefficients} unknown coefficients."
f"Provided output file {output_file} contains {total_coeffs} calculated coefficients"))
except SessionFileException as e:
#if one of the unimplemented cases, skip, else raise
if IS_FORCING_DEFINED:
warn_with_logging(f"Due to presence of FORCING, which currently has unkown rules, "
f" failed validation will not cause hard failure. Administrators will "
f"check output file.")
return True,["Due to presence of FORCING, unable to ascertain whether output shape is consistent with input check."]
if IS_REFINEMENT_DEFINED:
warn_with_logging((f"You have refinements in this file. Skipping output shape check as this "
"has not yet been implemented. "))
return True,["Due to presence of REFINEMENTS, unable to ascertain whether output shape is consistent with input check."]
raise
return True,[]
if __name__ == "__main__":
f = "tests/datasets/IncNavierStokes/PlungingAirfoil_3DH1D_with_NACA0012_Re400"
validator = ValidateSession(f"{f}.xml")
validator.check_consistent_output_shape(f"{f}.nekg",f"{f}.fld",SolverType.INCOMPRESSIBLE_NAVIER_STOKES_SOLVER)