From faeab6d486654e3dbac211af1a7caaf44ee1e700 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 5 Nov 2024 19:12:57 +0300 Subject: [PATCH 1/2] fix error on start internal scanner --- ydb/core/tx/columnshard/data_reader/actor.cpp | 4 +++- ydb/core/tx/columnshard/data_reader/actor.h | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/data_reader/actor.cpp b/ydb/core/tx/columnshard/data_reader/actor.cpp index 4fd69af8a7ab..a1fb223f78a6 100644 --- a/ydb/core/tx/columnshard/data_reader/actor.cpp +++ b/ydb/core/tx/columnshard/data_reader/actor.cpp @@ -23,6 +23,7 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) { } else { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished")("reason", status.GetErrorMessage()); } + PassAway(); } } @@ -35,10 +36,11 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) { } void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) { - SwitchStage(EStage::WaitData, EStage::Finished); + SwitchStage(std::nullopt, EStage::Finished); AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")( "reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues())); RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues())); + PassAway(); } void TActor::Bootstrap(const TActorContext& /*ctx*/) { diff --git a/ydb/core/tx/columnshard/data_reader/actor.h b/ydb/core/tx/columnshard/data_reader/actor.h index 2eca911a87e2..048ee314922f 100644 --- a/ydb/core/tx/columnshard/data_reader/actor.h +++ b/ydb/core/tx/columnshard/data_reader/actor.h @@ -59,8 +59,10 @@ class TActor: public NActors::TActorBootstrapped { EStage Stage = EStage::Initialization; static inline const ui64 FreeSpace = ((ui64)8) << 20; - void SwitchStage(const EStage from, const EStage to) { - AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to", (ui32)to); + void SwitchStage(const std::optional from, const EStage to) { + if (from) { + AFL_VERIFY(Stage == *from)("from", (ui32)*from)("real", (ui32)Stage)("to", (ui32)to); + } Stage = to; } From f60045fdd7001e1d3340cda31948cfd1b07d97c4 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 12 Nov 2024 13:39:33 +0300 Subject: [PATCH 2/2] use mvcc snapshot --- ydb/core/tx/columnshard/columnshard__write.cpp | 7 ++++--- ydb/core/tx/columnshard/operations/batch_builder/builder.h | 4 ++-- ydb/core/tx/columnshard/operations/common/context.h | 7 +++++-- ydb/core/tx/columnshard/operations/write.cpp | 2 +- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 1f43312fdbef..aa3e198cae76 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -283,9 +283,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager, - Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters); + Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot()); std::shared_ptr task = std::make_shared( - BufferizationWriteActorId, std::move(writeData), GetLastTxSnapshot(), context); + BufferizationWriteActorId, std::move(writeData), context); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } } @@ -589,9 +589,10 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()); Y_ABORT_UNLESS(writeOperation); writeOperation->SetBehaviour(behaviour); + const TSnapshot applyToSnapshot(record.GetMvccSnapshot().GetStep(), record.GetMvccSnapshot().GetTxId()); NOlap::TWritingContext wContext( pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters, - Counters.GetCSCounters().WritingCounters); + Counters.GetCSCounters().WritingCounters, applyToSnapshot); arrowData->SetSeparationPoints(GetIndexAs().GetGranulePtrVerified(pathId)->GetBucketPositions()); writeOperation->Start(*this, arrowData, source, wContext); } diff --git a/ydb/core/tx/columnshard/operations/batch_builder/builder.h b/ydb/core/tx/columnshard/operations/batch_builder/builder.h index 33b0281e71a6..05fc38d636f8 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/builder.h +++ b/ydb/core/tx/columnshard/operations/batch_builder/builder.h @@ -26,10 +26,10 @@ class TBuildBatchesTask: public NConveyor::ITask { } TBuildBatchesTask( - const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const TSnapshot& actualSnapshot, const TWritingContext& context) + const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const TWritingContext& context) : WriteData(std::move(writeData)) , BufferActorId(bufferActorId) - , ActualSnapshot(actualSnapshot) + , ActualSnapshot(context.GetApplyToSnapshot()) , Context(context) { } }; diff --git a/ydb/core/tx/columnshard/operations/common/context.h b/ydb/core/tx/columnshard/operations/common/context.h index 41c10d2eb009..ae89ff8f536b 100644 --- a/ydb/core/tx/columnshard/operations/common/context.h +++ b/ydb/core/tx/columnshard/operations/common/context.h @@ -14,17 +14,20 @@ class TWritingContext { YDB_READONLY_DEF(std::shared_ptr, StoragesManager); YDB_READONLY_DEF(std::shared_ptr, SplitterCounters); YDB_READONLY_DEF(std::shared_ptr, WritingCounters); + YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero()); public: TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr& actualSchema, const std::shared_ptr& operators, const std::shared_ptr& splitterCounters, - const std::shared_ptr& writingCounters) + const std::shared_ptr& writingCounters, const TSnapshot& applyToSnapshot) : TabletId(tabletId) , TabletActorId(tabletActorId) , ActualSchema(actualSchema) , StoragesManager(operators) , SplitterCounters(splitterCounters) - , WritingCounters(writingCounters) { + , WritingCounters(writingCounters) + , ApplyToSnapshot(applyToSnapshot) + { } }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index b90e27218134..ccfe6e456f78 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -37,7 +37,7 @@ void TWriteOperation::Start( NEvWrite::TWriteData writeData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(), owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR), WritePortions); std::shared_ptr task = - std::make_shared(owner.BufferizationWriteActorId, std::move(writeData), owner.GetLastTxSnapshot(), context); + std::make_shared(owner.BufferizationWriteActorId, std::move(writeData), context.GetApplyToSnapshot(), context); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); Status = EOperationStatus::Started;