Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Volatile transaction support in EvWrite #2196

Merged
merged 7 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ydb/core/tx/datashard/complete_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,28 @@ void TCompleteWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx)

DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats();

if (op->HasOutputData()) {
const auto& outReadSets = op->OutReadSets();
const auto& expectedReadSets = op->ExpectedReadSets();
auto itOut = outReadSets.begin();
auto itExpected = expectedReadSets.begin();
while (itExpected != expectedReadSets.end()) {
while (itOut != outReadSets.end() && itOut->first < itExpected->first) {
++itOut;
}
if (itOut != outReadSets.end() && itOut->first == itExpected->first) {
++itOut;
++itExpected;
continue;
}
// We have an expected readset without a corresponding out readset
for (const auto& recipient : itExpected->second) {
DataShard.SendReadSetNoData(ctx, recipient, op->GetStep(), op->GetTxId(), itExpected->first.first, itExpected->first.second);
}
++itExpected;
}
}
}

THolder<TExecutionUnit> CreateCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline)
Expand Down
57 changes: 52 additions & 5 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,18 @@ class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
{ }

void OnCommit(ui64) override {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
TString error = Result->GetError();
if (error) {
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << ", error: " << error);
} else {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << " send result to client "
<< Target << ", exec latency: " << Result->Record.GetExecLatency()
<< " ms, propose latency: " << Result->Record.GetProposeLatency() << " ms");
}

ui64 resultSize = Result->GetTxResult().size();
ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
Expand All @@ -609,6 +616,49 @@ class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
ui64 TxId;
};

class TDataShard::TSendVolatileWriteResult final: public IVolatileTxCallback {
public:
TSendVolatileWriteResult(
TDataShard* self, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> writeResult,
const TActorId& target,
ui64 step, ui64 txId
)
: Self(self)
, WriteResult(std::move(writeResult))
, Target(target)
, Step(step)
, TxId(txId)
{
}

void OnCommit(ui64) override {
if (WriteResult->IsError()) {
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete volatile write [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << ", error: " << WriteResult->GetError());
} else {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete volatile write [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << " send result to client " << Target);
}

LWTRACK(ProposeTransactionSendResult, WriteResult->GetOrbit());
Self->Send(Target, WriteResult.release(), 0);
}

void OnAbort(ui64 txId) override {
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED, "Distributed transaction aborted due to commit failure");
OnCommit(txId);
}

private:
TDataShard* Self;
std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult;
TActorId Target;
ui64 Step;
ui64 TxId;
};

