Skip to content

Commit

Permalink
Merge d6c49b8 into bff00ae
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 11, 2024
2 parents bff00ae + d6c49b8 commit c8fcd73
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 14 deletions.
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe
}
}

bool TDbWrapper::LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) {
bool TDbWrapper::LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2;
const auto pred = [&](auto& rowset) {
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset);
NOlap::TColumnChunkLoadContextV2 chunkLoadContext(rowset);
callback(std::move(chunkLoadContext));

if (!rowset.Next()) {
Expand All @@ -137,10 +137,10 @@ bool TDbWrapper::LoadColumns(const std::optional<ui64> pathId, const std::functi
return true;
};
if (pathId) {
auto rowset = db.Table<IndexColumnsV1>().Prefix(*pathId).Select();
auto rowset = db.Table<IndexColumnsV2>().Prefix(*pathId).Select();
return pred(rowset);
} else {
auto rowset = db.Table<IndexColumnsV1>().Select();
auto rowset = db.Table<IndexColumnsV2>().Select();
return pred(rowset);
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/db_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class IDbWrapper {

virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0;
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) = 0;
virtual bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) = 0;

virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0;
virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0;
Expand Down Expand Up @@ -89,7 +89,7 @@ class TDbWrapper : public IDbWrapper {
void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override;
void WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) override;
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) override;
bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) override;

virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/granule/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ bool TGranuleColumnsReader::DoExecute(NTabletFlatExecutor::TTransactionContext&
TDbWrapper db(txc.DB, &*DsGroupSelector);
TPortionInfo::TSchemaCursor schema(*VersionedIndex);
Context->ClearRecords();
return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV1&& loadContext) {
return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV2&& loadContext) {
Context->Add(std::move(loadContext));
});
}

bool TGranuleColumnsReader::DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
return db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(Self->GetPathId()).Select().IsReady();
return db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(Self->GetPathId()).Select().IsReady();
}

bool TGranuleIndexesReader::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/granule/stages.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class TPortionsLoadContext {
auto& constructor = MutableConstructor(chunk.GetPortionId());
constructor.MutableRecords().emplace_back(std::move(chunk));
}
void Add(TColumnChunkLoadContextV2&& chunk) {
for (auto&& i : chunk.BuildRecordsV1()) {
Add(std::move(i));
}
}
};

class TGranuleOnlyPortionsReader: public ITxReader {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,14 @@ std::optional<std::vector<std::vector<std::shared_ptr<IDBModifier>>>> GetPortion
std::vector<TPortionAddress> pack;
std::map<TPortionAddress, std::vector<TIterator>> iteration;
const bool v0Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage();
const bool v1Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage();
const ui32 SourcesCount = v0Usage ? 4 : 3;
if (v0Usage) {
if (v0Portions.size()) {
iteration[v0Portions.begin()->first].emplace_back(v0Portions);
}
}
{
if (v1Usage) {
if (v1Portions.size()) {
iteration[v1Portions.begin()->first].emplace_back(v1Portions);
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns(
const NColumnShard::TTablesManager& tablesManager, NIceDb::TNiceDb& db, THashMap<ui64, TPortionAccessorConstructor>& portions) {
using namespace NColumnShard;
auto columnsFilter = GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
auto rowset = db.Table<Schema::IndexColumnsV1>().Select();
auto rowset = db.Table<Schema::IndexColumnsV2>().Select();
if (!rowset.IsReady()) {
return TConclusionStatus::Fail("Not ready");
}
Expand All @@ -126,8 +126,10 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns(
};

while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset);
initPortion(std::move(chunkLoadContext));
NOlap::TColumnChunkLoadContextV2 chunkLoadContext(rowset);
for (auto&& i : chunkLoadContext.BuildRecordsV1()) {
initPortion(std::move(i));
}

if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
}

AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage());
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
{
std::vector<TPatchItemRemoveV1> package;
for (auto&& [portionId, chunkInfo] : columns1Remove) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
if (!ready) {
return TConclusionStatus::Fail("Not ready");
}
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
THashSet<TPortionAddress> readyPortions;
THashMap<TPortionAddress, TV2BuildTask> buildPortions;
{
Expand Down Expand Up @@ -168,6 +167,11 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
}

std::vector<INormalizerTask::TPtr> tasks;
if (buildPortions.empty()) {
return tasks;
}
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());

{
std::vector<TV2BuildTask> package;
for (auto&& [portionAddress, portionInfos] : buildPortions) {
Expand Down

0 comments on commit c8fcd73

Please sign in to comment.