From e16a7921cb68a2e4d155fd05cd1e550430c96648 Mon Sep 17 00:00:00 2001
From: Mayuri N <mayuri.nehate@gslab.com>
Date: Tue, 28 Jan 2025 18:11:19 +0530
Subject: [PATCH 1/6] fix(ingest/unity): table level profile of large delta
 tables

---
 .../ingestion/source/ge_profiling_config.py   | 18 ++++++----
 .../ingestion/source/unity/ge_profiler.py     | 33 +++++++++++++++++++
 2 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 93142a347ca0e6..cce70b417f9e18 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -115,26 +115,30 @@ class GEProfilingConfig(GEProfilingBaseConfig):
     )
     max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
         default=None,
-        description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
+        description="A positive integer that specifies the maximum number of columns to profile for "
+        "any table. `None` implies all columns. The cost of profiling goes up significantly as the "
+        "number of columns to profile goes up.",
     )
 
     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=None,
-        description="Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
+        description="Profile table only if it has been updated since these many number of days. "
+        "If set to `null`, no constraint of last modified time for tables to profile. "
+        "Supported only in `snowflake` and `BigQuery`.",
     )
 
     profile_table_size_limit: Optional[int] = Field(
         default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, "
-        "no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on calculated size from gathered stats.",
+        "no limit on the size of tables to profile. Supported only in `Snowflake`, `BigQuery` and "
+        "`Databricks`. Supported for `oracle` based on calculated size from gathered stats.",
     )
 
     profile_table_row_limit: Optional[int] = Field(
         default=5000000,
-        description="Profile tables only if their row count is less than specified count. If set to `null`, "
-        "no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`"
-        "Supported for `oracle` based on gathered stats.",
+        description="Profile tables only if their row count is less than specified count. "
+        "If set to `null`, no limit on the row count of tables to profile. Supported only in "
+        "`Snowflake`, `BigQuery`. Supported for `oracle` based on gathered stats.",
     )
 
     profile_table_row_count_estimate_only: bool = Field(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
index e24ca8330777ed..276990ba1bbad4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -117,6 +117,7 @@ def get_unity_profile_request(
 
         if table.size_in_bytes is None:
             self.report.num_profile_missing_size_in_bytes += 1
+
         if not self.is_dataset_eligible_for_profiling(
             dataset_name,
             size_in_bytes=table.size_in_bytes,
@@ -143,6 +144,17 @@ def get_unity_profile_request(
                 self.report.report_dropped(dataset_name)
             return None
 
+        if profile_table_level_only and table.size_in_bytes is not None:
+            # For requests with profile_table_level_only set, the dataset profile is
+            # generated from table.rows_count. For Delta tables (the typical Databricks
+            # table format), count(*) is an efficient query to compute the row count.
+            # The presence of size_in_bytes confirms this is a Delta table and that we
+            # have SELECT permission on it.
+            try:
+                table.rows_count = _get_dataset_row_count(table, conn)
+            except Exception as e:
+                logger.warning(f"Failed to get table row count for {dataset_name}: {e}")
+
         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
         return TableProfilerRequest(
@@ -160,6 +172,9 @@ def _get_dataset_size_in_bytes(
         conn.dialect.identifier_preparer.quote(c)
         for c in [table.ref.catalog, table.ref.schema, table.ref.table]
     )
+    # This query works only for Delta tables.
+    # Ref: https://docs.databricks.com/en/delta/table-details.html
+    # Note: Any change here should also update _get_dataset_row_count
     row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
     if row is None:
         return None
@@ -168,3 +183,21 @@ def _get_dataset_size_in_bytes(
             return int(row._asdict()["sizeInBytes"])
         except Exception:
             return None
+
+
+def _get_dataset_row_count(
+    table: UnityCatalogSQLGenericTable, conn: Connection
+) -> Optional[int]:
+    name = ".".join(
+        conn.dialect.identifier_preparer.quote(c)
+        for c in [table.ref.catalog, table.ref.schema, table.ref.table]
+    )
+    # This query is efficient only for Delta tables.
+    row = conn.execute(f"select count(*) as numRows from {name}").fetchone()
+    if row is None:
+        return None
+    else:
+        try:
+            return int(row._asdict()["numRows"])
+        except Exception:
+            return None

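A note on the queries in patch 1: table-level stats come from two live queries, DESCRIBE DETAIL for the size and count(*) for the row count. Below is a minimal standalone sketch of the same flow, assuming SQLAlchemy 1.x-style raw-string execution as used in the patch; the connection URL and table name are placeholders, not values from the patch.

    from typing import Optional, Tuple

    from sqlalchemy import create_engine
    from sqlalchemy.engine import Connection

    # Placeholder URL for the Databricks SQLAlchemy dialect; fill in your
    # workspace details.
    engine = create_engine(
        "databricks://token:<access-token>@<workspace-host>?http_path=<http-path>"
    )


    def delta_table_stats(
        conn: Connection, name: str
    ) -> Tuple[Optional[int], Optional[int]]:
        # DESCRIBE DETAIL works only for Delta tables and returns a single
        # metadata row that includes sizeInBytes.
        # Ref: https://docs.databricks.com/en/delta/table-details.html
        detail = conn.execute(f"DESCRIBE DETAIL {name}").fetchone()
        size = int(detail._asdict()["sizeInBytes"]) if detail is not None else None

        # count(*) is cheap on Delta tables, which is why the patch considers
        # a table-level profile safe even for very large tables.
        row = conn.execute(f"select count(*) as numRows from {name}").fetchone()
        rows = int(row._asdict()["numRows"]) if row is not None else None
        return size, rows


    with engine.connect() as conn:
        size, rows = delta_table_stats(conn, "`catalog`.`schema`.`table`")

Gating the count(*) on the presence of size_in_bytes (and, from patch 4 onward, on is_delta_table) keeps the extra query away from non-Delta tables, where it could trigger an expensive full scan.
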
From 7101d6c4c8e897967ba76f12d018d6af3b5ab304 Mon Sep 17 00:00:00 2001
From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Date: Tue, 28 Jan 2025 22:20:31 +0530
Subject: [PATCH 2/6] Update
 metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
---
 .../src/datahub/ingestion/source/ge_profiling_config.py         | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index cce70b417f9e18..1b7b5fa6ec5fde 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -138,7 +138,7 @@ class GEProfilingConfig(GEProfilingBaseConfig):
         default=5000000,
         description="Profile tables only if their row count is less than specified count. "
         "If set to `null`, no limit on the row count of tables to profile. Supported only in "
-        "`Snowflake`, `BigQuery`. Supported for `oracle` based on gathered stats.",
+        "`Snowflake`, `BigQuery`. Supported for `Oracle` based on gathered stats.",
     )
 
     profile_table_row_count_estimate_only: bool = Field(

From 007ab225656025fe943b3e637a743fdc77a97e1e Mon Sep 17 00:00:00 2001
From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Date: Tue, 28 Jan 2025 22:20:38 +0530
Subject: [PATCH 3/6] Update
 metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
---
 .../src/datahub/ingestion/source/ge_profiling_config.py         | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 1b7b5fa6ec5fde..9c251c040bed13 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -131,7 +131,7 @@ class GEProfilingConfig(GEProfilingBaseConfig):
         default=5,
         description="Profile tables only if their size is less than specified GBs. If set to `null`, "
         "no limit on the size of tables to profile. Supported only in `Snowflake`, `BigQuery` and "
-        "`Databricks`. Supported for `oracle` based on calculated size from gathered stats.",
+        "`Databricks`. Supported for `Oracle` based on calculated size from gathered stats.",
     )
 
     profile_table_row_limit: Optional[int] = Field(

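Patches 2 and 3 are wording fixes, but together with patch 1 they settle how the three profiling limits are documented. A short sketch of the limits in use, with illustrative values: the limit field names appear in the diffs above, while `enabled`, `profile_table_level_only`, and the exact way they combine are assumptions about the surrounding config class.

    from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

    config = GEProfilingConfig(
        enabled=True,
        # Emit only table-level stats (row count, size); skip column profiles.
        profile_table_level_only=True,
        # Skip tables larger than 5 GB (Snowflake, BigQuery, Databricks;
        # Oracle via size calculated from gathered stats).
        profile_table_size_limit=5,
        # Skip tables with more than 5M rows (Snowflake, BigQuery; Oracle via
        # gathered stats).
        profile_table_row_limit=5_000_000,
    )
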
From 4615df03166308d127babffbf7221614afecccc3 Mon Sep 17 00:00:00 2001
From: Mayuri N <mayuri.nehate@gslab.com>
Date: Mon, 3 Feb 2025 19:48:44 +0530
Subject: [PATCH 4/6] is_delta_table property

---
 .../ingestion/source/unity/ge_profiler.py     | 34 ++++++++++++++-----
 .../datahub/ingestion/source/unity/report.py  |  1 +
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
index 276990ba1bbad4..4487231ab5c007 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -3,6 +3,7 @@
 from dataclasses import dataclass, field
 from typing import Iterable, List, Optional
 
+from databricks.sdk.service.catalog import DataSourceFormat
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Connection
 
@@ -34,6 +35,11 @@ def __init__(self, table: Table):
         self.size_in_bytes = None
         self.rows_count = None
         self.ddl = None
+        self.data_source_format = table.data_source_format
+
+    @property
+    def is_delta_table(self) -> bool:
+        return self.data_source_format == DataSourceFormat.DELTA.value
 
 
 class UnityCatalogGEProfiler(GenericProfiler):
@@ -110,10 +116,16 @@ def get_unity_profile_request(
         profile_table_level_only = self.profiling_config.profile_table_level_only
 
         dataset_name = table.ref.qualified_table_name
-        try:
-            table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
-        except Exception as e:
-            logger.warning(f"Failed to get table size for {dataset_name}: {e}")
+        if table.is_delta_table:
+            try:
+                table.size_in_bytes = _get_dataset_size_in_bytes(table, conn)
+            except Exception as e:
+                self.report.warning(
+                    title="Incomplete Dataset Profile",
+                    message="Failed to get table size",
+                    context=dataset_name,
+                    exc=e,
+                )
 
         if table.size_in_bytes is None:
             self.report.num_profile_missing_size_in_bytes += 1
@@ -144,16 +156,22 @@ def get_unity_profile_request(
                 self.report.report_dropped(dataset_name)
             return None
 
-        if profile_table_level_only and table.size_in_bytes is not None:
+        if profile_table_level_only and table.is_delta_table:
              # For requests with profile_table_level_only set, the dataset profile is
              # generated from table.rows_count. For Delta tables (the typical Databricks
              # table format), count(*) is an efficient query to compute the row count.
-            # The presence of size_in_bytes confirms this is a Delta table and that we
-            # have SELECT permission on it.
             try:
                 table.rows_count = _get_dataset_row_count(table, conn)
             except Exception as e:
-                logger.warning(f"Failed to get table row count for {dataset_name}: {e}")
+                self.report.warning(
+                    title="Incomplete Dataset Profile",
+                    message="Failed to get table row count",
+                    context=dataset_name,
+                    exc=e,
+                )
+
+        if table.rows_count is None:
+            self.report.num_profile_missing_row_count += 1
 
         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
index f16769341853a1..2288514fb82388 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
@@ -52,6 +52,7 @@ class UnityCatalogReport(IngestionStageReport, SQLSourceReport):
         default_factory=LossyDict
     )
     num_profile_missing_size_in_bytes: int = 0
+    num_profile_missing_row_count: int = 0
     num_profile_failed_unsupported_column_type: int = 0
     num_profile_failed_int_casts: int = 0
 

From 50534ce9f8a115df036d0dc72fd00853fb4cd936 Mon Sep 17 00:00:00 2001
From: Mayuri N <mayuri.nehate@gslab.com>
Date: Mon, 3 Feb 2025 21:29:20 +0530
Subject: [PATCH 5/6] fix condition

---
 .../src/datahub/ingestion/source/unity/ge_profiler.py           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
index 4487231ab5c007..5192fda3462dda 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -39,7 +39,7 @@ def __init__(self, table: Table):
 
     @property
     def is_delta_table(self) -> bool:
-        return self.data_source_format == DataSourceFormat.DELTA.value
+        return self.data_source_format.value == DataSourceFormat.DELTA.value
 
 
 class UnityCatalogGEProfiler(GenericProfiler):

From 0e215f6b1c336a8d51240e6d4151c56984751672 Mon Sep 17 00:00:00 2001
From: Mayuri N <mayuri.nehate@gslab.com>
Date: Tue, 4 Feb 2025 16:06:48 +0530
Subject: [PATCH 6/6] fix condition

---
 .../src/datahub/ingestion/source/unity/ge_profiler.py           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
index 5192fda3462dda..a8658d476b87b0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py
@@ -39,7 +39,7 @@ def __init__(self, table: Table):
 
     @property
     def is_delta_table(self) -> bool:
-        return self.data_source_format.value == DataSourceFormat.DELTA.value
+        return self.data_source_format == DataSourceFormat.DELTA
 
 
 class UnityCatalogGEProfiler(GenericProfiler):
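
Patches 5 and 6 iterate on the is_delta_table comparison, and the final form compares enum members directly. The underlying semantics: a plain Enum member never compares equal to its raw string value. A minimal sketch with a stand-in enum, assuming databricks.sdk.service.catalog.DataSourceFormat behaves like a plain Enum:

    from enum import Enum


    class DataSourceFormat(Enum):
        # Stand-in for databricks.sdk.service.catalog.DataSourceFormat.
        DELTA = "DELTA"
        CSV = "CSV"


    fmt = DataSourceFormat.DELTA

    # Patch 4's form: an enum member never equals its raw value, so the
    # comparison is always False.
    assert (fmt == DataSourceFormat.DELTA.value) is False

    # Patch 5's form: comparing .value to .value works, but raises
    # AttributeError when the format is None (e.g. a table with no format).
    assert fmt.value == DataSourceFormat.DELTA.value

    # Patch 6's form: direct member comparison is idiomatic and None-safe,
    # since None == DataSourceFormat.DELTA is simply False.
    assert fmt == DataSourceFormat.DELTA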