Skip to content

Commit

Permalink
remove schema from table version (#10878)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 26, 2024
1 parent d969bf6 commit fe19569
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 140 deletions.
1 change: 0 additions & 1 deletion ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ message TTableVersionInfo {
optional uint64 PathId = 1;
optional uint64 SinceStep = 2;
optional uint64 SinceTxId = 3;
optional NKikimrSchemeOp.TColumnTableSchema Schema = 4;
optional uint32 SchemaPresetId = 5;
optional NKikimrSchemeOp.TColumnDataLifeCycle TtlSettings = 6;
optional uint32 TtlSettingsPresetId = 7;
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl

// check schema changed

std::optional<NKikimrSchemeOp::TColumnTableSchema> schema;
if (tableProto.HasSchemaPreset()) {
Y_ABORT_UNLESS(!tableProto.HasSchema(), "Tables has either schema or preset");

Expand All @@ -398,7 +399,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
}
} else {
Y_ABORT_UNLESS(tableProto.HasSchema(), "Tables has either schema or preset");
*tableVerProto.MutableSchema() = tableProto.GetSchema();
schema = tableProto.GetSchema();
}

{
Expand All @@ -421,7 +422,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl

tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());

TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);
TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db, Tiers);
InsertTable->RegisterPathInfo(pathId);

Counters.GetTabletCounters()->SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
Expand All @@ -442,11 +443,12 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
<< " at tablet " << TabletID());

NKikimrTxColumnShard::TTableVersionInfo tableVerProto;
std::optional<NKikimrSchemeOp::TColumnTableSchema> schema;
if (alterProto.HasSchemaPreset()) {
tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db, Tiers);
} else if (alterProto.HasSchema()) {
*tableVerProto.MutableSchema() = alterProto.GetSchema();
schema = alterProto.GetSchema();
}

const auto& ttlSettings = alterProto.GetTtlSettings(); // Note: Not valid behaviour for full alter implementation
Expand All @@ -459,7 +461,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
Schema::SaveTableInfo(db, pathId, tieringUsage);

tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers);
TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db, Tiers);
}

void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version,
Expand Down
189 changes: 106 additions & 83 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,47 @@

namespace NKikimr::NOlap {

class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
class TSchemaVersionNormalizer::TNormalizerResult: public INormalizerChanges {
private:
class TKey {
private:
std::optional<ui64> Version;

public:
ui64 Step;
ui64 TxId;
ui64 Version;
ui32 Id;

public:
TKey() = default;

TKey(ui32 id, ui64 step, ui64 txId, ui64 version)
: Step(step)
ui64 GetVersion() const {
AFL_VERIFY(Version);
return *Version;
}

TKey(ui32 id, ui64 step, ui64 txId, const std::optional<ui64> version)
: Version(version)
, Step(step)
, TxId(txId)
, Version(version)
, Id(id)
{
, Id(id) {
}

bool operator<(const TKey& item) const {
if (Id == item.Id) {
const bool result = std::tie(Step, TxId) < std::tie(item.Step, item.TxId);
if (Version && item.Version) {
const bool resultVersions = Version < item.Version;
AFL_VERIFY(result == resultVersions);
}
return result;
} else {
return Id < item.Id;
}
}

bool operator==(const TKey& item) const {
return std::tie(Id, Step, TxId, Version) == std::tie(item.Id, item.Step, item.TxId, item.Version);
}
};

Expand All @@ -28,15 +51,12 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
ui64 PathId;
ui64 Step;
ui64 TxId;
ui64 Version;

public:
TTableKey(ui64 pathId, ui64 step, ui64 txId, ui64 version)
TTableKey(ui64 pathId, ui64 step, ui64 txId)
: PathId(pathId)
, Step(step)
, TxId(txId)
, Version(version)
{
, TxId(txId) {
}
};

Expand All @@ -46,19 +66,19 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
public:
TNormalizerResult(std::vector<TKey>&& versions, std::vector<TTableKey>&& tableVersions)
: VersionsToRemove(versions)
, TableVersionsToRemove(tableVersions)
{
, TableVersionsToRemove(tableVersions) {
}

bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (auto& key: VersionsToRemove) {
LOG_S_DEBUG("Removing schema version in TSchemaVersionNormalizer " << key.Version);
for (auto& key : VersionsToRemove) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "Removing schema version in TSchemaVersionNormalizer")("version", key.GetVersion());
db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete();
}
for (auto& key: TableVersionsToRemove) {
LOG_S_DEBUG("Removing table version in TSchemaVersionNormalizer " << key.Version << " pathId " << key.PathId);
for (auto& key : TableVersionsToRemove) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "Removing table version in TSchemaVersionNormalizer")("pathId", key.PathId)(
"plan_step", key.Step)("tx_id", key.TxId);
db.Table<Schema::TableVersionInfo>().Key(key.PathId, key.Step, key.TxId).Delete();
}
return true;
Expand Down Expand Up @@ -101,97 +121,100 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
}
}

