Merge branch 'main' into feat/creat_account_groups
william-conti authored Jan 22, 2024
2 parents 49f7011 + 87f3c63 commit ceb989a
Showing 40 changed files with 1,257 additions and 739 deletions.
630 changes: 589 additions & 41 deletions pyproject.toml

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/account.py
@@ -48,7 +48,7 @@ def _configured_workspaces(self):
     def _get_cloud(self) -> str:
         if self._ac.config.is_azure:
             return "azure"
-        elif self._ac.config.is_gcp:
+        if self._ac.config.is_gcp:
             return "gcp"
         return "aws"

@@ -77,7 +77,6 @@ def sync_workspace_info(self):
         Create a json dump for each Workspace in account
         For each user that has ucx installed in their workspace,
         upload the json dump of workspace info in the .ucx folder
-        :return:
         """
         workspaces = []
         for workspace in self._configured_workspaces():

39 changes: 10 additions & 29 deletions src/databricks/labs/ucx/assessment/azure.py
@@ -27,6 +27,7 @@
     _azure_sp_conf_present_check,
     logger,
 )
+from databricks.labs.ucx.assessment.jobs import JobsMixin
 from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
 from databricks.labs.ucx.hive_metastore.locations import ExternalLocations

@@ -45,7 +46,7 @@ class AzureServicePrincipalInfo:
     storage_account: str


-class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo]):
+class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
         super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
         self._ws = ws

@@ -128,26 +129,6 @@ def _get_azure_spn_list(self, config: dict) -> list:
                 )
         return spn_list

-    def _get_cluster_configs_from_all_jobs(self, all_jobs, all_clusters_by_id):
-        for j in all_jobs:
-            if j.settings is not None:
-                if j.settings.job_clusters is not None:
-                    for jc in j.settings.job_clusters:
-                        if jc.new_cluster is None:
-                            continue
-                        yield j, jc.new_cluster
-
-                if j.settings.tasks is not None:
-                    for t in j.settings.tasks:
-                        if t.existing_cluster_id is not None:
-                            interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
-                            if interactive_cluster is None:
-                                continue
-                            yield j, interactive_cluster
-
-                        elif t.new_cluster is not None:
-                            yield j, t.new_cluster
-
     def _get_relevant_service_principals(self) -> list:
         relevant_service_principals = []
         temp_list = self._list_all_cluster_with_spn_in_spark_conf()

@@ -414,18 +395,18 @@ def _get_principal(self, principal_id: str) -> Principal | None:
         try:
             path = f"/v1.0/directoryObjects/{principal_id}"
             raw: dict[str, str] = self._graph.do("GET", path)  # type: ignore[assignment]
-            client_id = raw.get("appId")
-            display_name = raw.get("displayName")
-            object_id = raw.get("id")
-            assert client_id is not None
-            assert display_name is not None
-            assert object_id is not None
-            self._principals[principal_id] = Principal(client_id, display_name, object_id)
-            return self._principals[principal_id]
         except NotFound:
             # don't load principals from external directories twice
             self._principals[principal_id] = None
             return self._principals[principal_id]
+        client_id = raw.get("appId")
+        display_name = raw.get("displayName")
+        object_id = raw.get("id")
+        assert client_id is not None
+        assert display_name is not None
+        assert object_id is not None
+        self._principals[principal_id] = Principal(client_id, display_name, object_id)
+        return self._principals[principal_id]

     def role_assignments(
         self, resource_id: str, *, principal_types: list[str] | None = None

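The reworked _get_principal above keeps only the Graph API call inside the try block and caches a None result on NotFound, so an unresolvable principal is never looked up twice. A minimal standalone sketch of that negative-caching idea, with a plain dictionary standing in for the Graph client (the lookup table, function name, and data below are illustrative assumptions, not ucx code):

# Memoization with negative caching: failed lookups are cached as None so the
# external directory is not queried again for the same id.
_directory = {"sp-1": ("app-1", "My SP", "obj-1")}  # hypothetical lookup table
_cache: dict[str, tuple[str, str, str] | None] = {}

def get_principal(principal_id: str) -> tuple[str, str, str] | None:
    if principal_id in _cache:             # hit: positive or negative
        return _cache[principal_id]
    record = _directory.get(principal_id)  # stands in for the Graph API call
    _cache[principal_id] = record          # "not found" is cached as None too
    return _cache[principal_id]

print(get_principal("sp-1"))     # ('app-1', 'My SP', 'obj-1')
print(get_principal("missing"))  # None; cached, so it is not re-queried
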
79 changes: 45 additions & 34 deletions src/databricks/labs/ucx/assessment/clusters.py
@@ -27,7 +27,48 @@ class ClusterInfo:
     creator: str | None = None


-class ClustersCrawler(CrawlerBase[ClusterInfo]):
+class ClustersMixin:
+    _ws: WorkspaceClient
+
+    def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None:
+        try:
+            return self._ws.cluster_policies.get(policy_id)
+        except NotFound:
+            logger.warning(f"The cluster policy was deleted: {policy_id}")
+            return None
+
+    def _check_spark_conf(self, cluster, failures):
+        for k in INCOMPATIBLE_SPARK_CONFIG_KEYS:
+            if k in cluster.spark_conf:
+                failures.append(f"unsupported config: {k}")
+        for value in cluster.spark_conf.values():
+            if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
+                failures.append(f"using DBFS mount in configuration: {value}")
+        # Checking if Azure cluster config is present in spark config
+        if _azure_sp_conf_present_check(cluster.spark_conf):
+            failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+
+    def _check_cluster_policy(self, cluster, failures):
+        policy = self._safe_get_cluster_policy(cluster.policy_id)
+        if policy:
+            if policy.definition:
+                if _azure_sp_conf_present_check(json.loads(policy.definition)):
+                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+            if policy.policy_family_definition_overrides:
+                if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+
+    def _check_init_scripts(self, cluster, failures):
+        for init_script_info in cluster.init_scripts:
+            init_script_data = _get_init_script_data(self._ws, init_script_info)
+            if not init_script_data:
+                continue
+            if not _azure_sp_conf_in_init_scripts(init_script_data):
+                continue
+            failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+
+
+class ClustersCrawler(CrawlerBase[ClusterInfo], ClustersMixin):
     def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
         super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo)
         self._ws = ws

@@ -58,50 +99,20 @@ def _assess_clusters(self, all_clusters):
                 failures.append(f"not supported DBR: {cluster.spark_version}")

             if cluster.spark_conf is not None:
-                for k in INCOMPATIBLE_SPARK_CONFIG_KEYS:
-                    if k in cluster.spark_conf:
-                        failures.append(f"unsupported config: {k}")
-
-                for value in cluster.spark_conf.values():
-                    if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
-                        failures.append(f"using DBFS mount in configuration: {value}")
-
-                # Checking if Azure cluster config is present in spark config
-                if _azure_sp_conf_present_check(cluster.spark_conf):
-                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+                self._check_spark_conf(cluster, failures)

             # Checking if Azure cluster config is present in cluster policies
             if cluster.policy_id:
-                policy = self._safe_get_cluster_policy(cluster.policy_id)
-                if policy:
-                    if policy.definition:
-                        if _azure_sp_conf_present_check(json.loads(policy.definition)):
-                            failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
-                    if policy.policy_family_definition_overrides:
-                        if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
-                            failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+                self._check_cluster_policy(cluster, failures)

             if cluster.init_scripts:
-                for init_script_info in cluster.init_scripts:
-                    init_script_data = _get_init_script_data(self._ws, init_script_info)
-                    if not init_script_data:
-                        continue
-                    if not _azure_sp_conf_in_init_scripts(init_script_data):
-                        continue
-                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+                self._check_init_scripts(cluster, failures)

             cluster_info.failures = json.dumps(failures)
             if len(failures) > 0:
                 cluster_info.success = 0
             yield cluster_info

-    def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None:
-        try:
-            return self._ws.cluster_policies.get(policy_id)
-        except NotFound:
-            logger.warning(f"The cluster policy was deleted: {policy_id}")
-            return None
-
     def snapshot(self) -> Iterable[ClusterInfo]:
         return self._snapshot(self._try_fetch, self._crawl)

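The new ClustersMixin above gathers the per-area checks (_check_spark_conf, _check_cluster_policy, _check_init_scripts) so that _assess_clusters shrinks to three calls and other crawlers can inherit the same helpers. A simplified sketch of that mixin pattern, with toy classes and a single check standing in for the real ones (all names and logic below are illustrative assumptions, not ucx code):

# A mixin holds shared helper methods; any crawler that also inherits it gets
# them without duplicating the check bodies.
class ChecksMixin:
    def _check_spark_conf(self, conf: dict, failures: list[str]) -> None:
        for value in conf.values():
            if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
                failures.append(f"using DBFS mount in configuration: {value}")

class ToyClustersCrawler(ChecksMixin):
    def assess(self, conf: dict) -> list[str]:
        failures: list[str] = []
        self._check_spark_conf(conf, failures)  # reused helper, no inlined block
        return failures

print(ToyClustersCrawler().assess({"spark.hadoop.x": "dbfs:/mnt/data"}))
# ['using DBFS mount in configuration: dbfs:/mnt/data']
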
7 changes: 5 additions & 2 deletions src/databricks/labs/ucx/assessment/crawlers.py
@@ -2,6 +2,8 @@
 import logging
 import re

+from databricks.sdk.errors import NotFound
+
 logger = logging.getLogger(__name__)

 INCOMPATIBLE_SPARK_CONFIG_KEYS = [

@@ -33,16 +35,17 @@ def _get_init_script_data(w, init_script_info):
             try:
                 data = w.dbfs.read(file_api_format_destination).data
                 return base64.b64decode(data).decode("utf-8")
-            except Exception:
+            except NotFound:
                 return None
     if init_script_info.workspace:
         workspace_file_destination = init_script_info.workspace.destination
         if workspace_file_destination:
             try:
                 data = w.workspace.export(workspace_file_destination).content
                 return base64.b64decode(data).decode("utf-8")
-            except Exception:
+            except NotFound:
                 return None
+    return None


 def _azure_sp_conf_in_init_scripts(init_script_data: str) -> bool:

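The crawlers.py change above swaps the blanket except Exception for except NotFound from databricks.sdk.errors, so only a missing init script maps to None while any other failure propagates. A small sketch of that behavioural difference, using a stand-in exception and reader function instead of the Databricks SDK (both are assumptions for illustration):

class FileMissing(Exception):  # stand-in for databricks.sdk.errors.NotFound
    pass

def read_file(path: str) -> str:  # stand-in for w.dbfs.read / w.workspace.export
    if path == "missing":
        raise FileMissing(path)
    if path == "forbidden":
        raise PermissionError(path)  # an unrelated failure
    return "echo init"

def get_init_script_data(path: str) -> str | None:
    try:
        return read_file(path)
    except FileMissing:  # narrow handler: only "not found" maps to None
        return None

print(get_init_script_data("ok"))       # 'echo init'
print(get_init_script_data("missing"))  # None
# get_init_script_data("forbidden") now raises PermissionError instead of being
# silently swallowed, which is what a bare except-Exception handler would do.
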
113 changes: 61 additions & 52 deletions src/databricks/labs/ucx/assessment/jobs.py
@@ -28,31 +28,34 @@ class JobInfo:
     creator: str | None = None


-class JobsCrawler(CrawlerBase[JobInfo]):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo)
-        self._ws = ws
-
+class JobsMixin:
+    @staticmethod
+    def _get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
         for j in all_jobs:
-            if j.settings is not None:
-                if j.settings.job_clusters is not None:
-                    for jc in j.settings.job_clusters:
-                        if jc.new_cluster is None:
-                            continue
-                        yield j, jc.new_cluster
-
-                if j.settings.tasks is not None:
-                    for t in j.settings.tasks:
-                        if t.existing_cluster_id is not None:
-                            interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
-                            if interactive_cluster is None:
-                                continue
-                            yield j, interactive_cluster
-
-                        elif t.new_cluster is not None:
-                            yield j, t.new_cluster
+            if j.settings is None:
+                continue
+            if j.settings.job_clusters is not None:
+                for jc in j.settings.job_clusters:
+                    if jc.new_cluster is None:
+                        continue
+                    yield j, jc.new_cluster
+            if j.settings.tasks is None:
+                continue
+            for t in j.settings.tasks:
+                if t.existing_cluster_id is not None:
+                    interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
+                    if interactive_cluster is None:
+                        continue
+                    yield j, interactive_cluster
+
+                elif t.new_cluster is not None:
+                    yield j, t.new_cluster


+class JobsCrawler(CrawlerBase[JobInfo], JobsMixin):
+    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
+        super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo)
+        self._ws = ws

     def _crawl(self) -> Iterable[JobInfo]:
         all_jobs = list(self._ws.jobs.list(expand_tasks=True))

@@ -73,17 +76,18 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
             )

             job_settings = job.settings
-            if job_settings is not None:
-                job_name = job_settings.name
-                if not job_name:
-                    job_name = "Unknown"
-                job_details[job.job_id] = JobInfo(
-                    job_id=str(job.job_id),
-                    job_name=job_name,
-                    creator=job.creator_user_name,
-                    success=1,
-                    failures="[]",
-                )
+            if not job_settings:
+                continue
+            job_name = job_settings.name
+            if not job_name:
+                job_name = "Unknown"
+            job_details[job.job_id] = JobInfo(
+                job_id=str(job.job_id),
+                job_name=job_name,
+                creator=job.creator_user_name,
+                success=1,
+                failures="[]",
+            )

         for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
             support_status = spark_version_compatibility(cluster_config.spark_version)

@@ -94,17 +98,7 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
                 job_assessment[job_id].add(f"not supported DBR: {cluster_config.spark_version}")

             if cluster_config.spark_conf is not None:
-                for k in INCOMPATIBLE_SPARK_CONFIG_KEYS:
-                    if k in cluster_config.spark_conf:
-                        job_assessment[job_id].add(f"unsupported config: {k}")
-
-                for value in cluster_config.spark_conf.values():
-                    if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
-                        job_assessment[job_id].add(f"using DBFS mount in configuration: {value}")
-
-                # Checking if Azure cluster config is present in spark config
-                if _azure_sp_conf_present_check(cluster_config.spark_conf):
-                    job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+                self._job_spark_conf(cluster_config, job_assessment, job_id)

             # Checking if Azure cluster config is present in cluster policies
             if cluster_config.policy_id:

@@ -119,20 +113,35 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
                             job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")

             if cluster_config.init_scripts:
-                for init_script_info in cluster_config.init_scripts:
-                    init_script_data = _get_init_script_data(self._ws, init_script_info)
-                    if not init_script_data:
-                        continue
-                    if not _azure_sp_conf_in_init_scripts(init_script_data):
-                        continue
-                    job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+                self._init_scripts(cluster_config, job_assessment, job_id)

-        for job_key in job_details.keys():
+        # TODO: next person looking at this - rewrite, as this code makes no sense
+        for job_key in job_details.keys():  # pylint: disable=consider-using-dict-items,consider-iterating-dictionary
             job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
             if len(job_assessment[job_key]) > 0:
                 job_details[job_key].success = 0
         return list(job_details.values())

+    def _init_scripts(self, cluster_config, job_assessment, job_id):
+        for init_script_info in cluster_config.init_scripts:
+            init_script_data = _get_init_script_data(self._ws, init_script_info)
+            if not init_script_data:
+                continue
+            if not _azure_sp_conf_in_init_scripts(init_script_data):
+                continue
+            job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+
+    def _job_spark_conf(self, cluster_config, job_assessment, job_id):
+        for k in INCOMPATIBLE_SPARK_CONFIG_KEYS:
+            if k in cluster_config.spark_conf:
+                job_assessment[job_id].add(f"unsupported config: {k}")
+        for value in cluster_config.spark_conf.values():
+            if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
+                job_assessment[job_id].add(f"using DBFS mount in configuration: {value}")
+        # Checking if Azure cluster config is present in spark config
+        if _azure_sp_conf_present_check(cluster_config.spark_conf):
+            job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+
     def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None:
         try:
             return self._ws.cluster_policies.get(policy_id)

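With _get_cluster_configs_from_all_jobs now a staticmethod on JobsMixin, both JobsCrawler and AzureServicePrincipalCrawler walk (job, cluster config) pairs through the same generator. A rough sketch of how such a generator is consumed, with simplified dataclasses standing in for the Databricks SDK job and cluster types (all names below are illustrative assumptions, not the real SDK):

from dataclasses import dataclass, field

@dataclass
class ClusterSpec:
    spark_version: str

@dataclass
class Task:
    existing_cluster_id: str | None = None
    new_cluster: ClusterSpec | None = None

@dataclass
class Settings:
    tasks: list[Task] = field(default_factory=list)

@dataclass
class Job:
    job_id: int
    settings: Settings | None = None

def cluster_configs(all_jobs, all_clusters_by_id):
    # Yields one (job, cluster_config) pair per referenced existing cluster and
    # per task-level new cluster, skipping jobs without settings, in the same
    # spirit as the JobsMixin generator above.
    for job in all_jobs:
        if job.settings is None:
            continue
        for task in job.settings.tasks:
            if task.existing_cluster_id is not None:
                cluster = all_clusters_by_id.get(task.existing_cluster_id)
                if cluster is not None:
                    yield job, cluster
            elif task.new_cluster is not None:
                yield job, task.new_cluster

jobs = [Job(1, Settings([Task(existing_cluster_id="c1"), Task(new_cluster=ClusterSpec("9.1.x"))]))]
clusters = {"c1": ClusterSpec("13.3.x")}
for job, cfg in cluster_configs(jobs, clusters):
    print(job.job_id, cfg.spark_version)  # 1 13.3.x, then 1 9.1.x
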