Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry backend execute on concurrent append #303

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
22d9c63
Add test with concurrent delta append
JCZuurmond Sep 30, 2024
1f4bb5a
Move roll over method out
JCZuurmond Sep 30, 2024
1745911
Rename table
JCZuurmond Sep 30, 2024
92394ea
Assert the right way
JCZuurmond Sep 30, 2024
1010236
Use test table
JCZuurmond Sep 30, 2024
65dca14
Add comment explaining rollover
JCZuurmond Sep 30, 2024
bffd2d5
Retry concurrent append
JCZuurmond Sep 30, 2024
1f0b0a8
Fix string concat
JCZuurmond Sep 30, 2024
44513b3
Fix return type hint of retryable
JCZuurmond Sep 30, 2024
75050ad
Rename variables
JCZuurmond Sep 30, 2024
ac3e37c
Remove if delta missing raise as data loss
JCZuurmond Sep 30, 2024
3a7d5b3
Simplify create table
JCZuurmond Sep 30, 2024
c83f117
Use `make_table` fixture
JCZuurmond Sep 30, 2024
4844218
Remove wait until roll over
JCZuurmond Oct 1, 2024
f5e4db0
Remove unused import
JCZuurmond Oct 1, 2024
3f1c004
Move integration test to the appropriate module
JCZuurmond Oct 1, 2024
ae1ee5b
Put back raise error for missing delta transaction log
JCZuurmond Oct 1, 2024
7c0d4f4
Introduce custom `DeltaConcurrentAppend` error
JCZuurmond Oct 1, 2024
c31b679
Unit test `DeltaConcurrentAppend` error on statement execution backend
JCZuurmond Oct 1, 2024
664796b
Narrow test
JCZuurmond Oct 1, 2024
4f944ad
Test `DeltaConcurrentAppend` error on `RuntimeBackend`
JCZuurmond Oct 1, 2024
79cb07a
Narrow tests
JCZuurmond Oct 1, 2024
b2f2c8c
Format
JCZuurmond Oct 1, 2024
7f3d72e
Add integration test for concurrent write through runtime backend
JCZuurmond Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/databricks/labs/lsql/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from databricks.sdk.retries import retried
from databricks.sdk.service.compute import Language

from databricks.labs.lsql.core import Row, StatementExecutionExt
from databricks.labs.lsql.core import DeltaConcurrentAppend, Row, StatementExecutionExt
from databricks.labs.lsql.structs import StructInference

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -117,6 +117,8 @@ def _api_error_from_message(error_message: str) -> DatabricksError:
return BadRequest(error_message)
if "Operation not allowed" in error_message:
return PermissionDenied(error_message)
if "DELTA_CONCURRENT_APPEND" in error_message:
return DeltaConcurrentAppend(error_message)
return Unknown(error_message)


