-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/snowflake): allow option for incremental properties #12080
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import logging | ||
from typing import Iterable, Optional | ||
|
||
from pydantic.fields import Field | ||
|
||
from datahub.configuration.common import ConfigModel | ||
from datahub.emitter.mce_builder import set_aspect | ||
from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||
from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder | ||
from datahub.ingestion.api.workunit import MetadataWorkUnit | ||
from datahub.metadata.schema_classes import ( | ||
DatasetPropertiesClass, | ||
MetadataChangeEventClass, | ||
SystemMetadataClass, | ||
) | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def convert_dataset_properties_to_patch( | ||
urn: str, | ||
aspect: DatasetPropertiesClass, | ||
system_metadata: Optional[SystemMetadataClass], | ||
) -> MetadataWorkUnit: | ||
patch_builder = create_dataset_props_patch_builder(urn, aspect, system_metadata) | ||
mcp = next(iter(patch_builder.build())) | ||
return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp) | ||
|
||
|
||
def auto_incremental_properties( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to convert the entire aspect to a PATCH. For now, can we just patch the custom properties, to limit surface area? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it would work as expected if we emit rest of aspect as a whole and only custom properties as patch. Emitting the whole aspect will overwrite any parallel edits made to datasetProperties. |
||
incremental_properties: bool, | ||
stream: Iterable[MetadataWorkUnit], | ||
) -> Iterable[MetadataWorkUnit]: | ||
if not incremental_properties: | ||
yield from stream | ||
return # early exit | ||
|
||
for wu in stream: | ||
urn = wu.get_urn() | ||
|
||
if isinstance(wu.metadata, MetadataChangeEventClass): | ||
properties_aspect = wu.get_aspect_of_type(DatasetPropertiesClass) | ||
set_aspect(wu.metadata, None, DatasetPropertiesClass) | ||
if len(wu.metadata.proposedSnapshot.aspects) > 0: | ||
yield wu | ||
|
||
if properties_aspect: | ||
yield convert_dataset_properties_to_patch( | ||
urn, properties_aspect, wu.metadata.systemMetadata | ||
) | ||
elif isinstance(wu.metadata, MetadataChangeProposalWrapper) and isinstance( | ||
wu.metadata.aspect, DatasetPropertiesClass | ||
): | ||
properties_aspect = wu.metadata.aspect | ||
if properties_aspect: | ||
yield convert_dataset_properties_to_patch( | ||
urn, properties_aspect, wu.metadata.systemMetadata | ||
) | ||
else: | ||
yield wu | ||
|
||
|
||
# TODO: Use this in SQLCommonConfig. Currently only used in snowflake | ||
class IncrementalPropertiesConfigMixin(ConfigModel): | ||
incremental_properties: bool = Field( | ||
default=False, | ||
description="When enabled, emits dataset properties as incremental to existing dataset properties " | ||
"in DataHub. When disabled, re-states dataset properties on each run.", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any plan or intention to make this generic to other entities beyond Dataset?
Current implementation is specific for
DatasetProperties
aspect, however the namings (ingremental_properties_helper.py
,IncrementalPropertiesConfigMixin
, ...) suggest this could apply for the properties of any entity (Container, ...).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a very good question and I did consider it myself.
I was hoping, we can reuse this for rest of entity types as well in future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If keeping generic is the plan, we may provide a generic flag description too, by mentioning entities instead of dataset.
Also, we may add some conditional logic when fetching the
DatasetPropertiesClass
instead of assuming it's a dataset entity; for the moment, we can skip processing other entity types. WDYT?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DatasetPropertiesClass is only present for entity dataset. For other entities, it would be absent so would return None. So its safe.
I would update description once this config and workunit processor is adopted for other entities. Also planning a quick followup with addressing TODOs in this PR. Will make changes in there.