From 037923f0bd2833955fae51deb2822e632b2d739b Mon Sep 17 00:00:00 2001 From: Rodrigo Neto Date: Mon, 4 Mar 2024 18:32:14 -0300 Subject: [PATCH 1/2] Add History Matching metadata and reading functions to aggregator ASIM-5600 --- src/alfasim_sdk/result_reader/aggregator.py | 224 +++++++++++++++--- .../result_reader/aggregator_constants.py | 4 +- tests/conftest.py | 107 +++++++++ tests/results/test_aggregator.py | 117 ++++++++- 4 files changed, 417 insertions(+), 35 deletions(-) diff --git a/src/alfasim_sdk/result_reader/aggregator.py b/src/alfasim_sdk/result_reader/aggregator.py index ed99a2038..85e3520fa 100644 --- a/src/alfasim_sdk/result_reader/aggregator.py +++ b/src/alfasim_sdk/result_reader/aggregator.py @@ -10,6 +10,7 @@ from typing import Dict from typing import Iterator from typing import List +from typing import Literal from typing import Optional from typing import Tuple @@ -22,6 +23,13 @@ from alfasim_sdk.result_reader.aggregator_constants import ( GLOBAL_SENSITIVITY_ANALYSIS_GROUP_NAME, ) +from alfasim_sdk.result_reader.aggregator_constants import ( + HISTORY_MATCHING_DETERMINISTIC_DSET_NAME, +) +from alfasim_sdk.result_reader.aggregator_constants import HISTORY_MATCHING_GROUP_NAME +from alfasim_sdk.result_reader.aggregator_constants import ( + HISTORY_MATCHING_PROBABILISTIC_DSET_NAME, +) from alfasim_sdk.result_reader.aggregator_constants import META_GROUP_NAME from alfasim_sdk.result_reader.aggregator_constants import PROFILES_GROUP_NAME from alfasim_sdk.result_reader.aggregator_constants import ( @@ -69,6 +77,10 @@ All the metadata will represents the same kind of output (profiles/trends). """ +HistoryMatchingResultKeyType = str +"""\ +A HM result key is simply the id of the parametric var associated with that particular result. +""" TimeSetInfoItem = namedtuple("TimeSetInfoItem", "global_start size uuid") TimeSetInfo = Dict[int, TimeSetInfoItem] @@ -192,8 +204,8 @@ def map_data( for key, data in gsa_metadata.items() } - with open_global_sensitivity_analysis_result_file( - result_directory=result_directory + with open_result_file( + result_directory, result_filename="uq_result" ) as result_file: if not result_file: return cls.empty(result_directory=result_directory) @@ -206,6 +218,106 @@ def map_data( ) +@attr.s(slots=True, hash=False) +class HistoryMatchingMetadata: + """ + Holder for the History Matching results metadata. + + :ivar hm_items: + Map of the data id and its associated metadata. + :ivar objective_functions: + Map of observed curve id to a dict of Quantity of Interest data, populated with keys + 'trend_id' and 'property_id'. This represents the setup for this HM analysis. + :ivar result_directory: + The directory in which the result is saved. + """ + + @attr.s(slots=True, hash=False) + class HMItem: + """ + Metadata associated with each item of the HM results. + + :ivar parametric_var_id: + The id of the associated parametric var. + :ivar parametric_var_name: + The name of the associated parametric var. + :ivar min_value: + Lower limit of the specified range for the parametric var. + :ivar max_value: + Upper limit of the specified range for the parametric var. + :ivar data_index: + The index of the data in the result datasets. 
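A minimal sketch of the dict each item is parsed from by ``from_dict`` below; the
values are illustrative only, mirroring the fake metadata written by the fixtures
added in tests/conftest.py:

    item = HistoryMatchingMetadata.HMItem.from_dict(
        {
            "parametric_var_id": "parametric_var_1",
            "parametric_var_name": "mg",
            "min_value": 0.0,
            "max_value": 1.0,
            "data_index": 0,
        }
    )
    assert item.data_index == 0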
+ """ + + parametric_var_id: str = attr.ib(validator=attr.validators.instance_of(str)) + parametric_var_name: str = attr.ib(validator=attr.validators.instance_of(str)) + min_value: float = attr.ib(validator=attr.validators.instance_of(float)) + max_value: float = attr.ib(validator=attr.validators.instance_of(float)) + data_index: int = attr.ib(validator=attr.validators.instance_of(int)) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> Self: + """ + Parse a dict into a HM item. + + :raises: KeyError if some expected key is not present in the given data dict. + """ + return cls( + parametric_var_name=data["parametric_var_name"], + parametric_var_id=data["parametric_var_id"], + min_value=data["min_value"], + max_value=data["max_value"], + data_index=data["data_index"], + ) + + hm_items: Dict[str, HMItem] = attr.ib(validator=attr.validators.instance_of(Dict)) + objective_functions: Dict[str, Dict[str, str]] = attr.ib( + validator=attr.validators.instance_of(Dict) + ) + result_directory: Path = attr.ib(validator=attr.validators.instance_of(Path)) + + @classmethod + def empty(cls, result_directory: Path) -> Self: + return cls( + hm_items={}, objective_functions={}, result_directory=result_directory + ) + + @classmethod + def from_result_directory(cls, result_directory: Path) -> Self: + """ + Read History Matching results metadata from result directory/file. + + If result file is not ready or doesn't exist, return an empty metadata. + """ + + def map_data(hm_metadata: Dict) -> Dict[str, HistoryMatchingMetadata.HMItem]: + return { + key: HistoryMatchingMetadata.HMItem.from_dict(data) + for key, data in hm_metadata.items() + } + + with open_result_file(result_directory) as result_file: + if not result_file: + return cls.empty(result_directory=result_directory) + + loaded_metadata = json.loads( + result_file[META_GROUP_NAME].attrs[HISTORY_MATCHING_GROUP_NAME] + ) + + if len(loaded_metadata) == 0: + return cls.empty(result_directory=result_directory) + + objective_functions = list(loaded_metadata.values())[0][ + "objective_functions" + ] + + return cls( + hm_items=map_data(loaded_metadata), + objective_functions=objective_functions, + result_directory=result_directory, + ) + + @attr.s(slots=True, hash=False) class ALFASimResultMetadata: """ @@ -1556,28 +1668,6 @@ def merge_metadata(a_metadata, b_metadata, *, source, update=(), min_=(), max_=( ) -@contextmanager -def open_global_sensitivity_analysis_result_file( - result_directory: Path, -) -> Iterator[Optional[h5py.File]]: - """ - Open a global sensitivity analysis result file. - :param result_directory: - The path to result directory. - """ - filename = result_directory / "uq_result" - ignored_file = result_directory / "uq_result.creating" - - if not filename.is_file(): - yield None - # Avoid to read result files with incomplete metadata. - elif not ignored_file.is_file(): - with _open_result_file(filename) as file: - yield file - else: - yield None - - def read_global_sensitivity_analysis_meta_data( result_directory: Path, ) -> Optional[GlobalSensitivityAnalysisMetadata]: @@ -1596,9 +1686,7 @@ def read_global_sensitivity_analysis_time_set( """ Get the time set for sensitivity analysis results. 
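A usage sketch (the result directory below is hypothetical): ``None`` is returned
while the result file does not exist yet or is still being created, otherwise the
time set is returned as a numpy array:

    from pathlib import Path

    time_set = read_global_sensitivity_analysis_time_set(
        result_directory=Path("project.data/uq_dir")
    )
    if time_set is not None:
        print(time_set[:5])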
""" - with open_global_sensitivity_analysis_result_file( - result_directory=result_directory - ) as result_file: + with open_result_file(result_directory, result_filename="uq_result") as result_file: if not result_file: return return result_file[GLOBAL_SENSITIVITY_ANALYSIS_GROUP_NAME]["time_set"][:] @@ -1615,9 +1703,87 @@ def read_global_sensitivity_coefficients( if not metadata.gsa_items: return None meta = metadata.gsa_items[coefficients_key] - with open_global_sensitivity_analysis_result_file( - metadata.result_directory + with open_result_file( + metadata.result_directory, result_filename="uq_result" ) as result_file: gsa_group = result_file[GLOBAL_SENSITIVITY_ANALYSIS_GROUP_NAME] coefficients_dset = gsa_group["global_sensitivity_analysis"] return coefficients_dset[meta.qoi_index, meta.qoi_data_index] + + +def read_history_matching_metadata(result_directory: Path) -> HistoryMatchingMetadata: + """ + :param result_directory: + The directory to lookup for the History Matching result file. + """ + return HistoryMatchingMetadata.from_result_directory(result_directory) + + +def read_history_matching_result( + metadata: HistoryMatchingMetadata, + hm_type: Literal["deterministic", "probabilistic"], + hm_result_key: HistoryMatchingResultKeyType | None = None, +) -> Dict[HistoryMatchingResultKeyType, np.ndarray | float]: + """ + :param metadata: + History Matching result metadata. + :param hm_type: + The type of HM analysis. Can be 'deterministic' or 'probabilistic'. + :param hm_result_key: + The id of the parametric vars to collect the result. Defaults to None, in which case the + result of all keys found in the metadata will be returned. + :return: + A dict mapping the HM result key (the parametric var id) to its corresponding result, which + could be an array with N values (N being the sampling size) for the probabilistic or a single + float for the deterministic. + """ + if hm_type not in ("deterministic", "probabilistic"): + raise ValueError(f"history matching of type `{hm_type}` not supported") + + if hm_type == "deterministic": + dataset_key = HISTORY_MATCHING_DETERMINISTIC_DSET_NAME + else: + assert hm_type == "probabilistic" + dataset_key = HISTORY_MATCHING_PROBABILISTIC_DSET_NAME + + with open_result_file(metadata.result_directory) as result_file: + if not result_file: + return {} + + result = result_file[HISTORY_MATCHING_GROUP_NAME][dataset_key] + + result_map = {} + if hm_result_key is None: + for key, meta in metadata.hm_items.items(): + result_map[key] = result[meta.data_index] + else: + meta = metadata.hm_items.get(hm_result_key) + if meta is not None: + result_map[hm_result_key] = result[meta.data_index] + + return result_map + + +@contextmanager +def open_result_file( + result_directory: Path, result_filename: str = "result" +) -> Iterator[Optional[h5py.File]]: + """ + :param result_directory: + The directory to lookup for the result file. + :param result_filename: + The filename. + :return: + The result HDF file, or None if it doesn't exist or is still being created. + """ + filepath = result_directory / result_filename + ignored_file = result_directory / f"{result_filename}.creating" + + if not filepath.is_file(): + yield None + # Avoid to read result files with incomplete metadata. 
+ elif not ignored_file.is_file(): + with _open_result_file(filepath) as file: + yield file + else: + yield None diff --git a/src/alfasim_sdk/result_reader/aggregator_constants.py b/src/alfasim_sdk/result_reader/aggregator_constants.py index e303e6dcd..cafa509c4 100644 --- a/src/alfasim_sdk/result_reader/aggregator_constants.py +++ b/src/alfasim_sdk/result_reader/aggregator_constants.py @@ -4,8 +4,8 @@ GLOBAL_SENSITIVITY_ANALYSIS_GROUP_NAME = "global_sensitivity_analysis" HISTORY_MATCHING_GROUP_NAME = "history_matching" -HISTORY_MATCHING_DETERMINISTIC_DSET_NAME = "deterministic_values" -HISTORY_MATCHING_PROBABILISTIC_DSET_NAME = "probabilistic_distributions" +HISTORY_MATCHING_DETERMINISTIC_DSET_NAME = "history_matching_deterministic" +HISTORY_MATCHING_PROBABILISTIC_DSET_NAME = "history_matching_probabilistic" TIME_SET_DSET_NAME = "time_set" diff --git a/tests/conftest.py b/tests/conftest.py index baa0f0dde..5c86a0752 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,8 +4,11 @@ import textwrap from pathlib import Path from typing import List +from typing import Sequence +from typing import Tuple import h5py +import numpy as np import pytest from _pytest.fixtures import FixtureRequest from _pytest.monkeypatch import MonkeyPatch @@ -13,6 +16,13 @@ from alfasim_sdk.result_reader.aggregator_constants import ( GLOBAL_SENSITIVITY_ANALYSIS_GROUP_NAME, ) +from alfasim_sdk.result_reader.aggregator_constants import ( + HISTORY_MATCHING_DETERMINISTIC_DSET_NAME, +) +from alfasim_sdk.result_reader.aggregator_constants import HISTORY_MATCHING_GROUP_NAME +from alfasim_sdk.result_reader.aggregator_constants import ( + HISTORY_MATCHING_PROBABILISTIC_DSET_NAME, +) from alfasim_sdk.result_reader.aggregator_constants import META_GROUP_NAME from alfasim_sdk.result_reader.aggregator_constants import TIME_SET_DSET_NAME from alfasim_sdk.result_reader.reader import Results @@ -259,3 +269,100 @@ def global_sa_results_dir(datadir: Path) -> Path: gsa_data_set[:] = global_sensitivity_analysis file.close() return result_dir + + +def _create_and_populate_hm_result_file( + result_dir: Path, + result: np.ndarray, + dataset_key: str, + limits: Sequence[Tuple[float, float]], +) -> None: + result_dir.mkdir(parents=True, exist_ok=True) + result_filepath = result_dir / "result" + + file = h5py.File(result_filepath, "x", libver="latest", locking=False) + meta_group = file.create_group(META_GROUP_NAME, track_order=True) + data_group = file.create_group(HISTORY_MATCHING_GROUP_NAME, track_order=True) + + dataset = data_group.create_dataset( + dataset_key, + shape=result.shape, + dtype=np.float64, + maxshape=tuple(None for _ in result.shape), + ) + + objective_functions = { + "observed_curve_1": {"trend_id": "trend_1", "property_id": "holdup"}, + "observed_curve_2": {"trend_id": "trend_2", "property_id": "pressure"}, + } + + fake_meta = { + "parametric_var_1": { + "parametric_var_id": "parametric_var_1", + "parametric_var_name": "mg", + "min_value": limits[0][0], + "max_value": limits[0][1], + "objective_functions": objective_functions, + "data_index": 0, + }, + "parametric_var_2": { + "parametric_var_id": "parametric_var_2", + "parametric_var_name": "mo", + "min_value": limits[1][0], + "max_value": limits[1][1], + "objective_functions": objective_functions, + "data_index": 1, + }, + } + + meta_group.attrs[HISTORY_MATCHING_GROUP_NAME] = json.dumps(fake_meta) + dataset[:] = result + + file.swmr_mode = True + file.close() + + +@pytest.fixture() +def hm_probabilistic_results_dir(datadir: Path) -> Path: + """ + Crete a 
History Matching result folder with a populated result file for each type of analysis + (probabilistic and deterministic). + """ + import numpy as np + + result_dir = datadir / "main-HM-probabilistic" + probabilistic_result = np.array( + [[0.1, 0.22, 1.0, 0.8, 0.55], [3.0, 6.0, 5.1, 4.7, 6.3]] + ) + limits = [(0.0, 1.0), (2.5, 7.5)] + + _create_and_populate_hm_result_file( + result_dir=result_dir, + result=probabilistic_result, + dataset_key=HISTORY_MATCHING_PROBABILISTIC_DSET_NAME, + limits=limits, + ) + + return result_dir + + +@pytest.fixture() +def hm_deterministic_results_dir(datadir: Path) -> Path: + """ + Crete a History Matching result folder with a populated result file for each type of analysis + (probabilistic and deterministic). + """ + import numpy as np + + result_dir = datadir / "main-HM-deterministic" + deterministic_result = np.array([0.1, 3.2]) + limits = [(0.0, 1.0), (2.5, 7.5)] + + _create_and_populate_hm_result_file( + result_dir=result_dir, + result=deterministic_result, + dataset_key=HISTORY_MATCHING_DETERMINISTIC_DSET_NAME, + limits=limits, + ) + + return result_dir diff --git a/tests/results/test_aggregator.py b/tests/results/test_aggregator.py index a21dc5495..e686edaeb 100644 --- a/tests/results/test_aggregator.py +++ b/tests/results/test_aggregator.py @@ -2,6 +2,7 @@ import re from pathlib import Path from typing import List +from typing import Literal import attr import numpy @@ -10,6 +11,7 @@ from pytest_regressions.num_regression import NumericRegressionFixture from alfasim_sdk.result_reader.aggregator import concatenate_metadata +from alfasim_sdk.result_reader.aggregator import HistoryMatchingMetadata from alfasim_sdk.result_reader.aggregator import open_result_files from alfasim_sdk.result_reader.aggregator import ( read_global_sensitivity_analysis_meta_data, @@ -18,6 +20,8 @@ read_global_sensitivity_analysis_time_set, ) from alfasim_sdk.result_reader.aggregator import read_global_sensitivity_coefficients +from alfasim_sdk.result_reader.aggregator import read_history_matching_metadata +from alfasim_sdk.result_reader.aggregator import read_history_matching_result from alfasim_sdk.result_reader.aggregator import read_metadata from alfasim_sdk.result_reader.aggregator import read_profiles_local_statistics from alfasim_sdk.result_reader.aggregator import read_time_sets @@ -301,7 +305,7 @@ def test_read_time_sets( num_regression.check({str(k): v for k, v in time_sets.items()}) -def test_read_empty_uq_metadata(datadir: Path) -> None: +def test_read_empty_gsa_metadata(datadir: Path) -> None: fake_uq_dir = datadir / "fake_uq_dir" uq_metadata = read_global_sensitivity_analysis_meta_data( @@ -317,7 +321,7 @@ def test_read_empty_uq_metadata(datadir: Path) -> None: assert uq_metadata.gsa_items == {} -def test_read_uq_metadata(global_sa_results_dir: Path) -> None: +def test_read_gsa_metadata(global_sa_results_dir: Path) -> None: uq_metadata = read_global_sensitivity_analysis_meta_data(global_sa_results_dir) global_gsa_meta_data = uq_metadata.gsa_items meta_var_1 = global_gsa_meta_data["temperature::parametric_var_1@trend_id_1"] @@ -333,7 +337,7 @@ def test_read_uq_metadata(global_sa_results_dir: Path) -> None: assert meta_var_2.parametric_var_name == "B" -def test_read_uq_timeset(global_sa_results_dir: Path) -> None: +def test_read_gsa_timeset(global_sa_results_dir: Path) -> None: time_set = read_global_sensitivity_analysis_time_set( result_directory=global_sa_results_dir ) @@ -354,7 +358,7 @@ def test_read_uq_global_sensitivity_analysis(global_sa_results_dir: Path) -> 
Non assert numpy.all(data == (12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7)) -def test_read_incomplete_uq_metadata(global_sa_results_dir: Path) -> None: +def test_read_incomplete_gsa_metadata(global_sa_results_dir: Path) -> None: """ When a .creating result file exists in the results folder, the metadata is incomplete, so they will be not read. @@ -370,3 +374,108 @@ def test_read_incomplete_uq_metadata(global_sa_results_dir: Path) -> None: metadata=gsa_meta_data, ) assert coefficients is None + + +def test_read_history_matching_result_metadata( + hm_probabilistic_results_dir: Path, +) -> None: + """ + Check reading the HM metadata from a probabilistic result file, which should be enough to + evaluate the deterministic metadata too as they are handled exactly the same. + """ + hm_results_dir = hm_probabilistic_results_dir + + # Existent and completed result file, metadata should be filled. + metadata = read_history_matching_metadata(hm_results_dir) + + assert metadata.result_directory == hm_results_dir + items_meta = metadata.hm_items + + expected_meta1 = HistoryMatchingMetadata.HMItem( + parametric_var_id="parametric_var_1", + parametric_var_name="mg", + min_value=0.0, + max_value=1.0, + data_index=0, + ) + + expected_meta2 = HistoryMatchingMetadata.HMItem( + parametric_var_id="parametric_var_2", + parametric_var_name="mo", + min_value=2.5, + max_value=7.5, + data_index=1, + ) + + assert items_meta["parametric_var_1"] == expected_meta1 + assert items_meta["parametric_var_2"] == expected_meta2 + + # Result file still being created, metadata should be empty. + creating_file = hm_results_dir / "result.creating" + creating_file.touch() + + metadata = read_history_matching_metadata(hm_results_dir) + assert metadata.result_directory == hm_results_dir + assert metadata.hm_items == {} + + creating_file.unlink() + + # Non-existent result directory, metadata should be empty. + unexistent_result_dir = Path("foo/bar") + metadata = read_history_matching_metadata(unexistent_result_dir) + assert metadata.result_directory == unexistent_result_dir + assert metadata.hm_items == {} + + +@pytest.mark.parametrize("hm_type", ("probabilistic", "deterministic")) +def test_read_history_matching_result_data( + hm_probabilistic_results_dir: Path, + hm_deterministic_results_dir: Path, + hm_type: Literal["probabilistic", "deterministic"], +) -> None: + """ + Check reading the result of both HM type analysis. Both results are available simultaneously by + the means of the fixtures, but only one is used at a time. + """ + # Setup. + if hm_type == "probabilistic": + expected_results = ([0.1, 0.22, 1.0, 0.8, 0.55], [3.0, 6.0, 5.1, 4.7, 6.3]) + results_dir = hm_probabilistic_results_dir + else: + assert hm_type == "deterministic" + expected_results = (0.1, 3.2) + results_dir = hm_deterministic_results_dir + + metadata = read_history_matching_metadata(results_dir) + + # Read the result of a single parametric var entry. + result = read_history_matching_result( + metadata, hm_type=hm_type, hm_result_key="parametric_var_1" + ) + assert len(result) == 1 + assert result["parametric_var_1"] == pytest.approx(expected_results[0]) + + # Read the result of all entries. + result = read_history_matching_result(metadata, hm_type=hm_type) + assert len(result) == 2 + assert result["parametric_var_1"] == pytest.approx(expected_results[0]) + assert result["parametric_var_2"] == pytest.approx(expected_results[1]) + + # Unexistent result key, result should be empty. 
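    # (``read_history_matching_result`` looks the key up with ``dict.get``, so an
    # unknown parametric var id yields an empty mapping instead of raising.)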
+ result = read_history_matching_result( + metadata, hm_type=hm_type, hm_result_key="foo" + ) + assert result == {} + + # Result still being created, result should be empty. + creating_file = results_dir / "result.creating" + creating_file.touch() + + result = read_history_matching_result(metadata, hm_type=hm_type) + assert result == {} + + creating_file.unlink() + + # Receiving an invalid History Matching type should raise. + with pytest.raises(ValueError, match="type `foobar` not supported"): + read_history_matching_result(metadata, "foobar") # type: ignore From 697764a71a9c98b308510f0ee737cb7f71a85bbb Mon Sep 17 00:00:00 2001 From: Rodrigo Neto Date: Tue, 5 Mar 2024 15:36:24 -0300 Subject: [PATCH 2/2] Fix unsupported type annotation --- src/alfasim_sdk/result_reader/aggregator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/alfasim_sdk/result_reader/aggregator.py b/src/alfasim_sdk/result_reader/aggregator.py index 85e3520fa..cede9e27c 100644 --- a/src/alfasim_sdk/result_reader/aggregator.py +++ b/src/alfasim_sdk/result_reader/aggregator.py @@ -1722,7 +1722,7 @@ def read_history_matching_metadata(result_directory: Path) -> HistoryMatchingMet def read_history_matching_result( metadata: HistoryMatchingMetadata, hm_type: Literal["deterministic", "probabilistic"], - hm_result_key: HistoryMatchingResultKeyType | None = None, + hm_result_key: Optional[HistoryMatchingResultKeyType] = None, ) -> Dict[HistoryMatchingResultKeyType, np.ndarray | float]: """ :param metadata:
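Taken together, the new reading API is used in two steps: read the History Matching
metadata from the result directory, then use it to fetch the values computed for each
parametric var. A short sketch, assuming a hypothetical result directory laid out like
the test fixtures above:

    from pathlib import Path

    from alfasim_sdk.result_reader.aggregator import read_history_matching_metadata
    from alfasim_sdk.result_reader.aggregator import read_history_matching_result

    metadata = read_history_matching_metadata(Path("main-HM-probabilistic"))
    results = read_history_matching_result(metadata, hm_type="probabilistic")
    for var_id, values in results.items():
        item = metadata.hm_items[var_id]
        # Probabilistic results hold one value per sample; a deterministic run
        # would hold a single float per parametric var instead.
        print(item.parametric_var_name, item.min_value, item.max_value, values)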