diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 088294c6896b..3ab0862000ec 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -126,8 +126,8 @@ class TTxUpdateSchema: public TTransactionBase { }; bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { - NActors::TLogContextGuard gLogging = - NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard"); + NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())( + "process", "TTxUpdateSchema::Execute"); ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString()); while (!Self->NormalizerController.IsNormalizationFinished()) { @@ -153,6 +153,8 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { } void TTxUpdateSchema::Complete(const TActorContext& ctx) { + NActors::TLogContextGuard gLogging = + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxUpdateSchema::Complete"); AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete"); Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant); if (NormalizerTasks.empty()) { @@ -190,13 +192,13 @@ class TTxApplyNormalizer: public TTransactionBase { bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextGuard gLogging = - NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard"); + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute"); AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", 
"TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString()); if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) { return false; } - if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) { + if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 1) { NIceDb::TNiceDb db(txc.DB); Self->NormalizerController.OnNormalizerFinished(db); } @@ -204,12 +206,12 @@ bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) } void TTxApplyNormalizer::Complete(const TActorContext& ctx) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard"); + NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())( + "event", "TTxApplyNormalizer::Complete"); AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString()); - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")( + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")( "details", Self->NormalizerController.DebugString())("size", Changes->GetSize()); Changes->ApplyOnComplete(Self->NormalizerController); - Self->NormalizerController.GetNormalizer()->OnResultReady(); if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) { return; } @@ -240,6 +242,8 @@ class TTxInitSchema: public TTransactionBase { }; bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) { + NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())( + "process", "TTxInitSchema::Execute"); LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID()); const bool isFirstRun = txc.DB.GetScheme().IsEmpty(); @@ -286,6 +290,8 @@ bool 
TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) { } void TTxInitSchema::Complete(const TActorContext& ctx) { + NActors::TLogContextGuard gLogging = + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxInitSchema::Complete"); Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant); LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID();); Self->Execute(new TTxUpdateSchema(Self), ctx); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index f2a0a5ffa3ba..4583473f425d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1279,9 +1279,17 @@ class TTxAskPortionChunks: public TTransactionBase { bool reask = false; for (auto&& i : PortionsByPath) { for (auto&& p : i.second) { - auto rowset = db.Table().Prefix(p->GetPathId(), p->GetPortionId()).Select(); - if (!rowset.IsReady()) { - reask = true; + { + auto rowset = db.Table().Prefix(p->GetPathId(), p->GetPortionId()).Select(); + if (!rowset.IsReady()) { + reask = true; + } + } + { + auto rowset = db.Table().Prefix(p->GetPathId(), p->GetPortionId()).Select(); + if (!rowset.IsReady()) { + reask = true; + } } } } @@ -1295,12 +1303,12 @@ class TTxAskPortionChunks: public TTransactionBase { std::vector records; std::vector indexes; { - auto rowset = db.Table().Prefix(p->GetPathId(), p->GetPortionId()).Select(); + auto rowset = db.Table().Prefix(p->GetPathId(), p->GetPortionId()).Select(); if (!rowset.IsReady()) { return false; } while (!rowset.EndOfSet()) { - records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset)); + NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records); if (!rowset.Next()) { return false; } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 9d8a0c7bae2f..6c743fe19a72 100644 --- 
a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -569,6 +569,15 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns; }; + struct IndexColumnsV2: Table { + struct PathId: Column<1, NScheme::NTypeIds::Uint64> {}; + struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {}; + struct Metadata: Column<3, NScheme::NTypeIds::String> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + using TTables = SchemaTables< Value, TxInfo, @@ -607,7 +616,8 @@ struct Schema : NIceDb::Schema { TxDependencies, TxStates, TxEvents, - IndexColumnsV1 + IndexColumnsV1, + IndexColumnsV2 >; // @@ -997,6 +1007,29 @@ class TColumnChunkLoadContextV1 { YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto); public: + template + static void BuildFromDBV2(const TSource& rowset, std::vector& records) { + const ui64 pathId = rowset.template GetValue(); + const ui64 portionId = rowset.template GetValue(); + const TString metadata = rowset.template GetValue(); + NKikimrTxColumnShard::TIndexPortionAccessor metaProto; + AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf"); + for (auto&& i : metaProto.GetChunks()) { + TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()), + TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetMetadata()); + records.emplace_back(std::move(result)); + } + } + + NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const { + NKikimrTxColumnShard::TColumnChunkInfo proto; + proto.SetSSColumnId(Address.GetColumnId()); + proto.SetChunkIdx(Address.GetChunkIdx()); + *proto.MutableMetadata() = MetaProto; + *proto.MutableBlobRangeLink() = BlobRange.SerializeToProto(); + return proto; + } + TFullChunkAddress GetFullChunkAddress() const { return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx()); } diff --git 
a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp index 2eecd8ed6464..a53fc5d1205a 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp @@ -6,8 +6,10 @@ namespace NKikimr::NOlap::NActualizer { TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, - const std::shared_ptr& dataLocksManager, const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr& controller) - : MemoryUsageLimit(memoryUsageLimit) + const std::shared_ptr& dataLocksManager, const TVersionedIndex& versionedIndex, + const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr& controller) + : VersionedIndex(versionedIndex) + , MemoryUsageLimit(memoryUsageLimit) , SaverContext(saverContext) , Counters(counters) , Controller(controller) @@ -31,10 +33,10 @@ bool TTieringProcessContext::AddPortion( }; auto it = Tasks.find(features.GetRWAddress()); if (it == Tasks.end()) { - std::vector tasks = {buildNewTask()}; + std::vector tasks = { buildNewTask() }; it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first; } - if (it->second.back().GetTxWriteVolume() + info->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) { + if (!it->second.back().CanTakePortionInTx(info, VersionedIndex)) { if (Controller->IsNewTaskAvailable(it->first, it->second.size())) { it->second.emplace_back(buildNewTask()); } else { @@ -53,7 +55,7 @@ bool TTieringProcessContext::AddPortion( } it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info); } - it->second.back().MutableTxWriteVolume() += info->GetTxVolume(); + it->second.back().TakePortionInTx(info, VersionedIndex); if (features.GetTargetTierName() == 
NTiering::NCommon::DeleteTierName) { AFL_VERIFY(dWait); Counters.OnPortionToDrop(info->GetTotalBlobBytes(), *dWait); diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h index f4bca2ef38d3..0601e088c4c6 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h @@ -15,17 +15,34 @@ class TTaskConstructor { YDB_READONLY_DEF(std::shared_ptr, MemoryPredictor); YDB_READONLY_DEF(std::shared_ptr, Task); YDB_ACCESSOR(ui64, MemoryUsage, 0); - YDB_ACCESSOR(ui64, TxWriteVolume, 0); + YDB_READONLY(ui64, PortionsCount, 0); + YDB_READONLY(ui64, ChunksCount, 0); + public: TTaskConstructor(const std::shared_ptr& predictor, const std::shared_ptr& task) : MemoryPredictor(predictor) , Task(task) { } + + bool CanTakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) { + if (!PortionsCount) { + return true; + } + return + (PortionsCount + 1 < 1000) && + (ChunksCount + portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()) < 100000); + } + + void TakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) { + ++PortionsCount; + ChunksCount += portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()); + } }; class TTieringProcessContext { private: + const TVersionedIndex& VersionedIndex; THashSet UsedPortions; const ui64 MemoryUsageLimit; TSaverContext SaverContext; @@ -63,7 +80,8 @@ class TTieringProcessContext { } } - TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, const std::shared_ptr& dataLocksManager, + TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, + const std::shared_ptr& dataLocksManager, const TVersionedIndex& versionedIndex, const NColumnShard::TEngineLogsCounters& counters, 
const std::shared_ptr& controller); }; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 186935239478..b300b52174bf 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -316,11 +316,13 @@ std::shared_ptr TColumnEngineForLogs::Start std::shared_ptr changes = std::make_shared(StoragesManager); // Add all portions from dropped paths - ui64 txSize = 0; - const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4; + ui64 portionsCount = 0; + ui64 chunksCount = 0; ui32 skipLocked = 0; ui32 portionsFromDrop = 0; bool limitExceeded = false; + const ui32 maxChunksCount = 100000; + const ui32 maxPortionsCount = 1000; for (ui64 pathId : pathsToDrop) { auto g = GranulesStorage->GetGranuleOptional(pathId); if (!g) { @@ -335,8 +337,9 @@ std::shared_ptr TColumnEngineForLogs::Start ++skipLocked; continue; } - if (txSize + info->GetTxVolume() < txSizeLimit || changes->GetPortionsToDrop().empty()) { - txSize += info->GetTxVolume(); + ++portionsCount; + chunksCount += info->GetApproxChunksCount(info->GetSchema(VersionedIndex)->GetColumnsCount()); + if ((portionsCount < maxPortionsCount && chunksCount < maxChunksCount) || changes->GetPortionsToDrop().empty()) { } else { limitExceeded = true; break; @@ -360,8 +363,9 @@ std::shared_ptr TColumnEngineForLogs::Start continue; } AFL_VERIFY(it->second[i]->CheckForCleanup(snapshot))("p_snapshot", it->second[i]->GetRemoveSnapshotOptional())("snapshot", snapshot); - if (txSize + it->second[i]->GetTxVolume() < txSizeLimit || changes->GetPortionsToDrop().empty()) { - txSize += it->second[i]->GetTxVolume(); + ++portionsCount; + chunksCount += it->second[i]->GetApproxChunksCount(it->second[i]->GetSchema(VersionedIndex)->GetColumnsCount()); + if ((portionsCount < maxPortionsCount && chunksCount < maxChunksCount) || changes->GetPortionsToDrop().empty()) { } else { limitExceeded = true; 
break; @@ -397,7 +401,7 @@ std::vector> TColumnEngineForLogs::Star AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size()); TSaverContext saverContext(StoragesManager); - NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, SignalCounters, ActualizationController); + NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, VersionedIndex, SignalCounters, ActualizationController); const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetActualizationTasksLag(); for (auto&& i : pathEviction) { auto g = GetGranuleOptional(i.first); diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index 9f9b27301338..50eff23677f4 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -121,6 +121,14 @@ class TColumnRecord { return BlobRange; } + NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const { + NKikimrTxColumnShard::TColumnChunkInfo result; + result.SetSSColumnId(GetEntityId()); + result.SetChunkIdx(GetChunkIdx()); + result.SetMetadata(Meta.SerializeToProto()); + *result.MutableBlobRangeLink() = BlobRange.SerializeToProto(); + return result; + } NKikimrColumnShardDataSharingProto::TColumnRecord SerializeToProto() const; static TConclusion BuildFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) { TColumnRecord result; diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp index 03a2e0876860..5369ee82c52f 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp @@ -501,6 +501,12 @@ void TPortionDataAccessor::SaveToDatabase(IDbWrapper& db, const ui32 firstPKColu FullValidation(); 
db.WritePortion(*PortionInfo); if (!saveOnlyMeta) { + NKikimrTxColumnShard::TIndexPortionAccessor protoData; + for (auto& record : GetRecordsVerified()) { + *protoData.AddChunks() = record.SerializeToDBProto(); + } + db.WriteColumns(*PortionInfo, std::move(protoData)); + for (auto& record : GetRecordsVerified()) { db.WriteColumn(*PortionInfo, record, firstPKColumnId); } @@ -533,7 +539,7 @@ void TPortionDataAccessor::FullValidation() const { blobIdxs.emplace(bRange->GetBlobIdxVerified()); } } - AFL_VERIFY(blobIdxs.size()); + AFL_VERIFY(blobIdxs.size())("portion_info", PortionInfo->DebugString()); AFL_VERIFY(PortionInfo->GetBlobIdsCount() == blobIdxs.size()); AFL_VERIFY(PortionInfo->GetBlobIdsCount() == *blobIdxs.rbegin() + 1); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 58859e35b496..6a180c5d2808 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -50,8 +50,8 @@ ui64 TPortionInfo::GetMetadataMemorySize() const { return sizeof(TPortionInfo) - sizeof(TPortionMeta) + Meta.GetMetadataMemorySize(); } -ui64 TPortionInfo::GetTxVolume() const { - return 1024; +ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const { + return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1); } void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index f596cb38884f..680711b15ea8 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -221,7 +221,11 @@ class TPortionInfo { const TString& GetIndexStorageId(const ui32 columnId, const TIndexInfo& indexInfo) const; const TString& GetEntityStorageId(const ui32 entityId, const TIndexInfo& 
indexInfo) const; - ui64 GetTxVolume() const; // fake-correct method for determ volume on rewrite this portion in transaction progress + ui64 GetTxVolume() const { + return 1024; + } + + ui64 GetApproxChunksCount(const ui32 schemaColumnsCount) const; ui64 GetMetadataMemorySize() const; void SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const; diff --git a/ydb/core/tx/columnshard/engines/protos/portion_info.proto b/ydb/core/tx/columnshard/engines/protos/portion_info.proto index fae848b95033..b8da3c4fc7ad 100644 --- a/ydb/core/tx/columnshard/engines/protos/portion_info.proto +++ b/ydb/core/tx/columnshard/engines/protos/portion_info.proto @@ -35,3 +35,14 @@ message TIndexColumnMeta { optional NKikimrSSA.TProgram.TConstant MaxValue = 4; optional TIndexPortionMeta PortionMeta = 5[deprecated = true]; // First PK column could contain portion info } + +message TColumnChunkInfo { + optional uint32 SSColumnId = 1; + optional uint32 ChunkIdx = 2; + optional TIndexColumnMeta Metadata = 3; + optional TBlobRangeLink16 BlobRangeLink = 4; +} + +message TIndexPortionAccessor { + repeated TColumnChunkInfo Chunks = 1; +} diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 5cb3c2c8c375..0c3f07ee67c4 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -7,6 +7,7 @@ namespace NKikimr::NOlap { TNormalizationController::INormalizerComponent::TPtr TNormalizationController::RegisterNormalizer(INormalizerComponent::TPtr normalizer) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_register")("description", normalizer->DebugString()); AFL_VERIFY(normalizer); Counters.emplace_back(normalizer->GetClassName()); Normalizers.emplace_back(normalizer); @@ -30,12 +31,19 @@ bool TNormalizationController::TNormalizationController::IsNormalizationFinished bool 
TNormalizationController::SwitchNormalizer() { if (IsNormalizationFinished()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_finished"); return false; } - Y_ABORT_UNLESS(!GetNormalizer()->HasActiveTasks()); + AFL_VERIFY(!GetNormalizer()->HasActiveTasks()); GetCounters().OnNormalizerFinish(); Normalizers.pop_front(); Counters.pop_front(); + if (Normalizers.size()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_switched")("description", Normalizers.front()->DebugString())( + "id", Normalizers.front()->GetEnumSequentialId()); + } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_finished"); + } return !IsNormalizationFinished(); } @@ -51,7 +59,10 @@ void TNormalizationController::OnNormalizerFinished(NIceDb::TNiceDb& db) const { if (auto seqId = GetNormalizer()->GetSequentialId()) { NColumnShard::Schema::SaveSpecialValue(db, NColumnShard::Schema::EValueIds::LastNormalizerSequentialId, *seqId); } - NColumnShard::Schema::FinishNormalizer(db, GetNormalizer()->GetClassName(), GetNormalizer()->GetUniqueDescription(), GetNormalizer()->GetUniqueId()); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_finished")("description", GetNormalizer()->DebugString())( + "id", GetNormalizer()->GetSequentialId()); + NColumnShard::Schema::FinishNormalizer( + db, GetNormalizer()->GetClassName(), GetNormalizer()->GetUniqueDescription(), GetNormalizer()->GetUniqueId()); } void TNormalizationController::InitNormalizers(const TInitContext& ctx) { @@ -74,6 +85,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { LastSavedNormalizerId = {}; } + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_start")("last_saved_id", LastSavedNormalizerId); auto normalizers = GetEnumAllValues(); auto lastRegisteredNormalizer = ENormalizerSequentialId::Granules; for (auto nType : normalizers) { @@ -137,7 +149,7 @@ bool TNormalizationController::InitControllerState(NIceDb::TNiceDb& db) { } 
NKikimr::TConclusion> TNormalizationController::INormalizerComponent::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { - AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalization_init")("last", controller.GetLastSavedNormalizerId()) + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "normalizer_init")("last", controller.GetLastSavedNormalizerId()) ("seq_id", GetSequentialId())("type", GetEnumSequentialId()); auto result = DoInit(controller, txc); if (!result.IsSuccess()) { diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index 3f002b2d21a5..a41a97a37a83 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -65,6 +65,7 @@ enum class ENormalizerSequentialId: ui32 { SyncMinSnapshotFromChunks, DeprecatedRestoreV1Chunks_V1, RestoreV1Chunks_V2, + RestoreV2Chunks, MAX }; @@ -174,13 +175,10 @@ class TNormalizationController { return AtomicGet(ActiveTasksCount) > 0; } - void OnResultReady() { - AFL_VERIFY(ActiveTasksCount > 0); - AtomicDecrement(ActiveTasksCount); - } - - i64 GetActiveTasksCount() const { - return AtomicGet(ActiveTasksCount); + [[nodiscard]] ui64 DecActiveCounters() { + const i64 result = AtomicDecrement(ActiveTasksCount); + AFL_VERIFY(result >= 0); + return result; } std::optional GetEnumSequentialId() const { diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp index a545e99ed706..33b2a1dc9178 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp @@ -160,7 +160,9 @@ TConclusion> TNormalizer::DoInit( while (!rowset.EndOfSet()) { TPortionLoadContext portion(rowset); existPortions0.emplace(portion.GetPortionId()); - 
AFL_VERIFY(portions0.emplace(portion.GetPortionId(), portion).second); + if (!portion.GetMetaProto().BlobIdsSize()) { + AFL_VERIFY(portions0.emplace(portion.GetPortionId(), portion).second); + } if (!rowset.Next()) { return TConclusionStatus::Fail("Not ready"); @@ -194,10 +196,10 @@ TConclusion> TNormalizer::DoInit( while (!rowset.EndOfSet()) { TColumnChunkLoadContextV1 chunk(rowset); -// AFL_VERIFY(!portions0.contains(chunk.GetPortionId())); -// if (!existPortions0.contains(chunk.GetPortionId())) { + //AFL_VERIFY(!portions0.contains(chunk.GetPortionId())); + if (!existPortions0.contains(chunk.GetPortionId())) { AFL_VERIFY(columns1Remove.emplace(chunk.GetFullChunkAddress(), chunk).second); -// } + } if (!rowset.Next()) { return TConclusionStatus::Fail("Not ready"); @@ -248,4 +250,4 @@ TConclusion> TNormalizer::DoInit( return tasks; } -} // namespace NKikimr::NOlap::NRestorePortionsFromChunks +} // namespace NKikimr::NOlap::NRestoreV1Chunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp new file mode 100644 index 000000000000..030d3eb0c087 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp @@ -0,0 +1,176 @@ +#include "normalizer.h" +#include "restore_v1_chunks.h" + +#include +#include +#include +#include + +namespace NKikimr::NOlap::NRestoreV2Chunks { + +class TV2BuildTask { +private: + TPortionAddress PortionAddress; + std::vector Chunks; + +public: + void AddChunk(const TColumnChunkLoadContextV1& chunk) { + Chunks.emplace_back(chunk); + } + + NKikimrTxColumnShard::TIndexPortionAccessor BuildProto() const { + const auto pred = [](const TColumnChunkLoadContextV1& l, const TColumnChunkLoadContextV1& r) { + return l.GetAddress() < r.GetAddress(); + }; + std::sort(Chunks.begin(), Chunks.end(), pred); + NKikimrTxColumnShard::TIndexPortionAccessor result; + for (auto&& c : Chunks) { + *result.AddChunks() = c.SerializeToDBProto(); + } + 
return result; + } + + TV2BuildTask(const TPortionAddress& address) + : PortionAddress(address) { + } +}; + +class TChangesAddV2: public INormalizerChanges { +private: + std::vector Patches; + +public: + TChangesAddV2(std::vector&& patches) + : Patches(std::move(patches)) { + } + virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2; + for (auto&& i : Patches) { + auto metaProto = i.BuildProto(); + db.Table() + .Key(i.GetPathId(), i.GetPortionId()) + .Update(NIceDb::TUpdate(metaProto.SerializeAsString())); + } + + return true; + } + + virtual ui64 GetSize() const override { + return Patches.size(); + } +}; + +class TPatchItemRemoveV1 { +private: + TColumnChunkLoadContextV1 ChunkInfo; + +public: + const TColumnChunkLoadContextV1& GetChunkInfo() const { + return ChunkInfo; + } + + TPatchItemRemoveV1(const TColumnChunkLoadContextV1& chunkInfo) + : ChunkInfo(chunkInfo) { + } +}; + +class TChangesRemoveV1: public INormalizerChanges { +private: + std::vector Patches; + +public: + TChangesRemoveV1(std::vector&& patches) + : Patches(std::move(patches)) { + } + virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1; + for (auto&& i : Patches) { + db.Table() + .Key(i.GetChunkInfo().GetPathId(), i.GetChunkInfo().GetPortionId(), i.GetChunkInfo().GetAddress().GetEntityId(), + i.GetChunkInfo().GetAddress().GetChunkIdx()) + .Delete(); + } + + return true; + } + + virtual ui64 GetSize() const override { + return Patches.size(); + } +}; + +TConclusion> TNormalizer::DoInit( + const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace 
NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge(db, txc.DB.GetScheme()); + ready = ready & Schema::Precharge(db, txc.DB.GetScheme()); + if (!ready) { + return TConclusionStatus::Fail("Not ready"); + } + AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()); + THashSet readyPortions; + THashMap> buildPortions; + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + AFL_VERIFY(readyPortions.emplace(TPortionAddress(rowset.template GetValue(), + rowset.template GetValue())).second); + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + TColumnChunkLoadContextV1 chunk(rowset); + if (!readyPortions.contains(chunk.GetPortionId())) { + auto it = buildPortions.find(chunk.GetPortionAddress()); + if (it == buildPortions.end()) { + it = buildPortions.emplace(chunk.GetPortionAddress(), TV2BuildTask(chunk.GetPortionAddress())).first; + } + it->second.AddChunk(chunk); + } + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + + { + std::vector package; + for (auto&& [portionAddress, portionInfos] : buildPortions) { + package.emplace_back(std::move(portionInfos)); + if (package.size() == 100) { + std::vector local; + local.swap(package); + tasks.emplace_back(std::make_shared(std::make_shared(std::move(local)))); + } + } + + if (package.size() > 0) { + tasks.emplace_back(std::make_shared(std::make_shared(std::move(package)))); + } + } + + return tasks; +} + +} // namespace NKikimr::NOlap::NRestoreV1Chunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h new file mode 100644 index 000000000000..46e7d06ddc95 --- 
/dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NColumnShard { +class TTablesManager; +} + +namespace NKikimr::NOlap::NRestoreV2Chunks { + +class TNormalizer: public TNormalizationController::INormalizerComponent { +public: + static TString GetClassNameStatic() { + return ::ToString(ENormalizerSequentialId::RestoreV2Chunks); + } + + virtual std::optional DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::RestoreV2Chunks; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + class TNormalizerResult; + + static inline INormalizerComponent::TFactory::TRegistrator Registrator = + INormalizerComponent::TFactory::TRegistrator(GetClassNameStatic()); + +public: + TNormalizer(const TNormalizationController::TInitContext& info) + : DsGroupSelector(info.GetStorageInfo()) { + } + + virtual TConclusion> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + +private: + NColumnShard::TBlobGroupSelector DsGroupSelector; +}; +} // namespace NKikimr::NOlap::NRestoreV2Chunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index c9392ac97479..077cea88a48d 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -11,6 +11,7 @@ SRCS( GLOBAL chunks_actualization.cpp GLOBAL restore_portion_from_chunks.cpp GLOBAL restore_v1_chunks.cpp + GLOBAL restore_v2_chunks.cpp GLOBAL snapshot_from_chunks.cpp )