Retry backend execute on concurrent append #303

Draft · wants to merge 24 commits into main

Changes from 13 commits

Commits (24):
22d9c63  Add test with concurrent delta append (JCZuurmond, Sep 30, 2024)
1f4bb5a  Move roll over method out (JCZuurmond, Sep 30, 2024)
1745911  Rename table (JCZuurmond, Sep 30, 2024)
92394ea  Assert the right way (JCZuurmond, Sep 30, 2024)
1010236  Use test table (JCZuurmond, Sep 30, 2024)
65dca14  Add comment explaining rollover (JCZuurmond, Sep 30, 2024)
bffd2d5  Retry concurrent append (JCZuurmond, Sep 30, 2024)
1f0b0a8  Fix string concat (JCZuurmond, Sep 30, 2024)
44513b3  Fix return type hint of retryable (JCZuurmond, Sep 30, 2024)
75050ad  Rename variables (JCZuurmond, Sep 30, 2024)
ac3e37c  Remove if delta missing raise as data loss (JCZuurmond, Sep 30, 2024)
3a7d5b3  Simplify create table (JCZuurmond, Sep 30, 2024)
c83f117  Use `make_table` fixture (JCZuurmond, Sep 30, 2024)
4844218  Remove wait until roll over (JCZuurmond, Oct 1, 2024)
f5e4db0  Remove unused import (JCZuurmond, Oct 1, 2024)
3f1c004  Move integration test to the appropriate module (JCZuurmond, Oct 1, 2024)
ae1ee5b  Put back raise error for missing delta transaction log (JCZuurmond, Oct 1, 2024)
7c0d4f4  Introduce custom `DeltaConcurrentAppend` error (JCZuurmond, Oct 1, 2024)
c31b679  Unit test `DeltaConcurrentAppend` error on statement execution backend (JCZuurmond, Oct 1, 2024)
664796b  Narrow test (JCZuurmond, Oct 1, 2024)
4f944ad  Test `DeltaConcurrentAppend` error on `RuntimeBackend` (JCZuurmond, Oct 1, 2024)
79cb07a  Narrow tests (JCZuurmond, Oct 1, 2024)
b2f2c8c  Format (JCZuurmond, Oct 1, 2024)
7f3d72e  Add integration test for concurrent write through runtime backend (JCZuurmond, Oct 1, 2024)
src/databricks/labs/lsql/core.py (13 changes: 10 additions & 3 deletions)
@@ -13,7 +13,8 @@
import requests
import sqlglot
from databricks.sdk import WorkspaceClient, errors
from databricks.sdk.errors import DataLoss, NotFound
from databricks.sdk.errors import BadRequest, DataLoss, NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.sql import (
ColumnInfoTypeName,
Disposition,
@@ -119,6 +120,13 @@ def __repr__(self):
return f"Row({', '.join(f'{k}={v!r}' for (k, v) in zip(self.__columns__, self, strict=True))})"


def _is_retryable_delta_concurrent_append(e: BaseException) -> str | None:
"""Retry a concurrent append to a delta table"""
if isinstance(e, BadRequest) and "DELTA_CONCURRENT_APPEND" in str(e):
return "Concurrent append"
return None


class StatementExecutionExt:
"""Execute SQL statements in a stateless manner.

@@ -182,6 +190,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
ColumnInfoTypeName.TIMESTAMP: self._parse_timestamp,
}

@retried(is_retryable=_is_retryable_delta_concurrent_append, timeout=timedelta(seconds=10))
Member Author (JCZuurmond):

@nfx: Is this what you were thinking of?

I have to think about the implications of always retrying this in lsql; maybe we should only retry within UCX instead.

Contributor:

I think there needs to be a flag to control this, and it should default to off: it's not necessarily safe to blindly retry arbitrary SQL.

In general any time we do a 'read-modify-write' cycle, everything needs to start again from the read part because the modify (and write) often depend on it. Sometimes the read and modify bits are within the same SQL statement as the write, in which case this is safe. But often this is part of application code before we get to SQL and that may need to be restarted. In this situation only the application knows what to do.

Irrespective of this, whatever we do here also needs to end up in the .save_table() implementations: these don't all pass through .execute() and the same thing can happen there.
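
To make the suggested opt-in concrete, here is a minimal sketch, assuming a hypothetical `retry_on_concurrent_append` constructor argument (not part of this diff) and reusing the predicate added in core.py:

    from datetime import timedelta

    from databricks.sdk.errors import BadRequest
    from databricks.sdk.retries import retried


    def _is_retryable_delta_concurrent_append(e: BaseException) -> str | None:
        """Retry a concurrent append to a delta table."""
        if isinstance(e, BadRequest) and "DELTA_CONCURRENT_APPEND" in str(e):
            return "Concurrent append"
        return None


    class StatementExecutionExtSketch:
        """Sketch only: retrying is off by default and opted into per instance."""

        def __init__(self, *, retry_on_concurrent_append: bool = False) -> None:
            if retry_on_concurrent_append:
                # Wrap the bound method so only opted-in instances retry concurrent appends.
                self.execute = retried(
                    is_retryable=_is_retryable_delta_concurrent_append,
                    timeout=timedelta(seconds=10),
                )(self.execute)

        def execute(self, statement: str) -> None:
            ...  # stand-in for issuing the statement via the Statement Execution API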

Collaborator:

  1. We need to support this in RuntimeBackend, separately.
  2. I think we need to throw a predefined common exception, per @asnare's point (see the sketch below).
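
On point 2: a later commit in this PR (7c0d4f4) does introduce a custom `DeltaConcurrentAppend` error, although its definition is not part of the diff shown here. A minimal sketch of such a predefined exception, where the choice of `BadRequest` as base class is an assumption:

    from databricks.sdk.errors import BadRequest


    class DeltaConcurrentAppend(BadRequest):
        """Raised when a write fails because another transaction appended to the same Delta table."""


    def _raise_as_predefined_error(error_message: str) -> None:
        # Sketch: translate the raw Delta error string into one well-known exception type,
        # so the statement execution backend and the runtime backend can raise the same error.
        if "DELTA_CONCURRENT_APPEND" in error_message:
            raise DeltaConcurrentAppend(error_message)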

Member Author (JCZuurmond):

I will move the retry to the RuntimeBackend. What "predefined common exception" would you suggest throwing? I looked through sdk.errors and did not see one that really applies here.
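
As a rough illustration of moving the retry into the RuntimeBackend, here is a sketch; the class shape and the Spark call are assumptions for illustration, not code from this PR. Inside the runtime the failure surfaces from Spark rather than as the SDK's BadRequest, so this predicate matches on the error message only:

    from datetime import timedelta

    from databricks.sdk.retries import retried


    def _is_concurrent_append(e: BaseException) -> str | None:
        # Spark raises its own exception types here, so match on the Delta error class in the message.
        if "DELTA_CONCURRENT_APPEND" in str(e):
            return "Concurrent append"
        return None


    class RuntimeBackendSketch:
        """Sketch only: assumes a Spark session is available inside the Databricks runtime."""

        def __init__(self, spark) -> None:
            self._spark = spark

        @retried(is_retryable=_is_concurrent_append, timeout=timedelta(seconds=10))
        def execute(self, sql: str) -> None:
            self._spark.sql(sql)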

def execute(
self,
statement: str,
@@ -467,8 +476,6 @@ def _raise_if_needed(status: StatementStatus):
raise NotFound(error_message)
if "does not exist" in error_message:
raise NotFound(error_message)
if "DELTA_MISSING_TRANSACTION_LOG" in error_message:
raise DataLoss(error_message)
mapping = {
ServiceErrorCode.ABORTED: errors.Aborted,
ServiceErrorCode.ALREADY_EXISTS: errors.AlreadyExists,
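
For context on the SDK helper used above: `retried(is_retryable=..., timeout=...)` keeps re-invoking the wrapped callable, sleeping briefly between attempts, for as long as the predicate returns a reason string and the timeout has not elapsed. A small standalone illustration with a stand-in failure rather than a real Delta error:

    from datetime import timedelta

    from databricks.sdk.retries import retried

    attempts = 0


    def _is_concurrent_append(e: BaseException) -> str | None:
        # Simplified version of the predicate in core.py: return a reason to retry, or None.
        return "Concurrent append" if "DELTA_CONCURRENT_APPEND" in str(e) else None


    @retried(is_retryable=_is_concurrent_append, timeout=timedelta(seconds=10))
    def flaky_update() -> int:
        """Stand-in for a write that loses the race twice before succeeding."""
        global attempts
        attempts += 1
        if attempts < 3:
            raise RuntimeError("[DELTA_CONCURRENT_APPEND] simulated concurrent write")
        return attempts


    print(flaky_update())  # prints 3: the first two attempts are retried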
tests/integration/test_backends.py (46 changes: 46 additions & 0 deletions)
@@ -1,7 +1,12 @@
import math
import time

import pytest
from databricks.labs.blueprint.commands import CommandExecutor
from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.parallel import Threads
from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
from databricks.sdk.errors import BadRequest

from databricks.labs.lsql import Row
from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend
@@ -186,3 +191,44 @@ def test_runtime_backend_use_statements(ws):
"""
result = commands.run(permission_denied_query)
assert result == "PASSED"


def wait_until_seconds_rollover(*, rollover_seconds: int = 10) -> None:
"""Wait until the next rollover.

Useful to align concurrent writes.

Args:
rollover_seconds (int) : The multiple of seconds to wait until the next rollover.
"""
nano, micro = 1e9, 1e6

nanoseconds_now = time.clock_gettime_ns(time.CLOCK_REALTIME)
nanoseconds_target = math.ceil(nanoseconds_now / (nano * rollover_seconds)) * nano * rollover_seconds

# To hit the rollover more accurately, first sleep until just before the target
nanoseconds_until_almost_target = (nanoseconds_target - nanoseconds_now) - micro
time.sleep(max(nanoseconds_until_almost_target / nano, 0))

# Then busy-wait until the rollover occurs
while time.clock_gettime_ns(time.CLOCK_REALTIME) < nanoseconds_target:
pass


def test_runtime_backend_handles_concurrent_append(sql_backend, make_random, make_table) -> None:
table = make_table(
name=f"lsql_test_{make_random()}",
ctas="SELECT r.id AS x, random() AS y FROM range(1000000) r"
)

def update_table() -> None:
wait_until_seconds_rollover() # Update the table at the same time
sql_backend.execute(f"UPDATE {table.full_name} SET y = y * 2 WHERE (x % 2 = 0)")

try:
Threads.strict("concurrent appends", [update_table, update_table])
except BadRequest as e:
if "[DELTA_CONCURRENT_APPEND]" in str(e):
assert False, str(e)
else:
raise # Raise in case of unexpected error