Skip to content

Commit

Permalink
Avoid committed and removed volatile txs in the TxMap (ydb-platform#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Feb 7, 2025
1 parent 18686b5 commit dccdc9d
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 9 deletions.
21 changes: 17 additions & 4 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1848,8 +1848,12 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
if (hadWrites)
return EExecutionStatus::DelayCompleteNoMoreRestarts;

if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion) || Reader && Reader->NeedVolatileWaitForCommit())
if (Reader && Reader->NeedVolatileWaitForCommit() ||
Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion) ||
Self->GetVolatileTxManager().HasUnstableVolatileTxsAtSnapshot(state.ReadVersion))
{
return EExecutionStatus::DelayComplete;
}

Complete(ctx);
return EExecutionStatus::Executed;
Expand Down Expand Up @@ -2881,10 +2885,13 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase

ApplyLocks(ctx);

if (!Reader->NeedVolatileWaitForCommit()) {
SendResult(ctx);
} else {
if (Reader->NeedVolatileWaitForCommit() ||
Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion) ||
Self->GetVolatileTxManager().HasUnstableVolatileTxsAtSnapshot(state.ReadVersion))
{
DelayedResult = true;
} else {
SendResult(ctx);
}
return true;
}
Expand All @@ -2893,6 +2900,12 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase

void Complete(const TActorContext& ctx) override {
if (DelayedResult) {
if (!Self->ReadIteratorsByLocalReadId.contains(LocalReadId)) {
// the one who removed the iterator should have replied to the user
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << LocalReadId
<< " has been invalidated before TTxReadContinue::Complete()");
return;
}
SendResult(ctx);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2313,7 +2313,7 @@ bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const {
if(!op->Result() || op->Result()->GetStatus() != NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE)
return true;

return HasCommittingOpsBelow(op->GetMvccSnapshot());
return HasCommittingOpsBelow(op->GetMvccSnapshot()) || Self->GetVolatileTxManager().HasUnstableVolatileTxsAtSnapshot(op->GetMvccSnapshot());
}

bool TPipeline::HasCommittingOpsBelow(TRowVersion upperBound) const {
Expand Down
217 changes: 217 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3455,6 +3455,223 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
}

Y_UNIT_TEST(CompactedVolatileChangesCommit) {
// Test explained:
// 1. distributed tx that writes some data (commits to channel 0 are blocked)
// 2. shard receives a successful readset and marks commits in local db
// 3. table is compacted (commit to channel 0 are blocked, but SST switches)
// 4. perform a read, it shouldn't reply with success (tx not persistent yet and may rollback)
// Note: this is already the case due to committing ops tracking
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

Cerr << "========= Creating table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
const auto tableId = ResolveTableId(server, sender, "/Root/table");

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (1, 1), (11, 11);
)"),
"<empty>");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table` WHERE key = 2;
)"),
"");

// We need to start a distributed write
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);
Cerr << "========= Starting a distributed write =========" << Endl;
auto upsertFuture = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (2, 2), (12, 12);
)");
runtime.WaitFor("blocked plans", [&]{ return blockedPlan.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);

// Make sure the first blocked message is for the first shard
auto shard1actor = ResolveTablet(runtime, shards.at(0));
if (blockedPlan[0]->GetRecipientRewrite() != shard1actor) {
std::swap(blockedPlan[0], blockedPlan[1]);
}
UNIT_ASSERT_VALUES_EQUAL(blockedPlan[0]->GetRecipientRewrite(), shard1actor);

// Block channel 0 commits at the first shard and wait for the commit attempt
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
[shard1 = shards.at(0)](const auto& ev) {
auto* msg = ev->Get();
if (msg->Id.TabletID() == shard1 && msg->Id.Channel() == 0) {
return true;
}
return false;
});
Cerr << "========= Unblocking plan at shard1 =========" << Endl;
blockedPlan.Unblock(1);
runtime.WaitFor("two commits (plan + exec)", [&]{ return blockedCommits.size() >= 2; });

Cerr << "========= Unblocking plan at shard2 =========" << Endl;
blockedPlan.Unblock();
runtime.WaitFor("two more commits (readset + volatile commit)", [&]{ return blockedCommits.size() >= 4; });

Cerr << "========= Compacting table =========" << Endl;
CompactTable(runtime, shards.at(0), tableId, false);

Cerr << "========= Starting an immediate read =========" << Endl;
auto readFuture = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table` WHERE key = 2;
)");
runtime.SimulateSleep(TDuration::Seconds(1));
UNIT_ASSERT(!readFuture.HasValue());

