Skip to content

Commit

Permalink
fix linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Yanik Häni authored and Yanik Häni committed Jul 5, 2024
1 parent 0727699 commit 485262b
Showing 1 changed file with 53 additions and 50 deletions.
103 changes: 53 additions & 50 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ class TableauConnectionConfig(ConfigModel):
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)

def get_tableau_auth(self, site: str) -> TableauAuth:
def get_tableau_auth(
self, site: str
) -> Union[TableauAuth, PersonalAccessTokenAuth]:
# https://tableau.github.io/server-client-python/docs/api-ref#authentication
authentication: Union[TableauAuth, PersonalAccessTokenAuth]
if self.username and self.password:
Expand All @@ -227,7 +229,9 @@ def get_tableau_auth(self, site: str) -> TableauAuth:
return authentication

def make_tableau_client(self, site: str) -> Server:
authentication: Union[TableauAuth, PersonalAccessTokenAuth] = self.get_tableau_auth(site)
authentication: Union[
TableauAuth, PersonalAccessTokenAuth
] = self.get_tableau_auth(site)
try:
server = Server(
self.connect_uri,
Expand Down Expand Up @@ -413,7 +417,7 @@ class TableauConfig(
"By default, all sites will be included in the ingestion. "
"You can both allow and deny sites based on their name using their name, or a Regex pattern. "
"Deny patterns always take precedence over allow patterns. "
"This property is currently only supported for Tableau Server. Tableau Cloud is not supported. "
"This property is currently only supported for Tableau Server. Tableau Cloud is not supported. ",
)

add_site_container: bool = Field(
Expand Down Expand Up @@ -557,18 +561,17 @@ def __hash__(self):
return id(self)

def __init__(
self,
config: TableauConfig,
ctx: PipelineContext,
self,
config: TableauConfig,
ctx: PipelineContext,
):
super().__init__(config, ctx)
self.config: TableauConfig = config
self.report: TableauSourceReport = TableauSourceReport()
self.server: Optional[Server] = None
self._authenticate(self.config.site)


def _authenticate(self, site_content_url) -> Server:
def _authenticate(self, site_content_url: str) -> None:
try:
logger.info(f"Authenticated to Tableau site: '{site_content_url}'")
self.server = self.config.make_tableau_client(site_content_url)
Expand All @@ -578,6 +581,7 @@ def _authenticate(self, site_content_url) -> Server:
key="tableau-login",
reason=str(e),
)

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
Expand All @@ -591,17 +595,14 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
)
return test_report


def get_report(self) -> TableauSourceReport:
return self.report


@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = TableauConfig.parse_obj(config_dict)
return cls(config, ctx)


def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
Expand All @@ -617,26 +618,41 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.ingest_multiple_sites:
for site in TSC.Pager(self.server.sites):
if (
site.state != "Active"
or not self.config.site_name_pattern.allowed(site.name)
site.state != "Active"
or not self.config.site_name_pattern.allowed(site.name)
):
logger.info(f"Skip site '{site.name}' as it's excluded in site_name_pattern or inactive.")
logger.info(
f"Skip site '{site.name}' as it's excluded in site_name_pattern or inactive."
)
continue
self.server.auth.switch_site(site)
site_source = TableauSiteSource(config=self.config, ctx=self.ctx, site=site, report=self.report, server=self.server, platform=self.platform)
site_source = TableauSiteSource(
config=self.config,
ctx=self.ctx,
site=site,
report=self.report,
server=self.server,
platform=self.platform,
)
logger.info(f"Ingesting assets of site '{site.content_url}'.")
yield from site_source.ingest_tableau_site()
else:
site = self.server.sites.get_by_id(self.server.site_id)
site_source = TableauSiteSource(config=self.config, ctx=self.ctx, site=site, report=self.report, server=self.server, platform=self.platform)
site_source = TableauSiteSource(
config=self.config,
ctx=self.ctx,
site=site,
report=self.report,
server=self.server,
platform=self.platform,
)
yield from site_source.ingest_tableau_site()
except MetadataQueryException as md_exception:
self.report.failure(
key="tableau-metadata",
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)


def close(self) -> None:
try:
if self.server is not None:
Expand All @@ -659,7 +675,7 @@ def __init__(
site: SiteItem,
report: TableauSourceReport,
server: Server,
platform: str
platform: str,
):
self.config: TableauConfig = config
self.report = report
Expand Down Expand Up @@ -707,9 +723,7 @@ def no_env_browse_prefix(self) -> str:
@property
def site_name_browse_path(self) -> str:
site_name_prefix = (
self.site.name
if self.site and self.config.add_site_container
else ""
self.site.name if self.site and self.config.add_site_container else ""
)
return f"/{site_name_prefix}" if site_name_prefix else ""

Expand All @@ -718,9 +732,10 @@ def dataset_browse_prefix(self) -> str:
# datasets also have the env in the browse path
return f"/{self.config.env.lower()}{self.no_env_browse_prefix}"


