Skip to content

Commit

Permalink
feat(ingest/powerbi): add timeouts for m-query parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Oct 30, 2024
1 parent 0e62c69 commit 34a54a3
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
11 changes: 10 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@
*path_spec_common,
}

threading_timeout_common = {
"stopit==1.1.2",
}

abs_base = {
"azure-core==1.29.4",
"azure-identity>=1.17.1",
Expand Down Expand Up @@ -487,7 +491,12 @@
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi": (
microsoft_common
| {"lark[regex]==1.1.4", "sqlparse"}
| sqlglot_lib
| threading_timeout_common
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import importlib.resources as pkg_resource
import logging
import os
from typing import Dict, List

import lark
Expand All @@ -19,9 +20,12 @@
TRACE_POWERBI_MQUERY_PARSER,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.utilities.threading_timeout import TimeoutException, threading_timeout

logger = logging.getLogger(__name__)

_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))


@functools.lru_cache(maxsize=1)
def get_lark_parser() -> Lark:
Expand All @@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree:
expression = expression.replace("\u00a0", " ")

logger.debug(f"Parsing expression = {expression}")
parse_tree: Tree = lark_parser.parse(expression)
with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
parse_tree: Tree = lark_parser.parse(expression)

if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(parse_tree.pretty())
Expand Down Expand Up @@ -83,6 +88,13 @@ def get_upstream_tables(
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
return []
except TimeoutException:
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
)
return []
except (
BaseException
) as e: # TODO: Debug why BaseException is needed here and below.
Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/src/datahub/utilities/threading_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import contextlib
import functools
import platform
from typing import ContextManager

from stopit import ThreadingTimeout as _ThreadingTimeout, TimeoutException

__all__ = ["threading_timeout", "TimeoutException"]


@functools.lru_cache(maxsize=1)
def _is_cpython() -> bool:
"""Check if we're running on CPython."""
return platform.python_implementation() == "CPython"


def threading_timeout(timeout: float) -> ContextManager[None]:
"""A timeout context manager that uses stopit's ThreadingTimeout underneath.
This is only supported on CPython.
That's because stopit.ThreadingTimeout uses a CPython-internal method to raise
an exception (the timeout error) in another thread. See stopit.threadstop.async_raise.
Reference: https://github.com/glenfant/stopit
Args:
timeout: The timeout in seconds. If <= 0, no timeout is applied.
Raises:
RuntimeError: If the timeout is not supported on the current Python implementation.
TimeoutException: If the timeout is exceeded.
"""

if timeout <= 0:
return contextlib.nullcontext()

if not _is_cpython():
raise RuntimeError(
f"Timeout is only supported on CPython, not {platform.python_implementation()}"
)

return _ThreadingTimeout(timeout, swallow_exc=False)
31 changes: 31 additions & 0 deletions metadata-ingestion/tests/unit/utilities/test_threading_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import time

import pytest

from datahub.utilities.threading_timeout import TimeoutException, threading_timeout


def test_timeout_no_timeout():
# Should complete without raising an exception
with threading_timeout(1.0):
time.sleep(0.1)


def test_timeout_raises():
# Should raise TimeoutException
with pytest.raises(TimeoutException):
with threading_timeout(0.1):
time.sleep(0.5)


def test_timeout_early_exit():
# Test that context manager handles other exceptions properly
with pytest.raises(ValueError):
with threading_timeout(1.0):
raise ValueError("Early exit")


def test_timeout_zero():
# Should not raise an exception
with threading_timeout(0.0):
pass

0 comments on commit 34a54a3

Please sign in to comment.