Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use set when parsing repos to prevent duplicates #1913

Merged
merged 6 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions sdk/python/feast/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from datetime import datetime
from typing import Dict, Optional

import yaml
from google.protobuf import json_format
from google.protobuf.json_format import MessageToDict, MessageToJson

from feast.importer import get_calling_file_name
from feast.loaders import yaml as feast_yaml
from feast.protos.feast.core.Entity_pb2 import Entity as EntityV2Proto
from feast.protos.feast.core.Entity_pb2 import EntityMeta as EntityMetaProto
Expand Down Expand Up @@ -50,8 +48,6 @@ class Entity:
_created_timestamp: Optional[datetime]
_last_updated_timestamp: Optional[datetime]

defined_in: str

@log_exceptions
def __init__(
self,
Expand All @@ -78,7 +74,8 @@ def __init__(
self._created_timestamp: Optional[datetime] = None
self._last_updated_timestamp: Optional[datetime] = None

self.defined_in = get_calling_file_name(inspect.stack())
def __hash__(self) -> int:
return hash(self.name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we detect and raise an exception when users provide two objects with the same name that are different?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My earlier comments completely misunderstood your point -fixing using id().


def __eq__(self, other):
if not isinstance(other, Entity):
Expand Down
6 changes: 0 additions & 6 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import inspect
from datetime import datetime
from typing import Dict, List, Optional, Union

Expand All @@ -7,7 +6,6 @@
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.importer import get_calling_file_name
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
Expand Down Expand Up @@ -39,8 +37,6 @@ class FeatureService:
created_timestamp: Optional[datetime] = None
last_updated_timestamp: Optional[datetime] = None

defined_in: str

@log_exceptions
def __init__(
self,
Expand Down Expand Up @@ -75,8 +71,6 @@ def __init__(
self.created_timestamp = None
self.last_updated_timestamp = None

self.defined_in = get_calling_file_name(inspect.stack())

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand Down
4 changes: 0 additions & 4 deletions sdk/python/feast/feature_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from typing import Dict, List, MutableMapping, Optional, Union

import yaml
Expand All @@ -22,7 +21,6 @@

from feast.data_source import DataSource, KafkaSource, KinesisSource
from feast.feature import Feature
from feast.importer import get_calling_file_name
from feast.loaders import yaml as feast_yaml
from feast.protos.feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto
from feast.protos.feast.core.FeatureTable_pb2 import (
Expand Down Expand Up @@ -67,8 +65,6 @@ def __init__(
self._created_timestamp: Optional[Timestamp] = None
self._last_updated_timestamp: Optional[Timestamp] = None

self.defined_in = get_calling_file_name(inspect.stack())

def __str__(self):
return str(MessageToJson(self.to_proto()))

Expand Down
6 changes: 0 additions & 6 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import re
import warnings
from datetime import datetime, timedelta
Expand All @@ -25,7 +24,6 @@
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.importer import get_calling_file_name
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewMeta as FeatureViewMetaProto,
Expand Down Expand Up @@ -81,8 +79,6 @@ class FeatureView:
last_updated_timestamp: Optional[datetime] = None
materialization_intervals: List[Tuple[datetime, datetime]]

defined_in: str

@log_exceptions
def __init__(
self,
Expand Down Expand Up @@ -145,8 +141,6 @@ def __init__(
self.created_timestamp: Optional[datetime] = None
self.last_updated_timestamp: Optional[datetime] = None

self.defined_in = get_calling_file_name(inspect.stack())

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand Down
8 changes: 0 additions & 8 deletions sdk/python/feast/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@
from feast import errors


def get_calling_file_name(stack) -> str:
# Get two levels up from current, to ignore usage.py
previous_stack_frame = stack[1]
if "feast/usage.py" in previous_stack_frame.filename:
previous_stack_frame = stack[2]
return previous_stack_frame.filename


def get_class_from_type(module_name: str, class_name: str, class_type: str):
if not class_name.endswith(class_type):
raise errors.FeastClassInvalidName(class_name, class_type)
Expand Down
6 changes: 0 additions & 6 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
import inspect
from types import MethodType
from typing import Dict, List, Union, cast

Expand All @@ -12,7 +11,6 @@
from feast.feature import Feature
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.importer import get_calling_file_name
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand Down Expand Up @@ -48,8 +46,6 @@ class OnDemandFeatureView:
inputs: Dict[str, Union[FeatureView, RequestDataSource]]
udf: MethodType

defined_in: str

@log_exceptions
def __init__(
self,
Expand All @@ -67,8 +63,6 @@ def __init__(
self.inputs = inputs
self.udf = udf

self.defined_in = get_calling_file_name(inspect.stack())

def to_proto(self) -> OnDemandFeatureViewProto:
"""
Converts an on demand feature view object to its protobuf representation.
Expand Down
86 changes: 38 additions & 48 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def py_path_to_module(path: Path, repo_root: Path) -> str:


class ParsedRepo(NamedTuple):
feature_tables: List[FeatureTable]
feature_views: List[FeatureView]
on_demand_feature_views: List[OnDemandFeatureView]
entities: List[Entity]
feature_services: List[FeatureService]
feature_tables: Set[FeatureTable]
feature_views: Set[FeatureView]
on_demand_feature_views: Set[OnDemandFeatureView]
entities: Set[Entity]
feature_services: Set[FeatureService]


def read_feastignore(repo_root: Path) -> List[str]:
Expand Down Expand Up @@ -94,11 +94,11 @@ def get_repo_files(repo_root: Path) -> List[Path]:
def parse_repo(repo_root: Path) -> ParsedRepo:
""" Collect feature table definitions from feature repo """
res = ParsedRepo(
feature_tables=[],
entities=[],
feature_views=[],
feature_services=[],
on_demand_feature_views=[],
feature_tables=set(),
entities=set(),
feature_views=set(),
feature_services=set(),
on_demand_feature_views=set(),
)

for repo_file in get_repo_files(repo_root):
Expand All @@ -107,25 +107,15 @@ def parse_repo(repo_root: Path) -> ParsedRepo:
for attr_name in dir(module):
obj = getattr(module, attr_name)
if isinstance(obj, FeatureTable):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.feature_tables.append(obj)
res.feature_tables.add(obj)
if isinstance(obj, FeatureView):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.feature_views.append(obj)
res.feature_views.add(obj)
elif isinstance(obj, Entity):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.entities.append(obj)
res.entities.add(obj)
elif isinstance(obj, FeatureService):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.feature_services.append(obj)
res.feature_services.add(obj)
elif isinstance(obj, OnDemandFeatureView):
assert obj.defined_in is not None
if obj.defined_in == module.__file__:
res.on_demand_feature_views.append(obj)
res.on_demand_feature_views.add(obj)
return res


Expand All @@ -146,7 +136,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
registry._initialize_registry()
sys.dont_write_bytecode = True
repo = parse_repo(repo_path)
_validate_feature_views(repo.feature_views)
_validate_feature_views(list(repo.feature_views))

if not skip_source_validation:
data_sources = [t.batch_source for t in repo.feature_views]
Expand Down Expand Up @@ -259,8 +249,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
project,
tables_to_delete=all_to_delete,
tables_to_keep=all_to_keep,
entities_to_delete=entities_to_delete,
entities_to_keep=entities_to_keep,
entities_to_delete=list(entities_to_delete),
entities_to_keep=list(entities_to_keep),
partial=False,
)

Expand All @@ -270,63 +260,63 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation

def _tag_registry_entities_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[Entity], List[Entity]]:
entities_to_keep: List[Entity] = repo.entities
entities_to_delete: List[Entity] = []
) -> Tuple[Set[Entity], Set[Entity]]:
entities_to_keep: Set[Entity] = repo.entities
entities_to_delete: Set[Entity] = set()
repo_entities_names = set([e.name for e in repo.entities])
for registry_entity in registry.list_entities(project=project):
if registry_entity.name not in repo_entities_names:
entities_to_delete.append(registry_entity)
entities_to_delete.add(registry_entity)
return entities_to_keep, entities_to_delete


def _tag_registry_views_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureView], List[FeatureView]]:
views_to_keep: List[FeatureView] = repo.feature_views
views_to_delete: List[FeatureView] = []
) -> Tuple[Set[FeatureView], Set[FeatureView]]:
views_to_keep: Set[FeatureView] = repo.feature_views
views_to_delete: Set[FeatureView] = set()
repo_feature_view_names = set(t.name for t in repo.feature_views)
for registry_view in registry.list_feature_views(project=project):
if registry_view.name not in repo_feature_view_names:
views_to_delete.append(registry_view)
views_to_delete.add(registry_view)
return views_to_keep, views_to_delete


def _tag_registry_on_demand_feature_views_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[OnDemandFeatureView], List[OnDemandFeatureView]]:
odfvs_to_keep: List[OnDemandFeatureView] = repo.on_demand_feature_views
odfvs_to_delete: List[OnDemandFeatureView] = []
) -> Tuple[Set[OnDemandFeatureView], Set[OnDemandFeatureView]]:
odfvs_to_keep: Set[OnDemandFeatureView] = repo.on_demand_feature_views
odfvs_to_delete: Set[OnDemandFeatureView] = set()
repo_on_demand_feature_view_names = set(
t.name for t in repo.on_demand_feature_views
)
for registry_odfv in registry.list_on_demand_feature_views(project=project):
if registry_odfv.name not in repo_on_demand_feature_view_names:
odfvs_to_delete.append(registry_odfv)
odfvs_to_delete.add(registry_odfv)
return odfvs_to_keep, odfvs_to_delete


def _tag_registry_tables_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureTable], List[FeatureTable]]:
tables_to_keep: List[FeatureTable] = repo.feature_tables
tables_to_delete: List[FeatureTable] = []
) -> Tuple[Set[FeatureTable], Set[FeatureTable]]:
tables_to_keep: Set[FeatureTable] = repo.feature_tables
tables_to_delete: Set[FeatureTable] = set()
repo_table_names = set(t.name for t in repo.feature_tables)
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names:
tables_to_delete.append(registry_table)
tables_to_delete.add(registry_table)
return tables_to_keep, tables_to_delete


def _tag_registry_services_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureService], List[FeatureService]]:
services_to_keep: List[FeatureService] = repo.feature_services
services_to_delete: List[FeatureService] = []
) -> Tuple[Set[FeatureService], Set[FeatureService]]:
services_to_keep: Set[FeatureService] = repo.feature_services
services_to_delete: Set[FeatureService] = set()
repo_feature_service_names = set(t.name for t in repo.feature_services)
for registry_service in registry.list_feature_services(project=project):
if registry_service.name not in repo_feature_service_names:
services_to_delete.append(registry_service)
services_to_delete.add(registry_service)
return services_to_keep, services_to_delete


Expand Down