def _re_authenticate(self):
tableau_auth: Union[TableauAuth, PersonalAccessTokenAuth] = self.config.get_tableau_auth(self.site.content_url)
tableau_auth: Union[
TableauAuth, PersonalAccessTokenAuth
] = self.config.get_tableau_auth(self.site.content_url)
self.server.auth.sign_in(tableau_auth)

def _populate_usage_stat_registry(self) -> None:
Expand Down Expand Up @@ -834,6 +849,12 @@ def _init_tableau_project_registry(self, all_project_map: dict) -> None:
logger.debug(f"Project {project.name} is added in project registry")
projects_to_ingest[project.id] = project

# We rely on automatic browse paths (v2) when creating containers. That's why we need to sort the projects here.
# Otherwise, nested projects will not have the correct browse paths if not created in correct order / hierarchy.
self.tableau_project_registry = OrderedDict(
sorted(projects_to_ingest.items(), key=lambda item: len(item[1].path))
)

if self.config.extract_project_hierarchy is False:
logger.debug(
"Skipping project hierarchy processing as configuration extract_project_hierarchy is "
Expand All @@ -845,17 +866,11 @@ def _init_tableau_project_registry(self, all_project_map: dict) -> None:

for project in list_of_skip_projects:
if (
project.parent_id in projects_to_ingest
project.parent_id in self.tableau_project_registry
and self._is_denied_project(project) is False
):
logger.debug(f"Project {project.name} is added in project registry")
projects_to_ingest[project.id] = project

# We rely on automatic browse paths (v2) when creating containers. That's why we need to sort the projects here.
# Otherwise, nested projects will not have the correct browse paths if not created in correct order / hierarchy.
self.tableau_project_registry = OrderedDict(
sorted(projects_to_ingest.items(), key=lambda item: len(item[1].path))
)
self.tableau_project_registry[project.id] = project

def _init_datasource_registry(self) -> None:
if self.server is None:
Expand Down Expand Up @@ -1528,7 +1543,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:

datasource_name = None
project = None
columns = []
columns: List[Dict[Any, Any]] = []
if len(csql[c.DATA_SOURCES]) > 0:
# CustomSQLTable id owned by exactly one tableau data source
logger.debug(
Expand Down Expand Up @@ -1576,7 +1591,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
project = self._get_project_browse_path_name(datasource)

# if condition is needed as graphQL return "columns": None
columns: List[Dict[Any, Any]] = (
columns = (
cast(List[Dict[Any, Any]], csql.get(c.COLUMNS))
if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None
else []
Expand Down Expand Up @@ -2420,9 +2435,7 @@ def emit_sheets_as_charts(
last_modified = self.get_last_modified(creator, created_at, updated_at)

if sheet.get(c.PATH):
site_part = (
f"/site/{self.site.content_url}" if self.site else ""
)
site_part = f"/site/{self.site.content_url}" if self.site else ""
sheet_external_url = (
f"{self.config.connect_uri}/#{site_part}/views/{sheet.get(c.PATH)}"
)
Expand All @@ -2433,9 +2446,7 @@ def emit_sheets_as_charts(
and sheet[c.CONTAINED_IN_DASHBOARDS][0].get(c.PATH)
):
# sheet contained in dashboard
site_part = (
f"/t/{self.site.content_url}" if self.site else ""
)
site_part = f"/t/{self.site.content_url}" if self.site else ""
dashboard_path = sheet[c.CONTAINED_IN_DASHBOARDS][0][c.PATH]
sheet_external_url = f"{self.config.connect_uri}{site_part}/authoring/{dashboard_path}/{sheet.get(c.NAME, '')}"
else:
Expand Down Expand Up @@ -2567,9 +2578,7 @@ def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUni
else None
)

site_part = (
f"/site/{self.site.content_url}" if self.site else ""
)
site_part = f"/site/{self.site.content_url}" if self.site else ""
workbook_uri = workbook.get("uri")
workbook_part = (
workbook_uri[workbook_uri.index("/workbooks/") :] if workbook_uri else None
Expand Down Expand Up @@ -2728,9 +2737,7 @@ def emit_dashboard(
updated_at = dashboard.get(c.UPDATED_AT, datetime.now())
last_modified = self.get_last_modified(creator, created_at, updated_at)

site_part = (
f"/site/{self.site.content_url}" if self.site else ""
)
site_part = f"/site/{self.site.content_url}" if self.site else ""
dashboard_external_url = (
f"{self.config.connect_uri}/#{site_part}/views/{dashboard.get(c.PATH, '')}"
)
Expand Down Expand Up @@ -2878,11 +2885,7 @@ def emit_project_containers(self) -> Iterable[MetadataWorkUnit]:
parent_container_key: Optional[ContainerKey] = None
if project.parent_id:
parent_container_key = self.gen_project_key(project.parent_id)
elif (
self.config.add_site_container
and self.site
and self.site.id
):
elif self.config.add_site_container and self.site and self.site.id:
parent_container_key = self.gen_site_key(self.site.id)

yield from gen_containers(
Expand Down

0 comments on commit 485262b

Please sign in to comment.