Added debug notebook companion to troubleshoot the installation #191

Merged · 8 commits · Sep 13, 2023
149 changes: 113 additions & 36 deletions src/databricks/labs/ucx/install.py
@@ -19,15 +19,52 @@
from databricks.labs.ucx.runtime import main
from databricks.labs.ucx.tasks import _TASKS

TAG_STEP = "step"
TAG_APP = "App"

DEBUG_NOTEBOOK = """
# Databricks notebook source
# MAGIC %md
# MAGIC # Debug companion for UCX installation (see [README]({readme_link}))
# MAGIC
# MAGIC Production runs are supposed to be triggered through the following jobs: {job_links}
# MAGIC
# MAGIC **This notebook is overwritten with each UCX update/(re)install.**

# COMMAND ----------

# MAGIC %pip install /Workspace{remote_wheel}
dbutils.library.restartPython()

# COMMAND ----------

import logging
from pathlib import Path
from databricks.labs.ucx.__about__ import __version__
from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx import logger
from databricks.sdk import WorkspaceClient

logger._install()
logging.getLogger("databricks").setLevel("DEBUG")

cfg = MigrationConfig.from_file(Path("/Workspace{config_file}"))
ws = WorkspaceClient()

print(__version__)
"""

logger = logging.getLogger(__name__)


class Installer:
def __init__(self, ws: WorkspaceClient):
def __init__(self, ws: WorkspaceClient, *, prefix: str = "ucx", prompts: bool = True):
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
msg = "Installer is not supposed to be executed in Databricks Runtime"
raise SystemExit(msg)
self._ws = ws
self._prefix = prefix
self._prompts = prompts

def run(self):
self._configure()
@@ -45,7 +82,7 @@ def _my_username(self):

@property
def _install_folder(self):
return f"/Users/{self._my_username}/.ucx"
return f"/Users/{self._my_username}/.{self._prefix}"

@property
def _config_file(self):
@@ -60,14 +97,13 @@ def _current_config(self):
return self._config

def _configure(self):
config_path = self._config_file
ws_file_url = f"{self._ws.config.host}/#workspace{config_path}"
ws_file_url = self._notebook_link(self._config_file)
try:
self._ws.workspace.get_status(config_path)
self._ws.workspace.get_status(self._config_file)
logger.info(f"UCX is already configured. See {ws_file_url}")
if self._question("Type 'yes' to open config file in the browser") == "yes":
if self._prompts and self._question("Type 'yes' to open config file in the browser") == "yes":
webbrowser.open(ws_file_url)
return config_path
return
except DatabricksError as err:
if err.error_code != "RESOURCE_DOES_NOT_EXIST":
raise err
@@ -84,41 +120,55 @@ def _configure(self):
num_threads=int(self._question("Number of threads", default="8")),
)

config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
self._ws.workspace.upload(config_path, config_bytes, format=ImportFormat.AUTO)
logger.info(f"Created configuration file: {config_path}")
if self._question("Open config file in the browser and continue installing?", default="yes") == "yes":
self._write_config()
msg = "Open config file in the browser and continue installing?"
if self._prompts and self._question(msg, default="yes") == "yes":
webbrowser.open(ws_file_url)

def _write_config(self):
try:
self._ws.workspace.get_status(self._install_folder)
except DatabricksError as err:
if err.error_code != "RESOURCE_DOES_NOT_EXIST":
raise err
logger.debug(f"Creating install folder: {self._install_folder}")
self._ws.workspace.mkdirs(self._install_folder)

config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
logger.info(f"Creating configuration file: {self._config_file}")
self._ws.workspace.upload(self._config_file, config_bytes, format=ImportFormat.AUTO)

def _create_jobs(self):
logger.debug(f"Creating jobs from tasks in {main.__name__}")
dbfs_path = self._upload_wheel()
deployed_steps = self._deployed_steps()
remote_wheel = self._upload_wheel()
self._deployed_steps = self._deployed_steps()
desired_steps = {t.workflow for t in _TASKS.values()}
for step_name in desired_steps:
settings = self._job_settings(step_name, dbfs_path)
if step_name in deployed_steps:
job_id = deployed_steps[step_name]
settings = self._job_settings(step_name, remote_wheel)
if step_name in self._deployed_steps:
job_id = self._deployed_steps[step_name]
logger.info(f"Updating configuration for step={step_name} job_id={job_id}")
self._ws.jobs.reset(job_id, jobs.JobSettings(**settings))
else:
logger.info(f"Creating new job configuration for step={step_name}")
deployed_steps[step_name] = self._ws.jobs.create(**settings).job_id
self._deployed_steps[step_name] = self._ws.jobs.create(**settings).job_id

for step_name, job_id in deployed_steps.items():
for step_name, job_id in self._deployed_steps.items():
if step_name not in desired_steps:
logger.info(f"Removing job_id={job_id}, as it is no longer needed")
self._ws.jobs.delete(job_id)

self._create_readme(deployed_steps)
self._create_readme()
self._create_debug(remote_wheel)

def _create_readme(self, deployed_steps):
def _create_readme(self):
md = [
"# UCX - The Unity Catalog Migration Assistant",
"Here are the descriptions of jobs that trigger various stages of migration.",
f'To troubleshoot, see [debug notebook]({self._notebook_link(f"{self._install_folder}/DEBUG.py")}).',
]
for step_name, job_id in deployed_steps.items():
md.append(f"## [[UCX] {step_name}]({self._ws.config.host}#job/{job_id})\n")
for step_name, job_id in self._deployed_steps.items():
md.append(f"## [[{self._prefix.upper()}] {step_name}]({self._ws.config.host}#job/{job_id})\n")
for t in _TASKS.values():
if t.workflow != step_name:
continue
@@ -129,12 +179,31 @@ def _create_readme(self, deployed_steps):
intro = "\n".join(preamble + [f"# MAGIC {line}" for line in md])
path = f"{self._install_folder}/README.py"
self._ws.workspace.upload(path, intro.encode("utf8"), overwrite=True)
url = f"{self._ws.config.host}/#workspace{path}"
logger.info(f"Created notebook with job overview: {url}")
url = self._notebook_link(path)
logger.info(f"Created README notebook with job overview: {url}")
msg = "Type 'yes' to open job overview in README notebook in your home directory"
if self._question(msg) == "yes":
if self._prompts and self._question(msg) == "yes":
webbrowser.open(url)

def _create_debug(self, remote_wheel: str):
readme_link = self._notebook_link(f"{self._install_folder}/README.py")
job_links = ", ".join(
f"[[{self._prefix.upper()}] {step_name}]({self._ws.config.host}#job/{job_id})"
for step_name, job_id in self._deployed_steps.items()
)
path = f"{self._install_folder}/DEBUG.py"
logger.debug(f"Created debug notebook: {self._notebook_link(path)}")
self._ws.workspace.upload(
path,
DEBUG_NOTEBOOK.format(
remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
).encode("utf8"),
overwrite=True,
)

def _notebook_link(self, path: str) -> str:
return f"{self._ws.config.host}/#workspace{path}"

@staticmethod
def _question(text: str, *, default: str | None = None) -> str:
default_help = "" if default is None else f"\033[36m (default: {default})\033[0m"
Expand All @@ -146,14 +215,20 @@ def _question(text: str, *, default: str | None = None) -> str:
return default
return res

def _upload_wheel(self):
def _upload_wheel(self) -> str:
with tempfile.TemporaryDirectory() as tmp_dir:
wheel = self._build_wheel(tmp_dir)
dbfs_path = f"{self._install_folder}/wheels/{wheel.name}"
with wheel.open("rb") as f:
logger.info(f"Uploading wheel to dbfs:{dbfs_path}")
self._ws.dbfs.upload(dbfs_path, f, overwrite=True)
return dbfs_path
local_wheel = self._build_wheel(tmp_dir)
remote_wheel = f"{self._install_folder}/wheels/{local_wheel.name}"
remote_dirname = os.path.dirname(remote_wheel)
with local_wheel.open("rb") as f:
self._ws.dbfs.mkdirs(remote_dirname)
logger.info(f"Uploading wheel to dbfs:{remote_wheel}")
self._ws.dbfs.upload(remote_wheel, f, overwrite=True)
with local_wheel.open("rb") as f:
self._ws.workspace.mkdirs(remote_dirname)
logger.info(f"Uploading wheel to /Workspace{remote_wheel}")
self._ws.workspace.upload(remote_wheel, f, overwrite=True, format=ImportFormat.AUTO)
return remote_wheel

def _job_settings(self, step_name, dbfs_path):
config_file = f"/Workspace/{self._install_folder}/config.yml"
@@ -164,8 +239,8 @@ def _job_settings(self, step_name, dbfs_path):
)
tasks = sorted([t for t in _TASKS.values() if t.workflow == step_name], key=lambda _: _.name)
return {
"name": f"[UCX] {step_name}",
"tags": {"App": "ucx", "step": step_name},
"name": f"[{self._prefix.upper()}] {step_name}",
"tags": {TAG_APP: self._prefix, TAG_STEP: step_name},
"job_clusters": self._job_clusters({t.job_cluster for t in tasks}),
"email_notifications": email_notifications,
"tasks": [
@@ -210,6 +285,7 @@ def _job_clusters(self, names: set[str]):
spec,
data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
spark_conf={"spark.databricks.acl.sqlOnly": "true"},
num_workers=1, # ShowPermissionsCommand needs a worker
custom_tags={},
),
)
@@ -270,13 +346,14 @@ def _cluster_node_type(self, spec: compute.ClusterSpec) -> compute.ClusterSpec:

def _deployed_steps(self):
deployed_steps = {}
logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._prefix}")
for j in self._ws.jobs.list():
tags = j.settings.tags
if tags is None:
continue
if tags.get("App", None) != "ucx":
if tags.get(TAG_APP, None) != self._prefix:
continue
deployed_steps[tags.get("step", "_")] = j.job_id
deployed_steps[tags.get(TAG_STEP, "_")] = j.job_id
return deployed_steps


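`DEBUG.py` itself is just the module-level template rendered with install-specific values; a sketch of what `_create_debug` produces, with purely illustrative paths and job ids:

```python
# Illustration only: every value below is made up.
rendered = DEBUG_NOTEBOOK.format(
    remote_wheel="/Users/someone@example.com/.ucx/wheels/databricks_labs_ucx-0.0.0-py3-none-any.whl",
    readme_link="https://example.cloud.databricks.com/#workspace/Users/someone@example.com/.ucx/README.py",
    job_links="[[UCX] migrate-groups](https://example.cloud.databricks.com#job/1234)",
    config_file="/Users/someone@example.com/.ucx/config.yml",
)
# The resulting notebook %pip-installs the uploaded wheel from /Workspace,
# re-creates MigrationConfig and WorkspaceClient, and raises the "databricks"
# logger hierarchy to DEBUG.
```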
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/providers/mixins/fixtures.py
@@ -354,6 +354,7 @@ def create(
kwargs["roles"] = _scim_values(roles)
if entitlements is not None:
kwargs["entitlements"] = _scim_values(entitlements)
# TODO: REQUEST_LIMIT_EXCEEDED: GetUserPermissionsRequest RPC token bucket limit has been exceeded.
return interface.create(**kwargs)

yield from factory(name, create, lambda item: interface.delete(item.id))
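The TODO above points at SCIM throttling when many test fixtures are created concurrently; one possible mitigation (not part of this PR) is a small retry with backoff around the `create` call, sketched here under the assumption that the throttling surfaces as a `DatabricksError` with `error_code` of `REQUEST_LIMIT_EXCEEDED`:

```python
# Hypothetical mitigation sketch, not implemented in this PR.
import time

from databricks.sdk.core import DatabricksError


def create_with_retry(interface, attempts: int = 5, **kwargs):
    """Retry interface.create(**kwargs) when the workspace throttles SCIM calls."""
    for attempt in range(attempts):
        try:
            return interface.create(**kwargs)
        except DatabricksError as err:
            if err.error_code != "REQUEST_LIMIT_EXCEEDED" or attempt == attempts - 1:
                raise
            time.sleep(2**attempt)  # simple exponential backoff
```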
5 changes: 0 additions & 5 deletions src/databricks/labs/ucx/runtime.py
@@ -5,15 +5,10 @@
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.logger import _install
from databricks.labs.ucx.tasks import task, trigger
from databricks.labs.ucx.toolkits.group_migration import GroupMigrationToolkit
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

_install()

logging.root.setLevel("INFO")

logger = logging.getLogger(__name__)


9 changes: 7 additions & 2 deletions src/databricks/labs/ucx/tacl/_internal.py
@@ -27,9 +27,11 @@ def __init__(self, ws: WorkspaceClient, warehouse_id):
self._warehouse_id = warehouse_id

def execute(self, sql):
logger.debug(f"[api][execute] {sql}")
self._sql.execute(self._warehouse_id, sql)

def fetch(self, sql) -> Iterator[any]:
logger.debug(f"[api][fetch] {sql}")
return self._sql.execute_fetch_all(self._warehouse_id, sql)


@@ -43,9 +45,11 @@ def __init__(self):
self._spark = SparkSession.builder.getOrCreate()

def execute(self, sql):
logger.debug(f"[spark][execute] {sql}")
self._spark.sql(sql)

def fetch(self, sql) -> Iterator[any]:
logger.debug(f"[spark][fetch] {sql}")
return self._spark.sql(sql).collect()


@@ -160,6 +164,7 @@ def _snapshot(self, klass, fetcher, loader) -> list[any]:
logger.debug(f"[{self._full_name}] crawling new batch for {self._table}")
loaded_records = list(loader())
if len(loaded_records) > 0:
logger.debug(f"[{self._full_name}] found {len(loaded_records)} new records for {self._table}")
self._append_records(klass, loaded_records)
loaded = True

@@ -230,10 +235,10 @@ def _append_records(self, klass, records: Iterator[any]):
logger.debug(f"[{self._full_name}] not found. creating")
schema = ", ".join(f"{f.name} {self._field_type(f)}" for f in fields)
try:
ddl = f"CREATE TABLE {self._full_name} ({schema}) USING DELTA"
self._exec(ddl)
self._exec(f"CREATE TABLE {self._full_name} ({schema}) USING DELTA")
except Exception as e:
schema_not_found = "SCHEMA_NOT_FOUND" in str(e)
if not schema_not_found:
raise e
logger.debug(f"[{self._catalog}.{self._schema}] not found. creating")
self._exec(f"CREATE SCHEMA {self._catalog}.{self._schema}")
6 changes: 5 additions & 1 deletion src/databricks/labs/ucx/tacl/grants.py
@@ -166,7 +166,11 @@ def _crawl(self, catalog: str, database: str) -> list[Grant]:
tasks.append(partial(fn, view=table.name))
else:
tasks.append(partial(fn, table=table.name))
return [grant for grants in ThreadedExecution.gather("listing grants", tasks) for grant in grants]
return [
grant
for grants in ThreadedExecution.gather(f"listing grants for {catalog}.{database}", tasks)
for grant in grants
]

def _grants(
self,
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/tacl/tables.py
@@ -119,7 +119,7 @@ def _crawl(self, catalog: str, database: str) -> list[Table]:
tasks = []
for _, table, _is_tmp in self._fetch(f"SHOW TABLES FROM {catalog}.{database}"):
tasks.append(partial(self._describe, catalog, database, table))
return ThreadedExecution.gather("listing tables", tasks)
return ThreadedExecution.gather(f"listing tables in {catalog}.{database}", tasks)

def _describe(self, catalog: str, database: str, table: str) -> Table:
"""Fetches metadata like table type, data format, external table location,
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/tasks.py
@@ -5,6 +5,7 @@
from pathlib import Path

from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.logger import _install

_TASKS: dict[str, "Task"] = {}

@@ -69,6 +70,8 @@ def trigger(*argv):
current_task = _TASKS[task_name]
print(current_task.doc)

_install()

cfg = MigrationConfig.from_file(Path(args["config"]))
logging.getLogger("databricks").setLevel(cfg.log_level)
