Skip to content

Commit

Permalink
ENH: Add FLUID column and simplify column names in inplace_volumes (#916
Browse files Browse the repository at this point in the history
)
  • Loading branch information
tnatt authored Dec 13, 2024
1 parent 61d17d4 commit 32cb406
Show file tree
Hide file tree
Showing 4 changed files with 633 additions and 64 deletions.
124 changes: 104 additions & 20 deletions src/fmu/dataio/export/rms/inplace_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import warnings
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Final

import numpy as np
import pandas as pd

import fmu.dataio as dio
Expand All @@ -23,8 +25,26 @@

_logger: Final = null_logger(__name__)

_FLUID_COLUMN: Final = "FLUID"
_TABLE_INDEX_COLUMNS: Final = [_FLUID_COLUMN, "ZONE", "REGION", "FACIES", "LICENSE"]
_VOLUMETRIC_COLUMNS: Final = [
"BULK",
"PORV",
"HCPV",
"STOIIP",
"GIIP",
"ASSOCIATEDGAS",
"ASSOCIATEDOIL",
]


class _Fluid(str, Enum):
"""Fluid types"""

OIL = "OIL"
GAS = "GAS"
WATER = "WATER"

_TABLE_INDEX_COLUMNS: Final = ("ZONE", "REGION", "FACIES", "LICENCE")

# rename columns to FMU standard
_RENAME_COLUMNS_FROM_RMS: Final = {
Expand Down Expand Up @@ -58,23 +78,18 @@ class _ExportVolumetricsRMS:
volume_job_name: str

def __post_init__(self) -> None:
_logger.debug("Process data, estiblish state prior to export.")
_logger.debug("Process data, establish state prior to export.")
self._config = load_global_config()
self._volume_job = self._get_rms_volume_job_settings()
self._volume_table_name = self._read_volume_table_name_from_job()
self._dataframe = self._voltable_as_dataframe()
self._dataframe = self._get_table_with_volumes()
_logger.debug("Process data... DONE")

@property
def _classification(self) -> Classification:
"""Get default classification."""
return Classification.restricted

@property
def _table_index(self) -> list[str]:
"""Get index columns present in the dataframe."""
return [col for col in _TABLE_INDEX_COLUMNS if col in self._dataframe]

def _get_rms_volume_job_settings(self) -> dict:
"""Get information out from the RMS job API."""
_logger.debug("RMS VOLJOB settings...")
Expand All @@ -101,21 +116,90 @@ def _read_volume_table_name_from_job(self) -> str:
_logger.debug("The volume table name is %s", volume_table_name)
return volume_table_name

def _voltable_as_dataframe(self) -> pd.DataFrame:
"""Convert table to pandas dataframe"""
_logger.debug("Read values and convert to pandas dataframe...")
def _get_table_with_volumes(self) -> pd.DataFrame:
"""
Get a volumetric table from RMS converted into a pandas
dataframe on standard format for the inplace_volumes product.
"""
table = self._get_table_from_rms()
table = self._convert_table_from_rms_to_legacy_format(table)
return self._convert_table_from_legacy_to_standard_format(table)

dict_values = (
self.project.volumetric_tables[self._volume_table_name]
.get_data_table()
.to_dict()
def _get_table_from_rms(self) -> pd.DataFrame:
"""Fetch volumetric table from RMS and convert to pandas dataframe"""
_logger.debug("Read values and convert to pandas dataframe...")
return pd.DataFrame.from_dict(
(
self.project.volumetric_tables[self._volume_table_name]
.get_data_table()
.to_dict()
)
)
return (
pd.DataFrame.from_dict(dict_values)
.rename(columns=_RENAME_COLUMNS_FROM_RMS)
.drop("REAL", axis=1, errors="ignore")

@staticmethod
def _convert_table_from_rms_to_legacy_format(table: pd.DataFrame) -> pd.DataFrame:
"""Rename columns to legacy naming standard and drop REAL column if present."""
_logger.debug("Converting dataframe from RMS to legacy format...")
return table.rename(columns=_RENAME_COLUMNS_FROM_RMS).drop(
columns="REAL", errors="ignore"
)

@staticmethod
def _add_missing_columns_to_table(table: pd.DataFrame) -> pd.DataFrame:
"""Add columns with nan values if not present in table."""
_logger.debug("Add table index columns to table if missing...")
for col in _TABLE_INDEX_COLUMNS + _VOLUMETRIC_COLUMNS:
if col not in table:
table[col] = np.nan
return table

@staticmethod
def _set_table_column_order(table: pd.DataFrame) -> pd.DataFrame:
"""Set the column order in the table."""
_logger.debug("Settting the table column order...")
return table[_TABLE_INDEX_COLUMNS + _VOLUMETRIC_COLUMNS]

@staticmethod
def _transform_and_add_fluid_column_to_table(
table: pd.DataFrame, table_index: list[str]
) -> pd.DataFrame:
"""
Transformation of a dataframe containing fluid-specific column data into a
standardized format with unified column names, e.g. 'BULK_OIL' and 'PORV_OIL'
are renamed into 'BULK' and 'PORV' columns. To separate the data an additional
FLUID column is added that indicates the type of fluid the row represents.
"""
table_index = [col for col in _TABLE_INDEX_COLUMNS if col in table]

tables = []
for fluid in [_Fluid.GAS.value, _Fluid.OIL.value]:
fluid_columns = [col for col in table.columns if col.endswith(f"_{fluid}")]
if fluid_columns:
fluid_table = table[table_index + fluid_columns].copy()

# drop fluid suffix from columns to get standard names
fluid_table.columns = fluid_table.columns.str.replace(f"_{fluid}", "")

# add the fluid as column entry instead
fluid_table[_FLUID_COLUMN] = fluid.lower()

tables.append(fluid_table)

return pd.concat(tables, ignore_index=True) if tables else pd.DataFrame()

def _convert_table_from_legacy_to_standard_format(
self, table: pd.DataFrame
) -> pd.DataFrame:
"""
Convert the table from legacy to standard format for the 'inplace_volumes'
product. The standard format has a fluid column, and all table_index and
volumetric columns are present with a standard order in the table.
"""
table_index = [col for col in _TABLE_INDEX_COLUMNS if col in table]
table = self._transform_and_add_fluid_column_to_table(table, table_index)
table = self._add_missing_columns_to_table(table)
return self._set_table_column_order(table)

def _export_volume_table(self) -> ExportResult:
"""Do the actual volume table export using dataio setup."""

Expand All @@ -129,7 +213,7 @@ def _export_volume_table(self) -> ExportResult:
classification=self._classification,
name=self.grid_name,
rep_include=False,
table_index=self._table_index,
table_index=_TABLE_INDEX_COLUMNS,
)
absolute_export_path = edata.export(self._dataframe)
_logger.debug("Volume result to: %s", absolute_export_path)
Expand Down
Loading

0 comments on commit 32cb406

Please sign in to comment.