Skip to content

Commit

Permalink
fix: PR comments
Browse files: browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Oct 14, 2024
1 parent 0ddd6fc commit ec86b1f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -185,6 +185,7 @@ class BigQueryV2Report(
usage_start_time: Optional[datetime] = None
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False
num_skipped_external_table_lineage: int = 0

queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -189,6 +189,9 @@ def __init__(
self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()
# Add External BQ table
self.external_tables: Dict[str, BigqueryTable] = defaultdict()
self.bq_external_table_pattern = (
r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?"
)

@property
def store_table_refs(self):
Expand Down Expand Up @@ -823,11 +826,9 @@ def gen_dataset_workunits(
# Added for bigquery to gcs lineage extraction
if (
isinstance(table, BigqueryTable)
and table.table_type is not None
and table.table_type == "EXTERNAL"
and table.ddl is not None
and f"CREATE EXTERNAL TABLE `{project_id}.{dataset_name}.{table.name}`"
in table.ddl
and re.search(self.bq_external_table_pattern, table.ddl, re.IGNORECASE)
):
self.external_tables[dataset_urn] = table

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -941,11 +941,19 @@ def gen_lineage_workunits_for_external_table(
# Expect URIs in `uris=[""]` format
uris_match = self.gcs_uris_regex.search(ddl)
if not uris_match:
self.report.num_skipped_external_table_lineage += 1
logger.warning(f"Unable to parse GCS URI from the provided DDL {ddl}.")
return

uris_str = uris_match.group(1)
source_uris = json.loads(f"[{uris_str}]")
try:
source_uris = json.loads(f"[{uris_str}]")
except json.JSONDecodeError as e:
self.report.num_skipped_external_table_lineage += 1
logger.warning(
f"Json load failed on loading source uri with error: {e}. The field value was: {uris_str}"
)
return

lineage_info = self.get_lineage_for_external_table(
dataset_urn=dataset_urn,
Expand Down

0 comments on commit ec86b1f

Please sign in to comment.