From b35e3db0b1ebe58aae92e1c128ad79b29597deb1 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 13 Sep 2024 15:20:00 -0700 Subject: [PATCH] feat(ingest/dbt): add `only_include_if_in_catalog` flag for dbt core (#11314) --- .../datahub/ingestion/source/dbt/dbt_cloud.py | 1 + .../ingestion/source/dbt/dbt_common.py | 7 ++- .../datahub/ingestion/source/dbt/dbt_core.py | 48 +++++++++++++++---- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 0672b9ce6f781c..1866599fa21c67 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -472,6 +472,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: upstream_nodes=upstream_nodes, materialization=materialization, catalog_type=catalog_type, + missing_from_catalog=False, # This doesn't really apply to dbt Cloud. meta=meta, query_tag={}, # TODO: Get this from the dbt API. tags=tags, diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 8d67551b9e1f2f..12812aad441f23 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -154,7 +154,7 @@ class DBTSourceReport(StaleEntityRemovalSourceReport): default_factory=LossyList ) - in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList) + nodes_filtered: LossyList[str] = field(default_factory=LossyList) class EmitDirective(ConfigEnum): @@ -528,6 +528,7 @@ class DBTNode: materialization: Optional[str] # table, view, ephemeral, incremental, snapshot # see https://docs.getdbt.com/reference/artifacts/manifest-json catalog_type: Optional[str] + missing_from_catalog: bool # indicates if the node was missing from the catalog.json owner: Optional[str] @@ -853,6 +854,9 @@ def get_column_type( TypeClass = resolve_postgres_modified_type(column_type) elif dbt_adapter == "vertica": TypeClass = resolve_vertica_modified_type(column_type) + elif dbt_adapter == "snowflake": + # Snowflake types are uppercase, so we check that. + TypeClass = _field_type_mapping.get(column_type.upper()) # if still not found, report the warning if TypeClass is None: @@ -1034,6 +1038,7 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]: key = node.dbt_name if not self.config.node_name_pattern.allowed(key): + self.report.nodes_filtered.append(key) continue nodes.append(node) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index e24c18147e4e61..1aad806e958f85 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -58,6 +58,12 @@ class DBTCoreConfig(DBTCommonConfig): "See https://docs.getdbt.com/reference/artifacts/run-results-json.", ) + only_include_if_in_catalog: bool = Field( + default=False, + description="[experimental] If true, only include nodes that are also present in the catalog file. " + "This is useful if you only want to include models that have been built by the associated run.", + ) + # Because we now also collect model performance metadata, the "test_results" field was renamed to "run_results". _convert_test_results_path = pydantic_renamed_field( "test_results_path", "run_results_paths", transform=lambda x: [x] if x else [] @@ -156,6 +162,7 @@ def extract_dbt_entities( manifest_adapter: str, use_identifiers: bool, tag_prefix: str, + only_include_if_in_catalog: bool, report: DBTSourceReport, ) -> List[DBTNode]: sources_by_id = {x["unique_id"]: x for x in sources_results} @@ -194,12 +201,22 @@ def extract_dbt_entities( # It's a source catalog_node = all_catalog_entities.get(key) + missing_from_catalog = catalog_node is None catalog_type = None if catalog_node is None: - if materialization not in {"test", "ephemeral"}: + if materialization in {"test", "ephemeral"}: # Test and ephemeral nodes will never show up in the catalog. - report.in_manifest_but_missing_catalog.append(key) + missing_from_catalog = False + else: + if not only_include_if_in_catalog: + report.warning( + title="Node missing from catalog", + message="Found a node in the manifest file but not in the catalog. " + "This usually means the catalog file was not generated by `dbt docs generate` and so is incomplete. " + "Some metadata, such as column types and descriptions, will be impacted.", + context=key, + ) else: catalog_type = all_catalog_entities[key]["metadata"]["type"] @@ -264,6 +281,7 @@ def extract_dbt_entities( upstream_nodes=upstream_nodes, materialization=materialization, catalog_type=catalog_type, + missing_from_catalog=missing_from_catalog, meta=meta, query_tag=query_tag_props, tags=tags, @@ -291,14 +309,6 @@ def extract_dbt_entities( dbt_entities.append(dbtNode) - if report.in_manifest_but_missing_catalog: - # We still want this to show up as a warning, but don't want to spam the warnings section - # if there's a lot of them. - report.warning( - "in_manifest_but_missing_catalog", - f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. See in_manifest_but_missing_catalog for details.", - ) - return dbt_entities @@ -535,6 +545,7 @@ def loadManifestAndCatalog( manifest_adapter, self.config.use_identifiers, self.config.tag_prefix, + self.config.only_include_if_in_catalog, self.report, ) @@ -588,6 +599,23 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: return all_nodes, additional_custom_props + def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]: + nodes = super()._filter_nodes(all_nodes) + + if not self.config.only_include_if_in_catalog: + return nodes + + filtered_nodes = [] + for node in nodes: + if node.missing_from_catalog: + # TODO: We need to do some additional testing of this flag to validate that it doesn't + # drop important things entirely (e.g. sources). + self.report.nodes_filtered.append(node.dbt_name) + else: + filtered_nodes.append(node) + + return filtered_nodes + def get_external_url(self, node: DBTNode) -> Optional[str]: if self.config.git_info and node.dbt_file_path: return self.config.git_info.get_url_for_file_path(node.dbt_file_path)