import os
import h5py
import re
from .hdf5_definitions import HDF5GroupDefinition,HDF5DatasetDefinition
from .exceptions import OutputFileException,HDF5SchemaInconsistentException,HDF5SchemaExtraDefinitionException,ExperimentalException
from NekUpload.utils import parsing
from NekUpload.utils import gitlab_api as GitlabAPI
from NekUpload.validate.files import NekOutputFile,NekSessionFile
from NekUpload.validate.session import ValidateSession
import logging
[docs]
class ValidateOutput:
"""Class responsible for all output file validation checks
"""
def __init__(self, file_path: str):
"""Class initialiser
Args:
file_path (str): File path to output file
"""
self.file = file_path
self.file_name = os.path.basename(self.file)
[docs]
def check_schema(self) -> bool:
"""Check Output file conforms to HDF5 schema
Raises:
OutputSchemaHDF5Validator: _description_
Returns:
bool: Passed
"""
try:
with h5py.File(self.file, 'r') as f:
self.schema_checker = OutputSchemaHDF5Validator(f)
self.schema_checker.validate()
except OSError as e:
raise OutputFileException(self.file,f"Geometry file either does not exist or is not in HDF5 format {e}")
return True
[docs]
def check_checkpoint_schema(self,filter_checkpoint_list: list[str],chkpoint_file_list: list[str]) -> bool:
with NekOutputFile(self.file) as f:
decomposition_info = f.get_decomposition()
def chk_file_is_from_filter(chk_file: list[str]) -> bool:
"""Helper to check if a single file matches any filter pattern."""
for pattern in filter_checkpoint_list:
pattern = pattern.split(".")[0] #remove any extensions if they exist
if re.search(rf'{re.escape(pattern)}', chk_file):
return True
return False
for file in chkpoint_file_list:
if chk_file_is_from_filter(file):
#for now just skip over these
# they may have different structure to checkpoint files generated from parameters
continue
with NekOutputFile(file) as f:
decomposition = f.get_decomposition()
if decomposition != decomposition_info:
raise OutputFileException(self.file,f"Output file {self.file} and checkpoint file {file} have mismatched DECOMPOSITION definitions: \n"
f"OUTPUT FILE: {decomposition_info}. \n"
f"CHECKPOINT FILE: {decomposition}. ")
with h5py.File(file, 'r') as f:
schema_checker = OutputSchemaHDF5Validator(f)
schema_checker.validate()
[docs]
def check_checkpoint_from_filter_schema(self,session_file: str,geometry_file: str, solver, filter_checkpoint_list: list[str], chkpoint_file_list: list[str]) -> bool:
def chk_file_is_from_filter(chk_file: list[str]) -> bool:
"""Helper to check if a single file matches any filter pattern."""
for pattern in filter_checkpoint_list:
pattern = pattern.split(".")[0] #remove any extensions if they exist
if re.search(rf'{re.escape(pattern)}', chk_file):
return True
return False
for file in chkpoint_file_list:
if not chk_file_is_from_filter(file):
#for now just skip over these
# they may have different structure to checkpoint files generated from parmaters
continue
with NekSessionFile(session_file) as f:
var_num: int = len(f.get_variable_list())
checker = ValidateSession(session_file)
try:
checker.check_consistent_output_shape(geometry_file,file,solver,var_num)
except Exception as e:
raise ExperimentalException("Checking checkpoint filter files",
"This feature is in ValidateSession.check_consistent_output_shape")
[docs]
def check_chkpoint_filter_shape_only_vars(self,session_file:str,geometry_file:str,solver,chkpoint_file: str):
"""Assumes number of fields in chkpoint file are from VARIABLES list
Args:
session_file (str): _description_
geometry_file (str): _description_
solver (_type_): _description_
chkpoint_file (str): _description_
Raises:
ExperimentalException: _description_
"""
with NekSessionFile(session_file) as f:
var_num: int = len(f.get_variable_list())
checker = ValidateSession(session_file)
try:
checker.check_consistent_output_shape(geometry_file,chkpoint_file,solver,var_num)
except Exception:
raise ExperimentalException("Failed")
[docs]
def check_commit_is_public(self) -> bool:
"""Check that the GitSHA in the file is public
Returns:
bool: _description_
"""
with NekOutputFile(self.file) as f:
gitsha = f.get_gitsha()
if gitsha is None:
return False
try:
GitlabAPI.get_single_commit("https://gitlab.nektar.info","2",gitsha)
return True
except GitlabAPI.MissingGitlabCommit:
return False
[docs]
def get_gitsha(self) -> str:
with NekOutputFile(self.file) as f:
gitsha = f.get_gitsha()
return gitsha
[docs]
class OutputSchemaHDF5Validator:
"""Class for handling output HDF5 schema validation
"""
NO_DIM_CONSTRAINTS = -1 #helper
BASE_GROUPS = (HDF5GroupDefinition("NEKTAR",attributes=["FORMAT_VERSION"]),
#this is bare minimum, depending on solver, can have more, also sessionFile
#previously had TIME, but some runs do not output time
HDF5GroupDefinition("NEKTAR/Metadata",attributes=["ChkFileNum"]),
HDF5GroupDefinition("NEKTAR/Metadata/Provenance",attributes=["GitBranch","GitSHA1","Hostname","NektarVersion","Timestamp"]))
EXPECTED_DATASETS = (HDF5DatasetDefinition("NEKTAR/DECOMPOSITION",(NO_DIM_CONSTRAINTS,)),)
def __init__(self,f: h5py.File):
"""Class initialiser
Args:
f (h5py.File): Opened HDF5 file
"""
self.file: h5py.File = f
[docs]
def validate(self):
"""Check whether specified file conforms to the HDF5 output schema
"""
self._check_mandatory_groups(OutputSchemaHDF5Validator.BASE_GROUPS)
self._check_mandatory_datasets(OutputSchemaHDF5Validator.EXPECTED_DATASETS)
#acquire all other groups and datasets that should be present based on DECOMPOSITION definition
self._assert_decomposition()
expansion_groups: tuple[HDF5GroupDefinition] = tuple(self._get_expansion_groups())
optional_datasets: tuple[HDF5DatasetDefinition] = tuple(self._get_optional_datasets())
self._check_mandatory_groups(expansion_groups)
self._check_mandatory_datasets(optional_datasets)
#check no extraneous groups or datasets
valid_groups: tuple[HDF5GroupDefinition] = OutputSchemaHDF5Validator.BASE_GROUPS + expansion_groups
valid_datasets: tuple[HDF5DatasetDefinition] = OutputSchemaHDF5Validator.EXPECTED_DATASETS + optional_datasets
valid_groups_str = [group.get_path() for group in valid_groups]
valid_datasets_str = [dataset.get_path() for dataset in valid_datasets]
self._check_only_valid_groups_exist(valid_groups_str)
self._check_only_valid_datasets_exist(valid_datasets_str)
#check some more DECOMPOSITION data???
#assert true, for testing purposes
return True
def _check_mandatory_groups(self,groups: tuple[HDF5GroupDefinition]):
"""Check whether mandatory HDF5 Groups are present in the file
Args:
groups (tuple[HDF5GroupDefinition]): list of mandatory HDF5 Group definitions
"""
for group in groups:
group.validate(self.file)
def _check_mandatory_datasets(self,datasets: tuple[HDF5DatasetDefinition]):
"""CHeck whether mandatory HDF5 Datasets are present in the file
Args:
datasets (tuple[HDF5DatasetDefinition]): list of mandatory HDF5 Dataset definitions
"""
for dataset in datasets:
dataset.validate(self.file)
def _assert_decomposition(self):
"""Assert decomposition has correct shape
Raises:
HDF5SchemaInconsistentException: _description_
"""
#decomposition should come in group of 7
if self.file["NEKTAR/DECOMPOSITION"].shape[0] % 7 != 0:
raise HDF5SchemaInconsistentException(self.file,"HDF5 Schema Error: Decomposition shape should be multiple of 7")
def _get_expansion_groups(self) -> list[HDF5GroupDefinition]:
"""Get the expansion groups that should be defined, based on what is in DECOMPOSITION
Raises:
HDF5SchemaInconsistentException: _description_
Returns:
list[HDF5GroupDefinition]: _description_
"""
decomposition_dataset: h5py.Dataset = self.file["NEKTAR/DECOMPOSITION"]
#last of the 7 is a hash pointing to location in HDF5 file containing expansion data
num_expansion_groups = decomposition_dataset.shape[0] // 7
expected_groups: list[HDF5GroupDefinition] = []
for i in range(6,7*num_expansion_groups,7):
hash = decomposition_dataset[i]
expected_groups.append(HDF5GroupDefinition(f"NEKTAR/{hash}",attributes=["BASIS","FIELDS","NUMMODESPERDIR","SHAPE"]))
return expected_groups
def _get_optional_datasets(self) -> list[HDF5DatasetDefinition]:
"""Get all optional datasets defined by DECOMPOSITION
Returns:
list[HDF5DatasetDefinition]: _description_
"""
optional_datasets: list[HDF5DatasetDefinition] = []
optionals = {"NEKTAR/ELEMENTIDS": 0,
"NEKTAR/DATA": 1,
"NEKTAR/POLYORDERS": 2,
"NEKTAR/HOMOGENEOUSYIDS": 3,
"NEKTAR/HOMOGENEOUSZIDS": 4,
"NEKTAR/HOMOGENEOUSSIDS": 5}
for name,idx in optionals.items():
if dataset := self._get_dataset_defined_in_decomposition(name,idx):
optional_datasets.append(dataset)
return optional_datasets
def _get_dataset_defined_in_decomposition(self,
dataset_name: str,
decomposition_entry_id: int) -> HDF5DatasetDefinition | None:
"""DECOMPOSITION contains sequence of 7 entries, some of which will lead to definition of
extra datasets within the file. When the following are non-zero, a dataset is expected, and
are constructed with the same rule:
Note starting from 0:
2 -> number of modes when variable polynomial is defined
3 -> number of y planes for homogeneous simulations
4 -> number of z planes for homogeneous simulations
5 -> number of strips for homogeneous simulations
Args:
dataset_name (str): Name of the dataset to be defined
decomposition_entry_id (int): Decomposition entry id for desired dataset
Returns:
Optional[HDF5DatasetDefinition]: Dataset schema definition if one is required
"""
decomposition_dataset: h5py.Dataset = self.file["NEKTAR/DECOMPOSITION"]
size = decomposition_dataset.shape[0]
num_data_points: int = 0
for i in range(decomposition_entry_id,size,7):
num_data_points += decomposition_dataset[i]
return HDF5DatasetDefinition(dataset_name,(num_data_points,)) if num_data_points > 0 else None
def _get_polyorder_dataset(self) -> HDF5DatasetDefinition | None:
"""Get the polyorder dataset definition if it should exist, based on DECOMPOSITION entries, every third entry
Returns:
Optional[HDF5DatasetDefinition]: If polyorder dataset is defined, definition is returned, else None
"""
decomposition_dataset: h5py.Dataset = self.file["NEKTAR/DECOMPOSITION"]
size = decomposition_dataset.shape[0]
#3rd of the 7 grouping in decomposition
#is a number of modes that are polyorder???
num_polyorder_modes: int = 0
for i in range(2,size,7):
num_polyorder_modes += decomposition_dataset[i]
return HDF5DatasetDefinition("NEKTAR/POLYORDERS",(num_polyorder_modes,)) if num_polyorder_modes > 0 else None
def _check_only_valid_groups_exist(self,valid_groups: list[str]):
"""Check that only valid groups exist.
Args:
valid_groups (str): list of paths for valid HDF5 Groups
"""
#plus one to search for any extra invalid groups
#"" is a valid group too, and is provided in function call
valid_groups.append("")
max_groups = len(valid_groups) + 1
groups = parsing.get_hdf5_groups_with_depth_limit(self.file,3,max_groups=max_groups)
for group in groups:
if group not in valid_groups:
raise HDF5SchemaExtraDefinitionException(self.file,f"Encountered unkown group: {group}")
def _check_only_valid_datasets_exist(self,valid_datasets: list[str]):
"""Check that only valid datasets exist.
Args:
valid_datasets (str): list of paths for valid HDF5 Datasets
"""
max_datasets = len(valid_datasets) + 1
datasets = parsing.get_hdf5_datasets_with_depth_limit(self.file,3,max_datasets=max_datasets)
for dataset in datasets:
if dataset not in valid_datasets:
raise HDF5SchemaExtraDefinitionException(self.file,f"Encountered unkown dataset: {dataset}")