Skip to content

Commit

Permalink
clean trash on versions switching (#9679)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 24, 2024
1 parent af0844f commit 25fdf52
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 0 deletions.
102 changes: 102 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "special_cleaner.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>

namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {

namespace {

class TChanges: public INormalizerChanges {
public:
TChanges(TDeleteTrashImpl::TKeyBatch&& keys)
: Keys(keys) {
}
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (const auto& k : Keys) {
db.Table<Schema::IndexColumns>().Key(k.Index, k.Granule, k.ColumnIdx, k.PlanStep, k.TxId, k.Portion, k.Chunk).Delete();
}
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")("message", TStringBuilder() << GetSize() << " rows deleted");
return true;
}

ui64 GetSize() const override {
return Keys.size();
}

private:
const TDeleteTrashImpl::TKeyBatch Keys;
};

} //namespace

TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteTrashImpl::DoInit(
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
const size_t MaxBatchSize = 10000;
auto keysToDelete = KeysToDelete(txc, MaxBatchSize);
if (!keysToDelete) {
return TConclusionStatus::Fail("Not ready");
}
ui32 removeCount = 0;
for (auto&& i : *keysToDelete) {
removeCount += i.size();
}
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")(
"message", TStringBuilder() << "found " << removeCount << " rows to delete grouped in " << keysToDelete->size() << " batches");

std::vector<INormalizerTask::TPtr> result;
for (auto&& batch : *keysToDelete) {
AFL_VERIFY(!batch.empty());
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(batch))));
}
return result;
}

std::optional<std::vector<TDeleteTrashImpl::TKeyBatch>> TDeleteTrashImpl::KeysToDelete(
NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize) {
NIceDb::TNiceDb db(txc.DB);
using namespace NColumnShard;
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
return std::nullopt;
}
const std::set<ui64> columnIdsToDelete = GetColumnIdsToDelete();
std::vector<TKeyBatch> result;
TKeyBatch currentBatch;
auto rowset =
db.Table<Schema::IndexColumns>()
.Select<Schema::IndexColumns::Index, Schema::IndexColumns::Granule, Schema::IndexColumns::ColumnIdx, Schema::IndexColumns::PlanStep,
Schema::IndexColumns::TxId, Schema::IndexColumns::Portion, Schema::IndexColumns::Chunk>();
if (!rowset.IsReady()) {
return std::nullopt;
}
while (!rowset.EndOfSet()) {
if (columnIdsToDelete.contains(rowset.GetValue<Schema::IndexColumns::ColumnIdx>())) {
auto key = TKey{
.Index = rowset.GetValue<Schema::IndexColumns::Index>(),
.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(),
.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(),
.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(),
.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(),
.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(),
.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>() };
currentBatch.emplace_back(std::move(key));
if (currentBatch.size() == maxBatchSize) {
result.emplace_back(std::move(currentBatch));
currentBatch = TKeyBatch{};
}
}
if (!rowset.Next()) {
return std::nullopt;
}
}
if (!currentBatch.empty()) {
result.emplace_back(std::move(currentBatch));
}

return result;
}

} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns
94 changes: 94 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {

class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent {
public:
struct TKey {
ui32 Index;
ui64 Granule;
ui32 ColumnIdx;
ui64 PlanStep;
ui64 TxId;
ui64 Portion;
ui32 Chunk;
};

using TKeyBatch = std::vector<TKey>;

private:

std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize);

virtual std::set<ui64> GetColumnIdsToDelete() const = 0;

public:
TDeleteTrashImpl(const TNormalizationController::TInitContext&) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

class TRemoveDeleteFlag: public TDeleteTrashImpl {
private:
using TBase = TDeleteTrashImpl;
public:
static TString GetClassNameStatic() {
return "RemoveDeleteFlag";
}

private:
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveDeleteFlag>(GetClassNameStatic());

virtual std::set<ui64> GetColumnIdsToDelete() const override {
return { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return {};
}
virtual TString GetClassName() const override {
return GetClassNameStatic();
}

public:
TRemoveDeleteFlag(const TNormalizationController::TInitContext& context)
: TBase(context) {
}
};

class TRemoveWriteId: public TDeleteTrashImpl {
private:
using TBase = TDeleteTrashImpl;

public:
static TString GetClassNameStatic() {
return "RemoveWriteId";
}

private:
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveWriteId>(GetClassNameStatic());

virtual std::set<ui64> GetColumnIdsToDelete() const override {
return { NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX };
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return {};
}
virtual TString GetClassName() const override {
return GetClassNameStatic();
}

public:
TRemoveWriteId(const TNormalizationController::TInitContext& context)
: TBase(context)
{
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/portion/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
GLOBAL clean.cpp
GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
GLOBAL special_cleaner.cpp
)

PEERDIR(
Expand Down

0 comments on commit 25fdf52

Please sign in to comment.