blockedCommits.Stop().Unblock();

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(upsertFuture))),
"<empty>");

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(runtime.WaitFuture(std::move(readFuture))),
"{ items { uint32_value: 2 } items { uint32_value: 2 } }");
}

Y_UNIT_TEST(CompactedVolatileChangesAbort) {
// Test explained:
// 1. distributed tx that writes some data, volatile transactions are persisted
// 2. commits to channel 0 are blocked
// 3. shard receives a nodata readset and starts to rollback the tx
// 4. tx is marked as removed in the local db, but it's not persistent yet
// 5. table is compacted, the commit is blocked, but SST switches
// 6. since there are no traces of the volatile tx in the table the
// read may erroneously reply that these changes do not exist
// 7. a nodata readset is allowed to be spurious in tablets that cannot
// commit, technically a newer generation may have committed already
// Note: in practice this shouldn't happen, since new generation commit
// implies older generations no longer having a read lease, and read
// results wouldn't be sent anyway, but there's no guarantee that it's
// true for all reads and decisions, we must wait for abort to commit.
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

Cerr << "========= Creating table =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
const auto tableId = ResolveTableId(server, sender, "/Root/table");

// We need to fill table with some data
Cerr << "========= Upserting initial values =========" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (1, 1), (11, 11);
)"),
"<empty>");
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table` WHERE key = 2;
)"),
"");

// We need to start a distributed write
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime);
Cerr << "========= Starting a distributed write =========" << Endl;
auto upsertFuture = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value) VALUES (2, 2), (12, 12);
)");
runtime.WaitFor("blocked readsets", [&]{ return blockedReadSets.size() >= 4; });
UNIT_ASSERT_VALUES_EQUAL(blockedReadSets.size(), 4u);
blockedReadSets.Stop();

// Block channel 0 commits at the first shard
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
[shard1 = shards.at(0)](const auto& ev) {
auto* msg = ev->Get();
if (msg->Id.TabletID() == shard1 && msg->Id.Channel() == 0) {
return true;
}
return false;
});

// Send a nodata reply to the shard1 expectation
Cerr << "========= Sending a spurious nodata reply =========" << Endl;
auto shard1actor = ResolveTablet(runtime, shards.at(0));
bool replied = false;
for (auto& ev : blockedReadSets) {
if (ev->Sender == shard1actor) {
const auto& record = ev->Get()->Record;
if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) {
auto event = MakeHolder<TEvTxProcessing::TEvReadSet>(
record.GetStep(),
record.GetTxId(),
record.GetTabletDest(),
record.GetTabletSource(),
shards.at(1));
event->Record.SetFlags(NKikimrTx::TEvReadSet::FLAG_NO_DATA | NKikimrTx::TEvReadSet::FLAG_NO_ACK);
runtime.Send(new IEventHandle(ev->Sender, ev->GetRecipientRewrite(), event.Release()));
replied = true;
break;
}
}
}
UNIT_ASSERT_C(replied, "Could not find an expected expectation readset");
runtime.WaitFor("one commit (volatile abort)", [&]{ return blockedCommits.size() >= 1; });

Cerr << "========= Compacting table =========" << Endl;
CompactTable(runtime, shards.at(0), tableId, false);

Cerr << "========= Starting an immediate read =========" << Endl;
auto readFuture = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table` WHERE key = 2;
)");
runtime.SimulateSleep(TDuration::Seconds(1));
UNIT_ASSERT(!readFuture.HasValue());
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
14 changes: 10 additions & 4 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ namespace NKikimr::NDataShard {
}
}

Self->VolatileTxManager.RemoveFromTxMap(info);
Self->VolatileTxManager.UnstableVolatileTxByVersion.insert(info);

