Skip to content

Commit

Permalink
Implemented schema versions normalizer (#9627)
Browse files Browse the repository at this point in the history
  • Loading branch information
aavdonkin authored Sep 26, 2024
1 parent 850de76 commit 7d208c7
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 1 deletion.
139 changes: 139 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#include "version.h"

namespace NKikimr::NOlap {

// One batch of SchemaPresetVersionInfo rows to delete.
//
// Init() scans the local database, collects schema versions that are referenced by neither
// IndexPortions nor InsertTable, and packs the corresponding SchemaPresetVersionInfo keys into
// batches of at most MaxKeysPerChange entries. ApplyOnExecute() deletes the rows of one batch.
class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
private:
    // Key of a SchemaPresetVersionInfo row (Id, SinceStep, SinceTxId) together with the schema
    // version parsed from that row's InfoProto.
    class TKey {
    public:
        ui64 Step = 0;
        ui64 TxId = 0;
        ui64 Version = 0;
        ui32 Id = 0;

    public:
        TKey() = default;

        TKey(ui32 id, ui64 step, ui64 txId, ui64 version)
            : Step(step)
            , TxId(txId)
            , Version(version)
            , Id(id)
        {
        }
    };

    // Upper bound on the number of keys carried by a single change set.
    static constexpr size_t MaxKeysPerChange = 10000;

    std::vector<TKey> VersionsToRemove;

public:
    // Takes ownership of `versions`. A named rvalue-reference parameter is an lvalue, so the
    // explicit std::move is required to avoid copying the whole vector.
    TNormalizerResult(std::vector<TKey>&& versions)
        : VersionsToRemove(std::move(versions))
    {
    }

    // Deletes every SchemaPresetVersionInfo row listed in this batch. Always returns true.
    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
        using namespace NColumnShard;
        NIceDb::TNiceDb db(txc.DB);
        for (const auto& key : VersionsToRemove) {
            db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete();
        }
        return true;
    }

    // Number of rows this batch deletes.
    ui64 GetSize() const override {
        return VersionsToRemove.size();
    }

    // Builds the list of change sets that remove unused schema versions.
    //
    // Returns std::nullopt if any of the scanned rowsets is not ready or fails to advance;
    // otherwise returns the (possibly empty) list of batches. The row carrying the highest
    // schema version found in SchemaPresetVersionInfo is never scheduled for removal, even if
    // nothing references it.
    static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) {
        using namespace NColumnShard;
        THashSet<ui64> usedSchemaVersions;
        NIceDb::TNiceDb db(txc.DB);

        // Schema versions referenced by portions.
        {
            auto rowset = db.Table<Schema::IndexPortions>().Select();
            if (!rowset.IsReady()) {
                return std::nullopt;
            }
            while (!rowset.EndOfSet()) {
                usedSchemaVersions.insert(rowset.GetValue<Schema::IndexPortions::SchemaVersion>());
                if (!rowset.Next()) {
                    return std::nullopt;
                }
            }
        }

        // Schema versions referenced by the insert table. The column may be absent in a row.
        {
            auto rowset = db.Table<Schema::InsertTable>().Select();
            if (!rowset.IsReady()) {
                return std::nullopt;
            }
            while (!rowset.EndOfSet()) {
                if (rowset.HaveValue<Schema::InsertTable::SchemaVersion>()) {
                    usedSchemaVersions.insert(rowset.GetValue<Schema::InsertTable::SchemaVersion>());
                }
                // Advance unconditionally: a row without SchemaVersion must still be skipped,
                // otherwise the loop would never terminate.
                if (!rowset.Next()) {
                    return std::nullopt;
                }
            }
        }

        std::vector<TKey> unusedSchemaIds;
        std::optional<ui64> maxVersion;

        // Collect unreferenced schema versions and remember the highest version seen.
        {
            auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
            if (!rowset.IsReady()) {
                return std::nullopt;
            }
            while (!rowset.EndOfSet()) {
                const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
                NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
                Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
                if (info.HasSchema()) {
                    const ui64 version = info.GetSchema().GetVersion();
                    if (!maxVersion.has_value() || (version > *maxVersion)) {
                        maxVersion = version;
                    }
                    if (!usedSchemaVersions.contains(version)) {
                        unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
                    }
                }

                if (!rowset.Next()) {
                    return std::nullopt;
                }
            }
        }

        std::vector<INormalizerChanges::TPtr> changes;
        const size_t batchCapacity = unusedSchemaIds.size() < MaxKeysPerChange ? unusedSchemaIds.size() : MaxKeysPerChange;
        std::vector<TKey> portion;
        portion.reserve(batchCapacity);
        for (const auto& id : unusedSchemaIds) {
            // Keep the row that holds the highest known schema version.
            if (maxVersion.has_value() && (id.Version == *maxVersion)) {
                continue;
            }
            portion.push_back(id);
            if (portion.size() >= MaxKeysPerChange) {
                changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
                // Start a fresh batch; a moved-from vector is only guaranteed to be valid, so
                // reset it explicitly before reuse.
                portion.clear();
                portion.reserve(batchCapacity);
            }
        }
        if (!portion.empty()) {
            changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
        }
        return changes;
    }
};

