diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index e8065a3eea08..c97a2509abf4 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2351,9 +2351,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { Value2 String, PRIMARY KEY (Key) ); + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1"); SELECT * FROM TestDdlDml2; ALTER TABLE TestDdlDml2 DROP COLUMN Value2; + )", TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (1, "1"); + SELECT * FROM TestDdlDml2; UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2"); SELECT * FROM TestDdlDml2; CREATE TABLE TestDdlDml33 ( @@ -2363,7 +2373,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2); - CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[1u];["1"]]])", FormatResultSetYson(result.GetResultSet(0))); CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1))); UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString()); diff --git a/ydb/core/tx/datashard/alter_table_unit.cpp b/ydb/core/tx/datashard/alter_table_unit.cpp index 59697e000794..60167f662527 100644 --- a/ydb/core/tx/datashard/alter_table_unit.cpp +++ b/ydb/core/tx/datashard/alter_table_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op, } TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx); - DataShard.AddUserTable(tableId, info); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(tableId, info, &locksDb); if (info->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index 1f690063de51..67b31fbdc6ba 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -43,7 +44,8 @@ class TCreateCdcStreamUnit : public TExecutionUnit { Y_ABORT_UNLESS(version); auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 17cc5f570207..6771f4517ec1 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1890,7 +1890,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD newTableInfo->StatsNeedUpdate = true; TDataShardLocksDb locksDb(*this, txc); - RemoveUserTable(prevId, &locksDb); AddUserTable(newId, newTableInfo); @@ -1967,8 +1966,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD } newTableInfo->SetSchema(schema); - - AddUserTable(pathId, newTableInfo); + TDataShardLocksDb locksDb(*this, txc); + AddUserTable(pathId, newTableInfo, &locksDb); if (newTableInfo->NeedSchemaSnapshots()) { AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index f103724450ac..a5c275c22f0f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1632,7 +1632,10 @@ class TDataShard TableInfos.erase(tableId.LocalPathId); } - void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) { + void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) { + if (locksDb) { + SysLocks.RemoveSchema(tableId, locksDb); + } TableInfos[tableId.LocalPathId] = tableInfo; SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes); Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 40614b6abe75..e3e61412efca 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3868,6 +3868,44 @@ Y_UNIT_TEST_SUITE(Cdc) { MustNotLoseSchemaSnapshot(true); } + Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson))); + + ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);"); + + TString sessionId; + TString txId; + KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + + WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table")); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"), + "ERROR: ABORTED"); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + }); + } + Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp index 359bdd455f1f..64fcf4863460 100644 --- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -40,7 +41,8 @@ class TDropCdcStreamUnit : public TExecutionUnit { Y_ABORT_UNLESS(version); auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp index d1df5149c41e..6d45bd13a2af 100644 --- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp +++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -52,7 +53,8 @@ class TDropIndexNoticeUnit : public TExecutionUnit { } Y_ABORT_UNLESS(tableInfo); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp index a2ac702a7390..d0bc48a8aa4b 100644 --- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp +++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -53,7 +54,8 @@ class TInitiateBuildIndexUnit : public TExecutionUnit { } Y_ABORT_UNLESS(tableInfo); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);