auto getGroup = [&]() -> ui64 {
if (!info->ChangeGroup) {
if (info->Version.TxId != info->TxId) {
Expand Down Expand Up @@ -123,8 +126,6 @@ namespace NKikimr::NDataShard {

Self->VolatileTxManager.UnblockDependents(info);

Self->VolatileTxManager.RemoveFromTxMap(info);

Self->VolatileTxManager.RemoveVolatileTx(info);

Y_DEBUG_ABORT_UNLESS(!TxInfo, "TTxVolatileTxCommit has an unexpected link to a removed volatile tx");
Expand Down Expand Up @@ -168,6 +169,9 @@ namespace NKikimr::NDataShard {
}
}

Self->VolatileTxManager.RemoveFromTxMap(info);
Self->VolatileTxManager.UnstableVolatileTxByVersion.insert(info);

if (!info->ArbiterReadSets.empty()) {
NKikimrTx::TReadSetData data;
data.SetDecision(NKikimrTx::TReadSetData::DECISION_ABORT);
Expand Down Expand Up @@ -218,8 +222,6 @@ namespace NKikimr::NDataShard {

Self->VolatileTxManager.UnblockDependents(info);

Self->VolatileTxManager.RemoveFromTxMap(info);

Self->VolatileTxManager.RemoveVolatileTx(info);

Y_DEBUG_ABORT_UNLESS(!TxInfo, "TTxVolatileTxAbort has an unexpected link to a removed volatile tx");
Expand Down Expand Up @@ -255,6 +257,7 @@ namespace NKikimr::NDataShard {
void TVolatileTxManager::Clear() {
VolatileTxs.clear();
VolatileTxByVersion.clear();
UnstableVolatileTxByVersion.clear();
VolatileTxByCommitTxId.clear();
VolatileTxByCommitOrder.Clear();
TxMap.Reset();
Expand All @@ -267,6 +270,7 @@ namespace NKikimr::NDataShard {
Y_ABORT_UNLESS(
VolatileTxs.empty() &&
VolatileTxByVersion.empty() &&
UnstableVolatileTxByVersion.empty() &&
VolatileTxByCommitTxId.empty() &&
VolatileTxByCommitOrder.Empty() &&
!TxMap,
Expand Down Expand Up @@ -640,6 +644,7 @@ namespace NKikimr::NDataShard {
VolatileTxByCommitTxId.erase(commitTxId);
}

Y_DEBUG_ABORT_UNLESS(!UnstableVolatileTxByVersion.contains(info));
VolatileTxByVersion.erase(info);

// FIXME: do we need to handle WaitingSnapshotEvents somehow?
Expand Down Expand Up @@ -682,6 +687,7 @@ namespace NKikimr::NDataShard {
for (ui64 commitTxId : info->CommitTxIds) {
VolatileTxByCommitTxId.erase(commitTxId);
}
UnstableVolatileTxByVersion.erase(info);
VolatileTxByVersion.erase(info);

Self->IncCounter(COUNTER_VOLATILE_TX_TOTAL_LATENCY_MS, info->LatencyTimer.Passed() * 1000);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/volatile_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ namespace NKikimr::NDataShard {
return !VolatileTxByVersion.empty() && (*VolatileTxByVersion.begin())->Version <= snapshot;
}

bool HasUnstableVolatileTxsAtSnapshot(const TRowVersion& snapshot) const {
return !UnstableVolatileTxByVersion.empty() && (*UnstableVolatileTxByVersion.begin())->Version <= snapshot;
}

TRowVersion GetMinUncertainVersion() const {
if (!VolatileTxByVersion.empty()) {
return (*VolatileTxByVersion.begin())->Version;
Expand Down Expand Up @@ -291,6 +295,7 @@ namespace NKikimr::NDataShard {
absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info
TVolatileTxByVersion VolatileTxByVersion;
TVolatileTxByVersion UnstableVolatileTxByVersion;
TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoCommitOrderListTag> VolatileTxByCommitOrder;
std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents;
TIntrusivePtr<TTxMap> TxMap;
Expand Down

0 comments on commit dccdc9d

Please sign in to comment.