Expand Down
10 changes: 9 additions & 1 deletion src/databricks/labs/lsql/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import requests
import sqlglot
from databricks.sdk import WorkspaceClient, errors
from databricks.sdk.errors import DataLoss, NotFound
from databricks.sdk.errors import BadRequest, DatabricksError, DataLoss, NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.sql import (
ColumnInfoTypeName,
Disposition,
Expand Down Expand Up @@ -119,6 +120,10 @@ def __repr__(self):
return f"Row({', '.join(f'{k}={v!r}' for (k, v) in zip(self.__columns__, self, strict=True))})"


class DeltaConcurrentAppend(DatabricksError):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced this error

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentModification to be precise - we can concurrently delete, append or update

    """Error raised when concurrently appending to a Delta table."""


class StatementExecutionExt:
"""Execute SQL statements in a stateless manner.

Expand Down Expand Up @@ -182,6 +187,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument
ColumnInfoTypeName.TIMESTAMP: self._parse_timestamp,
}

@retried(on=[DeltaConcurrentAppend], timeout=timedelta(seconds=10))
def execute(
self,
statement: str,
Expand Down Expand Up @@ -469,6 +475,8 @@ def _raise_if_needed(status: StatementStatus):
raise NotFound(error_message)
if "DELTA_MISSING_TRANSACTION_LOG" in error_message:
JCZuurmond marked this conversation as resolved.
Show resolved Hide resolved
raise DataLoss(error_message)
if "DELTA_CONCURRENT_APPEND" in error_message:
raise DeltaConcurrentAppend(error_message)
mapping = {
ServiceErrorCode.ABORTED: errors.Aborted,
ServiceErrorCode.ALREADY_EXISTS: errors.AlreadyExists,
Expand Down
56 changes: 55 additions & 1 deletion tests/integration/test_backends.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import pytest
from databricks.labs.blueprint.commands import CommandExecutor
from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.parallel import Threads
from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
from databricks.sdk.errors import BadRequest
from databricks.sdk.service import compute

from databricks.labs.lsql import Row
from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend
Expand Down Expand Up @@ -74,7 +77,6 @@
return "PASSED"
"""


UNKNOWN_ERROR = """
from databricks.labs.lsql.backends import RuntimeBackend
from databricks.sdk.errors import Unknown
Expand All @@ -86,6 +88,37 @@
return "PASSED"
"""

CONCURRENT_APPEND = '''
import math
import time


def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None:
    """Wait until the boot clock reaches the next multiple of ``rollover_seconds``.

    Useful to align concurrent writes.

    Args:
        rollover_seconds (int) : The multiple of seconds to wait until the next rollover.
    """
    nano, micro = 1e9, 1e6

    nanoseconds_now = time.clock_gettime_ns(time.CLOCK_BOOTTIME)
    # Bug fix: the previous expression floored first (`now / nano // rollover_seconds`),
    # making `math.ceil` a no-op on an already-whole number, so the target was the
    # *previous* rollover and this function returned immediately without waiting.
    # Ceiling the undivided ratio yields the next multiple of the interval.
    interval_ns = nano * rollover_seconds
    nanoseconds_target = math.ceil(nanoseconds_now / interval_ns) * interval_ns

    # To hit the rollover more accurately, first sleep until just before the target
    nanoseconds_until_almost_target = (nanoseconds_target - nanoseconds_now) - micro
    time.sleep(max(nanoseconds_until_almost_target / nano, 0))

    # Then busy-wait until the rollover occurs for sub-millisecond alignment
    while time.clock_gettime_ns(time.CLOCK_BOOTTIME) < nanoseconds_target:
        pass


wait_until_seconds_rollover()
spark.sql("UPDATE {table_full_name} SET y = y * 2 WHERE (x % 2 = 0)")
'''


@pytest.mark.xfail
def test_runtime_backend_works_maps_permission_denied(ws):
Expand Down Expand Up @@ -139,6 +172,27 @@ def test_runtime_backend_errors_handled(ws, query):
assert result == "PASSED"


def test_runtime_backend_handles_concurrent_append(ws, make_random, make_table) -> None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nfx : I copied this from the tests above to integration test the runtime backend. However, it is not really the RuntimeBackend. Is this the correct approach for integration testing the runtime backend? I would have introduced a local Spark session using pytest-spark.

commands = CommandExecutor(
ws.clusters,
ws.command_execution,
lambda: ws.config.cluster_id,
language=compute.Language.PYTHON,
)
table = make_table(name=f"lsql_test_{make_random()}", ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r")

def update_table() -> None:
commands.run(CONCURRENT_APPEND.format(table_full_name=table.full_name))

try:
Threads.strict("concurrent appends", [update_table, update_table])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this does not fail due to the lock in the CommandExecutor

except BadRequest as e:
if "DELTA_CONCURRENT_APPEND" in str(e):
assert False, str(e)
else:
raise # Raise in case of unexpected error


def test_statement_execution_backend_works(ws, env_or_skip):
sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
rows = list(sql_backend.fetch("SELECT * FROM samples.nyctaxi.trips LIMIT 10"))
Expand Down
17 changes: 17 additions & 0 deletions tests/integration/test_core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging

import pytest
from databricks.labs.blueprint.parallel import Threads
from databricks.sdk.errors import BadRequest
from databricks.sdk.service.sql import Disposition

from databricks.labs.lsql.core import Row, StatementExecutionExt
Expand Down Expand Up @@ -83,3 +85,18 @@ def test_fetch_value(ws):
see = StatementExecutionExt(ws)
count = see.fetch_value("SELECT COUNT(*) FROM samples.nyctaxi.trips")
assert count == 21932


def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, make_table) -> None:
    """Concurrent updates to one Delta table must not surface DELTA_CONCURRENT_APPEND.

    The backend retries on concurrent-append conflicts, so two simultaneous
    UPDATE statements against the same table should both succeed; any other
    error is unexpected and re-raised.
    """
    table = make_table(name=f"lsql_test_{make_random()}", ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r")

    def update_table() -> None:
        sql_backend.execute(f"UPDATE {table.full_name} SET y = y * 2 WHERE (x % 2 = 0)")

    try:
        Threads.strict("concurrent appends", [update_table, update_table])
    except BadRequest as e:
        if "DELTA_CONCURRENT_APPEND" in str(e):
            # pytest.fail instead of `assert False`: asserts are stripped under `python -O`
            pytest.fail(str(e))
        raise  # Raise in case of unexpected error
9 changes: 6 additions & 3 deletions tests/unit/test_backends.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import os
import re
import sys
from dataclasses import dataclass
from unittest import mock
Expand Down Expand Up @@ -32,6 +33,7 @@
RuntimeBackend,
StatementExecutionBackend,
)
from databricks.labs.lsql.core import DeltaConcurrentAppend

# pylint: disable=protected-access

Expand Down Expand Up @@ -364,9 +366,10 @@ def test_save_table_with_not_null_constraint_violated():
("PARSE_SYNTAX_ERROR foo", BadRequest),
("foo Operation not allowed", PermissionDenied),
("foo error failure", Unknown),
("[DELTA_CONCURRENT_APPEND] ConcurrentAppendException: Files were added ...", DeltaConcurrentAppend),
],
)
def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t):
def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t) -> None:
with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}):
pyspark_sql_session = MagicMock()
sys.modules["pyspark.sql.session"] = pyspark_sql_session
Expand All @@ -376,10 +379,10 @@ def test_runtime_backend_error_mapping_similar_to_statement_execution(msg, err_t

runtime_backend = RuntimeBackend()

with pytest.raises(err_t):
with pytest.raises(err_t, match=re.escape(msg)):
runtime_backend.execute("SELECT * from bar")

with pytest.raises(err_t):
with pytest.raises(err_t, match=re.escape(msg)):
list(runtime_backend.fetch("SELECT * from bar"))


Expand Down
9 changes: 4 additions & 5 deletions tests/unit/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
timedelta,
)

