From d466de1e9df62d7c8900d34f25110aaec77b1932 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 9 Jan 2024 21:59:44 -0500 Subject: [PATCH 1/3] feat(ingest/tableau): add retries on OSErrors Plus more types. --- .../src/datahub/ingestion/source/tableau.py | 30 ++++++++++++++++++- .../ingestion/source/tableau_common.py | 11 +++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index ed5fe543310b8f..c1865c44abc681 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -732,7 +732,10 @@ def get_connection_object_page( count: int = 0, offset: int = 0, retry_on_auth_error: bool = True, + retries_remaining: Optional[int] = None, ) -> Tuple[dict, int, int]: + retries_remaining = retries_remaining or self.config.max_retries + logger.debug( f"Query {connection_type} to get {count} objects with offset {offset}" ) @@ -749,7 +752,31 @@ def get_connection_object_page( # will be thrown and we need to re-authenticate and retry. self._authenticate() return self.get_connection_object_page( - query, connection_type, query_filter, count, offset, False + query, + connection_type, + query_filter, + count, + offset, + retry_on_auth_error=False, + retries_remaining=retries_remaining, + ) + except OSError: + # In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04), + # the request logic was changed to use threads. + # https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081 + # I'm not exactly sure why, but since then, we now occasionally see + # `OSError: Response is not a http response?` for some requests. This + # retry logic is basically a bandaid for that. + if retries_remaining <= 0: + raise + return self.get_connection_object_page( + query, + connection_type, + query_filter, + count, + offset, + retry_on_auth_error=False, + retries_remaining=retries_remaining - 1, ) if c.ERRORS in query_data: @@ -2503,3 +2530,4 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def get_report(self) -> TableauSourceReport: return self.report + return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 65d779b7f4516d..2d009a00544472 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -5,6 +5,7 @@ from typing import Dict, List, Optional, Tuple from pydantic.fields import Field +from tableauserverclient import Server import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel @@ -699,7 +700,6 @@ def get_overridden_info( platform_instance_map: Optional[Dict[str, str]], lineage_overrides: Optional[TableauLineageOverrides] = None, ) -> Tuple[Optional[str], Optional[str], str, str]: - original_platform = platform = get_platform(connection_type) if ( lineage_overrides is not None @@ -824,7 +824,14 @@ def clean_query(query: str) -> str: return query -def query_metadata(server, main_query, connection_name, first, offset, qry_filter=""): +def query_metadata( + server: Server, + main_query: str, + connection_name: str, + first: int, + offset: int, + qry_filter: str = "", +): query = """{{ {connection_name} (first:{first}, offset:{offset}, filter:{{{filter}}}) {{ From 936b85151a490150e393ca50d472bc428edcbdaa Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 10 Jan 2024 13:01:55 -0500 Subject: [PATCH 2/3] fix lint --- metadata-ingestion/src/datahub/ingestion/source/tableau.py | 1 + .../src/datahub/ingestion/source/tableau_common.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index c1865c44abc681..ee40890adc1a47 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -740,6 +740,7 @@ def get_connection_object_page( f"Query {connection_type} to get {count} objects with offset {offset}" ) try: + assert self.server is not None query_data = query_metadata( self.server, query, connection_type, count, offset, query_filter ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 2d009a00544472..a2f460feca3888 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -831,7 +831,7 @@ def query_metadata( first: int, offset: int, qry_filter: str = "", -): +) -> dict: query = """{{ {connection_name} (first:{first}, offset:{offset}, filter:{{{filter}}}) {{ From 50a62d980811704f5bcef1a378f5e1517d5103e2 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 10 Jan 2024 17:02:05 -0500 Subject: [PATCH 3/3] fix lint --- metadata-ingestion/src/datahub/ingestion/source/tableau.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index ee40890adc1a47..6c86062ecbeea5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -792,11 +792,7 @@ def get_connection_object_page( else: raise RuntimeError(f"Query {connection_type} error: {errors}") - connection_object = ( - query_data.get(c.DATA).get(connection_type, {}) - if query_data.get(c.DATA) - else {} - ) + connection_object = query_data.get(c.DATA, {}).get(connection_type, {}) total_count = connection_object.get(c.TOTAL_COUNT, 0) has_next_page = connection_object.get(c.PAGE_INFO, {}).get(