// Produces the normalizer tasks for this component: each change set returned by
// TNormalizerResult::Init is wrapped into a TTrivialNormalizerTask.
// Fails with "Not ready" when Init returns no value.
TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
    auto changeSets = TNormalizerResult::Init(txc);
    if (!changeSets.has_value()) {
        return TConclusionStatus::Fail("Not ready");
    }

    std::vector<INormalizerTask::TPtr> result;
    for (auto& changeSet : *changeSets) {
        result.emplace_back(std::make_shared<TTrivialNormalizerTask>(changeSet));
    }
    return result;
}

}
42 changes: 42 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/defs.h>


// Forward declaration only; nothing in the visible part of this header refers to TTablesManager.
namespace NKikimr::NColumnShard {
class TTablesManager;
}

namespace NKikimr::NOlap {

class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent {
public:
static TString GetClassNameStatic() {
return "SchemaVersionCleaner";
}

private:
static inline TFactory::TRegistrator<TSchemaVersionNormalizer> Registrator = TFactory::TRegistrator<TSchemaVersionNormalizer>(
GetClassNameStatic());
public:
class TNormalizerResult;
class TTask;

public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return std::nullopt;
}

virtual TString GetClassName() const override {
return GetClassNameStatic();
}

TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
GLOBAL version.cpp
)

PEERDIR(
ydb/core/tx/columnshard/normalizer/abstract
)

END()
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ PEERDIR(
ydb/core/tx/columnshard/normalizer/tables
ydb/core/tx/columnshard/normalizer/portion
ydb/core/tx/columnshard/normalizer/insert_table
ydb/core/tx/columnshard/normalizer/schema_version
)

END()
40 changes: 39 additions & 1 deletion ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,36 @@ class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier {
}
};

class TPortionsCleaner: public NYDBTest::ILocalDBModifier {
// Local-DB modifier used by the SchemaVersionsNormalizer test. It plants a
// SchemaPresetVersionInfo row whose schema version is one below the smallest version found in
// IndexPortions, and writes a SchemaPresetInfo row named "default".
class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier {
public:
    void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
        using namespace NColumnShard;
        NIceDb::TNiceDb db(txc.DB);

        // Determine the lowest schema version referenced by any portion.
        auto portions = db.Table<Schema::IndexPortions>().Select();
        UNIT_ASSERT(portions.IsReady());

        ui64 lowestVersion = static_cast<ui64>(-1);
        while (!portions.EndOfSet()) {
            auto rowVersion = portions.GetValue<Schema::IndexPortions::SchemaVersion>();
            if (rowVersion < lowestVersion) {
                lowestVersion = rowVersion;
            }
            UNIT_ASSERT(portions.Next());
        }

        // Insert an orphan schema version that no portion refers to. Per the original author's
        // note, the test is expected to fail if SchemaVersionCleaner does not erase it.
        NKikimrTxColumnShard::TSchemaPresetVersionInfo orphanInfo;
        orphanInfo.MutableSchema()->SetVersion(lowestVersion - 1);
        TString payload;
        Y_ABORT_UNLESS(orphanInfo.SerializeToString(&payload));
        db.Table<Schema::SchemaPresetVersionInfo>().Key(11, 1, 1).Update(NIceDb::TUpdate<Schema::SchemaPresetVersionInfo::InfoProto>(payload));

        db.Table<Schema::SchemaPresetInfo>().Key(10).Update(NIceDb::TUpdate<Schema::SchemaPresetInfo::Name>("default"));
    }
};

class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
Expand Down Expand Up @@ -259,6 +288,10 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add();
repair->SetClassName("SchemaVersionCleaner");
repair->SetDescription("Removing unused schema versions");

const ui64 tableId = 1;
const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)),
NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
Expand Down Expand Up @@ -308,10 +341,15 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TestNormalizerImpl<TPortionsCleaner>();
}

// Runs TestNormalizerImpl instantiated with TSchemaVersionsCleaner.
Y_UNIT_TEST(SchemaVersionsNormalizer) {
TestNormalizerImpl<TSchemaVersionsCleaner>();
}

// Runs TestNormalizerImpl instantiated with TEmptyPortionsCleaner.
Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
TestNormalizerImpl<TEmptyPortionsCleaner>();
}


Y_UNIT_TEST(EmptyTablesNormalizer) {
class TLocalNormalizerChecker: public TNormalizerChecker {
public:
Expand Down

0 comments on commit 7d208c7

Please sign in to comment.