Skip to content

Commit

Permalink
EvWrite: add mvcc snapshot (#11474)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Nov 12, 2024
1 parent e62a642 commit f1bf216
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 0 deletions.
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
if (!commitTx)
return true;

if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) {
return true;
}

size_t readPhases = 0;
bool hasEffects = false;
bool hasSourceRead = false;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) {
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
}
if (GetSnapshot().IsValid() && settings.GetIsOlap()) {
settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);
}
output.SinkSettings.ConstructInPlace();
output.SinkSettings->PackFrom(settings);
} else {
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const bool inconsistentTx,
const NMiniKQL::TTypeEnvironment& typeEnv,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const std::optional<NKikimrDataEvents::TMvccSnapshot>& mvccSnapshot,
const IKqpTransactionManagerPtr& txManager,
const TActorId sessionActorId,
TIntrusivePtr<TKqpCounters> counters,
NWilson::TTraceId traceId)
: TypeEnv(typeEnv)
, Alloc(alloc)
, MvccSnapshot(mvccSnapshot)
, TableId(tableId)
, TablePath(tablePath)
, LockTxId(lockTxId)
Expand Down Expand Up @@ -812,6 +814,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
} else if (!InconsistentTx) {
evWrite->SetLockId(LockTxId, LockNodeId);
if (MvccSnapshot) {
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
}
}

const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite);
Expand Down Expand Up @@ -953,6 +958,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const NMiniKQL::TTypeEnvironment& TypeEnv;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;

std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot;

const TTableId TableId;
const TString TablePath;

Expand Down Expand Up @@ -1018,6 +1025,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
Settings.GetInconsistentTx(),
TypeEnv,
Alloc,
Settings.GetIsOlap()
? std::optional<NKikimrDataEvents::TMvccSnapshot>{Settings.GetMvccSnapshot()}
: std::optional<NKikimrDataEvents::TMvccSnapshot>{},
nullptr,
TActorId{},
Counters,
Expand Down Expand Up @@ -1205,6 +1215,7 @@ struct TTransactionSettings {
ui64 LockTxId = 0;
ui64 LockNodeId = 0;
bool InconsistentTx = false;
NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
};

struct TWriteSettings {
Expand Down Expand Up @@ -1333,6 +1344,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
InconsistentTx,
TypeEnv,
Alloc,
std::nullopt,
TxManager,
SessionActorId,
Counters,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ message TKqpTableSinkSettings {
optional NActorsProto.TActorId BufferActorId = 10;
optional int64 Priority = 11;
optional bool IsOlap = 12;
optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13;
}

message TKqpStreamLookupSettings {
Expand Down

0 comments on commit f1bf216

Please sign in to comment.