Skip to content

Commit

Permalink
Add the foundation of the universal feature repo and a test that uses…
Browse files Browse the repository at this point in the history
… it (#1734)

* Add the foundation of the universal feature repo and a test that uses it

Signed-off-by: Achal Shah <achals@gmail.com>

* Make tests actually work

Signed-off-by: Achal Shah <achals@gmail.com>

* Make format

Signed-off-by: Achal Shah <achals@gmail.com>

* Make format

Signed-off-by: Achal Shah <achals@gmail.com>

* add a redshift data source creator

Signed-off-by: Achal Shah <achals@gmail.com>

* integration test

Signed-off-by: Achal Shah <achals@gmail.com>

* file data source creator

Signed-off-by: Achal Shah <achals@gmail.com>

* fix online store ref

Signed-off-by: Achal Shah <achals@gmail.com>

* dynamodb region

Signed-off-by: Achal Shah <achals@gmail.com>

* fix file

Signed-off-by: Achal Shah <achals@gmail.com>

* remove impor

Signed-off-by: Achal Shah <achals@gmail.com>

* close not delete

Signed-off-by: Achal Shah <achals@gmail.com>

* Refactor configs into test_repo_config

Signed-off-by: Achal Shah <achals@gmail.com>

* make forma

Signed-off-by: Achal Shah <achals@gmail.com>

* Add a sweet decorator per feedback

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* move stuff into with

Signed-off-by: Achal Shah <achals@gmail.com>

* Specify repo_path for tests to succeed

Signed-off-by: Achal Shah <achals@gmail.com>

* fix comments

Signed-off-by: Achal Shah <achals@gmail.com>

* fix format

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Aug 4, 2021
1 parent 1f505e0 commit 7977a53
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 42 deletions.
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class RepoConfig(FeastBaseModel):

def __init__(self, **data: Any):
super().__init__(**data)

if isinstance(self.online_store, Dict):
self.online_store = get_online_config_from_type(self.online_store["type"])(
**self.online_store
Expand Down
27 changes: 27 additions & 0 deletions sdk/python/tests/data/data_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import datetime, timedelta

import pandas as pd
from pytz import timezone, utc


def create_dataset() -> pd.DataFrame:
now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")
data = {
"id": [1, 2, 1, 3, 3],
"value": [0.1, None, 0.3, 4, 5],
"ts_1": [
ts - timedelta(hours=4),
ts,
ts - timedelta(hours=3),
# Use different time zones to test tz-naive -> tz-aware conversion
(ts - timedelta(hours=4))
.replace(tzinfo=utc)
.astimezone(tz=timezone("Europe/Berlin")),
(ts - timedelta(hours=1))
.replace(tzinfo=utc)
.astimezone(tz=timezone("US/Pacific")),
],
"created_ts": [ts, ts, ts, ts, ts],
}
return pd.DataFrame.from_dict(data)
123 changes: 123 additions & 0 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import math
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test


@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
run_offline_online_store_consistency_test(fs)


def check_offline_and_online_features(
fs: FeatureStore,
fv: FeatureView,
driver_id: int,
event_timestamp: datetime,
expected_value: Optional[float],
full_feature_names: bool,
check_offline_store: bool = True,
) -> None:
# Check online store
response_dict = fs.get_online_features(
[f"{fv.name}:value"],
[{"driver": driver_id}],
full_feature_names=full_feature_names,
).to_dict()

if full_feature_names:
if expected_value:
assert abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6
else:
assert response_dict[f"{fv.name}__value"][0] is None
else:
if expected_value:
assert abs(response_dict["value"][0] - expected_value) < 1e-6
else:
assert response_dict["value"][0] is None

# Check offline store
if check_offline_store:
df = fs.get_historical_features(
entity_df=pd.DataFrame.from_dict(
{"driver_id": [driver_id], "event_timestamp": [event_timestamp]}
),
features=[f"{fv.name}:value"],
full_feature_names=full_feature_names,
).to_df()

if full_feature_names:
if expected_value:
assert abs(df.to_dict()[f"{fv.name}__value"][0] - expected_value) < 1e-6
else:
assert math.isnan(df.to_dict()[f"{fv.name}__value"][0])
else:
if expected_value:
assert abs(df.to_dict()["value"][0] - expected_value) < 1e-6
else:
assert math.isnan(df.to_dict()["value"][0])


def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None:
now = datetime.utcnow()

fv = fs.get_feature_view("test_correctness")
full_feature_names = True
check_offline_store: bool = True

# Run materialize()
# use both tz-naive & tz-aware timestamps to test that they're both correctly handled
start_date = (now - timedelta(hours=5)).replace(tzinfo=utc)
end_date = now - timedelta(hours=2)
fs.materialize(feature_views=[fv.name], start_date=start_date, end_date=end_date)

# check result of materialize()
check_offline_and_online_features(
fs=fs,
fv=fv,
driver_id=1,
event_timestamp=end_date,
expected_value=0.3,
full_feature_names=full_feature_names,
check_offline_store=check_offline_store,
)

check_offline_and_online_features(
fs=fs,
fv=fv,
driver_id=2,
event_timestamp=end_date,
expected_value=None,
full_feature_names=full_feature_names,
check_offline_store=check_offline_store,
)

# check prior value for materialize_incremental()
check_offline_and_online_features(
fs=fs,
fv=fv,
driver_id=3,
event_timestamp=end_date,
expected_value=4,
full_feature_names=full_feature_names,
check_offline_store=check_offline_store,
)

# run materialize_incremental()
fs.materialize_incremental(feature_views=[fv.name], end_date=now)

# check result of materialize_incremental()
check_offline_and_online_features(
fs=fs,
fv=fv,
driver_id=3,
event_timestamp=now,
expected_value=5,
full_feature_names=full_feature_names,
check_offline_store=check_offline_store,
)
109 changes: 109 additions & 0 deletions sdk/python/tests/integration/feature_repos/test_repo_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, List, Union

import pytest
from attr import dataclass

from feast import FeatureStore, RepoConfig, importer
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import (
correctness_feature_view,
)


@dataclass
class TestRepoConfig:
"""
This class should hold all possible parameters that may need to be varied by individual tests.
"""

provider: str = "local"
online_store: Union[str, Dict] = "sqlite"

offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"

full_feature_names: bool = True


FULL_REPO_CONFIGS: List[TestRepoConfig] = [
TestRepoConfig(), # Local
TestRepoConfig(
provider="aws",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
online_store={"type": "dynamodb", "region": "us-west-2"},
),
TestRepoConfig(
provider="gcp",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.bigquery.BigQueryDataSourceCreator",
online_store="datastore",
),
]


OFFLINE_STORES: List[str] = []
ONLINE_STORES: List[str] = []
PROVIDERS: List[str] = []


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
"""
This method should take in the parameters from the test repo config and created a feature repo, apply it,
and return the constructed feature store object to callers.
This feature store object can be interacted for the purposes of tests.
The user is *not* expected to perform any clean up actions.
:param test_repo_config: configuration
:return: A feature store built using the supplied configuration.
"""
df = create_dataset()

project = f"test_correctness_{str(uuid.uuid4()).replace('-', '')[:8]}"

module_name, config_class_name = test_repo_config.offline_store_creator.rsplit(
".", 1
)

offline_creator: DataSourceCreator = importer.get_class_from_type(
module_name, config_class_name, "DataSourceCreator"
)()
ds = offline_creator.create_data_source(project, df)
offline_store = offline_creator.create_offline_store_config()
online_store = test_repo_config.online_store

with tempfile.TemporaryDirectory() as repo_dir_name:
config = RepoConfig(
registry=str(Path(repo_dir_name) / "registry.db"),
project=project,
provider=test_repo_config.provider,
offline_store=offline_store,
online_store=online_store,
repo_path=repo_dir_name,
)
fs = FeatureStore(config=config)
fv = correctness_feature_view(ds)
entity = driver()
fs.apply([fv, entity])

yield fs

fs.teardown()
offline_creator.teardown(project)


def parametrize_e2e_test(e2e_test):
@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: v.provider)
def inner_test(config):
with construct_feature_store(config) as fs:
e2e_test(fs)

return inner_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod

import pandas as pd

from feast.data_source import DataSource
from feast.repo_config import FeastConfigBaseModel


class DataSourceCreator(ABC):
@abstractmethod
def create_data_source(
self,
name: str,
df: pd.DataFrame,
event_timestamp_column="ts",
created_timestamp_column="created_ts",
) -> DataSource:
...

@abstractmethod
def create_offline_store_config(self) -> FeastConfigBaseModel:
...

@abstractmethod
def teardown(self, name: str):
...
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import time

import pandas as pd
from google.cloud import bigquery

from feast import BigQuerySource
from feast.data_source import DataSource
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)


