Retry backend execute on concurrent append #303
base: main
Conversation
See comments
src/databricks/labs/lsql/core.py
@@ -182,6 +189,7 @@ def __init__(  # pylint: disable=too-many-arguments,too-many-positional-argument
             ColumnInfoTypeName.TIMESTAMP: self._parse_timestamp,
         }
 
+    @retried(is_retryable=_is_retryable_delta_concurrent_append, timeout=timedelta(seconds=10))
@nfx: Is this what you were thinking of?
I have to think about the implications of always retrying this in lsql; maybe we should only retry within UCX instead.
I think there needs to be a flag to control this, and it should default to off: it's not necessarily safe to blindly retry arbitrary SQL.

In general, any time we do a 'read-modify-write' cycle, everything needs to start again from the `read` part because the `modify` (and `write`) often depend on it. Sometimes the `read` and `modify` bits are within the same SQL statement as the `write`, in which case this is safe. But often this is part of application code before we get to SQL, and that may need to be restarted. In this situation only the application knows what to do.

Irrespective of this, whatever we do here also needs to end up in the `.save_table()` implementations: these don't all pass through `.execute()` and the same thing can happen there.
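A minimal sketch of the opt-in behaviour being suggested here. All names are hypothetical (the decorator, the `enabled` flag, and `ConcurrentAppendError` are illustrative stand-ins, not lsql or SDK API):

```python
import functools
import time


class ConcurrentAppendError(Exception):
    """Stand-in for a Delta concurrent-append failure (hypothetical)."""


def with_optional_retry(func=None, *, enabled=False, attempts=3, delay=0.0):
    """Retry only when the caller opts in: blindly retrying arbitrary SQL is unsafe."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            tries = attempts if enabled else 1  # default off: fail fast
            last_exc = None
            for _ in range(tries):
                try:
                    return fn(*args, **kwargs)
                except ConcurrentAppendError as exc:  # only the retryable error
                    last_exc = exc
                    time.sleep(delay)
            raise last_exc
        return wrapper

    return decorator if func is None else decorator(func)
```

The safe case for enabling this is a single self-contained statement (e.g. a `MERGE` whose read and modify happen inside the statement); a read done in application code followed by a separate write is not safe, because the retry re-runs only the write.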
- we need to support this in `RuntimeBackend`
- separately, i think we need to throw a predefined common exception, to @asnare's point
I will move the retry to the `RuntimeBackend`. What "predefined common exception" would you suggest to throw? I looked at `sdk.errors` and did not see one that really applied here.
❌ 35/37 passed, 2 failed, 4 skipped, 7m59s total
❌ test_runtime_backend_handles_concurrent_append: databricks.sdk.errors.platform.BadRequest: [INSUFFICIENT_PERMISSIONS] Insufficient privileges: (1.544s)
❌ test_runtime_backend_handles_concurrent_append: databricks.sdk.errors.platform.BadRequest: [INSUFFICIENT_PERMISSIONS] Insufficient privileges: (654ms)
Running from acceptance #433
@@ -119,6 +120,10 @@ def __repr__(self):
         return f"Row({', '.join(f'{k}={v!r}' for (k, v) in zip(self.__columns__, self, strict=True))})"
 
 
+class DeltaConcurrentAppend(DatabricksError):
Introduced this error
`ConcurrentModification`, to be precise: we can concurrently delete, append, or update.
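One possible shape for that predefined common exception, sketched with stand-in classes. Only the `DELTA_CONCURRENT_APPEND`-style markers come from Delta's error messages; the hierarchy and the `classify` helper are illustrative, not existing lsql or SDK code:

```python
class DatabricksError(Exception):
    """Stand-in for databricks.sdk.errors.DatabricksError."""


class ConcurrentModification(DatabricksError):
    """A concurrent transaction touched the table: delete, append, or update."""


class ConcurrentAppend(ConcurrentModification):
    """Files were added to data the current transaction read."""


class ConcurrentDelete(ConcurrentModification):
    """Files the current transaction read were deleted."""


# Markers from Delta's error messages, mapped onto the common exceptions.
_DELTA_MARKERS = {
    "DELTA_CONCURRENT_APPEND": ConcurrentAppend,
    "DELTA_CONCURRENT_DELETE_READ": ConcurrentDelete,
}


def classify(message: str):
    """Return the predefined exception type matching a raw error message, if any."""
    for marker, exc_type in _DELTA_MARKERS.items():
        if marker in message:
            return exc_type
    return None
```

A retry policy can then target `ConcurrentModification` (or just `ConcurrentAppend`) instead of string-matching raw error text at every call site.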
@@ -139,6 +172,27 @@ def test_runtime_backend_errors_handled(ws, query):
     assert result == "PASSED"
 
 
+def test_runtime_backend_handles_concurrent_append(ws, make_random, make_table) -> None:
@nfx: I copied this from the tests above to integration test the runtime backend. However, it is not really the `RuntimeBackend`. Is this the correct approach for integration testing the runtime backend? I would have introduced a local Spark session using `pytest-spark`.
+        commands.run(CONCURRENT_APPEND.format(table_full_name=table.full_name))
+
+    try:
+        Threads.strict("concurrent appends", [update_table, update_table])
I think this does not fail due to the lock in the `CommandExecutor`.
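A toy model of why a lock would hide the failure: Delta uses optimistic concurrency, so a conflict needs two writers that both read the same table version before either commits. If something like a `CommandExecutor` lock serializes the appends, every writer reads a fresh version and the conflict never occurs. This is illustrative code, not lsql or Delta internals:

```python
class ToyDeltaTable:
    """Toy optimistic-concurrency table: a commit succeeds only if the
    table version is unchanged since the writer read it."""

    def __init__(self) -> None:
        self.version = 0

    def read(self) -> int:
        return self.version

    def commit(self, read_version: int) -> None:
        if self.version != read_version:
            # roughly what Delta surfaces as DELTA_CONCURRENT_APPEND
            raise RuntimeError("concurrent modification detected")
        self.version += 1


# Serialized appends (what a lock in the executor gives you): both succeed.
table = ToyDeltaTable()
table.commit(table.read())
table.commit(table.read())

# Interleaved appends: both read version 0, so the second commit conflicts.
table = ToyDeltaTable()
first, second = table.read(), table.read()
table.commit(first)
try:
    table.commit(second)
except RuntimeError:
    pass  # the failure the integration test is trying (and failing) to provoke
```

To reproduce the conflict, the two appends need to run through independent execution paths so their reads genuinely overlap.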