From 87f3c63a7a0f27ba548b62a2831677fa122b0c04 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Mon, 22 Jan 2024 16:00:35 +0100 Subject: [PATCH] Cleanup code with `pylint` (#822) This PR cleans up the code according to Google Python Styleguide --- pyproject.toml | 630 ++++++++++++++++-- src/databricks/labs/ucx/account.py | 3 +- src/databricks/labs/ucx/assessment/azure.py | 39 +- .../labs/ucx/assessment/clusters.py | 79 ++- .../labs/ucx/assessment/crawlers.py | 7 +- src/databricks/labs/ucx/assessment/jobs.py | 113 ++-- src/databricks/labs/ucx/cli.py | 14 +- src/databricks/labs/ucx/config.py | 15 +- src/databricks/labs/ucx/framework/crawlers.py | 18 +- .../labs/ucx/framework/dashboards.py | 8 +- src/databricks/labs/ucx/framework/tasks.py | 175 ++--- .../labs/ucx/hive_metastore/grants.py | 10 +- .../labs/ucx/hive_metastore/hms_lineage.py | 8 +- .../labs/ucx/hive_metastore/locations.py | 75 +-- .../labs/ucx/hive_metastore/mapping.py | 11 +- .../labs/ucx/hive_metastore/table_migrate.py | 55 +- .../labs/ucx/hive_metastore/table_size.py | 8 +- .../labs/ucx/hive_metastore/tables.py | 4 +- .../labs/ucx/hive_metastore/udfs.py | 2 +- src/databricks/labs/ucx/install.py | 98 +-- src/databricks/labs/ucx/installer/__init__.py | 4 +- src/databricks/labs/ucx/mixins/fixtures.py | 27 +- src/databricks/labs/ucx/mixins/redash.py | 98 +-- src/databricks/labs/ucx/mixins/sql.py | 59 +- .../labs/ucx/workspace_access/generic.py | 23 +- .../labs/ucx/workspace_access/groups.py | 11 +- .../labs/ucx/workspace_access/listing.py | 2 +- .../labs/ucx/workspace_access/redash.py | 24 +- .../labs/ucx/workspace_access/verification.py | 4 +- .../integration/framework/test_dashboards.py | 63 -- tests/unit/assessment/__init__.py | 31 + .../assessment/clusters/no-spark-conf.json | 12 + .../policies/single-user-with-spn.json | 82 +++ tests/unit/assessment/test_clusters.py | 56 +- tests/unit/assessment/test_jobs.py | 9 +- tests/unit/framework/test_crawlers.py | 6 +- tests/unit/framework/test_tasks.py | 41 ++ tests/unit/hive_metastore/test_table_move.py | 60 +- tests/unit/test_install.py | 2 +- .../workspace_access/test_verification.py | 10 +- 40 files changed, 1257 insertions(+), 739 deletions(-) delete mode 100644 tests/integration/framework/test_dashboards.py create mode 100644 tests/unit/assessment/clusters/no-spark-conf.json create mode 100644 tests/unit/assessment/policies/single-user-with-spn.json create mode 100644 tests/unit/framework/test_tasks.py diff --git a/pyproject.toml b/pyproject.toml index 39bae8108e..74d26ca28b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", ] dependencies = ["databricks-sdk~=0.17.0", - "databricks-labs-blueprint", + "databricks-labs-blueprint~=0.1.0", "PyYAML>=6.0.0,<7.0.0"] [project.entry-points.databricks] @@ -42,6 +42,7 @@ path = "src/databricks/labs/ucx/__about__.py" [tool.hatch.envs.default] dependencies = [ + "pylint", "coverage[toml]>=6.5", "pytest", "pytest-xdist", @@ -62,24 +63,27 @@ python="3.10" path = ".venv" [tool.hatch.envs.default.scripts] -test = "pytest -n auto --cov src --cov-report=xml --timeout 30 tests/unit --durations 20" +test = "pytest -n 4 --cov src --cov-report=xml --timeout 30 tests/unit --durations 20" coverage = "pytest -n auto --cov src tests/unit --timeout 30 --cov-report=html --durations 20" integration = "pytest -n 10 --cov src tests/integration --durations 20" fmt = ["isort .", "black .", "ruff . 
--fix", - "mypy ."] + "mypy .", + "pylint --output-format=colorized -j 0 src"] verify = ["black --check .", "isort . --check-only", "ruff .", - "mypy ."] + "mypy .", + "pylint --output-format=colorized -j 0 src"] [tool.isort] skip_glob = ["notebooks/*.py"] profile = "black" [tool.pytest.ini_options] -addopts = "-s -p no:warnings -vv --cache-clear" +# TODO: remove `-p no:warnings` +addopts = "--no-header -p no:warnings" cache_dir = ".venv/pytest-cache" [tool.black] @@ -91,35 +95,7 @@ skip-string-normalization = true cache-dir = ".venv/ruff-cache" target-version = "py310" line-length = 120 -select = [ - "A", - "ARG", - "B", - "C", - "E", - "EM", - "F", - "FBT", - "I", - "ICN", - "ISC", - "N", - "PLC", - "PLE", - "PLR", - "PLW", - "Q", - "RUF", - "S", - "T", - "TID", - "UP", - "W", - "YTT", -] ignore = [ - # Allow non-abstract empty methods in abstract base classes - "B027", # Allow boolean positional values in function calls, like `dict.get(... True)` "FBT003", # Ignore checks for possible passwords and SQL statement construction @@ -128,17 +104,20 @@ ignore = [ "T201", # Allow asserts "S101", - # Allow standard random generators - "S311", # Ignore complexity - "C901", "PLR0911", "PLR0912", "PLR0913", "PLR0915", - # Ignore flaky Import block is un-sorted or un-formatted - "I001", + "C901", + # Too many return statements + "PLR0911", + # Too many branches + "PLR0912", + # Too many arguments in function definition + "PLR0913", + # Too many statements + "PLR0915", # Ignore Exception must not use a string literal, assign to variable first "EM101", -] -extend-exclude = [ - "notebooks/*.py" + # Ignore the error message will be duplicated in the traceback, which can make the traceback less readable. + "EM102", ] [tool.ruff.isort] @@ -149,6 +128,8 @@ ban-relative-imports = "all" [tool.ruff.per-file-ignores] +"src/databricks/labs/ucx/mixins/*" = ["S311"] + "tests/**/*" = [ "PLR2004", "S101", "TID252", # tests can use magic values, assertions, and relative imports "ARG001" # tests may not use the provided fixtures @@ -167,3 +148,570 @@ exclude_lines = [ "if __name__ == .__main__.:", "if TYPE_CHECKING:", ] + +[tool.pylint.main] +# PyLint configuration is adapted from Google Python Style Guide with modifications. +# Sources https://google.github.io/styleguide/pylintrc +# License: https://github.com/google/styleguide/blob/gh-pages/LICENSE + +# Analyse import fallback blocks. This can be used to support both Python 2 and 3 +# compatible code, which means that the block might have code that exists only in +# one or another interpreter, leading to false positives when analysed. +# analyse-fallback-blocks = + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint in +# a server-like mode. +# clear-cache-post-run = + +# Always return a 0 (non-error) status code, even if lint errors are found. This +# is primarily useful in continuous integration scripts. +# exit-zero = + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +# extension-pkg-allow-list = + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) 
+# extension-pkg-whitelist = + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +# fail-on = + +# Specify a score threshold under which the program will exit with error. +fail-under = 10.0 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +# from-stdin = + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, it +# can't be used as an escape character. +# ignore-paths = + +# Files or directories matching the regular expression patterns are skipped. The +# regex matches against base names, not paths. The default value ignores Emacs +# file locks +ignore-patterns = ["^\\.#"] + +# List of module names for which member attributes should not be checked (useful +# for modules/projects where namespaces are manipulated during runtime and thus +# existing member attributes cannot be deduced by static analysis). It supports +# qualified module names, as well as Unix pattern matching. +# ignored-modules = + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +# init-hook = + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +# jobs = + +# Control the amount of potential inferred values when inferring a single object. +# This can help the performance when dealing with large functions or complex, +# nested conditions. +limit-inference-results = 100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins = ["pylint.extensions.check_elif", "pylint.extensions.bad_builtin", "pylint.extensions.docparams", "pylint.extensions.for_any_all", "pylint.extensions.set_membership", "pylint.extensions.code_style", "pylint.extensions.overlapping_exceptions", "pylint.extensions.typing", "pylint.extensions.redefined_variable_type", "pylint.extensions.comparison_placement", "pylint.extensions.broad_try_clause", "pylint.extensions.dict_init_mutate", "pylint.extensions.consider_refactoring_into_while_condition"] + +# Pickle collected data for later comparisons. +persistent = true + +# Minimum Python version to use for version dependent checks. Will default to the +# version used to run pylint. +py-version = "3.10" + +# Discover python modules and packages in the file system subtree. +# recursive = + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +# source-roots = + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode = true + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +# unsafe-load-any-extension = + +[tool.pylint.basic] +# Naming style matching correct argument names. 
+argument-naming-style = "snake_case" + +# Regular expression matching correct argument names. Overrides argument-naming- +# style. If left empty, argument names will be checked with the set naming style. +argument-rgx = "[a-z_][a-z0-9_]{2,30}$" + +# Naming style matching correct attribute names. +attr-naming-style = "snake_case" + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +attr-rgx = "[a-z_][a-z0-9_]{2,}$" + +# Bad variable names which should always be refused, separated by a comma. +bad-names = ["foo", "bar", "baz", "toto", "tutu", "tata"] + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +# bad-names-rgxs = + +# Naming style matching correct class attribute names. +class-attribute-naming-style = "any" + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +class-attribute-rgx = "([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$" + +# Naming style matching correct class constant names. +class-const-naming-style = "UPPER_CASE" + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +# class-const-rgx = + +# Naming style matching correct class names. +class-naming-style = "PascalCase" + +# Regular expression matching correct class names. Overrides class-naming-style. +# If left empty, class names will be checked with the set naming style. +class-rgx = "[A-Z_][a-zA-Z0-9]+$" + +# Naming style matching correct constant names. +const-naming-style = "UPPER_CASE" + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming style. +const-rgx = "(([A-Z_][A-Z0-9_]*)|(__.*__))$" + +# Minimum line length for functions/classes that require docstrings, shorter ones +# are exempt. +docstring-min-length = -1 + +# Naming style matching correct function names. +function-naming-style = "snake_case" + +# Regular expression matching correct function names. Overrides function-naming- +# style. If left empty, function names will be checked with the set naming style. +function-rgx = "[a-z_][a-z0-9_]{2,30}$" + +# Good variable names which should always be accepted, separated by a comma. +good-names = ["i", "j", "k", "ex", "Run", "_"] + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +# good-names-rgxs = + +# Include a hint for the correct naming format with invalid-name. +# include-naming-hint = + +# Naming style matching correct inline iteration names. +inlinevar-naming-style = "any" + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +inlinevar-rgx = "[A-Za-z_][A-Za-z0-9_]*$" + +# Naming style matching correct method names. +method-naming-style = "snake_case" + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +method-rgx = "[a-z_][a-z0-9_]{2,}$" + +# Naming style matching correct module names. +module-naming-style = "snake_case" + +# Regular expression matching correct module names. 
Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +module-rgx = "(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$" + +# Colon-delimited sets of names that determine each other's naming style when the +# name regexes allow several styles. +# name-group = + +# Regular expression which should only match function or class names that do not +# require a docstring. +no-docstring-rgx = "__.*__" + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. These +# decorators are taken in consideration only for invalid-name. +property-classes = ["abc.abstractproperty"] + +# Regular expression matching correct type alias names. If left empty, type alias +# names will be checked with the set naming style. +# typealias-rgx = + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +# typevar-rgx = + +# Naming style matching correct variable names. +variable-naming-style = "snake_case" + +# Regular expression matching correct variable names. Overrides variable-naming- +# style. If left empty, variable names will be checked with the set naming style. +variable-rgx = "[a-z_][a-z0-9_]{2,30}$" + +[tool.pylint.broad_try_clause] +# Maximum number of statements allowed in a try clause +max-try-statements = 7 + +[tool.pylint.classes] +# Warn about protected attribute access inside special methods +# check-protected-access-in-special-methods = + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods = ["__init__", "__new__", "setUp", "__post_init__"] + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected = ["_asdict", "_fields", "_replace", "_source", "_make"] + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg = ["cls"] + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg = ["mcs"] + +[tool.pylint.deprecated_builtins] +# List of builtins function names that should not be used, separated by a comma +bad-functions = ["map", "input"] + +[tool.pylint.design] +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +# exclude-too-few-public-methods = + +# List of qualified class names to ignore when counting class parents (see R0901) +# ignored-parents = + +# Maximum number of arguments for function / method. +max-args = 9 + +# Maximum number of attributes for a class (see R0902). +max-attributes = 11 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr = 5 + +# Maximum number of branch for function / method body. +max-branches = 20 + +# Maximum number of locals for function / method body. +max-locals = 19 + +# Maximum number of parents for a class (see R0901). +max-parents = 7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods = 20 + +# Maximum number of return / yield for function / method body. +max-returns = 11 + +# Maximum number of statements in function / method body. +max-statements = 50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods = 2 + +[tool.pylint.exceptions] +# Exceptions that will emit a warning when caught. 
+overgeneral-exceptions = ["builtins.Exception"] + +[tool.pylint.format] +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +# expected-line-ending-format = + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines = "^\\s*(# )??$" + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren = 4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string = " " + +# Maximum number of characters on a single line. +max-line-length = 100 + +# Maximum number of lines in a module. +max-module-lines = 2000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +# single-line-class-stmt = + +# Allow the body of an if to be on the same line as the test if there is no else. +# single-line-if-stmt = + +[tool.pylint.imports] +# List of modules that can be imported at any level, not just the top level one. +# allow-any-import-level = + +# Allow explicit reexports by alias from a package __init__. +# allow-reexport-from-package = + +# Allow wildcard imports from modules that define __all__. +# allow-wildcard-with-all = + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules = ["regsub", "TERMIOS", "Bastion", "rexec"] + +# Output a graph (.gv or any supported image format) of external dependencies to +# the given file (report RP0402 must not be disabled). +# ext-import-graph = + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be disabled). +# import-graph = + +# Output a graph (.gv or any supported image format) of internal dependencies to +# the given file (report RP0402 must not be disabled). +# int-import-graph = + +# Force import order to recognize a module as part of the standard compatibility +# libraries. +# known-standard-library = + +# Force import order to recognize a module as part of a third party library. +known-third-party = ["enchant"] + +# Couples of modules and preferred modules, separated by a comma. +# preferred-modules = + +[tool.pylint.logging] +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style = "old" + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules = ["logging"] + +[tool.pylint."messages control"] +# Only show warnings with the listed confidence levels. Leave empty to show all. +# Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, UNDEFINED. +confidence = ["HIGH", "CONTROL_FLOW", "INFERENCE", "INFERENCE_FAILURE", "UNDEFINED"] + +# Disable the message, report, category or checker with the given id(s). You can +# either give multiple identifiers separated by comma (,) or put this option +# multiple times (only on the command line, not in the configuration file where +# it should appear only once). You can also use "--disable=all" to disable +# everything first and then re-enable specific checks. For example, if you want +# to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". 
+disable = ["raw-checker-failed", "bad-inline-option", "locally-disabled", "file-ignored", "suppressed-message", "deprecated-pragma", "use-implicit-booleaness-not-comparison-to-string", "use-implicit-booleaness-not-comparison-to-zero", "consider-using-augmented-assign", "prefer-typing-namedtuple", "attribute-defined-outside-init", "invalid-name", "missing-module-docstring", "missing-class-docstring", "missing-function-docstring", "protected-access", "too-few-public-methods", "line-too-long", "too-many-lines", "trailing-whitespace", "missing-final-newline", "trailing-newlines", "bad-indentation", "unnecessary-semicolon", "multiple-statements", "superfluous-parens", "mixed-line-endings", "unexpected-line-ending-format", "fixme", "consider-using-assignment-expr", "logging-fstring-interpolation", "consider-using-any-or-all"] + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where it +# should appear only once). See also the "--disable" option for examples. +enable = ["useless-suppression", "use-symbolic-message-instead"] + +[tool.pylint.method_args] +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. 'requests.api.get,requests.api.post' +timeout-methods = ["requests.api.delete", "requests.api.get", "requests.api.head", "requests.api.options", "requests.api.patch", "requests.api.post", "requests.api.put", "requests.api.request"] + +[tool.pylint.miscellaneous] +# List of note tags to take in consideration, separated by a comma. +notes = ["FIXME", "XXX", "TODO"] + +# Regular expression of note tags to take in consideration. +# notes-rgx = + +[tool.pylint.parameter_documentation] +# Whether to accept totally missing parameter documentation in the docstring of a +# function that has parameters. +accept-no-param-doc = true + +# Whether to accept totally missing raises documentation in the docstring of a +# function that raises an exception. +accept-no-raise-doc = true + +# Whether to accept totally missing return documentation in the docstring of a +# function that returns a statement. +accept-no-return-doc = true + +# Whether to accept totally missing yields documentation in the docstring of a +# generator. +accept-no-yields-doc = true + +# If the docstring type cannot be guessed the specified docstring type will be +# used. +default-docstring-type = "default" + +[tool.pylint.refactoring] +# Maximum number of nested blocks for function / method body +max-nested-blocks = 5 + +# Complete name of functions that never returns. When checking for inconsistent- +# return-statements if a never returning function is called then it will be +# considered as an explicit return statement and no message will be printed. +never-returning-functions = ["sys.exit", "argparse.parse_error"] + +[tool.pylint.reports] +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each category, +# as well as 'statement' which is the total number of statements analyzed. This +# score is used by the global evaluation report (RP0004). +evaluation = "max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))" + +# Template used to display messages. 
This is a python new-style format string +# used to format the message information. See doc for all details. +# msg-template = + +# Set the output format. Available formats are: text, parseable, colorized, json2 +# (improved json format), json (old json format) and msvs (visual studio). You +# can also give a reporter class, e.g. mypackage.mymodule.MyReporterClass. +# output-format = + +# Tells whether to display a full report or only the messages. +# reports = + +# Activate the evaluation score. +score = true + +[tool.pylint.similarities] +# Comments are removed from the similarity computation +ignore-comments = true + +# Docstrings are removed from the similarity computation +ignore-docstrings = true + +# Imports are removed from the similarity computation +ignore-imports = true + +# Signatures are removed from the similarity computation +ignore-signatures = true + +# Minimum lines number of a similarity. +min-similarity-lines = 6 + +[tool.pylint.spelling] +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions = 2 + +# Spelling dictionary name. No available dictionaries : You need to install both +# the python package and the system dependency for enchant to work. +# spelling-dict = + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives = "fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:,pragma:,# noinspection" + +# List of comma separated words that should not be checked. +# spelling-ignore-words = + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file = ".pyenchant_pylint_custom_dict.txt" + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +# spelling-store-unknown-words = + +[tool.pylint.typecheck] +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators = ["contextlib.contextmanager"] + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members = "REQUEST,acl_users,aq_parent,argparse.Namespace" + +# Tells whether missing members accessed in mixin class should be ignored. A +# class is considered mixin if its name matches the mixin-class-rgx option. +# Tells whether to warn about missing members when the owner of the attribute is +# inferred to be None. +ignore-none = true + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference can +# return multiple potential results while evaluating a Python object, but some +# branches might not be evaluated, which results in partial inference. In that +# case, it might be useful to still emit no-member and other checks for the rest +# of the inferred objects. +ignore-on-opaque-inference = true + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins = ["no-member", "not-async-context-manager", "not-context-manager", "attribute-defined-outside-init"] + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. 
+ignored-classes = ["SQLObject", "optparse.Values", "thread._local", "_thread._local"] + +# Show a hint with possible names when a member name was not found. The aspect of +# finding the hint is based on edit distance. +missing-member-hint = true + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance = 1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices = 1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx = ".*MixIn" + +# List of decorators that change the signature of a decorated function. +# signature-mutators = + +[tool.pylint.variables] +# List of additional names supposed to be defined in builtins. Remember that you +# should avoid defining new builtins when possible. +# additional-builtins = + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables = true + +# List of names allowed to shadow builtins +# allowed-redefined-builtins = + +# List of strings which can identify a callback function by name. A callback name +# must start or end with one of those strings. +callbacks = ["cb_", "_cb"] + +# A regular expression matching the name of dummy variables (i.e. expected to not +# be used). +dummy-variables-rgx = "_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_" + +# Argument names that match this expression will be ignored. +ignored-argument-names = "_.*|^ignored_|^unused_" + +# Tells whether we should check for unused import in __init__ files. +# init-import = + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules = ["six.moves", "past.builtins", "future.builtins", "builtins", "io"] diff --git a/src/databricks/labs/ucx/account.py b/src/databricks/labs/ucx/account.py index 77f2ab9337..5eba81ce7c 100644 --- a/src/databricks/labs/ucx/account.py +++ b/src/databricks/labs/ucx/account.py @@ -47,7 +47,7 @@ def _configured_workspaces(self): def _get_cloud(self) -> str: if self._ac.config.is_azure: return "azure" - elif self._ac.config.is_gcp: + if self._ac.config.is_gcp: return "gcp" return "aws" @@ -76,7 +76,6 @@ def sync_workspace_info(self): Create a json dump for each Workspace in account For each user that has ucx installed in their workspace, upload the json dump of workspace info in the .ucx folder - :return: """ workspaces = [] for workspace in self._configured_workspaces(): diff --git a/src/databricks/labs/ucx/assessment/azure.py b/src/databricks/labs/ucx/assessment/azure.py index 3ce8c12540..1d3d0c4445 100644 --- a/src/databricks/labs/ucx/assessment/azure.py +++ b/src/databricks/labs/ucx/assessment/azure.py @@ -27,6 +27,7 @@ _azure_sp_conf_present_check, logger, ) +from databricks.labs.ucx.assessment.jobs import JobsMixin from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend from databricks.labs.ucx.hive_metastore.locations import ExternalLocations @@ -45,7 +46,7 @@ class AzureServicePrincipalInfo: storage_account: str -class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo]): +class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin): def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo) self._ws = ws @@ -128,26 +129,6 @@ def _get_azure_spn_list(self, 
config: dict) -> list: ) return spn_list - def _get_cluster_configs_from_all_jobs(self, all_jobs, all_clusters_by_id): - for j in all_jobs: - if j.settings is not None: - if j.settings.job_clusters is not None: - for jc in j.settings.job_clusters: - if jc.new_cluster is None: - continue - yield j, jc.new_cluster - - if j.settings.tasks is not None: - for t in j.settings.tasks: - if t.existing_cluster_id is not None: - interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None) - if interactive_cluster is None: - continue - yield j, interactive_cluster - - elif t.new_cluster is not None: - yield j, t.new_cluster - def _get_relevant_service_principals(self) -> list: relevant_service_principals = [] temp_list = self._list_all_cluster_with_spn_in_spark_conf() @@ -414,18 +395,18 @@ def _get_principal(self, principal_id: str) -> Principal | None: try: path = f"/v1.0/directoryObjects/{principal_id}" raw: dict[str, str] = self._graph.do("GET", path) # type: ignore[assignment] - client_id = raw.get("appId") - display_name = raw.get("displayName") - object_id = raw.get("id") - assert client_id is not None - assert display_name is not None - assert object_id is not None - self._principals[principal_id] = Principal(client_id, display_name, object_id) - return self._principals[principal_id] except NotFound: # don't load principals from external directories twice self._principals[principal_id] = None return self._principals[principal_id] + client_id = raw.get("appId") + display_name = raw.get("displayName") + object_id = raw.get("id") + assert client_id is not None + assert display_name is not None + assert object_id is not None + self._principals[principal_id] = Principal(client_id, display_name, object_id) + return self._principals[principal_id] def role_assignments( self, resource_id: str, *, principal_types: list[str] | None = None diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index 268930a301..cf80b9971e 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -27,7 +27,48 @@ class ClusterInfo: creator: str | None = None -class ClustersCrawler(CrawlerBase[ClusterInfo]): +class ClustersMixin: + _ws: WorkspaceClient + + def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None: + try: + return self._ws.cluster_policies.get(policy_id) + except NotFound: + logger.warning(f"The cluster policy was deleted: {policy_id}") + return None + + def _check_spark_conf(self, cluster, failures): + for k in INCOMPATIBLE_SPARK_CONFIG_KEYS: + if k in cluster.spark_conf: + failures.append(f"unsupported config: {k}") + for value in cluster.spark_conf.values(): + if "dbfs:/mnt" in value or "/dbfs/mnt" in value: + failures.append(f"using DBFS mount in configuration: {value}") + # Checking if Azure cluster config is present in spark config + if _azure_sp_conf_present_check(cluster.spark_conf): + failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + + def _check_cluster_policy(self, cluster, failures): + policy = self._safe_get_cluster_policy(cluster.policy_id) + if policy: + if policy.definition: + if _azure_sp_conf_present_check(json.loads(policy.definition)): + failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + if policy.policy_family_definition_overrides: + if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)): + failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + + def _check_init_scripts(self, cluster, failures): + for 
init_script_info in cluster.init_scripts: + init_script_data = _get_init_script_data(self._ws, init_script_info) + if not init_script_data: + continue + if not _azure_sp_conf_in_init_scripts(init_script_data): + continue + failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + + +class ClustersCrawler(CrawlerBase[ClusterInfo], ClustersMixin): def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo) self._ws = ws @@ -58,50 +99,20 @@ def _assess_clusters(self, all_clusters): failures.append(f"not supported DBR: {cluster.spark_version}") if cluster.spark_conf is not None: - for k in INCOMPATIBLE_SPARK_CONFIG_KEYS: - if k in cluster.spark_conf: - failures.append(f"unsupported config: {k}") - - for value in cluster.spark_conf.values(): - if "dbfs:/mnt" in value or "/dbfs/mnt" in value: - failures.append(f"using DBFS mount in configuration: {value}") - - # Checking if Azure cluster config is present in spark config - if _azure_sp_conf_present_check(cluster.spark_conf): - failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + self._check_spark_conf(cluster, failures) # Checking if Azure cluster config is present in cluster policies if cluster.policy_id: - policy = self._safe_get_cluster_policy(cluster.policy_id) - if policy: - if policy.definition: - if _azure_sp_conf_present_check(json.loads(policy.definition)): - failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") - if policy.policy_family_definition_overrides: - if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)): - failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + self._check_cluster_policy(cluster, failures) if cluster.init_scripts: - for init_script_info in cluster.init_scripts: - init_script_data = _get_init_script_data(self._ws, init_script_info) - if not init_script_data: - continue - if not _azure_sp_conf_in_init_scripts(init_script_data): - continue - failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.") + self._check_init_scripts(cluster, failures) cluster_info.failures = json.dumps(failures) if len(failures) > 0: cluster_info.success = 0 yield cluster_info - def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None: - try: - return self._ws.cluster_policies.get(policy_id) - except NotFound: - logger.warning(f"The cluster policy was deleted: {policy_id}") - return None - def snapshot(self) -> Iterable[ClusterInfo]: return self._snapshot(self._try_fetch, self._crawl) diff --git a/src/databricks/labs/ucx/assessment/crawlers.py b/src/databricks/labs/ucx/assessment/crawlers.py index f6322acfd1..d9729cb7f1 100644 --- a/src/databricks/labs/ucx/assessment/crawlers.py +++ b/src/databricks/labs/ucx/assessment/crawlers.py @@ -2,6 +2,8 @@ import logging import re +from databricks.sdk.errors import NotFound + logger = logging.getLogger(__name__) INCOMPATIBLE_SPARK_CONFIG_KEYS = [ @@ -33,7 +35,7 @@ def _get_init_script_data(w, init_script_info): try: data = w.dbfs.read(file_api_format_destination).data return base64.b64decode(data).decode("utf-8") - except Exception: + except NotFound: return None if init_script_info.workspace: workspace_file_destination = init_script_info.workspace.destination @@ -41,8 +43,9 @@ def _get_init_script_data(w, init_script_info): try: data = w.workspace.export(workspace_file_destination).content return base64.b64decode(data).decode("utf-8") - except Exception: + except NotFound: return None + return None def _azure_sp_conf_in_init_scripts(init_script_data: str) -> 
bool: diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index 178baad61e..7cab71a1fd 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -28,31 +28,34 @@ class JobInfo: creator: str | None = None -class JobsCrawler(CrawlerBase[JobInfo]): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): - super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo) - self._ws = ws - +class JobsMixin: @staticmethod def _get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id): for j in all_jobs: - if j.settings is not None: - if j.settings.job_clusters is not None: - for jc in j.settings.job_clusters: - if jc.new_cluster is None: - continue - yield j, jc.new_cluster - - if j.settings.tasks is not None: - for t in j.settings.tasks: - if t.existing_cluster_id is not None: - interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None) - if interactive_cluster is None: - continue - yield j, interactive_cluster - - elif t.new_cluster is not None: - yield j, t.new_cluster + if j.settings is None: + continue + if j.settings.job_clusters is not None: + for jc in j.settings.job_clusters: + if jc.new_cluster is None: + continue + yield j, jc.new_cluster + if j.settings.tasks is None: + continue + for t in j.settings.tasks: + if t.existing_cluster_id is not None: + interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None) + if interactive_cluster is None: + continue + yield j, interactive_cluster + + elif t.new_cluster is not None: + yield j, t.new_cluster + + +class JobsCrawler(CrawlerBase[JobInfo], JobsMixin): + def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): + super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo) + self._ws = ws def _crawl(self) -> Iterable[JobInfo]: all_jobs = list(self._ws.jobs.list(expand_tasks=True)) @@ -73,17 +76,18 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[ ) job_settings = job.settings - if job_settings is not None: - job_name = job_settings.name - if not job_name: - job_name = "Unknown" - job_details[job.job_id] = JobInfo( - job_id=str(job.job_id), - job_name=job_name, - creator=job.creator_user_name, - success=1, - failures="[]", - ) + if not job_settings: + continue + job_name = job_settings.name + if not job_name: + job_name = "Unknown" + job_details[job.job_id] = JobInfo( + job_id=str(job.job_id), + job_name=job_name, + creator=job.creator_user_name, + success=1, + failures="[]", + ) for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id): support_status = spark_version_compatibility(cluster_config.spark_version) @@ -94,17 +98,7 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[ job_assessment[job_id].add(f"not supported DBR: {cluster_config.spark_version}") if cluster_config.spark_conf is not None: - for k in INCOMPATIBLE_SPARK_CONFIG_KEYS: - if k in cluster_config.spark_conf: - job_assessment[job_id].add(f"unsupported config: {k}") - - for value in cluster_config.spark_conf.values(): - if "dbfs:/mnt" in value or "/dbfs/mnt" in value: - job_assessment[job_id].add(f"using DBFS mount in configuration: {value}") - - # Checking if Azure cluster config is present in spark config - if _azure_sp_conf_present_check(cluster_config.spark_conf): - job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") + self._job_spark_conf(cluster_config, job_assessment, job_id) # Checking 
if Azure cluster config is present in cluster policies if cluster_config.policy_id: @@ -119,20 +113,35 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[ job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") if cluster_config.init_scripts: - for init_script_info in cluster_config.init_scripts: - init_script_data = _get_init_script_data(self._ws, init_script_info) - if not init_script_data: - continue - if not _azure_sp_conf_in_init_scripts(init_script_data): - continue - job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") + self._init_scripts(cluster_config, job_assessment, job_id) - for job_key in job_details.keys(): + # TODO: next person looking at this - rewrite, as this code makes no sense + for job_key in job_details.keys(): # pylint: disable=consider-using-dict-items,consider-iterating-dictionary job_details[job_key].failures = json.dumps(list(job_assessment[job_key])) if len(job_assessment[job_key]) > 0: job_details[job_key].success = 0 return list(job_details.values()) + def _init_scripts(self, cluster_config, job_assessment, job_id): + for init_script_info in cluster_config.init_scripts: + init_script_data = _get_init_script_data(self._ws, init_script_info) + if not init_script_data: + continue + if not _azure_sp_conf_in_init_scripts(init_script_data): + continue + job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") + + def _job_spark_conf(self, cluster_config, job_assessment, job_id): + for k in INCOMPATIBLE_SPARK_CONFIG_KEYS: + if k in cluster_config.spark_conf: + job_assessment[job_id].add(f"unsupported config: {k}") + for value in cluster_config.spark_conf.values(): + if "dbfs:/mnt" in value or "/dbfs/mnt" in value: + job_assessment[job_id].add(f"using DBFS mount in configuration: {value}") + # Checking if Azure cluster config is present in spark config + if _azure_sp_conf_present_check(cluster_config.spark_conf): + job_assessment[job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.") + def _safe_get_cluster_policy(self, policy_id: str) -> Policy | None: try: return self._ws.cluster_policies.get(policy_id) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 5dd4fe0ad9..18635ae5f5 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -61,12 +61,12 @@ def skip(w: WorkspaceClient, schema: str | None = None, table: str | None = None logger.info("Running skip command") if not schema: logger.error("--schema is a required parameter.") - return None + return installation_manager = InstallationManager(w) installation = installation_manager.for_user(w.current_user.me()) if not installation: logger.error(CANT_FIND_UCX_MSG) - return None + return warehouse_id = installation.config.warehouse_id sql_backend = StatementExecutionBackend(w, warehouse_id) mapping = TableMapping(w, sql_backend) @@ -128,7 +128,7 @@ def ensure_assessment_run(w: WorkspaceClient): installation = installation_manager.for_user(w.current_user.me()) if not installation: logger.error(CANT_FIND_UCX_MSG) - return None + return workspace_installer = WorkspaceInstaller(w) workspace_installer.validate_and_run("assessment") @@ -150,7 +150,7 @@ def validate_groups_membership(w: WorkspaceClient): installation = installation_manager.for_user(w.current_user.me()) if not installation: logger.error(CANT_FIND_UCX_MSG) - return None + return warehouse_id = installation.config.warehouse_id inventory_database = installation.config.inventory_database renamed_group_prefix = 
installation.config.renamed_group_prefix @@ -186,10 +186,10 @@ def revert_migrated_tables(w: WorkspaceClient, schema: str, table: str, *, delet " Would you like to continue?", max_attempts=2, ): - return None + return if not installation: logger.error(CANT_FIND_UCX_MSG) - return None + return warehouse_id = installation.config.warehouse_id sql_backend = StatementExecutionBackend(w, warehouse_id) table_crawler = TablesCrawler(sql_backend, installation.config.inventory_database) @@ -260,5 +260,5 @@ def save_azure_storage_accounts(w: WorkspaceClient, subscription_id: str): azure_resource_permissions.save_spn_permissions() -if "__main__" == __name__: +if __name__ == "__main__": ucx() diff --git a/src/databricks/labs/ucx/config.py b/src/databricks/labs/ucx/config.py index 6b1055390a..4905ed0105 100644 --- a/src/databricks/labs/ucx/config.py +++ b/src/databricks/labs/ucx/config.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Any, Generic, TypeVar +import yaml from databricks.sdk import AccountClient, WorkspaceClient from databricks.sdk.core import Config @@ -13,7 +14,7 @@ @dataclass -class ConnectConfig: +class ConnectConfig: # pylint: disable=too-many-instance-attributes # Keep all the fields in sync with databricks.sdk.core.Config host: str | None = None account_id: str | None = None @@ -98,9 +99,7 @@ def from_dict(cls, raw: dict[str, Any]) -> T: @classmethod def from_bytes(cls, raw_str: str | bytes) -> T: - from yaml import safe_load - - raw: dict[str, Any] = safe_load(raw_str) + raw: dict[str, Any] = yaml.safe_load(raw_str) empty: dict[str, Any] = {} return cls.from_dict(empty if not raw else raw) @@ -120,12 +119,10 @@ def to_databricks_config(self) -> Config: return connect.to_databricks_config() def as_dict(self) -> dict[str, Any]: - from dataclasses import fields, is_dataclass - def inner(x): - if is_dataclass(x): + if dataclasses.is_dataclass(x): result = [] - for f in fields(x): + for f in dataclasses.fields(x): value = inner(getattr(x, f.name)) if not value: continue @@ -189,7 +186,7 @@ def _verify_version(cls, raw: dict): @dataclass -class WorkspaceConfig(_Config["WorkspaceConfig"]): +class WorkspaceConfig(_Config["WorkspaceConfig"]): # pylint: disable=too-many-instance-attributes inventory_database: str # Group name conversion parameters. 
workspace_group_regex: str | None = None diff --git a/src/databricks/labs/ucx/framework/crawlers.py b/src/databricks/labs/ucx/framework/crawlers.py index 3c2fa6a95b..5e71f3a6f5 100644 --- a/src/databricks/labs/ucx/framework/crawlers.py +++ b/src/databricks/labs/ucx/framework/crawlers.py @@ -147,6 +147,7 @@ def _row_to_sql(row, fields): class RuntimeBackend(SqlBackend): def __init__(self): + # pylint: disable-next=import-error,import-outside-toplevel from pyspark.sql.session import SparkSession # type: ignore[import-not-found] if "DATABRICKS_RUNTIME_VERSION" not in os.environ: @@ -159,7 +160,7 @@ def execute(self, sql): logger.debug(f"[spark][execute] {sql}") try: immediate_response = self._spark.sql(sql) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught error_message = str(e) self._raise_spark_sql_exceptions(error_message) return immediate_response @@ -168,7 +169,7 @@ def fetch(self, sql) -> Iterator[Row]: logger.debug(f"[spark][fetch] {sql}") try: fetch_query_response = self._spark.sql(sql).collect() - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught error_message = str(e) self._raise_spark_sql_exceptions(error_message) return fetch_query_response @@ -187,18 +188,17 @@ def save_table(self, full_name: str, rows: Sequence[DataclassInstance], klass: D def _raise_spark_sql_exceptions(error_message: str): if "SCHEMA_NOT_FOUND" in error_message: raise NotFound(error_message) from None - elif "TABLE_OR_VIEW_NOT_FOUND" in error_message: + if "TABLE_OR_VIEW_NOT_FOUND" in error_message: raise NotFound(error_message) from None - elif "DELTA_TABLE_NOT_FOUND" in error_message: + if "DELTA_TABLE_NOT_FOUND" in error_message: raise NotFound(error_message) from None - elif "DELTA_MISSING_TRANSACTION_LOG" in error_message: + if "DELTA_MISSING_TRANSACTION_LOG" in error_message: raise DataLoss(error_message) from None - elif "PARSE_SYNTAX_ERROR" in error_message: + if "PARSE_SYNTAX_ERROR" in error_message: raise BadRequest(error_message) from None - elif "Operation not allowed" in error_message: + if "Operation not allowed" in error_message: raise PermissionDenied(error_message) from None - else: - raise Unknown(error_message) from None + raise Unknown(error_message) from None class CrawlerBase(Generic[Result]): diff --git a/src/databricks/labs/ucx/framework/dashboards.py b/src/databricks/labs/ucx/framework/dashboards.py index 56c085e9df..3fa46cba85 100644 --- a/src/databricks/labs/ucx/framework/dashboards.py +++ b/src/databricks/labs/ucx/framework/dashboards.py @@ -44,7 +44,7 @@ def viz_args(self) -> dict: @dataclass -class VizColumn: +class VizColumn: # pylint: disable=too-many-instance-attributes) name: str title: str type: str = "string" @@ -160,7 +160,7 @@ def _get_widget_options(self, query: SimpleQuery): return widget_options def _state_pre_v06(self): - try: + try: # pylint: disable=too-many-try-statements) query_state = f"{self._remote_folder}/state.json" state = json.load(self._ws.workspace.download(query_state)) to_remove = [] @@ -282,6 +282,7 @@ def _install_viz(self, query: SimpleQuery): viz = self._ws.query_visualizations.create(self._state.queries[query.key], **viz_args) assert viz.id is not None self._state.viz[query.key] = viz.id + return None def _get_viz_options(self, query: SimpleQuery): viz_types: dict[str, Callable[..., dict]] = {"table": self._table_viz_args, "counter": self._counter_viz_args} @@ -308,6 +309,7 @@ def _install_query(self, query: SimpleQuery, dashboard_name: str, data_source_id 
access_control_list=[AccessControl(group_name="users", permission_level=PermissionLevel.CAN_RUN)], ) self._state.queries[query.key] = deployed_query.id + return None @staticmethod def _table_viz_args( @@ -336,7 +338,7 @@ def _table_viz_args( } @staticmethod - def _counter_viz_args( + def _counter_viz_args( # pylint: disable=too-many-arguments name: str, value_column: str, *, diff --git a/src/databricks/labs/ucx/framework/tasks.py b/src/databricks/labs/ucx/framework/tasks.py index fd30e9a62f..b3a879811e 100644 --- a/src/databricks/labs/ucx/framework/tasks.py +++ b/src/databricks/labs/ucx/framework/tasks.py @@ -1,3 +1,4 @@ +import contextlib import logging import os from collections.abc import Callable @@ -41,14 +42,12 @@ def cloud_compatible(self, config: Config) -> bool: if self.cloud: if self.cloud.lower() == "aws": return config.is_aws - elif self.cloud.lower() == "azure": + if self.cloud.lower() == "azure": return config.is_azure - elif self.cloud.lower() == "gcp": + if self.cloud.lower() == "gcp": return config.is_gcp - else: - return True - else: return True + return True def _remove_extra_indentation(doc: str) -> str: @@ -120,38 +119,100 @@ def wrapper(*args, **kwargs): return decorator -@retried(on=[FileExistsError], timeout=timedelta(seconds=5)) -def _create_lock(lockfile_name): - while True: # wait until the lock file can be opened - f = os.open(lockfile_name, os.O_CREAT | os.O_EXCL) - break - return f - +class TaskLogger(contextlib.AbstractContextManager): + # files are available in the workspace only once their handlers are closed, + # so we rotate files log every 10 minutes. + # + # See https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler + # See https://docs.python.org/3/howto/logging-cookbook.html + + def __init__( + self, install_dir: Path, workflow: str, workflow_id: str, task_name: str, workflow_run_id: str, log_level="INFO" + ): + self._log_level = log_level + self._workflow = workflow + self._workflow_id = workflow_id + self._workflow_run_id = workflow_run_id + self._databricks_logger = logging.getLogger("databricks") + self._app_logger = logging.getLogger("databricks.labs.ucx") + self._log_path = install_dir / "logs" / self._workflow / f"run-{self._workflow_run_id}" + self._log_file = self._log_path / f"{task_name}.log" + self._app_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {self._log_file}") + + def __repr__(self): + return self._log_file.as_posix() + + def __enter__(self): + self._log_path.mkdir(parents=True, exist_ok=True) + self._init_debug_logfile() + self._init_run_readme() + self._databricks_logger.setLevel(logging.DEBUG) + self._app_logger.setLevel(logging.DEBUG) + console_handler = install_logger(self._log_level) + self._databricks_logger.removeHandler(console_handler) + self._databricks_logger.addHandler(self._file_handler) + return self + + def __exit__(self, _t, error, _tb): + if error: + log_file_for_cli = str(self._log_file).removeprefix("/Workspace") + cli_command = f"databricks workspace export /{log_file_for_cli}" + self._app_logger.error(f"Execute `{cli_command}` locally to troubleshoot with more details. 
{error}") + self._databricks_logger.debug("Task crash details", exc_info=error) + self._file_handler.flush() + self._file_handler.close() + + def _init_debug_logfile(self): + log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s" + log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S") + self._file_handler = TimedRotatingFileHandler(self._log_file.as_posix(), when="M", interval=10) + self._file_handler.setFormatter(log_formatter) + self._file_handler.setLevel(logging.DEBUG) + + def _init_run_readme(self): + log_readme = self._log_path.joinpath("README.md") + if log_readme.exists(): + return + # this may race when run from multiple tasks, therefore it must be multiprocess safe + with self._exclusive_open(str(log_readme), mode="w") as f: + f.write(f"# Logs for the UCX {self._workflow} workflow\n") + f.write("This folder contains UCX log files.\n\n") + f.write(f"See the [{self._workflow} job](/#job/{self._workflow_id}) and ") + f.write(f"[run #{self._workflow_run_id}](/#job/{self._workflow_id}/run/{self._workflow_run_id})\n") -@contextmanager -def _exclusive_open(filename: str, *args, **kwargs): - """Open a file with exclusive access across multiple processes. - Requires write access to the directory containing the file. + @classmethod + @contextmanager + def _exclusive_open(cls, filename: str, **kwargs): + """Open a file with exclusive access across multiple processes. + Requires write access to the directory containing the file. - Arguments are the same as the built-in open. + Arguments are the same as the built-in open. - Returns a context manager that closes the file and releases the lock. - """ - lockfile_name = filename + ".lock" - lockfile = _create_lock(lockfile_name) + Returns a context manager that closes the file and releases the lock. + """ + lockfile_name = filename + ".lock" + lockfile = cls._create_lock(lockfile_name) - try: - with open(filename, *args, **kwargs) as f: - yield f - finally: try: - os.close(lockfile) + with open(filename, encoding="utf8", **kwargs) as f: + yield f finally: - os.unlink(lockfile_name) + try: + os.close(lockfile) + finally: + os.unlink(lockfile_name) + + @staticmethod + @retried(on=[FileExistsError], timeout=timedelta(seconds=5)) + def _create_lock(lockfile_name): + while True: # wait until the lock file can be opened + f = os.open(lockfile_name, os.O_CREAT | os.O_EXCL) + break + return f def trigger(*argv): - args = dict(a[2:].split("=") for a in argv if "--" == a[0:2]) + args = dict(a[2:].split("=") for a in argv if a[0:2] == "--") if "config" not in args: msg = "no --config specified" raise KeyError(msg) @@ -171,52 +232,14 @@ def trigger(*argv): config_path = Path(args["config"]) cfg = WorkspaceConfig.from_file(config_path) - - # see https://docs.python.org/3/howto/logging-cookbook.html - databricks_logger = logging.getLogger("databricks") - databricks_logger.setLevel(logging.DEBUG) - - ucx_logger = logging.getLogger("databricks.labs.ucx") - ucx_logger.setLevel(logging.DEBUG) - - log_path = config_path.parent / "logs" / current_task.workflow / f"run-{workflow_run_id}" - log_path.mkdir(parents=True, exist_ok=True) - - log_file = log_path / f"{task_name}.log" - - # files are available in the workspace only once their handlers are closed, - # so we rotate files log every 10 minutes. 
- # - # See https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler - file_handler = TimedRotatingFileHandler(log_file.as_posix(), when="M", interval=10) - log_format = "%(asctime)s %(levelname)s [%(name)s] {%(threadName)s} %(message)s" - log_formatter = logging.Formatter(fmt=log_format, datefmt="%H:%M:%S") - file_handler.setFormatter(log_formatter) - file_handler.setLevel(logging.DEBUG) - - console_handler = install_logger(cfg.log_level) - databricks_logger.removeHandler(console_handler) - databricks_logger.addHandler(file_handler) - - ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {log_file}") - - log_readme = log_path.joinpath("README.md") - if not log_readme.exists(): - # this may race when run from multiple tasks, therefore it must be multiprocess safe - with _exclusive_open(str(log_readme), mode="w") as f: - f.write(f"# Logs for the UCX {current_task.workflow} workflow\n") - f.write("This folder contains UCX log files.\n\n") - f.write(f"See the [{current_task.workflow} job](/#job/{job_id}) and ") - f.write(f"[run #{workflow_run_id}](/#job/{job_id}/run/{workflow_run_id})\n") - - try: + with TaskLogger( + config_path.parent, + workflow=current_task.workflow, + workflow_id=job_id, + task_name=task_name, + workflow_run_id=workflow_run_id, + log_level=cfg.log_level, + ) as task_logger: + ucx_logger = logging.getLogger("databricks.labs.ucx") + ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {task_logger}") current_task.fn(cfg) - except BaseException as error: - log_file_for_cli = str(log_file).lstrip("/Workspace") - cli_command = f"databricks workspace export /{log_file_for_cli}" - ucx_logger.error(f"Task crashed. Execute `{cli_command}` locally to troubleshoot with more details. {error}") - databricks_logger.debug("Task crash details", exc_info=error) - file_handler.flush() - raise - finally: - file_handler.close() diff --git a/src/databricks/labs/ucx/hive_metastore/grants.py b/src/databricks/labs/ucx/hive_metastore/grants.py index 59cfaaa5fc..918880975c 100644 --- a/src/databricks/labs/ucx/hive_metastore/grants.py +++ b/src/databricks/labs/ucx/hive_metastore/grants.py @@ -239,9 +239,6 @@ def _grants( any_file (bool): Whether to include any file grants (optional). anonymous_function (bool): Whether to include anonymous function grants (optional). - Yields: - Iterator[Grant]: An iterator of Grant objects representing the fetched grants. - Behavior: - Normalizes the provided parameters and constructs an object type and key using the `Grant.type_and_key` method. 
@@ -268,7 +265,7 @@ def _grants( any_file=any_file, anonymous_function=anonymous_function, ) - try: + try: # pylint: disable=too-many-try-statements grants = [] object_type_normalization = { "SCHEMA": "DATABASE", @@ -278,8 +275,7 @@ def _grants( } for row in self._fetch(f"SHOW GRANTS ON {on_type} {key}"): (principal, action_type, object_type, _) = row - if object_type in object_type_normalization: - object_type = object_type_normalization[object_type] + object_type = object_type_normalization.get(object_type, object_type) if on_type != object_type: continue # we have to return concrete list, as with yield we're executing @@ -297,7 +293,7 @@ def _grants( ) grants.append(grant) return grants - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught # TODO: https://github.com/databrickslabs/ucx/issues/406 logger.error(f"Couldn't fetch grants for object {on_type} {key}: {e}") return [] diff --git a/src/databricks/labs/ucx/hive_metastore/hms_lineage.py b/src/databricks/labs/ucx/hive_metastore/hms_lineage.py index baa800ed20..54b7a6a6e7 100644 --- a/src/databricks/labs/ucx/hive_metastore/hms_lineage.py +++ b/src/databricks/labs/ucx/hive_metastore/hms_lineage.py @@ -35,11 +35,9 @@ def check_lineage_spark_config_exists(self) -> GlobalInitScriptDetailsWithConten return script_content return None - def _get_init_script_content(self): - try: - return base64.b64encode(global_init_script.encode()).decode() - except Exception: - print("The init script content was not found.") + @staticmethod + def _get_init_script_content(): + return base64.b64encode(global_init_script.encode()).decode() def add_global_init_script(self) -> str: content = self._get_init_script_content() diff --git a/src/databricks/labs/ucx/hive_metastore/locations.py b/src/databricks/labs/ucx/hive_metastore/locations.py index 7a4c865cb1..c2995fa76b 100644 --- a/src/databricks/labs/ucx/hive_metastore/locations.py +++ b/src/databricks/labs/ucx/hive_metastore/locations.py @@ -67,47 +67,46 @@ def _external_locations(self, tables: list[Row], mounts) -> Iterable[ExternalLoc if not dupe: external_locations.append(ExternalLocation(os.path.dirname(location) + "/", 1)) if location.startswith("jdbc"): - dupe = False - pattern = r"(\w+)=(.*?)(?=\s*,|\s*\])" - - # Find all matches in the input string - # Storage properties is of the format - # "[personalAccessToken=*********(redacted), \ - # httpPath=/sql/1.0/warehouses/65b52fb5bd86a7be, host=dbc-test1-aa11.cloud.databricks.com, \ - # dbtable=samples.nyctaxi.trips]" - matches = re.findall(pattern, table.storage_properties) - - # Create a dictionary from the matches - result_dict = dict(matches) - - # Fetch the value of host from the newly created dict - host = result_dict.get("host", "") - port = result_dict.get("port", "") - database = result_dict.get("database", "") - httppath = result_dict.get("httpPath", "") - provider = result_dict.get("provider", "") - # dbtable = result_dict.get("dbtable", "") - - # currently supporting databricks and mysql external tables - # add other jdbc types - if "databricks" in location.lower(): - jdbc_location = f"jdbc:databricks://{host};httpPath={httppath}" - elif "mysql" in location.lower(): - jdbc_location = f"jdbc:mysql://{host}:{port}/{database}" - elif not provider == "": - jdbc_location = f"jdbc:{provider.lower()}://{host}:{port}/{database}" - else: - jdbc_location = f"{location.lower()}/{host}:{port}/{database}" - for ext_loc in external_locations: - if ext_loc.location == jdbc_location: - ext_loc.table_count += 1 - dupe 
= True - break - if not dupe: - external_locations.append(ExternalLocation(jdbc_location, 1)) + self._add_jdbc_location(external_locations, location, table) return external_locations + def _add_jdbc_location(self, external_locations, location, table): + dupe = False + pattern = r"(\w+)=(.*?)(?=\s*,|\s*\])" + # Find all matches in the input string + # Storage properties is of the format + # "[personalAccessToken=*********(redacted), \ + # httpPath=/sql/1.0/warehouses/65b52fb5bd86a7be, host=dbc-test1-aa11.cloud.databricks.com, \ + # dbtable=samples.nyctaxi.trips]" + matches = re.findall(pattern, table.storage_properties) + # Create a dictionary from the matches + result_dict = dict(matches) + # Fetch the value of host from the newly created dict + host = result_dict.get("host", "") + port = result_dict.get("port", "") + database = result_dict.get("database", "") + httppath = result_dict.get("httpPath", "") + provider = result_dict.get("provider", "") + # dbtable = result_dict.get("dbtable", "") + # currently supporting databricks and mysql external tables + # add other jdbc types + if "databricks" in location.lower(): + jdbc_location = f"jdbc:databricks://{host};httpPath={httppath}" + elif "mysql" in location.lower(): + jdbc_location = f"jdbc:mysql://{host}:{port}/{database}" + elif not provider == "": + jdbc_location = f"jdbc:{provider.lower()}://{host}:{port}/{database}" + else: + jdbc_location = f"{location.lower()}/{host}:{port}/{database}" + for ext_loc in external_locations: + if ext_loc.location == jdbc_location: + ext_loc.table_count += 1 + dupe = True + break + if not dupe: + external_locations.append(ExternalLocation(jdbc_location, 1)) + def _external_location_list(self) -> Iterable[ExternalLocation]: tables = list( self._backend.fetch( diff --git a/src/databricks/labs/ucx/hive_metastore/mapping.py b/src/databricks/labs/ucx/hive_metastore/mapping.py index 08ba83e21d..3fd2ffff1f 100644 --- a/src/databricks/labs/ucx/hive_metastore/mapping.py +++ b/src/databricks/labs/ucx/hive_metastore/mapping.py @@ -197,11 +197,12 @@ def _exists_in_uc(self, src_table: Table, target_key: str): return True upgraded_from = table_info.properties.get("upgraded_from") if upgraded_from and upgraded_from != src_table.key: - msg = f"Expected to be migrated from {src_table.key}, but got {upgraded_from}. " - "You can skip this error using the CLI command: " - "databricks labs ucx skip " - f"--schema {src_table.database} --table {src_table.name}" - raise ResourceConflict(msg) + raise ResourceConflict( + f"Expected to be migrated from {src_table.key}, but got {upgraded_from}. 
" + "You can skip this error using the CLI command: " + "databricks labs ucx skip " + f"--schema {src_table.database} --table {src_table.name}" + ) return True except NotFound: return False diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index bedcbeaa85..4ca8ac88be 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -135,11 +135,11 @@ def _get_revert_count(self, schema: str | None = None, table: str | None = None) table_by_database[cur_table.database].append(cur_table) migration_list = [] - for cur_database in table_by_database.keys(): + for cur_database, tables in table_by_database.items(): external_tables = 0 managed_tables = 0 views = 0 - for current_table in table_by_database[cur_database]: + for current_table in tables: if current_table.upgraded_to is not None: if current_table.kind == "VIEW": views += 1 @@ -259,24 +259,12 @@ def _move_table( from_table_name = f"{from_catalog}.{from_schema}.{from_table}" to_table_name = f"{to_catalog}.{to_schema}.{from_table}" try: - create_sql = str(next(self._backend.fetch(f"SHOW CREATE TABLE {from_table_name}"))[0]) - create_table_sql = create_sql.replace(f"CREATE TABLE {from_table_name}", f"CREATE TABLE {to_table_name}") - logger.info(f"Creating table {to_table_name}") - self._backend.execute(create_table_sql) - - grants = self._ws.grants.get(SecurableType.TABLE, from_table_name) - if grants.privilege_assignments is not None: - logger.info(f"Applying grants on table {to_table_name}") - grants_changes = [ - PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments - ] - self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes) - + self._recreate_table(from_table_name, to_table_name) + self._reapply_grants(from_table_name, to_table_name) if del_table: logger.info(f"Dropping source table {from_table_name}") drop_sql = f"DROP TABLE {from_table_name}" self._backend.execute(drop_sql) - return True except NotFound as err: if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err) or "[DELTA_TABLE_NOT_FOUND]" in str(err): @@ -285,6 +273,21 @@ def _move_table( logger.error(f"Failed to move table {from_table_name}: {err!s}", exc_info=True) return False + def _reapply_grants(self, from_table_name, to_table_name): + grants = self._ws.grants.get(SecurableType.TABLE, from_table_name) + if grants.privilege_assignments is not None: + logger.info(f"Applying grants on table {to_table_name}") + grants_changes = [ + PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments + ] + self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes) + + def _recreate_table(self, from_table_name, to_table_name): + create_sql = str(next(self._backend.fetch(f"SHOW CREATE TABLE {from_table_name}"))[0]) + create_table_sql = create_sql.replace(f"CREATE TABLE {from_table_name}", f"CREATE TABLE {to_table_name}") + logger.info(f"Creating table {to_table_name}") + self._backend.execute(create_table_sql) + def _move_view( self, from_catalog: str, @@ -298,23 +301,12 @@ def _move_view( from_view_name = f"{from_catalog}.{from_schema}.{from_view}" to_view_name = f"{to_catalog}.{to_schema}.{from_view}" try: - create_sql = f"CREATE VIEW {to_view_name} AS {view_text}" - logger.info(f"Creating view {to_view_name}") - self._backend.execute(create_sql) - - grants = self._ws.grants.get(SecurableType.TABLE, from_view_name) - if 
grants.privilege_assignments is not None: - logger.info(f"Applying grants on view {to_view_name}") - grants_changes = [ - PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments - ] - self._ws.grants.update(SecurableType.TABLE, to_view_name, changes=grants_changes) - + self._recreate_view(to_view_name, view_text) + self._reapply_grants(from_view_name, to_view_name) if del_view: logger.info(f"Dropping source view {from_view_name}") drop_sql = f"DROP VIEW {from_view_name}" self._backend.execute(drop_sql) - return True except NotFound as err: if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err): @@ -322,3 +314,8 @@ def _move_view( else: logger.error(f"Failed to move view {from_view_name}: {err!s}", exc_info=True) return False + + def _recreate_view(self, to_view_name, view_text): + create_sql = f"CREATE VIEW {to_view_name} AS {view_text}" + logger.info(f"Creating view {to_view_name}") + self._backend.execute(create_sql) diff --git a/src/databricks/labs/ucx/hive_metastore/table_size.py b/src/databricks/labs/ucx/hive_metastore/table_size.py index 8090de4740..a2fb88cd9d 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_size.py +++ b/src/databricks/labs/ucx/hive_metastore/table_size.py @@ -19,8 +19,6 @@ class TableSize: class TableSizeCrawler(CrawlerBase): def __init__(self, backend: RuntimeBackend, schema): - from pyspark.sql.session import SparkSession # type: ignore[import-not-found] - """ Initializes a TablesSizeCrawler instance. @@ -28,6 +26,9 @@ def __init__(self, backend: RuntimeBackend, schema): backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) schema: The schema name for the inventory persistence. """ + # pylint: disable-next=import-error,import-outside-toplevel + from pyspark.sql.session import SparkSession # type: ignore[import-not-found] + self._backend: RuntimeBackend = backend super().__init__(backend, "hive_metastore", schema, "table_size", TableSize) self._tables_crawler = TablesCrawler(backend, schema) @@ -69,14 +70,13 @@ def _safe_get_table_size(self, table_full_name: str) -> int | None: logger.debug(f"Evaluating {table_full_name} table size.") try: return self._spark._jsparkSession.table(table_full_name).queryExecution().analyzed().stats().sizeInBytes() - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught if "[TABLE_OR_VIEW_NOT_FOUND]" in str(e) or "[DELTA_TABLE_NOT_FOUND]" in str(e): logger.warning(f"Failed to evaluate {table_full_name} table size. 
Table not found.") return None if "[DELTA_MISSING_TRANSACTION_LOG]" in str(e): logger.warning(f"Delta table {table_full_name} is corrupted: missing transaction log.") return None - except: # noqa: E722 logger.error(f"Failed to evaluate {table_full_name} table size: ", exc_info=True) return None diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py index d4ccacbf17..91cc6e73ac 100644 --- a/src/databricks/labs/ucx/hive_metastore/tables.py +++ b/src/databricks/labs/ucx/hive_metastore/tables.py @@ -84,7 +84,7 @@ def is_dbfs_root(self) -> bool: def is_format_supported_for_sync(self) -> bool: if self.table_format is None: return False - return self.table_format.upper() in ("DELTA", "PARQUET", "CSV", "JSON", "ORC", "TEXT") + return self.table_format.upper() in {"DELTA", "PARQUET", "CSV", "JSON", "ORC", "TEXT"} @property def is_databricks_dataset(self) -> bool: @@ -224,7 +224,7 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None: ), storage_properties=self._parse_table_props(describe.get("Storage Properties", "").lower()), # type: ignore[arg-type] ) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught # TODO: https://github.com/databrickslabs/ucx/issues/406 logger.error(f"Couldn't fetch information for table {full_name} : {e}") return None diff --git a/src/databricks/labs/ucx/hive_metastore/udfs.py b/src/databricks/labs/ucx/hive_metastore/udfs.py index 9731b1bf98..339b4df2c3 100644 --- a/src/databricks/labs/ucx/hive_metastore/udfs.py +++ b/src/databricks/labs/ucx/hive_metastore/udfs.py @@ -99,6 +99,6 @@ def _describe(self, catalog: str, database: str, udf: str) -> Udf | None: comment=describe.get("Comment", "UNKNOWN"), body=describe.get("Body", "UNKNOWN"), ) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught logger.error(f"Couldn't fetch information for udf {full_name} : {e}") return None diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py index 2595b1a9cb..70fd43062b 100644 --- a/src/databricks/labs/ucx/install.py +++ b/src/databricks/labs/ucx/install.py @@ -18,7 +18,7 @@ from databricks.labs.blueprint.tui import Prompts from databricks.labs.blueprint.wheels import ProductInfo, Wheels, find_project_root from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import ( +from databricks.sdk.errors import ( # pylint: disable=redefined-builtin Aborted, AlreadyExists, BadRequest, @@ -135,7 +135,8 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str): - from databricks.labs import ucx + # we need to import it like this because we expect a module instance + from databricks.labs import ucx # pylint: disable=import-outside-toplevel deployer = SchemaDeployer(sql_backend, inventory_schema, ucx) deployer.deploy_schema() @@ -163,7 +164,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str): deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql") -class WorkspaceInstaller: +class WorkspaceInstaller: # pylint: disable=too-many-instance-attributes def __init__( self, ws: WorkspaceClient, @@ -424,11 +425,10 @@ def _configure(self): ws_file_url = self.notebook_link(self.config_file) try: if "version: 1" in self._raw_previous_config(): - logger.info("old version detected, attempting to migrate to new config") self._config = self.current_config self._write_config(overwrite=True) return - elif "version: 2" in self._raw_previous_config(): + if "version: 2" 
in self._raw_previous_config(): logger.info(f"UCX is already configured. See {ws_file_url}") return except NotFound: @@ -574,6 +574,7 @@ def _deploy_workflow(self, step_name: str, settings): new_job = self._ws.jobs.create(**settings) assert new_job.job_id is not None self._state.jobs[step_name] = str(new_job.job_id) + return None def _deployed_steps_pre_v06(self): deployed_steps = {} @@ -846,8 +847,7 @@ def _check_policy_has_instance_pool(self, policy_id): instance_pool = def_json.get("instance_pool_id") if instance_pool is not None: return True - else: - return False + return False @staticmethod def _get_ext_hms_conf_from_policy(cluster_policy): @@ -880,37 +880,38 @@ def _readable_timedelta(epoch): time_parts.append("ago") if time_parts: return " ".join(time_parts) - else: - return "less than 1 second ago" + return "less than 1 second ago" def latest_job_status(self) -> list[dict]: latest_status = [] for step, job_id in self._state.jobs.items(): try: - job_state = None - start_time = None - job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1)) - if job_runs: - state = job_runs[0].state - job_state = None - if state and state.result_state: - job_state = state.result_state.name - elif state and state.life_cycle_state: - job_state = state.life_cycle_state.name - if job_runs[0].start_time: - start_time = job_runs[0].start_time / 1000 - latest_status.append( - { - "step": step, - "state": "UNKNOWN" if not (job_runs and job_state) else job_state, - "started": "" if not job_runs else self._readable_timedelta(start_time), - } - ) + step_status = self._step_status(job_id, step) + latest_status.append(step_status) except InvalidParameterValue as e: logger.warning(f"skipping {step}: {e}") continue return latest_status + def _step_status(self, job_id, step): + job_state = None + start_time = None + job_runs = list(self._ws.jobs.list_runs(job_id=int(job_id), limit=1)) + if job_runs: + state = job_runs[0].state + job_state = None + if state and state.result_state: + job_state = state.result_state.name + elif state and state.life_cycle_state: + job_state = state.life_cycle_state.name + if job_runs[0].start_time: + start_time = job_runs[0].start_time / 1000 + return { + "step": step, + "state": "UNKNOWN" if not (job_runs and job_state) else job_state, + "started": "" if not job_runs else self._readable_timedelta(start_time), + } + def _get_result_state(self, job_id): job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1)) latest_job_run = job_runs[0] @@ -921,25 +922,7 @@ def _get_result_state(self, job_id): def repair_run(self, workflow): try: - job_id = self._state.jobs.get(workflow) - if not job_id: - logger.warning(f"{workflow} job does not exists hence skipping Repair Run") - return - job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1)) - if not job_runs: - logger.warning(f"{workflow} job is not initialized yet. 
Can't trigger repair run now") - return - latest_job_run = job_runs[0] - retry_on_attribute_error = retried(on=[AttributeError], timeout=self._verify_timeout) - retried_check = retry_on_attribute_error(self._get_result_state) - state_value = retried_check(job_id) - - logger.info(f"The status for the latest run is {state_value}") - - if state_value != "FAILED": - logger.warning(f"{workflow} job is not in FAILED state hence skipping Repair Run") - return - run_id = latest_job_run.run_id + job_id, run_id = self._repair_workflow(workflow) run_details = self._ws.jobs.get_run(run_id=run_id, include_history=True) latest_repair_run_id = run_details.repair_history[-1].id job_url = f"{self._ws.config.host}#job/{job_id}/run/{run_id}" @@ -951,6 +934,23 @@ def repair_run(self, workflow): except TimeoutError: logger.warning(f"Skipping the {workflow} due to time out. Please try after sometime") + def _repair_workflow(self, workflow): + job_id = self._state.jobs.get(workflow) + if not job_id: + raise InvalidParameterValue("job does not exists hence skipping repair") + job_runs = list(self._ws.jobs.list_runs(job_id=job_id, limit=1)) + if not job_runs: + raise InvalidParameterValue("job is not initialized yet. Can't trigger repair run now") + latest_job_run = job_runs[0] + retry_on_attribute_error = retried(on=[AttributeError], timeout=self._verify_timeout) + retried_check = retry_on_attribute_error(self._get_result_state) + state_value = retried_check(job_id) + logger.info(f"The status for the latest run is {state_value}") + if state_value != "FAILED": + raise InvalidParameterValue("job is not in FAILED state hence skipping repair") + run_id = latest_job_run.run_id + return job_id, run_id + def uninstall(self): if self._prompts and not self._prompts.confirm( "Do you want to uninstall ucx from the workspace too, this would " @@ -1042,6 +1042,6 @@ def validate_and_run(self, step: str): logger = get_logger(__file__) logger.setLevel("INFO") - ws = WorkspaceClient(product="ucx", product_version=__version__) - installer = WorkspaceInstaller(ws, promtps=Prompts()) + workspace_client = WorkspaceClient(product="ucx", product_version=__version__) + installer = WorkspaceInstaller(workspace_client, promtps=Prompts()) installer.run() diff --git a/src/databricks/labs/ucx/installer/__init__.py b/src/databricks/labs/ucx/installer/__init__.py index bddcc84c7a..1a6ec614f3 100644 --- a/src/databricks/labs/ucx/installer/__init__.py +++ b/src/databricks/labs/ucx/installer/__init__.py @@ -3,12 +3,12 @@ from dataclasses import dataclass from pathlib import Path +import yaml from databricks.labs.blueprint.installer import IllegalState from databricks.labs.blueprint.parallel import ManyError, Threads from databricks.sdk import WorkspaceClient from databricks.sdk.errors import NotFound from databricks.sdk.service.iam import User -from yaml import YAMLError from databricks.labs.ucx.config import WorkspaceConfig @@ -48,7 +48,7 @@ def for_user(self, user: User) -> Installation: except TypeError: msg = f"Installation is corrupt for {user.user_name}" raise IllegalState(msg) from None - except YAMLError: + except yaml.YAMLError: msg = f"Config file {config_file} is corrupted, check if it is in correct yaml format" raise IllegalState(msg) from None diff --git a/src/databricks/labs/ucx/mixins/fixtures.py b/src/databricks/labs/ucx/mixins/fixtures.py index 0dc2796a52..845b079f94 100644 --- a/src/databricks/labs/ucx/mixins/fixtures.py +++ b/src/databricks/labs/ucx/mixins/fixtures.py @@ -10,7 +10,7 @@ from collections.abc import Callable, 
Generator, MutableMapping from datetime import timedelta from pathlib import Path -from typing import BinaryIO, Optional +from typing import BinaryIO import pytest from databricks.sdk import AccountClient, WorkspaceClient @@ -37,6 +37,9 @@ from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend +# this file will get to databricks-labs-pytester project and be maintained/refactored there +# pylint: disable=redefined-outer-name,too-many-try-statements,import-outside-toplevel,unnecessary-lambda + logger = logging.getLogger(__name__) @@ -325,10 +328,9 @@ def __init__(self, object_id: str, before: list[iam.AccessControlRequest], after def _principal(acr: iam.AccessControlRequest) -> str: if acr.user_name is not None: return f"user_name {acr.user_name}" - elif acr.group_name is not None: + if acr.group_name is not None: return f"group_name {acr.group_name}" - else: - return f"service_principal_name {acr.service_principal_name}" + return f"service_principal_name {acr.service_principal_name}" def _list(self, acl: list[iam.AccessControlRequest]): return ", ".join(f"{self._principal(_)} {_.permission_level.value}" for _ in acl) @@ -347,8 +349,7 @@ def __init__(self, object_id: str, before: list[sql.AccessControl], after: list[ def _principal(acr: sql.AccessControl) -> str: if acr.user_name is not None: return f"user_name {acr.user_name}" - else: - return f"group_name {acr.group_name}" + return f"group_name {acr.group_name}" def _list(self, acl: list[sql.AccessControl]): return ", ".join(f"{self._principal(_)} {_.permission_level.value}" for _ in acl) @@ -385,7 +386,7 @@ def create( group_name: str | None = None, user_name: str | None = None, service_principal_name: str | None = None, - access_control_list: Optional["list[iam.AccessControlRequest]"] = None, + access_control_list: list[iam.AccessControlRequest] | None = None, ): nothing_specified = permission_level is None and access_control_list is None both_specified = permission_level is not None and access_control_list is not None @@ -456,7 +457,7 @@ def create( permission_level: sql.PermissionLevel | None = None, group_name: str | None = None, user_name: str | None = None, - access_control_list: Optional["list[sql.AccessControl]"] = None, + access_control_list: list[sql.AccessControl] | None = None, ): nothing_specified = permission_level is None and access_control_list is None both_specified = permission_level is not None and access_control_list is not None @@ -831,10 +832,7 @@ def create( def _is_in_debug() -> bool: - return os.path.basename(sys.argv[0]) in [ - "_jb_pytest_runner.py", - "testlauncher.py", - ] + return os.path.basename(sys.argv[0]) in {"_jb_pytest_runner.py", "testlauncher.py"} @pytest.fixture @@ -937,6 +935,7 @@ def create(*, catalog_name: str = "hive_metastore", name: str | None = None) -> @pytest.fixture +# pylint: disable-next=too-many-statements def make_table(ws, sql_backend, make_schema, make_random) -> Generator[Callable[..., TableInfo], None, None]: def create( *, @@ -956,7 +955,7 @@ def create( schema_name = schema.name if name is None: name = f"ucx_T{make_random(4)}".lower() - table_type = None + table_type: TableType | None = None data_source_format = None storage_location = None full_name = f"{catalog_name}.{schema_name}.{name}".lower() @@ -967,7 +966,7 @@ def create( # temporary (if not view) ddl = f"{ddl} AS {ctas}" elif non_delta: - table_type = TableType.MANAGED + table_type = TableType.MANAGED # pylint: disable=redefined-variable-type data_source_format = DataSourceFormat.JSON 
storage_location = "dbfs:/databricks-datasets/iot-stream/data-device" ddl = f"{ddl} USING json LOCATION '{storage_location}'" diff --git a/src/databricks/labs/ucx/mixins/redash.py b/src/databricks/labs/ucx/mixins/redash.py index b172509f67..c968b55e7c 100644 --- a/src/databricks/labs/ucx/mixins/redash.py +++ b/src/databricks/labs/ucx/mixins/redash.py @@ -1,9 +1,11 @@ -import dataclasses from dataclasses import dataclass -from typing import Any, Optional +from typing import Any from databricks.sdk.service._internal import _from_dict -from databricks.sdk.service.sql import Visualization, Widget +from databricks.sdk.service.sql import Visualization, Widget, WidgetPosition + +# this file is going away soon +# pylint: disable=redefined-builtin @dataclass @@ -12,7 +14,7 @@ class WidgetOptions: description: str | None = None is_hidden: bool | None = None parameter_mappings: Any | None = None - position: Optional["WidgetPosition"] = None + position: WidgetPosition | None = None title: str | None = None updated_at: str | None = None @@ -47,42 +49,6 @@ def from_dict(cls, d: dict[str, Any]) -> "WidgetOptions": ) -@dataclass -class WidgetPosition: - """Coordinates of this widget on a dashboard. This portion of the API changes frequently and is - unsupported.""" - - auto_height: bool | None = None - col: int | None = None - row: int | None = None - size_x: int | None = None - size_y: int | None = None - - def as_dict(self) -> dict: - body: dict[str, bool | int] = {} - if self.auto_height is not None: - body["autoHeight"] = self.auto_height - if self.col is not None: - body["col"] = self.col - if self.row is not None: - body["row"] = self.row - if self.size_x is not None: - body["sizeX"] = self.size_x - if self.size_y is not None: - body["sizeY"] = self.size_y - return body - - @classmethod - def from_dict(cls, d: dict[str, Any]) -> "WidgetPosition": - return cls( - auto_height=d.get("autoHeight", None), - col=d.get("col", None), - row=d.get("row", None), - size_x=d.get("sizeX", None), - size_y=d.get("sizeY", None), - ) - - class DashboardWidgetsAPI: """This is an evolving API that facilitates the addition and removal of widgets from existing dashboards within the Databricks Workspace. 
Data structures may change over time.""" @@ -218,55 +184,3 @@ def delete(self, id: str): "Accept": "application/json", } self._api.do("DELETE", f"/api/2.0/preview/sql/visualizations/{id}", headers=headers) - - -@dataclass -class VizColumn: - name: str - title: str - type: str = "string" - imageUrlTemplate: str = "{{ @ }}" - imageTitleTemplate: str = "{{ @ }}" - linkUrlTemplate: str = "{{ @ }}" - linkTextTemplate: str = "{{ @ }}" - linkTitleTemplate: str = "{{ @ }}" - linkOpenInNewTab: bool = True - displayAs: str = "string" - visible: bool = True - order: int = 100000 - allowSearch: bool = False - alignContent: str = "left" - allowHTML: bool = False - highlightLinks: bool = False - useMonospaceFont: bool = False - preserveWhitespace: bool = False - - def as_dict(self): - return dataclasses.asdict(self) - - -class QueryVisualizationsExt(QueryVisualizationsAPI): - def create_table( - self, - query_id: str, - name: str, - columns: list[VizColumn], - *, - items_per_page: int = 25, - condensed=True, - with_row_number=False, - description: str | None = None, - ): - return self.create( - query_id, - "TABLE", - { - "itemsPerPage": items_per_page, - "condensed": condensed, - "withRowNumber": with_row_number, - "version": 2, - "columns": [x.as_dict() for x in columns], - }, - name=name, - description=description, - ) diff --git a/src/databricks/labs/ucx/mixins/sql.py b/src/databricks/labs/ucx/mixins/sql.py index e856de1c6e..12a03f51d7 100644 --- a/src/databricks/labs/ucx/mixins/sql.py +++ b/src/databricks/labs/ucx/mixins/sql.py @@ -60,9 +60,9 @@ def __getattr__(self, col): idx = self.__columns__.index(col) return self[idx] except IndexError: - raise AttributeError(col) # noqa: B904 + raise AttributeError(col) from None except ValueError: - raise AttributeError(col) # noqa: B904 + raise AttributeError(col) from None def __repr__(self): return f"Row({', '.join(f'{k}={v}' for (k, v) in zip(self.__columns__, self, strict=True))})" @@ -194,10 +194,7 @@ def execute( ) status_message = f"current status: {state.value}" self._raise_if_needed(result_status) - sleep = attempt - if sleep > MAX_SLEEP_PER_ATTEMPT: - # sleep 10s max per attempt - sleep = MAX_SLEEP_PER_ATTEMPT + sleep = min(attempt, MAX_SLEEP_PER_ATTEMPT) _LOG.debug(f"SQL statement {statement_id}: {status_message} (sleeping ~{sleep}s)") time.sleep(sleep + random.random()) attempt += 1 @@ -218,6 +215,32 @@ def execute_fetch_all( execute_response = self.execute( warehouse_id, statement, byte_limit=byte_limit, catalog=catalog, schema=schema, timeout=timeout ) + col_conv, row_factory = self._row_converters(execute_response) + result_data = execute_response.result + if result_data is None: + return + while True: + data_array = result_data.data_array + if not data_array: + data_array = [] + for data in data_array: + # enumerate() + iterator + tuple constructor makes it more performant + # on larger humber of records for Python, even though it's less + # readable code. 
+ row = [] + for i, value in enumerate(data): + if value is None: + row.append(None) + else: + row.append(col_conv[i](value)) + yield row_factory(row) + if result_data.next_chunk_index is None: + return + # TODO: replace once ES-828324 is fixed + json_response = self._api.do("GET", result_data.next_chunk_internal_link) + result_data = ResultData.from_dict(json_response) + + def _row_converters(self, execute_response): col_names = [] col_conv = [] manifest = execute_response.manifest @@ -242,26 +265,4 @@ def execute_fetch_all( raise ValueError(msg) col_conv.append(conv) row_factory = type("Row", (Row,), {"__columns__": col_names}) - result_data = execute_response.result - if result_data is None: - return [] - while True: - data_array = result_data.data_array - if not data_array: - data_array = [] - for data in data_array: - # enumerate() + iterator + tuple constructor makes it more performant - # on larger humber of records for Python, even though it's less - # readable code. - row = [] - for i, value in enumerate(data): - if value is None: - row.append(None) - else: - row.append(col_conv[i](value)) - yield row_factory(row) - if result_data.next_chunk_index is None: - return - # TODO: replace once ES-828324 is fixed - json_response = self._api.do("GET", result_data.next_chunk_internal_link) - result_data = ResultData.from_dict(json_response) + return col_conv, row_factory diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py index 081a2bb599..6bf400f666 100644 --- a/src/databricks/labs/ucx/workspace_access/generic.py +++ b/src/databricks/labs/ucx/workspace_access/generic.py @@ -5,7 +5,6 @@ from dataclasses import dataclass from datetime import timedelta from functools import partial -from typing import Optional from databricks.labs.blueprint.limiter import rate_limited from databricks.labs.blueprint.parallel import ManyError, Threads @@ -91,7 +90,7 @@ def get_apply_task(self, item: Permissions, migration_state: MigrationState): @staticmethod def _is_item_relevant(item: Permissions, migration_state: MigrationState) -> bool: # passwords and tokens are represented on the workspace-level - if item.object_id in ("tokens", "passwords"): + if item.object_id in {"tokens", "passwords"}: return True object_permissions = iam.ObjectPermissions.from_dict(json.loads(item.raw)) assert object_permissions.access_control_list is not None @@ -104,7 +103,7 @@ def _is_item_relevant(item: Permissions, migration_state: MigrationState) -> boo @staticmethod def _response_to_request( - acls: Optional["list[iam.AccessControlResponse]"] = None, + acls: list[iam.AccessControlResponse] | None = None, ) -> list[iam.AccessControlRequest]: results: list[iam.AccessControlRequest] = [] if not acls: @@ -128,12 +127,11 @@ def _inflight_check(self, object_type: str, object_id: str, acl: list[iam.Access remote_permission_as_request = self._response_to_request(remote_permission.access_control_list) if all(elem in remote_permission_as_request for elem in acl): return True - else: - msg = f"""Couldn't apply appropriate permission for object type {object_type} with id {object_id} - acl to be applied={acl} - acl found in the object={remote_permission_as_request} - """ - raise ValueError(msg) + msg = f"""Couldn't apply appropriate permission for object type {object_type} with id {object_id} + acl to be applied={acl} + acl found in the object={remote_permission_as_request} + """ + raise ValueError(msg) return False @rate_limited(max_requests=30) @@ -314,6 +312,7 @@ def 
__init__( self._inventory_database = inventory_database def _crawl(self) -> Iterable[WorkspaceObjectInfo]: + # pylint: disable-next=import-outside-toplevel,redefined-outer-name from databricks.labs.ucx.workspace_access.listing import WorkspaceListing ws_listing = WorkspaceListing(self._ws, num_threads=self._num_threads, with_directories=False) @@ -384,9 +383,7 @@ def inner() -> Iterator[ml.ModelDatabricks]: def experiments_listing(ws: WorkspaceClient): def inner() -> Iterator[ml.Experiment]: for experiment in ws.experiments.list_experiments(): - """ - We filter-out notebook-based experiments, because they are covered by notebooks listing - """ + # We filter-out notebook-based experiments, because they are covered by notebooks listing in # workspace-based notebook experiment if experiment.tags: nb_tag = [t for t in experiment.tags if t.key == "mlflow.experimentType" and t.value == "NOTEBOOK"] @@ -403,5 +400,5 @@ def inner() -> Iterator[ml.Experiment]: def tokens_and_passwords(): - for _value in ["tokens", "passwords"]: + for _value in ("tokens", "passwords"): yield GenericPermissionsInfo(_value, "authorization") diff --git a/src/databricks/labs/ucx/workspace_access/groups.py b/src/databricks/labs/ucx/workspace_access/groups.py index ae8d763e0f..1ecaa8031d 100644 --- a/src/databricks/labs/ucx/workspace_access/groups.py +++ b/src/databricks/labs/ucx/workspace_access/groups.py @@ -77,8 +77,7 @@ def get_temp_principal(self, name: str) -> str | None: def is_in_scope(self, name: str) -> bool: if name is None: return False - else: - return name in self._name_to_group + return name in self._name_to_group def __len__(self): return len(self._name_to_group) @@ -119,12 +118,10 @@ def _safe_match(group_name: str, match_re: str) -> str: match = re.search(match_re, group_name) if not match: return group_name - else: - match_groups = match.groups() + match_groups = match.groups() if match_groups: return match_groups[0] - else: - return match.group() + return match.group() except re.error: return group_name @@ -299,7 +296,7 @@ def generate_migrated_groups(self): class GroupManager(CrawlerBase[MigratedGroup]): _SYSTEM_GROUPS: ClassVar[list[str]] = ["users", "admins", "account users"] - def __init__( + def __init__( # pylint: disable=too-many-arguments self, sql_backend: SqlBackend, ws: WorkspaceClient, diff --git a/src/databricks/labs/ucx/workspace_access/listing.py b/src/databricks/labs/ucx/workspace_access/listing.py index 59f3c42b56..bf725db84f 100644 --- a/src/databricks/labs/ucx/workspace_access/listing.py +++ b/src/databricks/labs/ucx/workspace_access/listing.py @@ -84,7 +84,7 @@ def walk(self, start_path="/"): initial_future.add_done_callback(self._progress_report) futures_to_objects = {initial_future: root_object} while futures_to_objects: - futures_done, futures_not_done = wait(futures_to_objects, return_when=FIRST_COMPLETED) + futures_done, _ = wait(futures_to_objects, return_when=FIRST_COMPLETED) for future in futures_done: futures_to_objects.pop(future) diff --git a/src/databricks/labs/ucx/workspace_access/redash.py b/src/databricks/labs/ucx/workspace_access/redash.py index 23b45d4fc4..2c0a4d1ee8 100644 --- a/src/databricks/labs/ucx/workspace_access/redash.py +++ b/src/databricks/labs/ucx/workspace_access/redash.py @@ -149,13 +149,12 @@ def _inflight_check(self, object_type: sql.ObjectTypePlural, object_id: str, acl assert remote_permission.access_control_list is not None if all(elem in remote_permission.access_control_list for elem in acl): return True - else: - msg = f""" - Couldn't apply 
appropriate permission for object type {object_type} with id {object_id} - acl to be applied={acl} - acl found in the object={remote_permission} - """ - raise ValueError(msg) + msg = f""" + Couldn't apply appropriate permission for object type {object_type} with id {object_id} + acl to be applied={acl} + acl found in the object={remote_permission} + """ + raise ValueError(msg) return False @rate_limited(max_requests=30) @@ -236,12 +235,11 @@ def hash_permissions(permissions: list[sql.AccessControl]): assert res.access_control_list is not None if hash_permissions(acl).issubset(hash_permissions(res.access_control_list)): return res - else: - msg = ( - f"Failed to set permission and will be retried for {object_type} {object_id}, " - f"doing another attempt..." - ) - raise ValueError(msg) + msg = ( + f"Failed to set permission and will be retried for {object_type} {object_id}, " + f"doing another attempt..." + ) + raise ValueError(msg) except PermissionDenied: logger.warning(f"Permission denied: {object_type} {object_id}") return None diff --git a/src/databricks/labs/ucx/workspace_access/verification.py b/src/databricks/labs/ucx/workspace_access/verification.py index 134033756d..6e1e3293a8 100644 --- a/src/databricks/labs/ucx/workspace_access/verification.py +++ b/src/databricks/labs/ucx/workspace_access/verification.py @@ -106,10 +106,10 @@ def verify_metastore(self): self.metastore_id = current_metastore.metastore_id self.workspace_id = current_metastore.workspace_id return True - else: - raise MetastoreNotFoundError + raise MetastoreNotFoundError except PermissionDenied: logger.error("Permission Denied while trying to access metastore") + return False class MetastoreNotFoundError(Exception): diff --git a/tests/integration/framework/test_dashboards.py b/tests/integration/framework/test_dashboards.py deleted file mode 100644 index 29c295fc33..0000000000 --- a/tests/integration/framework/test_dashboards.py +++ /dev/null @@ -1,63 +0,0 @@ -import pytest -from databricks.sdk import WorkspaceClient -from databricks.sdk.service.sql import ( - AccessControl, - ObjectTypePlural, - PermissionLevel, - RunAsRole, -) - -from databricks.labs.ucx.mixins.redash import ( - DashboardWidgetsAPI, - QueryVisualizationsExt, - VizColumn, - WidgetOptions, - WidgetPosition, -) - - -@pytest.mark.skip("not working") -def test_creating_widgets(ws: WorkspaceClient, make_warehouse, env_or_skip): - dashboard_widgets_api = DashboardWidgetsAPI(ws.api_client) - query_visualizations_api = QueryVisualizationsExt(ws.api_client) - - x = ws.dashboards.create(name="test dashboard") - assert x.id is not None - ws.dbsql_permissions.set( - ObjectTypePlural.DASHBOARDS, - x.id, - access_control_list=[AccessControl(group_name="users", permission_level=PermissionLevel.CAN_MANAGE)], - ) - - dashboard_widgets_api.create( - x.id, - WidgetOptions( - title="first widget", - description="description of the widget", - position=WidgetPosition(col=0, row=0, size_x=3, size_y=3), - ), - text="this is _some_ **markdown**", - width=1, - ) - - dashboard_widgets_api.create( - x.id, - WidgetOptions(title="second", position=WidgetPosition(col=0, row=3, size_x=3, size_y=3)), - text="another text", - width=1, - ) - - data_sources = {x.warehouse_id: x.id for x in ws.data_sources.list()} - warehouse_id = env_or_skip("TEST_DEFAULT_WAREHOUSE_ID") - - query = ws.queries.create( - data_source_id=data_sources[warehouse_id], - description="abc", - name="this is a test query", - query="SHOW DATABASES", - run_as_role=RunAsRole.VIEWER, - ) - - assert query.id is not 
None - y = query_visualizations_api.create_table(query.id, "ABC Viz", [VizColumn(name="databaseName", title="DB")]) - print(y) diff --git a/tests/unit/assessment/__init__.py b/tests/unit/assessment/__init__.py index e69de29bb2..b3d6ad6560 100644 --- a/tests/unit/assessment/__init__.py +++ b/tests/unit/assessment/__init__.py @@ -0,0 +1,31 @@ +import json +import pathlib +from unittest.mock import create_autospec + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.compute import ClusterDetails, Policy + +__dir = pathlib.Path(__file__).parent + + +def _load_fixture(filename: str): + with (__dir / filename).open("r") as f: + return json.load(f) + + +def _load_list(cls: type, filename: str): + return [cls.from_dict(_) for _ in _load_fixture(filename)] # type: ignore[attr-defined] + + +def _cluster_policy(policy_id: str): + fixture = _load_fixture(f"policies/{policy_id}.json") + definition = json.dumps(fixture["definition"]) + overrides = json.dumps(fixture["policy_family_definition_overrides"]) + return Policy(description=definition, policy_family_definition_overrides=overrides) + + +def workspace_client_mock(clusters="no-spark-conf.json"): + ws = create_autospec(WorkspaceClient) + ws.clusters.list.return_value = _load_list(ClusterDetails, f"../assessment/clusters/{clusters}") + ws.cluster_policies.get = _cluster_policy + return ws diff --git a/tests/unit/assessment/clusters/no-spark-conf.json b/tests/unit/assessment/clusters/no-spark-conf.json new file mode 100644 index 0000000000..144354d96b --- /dev/null +++ b/tests/unit/assessment/clusters/no-spark-conf.json @@ -0,0 +1,12 @@ +[ + { + "autoscale": { + "max_workers": 6, + "min_workers": 1 + }, + "cluster_id": "0915-190044-3dqy6751", + "cluster_name": "Tech Summit FY24 Cluster-2", + "policy_id": "single-user-with-spn", + "spark_version": "13.3.x-cpu-ml-scala2.12" + } +] \ No newline at end of file diff --git a/tests/unit/assessment/policies/single-user-with-spn.json b/tests/unit/assessment/policies/single-user-with-spn.json new file mode 100644 index 0000000000..37b6726233 --- /dev/null +++ b/tests/unit/assessment/policies/single-user-with-spn.json @@ -0,0 +1,82 @@ +{ + "definition": { + "node_type_id": { + "type": "allowlist", + "values": [ + "Standard_DS3_v2", + "Standard_DS4_v2", + "Standard_DS5_v2", + "Standard_NC4as_T4_v3" + ], + "defaultValue": "Standard_DS3_v2" + }, + "spark_version": { + "type": "unlimited", + "defaultValue": "auto:latest-ml" + }, + "runtime_engine": { + "type": "fixed", + "value": "STANDARD", + "hidden": true + }, + "num_workers": { + "type": "fixed", + "value": 0, + "hidden": true + }, + "data_security_mode": { + "type": "allowlist", + "values": [ + "SINGLE_USER", + "LEGACY_SINGLE_USER", + "LEGACY_SINGLE_USER_STANDARD" + ], + "defaultValue": "SINGLE_USER", + "hidden": true + }, + "driver_instance_pool_id": { + "type": "forbidden", + "hidden": true + }, + "cluster_type": { + "type": "fixed", + "value": "all-purpose" + }, + "instance_pool_id": { + "type": "forbidden", + "hidden": true + }, + "azure_attributes.availability": { + "type": "fixed", + "value": "ON_DEMAND_AZURE", + "hidden": true + }, + "spark_conf.spark.databricks.cluster.profile": { + "type": "fixed", + "value": "singleNode", + "hidden": true + }, + "autotermination_minutes": { + "type": "unlimited", + "defaultValue": 4320, + "isOptional": true + } + }, + "policy_family_definition_overrides": { + "not.spark.conf": { + "type": "fixed", + "value": "OAuth", + "hidden": true + }, + "not.a.type": { + "type": "fixed", + "value": 
"not.a.matching.type", + "hidden": true + }, + "not.a.matching.type": { + "type": "fixed", + "value": "https://login.microsoftonline.com/1234ededed/oauth2/token", + "hidden": true + } + } +} \ No newline at end of file diff --git a/tests/unit/assessment/test_clusters.py b/tests/unit/assessment/test_clusters.py index 0b930bd473..6984ee4209 100644 --- a/tests/unit/assessment/test_clusters.py +++ b/tests/unit/assessment/test_clusters.py @@ -1,6 +1,7 @@ -from unittest.mock import Mock +from unittest.mock import Mock, create_autospec import pytest +from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError, InternalError, NotFound from databricks.sdk.service.compute import ( AutoScale, @@ -15,6 +16,7 @@ from databricks.labs.ucx.assessment.clusters import ClusterInfo, ClustersCrawler from ..framework.mocks import MockBackend +from . import workspace_client_mock def test_cluster_assessment(mocker): @@ -106,51 +108,10 @@ def test_cluster_assessment(mocker): assert result_set[3].success == 0 -def test_cluster_assessment_cluster_policy_no_spark_conf(mocker): - sample_clusters1 = [ - ClusterDetails( - cluster_name="Tech Summit FY24 Cluster-2", - autoscale=AutoScale(min_workers=1, max_workers=6), - spark_context_id=5134472582179565315, - spark_env_vars=None, - policy_id="D96308F1BF0003A8", - spark_version="13.3.x-cpu-ml-scala2.12", - cluster_id="0915-190044-3dqy6751", - ) - ] - ws = Mock() - ws.cluster_policies.get().definition = ( - '{"node_type_id":{"type":"allowlist","values":["Standard_DS3_v2",' - '"Standard_DS4_v2","Standard_DS5_v2","Standard_NC4as_T4_v3"],"defaultValue":' - '"Standard_DS3_v2"},"spark_version":{"type":"unlimited","defaultValue":"auto:latest-ml"},' - '"runtime_engine":{"type":"fixed","value":"STANDARD","hidden":true},' - '"num_workers":{"type":"fixed","value":0,"hidden":true},"data_security_mode":' - '{"type":"allowlist","values":["SINGLE_USER","LEGACY_SINGLE_USER","LEGACY_SINGLE_USER_STANDARD"],' - '"defaultValue":"SINGLE_USER","hidden":true},"driver_instance_pool_id":{"type":"forbidden","hidden":true},' - '"cluster_type":{"type":"fixed","value":"all-purpose"},"instance_pool_id":{"type":"forbidden","hidden":true},' - '"azure_attributes.availability":{"type":"fixed","value":"ON_DEMAND_AZURE","hidden":true},' - '"spark_conf.spark.databricks.cluster.profile":{"type":"fixed","value":"singleNode","hidden":true},' - '"autotermination_minutes":{"type":"unlimited","defaultValue":4320,"isOptional":true}}' - ) - - ws.cluster_policies.get().policy_family_definition_overrides = ( - '{\n "not.spark.conf": {\n ' - '"type": "fixed",\n "value": "OAuth",\n ' - ' "hidden": true\n },\n "not.a.type": {\n ' - ' "type": "fixed",\n "value": ' - '"not.a.matching.type",\n ' - '"hidden": true\n },\n "not.a.matching.type": {\n ' - '"type": "fixed",\n "value": "fsfsfsfsffsfsf",\n "hidden": true\n },\n ' - '"not.a.matching.type": {\n "type": "fixed",\n ' - '"value": "gfgfgfgfggfggfgfdds",\n "hidden": true\n },\n ' - '"not.a.matching.type": {\n ' - '"type": "fixed",\n ' - '"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",\n ' - '"hidden": true\n }\n}' - ) - - crawler = ClustersCrawler(ws, MockBackend(), "ucx")._assess_clusters(sample_clusters1) - result_set1 = list(crawler) +def test_cluster_assessment_cluster_policy_no_spark_conf(): + ws = workspace_client_mock(clusters="no-spark-conf.json") + crawler = ClustersCrawler(ws, MockBackend(), "ucx") + result_set1 = list(crawler.snapshot()) assert len(result_set1) == 1 assert result_set1[0].success == 1 @@ 
-387,9 +348,10 @@ def test_cluster_init_script_check_dbfs(mocker): ], ) ] - ws = mocker.Mock() + ws = create_autospec(WorkspaceClient) ws.clusters.list.return_value = sample_clusters ws.dbfs.read().data = "JXNoCmVjaG8gIj0=" + ws.workspace.export().content = "JXNoCmVjaG8gIj0=" init_crawler = ClustersCrawler(ws, MockBackend(), "ucx").snapshot() assert len(init_crawler) == 1 diff --git a/tests/unit/assessment/test_jobs.py b/tests/unit/assessment/test_jobs.py index 8837aa6534..2e87b57c7f 100644 --- a/tests/unit/assessment/test_jobs.py +++ b/tests/unit/assessment/test_jobs.py @@ -1,6 +1,7 @@ -from unittest.mock import Mock +from unittest.mock import Mock, create_autospec import pytest +from databricks.sdk import WorkspaceClient from databricks.sdk.errors import DatabricksError, InternalError, NotFound from databricks.sdk.service.compute import ( AutoScale, @@ -520,8 +521,9 @@ def test_job_cluster_init_script(): cluster_source=ClusterSource.JOB, ) ] - ws = Mock() + ws = create_autospec(WorkspaceClient) ws.workspace.export().content = "JXNoCmVjaG8gIj0=" + ws.dbfs.read().data = "JXNoCmVjaG8gIj0=" result_set = JobsCrawler(ws, MockBackend(), "ucx")._assess_jobs( sample_jobs, {c.cluster_id: c for c in sample_clusters} ) @@ -630,8 +632,9 @@ def test_job_cluster_init_script_check_dbfs(): cluster_source=ClusterSource.JOB, ) ] - ws = Mock() + ws = create_autospec(WorkspaceClient) ws.workspace.export().content = "JXNoCmVjaG8gIj0=" + ws.dbfs.read().data = "JXNoCmVjaG8gIj0=" result_set = JobsCrawler(ws, MockBackend(), "ucx")._assess_jobs( sample_jobs, {c.cluster_id: c for c in sample_clusters} ) diff --git a/tests/unit/framework/test_crawlers.py b/tests/unit/framework/test_crawlers.py index ecea45298e..a813dbc1c2 100644 --- a/tests/unit/framework/test_crawlers.py +++ b/tests/unit/framework/test_crawlers.py @@ -230,13 +230,13 @@ def test_runtime_backend_save_table_with_row_containing_none_with_nullable_class @dataclass -class TestClass: +class DummyClass: key: str value: str | None = None def test_save_table_with_not_null_constraint_violated(mocker): - rows = [TestClass("1", "test"), TestClass("2", None), TestClass(None, "value")] + rows = [DummyClass("1", "test"), DummyClass("2", None), DummyClass(None, "value")] with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}): pyspark_sql_session = mocker.Mock() @@ -245,7 +245,7 @@ def test_save_table_with_not_null_constraint_violated(mocker): rb = RuntimeBackend() with pytest.raises(Exception) as exc_info: - rb.save_table("a.b.c", rows, TestClass) + rb.save_table("a.b.c", rows, DummyClass) assert ( str(exc_info.value) == "Not null constraint violated for column key, row = {'key': None, 'value': 'value'}" diff --git a/tests/unit/framework/test_tasks.py b/tests/unit/framework/test_tasks.py new file mode 100644 index 0000000000..68af48ce99 --- /dev/null +++ b/tests/unit/framework/test_tasks.py @@ -0,0 +1,41 @@ +import logging + +import pytest + +from databricks.labs.ucx.framework.tasks import TaskLogger + + +def test_task_logger(tmp_path): + app_logger = logging.getLogger("databricks.labs.ucx.foo") + databricks_logger = logging.getLogger("databricks.sdk.core") + with TaskLogger(tmp_path, "assessment", "123", "crawl-tables", "234") as task_logger: + app_logger.info(f"log file is {task_logger._log_file}") + databricks_logger.debug("something from sdk") + contents = _log_contents(tmp_path) + assert 2 == len(contents) + assert "log file is" in contents["logs/assessment/run-234/crawl-tables.log"] + assert "something from sdk" in 
contents["logs/assessment/run-234/crawl-tables.log"] + assert "[run #234](/#job/123/run/234)" in contents["logs/assessment/run-234/README.md"] + + +def test_task_failure(tmp_path): + with pytest.raises(ValueError): + with TaskLogger(tmp_path, "assessment", "123", "crawl-tables", "234"): + raise ValueError("some value not found") + contents = _log_contents(tmp_path) + assert 2 == len(contents) + # CLI debug info present + assert "databricks workspace export" in contents["logs/assessment/run-234/crawl-tables.log"] + # log file name present + assert "logs/assessment/run-234/crawl-tables.log" in contents["logs/assessment/run-234/crawl-tables.log"] + # traceback present + assert 'raise ValueError("some value not found")' in contents["logs/assessment/run-234/crawl-tables.log"] + + +def _log_contents(tmp_path): + contents = {} + for path in tmp_path.glob("**/*"): + if path.is_dir(): + continue + contents[path.relative_to(tmp_path).as_posix()] = path.read_text() + return contents diff --git a/tests/unit/hive_metastore/test_table_move.py b/tests/unit/hive_metastore/test_table_move.py index 1c8bbe1940..2ab04b1517 100644 --- a/tests/unit/hive_metastore/test_table_move.py +++ b/tests/unit/hive_metastore/test_table_move.py @@ -132,8 +132,7 @@ def test_move_tables_not_found_view_unknown_error(mocker, caplog): assert len([rec.message for rec in caplog.records if "unknown error" in rec.message]) == 1 -def test_move_all_tables_and_drop_source(caplog): - caplog.set_level(logging.INFO) +def test_move_all_tables_and_drop_source(): client = create_autospec(WorkspaceClient) client.tables.list.return_value = [ @@ -221,32 +220,21 @@ def target_tables_mapping(full_name): tm = TableMove(client, backend) tm.move_tables("SrcC", "SrcS", "*", "TgtC", "TgtS", True) - expected_messages = [ - "Moved 2 tables to the new schema TgtS.", - "Moved 2 views to the new schema TgtS.", - "Creating table TgtC.TgtS.table1", - "Applying grants on table TgtC.TgtS.table1", - "Dropping source table SrcC.SrcS.table1", - "Creating table TgtC.TgtS.table2", - "Dropping source table SrcC.SrcS.table2", - "Creating view TgtC.TgtS.view1", - "Applying grants on view TgtC.TgtS.view1", - "Dropping source view SrcC.SrcS.view1", - "Creating view TgtC.TgtS.view2", - "Dropping source view SrcC.SrcS.view2", - ] - - log_count = 0 - for rec in caplog.records: - print(rec) - if rec.message in expected_messages: - log_count += 1 - - assert log_count == len(expected_messages) - - -def test_move_one_table_without_dropping_source(caplog): - caplog.set_level(logging.INFO) + assert [ + "CREATE TABLE SrcC.SrcS.table1 (name string)", + "CREATE TABLE TgtC.TgtS.table1 (name string)", + "CREATE VIEW TgtC.TgtS.view1 AS SELECT * FROM SrcC.SrcS.table1", + "CREATE VIEW TgtC.TgtS.view2 AS SELECT * FROM SrcC.SrcS.table1", + "DROP TABLE SrcC.SrcS.table1", + "DROP TABLE SrcC.SrcS.table2", + "DROP VIEW SrcC.SrcS.view1", + "DROP VIEW SrcC.SrcS.view2", + "SHOW CREATE TABLE SrcC.SrcS.table1", + "SHOW CREATE TABLE SrcC.SrcS.table2", + ] == sorted(backend.queries) + + +def test_move_one_table_without_dropping_source(): client = create_autospec(WorkspaceClient) client.tables.list.return_value = [ @@ -281,16 +269,6 @@ def test_move_one_table_without_dropping_source(caplog): tm = TableMove(client, backend) tm.move_tables("SrcC", "SrcS", "table1", "TgtC", "TgtS", False) - expected_messages = [ - "Moved 1 tables to the new schema TgtS.", - "Moved 0 views to the new schema TgtS.", - "Creating table TgtC.TgtS.table1", - ] - - log_count = 0 - for rec in caplog.records: - print(rec) - if 
rec.message in expected_messages: - log_count += 1 - - assert log_count == len(expected_messages) + assert ["CREATE TABLE TgtC.TgtS.table1 (name string)", "SHOW CREATE TABLE SrcC.SrcS.table1"] == sorted( + backend.queries + ) diff --git a/tests/unit/test_install.py b/tests/unit/test_install.py index 30c061d6b1..a9311c87ad 100644 --- a/tests/unit/test_install.py +++ b/tests/unit/test_install.py @@ -1194,7 +1194,7 @@ def test_repair_run_result_state(ws, caplog): state=RunState(result_state=None), ) ] - install = WorkspaceInstaller(ws, verify_timeout=timedelta(seconds=5)) + install = WorkspaceInstaller(ws, verify_timeout=timedelta(seconds=1)) install._state.jobs = {"assessment": "123"} ws.jobs.list_runs.return_value = base ws.jobs.list_runs.repair_run = None diff --git a/tests/unit/workspace_access/test_verification.py b/tests/unit/workspace_access/test_verification.py index 730e80d0b9..93ed2ce996 100644 --- a/tests/unit/workspace_access/test_verification.py +++ b/tests/unit/workspace_access/test_verification.py @@ -1,4 +1,7 @@ +from unittest.mock import create_autospec + import pytest +from databricks.sdk import WorkspaceClient from databricks.sdk.errors import PermissionDenied from databricks.sdk.service.catalog import MetastoreAssignment @@ -38,13 +41,12 @@ def test_validate_no_metastore_exists(mocker): verify_metastore_obj.verify_metastore() -def test_permission_denied_error(mocker): - ws = mocker.patch("databricks.sdk.WorkspaceClient.__init__") - ws.metastores = mocker.patch("databricks.sdk.WorkspaceClient.metastores") +def test_permission_denied_error(): + ws = create_autospec(WorkspaceClient) ws.metastores.current.side_effect = PermissionDenied() ws.metastores.current.return_value = None ws.return_value = None verify_metastore_obj = VerifyHasMetastore(ws) - assert verify_metastore_obj.verify_metastore() is None + assert not verify_metastore_obj.verify_metastore()
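
Usage sketch for the new TaskLogger introduced in src/databricks/labs/ucx/framework/tasks.py: the snippet below mirrors how trigger() and tests/unit/framework/test_tasks.py drive it as a context manager. It is a minimal illustration only; the install directory, workflow name, and ids are placeholder assumptions, not values taken from this patch.

    import logging
    from pathlib import Path

    from databricks.labs.ucx.framework.tasks import TaskLogger

    # assumption: any writable directory works as install_dir;
    # missing parent folders are created when the context is entered
    install_dir = Path("/tmp/ucx-demo")

    with TaskLogger(
        install_dir,
        workflow="assessment",
        workflow_id="123",
        task_name="crawl-tables",
        workflow_run_id="234",
        log_level="INFO",
    ) as task_logger:
        # everything logged under the "databricks" logger tree is captured by the
        # rotating file handler at <install_dir>/logs/assessment/run-234/crawl-tables.log;
        # str(task_logger) is that log file path
        logging.getLogger("databricks.labs.ucx").info(f"debug logs at {task_logger}")

On exit the file handler is flushed and closed, and on failure the context manager logs the `databricks workspace export ...` command for fetching the log file along with the crash details, which is why trigger() no longer needs the try/except/finally block it had before this patch.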