std::vector<TKey> unusedSchemaIds;
std::map<TKey, bool> schemaIdUsability;
std::vector<TTableKey> unusedTableSchemaIds;
std::optional<ui64> maxVersion;
std::vector<INormalizerChanges::TPtr> changes;

{
THashMap<ui32, TKey> maxByPresetId;
auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
if (info.HasSchema()) {
ui64 version = info.GetSchema().GetVersion();
if (!maxVersion.has_value() || (version > *maxVersion)) {
maxVersion = version;
}
if (!usedSchemaVersions.contains(version)) {
unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
}
}

if (!rowset.Next()) {
return std::nullopt;
}
}
} else {
return std::nullopt;
}
}

{
auto rowset = db.Table<Schema::TableVersionInfo>().Select();
if (!rowset.IsReady()) {
return std::nullopt;
}

while (!rowset.EndOfSet()) {
const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();

NKikimrTxColumnShard::TTableVersionInfo versionInfo;
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
if (versionInfo.HasSchema()) {
ui64 version = versionInfo.GetSchema().GetVersion();
if (!usedSchemaVersions.contains(version)) {
unusedTableSchemaIds.emplace_back(pathId, rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), rowset.GetValue<Schema::TableVersionInfo::SinceTxId>(), version);
}
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
AFL_VERIFY(info.HasSchema());
ui64 version = info.GetSchema().GetVersion();
TKey presetVersionKey(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
auto it = maxByPresetId.find(id);
if (it == maxByPresetId.end()) {
it = maxByPresetId.emplace(id, presetVersionKey).first;
} else if (it->second < presetVersionKey) {
it->second = presetVersionKey;
}
AFL_VERIFY(schemaIdUsability.emplace(presetVersionKey, usedSchemaVersions.contains(version)).second);

if (!rowset.Next()) {
return std::nullopt;
}
}
}
for (auto&& i : maxByPresetId) {
auto it = schemaIdUsability.find(i.second);
AFL_VERIFY(it != schemaIdUsability.end());
AFL_VERIFY(it->first == i.second);
it->second = true;
}
{
auto rowset = db.Table<Schema::TableVersionInfo>().Select();
if (!rowset.IsReady()) {
return std::nullopt;
}

while (!rowset.EndOfSet()) {
const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();

NKikimrTxColumnShard::TTableVersionInfo versionInfo;
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
auto it = schemaIdUsability.find(TKey(versionInfo.GetSchemaPresetId(),
rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), rowset.GetValue<Schema::TableVersionInfo::SinceTxId>(), {}));
AFL_VERIFY(it != schemaIdUsability.end());
if (!it->second) {
unusedTableSchemaIds.emplace_back(pathId, rowset.GetValue<Schema::TableVersionInfo::SinceStep>(),
rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());
}

std::vector<TTableKey> tablePortion;
std::vector<TKey> portion;
tablePortion.reserve(10000);
portion.reserve(10000);
auto addPortion = [&]() {
if (portion.size() + tablePortion.size() >= 10000) {
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion), std::move(tablePortion)));
portion = std::vector<TKey>();
tablePortion = std::vector<TTableKey>();
if (!rowset.Next()) {
return std::nullopt;
}
}
}
};
for (const auto& id: unusedSchemaIds) {
if (!maxVersion.has_value() || (id.Version != *maxVersion)) {
portion.push_back(id);
addPortion();

std::vector<TTableKey> tableVersionToRemove;
std::vector<TKey> presetVersionsToRemove;
auto addNormalizationTask = [&](const ui32 limit) {
if (presetVersionsToRemove.size() + tableVersionToRemove.size() > limit) {
changes.emplace_back(
std::make_shared<TNormalizerResult>(std::move(presetVersionsToRemove), std::move(tableVersionToRemove)));
presetVersionsToRemove = std::vector<TKey>();
tableVersionToRemove = std::vector<TTableKey>();
}
};
for (const auto& id : schemaIdUsability) {
if (!id.second) {
presetVersionsToRemove.push_back(id.first);
addNormalizationTask(10000);
}
}
}

for (const auto& id: unusedTableSchemaIds) {
if (!maxVersion.has_value() || (id.Version != *maxVersion)) {
tablePortion.push_back(id);
addPortion();
for (const auto& id : unusedTableSchemaIds) {
tableVersionToRemove.push_back(id);
addNormalizationTask(10000);
}
}

if (portion.size() + tablePortion.size() > 0) {
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion), std::move(tablePortion)));
addNormalizationTask(0);
return changes;
}
return changes;
}
};

TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(
const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
auto changes = TNormalizerResult::Init(txc);
if (!changes) {
return TConclusionStatus::Fail("Not ready");;
return TConclusionStatus::Fail("Not ready");
}
std::vector<INormalizerTask::TPtr> tasks;
for (auto&& c : *changes) {
Expand All @@ -200,4 +223,4 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit
return tasks;
}

}
} // namespace NKikimr::NOlap
Loading

0 comments on commit fe19569

Please sign in to comment.