From d71dc1c1dd432776149ca0e6c694cc8390b45f80 Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 00:57:05 -0800 Subject: [PATCH 1/7] Refactor tag methods to infer objects that are created, deleted, and kept Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/diff/FcoDiff.py | 165 ++++++++++++++++++++++++++ sdk/python/feast/diff/__init__.py | 0 sdk/python/feast/entity.py | 2 +- sdk/python/feast/feature_service.py | 2 +- sdk/python/feast/feature_store.py | 23 ++-- sdk/python/feast/feature_table.py | 2 +- sdk/python/feast/feature_view.py | 6 +- sdk/python/feast/registry.py | 75 +++++++++--- sdk/python/feast/repo_operations.py | 174 ++++++++++++++-------------- 9 files changed, 334 insertions(+), 115 deletions(-) create mode 100644 sdk/python/feast/diff/FcoDiff.py create mode 100644 sdk/python/feast/diff/__init__.py diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py new file mode 100644 index 0000000000..33d3223b98 --- /dev/null +++ b/sdk/python/feast/diff/FcoDiff.py @@ -0,0 +1,165 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Any, List, Set, Tuple, Union + +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_table import FeatureTable +from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.request_feature_view import RequestFeatureView + + +@dataclass +class PropertyDiff: + property_name: str + val_existing: str + val_declared: str + + +class TransitionType(Enum): + UNKNOWN = 0 + CREATE = 1 + DELETE = 2 + UPDATE = 3 + UNCHANGED = 4 + + +@dataclass +class FcoDiff: + current_fco: Any + new_fco: Any + fco_property_diffs: List[PropertyDiff] + transition_type: TransitionType + + +class RegistryDiff: + fco_diffs: List[FcoDiff] + + def __init__(self): + self.fco_diffs = [] + + def add_fco_diff(self, fco_diff: FcoDiff): + self.fco_diffs.append(fco_diff) + + +def _tag_registry_entities_for_keep_delete( + existing_entities: Set[Entity], desired_entities: Set[Entity] +) -> Tuple[Set[Entity], Set[Entity], Set[Entity]]: + desired_entity_names = {e.name: e for e in desired_entities} + existing_entity_names = {e.name: e for e in existing_entities} + + entities_to_add = set( + [ + desired_entity_names[name] + for name in desired_entity_names.keys() - existing_entity_names.keys() + ] + ) + entities_to_delete = set( + [ + existing_entity_names[name] + for name in existing_entity_names.keys() - desired_entity_names.keys() + ] + ) + entities_to_keep = set( + [ + desired_entity_names[name] + for name in desired_entity_names.keys() & existing_entity_names.keys() + ] + ) + + return entities_to_keep, entities_to_delete, entities_to_add + + +def _tag_registry_views_for_keep_delete( + existing_views: Union[ + Set[FeatureView], Set[RequestFeatureView], Set[OnDemandFeatureView] + ], + desired_views: Union[ + Set[FeatureView], Set[RequestFeatureView], Set[OnDemandFeatureView] + ], +) -> Tuple[ + Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], + Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], + Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], +]: + + existing_views_by_name = {v.name: v for v in existing_views} + desired_views_by_name = {v.name: v for v in desired_views} + + views_to_add = set( + [ + desired_views_by_name[name] + for name in desired_views_by_name.keys() - existing_views_by_name.keys() + ] + ) + views_to_delete = set( + [ + existing_views_by_name[name] + for name in existing_views_by_name.keys() - desired_views_by_name.keys() + ] + ) + views_to_keep = set( + [ + desired_views_by_name[name] + for name in desired_views_by_name.keys() & existing_views_by_name.keys() + ] + ) + return views_to_keep, views_to_delete, views_to_add + + +def _tag_registry_tables_for_keep_delete( + existing_tables: Set[FeatureTable], desired_tables: Set[FeatureTable] +) -> Tuple[Set[FeatureTable], Set[FeatureTable], Set[FeatureTable]]: + existing_tables_by_name = {v.name: v for v in existing_tables} + desired_tables_by_name = {v.name: v for v in desired_tables} + + tables_to_add = set( + [ + desired_tables_by_name[name] + for name in desired_tables_by_name.keys() - existing_tables_by_name.keys() + ] + ) + tables_to_delete = set( + [ + existing_tables_by_name[name] + for name in existing_tables_by_name.keys() - desired_tables_by_name.keys() + ] + ) + tables_to_keep = set( + [ + desired_tables_by_name[name] + for name in desired_tables_by_name.keys() & existing_tables_by_name.keys() + ] + ) + return tables_to_keep, tables_to_delete, tables_to_add + + +def _tag_registry_services_for_keep_delete( + existing_service: Set[FeatureService], desired_service: Set[FeatureService] +) -> Tuple[Set[FeatureService], Set[FeatureService], Set[FeatureService]]: + existing_services_by_name = {v.name: v for v in existing_service} + desired_services_by_name = {v.name: v for v in desired_service} + + services_to_add = set( + [ + desired_services_by_name[name] + for name in desired_services_by_name.keys() + - existing_services_by_name.keys() + ] + ) + services_to_delete = set( + [ + existing_services_by_name[name] + for name in existing_services_by_name.keys() + - desired_services_by_name.keys() + ] + ) + services_to_keep = set( + [ + desired_services_by_name[name] + for name in desired_services_by_name.keys() + & existing_services_by_name.keys() + ] + ) + return services_to_keep, services_to_delete, services_to_add diff --git a/sdk/python/feast/diff/__init__.py b/sdk/python/feast/diff/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 8066eae60a..433f37709e 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -75,7 +75,7 @@ def __init__( self._last_updated_timestamp: Optional[datetime] = None def __hash__(self) -> int: - return hash((id(self), self.name)) + return hash(self.name) def __eq__(self, other): if not isinstance(other, Entity): diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 46afdfff1e..6bad5d79a6 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -81,7 +81,7 @@ def __str__(self): return str(MessageToJson(self.to_proto())) def __hash__(self): - return hash((id(self), self.name)) + return hash(self.name) def __eq__(self, other): if not isinstance(other, FeatureService): diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7a66bfb81d..84e8437ef5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -379,16 +379,18 @@ def apply( ] ], ], - objects_to_delete: List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, + objects_to_delete: Optional[ + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] ] - ] = [], + ] = None, partial: bool = True, ): """Register objects to metadata store and update related infrastructure. @@ -435,6 +437,9 @@ def apply( assert isinstance(objects, list) + if not objects_to_delete: + objects_to_delete = [] + # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] diff --git a/sdk/python/feast/feature_table.py b/sdk/python/feast/feature_table.py index 2c1022de22..bacf3922c8 100644 --- a/sdk/python/feast/feature_table.py +++ b/sdk/python/feast/feature_table.py @@ -69,7 +69,7 @@ def __str__(self): return str(MessageToJson(self.to_proto())) def __hash__(self) -> int: - return hash((id(self), self.name)) + return hash(self.name) def __eq__(self, other): if not isinstance(other, FeatureTable): diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index ac8abefeb0..6411b6b247 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -21,6 +21,7 @@ from feast import utils from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource +from feast.entity import Entity from feast.feature import Feature from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -42,6 +43,9 @@ DUMMY_ENTITY_ID = "__dummy_id" DUMMY_ENTITY_NAME = "__dummy" DUMMY_ENTITY_VAL = "" +DUMMY_ENTITY = Entity( + name=DUMMY_ENTITY_NAME, join_key=DUMMY_ENTITY_ID, value_type=ValueType.INT32, +) class FeatureView(BaseFeatureView): @@ -138,7 +142,7 @@ def __init__( # Note: Python requires redefining hash in child classes that override __eq__ def __hash__(self): - return super().__hash__() + return hash(self.name) def __copy__(self): fv = FeatureView( diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 7b523c9274..f5df8c8741 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import logging from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path @@ -24,6 +24,12 @@ from feast import importer from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import ( + FcoDiff, + RegistryDiff, + TransitionType, + _tag_registry_entities_for_keep_delete, +) from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, @@ -57,6 +63,8 @@ "": "LocalRegistryStore", } +logger = logging.getLogger(__name__) + def get_registry_store_class_from_type(registry_store_type: str): if not registry_store_type.endswith("RegistryStore"): @@ -95,7 +103,9 @@ class Registry: cached_registry_proto_ttl: timedelta cache_being_updated: bool = False - def __init__(self, registry_config: RegistryConfig, repo_path: Path): + def __init__( + self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] + ): """ Create the Registry object. @@ -104,20 +114,59 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path): repo_path: Path to the base of the Feast repository or where it will be created if it does not exist yet. """ - registry_store_type = registry_config.registry_store_type - registry_path = registry_config.path - if registry_store_type is None: - cls = get_registry_store_class_from_scheme(registry_path) - else: - cls = get_registry_store_class_from_type(str(registry_store_type)) - self._registry_store = cls(registry_config, repo_path) - self.cached_registry_proto_ttl = timedelta( - seconds=registry_config.cache_ttl_seconds - if registry_config.cache_ttl_seconds is not None - else 0 + if registry_config and repo_path: + registry_store_type = registry_config.registry_store_type + registry_path = registry_config.path + if registry_store_type is None: + cls = get_registry_store_class_from_scheme(registry_path) + else: + cls = get_registry_store_class_from_type(str(registry_store_type)) + + self._registry_store = cls(registry_config, repo_path) + self.cached_registry_proto_ttl = timedelta( + seconds=registry_config.cache_ttl_seconds + if registry_config.cache_ttl_seconds is not None + else 0 + ) + + @classmethod + def from_proto(cls, regsitry_proto: RegistryProto): + registry = cls(None, None) + registry.cached_registry_proto = regsitry_proto + registry.cached_registry_proto_created = datetime.utcnow() + registry.cached_registry_proto_ttl = timedelta() + return registry + + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. + @staticmethod + def diff_between( + project: str, current_registry: "Registry", new_registry: "Registry" + ) -> RegistryDiff: + diff = RegistryDiff() + + # Handle Entities + ( + entities_to_keep, + entities_to_delete, + entities_to_add, + ) = _tag_registry_entities_for_keep_delete( + set(current_registry.list_entities(project=project, allow_cache=True)), + set(new_registry.list_entities(project=project, allow_cache=True)), ) + for e in entities_to_add: + diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE)) + for e in entities_to_delete: + diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE)) + + # Handle Feature Views + # Handle On Demand Feature Views + # Handle Request Feature Views + # Handle Feature Services + logger.info(f"Diff: {diff}") + return diff + def _initialize_registry(self): """Explicitly initializes the registry with an empty proto if it doesn't exist.""" try: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 8d4ecba529..9330fc21ba 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,16 +6,23 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Tuple, Union, cast +from typing import List, NamedTuple, Set, Union, cast import click from click.exceptions import BadParameter -from feast import Entity, FeatureTable from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import ( + _tag_registry_entities_for_keep_delete, + _tag_registry_services_for_keep_delete, + _tag_registry_tables_for_keep_delete, + _tag_registry_views_for_keep_delete, +) +from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY_NAME, FeatureView +from feast.feature_table import FeatureTable +from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry @@ -122,6 +129,7 @@ def parse_repo(repo_root: Path) -> ParsedRepo: res.on_demand_feature_views.add(obj) elif isinstance(obj, RequestFeatureView): res.request_feature_views.add(obj) + res.entities.add(DUMMY_ENTITY) return res @@ -150,22 +158,77 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation data_source.validate(store.config) # For each object in the registry, determine whether it should be kept or deleted. - entities_to_keep, entities_to_delete = _tag_registry_entities_for_keep_delete( - project, registry, repo + ( + entities_to_keep, + entities_to_delete, + entities_to_add, + ) = _tag_registry_entities_for_keep_delete( + set(registry.list_entities(project=project)), repo.entities + ) + # TODO(achals): This code path should be refactored to handle added & kept entities separately. + entities_to_keep = entities_to_keep.union(entities_to_add) + + views = _tag_registry_views_for_keep_delete( + set(registry.list_feature_views(project=project)), repo.feature_views + ) + views_to_keep, views_to_delete, views_to_add = ( + cast(Set[FeatureView], views[0]), + cast(Set[FeatureView], views[1]), + cast(Set[FeatureView], views[2]), + ) + + request_views = _tag_registry_views_for_keep_delete( + set(registry.list_request_feature_views(project=project)), + repo.request_feature_views, + ) + request_views_to_keep: Set[RequestFeatureView] + request_views_to_delete: Set[RequestFeatureView] + request_views_to_add: Set[RequestFeatureView] + request_views_to_keep, request_views_to_delete, request_views_to_add = ( + cast(Set[RequestFeatureView], request_views[0]), + cast(Set[RequestFeatureView], request_views[1]), + cast(Set[RequestFeatureView], request_views[2]), + ) + + base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { + *views_to_keep, + *views_to_add, + *request_views_to_keep, + *request_views_to_add, + } + base_views_to_delete: Set[Union[RequestFeatureView, FeatureView]] = { + *views_to_delete, + *request_views_to_delete, + } + + odfvs = _tag_registry_views_for_keep_delete( + set(registry.list_on_demand_feature_views(project=project)), + repo.on_demand_feature_views, ) - views_to_keep, views_to_delete = _tag_registry_views_for_keep_delete( - project, registry, repo + odfvs_to_keep, odfvs_to_delete, odfvs_to_add = ( + cast(Set[OnDemandFeatureView], odfvs[0]), + cast(Set[OnDemandFeatureView], odfvs[1]), + cast(Set[OnDemandFeatureView], odfvs[2]), ) + odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) + ( - odfvs_to_keep, - odfvs_to_delete, - ) = _tag_registry_on_demand_feature_views_for_keep_delete(project, registry, repo) - tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete( - project, registry, repo + tables_to_keep, + tables_to_delete, + tables_to_add, + ) = _tag_registry_tables_for_keep_delete( + set(registry.list_feature_tables(project=project)), repo.feature_tables ) - services_to_keep, services_to_delete = _tag_registry_services_for_keep_delete( - project, registry, repo + tables_to_keep = tables_to_keep.union(tables_to_add) + + ( + services_to_keep, + services_to_delete, + services_to_add, + ) = _tag_registry_services_for_keep_delete( + set(registry.list_feature_services(project=project)), repo.feature_services ) + services_to_keep = services_to_keep.union(services_to_add) sys.dont_write_bytecode = False @@ -176,7 +239,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] ] = [] all_to_apply.extend(entities_to_keep) - all_to_apply.extend(views_to_keep) + all_to_apply.extend(base_views_to_keep) all_to_apply.extend(services_to_keep) all_to_apply.extend(odfvs_to_keep) all_to_apply.extend(tables_to_keep) @@ -186,7 +249,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] ] = [] all_to_delete.extend(entities_to_delete) - all_to_delete.extend(views_to_delete) + all_to_delete.extend(base_views_to_delete) all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) all_to_delete.extend(tables_to_delete) @@ -197,7 +260,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry" ) - for view in views_to_delete: + for view in base_views_to_delete: click.echo( f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry" ) @@ -216,10 +279,11 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) for entity in entities_to_keep: - click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" - ) - for view in views_to_keep: + if entity.name != DUMMY_ENTITY_NAME: + click.echo( + f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" + ) + for view in base_views_to_keep: click.echo( f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" ) @@ -258,74 +322,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation # TODO: consider echoing also entities being deployed/removed -def _tag_registry_entities_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[Entity], Set[Entity]]: - entities_to_keep: Set[Entity] = repo.entities - entities_to_delete: Set[Entity] = set() - repo_entities_names = set([e.name for e in repo.entities]) - for registry_entity in registry.list_entities(project=project): - # Do not delete dummy entity. - if ( - registry_entity.name not in repo_entities_names - and registry_entity.name != DUMMY_ENTITY_NAME - ): - entities_to_delete.add(registry_entity) - return entities_to_keep, entities_to_delete - - -def _tag_registry_views_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[BaseFeatureView], Set[BaseFeatureView]]: - views_to_keep: Set[BaseFeatureView] = cast(Set[BaseFeatureView], repo.feature_views) - for request_fv in repo.request_feature_views: - views_to_keep.add(request_fv) - views_to_delete: Set[BaseFeatureView] = set() - repo_feature_view_names = set(t.name for t in repo.feature_views) - for registry_view in registry.list_feature_views(project=project): - if registry_view.name not in repo_feature_view_names: - views_to_delete.add(registry_view) - return views_to_keep, views_to_delete - - -def _tag_registry_on_demand_feature_views_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[OnDemandFeatureView], Set[OnDemandFeatureView]]: - odfvs_to_keep: Set[OnDemandFeatureView] = repo.on_demand_feature_views - odfvs_to_delete: Set[OnDemandFeatureView] = set() - repo_on_demand_feature_view_names = set( - t.name for t in repo.on_demand_feature_views - ) - for registry_odfv in registry.list_on_demand_feature_views(project=project): - if registry_odfv.name not in repo_on_demand_feature_view_names: - odfvs_to_delete.add(registry_odfv) - return odfvs_to_keep, odfvs_to_delete - - -def _tag_registry_tables_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureTable], Set[FeatureTable]]: - tables_to_keep: Set[FeatureTable] = repo.feature_tables - tables_to_delete: Set[FeatureTable] = set() - repo_table_names = set(t.name for t in repo.feature_tables) - for registry_table in registry.list_feature_tables(project=project): - if registry_table.name not in repo_table_names: - tables_to_delete.add(registry_table) - return tables_to_keep, tables_to_delete - - -def _tag_registry_services_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureService], Set[FeatureService]]: - services_to_keep: Set[FeatureService] = repo.feature_services - services_to_delete: Set[FeatureService] = set() - repo_feature_service_names = set(t.name for t in repo.feature_services) - for registry_service in registry.list_feature_services(project=project): - if registry_service.name not in repo_feature_service_names: - services_to_delete.add(registry_service) - return services_to_keep, services_to_delete - - @log_exceptions_and_usage def teardown(repo_config: RepoConfig, repo_path: Path): # Cannot pass in both repo_path and repo_config to FeatureStore. From 711a3df83496458e8f1445af312bc6c582c7a6d9 Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 10:55:44 -0800 Subject: [PATCH 2/7] Fixes Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/entity.py | 2 +- sdk/python/feast/feature_service.py | 2 +- sdk/python/feast/feature_table.py | 2 +- sdk/python/feast/feature_view.py | 2 +- sdk/python/feast/registry.py | 6 ++++-- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 433f37709e..8066eae60a 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -75,7 +75,7 @@ def __init__( self._last_updated_timestamp: Optional[datetime] = None def __hash__(self) -> int: - return hash(self.name) + return hash((id(self), self.name)) def __eq__(self, other): if not isinstance(other, Entity): diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 6bad5d79a6..46afdfff1e 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -81,7 +81,7 @@ def __str__(self): return str(MessageToJson(self.to_proto())) def __hash__(self): - return hash(self.name) + return hash((id(self), self.name)) def __eq__(self, other): if not isinstance(other, FeatureService): diff --git a/sdk/python/feast/feature_table.py b/sdk/python/feast/feature_table.py index bacf3922c8..2c1022de22 100644 --- a/sdk/python/feast/feature_table.py +++ b/sdk/python/feast/feature_table.py @@ -69,7 +69,7 @@ def __str__(self): return str(MessageToJson(self.to_proto())) def __hash__(self) -> int: - return hash(self.name) + return hash((id(self), self.name)) def __eq__(self, other): if not isinstance(other, FeatureTable): diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 6411b6b247..02299f25f8 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -142,7 +142,7 @@ def __init__( # Note: Python requires redefining hash in child classes that override __eq__ def __hash__(self): - return hash(self.name) + return super(FeatureView, self).__hash__() def __copy__(self): fv = FeatureView( diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index f5df8c8741..4da8571cf4 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -115,7 +115,7 @@ def __init__( or where it will be created if it does not exist yet. """ - if registry_config and repo_path: + if registry_config: registry_store_type = registry_config.registry_store_type registry_path = registry_config.path if registry_store_type is None: @@ -135,7 +135,8 @@ def from_proto(cls, regsitry_proto: RegistryProto): registry = cls(None, None) registry.cached_registry_proto = regsitry_proto registry.cached_registry_proto_created = datetime.utcnow() - registry.cached_registry_proto_ttl = timedelta() + registry.cached_registry_proto_ttl = timedelta(days=1) + registry.cache_being_updated = True return registry # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. @@ -801,6 +802,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) ) ) + if allow_cache and (not expired or self.cache_being_updated): assert isinstance(self.cached_registry_proto, RegistryProto) return self.cached_registry_proto From 1507e95f847fa62a19f1d03527444b8aa78a2134 Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 11:31:59 -0800 Subject: [PATCH 3/7] Fixes Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/feature_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 02299f25f8..ee22ae1266 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -142,7 +142,7 @@ def __init__( # Note: Python requires redefining hash in child classes that override __eq__ def __hash__(self): - return super(FeatureView, self).__hash__() + return super().__hash__() def __copy__(self): fv = FeatureView( From 0d18cb14ed41642f9651e9949633723a6bbcbbef Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 13:30:08 -0800 Subject: [PATCH 4/7] True Fixes Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/diff/FcoDiff.py | 118 ++++++++---------------------- sdk/python/feast/feature_store.py | 11 +-- 2 files changed, 35 insertions(+), 94 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index 33d3223b98..283b3f3dd9 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -46,27 +46,16 @@ def add_fco_diff(self, fco_diff: FcoDiff): def _tag_registry_entities_for_keep_delete( existing_entities: Set[Entity], desired_entities: Set[Entity] ) -> Tuple[Set[Entity], Set[Entity], Set[Entity]]: - desired_entity_names = {e.name: e for e in desired_entities} - existing_entity_names = {e.name: e for e in existing_entities} - - entities_to_add = set( - [ - desired_entity_names[name] - for name in desired_entity_names.keys() - existing_entity_names.keys() - ] - ) - entities_to_delete = set( - [ - existing_entity_names[name] - for name in existing_entity_names.keys() - desired_entity_names.keys() - ] - ) - entities_to_keep = set( - [ - desired_entity_names[name] - for name in desired_entity_names.keys() & existing_entity_names.keys() - ] - ) + existing_entity_names = {e.name for e in existing_entities} + desired_entity_names = {e.name for e in desired_entities} + + entities_to_add = { + e for e in desired_entities if e.name not in existing_entity_names + } + entities_to_keep = {e for e in desired_entities if e.name in existing_entity_names} + entities_to_delete = { + e for e in existing_entities if e.name not in desired_entity_names + } return entities_to_keep, entities_to_delete, entities_to_add @@ -83,83 +72,38 @@ def _tag_registry_views_for_keep_delete( Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], ]: + existing_view_names = {v.name for v in existing_views} + desired_view_names = {v.name for v in desired_views} - existing_views_by_name = {v.name: v for v in existing_views} - desired_views_by_name = {v.name: v for v in desired_views} - - views_to_add = set( - [ - desired_views_by_name[name] - for name in desired_views_by_name.keys() - existing_views_by_name.keys() - ] - ) - views_to_delete = set( - [ - existing_views_by_name[name] - for name in existing_views_by_name.keys() - desired_views_by_name.keys() - ] - ) - views_to_keep = set( - [ - desired_views_by_name[name] - for name in desired_views_by_name.keys() & existing_views_by_name.keys() - ] - ) + views_to_add = {v for v in desired_views if v.name not in existing_view_names} + views_to_keep = {v for v in desired_views if v.name in existing_view_names} + views_to_delete = {v for v in existing_views if v.name not in desired_view_names} return views_to_keep, views_to_delete, views_to_add def _tag_registry_tables_for_keep_delete( existing_tables: Set[FeatureTable], desired_tables: Set[FeatureTable] ) -> Tuple[Set[FeatureTable], Set[FeatureTable], Set[FeatureTable]]: - existing_tables_by_name = {v.name: v for v in existing_tables} - desired_tables_by_name = {v.name: v for v in desired_tables} - - tables_to_add = set( - [ - desired_tables_by_name[name] - for name in desired_tables_by_name.keys() - existing_tables_by_name.keys() - ] - ) - tables_to_delete = set( - [ - existing_tables_by_name[name] - for name in existing_tables_by_name.keys() - desired_tables_by_name.keys() - ] - ) - tables_to_keep = set( - [ - desired_tables_by_name[name] - for name in desired_tables_by_name.keys() & existing_tables_by_name.keys() - ] - ) + existing_table_names = {v.name for v in existing_tables} + desired_table_names = {v.name for v in desired_tables} + + tables_to_add = {t for t in desired_tables if t.name not in existing_table_names} + tables_to_keep = {t for t in desired_tables if t.name in existing_table_names} + tables_to_delete = {t for t in existing_tables if t.name not in desired_table_names} return tables_to_keep, tables_to_delete, tables_to_add def _tag_registry_services_for_keep_delete( existing_service: Set[FeatureService], desired_service: Set[FeatureService] ) -> Tuple[Set[FeatureService], Set[FeatureService], Set[FeatureService]]: - existing_services_by_name = {v.name: v for v in existing_service} - desired_services_by_name = {v.name: v for v in desired_service} - - services_to_add = set( - [ - desired_services_by_name[name] - for name in desired_services_by_name.keys() - - existing_services_by_name.keys() - ] - ) - services_to_delete = set( - [ - existing_services_by_name[name] - for name in existing_services_by_name.keys() - - desired_services_by_name.keys() - ] - ) - services_to_keep = set( - [ - desired_services_by_name[name] - for name in desired_services_by_name.keys() - & existing_services_by_name.keys() - ] - ) + existing_service_names = {v.name for v in existing_service} + desired_service_names = {v.name for v in desired_service} + + services_to_add = { + s for s in desired_service if s.name not in existing_service_names + } + services_to_delete = { + s for s in existing_service if s.name not in desired_service_names + } + services_to_keep = {s for s in desired_service if s.name in existing_service_names} return services_to_keep, services_to_delete, services_to_add diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 84e8437ef5..7ab7817d61 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -38,6 +38,7 @@ from feast.feature_service import FeatureService from feast.feature_table import FeatureTable from feast.feature_view import ( + DUMMY_ENTITY, DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, DUMMY_ENTITY_VAL, @@ -61,7 +62,6 @@ from feast.request_feature_view import RequestFeatureView from feast.type_map import python_value_to_proto_value from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute -from feast.value_type import ValueType from feast.version import get_version warnings.simplefilter("once", DeprecationWarning) @@ -489,11 +489,6 @@ def apply( odfv.infer_features() # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. - DUMMY_ENTITY = Entity( - name=DUMMY_ENTITY_NAME, - join_key=DUMMY_ENTITY_ID, - value_type=ValueType.INT32, - ) entities_to_update.append(DUMMY_ENTITY) # Add all objects to the registry and update the provider's infrastructure. @@ -1565,7 +1560,9 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]): case_insensitive_fv_name = fv.name.lower() if case_insensitive_fv_name in fv_names: raise ValueError( - f"More than one feature view with name {case_insensitive_fv_name} found. Please ensure that all feature view names are case-insensitively unique. It may be necessary to ignore certain files in your feature repository by using a .feastignore file." + f"More than one feature view with name {case_insensitive_fv_name} found. " + f"Please ensure that all feature view names are case-insensitively unique. " + f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file." ) else: fv_names.add(case_insensitive_fv_name) From 1b6bdf19cf4bd1332862300c6ed6d21e65f7dd07 Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 15:05:47 -0800 Subject: [PATCH 5/7] Use the same tag method Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/diff/FcoDiff.py | 78 +++++++---------------------- sdk/python/feast/registry.py | 4 +- sdk/python/feast/repo_operations.py | 19 +++---- 3 files changed, 26 insertions(+), 75 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index 283b3f3dd9..27be1f8990 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -1,7 +1,9 @@ from dataclasses import dataclass from enum import Enum -from typing import Any, List, Set, Tuple, Union +from typing import Any, List, Set, Tuple, Union, TypeVar +from unittest.mock import Base +from feast.base_feature_view import BaseFeatureView from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_table import FeatureTable @@ -43,67 +45,21 @@ def add_fco_diff(self, fco_diff: FcoDiff): self.fco_diffs.append(fco_diff) -def _tag_registry_entities_for_keep_delete( - existing_entities: Set[Entity], desired_entities: Set[Entity] -) -> Tuple[Set[Entity], Set[Entity], Set[Entity]]: - existing_entity_names = {e.name for e in existing_entities} - desired_entity_names = {e.name for e in desired_entities} +T = TypeVar('T', Entity, BaseFeatureView, FeatureService, FeatureTable) - entities_to_add = { - e for e in desired_entities if e.name not in existing_entity_names - } - entities_to_keep = {e for e in desired_entities if e.name in existing_entity_names} - entities_to_delete = { - e for e in existing_entities if e.name not in desired_entity_names - } - return entities_to_keep, entities_to_delete, entities_to_add - - -def _tag_registry_views_for_keep_delete( - existing_views: Union[ - Set[FeatureView], Set[RequestFeatureView], Set[OnDemandFeatureView] - ], - desired_views: Union[ - Set[FeatureView], Set[RequestFeatureView], Set[OnDemandFeatureView] - ], -) -> Tuple[ - Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], - Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], - Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]], -]: - existing_view_names = {v.name for v in existing_views} - desired_view_names = {v.name for v in desired_views} - - views_to_add = {v for v in desired_views if v.name not in existing_view_names} - views_to_keep = {v for v in desired_views if v.name in existing_view_names} - views_to_delete = {v for v in existing_views if v.name not in desired_view_names} - return views_to_keep, views_to_delete, views_to_add - - -def _tag_registry_tables_for_keep_delete( - existing_tables: Set[FeatureTable], desired_tables: Set[FeatureTable] -) -> Tuple[Set[FeatureTable], Set[FeatureTable], Set[FeatureTable]]: - existing_table_names = {v.name for v in existing_tables} - desired_table_names = {v.name for v in desired_tables} - - tables_to_add = {t for t in desired_tables if t.name not in existing_table_names} - tables_to_keep = {t for t in desired_tables if t.name in existing_table_names} - tables_to_delete = {t for t in existing_tables if t.name not in desired_table_names} - return tables_to_keep, tables_to_delete, tables_to_add - - -def _tag_registry_services_for_keep_delete( - existing_service: Set[FeatureService], desired_service: Set[FeatureService] -) -> Tuple[Set[FeatureService], Set[FeatureService], Set[FeatureService]]: - existing_service_names = {v.name for v in existing_service} - desired_service_names = {v.name for v in desired_service} - - services_to_add = { - s for s in desired_service if s.name not in existing_service_names +def tag_objects_for_keep_delete_add( + existing_objs: Set[T], desired_objs: Set[T] +) -> Tuple[Set[T], Set[T], Set[T]]: + existing_obj_names = {e.name for e in existing_objs} + desired_obj_names = {e.name for e in desired_objs} + + objs_to_add = { + e for e in desired_objs if e.name not in existing_obj_names } - services_to_delete = { - s for s in existing_service if s.name not in desired_service_names + objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_delete = { + e for e in existing_objs if e.name not in desired_obj_names } - services_to_keep = {s for s in desired_service if s.name in existing_service_names} - return services_to_keep, services_to_delete, services_to_add + + return objs_to_keep, objs_to_delete, objs_to_add diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 4da8571cf4..2351094544 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -28,7 +28,7 @@ FcoDiff, RegistryDiff, TransitionType, - _tag_registry_entities_for_keep_delete, + tag_objects_for_keep_delete_add, ) from feast.entity import Entity from feast.errors import ( @@ -151,7 +151,7 @@ def diff_between( entities_to_keep, entities_to_delete, entities_to_add, - ) = _tag_registry_entities_for_keep_delete( + ) = tag_objects_for_keep_delete_add( set(current_registry.list_entities(project=project, allow_cache=True)), set(new_registry.list_entities(project=project, allow_cache=True)), ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 9330fc21ba..780c5df9a6 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -12,12 +12,7 @@ from click.exceptions import BadParameter from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import ( - _tag_registry_entities_for_keep_delete, - _tag_registry_services_for_keep_delete, - _tag_registry_tables_for_keep_delete, - _tag_registry_views_for_keep_delete, -) +from feast.diff.FcoDiff import tag_objects_for_keep_delete_add from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore @@ -162,13 +157,13 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation entities_to_keep, entities_to_delete, entities_to_add, - ) = _tag_registry_entities_for_keep_delete( + ) = tag_objects_for_keep_delete_add( set(registry.list_entities(project=project)), repo.entities ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. entities_to_keep = entities_to_keep.union(entities_to_add) - views = _tag_registry_views_for_keep_delete( + views = tag_objects_for_keep_delete_add( set(registry.list_feature_views(project=project)), repo.feature_views ) views_to_keep, views_to_delete, views_to_add = ( @@ -177,7 +172,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[FeatureView], views[2]), ) - request_views = _tag_registry_views_for_keep_delete( + request_views = tag_objects_for_keep_delete_add( set(registry.list_request_feature_views(project=project)), repo.request_feature_views, ) @@ -201,7 +196,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation *request_views_to_delete, } - odfvs = _tag_registry_views_for_keep_delete( + odfvs = tag_objects_for_keep_delete_add( set(registry.list_on_demand_feature_views(project=project)), repo.on_demand_feature_views, ) @@ -216,7 +211,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation tables_to_keep, tables_to_delete, tables_to_add, - ) = _tag_registry_tables_for_keep_delete( + ) = tag_objects_for_keep_delete_add( set(registry.list_feature_tables(project=project)), repo.feature_tables ) tables_to_keep = tables_to_keep.union(tables_to_add) @@ -225,7 +220,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation services_to_keep, services_to_delete, services_to_add, - ) = _tag_registry_services_for_keep_delete( + ) = tag_objects_for_keep_delete_add( set(registry.list_feature_services(project=project)), repo.feature_services ) services_to_keep = services_to_keep.union(services_to_add) From d8ea76facdae3725458c4091fef2b550b7de7585 Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 15:23:17 -0800 Subject: [PATCH 6/7] CR updates Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/diff/FcoDiff.py | 29 ++++++++++++++++++++++++++--- sdk/python/feast/registry.py | 19 +++++-------------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index 27be1f8990..de1d93e648 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from enum import Enum -from typing import Any, List, Set, Tuple, Union, TypeVar +from typing import Any, List, Set, Tuple, Union, TypeVar, Iterable from unittest.mock import Base from feast.base_feature_view import BaseFeatureView @@ -9,6 +9,8 @@ from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView +from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.request_feature_view import RequestFeatureView @@ -35,6 +37,7 @@ class FcoDiff: transition_type: TransitionType +@dataclass class RegistryDiff: fco_diffs: List[FcoDiff] @@ -49,8 +52,8 @@ def add_fco_diff(self, fco_diff: FcoDiff): def tag_objects_for_keep_delete_add( - existing_objs: Set[T], desired_objs: Set[T] -) -> Tuple[Set[T], Set[T], Set[T]]: + existing_objs: Iterable[T], desired_objs: Iterable[T] +) -> Tuple[Iterable[T], Iterable[T], Iterable[T]]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} @@ -63,3 +66,23 @@ def tag_objects_for_keep_delete_add( } return objs_to_keep, objs_to_delete, objs_to_add + + +U = TypeVar('U', EntityProto, FeatureViewProto) + + +def tag_proto_objects_for_keep_delete_add( + existing_objs: Iterable[U], desired_objs: Iterable[U] +) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]: + existing_obj_names = {e.spec.name for e in existing_objs} + desired_obj_names = {e.spec.name for e in desired_objs} + + objs_to_add = [ + e for e in desired_objs if e.spec.name not in existing_obj_names + ] + objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names] + objs_to_delete = [ + e for e in existing_objs if e.spec.name not in desired_obj_names + ] + + return objs_to_keep, objs_to_delete, objs_to_add diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 2351094544..01c4980d42 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -28,7 +28,7 @@ FcoDiff, RegistryDiff, TransitionType, - tag_objects_for_keep_delete_add, + tag_proto_objects_for_keep_delete_add, ) from feast.entity import Entity from feast.errors import ( @@ -130,19 +130,10 @@ def __init__( else 0 ) - @classmethod - def from_proto(cls, regsitry_proto: RegistryProto): - registry = cls(None, None) - registry.cached_registry_proto = regsitry_proto - registry.cached_registry_proto_created = datetime.utcnow() - registry.cached_registry_proto_ttl = timedelta(days=1) - registry.cache_being_updated = True - return registry - # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. @staticmethod def diff_between( - project: str, current_registry: "Registry", new_registry: "Registry" + current_registry: RegistryProto, new_registry: RegistryProto ) -> RegistryDiff: diff = RegistryDiff() @@ -151,9 +142,9 @@ def diff_between( entities_to_keep, entities_to_delete, entities_to_add, - ) = tag_objects_for_keep_delete_add( - set(current_registry.list_entities(project=project, allow_cache=True)), - set(new_registry.list_entities(project=project, allow_cache=True)), + ) = tag_proto_objects_for_keep_delete_add( + current_registry.entities, + new_registry.entities, ) for e in entities_to_add: From 2de2d29dea8dd1785e55f337520751022187f1f8 Mon Sep 17 00:00:00 2001 From: Achal Shah <achals@gmail.com> Date: Tue, 14 Dec 2021 15:59:34 -0800 Subject: [PATCH 7/7] CR updates Signed-off-by: Achal Shah <achals@gmail.com> --- sdk/python/feast/diff/FcoDiff.py | 30 +++++++++-------------------- sdk/python/feast/registry.py | 3 +-- sdk/python/feast/repo_operations.py | 2 +- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index de1d93e648..bb466c33e6 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -1,17 +1,13 @@ from dataclasses import dataclass from enum import Enum -from typing import Any, List, Set, Tuple, Union, TypeVar, Iterable -from unittest.mock import Base +from typing import Any, Iterable, List, Set, Tuple, TypeVar from feast.base_feature_view import BaseFeatureView from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_table import FeatureTable -from feast.feature_view import FeatureView -from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto -from feast.request_feature_view import RequestFeatureView @dataclass @@ -48,41 +44,33 @@ def add_fco_diff(self, fco_diff: FcoDiff): self.fco_diffs.append(fco_diff) -T = TypeVar('T', Entity, BaseFeatureView, FeatureService, FeatureTable) +T = TypeVar("T", Entity, BaseFeatureView, FeatureService, FeatureTable) def tag_objects_for_keep_delete_add( existing_objs: Iterable[T], desired_objs: Iterable[T] -) -> Tuple[Iterable[T], Iterable[T], Iterable[T]]: +) -> Tuple[Set[T], Set[T], Set[T]]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} - objs_to_add = { - e for e in desired_objs if e.name not in existing_obj_names - } + objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names} objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names} - objs_to_delete = { - e for e in existing_objs if e.name not in desired_obj_names - } + objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names} return objs_to_keep, objs_to_delete, objs_to_add -U = TypeVar('U', EntityProto, FeatureViewProto) +U = TypeVar("U", EntityProto, FeatureViewProto) def tag_proto_objects_for_keep_delete_add( - existing_objs: Iterable[U], desired_objs: Iterable[U] + existing_objs: Iterable[U], desired_objs: Iterable[U] ) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]: existing_obj_names = {e.spec.name for e in existing_objs} desired_obj_names = {e.spec.name for e in desired_objs} - objs_to_add = [ - e for e in desired_objs if e.spec.name not in existing_obj_names - ] + objs_to_add = [e for e in desired_objs if e.spec.name not in existing_obj_names] objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names] - objs_to_delete = [ - e for e in existing_objs if e.spec.name not in desired_obj_names - ] + objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names] return objs_to_keep, objs_to_delete, objs_to_add diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 01c4980d42..3a54568d45 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -143,8 +143,7 @@ def diff_between( entities_to_delete, entities_to_add, ) = tag_proto_objects_for_keep_delete_add( - current_registry.entities, - new_registry.entities, + current_registry.entities, new_registry.entities, ) for e in entities_to_add: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 780c5df9a6..ef0953feb2 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -161,7 +161,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation set(registry.list_entities(project=project)), repo.entities ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. - entities_to_keep = entities_to_keep.union(entities_to_add) + entities_to_keep = set(entities_to_keep).union(entities_to_add) views = tag_objects_for_keep_delete_add( set(registry.list_feature_views(project=project)), repo.feature_views