Skip to content

Commit

Permalink
sql: fix auto-retry behavior with autocommit_before_ddl
Browse files Browse the repository at this point in the history
When the autocommit_before_ddl setting was enabled, we were
sending a notice to the client without any buffering. This prevents
auto-retry logic for retriable errors from kicking in, since if results
have already been sent to the client, it's not safe to retry the current
statement.

The fix is to buffer the notice for sending instead. It will get sent
whenever results are flushed back to the client. In order to achieve
this, I changed the SendClientNotice function so that it does not
immediately flush the notice. Instead, the notice will now get sent
whenever the connection is flushed, like when the results are complete
or when a Flush message is received.

The existing BufferClientNotice function is not sufficient since that only
buffers notices in the command result, and that buffer is discarded due
to how the connExecutor state transitions are defined. Namely, the
connExecutor will execute the schema change command twice: once to
autocommit the current transaction, and another time to run the schema
change. To avoid losing the notice from the autocommit, it must be
buffered all the way into the client connection, and not just the
command result.

Running with this patch makes logic tests less flaky, since schema
changes that encounter retryable errors are now much more likely to be
retried automatically. This allows us to remove the testing
knob that overrode the transaction liveness threshold. I verified the
flakiness is gone by using:
```
./dev testlogic ccl --config=3node-tenant --stress --ignore-cache
```

Release note (bug fix): Fixed a bug that prevented transaction retry
errors encountered during implicit transactions from being automatically
retried internally if the autocommit_before_ddl session variable was
enabled and the statement was a schema change.
  • Loading branch information
