Skip to content

Commit

Permalink
fix: Add columns for user metadata in the tables (#2760)
Browse files (browse the repository at this point in the history)
* fix: Add columns for user metadata in the tables

Signed-off-by: Achal Shah <achals@gmail.com>

* registry -> base registry

Signed-off-by: Achal Shah <achals@gmail.com>

* metadata methods

Signed-off-by: Achal Shah <achals@gmail.com>

* metadata methods

Signed-off-by: Achal Shah <achals@gmail.com>

* tests

Signed-off-by: Achal Shah <achals@gmail.com>

* one more test assert

Signed-off-by: Achal Shah <achals@gmail.com>

* cr update

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Jun 7, 2022
1 parent 4339c0a commit 269055e
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 119 deletions.
11 changes: 7 additions & 4 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from feast.protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
from feast.registry import FEAST_OBJECT_TYPES, BaseRegistry, FeastObjectType
from feast.repo_contents import RepoContents


Expand Down Expand Up @@ -161,7 +161,7 @@ def diff_registry_objects(


def extract_objects_for_keep_delete_update_add(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
registry: BaseRegistry, current_project: str, desired_repo_contents: RepoContents,
) -> Tuple[
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Expand Down Expand Up @@ -208,7 +208,7 @@ def extract_objects_for_keep_delete_update_add(


def diff_between(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
registry: BaseRegistry, current_project: str, desired_repo_contents: RepoContents,
) -> RegistryDiff:
"""
Returns the difference between the current and desired repo states.
Expand Down Expand Up @@ -267,7 +267,10 @@ def diff_between(


def apply_diff_to_registry(
registry: Registry, registry_diff: RegistryDiff, project: str, commit: bool = True
registry: BaseRegistry,
registry_diff: RegistryDiff,
project: str,
commit: bool = True,
):
"""
Applies the given diff to the given Feast project in the registry.
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/feature_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
)

if TYPE_CHECKING:
from feast import FeatureService
from feast.registry import Registry
from feast.feature_service import FeatureService
from feast.registry import BaseRegistry


REQUEST_ID_FIELD = "__request_id"
Expand All @@ -33,7 +33,7 @@ class LoggingSource:
"""

@abc.abstractmethod
def get_schema(self, registry: "Registry") -> pa.Schema:
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
""" Generate schema for logs destination. """
raise NotImplementedError

Expand All @@ -48,7 +48,7 @@ def __init__(self, feature_service: "FeatureService", project: str):
self._feature_service = feature_service
self._project = project

def get_schema(self, registry: "Registry") -> pa.Schema:
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
fields: Dict[str, pa.DataType] = {}

for projection in self._feature_service.feature_view_projections:
Expand Down
19 changes: 7 additions & 12 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@
from feast.infra.registry_stores.sql import SqlRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
GetOnlineFeaturesResponse,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
from feast.registry import Registry
from feast.registry import BaseRegistry, Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
Expand Down Expand Up @@ -113,7 +112,7 @@ class FeatureStore:

config: RepoConfig
repo_path: Path
_registry: Registry
_registry: BaseRegistry
_provider: Provider
_go_server: "EmbeddedOnlineFeatureServer"

Expand Down Expand Up @@ -142,8 +141,9 @@ def __init__(
if registry_config.registry_type == "sql":
self._registry = SqlRegistry(registry_config, None)
else:
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry._initialize_registry()
r = Registry(registry_config, repo_path=self.repo_path)
r._initialize_registry()
self._registry = r
self._provider = get_provider(self.config, self.repo_path)
self._go_server = None

Expand All @@ -153,7 +153,7 @@ def version(self) -> str:
return get_version()

@property
def registry(self) -> Registry:
def registry(self) -> BaseRegistry:
"""Gets the registry of this feature store."""
return self._registry

Expand Down Expand Up @@ -644,12 +644,7 @@ def _plan(
# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh()
current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
if hasattr(self._registry, "cached_registry_proto")
and self._registry.cached_registry_proto
else InfraProto()
)
current_infra_proto = self._registry.proto().infra.__deepcopy__()
desired_registry_proto = desired_repo_contents.to_registry_proto()
new_infra = self._provider.plan_infra(self.config, desired_registry_proto)
new_infra_proto = new_infra.to_proto()
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
RetrievalMetadata,
)
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

from ...saved_dataset import SavedDatasetStorage
Expand Down Expand Up @@ -169,7 +169,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -262,7 +262,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
destination = logging_config.destination
assert isinstance(destination, BigQueryLoggingDestination)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
Expand Down Expand Up @@ -113,7 +113,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -380,7 +380,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
destination = logging_config.destination
assert isinstance(destination, FileLoggingDestination)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDatasetStorage

Expand Down Expand Up @@ -211,7 +211,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -252,7 +252,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
"""
Write logged features to a specified destination (taken from logging_config) in the offline store.
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from jinja2 import BaseLoader, Environment
from pandas import Timestamp

import feast
from feast.errors import (
EntityTimestampInferenceException,
FeastEntityDFMissingColumnsError,
Expand All @@ -17,7 +16,7 @@
from feast.importer import import_class
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import _get_requested_feature_views_to_features_dict
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.utils import to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
Expand Down Expand Up @@ -55,8 +54,9 @@ def assert_expected_columns_in_entity_df(
raise FeastEntityDFMissingColumnsError(expected_columns, missing_keys)


# TODO: Remove project and registry from the interface and call sites.
def get_expected_join_keys(
project: str, feature_views: List["feast.FeatureView"], registry: Registry
project: str, feature_views: List[FeatureView], registry: BaseRegistry
) -> Set[str]:
join_keys = set()
for feature_view in feature_views:
Expand Down Expand Up @@ -95,7 +95,7 @@ class FeatureViewQueryContext:
def get_feature_view_query_context(
feature_refs: List[str],
feature_views: List[FeatureView],
registry: Registry,
registry: BaseRegistry,
project: str,
entity_df_timestamp_range: Tuple[datetime, datetime],
) -> List[FeatureViewQueryContext]:
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
SavedDatasetRedshiftStorage,
)
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
Expand Down Expand Up @@ -176,7 +176,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -269,7 +269,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
destination = logging_config.destination
assert isinstance(destination, RedshiftLoggingDestination)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
write_pandas,
write_parquet,
)
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
Expand Down Expand Up @@ -206,7 +206,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -284,7 +284,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
assert isinstance(logging_config.destination, SnowflakeLoggingDestination)

Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset
from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute
Expand Down Expand Up @@ -138,7 +138,7 @@ def materialize_single_feature_view(
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
registry: BaseRegistry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
Expand Down Expand Up @@ -194,7 +194,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool,
) -> RetrievalJob:
Expand Down Expand Up @@ -240,7 +240,7 @@ def write_feature_service_logs(
feature_service: FeatureService,
logs: Union[pyarrow.Table, str],
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
):
assert (
feature_service.logging_config is not None
Expand All @@ -260,7 +260,7 @@ def retrieve_feature_service_logs(
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
) -> RetrievalJob:
assert (
feature_service.logging_config is not None
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset
from feast.type_map import python_values_to_proto_values
Expand Down Expand Up @@ -133,7 +133,7 @@ def materialize_single_feature_view(
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
registry: BaseRegistry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
Expand All @@ -146,7 +146,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool,
) -> RetrievalJob:
Expand Down Expand Up @@ -192,7 +192,7 @@ def write_feature_service_logs(
feature_service: FeatureService,
logs: Union[pyarrow.Table, Path],
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
):
"""
Write features and entities logged by a feature server to an offline store.
Expand All @@ -211,7 +211,7 @@ def retrieve_feature_service_logs(
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
) -> RetrievalJob:
"""
Read logged features from an offline store for a given time window [from, to).
Expand Down
Loading

0 comments on commit 269055e

Please sign in to comment.