Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean trash on versions switching #9679

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/special_cleaner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "special_cleaner.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>

namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {

namespace {

class TChanges: public INormalizerChanges {
public:
TChanges(TDeleteTrashImpl::TKeyBatch&& keys)
: Keys(keys) {
}
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normController*/) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (const auto& k : Keys) {
db.Table<Schema::IndexColumns>().Key(k.Index, k.Granule, k.ColumnIdx, k.PlanStep, k.TxId, k.Portion, k.Chunk).Delete();
}
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")("message", TStringBuilder() << GetSize() << " rows deleted");
return true;
}

ui64 GetSize() const override {
return Keys.size();
}

private:
const TDeleteTrashImpl::TKeyBatch Keys;
};

} //namespace

TConclusion<std::vector<INormalizerTask::TPtr>> TDeleteTrashImpl::DoInit(
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
const size_t MaxBatchSize = 10000;
auto keysToDelete = KeysToDelete(txc, MaxBatchSize);
if (!keysToDelete) {
return TConclusionStatus::Fail("Not ready");
}
ui32 removeCount = 0;
for (auto&& i : *keysToDelete) {
removeCount += i.size();
}
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("normalizer", "TDeleteTrash")(
"message", TStringBuilder() << "found " << removeCount << " rows to delete grouped in " << keysToDelete->size() << " batches");

std::vector<INormalizerTask::TPtr> result;
for (auto&& batch : *keysToDelete) {
AFL_VERIFY(!batch.empty());
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(batch))));
}
return result;
}

std::optional<std::vector<TDeleteTrashImpl::TKeyBatch>> TDeleteTrashImpl::KeysToDelete(
NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize) {
NIceDb::TNiceDb db(txc.DB);
using namespace NColumnShard;
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
return std::nullopt;
}
const std::set<ui64> columnIdsToDelete = GetColumnIdsToDelete();
std::vector<TKeyBatch> result;
TKeyBatch currentBatch;
auto rowset =
db.Table<Schema::IndexColumns>()
.Select<Schema::IndexColumns::Index, Schema::IndexColumns::Granule, Schema::IndexColumns::ColumnIdx, Schema::IndexColumns::PlanStep,
Schema::IndexColumns::TxId, Schema::IndexColumns::Portion, Schema::IndexColumns::Chunk>();
if (!rowset.IsReady()) {
return std::nullopt;
}
while (!rowset.EndOfSet()) {
if (columnIdsToDelete.contains(rowset.GetValue<Schema::IndexColumns::ColumnIdx>())) {
auto key = TKey{
.Index = rowset.GetValue<Schema::IndexColumns::Index>(),
.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(),
.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(),
.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(),
.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(),
.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(),
.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>() };
currentBatch.emplace_back(std::move(key));
if (currentBatch.size() == maxBatchSize) {
result.emplace_back(std::move(currentBatch));
currentBatch = TKeyBatch{};
}
}
if (!rowset.Next()) {
return std::nullopt;
}
}
if (!currentBatch.empty()) {
result.emplace_back(std::move(currentBatch));
}

return result;
}

} // namespace NKikimr::NOlap::NNormalizer::NSpecialColumns
94 changes: 94 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/special_cleaner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap::NNormalizer::NSpecialColumns {

class TDeleteTrashImpl: public TNormalizationController::INormalizerComponent {
public:
struct TKey {
ui32 Index;
ui64 Granule;
ui32 ColumnIdx;
ui64 PlanStep;
ui64 TxId;
ui64 Portion;
ui32 Chunk;
};

using TKeyBatch = std::vector<TKey>;

private:

std::optional<std::vector<TKeyBatch>> KeysToDelete(NTabletFlatExecutor::TTransactionContext& txc, const size_t maxBatchSize);

virtual std::set<ui64> GetColumnIdsToDelete() const = 0;

public:
TDeleteTrashImpl(const TNormalizationController::TInitContext&) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

class TRemoveDeleteFlag: public TDeleteTrashImpl {
private:
using TBase = TDeleteTrashImpl;
public:
static TString GetClassNameStatic() {
return "RemoveDeleteFlag";
}

private:
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveDeleteFlag>(GetClassNameStatic());

virtual std::set<ui64> GetColumnIdsToDelete() const override {
return { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return {};
}
virtual TString GetClassName() const override {
return GetClassNameStatic();
}

public:
TRemoveDeleteFlag(const TNormalizationController::TInitContext& context)
: TBase(context) {
}
};

class TRemoveWriteId: public TDeleteTrashImpl {
private:
using TBase = TDeleteTrashImpl;

public:
static TString GetClassNameStatic() {
return "RemoveWriteId";
}

private:
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TRemoveWriteId>(GetClassNameStatic());

virtual std::set<ui64> GetColumnIdsToDelete() const override {
return { NPortion::TSpecialColumns::SPEC_COL_WRITE_ID_INDEX };
}

virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return {};
}
virtual TString GetClassName() const override {
return GetClassNameStatic();
}

public:
TRemoveWriteId(const TNormalizationController::TInitContext& context)
: TBase(context)
{
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/portion/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
GLOBAL clean.cpp
GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
GLOBAL special_cleaner.cpp
)

PEERDIR(
Expand Down
Loading