From f1bf21617534c893e66ebace4e84eddd1432fa82 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 12 Nov 2024 12:22:10 +0300 Subject: [PATCH] EvWrite: add mvcc snapshot (#11474) --- ydb/core/kqp/common/kqp_tx.cpp | 4 ++++ ydb/core/kqp/executer_actor/kqp_executer_impl.h | 4 ++++ ydb/core/kqp/runtime/kqp_write_actor.cpp | 12 ++++++++++++ ydb/core/protos/kqp.proto | 1 + 4 files changed, 21 insertions(+) diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index e2758eff6273..f7a07ee8d169 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig if (!commitTx) return true; + if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) { + return true; + } + size_t readPhases = 0; bool hasEffects = false; bool hasSourceRead = false; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 90d2813e0473..29e118432b6f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -952,6 +952,10 @@ class TKqpExecuterBase : public TActorBootstrapped { if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) { ActorIdToProto(BufferActorId, settings.MutableBufferActorId()); } + if (GetSnapshot().IsValid() && settings.GetIsOlap()) { + settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step); + settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId); + } output.SinkSettings.ConstructInPlace(); output.SinkSettings->PackFrom(settings); } else { diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 65bbffdf93c0..067333d29d15 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -170,12 +170,14 @@ class TKqpTableWriteActor : public TActorBootstrapped { const bool inconsistentTx, const NMiniKQL::TTypeEnvironment& typeEnv, std::shared_ptr alloc, + const std::optional& mvccSnapshot, const IKqpTransactionManagerPtr& txManager, const TActorId sessionActorId, TIntrusivePtr counters, NWilson::TTraceId traceId) : TypeEnv(typeEnv) , Alloc(alloc) + , MvccSnapshot(mvccSnapshot) , TableId(tableId) , TablePath(tablePath) , LockTxId(lockTxId) @@ -812,6 +814,9 @@ class TKqpTableWriteActor : public TActorBootstrapped { FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager); } else if (!InconsistentTx) { evWrite->SetLockId(LockTxId, LockNodeId); + if (MvccSnapshot) { + *evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot; + } } const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite); @@ -953,6 +958,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { const NMiniKQL::TTypeEnvironment& TypeEnv; std::shared_ptr Alloc; + std::optional MvccSnapshot; + const TTableId TableId; const TString TablePath; @@ -1018,6 +1025,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu Settings.GetInconsistentTx(), TypeEnv, Alloc, + Settings.GetIsOlap() + ? std::optional{Settings.GetMvccSnapshot()} + : std::optional{}, nullptr, TActorId{}, Counters, @@ -1205,6 +1215,7 @@ struct TTransactionSettings { ui64 LockTxId = 0; ui64 LockNodeId = 0; bool InconsistentTx = false; + NKikimrDataEvents::TMvccSnapshot MvccSnapshot; }; struct TWriteSettings { @@ -1333,6 +1344,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub InconsistentTx, TypeEnv, Alloc, + std::nullopt, TxManager, SessionActorId, Counters, diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index b487df72e037..fb46e899f70c 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -719,6 +719,7 @@ message TKqpTableSinkSettings { optional NActorsProto.TActorId BufferActorId = 10; optional int64 Priority = 11; optional bool IsOlap = 12; + optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13; } message TKqpStreamLookupSettings {