From 77c3e450bf86826021a9de1e399c3fa8417a703d Mon Sep 17 00:00:00 2001 From: Joakim Hove Date: Mon, 8 Feb 2021 14:06:19 +0100 Subject: [PATCH] Apply row_scaling in the smoother update --- lib/enkf/enkf_main.cpp | 71 ++--- lib/enkf/local_dataset.cpp | 7 +- lib/include/ert/enkf/local_dataset.hpp | 2 +- python/res/enkf/data/__init__.py | 3 +- python/res/enkf/data/enkf_node.py | 1 + python/res/enkf/row_scaling.py | 52 +++ test-data/local/row_scaling/CASE.EGRID | Bin 0 -> 11304 bytes test-data/local/row_scaling/config | 11 + test-data/local/row_scaling/observations.txt | 6 + test-data/local/row_scaling/timemap.txt | 2 + .../local/row_scaling/workflows/ROW_SCALING1 | 1 + .../row_scaling/workflows/ROW_SCALING_JOB1 | 2 + .../workflows/ROW_SCALING_WORKFLOW1 | 1 + .../workflows/scripts/row_scaling_job1.py | 49 +++ tests/res/enkf/data/test_summary.py | 2 +- tests/res/enkf/test_row_scaling.py | 78 ++++- tests/res/enkf/test_row_scaling_case.py | 300 ++++++++++++++++++ 17 files changed, 540 insertions(+), 48 deletions(-) create mode 100644 test-data/local/row_scaling/CASE.EGRID create mode 100644 test-data/local/row_scaling/config create mode 100644 test-data/local/row_scaling/observations.txt create mode 100644 test-data/local/row_scaling/timemap.txt create mode 100644 test-data/local/row_scaling/workflows/ROW_SCALING1 create mode 100644 test-data/local/row_scaling/workflows/ROW_SCALING_JOB1 create mode 100644 test-data/local/row_scaling/workflows/ROW_SCALING_WORKFLOW1 create mode 100644 test-data/local/row_scaling/workflows/scripts/row_scaling_job1.py create mode 100644 tests/res/enkf/test_row_scaling_case.py diff --git a/lib/enkf/enkf_main.cpp b/lib/enkf/enkf_main.cpp index 58f73740ce..d1e8c26889 100644 --- a/lib/enkf/enkf_main.cpp +++ b/lib/enkf/enkf_main.cpp @@ -758,7 +758,6 @@ static module_info_type * enkf_main_module_info_alloc( const local_ministep_type stringlist_free( update_keys ); } - { /* Init obs blocks in module_info */ module_obs_block_vector_type * module_obs_block_vector = module_info_get_obs_block_vector ( module_info ); int current_row = 0; @@ -965,7 +964,6 @@ static void enkf_main_update__(enkf_main_type * enkf_main, const int_vector_type res_log_ferror("No active observations/parameters for MINISTEP: %s.", local_ministep_get_name(ministep)); } - enkf_main_inflate(enkf_main, source_fs, target_fs, current_step, use_count); hash_free(use_count); } @@ -1096,54 +1094,50 @@ static void enkf_main_analysis_update( enkf_main_type * enkf_main , if (localA == NULL) analysis_module_initX( module , X , NULL , S , R , dObs , E , D, enkf_main->shared_rng); - while (!hash_iter_is_complete( dataset_iter )) { const char * dataset_name = hash_iter_get_next_key( dataset_iter ); const local_dataset_type * dataset = local_ministep_get_dataset( ministep , dataset_name ); if (local_dataset_get_size( dataset )) { local_obsdata_type * local_obsdata = local_ministep_get_obsdata( ministep ); + const auto& unscaled_keys = local_dataset_unscaled_keys(dataset); + if (unscaled_keys.size()) { - enkf_main_serialize_dataset(enkf_main_get_ensemble_config(enkf_main), dataset , step2 , use_count , tp , serialize_info); - module_info_type * module_info = enkf_main_module_info_alloc(ministep, obs_data, dataset, local_obsdata, serialize_info->active_size.data() , serialize_info->row_offset.data()); + /* + The update for one local_dataset instance consists of two main chunks: - /* - The update for one local_dataset instance consists of two main chunks: + 1. 
The first chunk updates all the parameters which don't have row
+                    scaling attached. These parameters are serialized together to the A
+                    matrix and all the parameters are updated in one go.
 
-                 1. The first chunk updates all the parameters which don't have row
-                 scaling attached. These parameters are serialized together to the A
-                 matrix and all the parameters are updated in one go.
+                 2. The second chunk is a loop over all the parameters which have row
+                    scaling attached. These parameters are updated one at a time.
+              */
 
-                 2. The second chunk is loop over all the parameters which have row
-                 scaling attached. These parameters are updated one at a time.
-              */
+              // Part 1: Parameters which do not have row scaling attached.
+              enkf_main_serialize_dataset(enkf_main_get_ensemble_config(enkf_main), dataset , step2 , use_count , tp , serialize_info);
+              module_info_type * module_info = enkf_main_module_info_alloc(ministep, obs_data, dataset, local_obsdata, serialize_info->active_size.data() , serialize_info->row_offset.data());
 
-
-             // Part 1: Parameters which do not have row scaling attached.
-             enkf_main_serialize_dataset(enkf_main_get_ensemble_config(enkf_main), dataset , step2 , use_count , tp , serialize_info);
-
-             if (analysis_module_check_option( module , ANALYSIS_UPDATE_A)){
-               if (analysis_module_check_option( module , ANALYSIS_ITERABLE)){
-                 analysis_module_updateA( module , localA , S , R , dObs , E , D , module_info, enkf_main->shared_rng);
-               }
-               else
-                 analysis_module_updateA( module , localA , S , R , dObs , E , D , module_info, enkf_main->shared_rng);
-             } else {
-               if (analysis_module_check_option( module , ANALYSIS_USE_A)){
+              if (analysis_module_check_option( module , ANALYSIS_UPDATE_A)){
+                if (analysis_module_check_option( module , ANALYSIS_ITERABLE)){
+                  analysis_module_updateA( module , localA , S , R , dObs , E , D , module_info, enkf_main->shared_rng);
+                }
+                else
+                  analysis_module_updateA( module , localA , S , R , dObs , E , D , module_info, enkf_main->shared_rng);
+              } else {
+                if (analysis_module_check_option( module , ANALYSIS_USE_A)){
                   analysis_module_initX( module , X , localA , S , R , dObs , E , D, enkf_main->shared_rng);
+                }
+                matrix_inplace_matmul_mt2( A , X , tp );
               }
-             matrix_inplace_matmul_mt2( A , X , tp );
+              enkf_main_deserialize_dataset( enkf_main_get_ensemble_config( enkf_main ) , dataset , serialize_info , tp);
+              enkf_main_module_info_free( module_info );
             }
-            enkf_main_deserialize_dataset( enkf_main_get_ensemble_config( enkf_main ) , dataset , serialize_info , tp);
-
-
            // Part 2: Parameters with row scaling attached - to support distance based localization.
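            //
            // The row scaled update differs from the plain A = A*X update by
            // blending per row: with a scaling factor alpha[i] in [0,1]
            // attached to row i of A, row_scaling_multiply() computes, roughly,
            //
            //    A[i,:] = alpha[i]*(A*X)[i,:] + (1 - alpha[i])*A[i,:]
            //
            // so alpha[i] == 1 reproduces the full smoother update for that row
            // and alpha[i] == 0 leaves it untouched - this is the behavior the
            // assertions in tests/res/enkf/test_row_scaling_case.py check for
            // the 1.0 / 0.0 / 0.50 scaling values.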
{ const auto& scaled_keys = local_dataset_scaled_keys(dataset); if (scaled_keys.size()) { - throw std::logic_error("Sorry - the use of row scaling for distance based localization is not yet implemented"); if (analysis_module_check_option( module , ANALYSIS_UPDATE_A)) throw std::logic_error("Sorry - row scaling for distance based localization can not be combined with analysis modules which update the A matrix"); @@ -1151,7 +1145,7 @@ static void enkf_main_analysis_update( enkf_main_type * enkf_main , for (int ikw=0; ikw < scaled_keys.size(); ikw++) { const auto& key = scaled_keys[ikw]; const active_list_type * active_list = local_dataset_get_node_active_list(dataset, key.c_str()); - for (int iens = serialize_info->iens1; iens < serialize_info->iens2; iens++) { + for (int iens = 0; iens < bool_vector_size(ens_mask); iens++) { int column = int_vector_iget( serialize_info->iens_active_index , iens); if (column >= 0) { serialize_node(serialize_info->src_fs, @@ -1165,16 +1159,15 @@ static void enkf_main_analysis_update( enkf_main_type * enkf_main , A); } } + matrix_shrink_header( A , local_dataset_get_row_scaling(dataset, key.c_str())->size(), matrix_get_columns(A) ); if (analysis_module_check_option( module , ANALYSIS_USE_A)) analysis_module_initX( module , X , localA , S , R , dObs , E , D, enkf_main->shared_rng); - /* - row_scaling_type * row_scaling = local_dataset_get_row_scaling( local_dataset, key ); - row_scaling_multiply(row_scaling, X, A ); - */ + const row_scaling_type * row_scaling = local_dataset_get_row_scaling( dataset, key.c_str() ); + row_scaling_multiply(row_scaling, A, X); - for (int iens = serialize_info->iens1; iens < serialize_info->iens2; iens++) { + for (int iens = 0; iens < bool_vector_size(ens_mask); iens++) { int column = int_vector_iget( serialize_info->iens_active_index , iens); if (column >= 0) { deserialize_node(serialize_info->target_fs, @@ -1182,7 +1175,7 @@ static void enkf_main_analysis_update( enkf_main_type * enkf_main , serialize_info->ensemble_config, key.c_str(), iens, - serialize_info->report_step, + serialize_info->target_step, 0, column, active_list, @@ -1192,7 +1185,6 @@ static void enkf_main_analysis_update( enkf_main_type * enkf_main , } } } - enkf_main_module_info_free( module_info ); } } hash_iter_free( dataset_iter ); @@ -1200,7 +1192,6 @@ static void enkf_main_analysis_update( enkf_main_type * enkf_main , } analysis_module_complete_update( module ); - /*****************************************************************/ int_vector_free(iens_active_index); diff --git a/lib/enkf/local_dataset.cpp b/lib/enkf/local_dataset.cpp index 8fa47e3e02..c6db579fc9 100644 --- a/lib/enkf/local_dataset.cpp +++ b/lib/enkf/local_dataset.cpp @@ -112,7 +112,7 @@ void local_dataset_clear( local_dataset_type * dataset) { hash_clear( dataset->active_size ); } -row_scaling * local_dataset_get_row_scaling(local_dataset_type * dataset, const char * key) { +const row_scaling * local_dataset_get_row_scaling(const local_dataset_type * dataset, const char * key) { auto scaling_iter = dataset->scaling.find( key ); if (scaling_iter != dataset->scaling.end()) return &scaling_iter->second; @@ -134,12 +134,13 @@ row_scaling_type * local_dataset_get_or_create_row_scaling(local_dataset_type * dataset->scaling.emplace( key, row_scaling{}); } - return local_dataset_get_row_scaling( dataset, key ); + return &dataset->scaling[key]; } active_list_type * local_dataset_get_node_active_list(const local_dataset_type * dataset , const char * node_key ) { - return (active_list_type *) hash_get( 
dataset->active_size , node_key );  /* Fails hard if you do not have the key ... */
+  active_list_type * al = (active_list_type *) hash_get( dataset->active_size , node_key );  /* Fails hard if you do not have the key ... */
+  return al;
 }
 
 stringlist_type * local_dataset_alloc_keys( const local_dataset_type * dataset ) {
diff --git a/lib/include/ert/enkf/local_dataset.hpp b/lib/include/ert/enkf/local_dataset.hpp
index 4889aed7d6..c1cf5e5563 100644
--- a/lib/include/ert/enkf/local_dataset.hpp
+++ b/lib/include/ert/enkf/local_dataset.hpp
@@ -50,7 +50,7 @@ int local_dataset_get_size( const local_dataset_type * dataset
 void local_dataset_del_node( local_dataset_type * dataset , const char * node_key);
 PY_USED bool local_dataset_has_key(const local_dataset_type * dataset, const char * key);
 hash_iter_type * local_dataset_alloc_iter(const local_dataset_type * dataset);
-row_scaling_type * local_dataset_get_row_scaling(const local_dataset_type * dataset, const char * key);
+const row_scaling_type * local_dataset_get_row_scaling(const local_dataset_type * dataset, const char * key);
 row_scaling_type * local_dataset_get_or_create_row_scaling(local_dataset_type * dataset, const char * key);
 bool local_dataset_has_row_scaling(const local_dataset_type * dataset, const char * key);
 
diff --git a/python/res/enkf/data/__init__.py b/python/res/enkf/data/__init__.py
index 9b9b45d4eb..fc1da228a0 100644
--- a/python/res/enkf/data/__init__.py
+++ b/python/res/enkf/data/__init__.py
@@ -3,6 +3,5 @@
 from .gen_kw import GenKw
 from .ext_param import ExtParam
 from .enkf_node import EnkfNode
-from .summary import Summary
 
-__all__ = ["Field", "Summary", "GenKw", "GenData", "ExtParam", "EnkfNode"]
+__all__ = ["Field", "GenKw", "GenData", "ExtParam", "EnkfNode"]
diff --git a/python/res/enkf/data/enkf_node.py b/python/res/enkf/data/enkf_node.py
index 65bb8cac87..6d80f97f9b 100644
--- a/python/res/enkf/data/enkf_node.py
+++ b/python/res/enkf/data/enkf_node.py
@@ -19,6 +19,7 @@
 from res import ResPrototype
 from res.enkf import EnkfFs, NodeId
 from res.enkf.data import GenKw, GenData, Field, ExtParam
+from res.enkf.data.summary import Summary
 
 
 class EnkfNode(BaseCClass):
diff --git a/python/res/enkf/row_scaling.py b/python/res/enkf/row_scaling.py
index bda08e541f..4c3ee3cff6 100644
--- a/python/res/enkf/row_scaling.py
+++ b/python/res/enkf/row_scaling.py
@@ -51,3 +51,55 @@ def __getitem__(self, index):
 
     def clamp(self, value):
         return self._clamp(value)
+
+    def assign(self, target_size, func):
+        """Assign a tapering value for all elements.
+
+        The assign() method is the main function used to assign the row
+        scaling values which are used as tapering in the update. The first
+        argument is the number of elements in the target parameter, and the
+        second argument is a callable which will be called with the element
+        index as its argument.
+
+        In the example below we assume that the tapering applies to a field
+        variable and scale the update with the function exp( -r/r0 ),
+        where r is the distance from some point and r0 is the length scale.
+
+           def sqr(x):
+               return x*x
+
+           def exp_decay(grid, pos, r0, data_index):
+               x,y,z = grid.get_xyz( active_index = data_index)
+               r = math.sqrt( sqr(pos[0] - x) + sqr(pos[1] - y) + sqr(pos[2] - z))
+               return math.exp( -r/r0 )
+
+
+           ens_config = ert.ensembleConfig()
+           config_node = ens_config["PORO"]
+           field_config = config_node.getFieldModelConfig()
+           grid = ert.eclConfig().getGrid()
+           pos = grid.get_xyz(ijk=(10,10,1))
+           r0 = 10
+
+           if grid.get_num_active() != field_config.get_data_size():
+               raise ValueError("Fatal error - inconsistent field size for: {}".format(config_node.getKey()))
+
+           # Some local configuration boilerplate has been skipped here.
+           local_config = main.getLocalConfig()
+           local_data = local_config.createDataset("LOCAL")
+           row_scaling = local_data.row_scaling("PORO")
+           row_scaling.assign( field_config.get_data_size(), functools.partial(exp_decay, grid, pos, r0))
+
+
+        In the example above functools.partial() is used to create a callable
+        which has access to the necessary context; an alternative would be to
+        use a class instance which implements the __call__() method.
+
+        It is an important point that the assign() method does not have any
+        context for error checking: if you call it with an incorrect value
+        for the size argument things will pass silently at first, but might
+        blow up in the subsequent update step.
+
+        """
+
+        for index in range(target_size):
+            self[index] = func(index)
diff --git a/test-data/local/row_scaling/CASE.EGRID b/test-data/local/row_scaling/CASE.EGRID
new file mode 100644
index 0000000000000000000000000000000000000000..1df43c1261a7ad81ace67a37c03b18edd851ddd0
GIT binary patch
literal 11304
zcmeI2O>QJb5QWn-5NEp;j7@Wb7^)7#T>TT1!u?)-Q{K9q9vJM!yNKK~2(
zscxPcbN?wO;IGNye);Lc`R?&Oo_KS7yIhd*dX+EkPmh<=`&!BLe!Se>vJm_#)g7X)
zO013gkDLR(-ftAIkY8H!JUwfo;~X3x9xk_d4e|2+c{wfba0|SSU;X@Vw=$UDuh%QV
z^cf)&Fk5UAyT8JaY#m@o)&_G8Hgd-z1snFZ$xLkEZ;=dk-?_t)f$-EYYkyWf*9c7IF0m_Fwa&kc4Tz3;x*{hE9+ead_t
z3z@-tFjK(>jsWiL9u{WGEL~fZFUDm*1I!f9gE2Gu$iSVAyd_^upS?3v!A3^!%v7+E
zYx2cbhAaSEGPmA40SY#-2N%c~SoTbS+%x&>*t;*rrO`V9@;R95d#V@X(wGAQ@|oDE
z_uj!&u#q_j0u*fIntZX9p?KeNM>b^d9ZUrq*kc7i##oiHiTjusXe5v
z1zu0S*!?Z}V*1|u3V?!*%sH$8DA>p~`C=uE`f$8H(S_+{OFTd++0V
zu<<&hV}Xp(m9g1
z#a4#+jF3MM8}EG^Zm?nNu>l}sY|7ZgeN2ordfxz$&tX%2OZ8$a@Otva?r+H#)A!!7
zr-F^lIcxwZ*vK{cVk<-Z`Hid-BEZZ^;+a
z=QHEE!R})YyDxUXCSOdS;=DpWJ8Zmn`N0OC5%s~$J_pqYyT7J-vHLCgV)uLU#qMv(
z7t{CN_1s|h;l;m`Ir>^^^2MyDmdbT#siRl#9ZUrqn6XqWkRg^jvSCA}f{nZ;U+jKM
zzS#Yqe6jmm^2PMM_X>c5jm+LF017s8O}^L)<)5t&zZ@SfXaC&&nO#43f8D%fE^`8N
0&@a$0&@b-oB;n1oiS%H&%itb^9;;0Fwej|1M>{L+!-i^f1`i->zwJ!&2-N4|2Ip+
-z_ud4CWb_XJDRzc?O<)2KX7P;JZl6@$vlOzE#q#aiCyTYd$UKa=i5|GTID-x_ipApigX
literal 0
HcmV?d00001

diff --git a/test-data/local/row_scaling/config b/test-data/local/row_scaling/config
new file mode 100644
index 0000000000..e30ea0ce23
--- /dev/null
+++ b/test-data/local/row_scaling/config
@@ -0,0 +1,11 @@
+NUM_REALIZATIONS 10
+GRID CASE.EGRID
+FIELD PORO PARAMETER poro.grdecl INIT_FILES:fields/poro%d.grdecl
+SUMMARY WBHP
+OBS_CONFIG observations.txt
+
+
+LOAD_WORKFLOW_JOB workflows/ROW_SCALING_JOB1
+LOAD_WORKFLOW workflows/ROW_SCALING_WORKFLOW1
+
+TIME_MAP timemap.txt
diff --git a/test-data/local/row_scaling/observations.txt b/test-data/local/row_scaling/observations.txt
new file mode 100644
index 0000000000..88f1263060
--- /dev/null
+++ b/test-data/local/row_scaling/observations.txt
@@ -0,0 +1,6 @@
+SUMMARY_OBSERVATION WBHP0 {
+    VALUE   = 125;
+    ERROR   = 25;
+    RESTART = 1;
+    KEY     = WBHP;
+};
diff --git a/test-data/local/row_scaling/timemap.txt
b/test-data/local/row_scaling/timemap.txt new file mode 100644 index 0000000000..a4ac5df647 --- /dev/null +++ b/test-data/local/row_scaling/timemap.txt @@ -0,0 +1,2 @@ +01/01/2020 +01/02/2020 diff --git a/test-data/local/row_scaling/workflows/ROW_SCALING1 b/test-data/local/row_scaling/workflows/ROW_SCALING1 new file mode 100644 index 0000000000..b934485f73 --- /dev/null +++ b/test-data/local/row_scaling/workflows/ROW_SCALING1 @@ -0,0 +1 @@ +SCRIPT script/row_scaling1.py diff --git a/test-data/local/row_scaling/workflows/ROW_SCALING_JOB1 b/test-data/local/row_scaling/workflows/ROW_SCALING_JOB1 new file mode 100644 index 0000000000..267af4429e --- /dev/null +++ b/test-data/local/row_scaling/workflows/ROW_SCALING_JOB1 @@ -0,0 +1,2 @@ +INTERNAL True +SCRIPT scripts/row_scaling_job1.py diff --git a/test-data/local/row_scaling/workflows/ROW_SCALING_WORKFLOW1 b/test-data/local/row_scaling/workflows/ROW_SCALING_WORKFLOW1 new file mode 100644 index 0000000000..637b0b7c26 --- /dev/null +++ b/test-data/local/row_scaling/workflows/ROW_SCALING_WORKFLOW1 @@ -0,0 +1 @@ +ROW_SCALING_JOB1 diff --git a/test-data/local/row_scaling/workflows/scripts/row_scaling_job1.py b/test-data/local/row_scaling/workflows/scripts/row_scaling_job1.py new file mode 100644 index 0000000000..04c17b20a0 --- /dev/null +++ b/test-data/local/row_scaling/workflows/scripts/row_scaling_job1.py @@ -0,0 +1,49 @@ +from res.enkf import ErtScript +from functools import partial +import math + +def gaussian_decay(obs_pos, length_scale, grid, data_index): + x,y,z = grid.get_xyz( active_index = data_index) + dx = (obs_pos[0] - x)/length_scale[0] + dy = (obs_pos[1] - y)/length_scale[1] + dz = (obs_pos[2] - z)/length_scale[2] + + exp_arg = -0.5* (dx*dx + dy*dy + dz*dz) + return math.exp( exp_arg ) + + + + +class RowScalingJob1(ErtScript): + + def run(self): + main = self.ert() + local_config = main.getLocalConfig() + local_config.clear() + + + local_data = local_config.createDataset("LOCAL") + local_data.addNode("PORO") + row_scaling = local_data.row_scaling("PORO") + + ens_config = main.ensembleConfig() + poro_config = ens_config["PORO"] + field_config = poro_config.getFieldModelConfig() + + #------------------------------------------------------------------------------------- + grid = main.eclConfig().getGrid() + obs_pos = grid.get_xyz( ijk=(5,5,1) ) + length_scale = (2, 1, 0.50) + gaussian = partial( gaussian_decay, obs_pos, length_scale, grid) + row_scaling.assign( field_config.get_data_size(), gaussian) + #------------------------------------------------------------------------------------- + + + obs = local_config.createObsdata("OBSSET_LOCAL") + obs.addNode("WBHP0") + ministep = local_config.createMinistep("MINISTEP_LOCAL") + ministep.attachDataset(local_data) + ministep.attachObsset(obs) + updatestep = local_config.getUpdatestep() + updatestep.attachMinistep(ministep) + diff --git a/tests/res/enkf/data/test_summary.py b/tests/res/enkf/data/test_summary.py index 724c610cd8..7e62132dd2 100644 --- a/tests/res/enkf/data/test_summary.py +++ b/tests/res/enkf/data/test_summary.py @@ -2,7 +2,7 @@ import json from ecl.util.test.ecl_mock import createEclSum -from res.enkf.data import Summary +from res.enkf.data.summary import Summary from res.enkf.config import SummaryConfig from ecl.util.test import TestAreaContext from tests import ResTest diff --git a/tests/res/enkf/test_row_scaling.py b/tests/res/enkf/test_row_scaling.py index a39f96f049..6d0c4dfd45 100644 --- a/tests/res/enkf/test_row_scaling.py +++ b/tests/res/enkf/test_row_scaling.py @@ 
-16,10 +16,44 @@
 import pytest
 import random
+from functools import partial
 
 from res.enkf import RowScaling
+import math
+from ecl.grid import EclGridGenerator
+from res.enkf import FieldConfig
 
 
-def test_access():
+
+def row_scaling_one(data_index):
+    return 1.0
+
+
+def row_scaling_inverse_size(size, data_index):
+    return data_index / size
+
+
+def row_scaling_coloumb(nx, ny, data_index):
+    j = data_index // nx
+    i = data_index - j * nx
+
+    dx = 0.5 + i
+    dy = 0.5 + j
+
+    r2 = dx * dx + dy * dy
+    return 0.50 / r2
+
+
+def gaussian_decay(obs_pos, length_scale, grid, data_index):
+    x, y, z = grid.get_xyz(active_index=data_index)
+    dx = (obs_pos[0] - x) / length_scale[0]
+    dy = (obs_pos[1] - y) / length_scale[1]
+    dz = (obs_pos[2] - z) / length_scale[2]
+
+    exp_arg = -0.5 * (dx * dx + dy * dy + dz * dz)
+    return math.exp(exp_arg)
+
+
+def test_basic():
     row_scaling = RowScaling()
     assert len(row_scaling) == 0
 
@@ -35,3 +69,45 @@ def test_access():
         r = random.random()
         row_scaling[i] = r
         assert row_scaling[i] == row_scaling.clamp(r)
+
+    nx = 10
+    ny = 10
+    row_scaling.assign(nx * ny, row_scaling_one)
+    assert len(row_scaling) == nx * ny
+    assert row_scaling[0] == 1
+    assert row_scaling[nx * ny - 1] == 1
+
+    inverse_size = partial(row_scaling_inverse_size, nx * ny)
+    row_scaling.assign(nx * ny, inverse_size)
+    for g in range(nx * ny):
+        assert row_scaling[g] == row_scaling.clamp(g / (nx * ny))
+
+    coloumb = partial(row_scaling_coloumb, nx, ny)
+    row_scaling.assign(nx * ny, coloumb)
+    for j in range(ny):
+        for i in range(nx):
+            g = j * nx + i
+            assert row_scaling[g] == row_scaling.clamp(row_scaling_coloumb(nx, ny, g))
+
+
+def test_field_config():
+    nx = 10
+    ny = 10
+    nz = 5
+    actnum = [1] * nx * ny * nz
+    actnum[0] = 0
+    actnum[3] = 0
+    actnum[10] = 0
+
+    grid = EclGridGenerator.create_rectangular((nx, ny, nz), (1, 1, 1), actnum)
+    fc = FieldConfig("PORO", grid)
+    row_scaling = RowScaling()
+    obs_pos = grid.get_xyz(ijk=(5, 5, 1))
+    length_scale = (2, 1, 0.50)
+
+    gaussian = partial(gaussian_decay, obs_pos, length_scale, grid)
+    row_scaling.assign(grid.get_num_active(), gaussian)
+    for g in range(grid.get_num_active()):
+        assert row_scaling[g] == row_scaling.clamp(
+            gaussian_decay(obs_pos, length_scale, grid, g)
+        )
diff --git a/tests/res/enkf/test_row_scaling_case.py b/tests/res/enkf/test_row_scaling_case.py
new file mode 100644
index 0000000000..843a45ebab
--- /dev/null
+++ b/tests/res/enkf/test_row_scaling_case.py
@@ -0,0 +1,300 @@
+# Copyright (C) 2020 Equinor ASA, Norway.
+#
+# The file 'test_row_scaling_case.py' is part of ERT - Ensemble based Reservoir Tool.
+#
+# ERT is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# ERT is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See the GNU General Public License at <http://www.gnu.org/licenses/gpl.html>
+# for more details.
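+#
+# The tests in this file run full smoother updates on the small
+# test-data/local/row_scaling case: init_data() below builds the prior PORO
+# ensemble and a synthetic WBHP response, and the individual tests then apply
+# row scaling through a workflow, through inlined code, and finally in a
+# setup where the row scaled update can be verified numerically.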
+
+import random
+import os
+import math
+
+from tests import ResTest
+
+from ecl.util.util import BoolVector
+
+from res.test import ErtTestContext
+from res.enkf import RowScaling
+from res.enkf import EnkfNode, NodeId
+from res.enkf import ESUpdate
+from res.enkf import FieldConfig, ErtRunContext
+from res.enkf.enums import RealizationStateEnum
+
+
+# This function will initialize the data in the case before the actual row
+# scaling test can be performed. The function will initialize the PORO field
+# which is the parameter to be updated, and it will assign one value for the
+# summary value WBHP which will serve as the simulated response in the update
+# process.
+#
+# The parameter field PORO is initialized by first creating the input files
+# fields/poro%d.grdecl, and then the normal main.initRun() function is used to
+# initialize the case and load the PORO data from disk. The summary data is
+# normally loaded from a summary case found on disk; here we instead explicitly
+# assign a value to the WBHP field at report step 1. Since we circumvent the
+# normal load-from-forward-model functionality we must manually update the
+# state_map of the file system with the RealizationStateEnum.STATE_HAS_DATA
+# flag. The return value from this function is the enkf_fs instance which has
+# been initialized.
+
+
+def init_data(main):
+    fsm = main.getEnkfFsManager()
+    init_fs = fsm.getFileSystem("init")
+
+    # Model: bhp = poro * 1000
+    poro_mean = 0.15
+    poro_std = 0.10
+    bhp_std = 125
+
+    bhp = []
+    num_realisations = main.getEnsembleSize()
+
+    # The path fields/poro{}.grdecl must be consistent with the INIT_FILES: argument in the
+    # PORO configuration in the configuration file used for the testcase.
+    os.mkdir("fields")
+    random.seed(12345)
+    for i in range(num_realisations):
+        with open("fields/poro{}.grdecl".format(i), "w") as f:
+            poro = random.gauss(poro_mean, poro_std)
+            f.write(
+                """PORO
+    200*{:<7.5}
+/
+""".format(
+                    poro
+                )
+            )
+        bhp.append(poro * 1000 + random.gauss(0, bhp_std))
+
+    mask = BoolVector(initial_size=main.getEnsembleSize(), default_value=True)
+    init_context = ErtRunContext.case_init(init_fs, mask)
+    main.initRun(init_context)
+
+    ens_config = main.ensembleConfig()
+    bhp_config = ens_config["WBHP"]
+    state_map = init_fs.getStateMap()
+    for iens in range(main.getEnsembleSize()):
+        bhp_node = EnkfNode(bhp_config)
+        bhp_summary = bhp_node.as_summary()
+        bhp_summary[1] = bhp[iens]
+
+        node_id = NodeId(1, iens)
+        bhp_node.save(init_fs, node_id)
+        state_map[iens] = RealizationStateEnum.STATE_HAS_DATA
+
+    return init_fs
+
+
+# This is an example callable which decays as a gaussian away from a position
+# obs_pos; this is (probably) a simplified version of a tapering function which
+# will be interesting to use.
+class GaussianDecay(object):
+    def __init__(self, obs_pos, length_scale, grid):
+        self.obs_pos = obs_pos
+        self.length_scale = length_scale
+        self.grid = grid
+
+    def __call__(self, data_index):
+        x, y, z = self.grid.get_xyz(active_index=data_index)
+        dx = (self.obs_pos[0] - x) / self.length_scale[0]
+        dy = (self.obs_pos[1] - y) / self.length_scale[1]
+        dz = (self.obs_pos[2] - z) / self.length_scale[2]
+
+        exp_arg = -0.5 * (dx * dx + dy * dy + dz * dz)
+        return math.exp(exp_arg)
+
+
+# This is a quite contrived row scaling callable which is mainly designed to
+# test that the correct parts of the field are updated. The behavior of the
+# ScalingTest callable is as follows:
+#
+# k = 0: In the upper layer the scaling function behaves as a step function:
+#        for i <= 5 the scaling function returns one, and the updated field
+#        should be identical to the field updated without row scaling applied.
+#
+#        For i > 5 the scaling function returns zero - corresponding to zero
+#        update. In this case the field returned from the update process
+#        should be identical to the input field.
+#
+# k = 1: In the lower layer the scaling function is constantly equal to 0.50,
+#        which implies that the updated values should lie between the initial
+#        values and the normal full update values.
+#
+# The function assert_field_update() verifies that the updated field
+# satisfies these constraints.
+
+
+class ScalingTest(object):
+    def __init__(self, grid):
+        self.grid = grid
+
+    def __call__(self, data_index):
+        i, j, k = self.grid.get_ijk(active_index=data_index)
+        if k == 0:
+            if i <= 5:
+                return 1
+            else:
+                return 0
+        else:
+            return 0.5
+
+
+def assert_field_update(grid, init_field, update_field1, update_field2):
+    k = 0
+    for j in range(grid.ny):
+        for i in range(6):
+            assert update_field1.ijk_get_double(
+                i, j, k
+            ) == update_field2.ijk_get_double(i, j, k)
+
+        for i in range(6, grid.nx):
+            assert init_field.ijk_get_double(i, j, k) == update_field2.ijk_get_double(
+                i, j, k
+            )
+
+    k = 1
+    for j in range(grid.ny):
+        for i in range(grid.nx):
+            init = init_field.ijk_get_double(i, j, k)
+            update1 = update_field1.ijk_get_double(i, j, k)
+            update2 = update_field2.ijk_get_double(i, j, k)
+            assert (update2 - init) * (update1 - init) > 0
+
+
+class RowScalingTest(ResTest):
+    def setUp(self):
+        self.config_file = self.createTestPath("local/row_scaling/config")
+
+    # The test_update_workflow() applies the row scaling through a workflow,
+    # which is probably the way it will be done by users. The test does not
+    # really verify the results in any way, but serves to demonstrate how
+    # things are connected. The job in workflows/scripts/row_scaling_job1.py
+    # uses functools.partial() as an alternative to Class::__call__() as the
+    # callable when the row scaling is applied.
+    def test_update_workflow(self):
+        with ErtTestContext("row_scaling", self.config_file) as tc:
+            main = tc.getErt()
+            workflow_list = main.getWorkflowList()
+            workflow = workflow_list["ROW_SCALING_WORKFLOW1"]
+            self.assertTrue(workflow.run(main))
+
+            init_fs = init_data(main)
+            target_fs = main.getEnkfFsManager().getFileSystem("target")
+
+            es_update = ESUpdate(main)
+            run_context = ErtRunContext.ensemble_smoother_update(init_fs, target_fs)
+            es_update.smootherUpdate(run_context)
+
+    # The test_update_code1() applies the row scaling through code inlined in
+    # the test, and also uses the GaussianDecay callable class instead of
+    # functools.partial() to create a callable for the scaling operation.
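+    # For reference, a functools.partial() based equivalent of the
+    # GaussianDecay class used below would look roughly like this (a sketch,
+    # reusing the gaussian_decay() function defined in
+    # workflows/scripts/row_scaling_job1.py):
+    #
+    #     from functools import partial
+    #     gaussian = partial(gaussian_decay, obs_pos, length_scale, grid)
+    #     row_scaling.assign(field_config.get_data_size(), gaussian)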
+    def test_update_code1(self):
+        with ErtTestContext("row_scaling", self.config_file) as tc:
+            main = tc.getErt()
+
+            local_config = main.getLocalConfig()
+            local_config.clear()
+            local_data = local_config.createDataset("LOCAL")
+            local_data.addNode("PORO")
+            obs = local_config.createObsdata("OBSSET_LOCAL")
+            obs.addNode("WBHP0")
+            ministep = local_config.createMinistep("MINISTEP_LOCAL")
+            ministep.attachDataset(local_data)
+            ministep.attachObsset(obs)
+            updatestep = local_config.getUpdatestep()
+            updatestep.attachMinistep(ministep)
+
+            row_scaling = local_data.row_scaling("PORO")
+            ens_config = main.ensembleConfig()
+            poro_config = ens_config["PORO"]
+            field_config = poro_config.getFieldModelConfig()
+
+            # -------------------------------------------------------------------------------------
+            grid = main.eclConfig().getGrid()
+            obs_pos = grid.get_xyz(ijk=(5, 5, 1))
+            length_scale = (2, 1, 0.50)
+            row_scaling.assign(
+                field_config.get_data_size(), GaussianDecay(obs_pos, length_scale, grid)
+            )
+            # -------------------------------------------------------------------------------------
+
+            init_fs = init_data(main)
+            target_fs = main.getEnkfFsManager().getFileSystem("target")
+            es_update = ESUpdate(main)
+            run_context = ErtRunContext.ensemble_smoother_update(init_fs, target_fs)
+            es_update.smootherUpdate(run_context)
+
+    # This test does two smoother updates: first without row scaling in
+    # update1, and then with row scaling in update2. The row scaling function
+    # is designed so that it is possible to verify the update results.
+    def test_update_code2(self):
+        random_seed = "ABCDEFGHIJK0123456"
+        with ErtTestContext("row_scaling", self.config_file) as tc:
+            main = tc.getErt()
+            init_fs = init_data(main)
+            update_fs1 = main.getEnkfFsManager().getFileSystem("target1")
+
+            # The first smoother update without row scaling
+            es_update = ESUpdate(main)
+            run_context = ErtRunContext.ensemble_smoother_update(init_fs, update_fs1)
+            rng = main.rng()
+            rng.setState(random_seed)
+            es_update.smootherUpdate(run_context)
+
+            # Configure the local updates
+            local_config = main.getLocalConfig()
+            local_config.clear()
+            local_data = local_config.createDataset("LOCAL")
+            local_data.addNode("PORO")
+            obs = local_config.createObsdata("OBSSET_LOCAL")
+            obs.addNode("WBHP0")
+            ministep = local_config.createMinistep("MINISTEP_LOCAL")
+            ministep.attachDataset(local_data)
+            ministep.attachObsset(obs)
+            updatestep = local_config.getUpdatestep()
+            updatestep.attachMinistep(ministep)
+
+            # Apply the row scaling
+            row_scaling = local_data.row_scaling("PORO")
+            ens_config = main.ensembleConfig()
+            poro_config = ens_config["PORO"]
+            field_config = poro_config.getFieldModelConfig()
+            grid = main.eclConfig().getGrid()
+            row_scaling.assign(field_config.get_data_size(), ScalingTest(grid))
+
+            # Second update with row scaling
+            update_fs2 = main.getEnkfFsManager().getFileSystem("target2")
+            es_update = ESUpdate(main)
+            run_context = ErtRunContext.ensemble_smoother_update(init_fs, update_fs2)
+            rng.setState(random_seed)
+            es_update.smootherUpdate(run_context)
+
+            # Fetch the three values: initial, update without row scaling and
+            # update with row scaling - and verify that the row scaling has
+            # been correctly applied.
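+            # assert_field_update() checks cell by cell that the scaling 1.0
+            # region matches the unscaled update exactly, that the scaling 0.0
+            # region is identical to the initial field, and that in the 0.50
+            # layer both updates move away from the initial value in the same
+            # direction.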
+ init_node = EnkfNode(poro_config) + update_node1 = EnkfNode(poro_config) + update_node2 = EnkfNode(poro_config) + for iens in range(main.getEnsembleSize()): + node_id = NodeId(0, iens) + + init_node.load(init_fs, node_id) + update_node1.load(update_fs1, node_id) + update_node2.load(update_fs2, node_id) + + assert_field_update( + grid, + init_node.asField(), + update_node1.asField(), + update_node2.asField(), + )