Skip to content

Commit

Permalink
Merge d559878 into 88e8771
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Feb 26, 2024
2 parents 88e8771 + d559878 commit 883ab54
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 53 deletions.
22 changes: 22 additions & 0 deletions ydb/core/tx/datashard/complete_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,28 @@ void TCompleteWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx)

DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
DataShard.EmitHeartbeats();

if (op->HasOutputData()) {
const auto& outReadSets = op->OutReadSets();
const auto& expectedReadSets = op->ExpectedReadSets();
auto itOut = outReadSets.begin();
auto itExpected = expectedReadSets.begin();
while (itExpected != expectedReadSets.end()) {
while (itOut != outReadSets.end() && itOut->first < itExpected->first) {
++itOut;
}
if (itOut != outReadSets.end() && itOut->first == itExpected->first) {
++itOut;
++itExpected;
continue;
}
// We have an expected readset without a corresponding out readset
for (const auto& recipient : itExpected->second) {
DataShard.SendReadSetNoData(ctx, recipient, op->GetStep(), op->GetTxId(), itExpected->first.first, itExpected->first.second);
}
++itExpected;
}
}
}

THolder<TExecutionUnit> CreateCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline)
Expand Down
57 changes: 52 additions & 5 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,18 @@ class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
{ }

void OnCommit(ui64) override {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
TString error = Result->GetError();
if (error) {
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << ", error: " << error);
} else {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << " send result to client "
<< Target << ", exec latency: " << Result->Record.GetExecLatency()
<< " ms, propose latency: " << Result->Record.GetProposeLatency() << " ms");
}

ui64 resultSize = Result->GetTxResult().size();
ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
Expand All @@ -609,6 +616,49 @@ class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
ui64 TxId;
};

class TDataShard::TSendVolatileWriteResult final: public IVolatileTxCallback {
public:
TSendVolatileWriteResult(
TDataShard* self, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> writeResult,
const TActorId& target,
ui64 step, ui64 txId
)
: Self(self)
, WriteResult(std::move(writeResult))
, Target(target)
, Step(step)
, TxId(txId)
{
}

void OnCommit(ui64) override {
if (WriteResult->IsError()) {
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete volatile write [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << ", error: " << WriteResult->GetError());
} else {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Complete volatile write [" << Step << " : " << TxId << "] from " << Self->TabletID()
<< " at tablet " << Self->TabletID() << " send result to client " << Target);
}

LWTRACK(ProposeTransactionSendResult, WriteResult->GetOrbit());
Self->Send(Target, WriteResult.release(), 0);
}

void OnAbort(ui64 txId) override {
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED, "Distributed transaction aborted due to commit failure");
OnCommit(txId);
}

private:
TDataShard* Self;
std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult;
TActorId Target;
ui64 Step;
ui64 TxId;
};