from databricks.labs.lsql.core import Row, StatementExecutionExt
from databricks.labs.lsql.core import DeltaConcurrentAppend, Row, StatementExecutionExt


@pytest.mark.parametrize(
Expand Down Expand Up @@ -196,19 +196,18 @@ def test_execute_poll_succeeds():
(ServiceError(message="... DELTA_TABLE_NOT_FOUND ..."), errors.NotFound),
(ServiceError(message="... DELTA_TABLE_NOT_FOUND ..."), errors.NotFound),
(ServiceError(message="... DELTA_MISSING_TRANSACTION_LOG ..."), errors.DataLoss),
(ServiceError(message="... DELTA_CONCURRENT_APPEND ..."), DeltaConcurrentAppend),
],
)
def test_execute_fails(status_error, platform_error_type):
def test_execute_fails(status_error, platform_error_type) -> None:
    """A FAILED statement maps its service error onto the expected platform error type."""
    ws = create_autospec(WorkspaceClient)
    failed_response = StatementResponse(
        status=StatementStatus(state=StatementState.FAILED, error=status_error),
        statement_id="bcd",
    )
    ws.statement_execution.execute_statement.return_value = failed_response

    see = StatementExecutionExt(ws, warehouse_id="abc")

    # When a service error is present, also check its message is propagated.
    expected_message = None if status_error is None else status_error.message
    with pytest.raises(platform_error_type, match=expected_message):
        see.execute("SELECT 2+2")


Expand Down
Loading