void TDataShard::SendResult(const TActorContext &ctx,
TOutputOpData::TResultPtr &res,
const TActorId &target,
Expand Down Expand Up @@ -640,15 +690,12 @@ void TDataShard::SendResult(const TActorContext &ctx,
void TDataShard::SendWriteResult(const TActorContext& ctx, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& result, const TActorId& target, ui64 step, ui64 txId) {
Y_ABORT_UNLESS(txId == result->Record.GetTxId(), "%" PRIu64 " vs %" PRIu64, txId, result->Record.GetTxId());

// TODO: Volatile
/*
if (VolatileTxManager.FindByTxId(txId)) {
// This is a volatile transaction, and we need to wait until it is resolved
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileResult(this, std::move(result), target, step, txId));
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileWriteResult(this, std::move(result), target, step, txId));
Y_ABORT_UNLESS(ok);
return;
}
*/

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Complete write [" << step << " : " << txId << "] from " << TabletID() << " at tablet " << TabletID() << " send result to client " << target);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class TDataShard

class TWaitVolatileDependencies;
class TSendVolatileResult;
class TSendVolatileWriteResult;

struct TEvPrivate {
enum EEv {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
if (it != DataTxCache.end()) {
auto baseTx = it->second;
Y_ABORT_UNLESS(baseTx->GetType() == TValidatedTx::EType::WriteTx, "Wrong writeOp type in cache");
TValidatedWriteTx::TPtr dataTx = std::static_pointer_cast<TValidatedWriteTx>(baseTx);
TValidatedWriteTx::TPtr writeTx = std::static_pointer_cast<TValidatedWriteTx>(baseTx);

writeOp->FillTxData(dataTx);
writeOp->FillTxData(writeTx);
// Remove writeOp from cache.
ForgetTx(writeOp->GetTxId());

Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3218,10 +3218,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.TestReadOneKey(tableName, {1, 1, 1}, 101);
}

Y_UNIT_TEST_TWIN(TryCommitLocksPrepared, BreakLocks) {
Y_UNIT_TEST_QUAD(TryCommitLocksPrepared, Volatile, BreakLocks) {
TTestHelper helper;

auto runtime = helper.Server->GetRuntime();
runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

const ui64 lockTxId = 1011121314;
const TString tableName1 = "table-1";
Expand Down Expand Up @@ -3266,7 +3267,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {

Cerr << "===== Commit locks on table 1" << Endl;
{
auto writeRequest = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(++helper.TxId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
auto writeRequest = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(++helper.TxId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

NKikimrDataEvents::TKqpLocks& kqpLocks = *writeRequest->Record.MutableLocks();
kqpLocks.MutableLocks()->CopyFrom(readLocks);
Expand All @@ -3284,7 +3286,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {

Cerr << "===== Write and commit locks on table 2" << Endl;
{
auto writeRequest = helper.MakeWriteRequest(tableName2, helper.TxId, {1, 1, 1, 1001}, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
auto writeRequest = helper.MakeWriteRequest(tableName2, helper.TxId, {1, 1, 1, 1001},
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

NKikimrDataEvents::TKqpLocks& kqpLocks = *writeRequest->Record.MutableLocks();
kqpLocks.AddSendingShards(tabletId1);
Expand Down
31 changes: 16 additions & 15 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

Y_UNIT_TEST_TWIN(UpsertPrepared, EvWrite) {
Y_UNIT_TEST_QUAD(UpsertPrepared, EvWrite, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

// Disable volatile transactions, since EvWrite has not yet supported them.
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(false);
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(Volatile);

auto opts = TShardedTableOptions();
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
Expand Down Expand Up @@ -160,11 +159,12 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

Y_UNIT_TEST(WritePrepared) {
Y_UNIT_TEST_TWIN(WritePrepared, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

TShardedTableOptions opts;
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
const TString tableName = "table-1";
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", tableName, opts);
const ui64 shard = shards[0];
const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
const ui32 rowCount = 3;
Expand All @@ -174,9 +174,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

Cout << "========= Send prepare =========\n";
{
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
UNIT_ASSERT_GT(writeResult.GetMinStep(), 0);
UNIT_ASSERT_GT(writeResult.GetMaxStep(), writeResult.GetMinStep());
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
Expand All @@ -198,27 +198,25 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
{
auto writeResult = WaitForWriteCompleted(runtime, sender);

UNIT_ASSERT_VALUES_EQUAL_C(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED, "Status: " << writeResult.GetStatus() << " Issues: " << writeResult.GetIssues());
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
UNIT_ASSERT_GE(writeResult.GetStep(), minStep);
UNIT_ASSERT_LE(writeResult.GetStep(), maxStep);
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);

const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/" + tableName);
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetUpdateRow().GetCount(), rowCount);
}

Cout << "========= Read table =========\n";
{
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/" + tableName)).All();
UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
}

}

Y_UNIT_TEST(WritePreparedManyTables) {
Y_UNIT_TEST_TWIN(WritePreparedManyTables, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

TShardedTableOptions opts;
Expand All @@ -236,7 +234,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

Cerr << "===== Write prepared to table 1" << Endl;
{
const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

minStep1 = writeResult.GetMinStep();
maxStep1 = writeResult.GetMaxStep();
Expand Down Expand Up @@ -282,7 +281,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}


Y_UNIT_TEST(WritePreparedNoTxCache) {
Y_UNIT_TEST_TWIN(WritePreparedNoTxCache, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

TShardedTableOptions opts;
Expand All @@ -296,7 +295,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

Cout << "========= Send prepare =========\n";
{
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

minStep = writeResult.GetMinStep();
maxStep = writeResult.GetMaxStep();
}
Expand Down
25 changes: 13 additions & 12 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
namespace NKikimr {
namespace NDataShard {

TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, readVersion, writeVersion, EngineHostCounters, TAppData::TimeProvider->Now())
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А почему readVersion=Min, writeVersion=Max? Это даже никакого смысла не имеет, если с такими настройками произойдёт запись в базу, то потом все инварианты поедут. Кроме того в случае mvcc различающиеся версии read и write это не нормально.

Copy link
Collaborator Author

@azevaykin azevaykin Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сошлись на том, что UserDb нужно хранить во WriteOperation.
И read/write version нужно брать свежие, прямо перед записью.
И нужно осторожнее с WriteVersion, чтобы он не пришёл со значением Max в локальную базу.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Только не во WriteOperation, а на стеке в том юните, который собирается работать с базой.

, KeyValidator(*self, txc.DB)
, TabletId(self->TabletID())
, ReceivedAt(receivedAt)
Expand Down Expand Up @@ -410,8 +410,7 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransac
{
if (!WriteTx) {
Y_ABORT_UNLESS(WriteRequest);
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), readVersion, writeVersion, *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
}
return WriteTx;
}
Expand Down Expand Up @@ -515,9 +514,8 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction
LocksCache().Locks[lock.LockId] = lock;

bool extractKeys = WriteTx->IsTxInfoLoaded();
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);

WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), readVersion, writeVersion, *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), *WriteRequest);
if (WriteTx->Ready() && extractKeys) {
WriteTx->ExtractKeys(true);
}
Expand Down Expand Up @@ -547,22 +545,25 @@ void TWriteOperation::BuildExecutionPlan(bool loaded)
plan.push_back(EExecutionUnitKind::ExecuteWrite);
plan.push_back(EExecutionUnitKind::FinishProposeWrite);
plan.push_back(EExecutionUnitKind::CompletedOperations);
}
/*
else if (HasVolatilePrepareFlag()) {
} else if (HasVolatilePrepareFlag()) {
Y_ABORT_UNLESS(!loaded);
plan.push_back(EExecutionUnitKind::CheckWrite);
plan.push_back(EExecutionUnitKind::StoreWrite); // note: stores in memory
plan.push_back(EExecutionUnitKind::FinishProposeWrite);
Y_ABORT_UNLESS(!GetStep());
plan.push_back(EExecutionUnitKind::WaitForPlan);
plan.push_back(EExecutionUnitKind::PlanQueue);
plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory
plan.push_back(EExecutionUnitKind::LoadWriteDetails); // note: reloads from memory
plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
// Note: execute will also prepare and send readsets
plan.push_back(EExecutionUnitKind::ExecuteWrite);
// Note: it is important that plan here is the same as regular
// distributed tx, since normal tx may decide to commit in a
// volatile manner with dependencies, to avoid waiting for
// locked keys to resolve.
plan.push_back(EExecutionUnitKind::CompleteWrite);
plan.push_back(EExecutionUnitKind::CompletedOperations);
*/
else {
} else {
if (!loaded) {
plan.push_back(EExecutionUnitKind::CheckWrite);
plan.push_back(EExecutionUnitKind::StoreWrite);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
public:
using TPtr = std::shared_ptr<TValidatedWriteTx>;

TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite& ev);
TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev);
~TValidatedWriteTx();

EType GetType() const override {
Expand Down
Loading
Loading