void TDataShard::SendResult(const TActorContext &ctx,
TOutputOpData::TResultPtr &res,
const TActorId &target,
Expand Down Expand Up @@ -640,15 +690,12 @@ void TDataShard::SendResult(const TActorContext &ctx,
void TDataShard::SendWriteResult(const TActorContext& ctx, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& result, const TActorId& target, ui64 step, ui64 txId) {
Y_ABORT_UNLESS(txId == result->Record.GetTxId(), "%" PRIu64 " vs %" PRIu64, txId, result->Record.GetTxId());

// TODO: Volatile
/*
if (VolatileTxManager.FindByTxId(txId)) {
// This is a volatile transaction, and we need to wait until it is resolved
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileResult(this, std::move(result), target, step, txId));
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileWriteResult(this, std::move(result), target, step, txId));
Y_ABORT_UNLESS(ok);
return;
}
*/

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Complete write [" << step << " : " << txId << "] from " << TabletID() << " at tablet " << TabletID() << " send result to client " << target);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class TDataShard

class TWaitVolatileDependencies;
class TSendVolatileResult;
class TSendVolatileWriteResult;

struct TEvPrivate {
enum EEv {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
if (it != DataTxCache.end()) {
auto baseTx = it->second;
Y_ABORT_UNLESS(baseTx->GetType() == TValidatedTx::EType::WriteTx, "Wrong writeOp type in cache");
TValidatedWriteTx::TPtr dataTx = std::static_pointer_cast<TValidatedWriteTx>(baseTx);
TValidatedWriteTx::TPtr writeTx = std::static_pointer_cast<TValidatedWriteTx>(baseTx);

writeOp->FillTxData(dataTx);
writeOp->FillTxData(writeTx);
// Remove writeOp from cache.
ForgetTx(writeOp->GetTxId());

Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3218,10 +3218,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
helper.TestReadOneKey(tableName, {1, 1, 1}, 101);
}

Y_UNIT_TEST_TWIN(TryCommitLocksPrepared, BreakLocks) {
Y_UNIT_TEST_QUAD(TryCommitLocksPrepared, Volatile, BreakLocks) {
TTestHelper helper;

auto runtime = helper.Server->GetRuntime();
runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

const ui64 lockTxId = 1011121314;
const TString tableName1 = "table-1";
Expand Down Expand Up @@ -3266,7 +3267,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {

Cerr << "===== Commit locks on table 1" << Endl;
{
auto writeRequest = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(++helper.TxId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
auto writeRequest = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(++helper.TxId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

NKikimrDataEvents::TKqpLocks& kqpLocks = *writeRequest->Record.MutableLocks();
kqpLocks.MutableLocks()->CopyFrom(readLocks);
Expand All @@ -3284,7 +3286,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {

Cerr << "===== Write and commit locks on table 2" << Endl;
{
auto writeRequest = helper.MakeWriteRequest(tableName2, helper.TxId, {1, 1, 1, 1001}, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
auto writeRequest = helper.MakeWriteRequest(tableName2, helper.TxId, {1, 1, 1, 1001},
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

NKikimrDataEvents::TKqpLocks& kqpLocks = *writeRequest->Record.MutableLocks();
kqpLocks.AddSendingShards(tabletId1);
Expand Down
31 changes: 16 additions & 15 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

Y_UNIT_TEST_TWIN(UpsertPrepared, EvWrite) {
Y_UNIT_TEST_QUAD(UpsertPrepared, EvWrite, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

// Disable volatile transactions, since EvWrite has not yet supported them.
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(false);
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(Volatile);

auto opts = TShardedTableOptions();
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
Expand Down Expand Up @@ -160,11 +159,12 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

Y_UNIT_TEST(WritePrepared) {
Y_UNIT_TEST_TWIN(WritePrepared, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

TShardedTableOptions opts;
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
const TString tableName = "table-1";
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", tableName, opts);
const ui64 shard = shards[0];
const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
const ui32 rowCount = 3;
Expand All @@ -174,9 +174,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

Cout << "========= Send prepare =========\n";
{
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
UNIT_ASSERT_GT(writeResult.GetMinStep(), 0);
UNIT_ASSERT_GT(writeResult.GetMaxStep(), writeResult.GetMinStep());
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
Expand All @@ -198,27 +198,25 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
{
auto writeResult = WaitForWriteCompleted(runtime, sender);

UNIT_ASSERT_VALUES_EQUAL_C(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED, "Status: " << writeResult.GetStatus() << " Issues: " << writeResult.GetIssues());
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
UNIT_ASSERT_GE(writeResult.GetStep(), minStep);
UNIT_ASSERT_LE(writeResult.GetStep(), maxStep);
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);

const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/" + tableName);
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetUpdateRow().GetCount(), rowCount);
}

Cout << "========= Read table =========\n";
{
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/" + tableName)).All();
UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
}

}

Y_UNIT_TEST(WritePreparedManyTables) {
Y_UNIT_TEST_TWIN(WritePreparedManyTables, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

TShardedTableOptions opts;
Expand All @@ -236,7 +234,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

Cerr << "===== Write prepared to table 1" << Endl;
{
const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

minStep1 = writeResult.GetMinStep();
maxStep1 = writeResult.GetMaxStep();
Expand Down Expand Up @@ -282,7 +281,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}


Y_UNIT_TEST(WritePreparedNoTxCache) {
Y_UNIT_TEST_TWIN(WritePreparedNoTxCache, Volatile) {
auto [runtime, server, sender] = TestCreateServer();

TShardedTableOptions opts;
Expand All @@ -296,7 +295,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

Cout << "========= Send prepare =========\n";
{
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId,
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);

minStep = writeResult.GetMinStep();
maxStep = writeResult.GetMaxStep();
}
Expand Down
25 changes: 13 additions & 12 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
namespace NKikimr {
namespace NDataShard {

TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, readVersion, writeVersion, EngineHostCounters, TAppData::TimeProvider->Now())
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
, KeyValidator(*self, txc.DB)
, TabletId(self->TabletID())
, ReceivedAt(receivedAt)
Expand Down Expand Up @@ -410,8 +410,7 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransac
{
if (!WriteTx) {
Y_ABORT_UNLESS(WriteRequest);
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), readVersion, writeVersion, *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
}
return WriteTx;
}
Expand Down Expand Up @@ -515,9 +514,8 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction
LocksCache().Locks[lock.LockId] = lock;

bool extractKeys = WriteTx->IsTxInfoLoaded();
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);

WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), readVersion, writeVersion, *WriteRequest);
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), *WriteRequest);
if (WriteTx->Ready() && extractKeys) {
WriteTx->ExtractKeys(true);
}
Expand Down Expand Up @@ -547,22 +545,25 @@ void TWriteOperation::BuildExecutionPlan(bool loaded)
plan.push_back(EExecutionUnitKind::ExecuteWrite);
plan.push_back(EExecutionUnitKind::FinishProposeWrite);
plan.push_back(EExecutionUnitKind::CompletedOperations);
}
/*
else if (HasVolatilePrepareFlag()) {
} else if (HasVolatilePrepareFlag()) {
Y_ABORT_UNLESS(!loaded);
plan.push_back(EExecutionUnitKind::CheckWrite);
plan.push_back(EExecutionUnitKind::StoreWrite); // note: stores in memory
plan.push_back(EExecutionUnitKind::FinishProposeWrite);
Y_ABORT_UNLESS(!GetStep());
plan.push_back(EExecutionUnitKind::WaitForPlan);
plan.push_back(EExecutionUnitKind::PlanQueue);
plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory
plan.push_back(EExecutionUnitKind::LoadWriteDetails); // note: reloads from memory
plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
// Note: execute will also prepare and send readsets
plan.push_back(EExecutionUnitKind::ExecuteWrite);
// Note: it is important that plan here is the same as regular
// distributed tx, since normal tx may decide to commit in a
// volatile manner with dependencies, to avoid waiting for
// locked keys to resolve.
plan.push_back(EExecutionUnitKind::CompleteWrite);
plan.push_back(EExecutionUnitKind::CompletedOperations);
*/
else {
} else {
if (!loaded) {
plan.push_back(EExecutionUnitKind::CheckWrite);
plan.push_back(EExecutionUnitKind::StoreWrite);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
public:
using TPtr = std::shared_ptr<TValidatedWriteTx>;

TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite& ev);
TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev);
~TValidatedWriteTx();

EType GetType() const override {
Expand Down
Loading

0 comments on commit 883ab54

Please sign in to comment.