Skip to content

Commit

Permalink
break locks on scheme tx (#11517)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Nov 12, 2024
1 parent a431c94 commit 91cc642
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 10 deletions.
12 changes: 11 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2351,9 +2351,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1");
SELECT * FROM TestDdlDml2;
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (1, "1");
SELECT * FROM TestDdlDml2;
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml2;
CREATE TABLE TestDdlDml33 (
Expand All @@ -2363,7 +2373,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/alter_table_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
}

TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
DataShard.AddUserTable(tableId, info);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(tableId, info, &locksDb);

if (info->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/create_cdc_stream_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -43,7 +44,8 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
Y_ABORT_UNLESS(version);

auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
newTableInfo->StatsNeedUpdate = true;

TDataShardLocksDb locksDb(*this, txc);

RemoveUserTable(prevId, &locksDb);
AddUserTable(newId, newTableInfo);

Expand Down Expand Up @@ -1967,8 +1966,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
}

newTableInfo->SetSchema(schema);

AddUserTable(pathId, newTableInfo);
TDataShardLocksDb locksDb(*this, txc);
AddUserTable(pathId, newTableInfo, &locksDb);

if (newTableInfo->NeedSchemaSnapshots()) {
AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1632,7 +1632,10 @@ class TDataShard
TableInfos.erase(tableId.LocalPathId);
}

void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) {
if (locksDb) {
SysLocks.RemoveSchema(tableId, locksDb);
}
TableInfos[tableId.LocalPathId] = tableInfo;
SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes);
Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo);
Expand Down
38 changes: 38 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3868,6 +3868,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
MustNotLoseSchemaSnapshot(true);
}

Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));

ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");

TString sessionId;
TString txId;
KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);");

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }");

WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
"ERROR: ABORTED");

WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"update":{"value":10},"key":[1]})",
});
}

Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -40,7 +41,8 @@ class TDropCdcStreamUnit : public TExecutionUnit {
Y_ABORT_UNLESS(version);

auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/drop_index_notice_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -52,7 +53,8 @@ class TDropIndexNoticeUnit : public TExecutionUnit {
}

Y_ABORT_UNLESS(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/initiate_build_index_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

Expand Down Expand Up @@ -53,7 +54,8 @@ class TInitiateBuildIndexUnit : public TExecutionUnit {
}

Y_ABORT_UNLESS(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
TDataShardLocksDb locksDb(DataShard, txc);
DataShard.AddUserTable(pathId, tableInfo, &locksDb);

if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
Expand Down

0 comments on commit 91cc642

Please sign in to comment.