diff --git a/pyproject.toml b/pyproject.toml
index 4709962..b1c681c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,7 @@ dependencies = [
     "isort>=2.5.0",
     "mypy",
     "types-PyYAML",
-    "types-requests",
+    "types-requests"
 ]
 
 python="3.10"
diff --git a/src/databricks/labs/blueprint/parallel.py b/src/databricks/labs/blueprint/parallel.py
index e8a350e..a1faa6c 100644
--- a/src/databricks/labs/blueprint/parallel.py
+++ b/src/databricks/labs/blueprint/parallel.py
@@ -131,7 +131,37 @@ def _progress_report(self, _):
             logger.info(msg)
 
     @staticmethod
-    def _wrap_result(func, name):
+    def _get_result_function_signature(func, name):
+        """Build a human-readable label for a failed task, for use in log messages.
+
+        Args:
+            func: the task callable that was executed.
+            name: the generic task name, used as the base of the label.
+
+        Returns:
+            ``name`` itself when ``func`` is not a ``functools.partial`` or carries no
+            bound arguments; otherwise ``name(arg, ..., key=value, ...)`` built from the
+            ``repr()`` of the arguments bound in the partial.
+        """
+        if not isinstance(func, functools.partial):
+            return name
+
+        # repr() runs arbitrary user code, so it is the one step here that can raise
+        try:
+            args = []
+            args.extend(repr(x) for x in func.args)
+            args.extend(f"{k}={v!r}" for (k, v) in func.keywords.items())
+            args_str = ", ".join(args)
+        # str(func) would repr() the very same arguments and raise again from inside the
+        # caller's error handler, so fall back to the plain name, which cannot fail
+        except Exception:  # pylint: disable=broad-exception-caught
+            return name
+        if args_str:
+            return f"{name}({args_str})"
+        return name
+
+    @classmethod
+    def _wrap_result(cls, func, name):
         """This method emulates GoLang's error return style"""
 
         @functools.wraps(func)
@@ -139,7 +169,8 @@ def inner(*args, **kwargs):
             try:
                 return func(*args, **kwargs), None
             except Exception as err:  # pylint: disable=broad-exception-caught
-                logger.error(f"{name} task failed: {err!s}", exc_info=err)
+                signature = cls._get_result_function_signature(func, name)
+                logger.error(f"{signature} task failed: {err!s}", exc_info=err)
                 return None, err
 
         return inner
diff --git a/tests/unit/test_parallel.py b/tests/unit/test_parallel.py
index fb2f44e..8ca2c25 100644
--- a/tests/unit/test_parallel.py
+++ b/tests/unit/test_parallel.py
@@ -1,4 +1,5 @@
 import logging
+from functools import partial
 
 from databricks.sdk.core import DatabricksError
 
@@ -117,3 +118,34 @@ def works():
     assert [True, True, True, True] == results
     assert 0 == len(errors)
     assert ["Finished 'testing' tasks: 100% results available (4/4)"] == _predictable_messages(caplog)
+
+
+def test_odd_partial_failed(caplog):
+    caplog.set_level(logging.INFO)
+
+    def fails_on_odd(n=1, dummy=None):
+        if isinstance(n, str):
+            raise RuntimeError("strings are not supported!")
+
+        if n % 2:
+            msg = "failed"
+            raise DatabricksError(msg)
+
+    tasks = [
+        partial(fails_on_odd, n=1),
+        partial(fails_on_odd, 1, dummy="6"),
+        partial(fails_on_odd),
+        partial(fails_on_odd, n="aaa"),
+    ]
+
+    results, errors = Threads.gather("testing", tasks)
+
+    assert [] == results
+    assert 4 == len(errors)
+    assert [
+        "All 'testing' tasks failed!!!",
+        "testing task failed: failed",
+        "testing(1, dummy='6') task failed: failed",
+        "testing(n='aaa') task failed: strings are not supported!",
+        "testing(n=1) task failed: failed",
+    ] == _predictable_messages(caplog)