diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 2a507e45c6fc..646a4409d737 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -1070,6 +1070,12 @@ class TColumnChunkLoadContextV2 { MetadataProto = rowset.template GetValue(); } + TColumnChunkLoadContextV2(const ui64 pathId, const ui64 portionId, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) + : PathId(pathId) + , PortionId(portionId) + , MetadataProto(proto.SerializeAsString()) { + } + std::vector BuildRecordsV1() const { std::vector records; NKikimrTxColumnShard::TIndexPortionAccessor metaProto; diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index dba06b061120..60544f24190f 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -46,10 +46,12 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable, const TInstant& loadTim } void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) { + if (!AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage() && !AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) { + return; + } NIceDb::TNiceDb db(Database); using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1; auto rowProto = row.GetMeta().SerializeToProto(); - AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage() || AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()); if (AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()) { db.Table() .Key(portion.GetPathId(), portion.GetPortionId(), row.ColumnId, row.Chunk) @@ -118,16 +120,16 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe } } -bool TDbWrapper::LoadColumns(const std::optional pathId, const std::function& callback) { +bool TDbWrapper::LoadColumns(const std::optional pathId, const std::function& callback) { NIceDb::TNiceDb db(Database); - using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1; + using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2; const auto pred = [&](auto& rowset) { if (!rowset.IsReady()) { return false; } while (!rowset.EndOfSet()) { - NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset); + NOlap::TColumnChunkLoadContextV2 chunkLoadContext(rowset); callback(std::move(chunkLoadContext)); if (!rowset.Next()) { @@ -137,10 +139,10 @@ bool TDbWrapper::LoadColumns(const std::optional pathId, const std::functi return true; }; if (pathId) { - auto rowset = db.Table().Prefix(*pathId).Select(); + auto rowset = db.Table().Prefix(*pathId).Select(); return pred(rowset); } else { - auto rowset = db.Table().Select(); + auto rowset = db.Table().Select(); return pred(rowset); } } @@ -290,7 +292,6 @@ TConclusion>> TD void TDbWrapper::WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) { NIceDb::TNiceDb db(Database); using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2; - AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()); db.Table() .Key(portion.GetPathId(), portion.GetPortionId()) .Update(NIceDb::TUpdate(proto.SerializeAsString())); diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index d8a3a00dbcec..dacf770e78ea 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -14,7 +14,7 @@ class TDatabase; namespace NKikimr::NOlap { -class TColumnChunkLoadContextV1; +class TColumnChunkLoadContextV2; class TIndexChunkLoadContext; class TInsertedData; class TCommittedData; @@ -49,7 +49,7 @@ class IDbWrapper { virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0; virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0; - virtual bool LoadColumns(const std::optional pathId, const std::function& callback) = 0; + virtual bool LoadColumns(const std::optional pathId, const std::function& callback) = 0; virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0; virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0; @@ -89,7 +89,7 @@ class TDbWrapper : public IDbWrapper { void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override; void WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) override; void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; - bool LoadColumns(const std::optional pathId, const std::function& callback) override; + bool LoadColumns(const std::optional pathId, const std::function& callback) override; virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override; virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override; diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp index a81477f06ba1..32fa3ec70859 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp @@ -243,9 +243,11 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI { TPortionInfo::TSchemaCursor schema(versionedIndex); - if (!db.LoadColumns(PathId, [&](TColumnChunkLoadContextV1&& loadContext) { + if (!db.LoadColumns(PathId, [&](TColumnChunkLoadContextV2&& loadContext) { auto* constructor = constructors.GetConstructorVerified(loadContext.GetPortionId()); - constructor->LoadRecord(std::move(loadContext)); + for (auto&& i : loadContext.BuildRecordsV1()) { + constructor->LoadRecord(std::move(i)); + } })) { return false; } diff --git a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp index e099ce624385..84c7ffd6d0c7 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp @@ -31,14 +31,14 @@ bool TGranuleColumnsReader::DoExecute(NTabletFlatExecutor::TTransactionContext& TDbWrapper db(txc.DB, &*DsGroupSelector); TPortionInfo::TSchemaCursor schema(*VersionedIndex); Context->ClearRecords(); - return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV1&& loadContext) { + return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV2&& loadContext) { Context->Add(std::move(loadContext)); }); } bool TGranuleColumnsReader::DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { NIceDb::TNiceDb db(txc.DB); - return db.Table().Prefix(Self->GetPathId()).Select().IsReady(); + return db.Table().Prefix(Self->GetPathId()).Select().IsReady(); } bool TGranuleIndexesReader::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { diff --git a/ydb/core/tx/columnshard/engines/storage/granule/stages.h b/ydb/core/tx/columnshard/engines/storage/granule/stages.h index aabf763e0ed1..2dcac36dc4b0 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/stages.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/stages.h @@ -58,6 +58,11 @@ class TPortionsLoadContext { auto& constructor = MutableConstructor(chunk.GetPortionId()); constructor.MutableRecords().emplace_back(std::move(chunk)); } + void Add(TColumnChunkLoadContextV2&& chunk) { + for (auto&& i : chunk.BuildRecordsV1()) { + Add(std::move(i)); + } + } }; class TGranuleOnlyPortionsReader: public ITxReader { diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp index 5b34a9b636ee..718e416edf29 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp @@ -58,7 +58,7 @@ class TTestInsertTableDB : public IDbWrapper { } void EraseColumn(const TPortionInfo&, const TColumnRecord&) override { } - bool LoadColumns(const std::optional /*reqPathId*/, const std::function&) override { + bool LoadColumns(const std::optional /*reqPathId*/, const std::function&) override { return true; } diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 475c51a93fa5..84bde8d4f699 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -35,11 +35,16 @@ std::shared_ptr EmptyDataLocksManager = std::make_shared> LoadContexts; + std::map LoadContexts; public: - virtual void WriteColumns(const NOlap::TPortionInfo& /*portion*/, const NKikimrTxColumnShard::TIndexPortionAccessor& /*proto*/) override { - + virtual void WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) override { + auto it = LoadContexts.find(portion.GetAddress()); + if (it == LoadContexts.end()) { + LoadContexts.emplace(portion.GetAddress(), TColumnChunkLoadContextV2(portion.GetPathId(), portion.GetPortionId(), proto)); + } else { + it->second = TColumnChunkLoadContextV2(portion.GetPathId(), portion.GetPortionId(), proto); + } } virtual const IBlobGroupSelector* GetDsGroupSelector() const override { @@ -124,11 +129,6 @@ class TTestDbWrapper: public IDbWrapper { } auto& data = Indices[0].Columns[portion.GetPathId()]; - NOlap::TColumnChunkLoadContextV1 loadContext(portion.GetPathId(), portion.GetPortionId(), row.GetAddress(), row.BlobRange, rowProto); - auto itInsertInfo = LoadContexts[portion.GetAddress()].emplace(row.GetAddress(), loadContext); - if (!itInsertInfo.second) { - itInsertInfo.first->second = loadContext; - } auto it = data.find(portion.GetPortionId()); if (it == data.end()) { it = data.emplace(portion.GetPortionId(), TPortionInfoConstructor(portion, true, true)).first; @@ -173,7 +173,7 @@ class TTestDbWrapper: public IDbWrapper { portionLocal.TestMutableRecords().swap(filtered); } - bool LoadColumns(const std::optional reqPathId, const std::function& callback) override { + bool LoadColumns(const std::optional reqPathId, const std::function& callback) override { auto& columns = Indices[0].Columns; for (auto& [pathId, portions] : columns) { if (pathId && *reqPathId != pathId) { @@ -182,14 +182,10 @@ class TTestDbWrapper: public IDbWrapper { for (auto& [portionId, portionLocal] : portions) { auto copy = portionLocal.MakeCopy(); copy.TestMutableRecords().clear(); - for (const auto& rec : portionLocal.GetRecords()) { - auto address = copy.GetPortionConstructor().GetAddress(); - auto itContextLoader = LoadContexts[address].find(rec.GetAddress()); - Y_ABORT_UNLESS(itContextLoader != LoadContexts[address].end()); - auto copy = itContextLoader->second; - callback(std::move(copy)); - LoadContexts[address].erase(itContextLoader); - } + auto it = LoadContexts.find(portionLocal.GetPortionConstructor().GetAddress()); + AFL_VERIFY(it != LoadContexts.end()); + callback(std::move(it->second)); + LoadContexts.erase(it); } } return true; diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp index 2f37c83bc33d..b66b199a797f 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp @@ -83,6 +83,10 @@ TConclusion> TNormalizer::DoInit( using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); + if (!AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) { + return std::vector(); + } + bool ready = true; ready = ready & Schema::Precharge(db, txc.DB.GetScheme()); ready = ready & Schema::Precharge(db, txc.DB.GetScheme()); diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp index 9f98b179cdd7..ec0be99fd8dc 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp @@ -234,13 +234,16 @@ std::optional>>> GetPortion std::vector pack; std::map> iteration; const bool v0Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage(); - const ui32 SourcesCount = v0Usage ? 4 : 3; + const bool v1Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage(); + ui32 SourcesCount = 2; if (v0Usage) { + ++SourcesCount; if (v0Portions.size()) { iteration[v0Portions.begin()->first].emplace_back(v0Portions); } } - { + if (v1Usage) { + ++SourcesCount; if (v1Portions.size()) { iteration[v1Portions.begin()->first].emplace_back(v1Portions); } @@ -312,7 +315,6 @@ class TChanges: public INormalizerChanges { TConclusion> TCleanEmptyPortionsNormalizer::DoInit( const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { using namespace NColumnShard; - AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()); auto batchesToDelete = GetPortionsToDelete(txc, DsGroupSelector); if (!batchesToDelete) { return TConclusionStatus::Fail("Not ready"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp index d8e0d5f48428..811cae0957ae 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp @@ -221,9 +221,11 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds( } if (Records.empty()) { THashMap> recordsLocal; - if (!wrapper.LoadColumns(std::nullopt, [&](TColumnChunkLoadContextV1&& chunk) { + if (!wrapper.LoadColumns(std::nullopt, [&](TColumnChunkLoadContextV2&& chunk) { const ui64 portionId = chunk.GetPortionId(); - recordsLocal[portionId].emplace_back(std::move(chunk)); + for (auto&& i : chunk.BuildRecordsV1()) { + recordsLocal[portionId].emplace_back(std::move(i)); + } })) { return TConclusionStatus::Fail("repeated read db"); } diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index 2606903d86b2..bf4a6f5e89e1 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -110,7 +110,7 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns( const NColumnShard::TTablesManager& tablesManager, NIceDb::TNiceDb& db, THashMap& portions) { using namespace NColumnShard; auto columnsFilter = GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()); - auto rowset = db.Table().Select(); + auto rowset = db.Table().Select(); if (!rowset.IsReady()) { return TConclusionStatus::Fail("Not ready"); } @@ -126,8 +126,10 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns( }; while (!rowset.EndOfSet()) { - NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset); - initPortion(std::move(chunkLoadContext)); + NOlap::TColumnChunkLoadContextV2 chunkLoadContext(rowset); + for (auto&& i : chunkLoadContext.BuildRecordsV1()) { + initPortion(std::move(i)); + } if (!rowset.Next()) { return TConclusionStatus::Fail("Not ready"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp index 400a70068508..b7f33ea7f7e4 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v1_chunks.cpp @@ -212,6 +212,7 @@ TConclusion> TNormalizer::DoInit( } AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()); + AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()); { std::vector package; for (auto&& [portionId, chunkInfo] : columns1Remove) { diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp index ffaeb5e4e5af..434dcceefba0 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_v2_chunks.cpp @@ -127,7 +127,6 @@ TConclusion> TNormalizer::DoInit( if (!ready) { return TConclusionStatus::Fail("Not ready"); } - AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()); THashSet readyPortions; THashMap buildPortions; { @@ -168,6 +167,11 @@ TConclusion> TNormalizer::DoInit( } std::vector tasks; + if (buildPortions.empty()) { + return tasks; + } + AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()); + { std::vector package; for (auto&& [portionAddress, portionInfos] : buildPortions) {