feat(ingestion/snowflake): adds streams as a new dataset with lineage and properties. #12318

Open: brock-acryl wants to merge 44 commits into master from snowflake-streams-v2

Commits (44)
a268e3e
merged changes from master
brock-acryl Jan 13, 2025
b7dfa7c
- moved stream_pattern from sql_config.py to snowflake_config.py
brock-acryl Jan 14, 2025
30f2e53
added streams to docs
brock-acryl Jan 14, 2025
e0b3c3b
- removed unused method
brock-acryl Jan 15, 2025
f0124e2
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 15, 2025
2ff19ba
merge changes
brock-acryl Jan 15, 2025
fd87ca9
lintfix
brock-acryl Jan 16, 2025
11c6b8c
Refactored _process_schema into smaller functions
brock-acryl Jan 16, 2025
b06d0bd
fixed streams_for_database
brock-acryl Jan 16, 2025
2968f6c
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 16, 2025
5f25f3c
updated pytests and golden files
brock-acryl Jan 16, 2025
18770a9
lintfix
brock-acryl Jan 17, 2025
f726f38
code review updates.
brock-acryl Jan 17, 2025
59c21c7
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 17, 2025
0d95fcd
lint
brock-acryl Jan 17, 2025
6ad3f70
lint
brock-acryl Jan 17, 2025
c20aa2d
updated tests
brock-acryl Jan 17, 2025
67b8212
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 17, 2025
af9d421
updated tests
brock-acryl Jan 17, 2025
c1f0be8
updated reporting
brock-acryl Jan 18, 2025
b97724a
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 18, 2025
07cf0bd
Merge branch 'snowflake-streams-v2' of github.com:brock-acryl/datahub…
brock-acryl Jan 18, 2025
623ecb5
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 22, 2025
ade6503
- Updated docs with required permissions
brock-acryl Jan 22, 2025
d67a22f
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 24, 2025
3a25dd9
- added logic to filter information schema columns for only
brock-acryl Jan 24, 2025
96b16d2
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 24, 2025
13bb2ec
- added logic to filter information schema columns for only
brock-acryl Jan 24, 2025
f46a418
Update metadata-ingestion/src/datahub/ingestion/source/snowflake/snow…
brock-acryl Jan 24, 2025
5199366
Update metadata-ingestion/src/datahub/ingestion/source/snowflake/snow…
brock-acryl Jan 24, 2025
150b12a
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 24, 2025
b719ffb
- removed tags from streams, streams don't have tags on columns
brock-acryl Jan 24, 2025
7b68003
- removed tags from streams, streams don't have columns so they don't…
brock-acryl Jan 24, 2025
a41aac4
- updated golden files
brock-acryl Jan 24, 2025
8b41101
- test fix
brock-acryl Jan 24, 2025
619efc3
- test fix
brock-acryl Jan 24, 2025
0527c23
reverted to add_known_lineage_mapping for streams
brock-acryl Jan 28, 2025
217c295
Merge branch 'master' into snowflake-streams-v2
brock-acryl Jan 28, 2025
68cbe87
Merge branch 'snowflake-streams-v2' of github.com:brock-acryl/datahub…
brock-acryl Jan 28, 2025
16bdca2
Update metadata-ingestion/src/datahub/ingestion/source/snowflake/snow…
brock-acryl Jan 30, 2025
457dab7
removed extra yield
brock-acryl Jan 31, 2025
21cb6c2
Merge branch 'master' into snowflake-streams-v2
brock-acryl Feb 3, 2025
f82685e
lintfix
brock-acryl Feb 4, 2025
0756e5d
Merge branch 'master' into snowflake-streams-v2
brock-acryl Feb 4, 2025
@@ -24,6 +24,7 @@ class DatasetSubTypes(StrEnum):
SAC_LIVE_DATA_MODEL = "Live Data Model"
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"

# TODO: Create separate entity...
NOTEBOOK = "Notebook"
@@ -53,6 +53,7 @@ class SnowflakeObjectDomain(StrEnum):
SCHEMA = "schema"
COLUMN = "column"
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"


GENERIC_PERMISSION_ERROR_KEY = "permission-error"
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
@@ -263,6 +263,16 @@ class SnowflakeV2Config(
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)

include_streams: bool = Field(
default=True,
description="If enabled, streams will be ingested as separate entities from tables/views.",
)

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion.",
)
Collaborator: I believe this is redundant, since it inherits from SnowflakeFilterConfig

Author: removed the code


# This is required since access_history table does not capture whether the table was temporary table.
temporary_tables_pattern: List[str] = Field(
default=DEFAULT_TEMP_TABLES_PATTERNS,
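For context, here is a minimal sketch of how the new stream_pattern option scopes stream ingestion, assuming AllowDenyPattern.allowed() behaves as it does for the existing table/view patterns; the database, schema, and stream names below are hypothetical.

from datahub.configuration.common import AllowDenyPattern

# Hypothetical filter: keep streams in MY_DB.ANALYTICS, drop anything ending in _TMP.
stream_pattern = AllowDenyPattern(
    allow=[r"my_db\.analytics\..*"],
    deny=[r".*_tmp$"],
)

for name in ["my_db.analytics.orders_stream", "my_db.analytics.scratch_tmp"]:
    print(name, "->", stream_pattern.allowed(name))
# my_db.analytics.orders_stream -> True
# my_db.analytics.scratch_tmp -> False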
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -20,6 +20,7 @@
)
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_schema import SnowflakeStream
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeFilter,
@@ -540,3 +541,35 @@

def close(self) -> None:
pass

def populate_stream_upstreams(self, streams: List[SnowflakeStream]) -> None:
try:
for stream in streams:
stream_fully_qualified = ".".join(
[
stream.database_name.lower(),
stream.schema_name.lower(),
stream.name.lower(),
]
)

logger.info(
f"Stream Info: upstream_urn: {stream.table_name.lower()}, downstream_urn: {stream_fully_qualified}"
)
known_lineage = KnownLineageMapping(
upstream_urn=self.identifiers.gen_dataset_urn(
stream.table_name.lower()
),
downstream_urn=self.identifiers.gen_dataset_urn(
stream_fully_qualified
),
)
self.report.num_streams_with_known_upstreams += 1
self.sql_aggregator.add(known_lineage)
except Exception as e:
logger.debug(e, exc_info=e)
self.warn_if_stateful_else_error(
"stream-lineage",
f"Failed to extract stream lineage due to error {e}",
)
self.report_status("STREAM_LINEAGE", False)
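As a usage illustration, the table-to-stream edge this method derives for a single hypothetical stream; FakeStream below is a stand-in for the SnowflakeStream rows defined in snowflake_schema.py.

from dataclasses import dataclass

@dataclass
class FakeStream:  # hypothetical stand-in for SnowflakeStream
    database_name: str
    schema_name: str
    name: str
    table_name: str  # fully qualified source table

stream = FakeStream("my_db", "public", "orders_stream", "my_db.public.orders")

upstream = stream.table_name.lower()
downstream = ".".join(
    [stream.database_name.lower(), stream.schema_name.lower(), stream.name.lower()]
)
# populate_stream_upstreams() turns these identifiers into dataset URNs and records a
# KnownLineageMapping(upstream_urn=<table>, downstream_urn=<stream>) on the SQL aggregator.
print(upstream, "->", downstream)  # my_db.public.orders -> my_db.public.orders_stream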
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
@@ -49,6 +49,7 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownLineageMapping,
ObservedQuery,
PreparsedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
@@ -398,6 +399,36 @@
pass
else:
return None

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

# Check if any of the accessed objects are streams
has_stream_objects = any(
obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

# If a stream is used, default to query parsing.
if has_stream_objects:
logger.debug("Found matching stream object")
self.aggregator.add_observed_query(
observed=ObservedQuery(
query=res["query_text"],
session_id=res["session_id"],
timestamp=res["query_start_time"].astimezone(timezone.utc),
user=user,
default_db=res["default_db"],
default_schema=res["default_schema"],
query_hash=get_query_fingerprint(
res["query_text"], self.identifiers.platform, fast=True
),
),
)
return None

upstreams = []
column_usage = {}

@@ -460,12 +491,6 @@
)
)

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

timestamp: datetime = res["query_start_time"]
timestamp = timestamp.astimezone(timezone.utc)

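To make the stream-routing branch above concrete, a small sketch with hypothetical access-history rows; the objectDomain values mirror the direct_objects_accessed entries this method iterates over.

# Hypothetical rows from Snowflake access history for one query.
direct_objects_accessed = [
    {"objectName": "MY_DB.PUBLIC.ORDERS_STREAM", "objectDomain": "Stream"},
    {"objectName": "MY_DB.PUBLIC.ORDERS", "objectDomain": "Table"},
]

has_stream_objects = any(
    obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

if has_stream_objects:
    # Queries that read a stream are handed to the aggregator as an ObservedQuery,
    # so lineage is derived by parsing the query text rather than via the preparsed path.
    print("add_observed_query(...) and return")
else:
    print("continue building a PreparsedQuery with upstreams and column usage")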
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
@@ -9,6 +9,7 @@
from datahub.utilities.prefix_batch_builder import PrefixGroup

SHOW_VIEWS_MAX_PAGE_SIZE = 10000
SHOW_STREAM_MAX_PAGE_SIZE = 10000


def create_deny_regex_sql_filter(
@@ -36,6 +37,7 @@
SnowflakeObjectDomain.VIEW.capitalize(),
SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(),
SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(),
SnowflakeObjectDomain.STREAM.capitalize(),
}

ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format(
@@ -44,7 +46,8 @@
ACCESS_HISTORY_TABLE_DOMAINS_FILTER = (
"("
f"'{SnowflakeObjectDomain.TABLE.capitalize()}',"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}'"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}',"
f"'{SnowflakeObjectDomain.STREAM.capitalize()}'"
")"
)

@@ -952,3 +955,25 @@
@staticmethod
def get_all_users() -> str:
return """SELECT name as "NAME", email as "EMAIL" FROM SNOWFLAKE.ACCOUNT_USAGE.USERS"""

@staticmethod
def streams_for_database(
db_name: str,
limit: int = SHOW_STREAM_MAX_PAGE_SIZE,
stream_pagination_marker: Optional[str] = None,
) -> str:
# SHOW STREAMS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-streams#usage-notes
assert limit <= SHOW_STREAM_MAX_PAGE_SIZE

# To work around this, we paginate through the results using the FROM clause.
from_clause = (
f"""FROM '{stream_pagination_marker}'"""
if stream_pagination_marker
else ""
)
return f"""SHOW STREAMS IN DATABASE {db_name} LIMIT {limit} {from_clause};"""

@staticmethod
def streams_for_schema(schema_name: str, db_name: str) -> str:
return f"""SHOW STREAMS IN SCHEMA {db_name}.{schema_name}"""

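For illustration, the statements the helper above would emit for a first page and a follow-up page, with the pagination-marker check applied as corrected above; the database and stream names are hypothetical.

# First page: no pagination marker yet.
print(SnowflakeQuery.streams_for_database("MY_DB"))
# SHOW STREAMS IN DATABASE MY_DB LIMIT 10000 ;

# Next page: resume after the last stream name returned by the previous call.
print(
    SnowflakeQuery.streams_for_database(
        "MY_DB", limit=10000, stream_pagination_marker="ORDERS_STREAM"
    )
)
# SHOW STREAMS IN DATABASE MY_DB LIMIT 10000 FROM 'ORDERS_STREAM';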
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py
@@ -64,6 +64,7 @@
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
num_stream_edges_scanned: int = 0
ignore_start_time_lineage: Optional[bool] = None
upstream_lineage_in_report: Optional[bool] = None
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
@@ -103,6 +104,7 @@
schemas_scanned: int = 0
databases_scanned: int = 0
tags_scanned: int = 0
streams_scanned: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
@@ -112,6 +114,7 @@
table_lineage_query_secs: float = -1
external_lineage_queries_secs: float = -1
num_tables_with_known_upstreams: int = 0
num_streams_with_known_upstreams: int = 0
num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0

@@ -129,6 +132,8 @@
num_get_tags_for_object_queries: int = 0
num_get_tags_on_columns_for_table_queries: int = 0

num_get_streams_for_schema_queries: int = 0

rows_zero_objects_modified: int = 0

_processed_tags: MutableSet[str] = field(default_factory=set)
@@ -155,6 +160,8 @@
return
self._scanned_tags.add(name)
self.tags_scanned += 1
elif ent_type == "stream":
self.streams_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py
@@ -115,6 +115,7 @@
comment: Optional[str]
tables: List[str] = field(default_factory=list)
views: List[str] = field(default_factory=list)
streams: List[str] = field(default_factory=list)
tags: Optional[List[SnowflakeTag]] = None


@@ -128,6 +129,29 @@
tags: Optional[List[SnowflakeTag]] = None


@dataclass
class SnowflakeStream:
name: str
created: datetime
owner: str
comment: str
source_type: str
type: str
stale: str
mode: str
invalid_reason: str
owner_role_type: str
database_name: str
schema_name: str
table_name: str
columns: List[SnowflakeColumn] = field(default_factory=list)
stale_after: Optional[datetime] = None
base_tables: Optional[str] = None
tags: Optional[List[SnowflakeTag]] = None
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
last_altered: Optional[datetime] = None


class _SnowflakeTagCache:
def __init__(self) -> None:
# self._database_tags[<database_name>] = list of tags applied to database
@@ -205,6 +229,7 @@
self.get_tables_for_database,
self.get_views_for_database,
self.get_columns_for_schema,
self.get_streams_for_database,
self.get_pk_constraints_for_schema,
self.get_fk_constraints_for_schema,
]
@@ -591,3 +616,103 @@
tags[column_name].append(snowflake_tag)

return tags

@serialized_lru_cache(maxsize=1)
def get_streams_for_database(
self, db_name: str
) -> Dict[str, List[SnowflakeStream]]:
page_limit = SHOW_STREAM_MAX_PAGE_SIZE

streams: Dict[str, List[SnowflakeStream]] = {}

first_iteration = True
stream_pagination_marker: Optional[str] = None
while first_iteration or stream_pagination_marker is not None:
cur = self.connection.query(
SnowflakeQuery.streams_for_database(
db_name,
limit=page_limit,
stream_pagination_marker=stream_pagination_marker,
)
)

first_iteration = False
stream_pagination_marker = None

result_set_size = 0
for stream in cur:
result_set_size += 1

stream_name = stream["name"]
schema_name = stream["schema_name"]
if schema_name not in streams:
streams[schema_name] = []
streams[stream["schema_name"]].append(
SnowflakeStream(
name=stream["name"],
created=stream["created_on"],
owner=stream["owner"],
comment=stream["comment"],
source_type=stream["source_type"],
type=stream["type"],
stale=stream["stale"],
mode=stream["mode"],
database_name=stream["database_name"],
schema_name=stream["schema_name"],
invalid_reason=stream["invalid_reason"],
owner_role_type=stream["owner_role_type"],
stale_after=stream["stale_after"],
table_name=stream["table_name"],
base_tables=stream["base_tables"],
last_altered=stream["created_on"],
)
)

if result_set_size >= page_limit:
# If we hit the limit, we need to send another request to get the next page.
logger.info(
f"Fetching next page of streams for {db_name} - after {stream_name}"
)
stream_pagination_marker = stream_name

return streams

@serialized_lru_cache(maxsize=1)
def get_streams_for_schema(
self, schema_name: str, db_name: str
) -> Optional[List[SnowflakeStream]]:
try:
streams: List[SnowflakeStream] = []
cur = self.connection.query(
SnowflakeQuery.streams_for_schema(schema_name, db_name),
)

for stream in cur:
streams.append(
SnowflakeStream(
name=stream["name"],
created=stream["created_on"],
owner=stream["owner"],
comment=stream["comment"],
source_type=stream["source_type"],
type=stream["type"],
stale=stream["stale"],
mode=stream["mode"],
database_name=stream["database_name"],
schema_name=stream["schema_name"],
invalid_reason=stream["invalid_reason"],
owner_role_type=stream["owner_role_type"],
stale_after=stream["stale_after"],
table_name=stream["table_name"],
base_tables=stream["base_tables"],
last_altered=stream["created_on"],
)
)
return streams

except Exception as e:
logger.debug(
f"Failed to get all streams for schema - {db_name}.{schema_name}",
exc_info=e,
)
return None
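Summarizing the paging logic of get_streams_for_database as a standalone sketch; run_show_streams is a hypothetical stand-in for the SHOW STREAMS call so the loop can be read in isolation.

from typing import Callable, Dict, List, Optional

PAGE_LIMIT = 10000  # SHOW STREAMS returns at most 10,000 rows per call

def fetch_all_streams(
    db_name: str,
    run_show_streams: Callable[..., List[dict]],
) -> Dict[str, List[dict]]:
    """Collect every stream in a database, grouped by schema name."""
    streams: Dict[str, List[dict]] = {}
    marker: Optional[str] = None
    first = True
    while first or marker is not None:
        first = False
        rows = run_show_streams(db_name, limit=PAGE_LIMIT, after=marker)
        marker = None
        for row in rows:
            streams.setdefault(row["schema_name"], []).append(row)
        if len(rows) >= PAGE_LIMIT:
            # A full page may be truncated; resume after the last stream name seen.
            marker = rows[-1]["name"]
    return streams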