class BigQueryDataSourceCreator(DataSourceCreator):
def teardown(self, name: str):
pass

def __init__(self):
self.client = bigquery.Client()

def create_offline_store_config(self):
return BigQueryOfflineStoreConfig()

def create_data_source(
self,
name: str,
df: pd.DataFrame,
event_timestamp_column="ts",
created_timestamp_column="created_ts",
**kwargs,
) -> DataSource:
gcp_project = self.client.project
bigquery_dataset = "test_ingestion"
dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
self.client.create_dataset(dataset, exists_ok=True)
dataset.default_table_expiration_ms = (
1000 * 60 * 60 * 24 * 14
) # 2 weeks in milliseconds
self.client.update_dataset(dataset, ["default_table_expiration_ms"])

job_config = bigquery.LoadJobConfig()
table_ref = f"{gcp_project}.{bigquery_dataset}.{name}_{int(time.time_ns())}"
job = self.client.load_table_from_dataframe(
df, table_ref, job_config=job_config
)
job.result()

return BigQuerySource(
table_ref=table_ref,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
date_partition_column="",
field_mapping={"ts_1": "ts", "id": "driver_id"},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import tempfile
from typing import Any

import pandas as pd

from feast import FileSource
from feast.data_format import ParquetFormat
from feast.data_source import DataSource
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)


class FileDataSourceCreator(DataSourceCreator):
f: Any

def create_data_source(
self,
name: str,
df: pd.DataFrame,
event_timestamp_column="ts",
created_timestamp_column="created_ts",
) -> DataSource:
self.f = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False)
df.to_parquet(self.f.name)
return FileSource(
file_format=ParquetFormat(),
path=f"file://{self.f.name}",
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
date_partition_column="",
field_mapping={"ts_1": "ts", "id": "driver_id"},
)

def create_offline_store_config(self) -> FeastConfigBaseModel:
return FileOfflineStoreConfig()

def teardown(self, name: str):
self.f.close()
Loading

0 comments on commit 7977a53

Please sign in to comment.