rafiss committed Feb 12, 2025
1 parent 8b2c9ae commit 84079c9
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 61 deletions.
1 change: 1 addition & 0 deletions pkg/sql/conn_executor_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (ex *connExecutor) maybeAutoCommitBeforeDDL(
if err := ex.planner.SendClientNotice(
ctx,
pgnotice.Newf("auto-committing transaction before processing DDL due to autocommit_before_ddl setting"),
false, /* immediateFlush */
); err != nil {
return ex.makeErrEvent(err, ast)
}
Expand Down
77 changes: 69 additions & 8 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package sql_test

import (
"bytes"
"context"
gosql "database/sql"
"database/sql/driver"
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestSessionFinishRollsBackTxn(t *testing.T) {
defer log.Scope(t).Close(t)
aborter := NewTxnAborter()
defer aborter.Close(t)
params, _ := createTestServerParamsAllowTenants()
params, _ := createTestServerParams()
params.Knobs.SQLExecutor = aborter.executorKnobs()
s, mainDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())
Expand Down Expand Up @@ -149,8 +150,9 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT);
for _, state := range tests {
t.Run(state, func(t *testing.T) {
// Create a low-level lib/pq connection so we can close it at will.
pgURL, cleanup := s.ApplicationLayer().PGUrl(t)
defer cleanup()
pgURL, cleanupDB := sqlutils.PGUrl(
t, s.AdvSQLAddr(), state, url.User(username.RootUser))
defer cleanupDB()
c, err := pq.Open(pgURL.String())
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -412,7 +414,7 @@ func TestHalloweenProblemAvoidance(t *testing.T) {
defer mutations.ResetMaxBatchSizeForTests()
numRows := smallerKvBatchSize + smallerInsertBatchSize + 10

params, _ := createTestServerParamsAllowTenants()
params, _ := createTestServerParams()
params.Insecure = true
params.Knobs.DistSQL = &execinfra.TestingKnobs{
TableReaderBatchBytesLimit: 10,
Expand Down Expand Up @@ -483,7 +485,7 @@ func TestAppNameStatisticsInitialization(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

params, _ := createTestServerParamsAllowTenants()
params, _ := createTestServerParams()
params.Insecure = true

s := serverutils.StartServerOnly(t, params)
Expand Down Expand Up @@ -875,7 +877,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var retryCount int64
var attemptCount atomic.Int64
const numToRetry = 2 // only fail on the first two attempts
filter := newDynamicRequestFilter()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Expand Down Expand Up @@ -909,7 +911,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {
if err != nil || tableID != fooTableId {
return nil
}
if atomic.AddInt64(&retryCount, 1) <= numToRetry {
if attemptCount.Add(1) <= numToRetry {
return kvpb.NewErrorWithTxn(
kvpb.NewTransactionRetryError(kvpb.RETRY_REASON_UNKNOWN, "injected retry error"), ba.Txn,
)
Expand All @@ -919,7 +921,7 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {
})

testDB.Exec(t, "INSERT INTO bar VALUES(2); BEGIN; INSERT INTO foo VALUES(1); COMMIT;")
require.Equal(t, numToRetry+1, int(retryCount))
require.EqualValues(t, numToRetry+1, attemptCount.Load())

var x int
testDB.QueryRow(t, "select * from foo").Scan(&x)
Expand All @@ -928,6 +930,65 @@ func TestRetriableErrorDuringUpgradedTransaction(t *testing.T) {
require.Equal(t, 2, x)
}

// TestRetriableErrorAutoCommitBeforeDDL injects a retriable error while
// executing a schema change after that schema change caused the transaction to
// autocommit. In this scenario, the schema change should automatically be
// retried.
func TestRetriableErrorAutoCommitBeforeDDL(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// attemptCount counts the retry errors injected by the request filter
	// installed below; it is only incremented when an error is injected.
	var attemptCount atomic.Int64
	// The filter keeps injecting while attemptCount <= numToRetry, so a total
	// of numToRetry+1 errors are injected before the write is let through.
	const numToRetry = 2
	filter := newDynamicRequestFilter()
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			Store: &kvserver.StoreTestingKnobs{
				TestingRequestFilter: filter.filter,
			},
		},
	})
	defer s.Stopper().Stop(context.Background())
	codec := s.ApplicationLayer().Codec()

	// Pin a single connection so the session variables set below apply to
	// every statement the test runs afterwards.
	sqlDB.SetMaxOpenConns(1)
	conn, err := sqlDB.Conn(context.Background())
	require.NoError(t, err)
	testDB := sqlutils.MakeSQLRunner(conn)

	var fooTableId uint32
	testDB.Exec(t, "SET enable_implicit_transaction_for_batch_statements = true")
	testDB.Exec(t, "SET autocommit_before_ddl = true")
	testDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
	testDB.QueryRow(t, "SELECT 'foo'::regclass::oid").Scan(&fooTableId)

	// Inject an error that will happen during execution. Only transactional
	// conditional puts whose key starts with foo's descriptor metadata key are
	// targeted; every other request passes through untouched.
	filter.setFilter(func(ctx context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
		if ba.Txn == nil {
			return nil
		}
		req, ok := ba.GetArg(kvpb.ConditionalPut)
		if !ok {
			return nil
		}
		put := req.(*kvpb.ConditionalPutRequest)
		if !bytes.HasPrefix(put.Key, codec.DescMetadataKey(fooTableId)) {
			return nil
		}
		// Claim an injection slot with a compare-and-swap rather than a separate
		// Load followed by Add: if the filter is ever invoked concurrently, two
		// callers can no longer both pass the check and push the counter past
		// the numToRetry+1 injections asserted below.
		for {
			injected := attemptCount.Load()
			if injected > numToRetry {
				return nil
			}
			if attemptCount.CompareAndSwap(injected, injected+1) {
				return kvpb.NewErrorWithTxn(
					kvpb.NewTransactionRetryError(kvpb.RETRY_REASON_UNKNOWN, "injected retry error"), ba.Txn,
				)
			}
		}
	})

	testDB.Exec(t, "INSERT INTO foo VALUES(1); ALTER TABLE foo ADD COLUMN b INT NULL DEFAULT -2; INSERT INTO foo VALUES(2);")
	// Exactly numToRetry+1 errors must have been injected, and the batch must
	// still have succeeded (Exec above fails the test otherwise).
	require.EqualValues(t, numToRetry+1, attemptCount.Load())

	// The row inserted after the ALTER picks up the new column's default.
	var b int
	testDB.QueryRow(t, "SELECT b FROM foo WHERE a = 2").Scan(&b)
	require.Equal(t, -2, b)
}

// This test ensures that when in an explicit transaction and statement
// preparation uses the user's transaction, errors during those planning queries
// are handled correctly.
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,9 @@ type RestrictedCommandResult interface {
// This gets flushed only when the CommandResult is closed.
BufferNotice(notice pgnotice.Notice)

// SendNotice immediately flushes a notice to the client.
SendNotice(ctx context.Context, notice pgnotice.Notice) error
// SendNotice sends a notice to the client, which can optionally be flushed
// immediately.
SendNotice(ctx context.Context, notice pgnotice.Notice, immediateFlush bool) error

// SetColumns informs the client about the schema of the result. The columns
// can be nil.
Expand Down Expand Up @@ -1114,7 +1115,9 @@ func (r *streamingCommandResult) BufferNotice(notice pgnotice.Notice) {
}

// SendNotice is part of the RestrictedCommandResult interface.
func (r *streamingCommandResult) SendNotice(ctx context.Context, notice pgnotice.Notice) error {
func (r *streamingCommandResult) SendNotice(
ctx context.Context, notice pgnotice.Notice, immediateFlush bool,
) error {
// Unimplemented: the internal executor does not support notices.
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ var _ eval.ClientNoticeSender = &DummyClientNoticeSender{}
func (c *DummyClientNoticeSender) BufferClientNotice(context.Context, pgnotice.Notice) {}

// SendClientNotice is part of the eval.ClientNoticeSender interface.
func (c *DummyClientNoticeSender) SendClientNotice(context.Context, pgnotice.Notice) error {
func (c *DummyClientNoticeSender) SendClientNotice(context.Context, pgnotice.Notice, bool) error {
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/sql/logictest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ go_library(
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/txnwait",
"//pkg/multitenant/tenantcapabilities",
"//pkg/security/username",
"//pkg/server",
Expand Down
13 changes: 0 additions & 13 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -4389,18 +4388,6 @@ func RunLogicTest(
if *printErrorSummary {
defer lt.printErrorSummary()
}
if config.UseSecondaryTenant == logictestbase.Always {
// Under multitenant configs running in EngFlow, we have seen that logic
// tests can be flaky due to an overload condition where schema change
// transactions do not heartbeat quickly enough. This allows background jobs
// such as the spanconfig reconciler or the job registry "remove claims from
// dead sessions" loop.
// See https://github.com/cockroachdb/cockroach/pull/140400#issuecomment-2634346278
// and https://github.com/cockroachdb/cockroach/issues/140494#issuecomment-2640208187
// for a detailed analysis of this issue.
cleanup := txnwait.TestingOverrideTxnLivenessThreshold(30 * time.Second)
defer cleanup()
}
// Each test needs a copy because of Parallel
serverArgsCopy := serverArgs
serverArgsCopy.ForceProductionValues = serverArgs.ForceProductionValues || nonMetamorphicBatchSizes
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ CREATE TABLE t29494(x INT); INSERT INTO t29494 VALUES (12)

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE t29494 ADD COLUMN y INT NOT NULL DEFAULT 123

# Check that the new column is not visible
Expand All @@ -643,6 +644,7 @@ UPSERT INTO t29494(x) VALUES (123) RETURNING y
statement ok
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE t29494 ADD COLUMN y INT NOT NULL DEFAULT 123

statement error column "y" does not exist
Expand All @@ -653,6 +655,7 @@ ROLLBACK

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE t29494 ADD COLUMN y INT NOT NULL DEFAULT 123

query I
Expand Down Expand Up @@ -694,6 +697,7 @@ CREATE TABLE t29497(x INT PRIMARY KEY);

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE t29497 ADD COLUMN y INT NOT NULL DEFAULT 123

statement error UPSERT has more expressions than target columns
Expand All @@ -704,6 +708,7 @@ ROLLBACK;

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE t29497 ADD COLUMN y INT NOT NULL DEFAULT 123

statement error column "y" does not exist
Expand All @@ -716,6 +721,7 @@ subtest visible_returning_columns

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE tc DROP COLUMN y

query I colnames,rowsort
Expand Down Expand Up @@ -974,6 +980,7 @@ CREATE TABLE table38627 (a INT PRIMARY KEY, b INT); INSERT INTO table38627 VALUE

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE table38627 ADD COLUMN c INT NOT NULL DEFAULT 5

statement ok
Expand Down
29 changes: 9 additions & 20 deletions pkg/sql/logictest/testdata/logic_test/virtual_columns
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,8 @@ a b v

# Add virtual columns inside an explicit transactions.
statement ok
BEGIN
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;

statement ok
ALTER TABLE sc ADD COLUMN w1 INT AS (a*b) VIRTUAL
Expand Down Expand Up @@ -878,7 +879,8 @@ ALTER TABLE sc DROP COLUMN v

# Add a column and an index on that column in the same transaction.
statement ok
BEGIN
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;

statement ok
ALTER TABLE sc ADD COLUMN v INT AS (a+b) VIRTUAL
Expand All @@ -904,11 +906,9 @@ ALTER TABLE sc DROP COLUMN v

# Adding a column and a partial index using that column in the predicate in the
# same transaction is not allowed.
statement ok
SET autocommit_before_ddl = false

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;

statement ok
ALTER TABLE sc ADD COLUMN v INT AS (a+b) VIRTUAL
Expand All @@ -919,9 +919,6 @@ CREATE INDEX partial_idx ON sc(b) WHERE v > 20
statement ok
END

statement ok
RESET autocommit_before_ddl

statement ok
ALTER TABLE sc ADD COLUMN v INT AS (a+b) VIRTUAL

Expand Down Expand Up @@ -1088,11 +1085,9 @@ statement error pgcode 0A000 virtual computed column "k" referencing columns \("
ALTER TABLE t_ref ADD COLUMN j INT NOT NULL DEFAULT 42,
ADD COLUMN k INT AS (i+j) VIRTUAL;

statement ok
SET autocommit_before_ddl = false

statement error pgcode 0A000 virtual computed column "l" referencing columns \("j", "k"\) added in the current transaction
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
ALTER TABLE t_ref ADD COLUMN j INT NOT NULL DEFAULT 42;
ALTER TABLE t_ref ADD COLUMN k INT NOT NULL DEFAULT 42;
ALTER TABLE t_ref ADD COLUMN l INT AS (i+j+k) VIRTUAL;
Expand All @@ -1101,20 +1096,15 @@ COMMIT;
statement ok
ROLLBACK;

statement ok
RESET autocommit_before_ddl

# Test that adding virtual computed columns to tables which have been created
# in the current transaction is fine.

statement ok
DROP TABLE t_ref;

statement ok
SET autocommit_before_ddl = false

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
CREATE TABLE t_ref (i INT PRIMARY KEY);
ALTER TABLE t_ref ADD COLUMN j INT NOT NULL DEFAULT 42;
ALTER TABLE t_ref ADD COLUMN k INT AS (i+j) VIRTUAL;
Expand All @@ -1123,9 +1113,6 @@ COMMIT;
statement ok
DROP TABLE t_ref;

statement ok
RESET autocommit_before_ddl

# Tests for virtual computed columns that reference foreign key columns.
subtest referencing_fks

Expand Down Expand Up @@ -1315,12 +1302,14 @@ ALTER TABLE virtual_pk DROP COLUMN d;
# the legacy schema changer.
statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
SET LOCAL use_declarative_schema_changer = off;
ALTER TABLE virtual_pk ADD COLUMN d INT NOT NULL DEFAULT 42;
COMMIT;

statement ok
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET LOCAL autocommit_before_ddl = false;
SET LOCAL use_declarative_schema_changer = off;
ALTER TABLE virtual_pk DROP COLUMN d;
COMMIT;
Expand Down
Loading

0 comments on commit 84079c9

Please sign in to comment.