From 7977a5330a10f60211291b29c3c646ea4f18b87f Mon Sep 17 00:00:00 2001
From: Achal Shah
Date: Wed, 4 Aug 2021 15:35:56 -0700
Subject: [PATCH] Add the foundation of the universal feature repo and a test that uses it (#1734)

* Add the foundation of the universal feature repo and a test that uses it
Signed-off-by: Achal Shah

* Make tests actually work
Signed-off-by: Achal Shah

* Make format
Signed-off-by: Achal Shah

* Make format
Signed-off-by: Achal Shah

* add a redshift data source creator
Signed-off-by: Achal Shah

* integration test
Signed-off-by: Achal Shah

* file data source creator
Signed-off-by: Achal Shah

* fix online store ref
Signed-off-by: Achal Shah

* dynamodb region
Signed-off-by: Achal Shah

* fix file
Signed-off-by: Achal Shah

* remove import
Signed-off-by: Achal Shah

* close not delete
Signed-off-by: Achal Shah

* Refactor configs into test_repo_config
Signed-off-by: Achal Shah

* make format
Signed-off-by: Achal Shah

* Add a sweet decorator per feedback
Signed-off-by: Achal Shah

* make format
Signed-off-by: Achal Shah

* move stuff into with
Signed-off-by: Achal Shah

* Specify repo_path for tests to succeed
Signed-off-by: Achal Shah

* fix comments
Signed-off-by: Achal Shah

* fix format
Signed-off-by: Achal Shah
---
 sdk/python/feast/repo_config.py               |   1 +
 sdk/python/tests/data/data_creator.py         |  27 ++++
 .../integration/e2e/test_universal_e2e.py     | 123 ++++++++++++++++++
 .../feature_repos/test_repo_configuration.py  | 109 ++++++++++++++++
 .../universal/data_source_creator.py          |  26 ++++
 .../universal/data_sources/bigquery.py        |  54 ++++++++
 .../universal/data_sources/file.py            |  41 ++++++
 .../universal/data_sources/redshift.py        |  76 +++++++++++
 .../feature_repos/universal/entities.py       |  10 ++
 .../feature_repos/universal/feature_views.py  |  14 ++
 .../test_offline_online_store_consistency.py  |  52 ++------
 11 files changed, 491 insertions(+), 42 deletions(-)
 create mode 100644 sdk/python/tests/data/data_creator.py
 create mode 100644 sdk/python/tests/integration/e2e/test_universal_e2e.py
 create mode 100644 sdk/python/tests/integration/feature_repos/test_repo_configuration.py
 create mode 100644 sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
 create mode 100644 sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
 create mode 100644 sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
 create mode 100644 sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
 create mode 100644 sdk/python/tests/integration/feature_repos/universal/entities.py
 create mode 100644 sdk/python/tests/integration/feature_repos/universal/feature_views.py

diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index 167a289a21..afcf30e848 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -80,6 +80,7 @@ class RepoConfig(FeastBaseModel):
 
     def __init__(self, **data: Any):
         super().__init__(**data)
+
         if isinstance(self.online_store, Dict):
             self.online_store = get_online_config_from_type(self.online_store["type"])(
                 **self.online_store
diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py
new file mode 100644
index 0000000000..a6fc9d423d
--- /dev/null
+++ b/sdk/python/tests/data/data_creator.py
@@ -0,0 +1,27 @@
+from datetime import datetime, timedelta
+
+import pandas as pd
+from pytz import timezone, utc
+
+
+def create_dataset() -> pd.DataFrame:
+    now = datetime.utcnow()
+    ts = pd.Timestamp(now).round("ms")
+    data = {
+        "id": [1, 2, 1, 3, 3],
"value": [0.1, None, 0.3, 4, 5], + "ts_1": [ + ts - timedelta(hours=4), + ts, + ts - timedelta(hours=3), + # Use different time zones to test tz-naive -> tz-aware conversion + (ts - timedelta(hours=4)) + .replace(tzinfo=utc) + .astimezone(tz=timezone("Europe/Berlin")), + (ts - timedelta(hours=1)) + .replace(tzinfo=utc) + .astimezone(tz=timezone("US/Pacific")), + ], + "created_ts": [ts, ts, ts, ts, ts], + } + return pd.DataFrame.from_dict(data) diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py new file mode 100644 index 0000000000..a58ea841d2 --- /dev/null +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -0,0 +1,123 @@ +import math +from datetime import datetime, timedelta +from typing import Optional + +import pandas as pd +from pytz import utc + +from feast import FeatureStore, FeatureView +from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test + + +@parametrize_e2e_test +def test_e2e_consistency(fs: FeatureStore): + run_offline_online_store_consistency_test(fs) + + +def check_offline_and_online_features( + fs: FeatureStore, + fv: FeatureView, + driver_id: int, + event_timestamp: datetime, + expected_value: Optional[float], + full_feature_names: bool, + check_offline_store: bool = True, +) -> None: + # Check online store + response_dict = fs.get_online_features( + [f"{fv.name}:value"], + [{"driver": driver_id}], + full_feature_names=full_feature_names, + ).to_dict() + + if full_feature_names: + if expected_value: + assert abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 + else: + assert response_dict[f"{fv.name}__value"][0] is None + else: + if expected_value: + assert abs(response_dict["value"][0] - expected_value) < 1e-6 + else: + assert response_dict["value"][0] is None + + # Check offline store + if check_offline_store: + df = fs.get_historical_features( + entity_df=pd.DataFrame.from_dict( + {"driver_id": [driver_id], "event_timestamp": [event_timestamp]} + ), + features=[f"{fv.name}:value"], + full_feature_names=full_feature_names, + ).to_df() + + if full_feature_names: + if expected_value: + assert abs(df.to_dict()[f"{fv.name}__value"][0] - expected_value) < 1e-6 + else: + assert math.isnan(df.to_dict()[f"{fv.name}__value"][0]) + else: + if expected_value: + assert abs(df.to_dict()["value"][0] - expected_value) < 1e-6 + else: + assert math.isnan(df.to_dict()["value"][0]) + + +def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None: + now = datetime.utcnow() + + fv = fs.get_feature_view("test_correctness") + full_feature_names = True + check_offline_store: bool = True + + # Run materialize() + # use both tz-naive & tz-aware timestamps to test that they're both correctly handled + start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) + end_date = now - timedelta(hours=2) + fs.materialize(feature_views=[fv.name], start_date=start_date, end_date=end_date) + + # check result of materialize() + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=1, + event_timestamp=end_date, + expected_value=0.3, + full_feature_names=full_feature_names, + check_offline_store=check_offline_store, + ) + + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=2, + event_timestamp=end_date, + expected_value=None, + full_feature_names=full_feature_names, + check_offline_store=check_offline_store, + ) + + # check prior value for materialize_incremental() + check_offline_and_online_features( + fs=fs, + fv=fv, + driver_id=3, + 
+        event_timestamp=end_date,
+        expected_value=4,
+        full_feature_names=full_feature_names,
+        check_offline_store=check_offline_store,
+    )
+
+    # run materialize_incremental()
+    fs.materialize_incremental(feature_views=[fv.name], end_date=now)
+
+    # check result of materialize_incremental()
+    check_offline_and_online_features(
+        fs=fs,
+        fv=fv,
+        driver_id=3,
+        event_timestamp=now,
+        expected_value=5,
+        full_feature_names=full_feature_names,
+        check_offline_store=check_offline_store,
+    )
diff --git a/sdk/python/tests/integration/feature_repos/test_repo_configuration.py b/sdk/python/tests/integration/feature_repos/test_repo_configuration.py
new file mode 100644
index 0000000000..fec573abf9
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/test_repo_configuration.py
@@ -0,0 +1,109 @@
+import tempfile
+import uuid
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Dict, Iterator, List, Union
+
+import pytest
+from attr import dataclass
+
+from feast import FeatureStore, RepoConfig, importer
+from tests.data.data_creator import create_dataset
+from tests.integration.feature_repos.universal.data_source_creator import (
+    DataSourceCreator,
+)
+from tests.integration.feature_repos.universal.entities import driver
+from tests.integration.feature_repos.universal.feature_views import (
+    correctness_feature_view,
+)
+
+
+@dataclass
+class TestRepoConfig:
+    """
+    This class should hold all possible parameters that may need to be varied by individual tests.
+    """
+
+    provider: str = "local"
+    online_store: Union[str, Dict] = "sqlite"
+
+    offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"
+
+    full_feature_names: bool = True
+
+
+FULL_REPO_CONFIGS: List[TestRepoConfig] = [
+    TestRepoConfig(),  # Local
+    TestRepoConfig(
+        provider="aws",
+        offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
+        online_store={"type": "dynamodb", "region": "us-west-2"},
+    ),
+    TestRepoConfig(
+        provider="gcp",
+        offline_store_creator="tests.integration.feature_repos.universal.data_sources.bigquery.BigQueryDataSourceCreator",
+        online_store="datastore",
+    ),
+]
+
+
+OFFLINE_STORES: List[str] = []
+ONLINE_STORES: List[str] = []
+PROVIDERS: List[str] = []
+
+
+@contextmanager
+def construct_feature_store(test_repo_config: TestRepoConfig) -> Iterator[FeatureStore]:
+    """
+    This function should take in the parameters from the test repo config and create a feature repo, apply it,
+    and return the constructed feature store object to callers.
+
+    This feature store object can be interacted with for the purposes of tests.
+    The user is *not* expected to perform any clean up actions.
+
+    :param test_repo_config: configuration for the feature repo under test
+    :return: A feature store built using the supplied configuration.
+ """ + df = create_dataset() + + project = f"test_correctness_{str(uuid.uuid4()).replace('-', '')[:8]}" + + module_name, config_class_name = test_repo_config.offline_store_creator.rsplit( + ".", 1 + ) + + offline_creator: DataSourceCreator = importer.get_class_from_type( + module_name, config_class_name, "DataSourceCreator" + )() + ds = offline_creator.create_data_source(project, df) + offline_store = offline_creator.create_offline_store_config() + online_store = test_repo_config.online_store + + with tempfile.TemporaryDirectory() as repo_dir_name: + config = RepoConfig( + registry=str(Path(repo_dir_name) / "registry.db"), + project=project, + provider=test_repo_config.provider, + offline_store=offline_store, + online_store=online_store, + repo_path=repo_dir_name, + ) + fs = FeatureStore(config=config) + fv = correctness_feature_view(ds) + entity = driver() + fs.apply([fv, entity]) + + yield fs + + fs.teardown() + offline_creator.teardown(project) + + +def parametrize_e2e_test(e2e_test): + @pytest.mark.integration + @pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: v.provider) + def inner_test(config): + with construct_feature_store(config) as fs: + e2e_test(fs) + + return inner_test diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py new file mode 100644 index 0000000000..b85aeeeb39 --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -0,0 +1,26 @@ +from abc import ABC, abstractmethod + +import pandas as pd + +from feast.data_source import DataSource +from feast.repo_config import FeastConfigBaseModel + + +class DataSourceCreator(ABC): + @abstractmethod + def create_data_source( + self, + name: str, + df: pd.DataFrame, + event_timestamp_column="ts", + created_timestamp_column="created_ts", + ) -> DataSource: + ... + + @abstractmethod + def create_offline_store_config(self) -> FeastConfigBaseModel: + ... + + @abstractmethod + def teardown(self, name: str): + ... 
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
new file mode 100644
index 0000000000..7776b31e6d
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
@@ -0,0 +1,54 @@
+import time
+
+import pandas as pd
+from google.cloud import bigquery
+
+from feast import BigQuerySource
+from feast.data_source import DataSource
+from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from tests.integration.feature_repos.universal.data_source_creator import (
+    DataSourceCreator,
+)
+
+
+class BigQueryDataSourceCreator(DataSourceCreator):
+    def teardown(self, name: str):
+        pass
+
+    def __init__(self):
+        self.client = bigquery.Client()
+
+    def create_offline_store_config(self):
+        return BigQueryOfflineStoreConfig()
+
+    def create_data_source(
+        self,
+        name: str,
+        df: pd.DataFrame,
+        event_timestamp_column="ts",
+        created_timestamp_column="created_ts",
+        **kwargs,
+    ) -> DataSource:
+        gcp_project = self.client.project
+        bigquery_dataset = "test_ingestion"
+        dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
+        self.client.create_dataset(dataset, exists_ok=True)
+        dataset.default_table_expiration_ms = (
+            1000 * 60 * 60 * 24 * 14
+        )  # 2 weeks in milliseconds
+        self.client.update_dataset(dataset, ["default_table_expiration_ms"])
+
+        job_config = bigquery.LoadJobConfig()
+        table_ref = f"{gcp_project}.{bigquery_dataset}.{name}_{int(time.time_ns())}"
+        job = self.client.load_table_from_dataframe(
+            df, table_ref, job_config=job_config
+        )
+        job.result()
+
+        return BigQuerySource(
+            table_ref=table_ref,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            date_partition_column="",
+            field_mapping={"ts_1": "ts", "id": "driver_id"},
+        )
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
new file mode 100644
index 0000000000..49618dd698
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
@@ -0,0 +1,41 @@
+import tempfile
+from typing import Any
+
+import pandas as pd
+
+from feast import FileSource
+from feast.data_format import ParquetFormat
+from feast.data_source import DataSource
+from feast.infra.offline_stores.file import FileOfflineStoreConfig
+from feast.repo_config import FeastConfigBaseModel
+from tests.integration.feature_repos.universal.data_source_creator import (
+    DataSourceCreator,
+)
+
+
+class FileDataSourceCreator(DataSourceCreator):
+    f: Any
+
+    def create_data_source(
+        self,
+        name: str,
+        df: pd.DataFrame,
+        event_timestamp_column="ts",
+        created_timestamp_column="created_ts",
+    ) -> DataSource:
+        self.f = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False)
+        df.to_parquet(self.f.name)
+        return FileSource(
+            file_format=ParquetFormat(),
+            path=f"file://{self.f.name}",
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            date_partition_column="",
+            field_mapping={"ts_1": "ts", "id": "driver_id"},
+        )
+
+    def create_offline_store_config(self) -> FeastConfigBaseModel:
+        return FileOfflineStoreConfig()
+
+    def teardown(self, name: str):
+        self.f.close()
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
new file mode 100644
index 0000000000..f34490fc8a
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
@@ -0,0 +1,76 @@
+import random
+import time
+from typing import Optional
+
+import pandas as pd
+
+from feast import RedshiftSource
+from feast.data_source import DataSource
+from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
+from feast.infra.utils import aws_utils
+from feast.repo_config import FeastConfigBaseModel
+from tests.integration.feature_repos.universal.data_source_creator import (
+    DataSourceCreator,
+)
+
+
+class RedshiftDataSourceCreator(DataSourceCreator):
+
+    table_name: Optional[str] = None
+    redshift_source: Optional[RedshiftSource] = None
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.client = aws_utils.get_redshift_data_client("us-west-2")
+        self.s3 = aws_utils.get_s3_resource("us-west-2")
+
+        self.offline_store_config = RedshiftOfflineStoreConfig(
+            cluster_id="feast-integration-tests",
+            region="us-west-2",
+            user="admin",
+            database="feast",
+            s3_staging_location="s3://feast-integration-tests/redshift/tests/ingestion",
+            iam_role="arn:aws:iam::402087665549:role/redshift_s3_access_role",
+        )
+
+    def create_data_source(
+        self,
+        name: str,
+        df: pd.DataFrame,
+        event_timestamp_column="ts",
+        created_timestamp_column="created_ts",
+    ) -> DataSource:
+        self.table_name = f"{name}_{time.time_ns()}_{random.randint(1000, 9999)}"
+        aws_utils.upload_df_to_redshift(
+            self.client,
+            self.offline_store_config.cluster_id,
+            self.offline_store_config.database,
+            self.offline_store_config.user,
+            self.s3,
+            f"{self.offline_store_config.s3_staging_location}/copy/{self.table_name}.parquet",
+            self.offline_store_config.iam_role,
+            self.table_name,
+            df,
+        )
+
+        self.redshift_source = RedshiftSource(
+            table=self.table_name,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            date_partition_column="",
+            field_mapping={"ts_1": "ts", "id": "driver_id"},
+        )
+        return self.redshift_source
+
+    def create_offline_store_config(self) -> FeastConfigBaseModel:
+        return self.offline_store_config
+
+    def teardown(self, name: str):
+        if self.table_name:
+            aws_utils.execute_redshift_statement(
+                self.client,
+                self.offline_store_config.cluster_id,
+                self.offline_store_config.database,
+                self.offline_store_config.user,
+                f"DROP TABLE {self.table_name}",
+            )
diff --git a/sdk/python/tests/integration/feature_repos/universal/entities.py b/sdk/python/tests/integration/feature_repos/universal/entities.py
new file mode 100644
index 0000000000..9b4352eb83
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/entities.py
@@ -0,0 +1,10 @@
+from feast import Entity, ValueType
+
+
+def driver():
+    return Entity(
+        name="driver",  # The name is derived from this argument, not the object name.
+        value_type=ValueType.INT64,
+        description="driver id",
+        join_key="driver_id",
+    )
diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py
new file mode 100644
index 0000000000..94c80bb84c
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py
@@ -0,0 +1,14 @@
+from datetime import timedelta
+
+from feast import Feature, FeatureView, ValueType
+from feast.data_source import DataSource
+
+
+def correctness_feature_view(data_source: DataSource) -> FeatureView:
+    return FeatureView(
+        name="test_correctness",
+        entities=["driver"],
+        features=[Feature("value", ValueType.FLOAT)],
+        ttl=timedelta(days=5),
+        input=data_source,
+    )
diff --git a/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py b/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py
index 154b3b8c22..71954611a3 100644
--- a/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py
+++ b/sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py
@@ -11,13 +11,11 @@
 import pandas as pd
 import pytest
 from google.cloud import bigquery
-from pytz import timezone, utc
+from pytz import utc
 
 from feast import BigQuerySource, FileSource, RedshiftSource
 from feast.data_format import ParquetFormat
-from feast.data_source import DataSource
 from feast.entity import Entity
-from feast.feature import Feature
 from feast.feature_store import FeatureStore
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores.file import FileOfflineStoreConfig
@@ -29,42 +27,12 @@
 from feast.infra.utils import aws_utils
 from feast.repo_config import RepoConfig
 from feast.value_type import ValueType
+from tests.data.data_creator import create_dataset
+from tests.integration.feature_repos.universal.feature_views import (
+    correctness_feature_view,
+)
 
 
-def create_dataset() -> pd.DataFrame:
-    now = datetime.utcnow()
-    ts = pd.Timestamp(now).round("ms")
-    data = {
-        "id": [1, 2, 1, 3, 3],
-        "value": [0.1, None, 0.3, 4, 5],
-        "ts_1": [
-            ts - timedelta(hours=4),
-            ts,
-            ts - timedelta(hours=3),
-            # Use different time zones to test tz-naive -> tz-aware conversion
-            (ts - timedelta(hours=4))
-            .replace(tzinfo=utc)
-            .astimezone(tz=timezone("Europe/Berlin")),
-            (ts - timedelta(hours=1))
-            .replace(tzinfo=utc)
-            .astimezone(tz=timezone("US/Pacific")),
-        ],
-        "created_ts": [ts, ts, ts, ts, ts],
-    }
-    return pd.DataFrame.from_dict(data)
-
-
-def get_feature_view(data_source: DataSource) -> FeatureView:
-    return FeatureView(
-        name="test_bq_correctness",
-        entities=["driver"],
-        features=[Feature("value", ValueType.FLOAT)],
-        ttl=timedelta(days=5),
-        batch_source=data_source,
-    )
-
-
-# bq_source_type must be one of "query" and "table"
 @contextlib.contextmanager
 def prep_bq_fs_and_fv(
     bq_source_type: str,
@@ -96,7 +64,7 @@
         field_mapping={"ts_1": "ts", "id": "driver_id"},
     )
 
-    fv = get_feature_view(bigquery_source)
+    fv = correctness_feature_view(bigquery_source)
     e = Entity(
         name="driver",
         description="id for driver",
@@ -159,7 +127,7 @@
         field_mapping={"ts_1": "ts", "id": "driver_id"},
     )
 
-    fv = get_feature_view(redshift_source)
+    fv = correctness_feature_view(redshift_source)
     e = Entity(
         name="driver",
         description="id for driver",
@@ -207,7 +175,7 @@ def prep_local_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
         date_partition_column="",
         field_mapping={"ts_1": "ts", "id": "driver_id"},
     )
-    fv = get_feature_view(file_source)
+    fv = correctness_feature_view(file_source)
     e = Entity(
         name="driver",
         description="id for driver",
@@ -248,7 +216,7 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
         date_partition_column="",
         field_mapping={"ts_1": "ts", "id": "driver_id"},
     )
-    fv = get_feature_view(file_source)
+    fv = correctness_feature_view(file_source)
     e = Entity(
         name="driver",
         description="id for driver",
@@ -290,7 +258,7 @@ def prep_dynamodb_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
         date_partition_column="",
         field_mapping={"ts_1": "ts", "id": "driver_id"},
     )
-    fv = get_feature_view(file_source)
+    fv = correctness_feature_view(file_source)
     e = Entity(
         name="driver",
         description="id for driver",
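Beyond the pytest entry points shown above, the construct_feature_store context manager introduced in this patch can also be driven directly for ad-hoc debugging. A minimal sketch, assuming the default local configuration (sqlite online store, file offline store), which needs no cloud credentials:

from tests.integration.feature_repos.test_repo_configuration import (
    TestRepoConfig,
    construct_feature_store,
)

# construct_feature_store creates the test dataset, applies the "driver"
# entity and the "test_correctness" feature view, yields the store, and
# tears down both the store and the offline data on exit.
with construct_feature_store(TestRepoConfig()) as fs:
    print(fs.list_feature_views())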