From 9d83402a8e996b620b5ba79fd7e41cc75ccfa751 Mon Sep 17 00:00:00 2001 From: zverevgeny Date: Fri, 9 Aug 2024 12:25:15 +0300 Subject: [PATCH] Delete empty portions normalizer (#7596) (#7600) --- .../normalizer/abstract/abstract.h | 1 + .../normalizer/portion/clean_empty.cpp | 120 ++++++++++++++++++ .../normalizer/portion/clean_empty.h | 28 ++++ .../normalizer/portion/normalizer.cpp | 2 +- .../tx/columnshard/normalizer/portion/ya.make | 1 + .../tx/columnshard/ut_rw/ut_normalizer.cpp | 24 +++- 6 files changed, 173 insertions(+), 3 deletions(-) create mode 100644 ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp create mode 100644 ydb/core/tx/columnshard/normalizer/portion/clean_empty.h diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index aaa6b6be1d87..55b0da0e6b92 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -56,6 +56,7 @@ enum class ENormalizerSequentialId: ui32 { TablesCleaner, PortionsMetadata, CleanGranuleId, + EmptyPortionsCleaner, MAX }; diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp new file mode 100644 index 000000000000..56a258e0be26 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp @@ -0,0 +1,120 @@ +#include "clean_empty.h" +#include + + +namespace NKikimr::NOlap { + +namespace { +std::optional> GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc) { /* Collects the (Schema::IndexColumns::PathId, Schema::IndexColumns::Portion) pairs read from the rowset into a hash set; returns std::nullopt when Precharge, IsReady() or Next() reports failure. */ + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + if (!Schema::Precharge(db, txc.DB.GetScheme())) { + return std::nullopt; + } + THashSet usedPortions; + auto rowset = db.Table().Select< + Schema::IndexColumns::PathId, + Schema::IndexColumns::Portion + >(); + if (!rowset.IsReady()) { + return std::nullopt; + } + while (!rowset.EndOfSet()) { + usedPortions.emplace( + rowset.GetValue(), 
rowset.GetValue() + ); + if (!rowset.Next()) { + return std::nullopt; + } + } + return usedPortions; +} + +using TBatch = std::vector; + +std::optional> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) { /* Reads (Schema::IndexPortions::PathId, Schema::IndexPortions::PortionId) pairs and keeps those absent from the set returned by GetColumnPortionAddresses, split into batches of at most MaxBatchSize entries; std::nullopt when either scan could not complete. */ + using namespace NColumnShard; + const auto usedPortions = GetColumnPortionAddresses(txc); + if (!usedPortions) { + return std::nullopt; + } + const size_t MaxBatchSize = 10000; + NIceDb::TNiceDb db(txc.DB); + if (!Schema::Precharge(db, txc.DB.GetScheme())) { + return std::nullopt; + } + auto rowset = db.Table().Select< + Schema::IndexPortions::PathId, + Schema::IndexPortions::PortionId + >(); + if (!rowset.IsReady()) { + return std::nullopt; + } + std::vector result; + TBatch portionsToDelete; + while (!rowset.EndOfSet()) { + TPortionAddress addr( + rowset.GetValue(), + rowset.GetValue() + ); + if (!usedPortions->contains(addr)) { + ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << addr.DebugString() << " marked for deletion"); + portionsToDelete.emplace_back(std::move(addr)); + if (portionsToDelete.size() == MaxBatchSize) { + result.emplace_back(std::move(portionsToDelete)); + portionsToDelete = TBatch{}; + } + } + if (!rowset.Next()) { + return std::nullopt; + } + } + if (!portionsToDelete.empty()) { + result.emplace_back(std::move(portionsToDelete)); + } + return result; +} + +class TChanges : public INormalizerChanges { /* Holds one batch of portion addresses; ApplyOnExecute issues a Delete() for the key (PathId, PortionId) of each address and GetSize() reports the batch size. */ +public: + TChanges(TBatch&& addresses) + : Addresses(std::move(addresses)) /* a named rvalue reference is an lvalue: without std::move the whole batch would be copied */ + {} + bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for(const auto& a: Addresses) { + db.Table().Key( + a.GetPathId(), + a.GetPortionId() + ).Delete(); + } + ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << GetSize() << " portions deleted"); + return true; + } + + ui64 GetSize() const override { + return Addresses.size(); + } +private: + const TBatch 
Addresses; +}; + +} //namespace + +TConclusion> TCleanEmptyPortionsNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { /* Emits one result element per batch returned by GetPortionsToDelete; fails with "Not ready" when that call returns std::nullopt. */ + using namespace NColumnShard; + auto batchesToDelete = GetPortionsToDelete(txc); + if (!batchesToDelete) { + return TConclusionStatus::Fail("Not ready"); + } + + std::vector result; + for (auto&& b: *batchesToDelete) { + result.emplace_back(std::make_shared(std::make_shared(std::move(b)))); + } + return result; +} + +} //namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h new file mode 100644 index 000000000000..920b3d8c0f56 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +namespace NKikimr::NOlap { + +class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent { + + static TString ClassName() { + return ToString(ENormalizerSequentialId::EmptyPortionsCleaner); + } + static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator(ClassName()); +public: + TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&) + {} + + std::optional DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::EmptyPortionsCleaner; + } + + TString GetClassName() const override { + return ClassName(); + } + + TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; +}; + +} //namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index b286281c2d91..63cea8b19952 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NOlap { TConclusion> TPortionsNormalizerBase::DoInit(const 
TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { - auto initRes = DoInitImpl(controller,txc); + auto initRes = DoInitImpl(controller, txc); if (initRes.IsFail()) { return initRes; diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index ec31c82f7b31..ff813694a971 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -5,6 +5,7 @@ SRCS( GLOBAL portion.cpp GLOBAL chunks.cpp GLOBAL clean.cpp + GLOBAL clean_empty.cpp GLOBAL broken_blobs.cpp ) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 50f305bf4fc6..68eecd9f7b73 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -3,6 +3,7 @@ #include #include +#include #include @@ -161,7 +162,7 @@ class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier { } }; -class TPortinosCleaner : public NYDBTest::ILocalDBModifier { +class TPortionsCleaner : public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -185,6 +186,21 @@ class TPortinosCleaner : public NYDBTest::ILocalDBModifier { } }; + +class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier { /* Test modifier: calls Update() on the key (pathId, portionId) for every pathId in [100, 299) and portionId in [1000, 1199). */ +public: + virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for (size_t pathId = 100; pathId != 299; ++pathId) { + for (size_t portionId = 1000; portionId != 1199; ++portionId) { + db.Table().Key(pathId, portionId).Update(); + } + } + } +}; + + class TTablesCleaner : public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { @@ -317,7 +333,11 @@ Y_UNIT_TEST_SUITE(Normalizers) { } Y_UNIT_TEST(PortionsNormalizer) 
{ - TestNormalizerImpl(); + TestNormalizerImpl(); + } + + Y_UNIT_TEST(CleanEmptyPortionsNormalizer) { + TestNormalizerImpl(); } Y_UNIT_TEST(EmptyTablesNormalizer) {