diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp index 64577e56f358..9eecf96157da 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp @@ -22,10 +22,11 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot)); LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix()); - NOlap::TWriteIndexContext context(txc, dbWrap); - changes->WriteIndexOnExecute(*Self, context); + NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs()); + changes->WriteIndexOnExecute(Self, context); - changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, *context.BlobManagerDb, true); + NOlap::TBlobManagerDb blobManagerDb(txc.DB); + changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, blobManagerDb, true); Self->UpdateIndexCounters(); } else { @@ -57,7 +58,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { if (!Ev->Get()->IndexChanges->IsAborted()) { NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity, Self->MutableIndexAs()); - Ev->Get()->IndexChanges->WriteIndexOnComplete(*Self, context); + Ev->Get()->IndexChanges->WriteIndexOnComplete(Self, context); } if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) { diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp index 502c0a1c5cd2..0c66ef049e7b 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp @@ -35,7 +35,7 @@ TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& con return result; } -void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { +void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) { Y_ABORT_UNLESS(Stage != EStage::Aborted); Y_ABORT_UNLESS(Stage <= EStage::Written); Y_ABORT_UNLESS(Stage >= EStage::Compiled); @@ -44,13 +44,15 @@ void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self, Stage = EStage::Written; } -void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { - Y_ABORT_UNLESS(Stage == EStage::Written); +void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { + Y_ABORT_UNLESS(Stage == EStage::Written || !self); Stage = EStage::Finished; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully); DoWriteIndexOnComplete(self, context); - OnFinish(self, context); - self.IncCounter(GetCounterIndex(context.FinishedSuccessfully)); + if (self) { + OnFinish(*self, context); + self->IncCounter(GetCounterIndex(context.FinishedSuccessfully)); + } } @@ -105,10 +107,10 @@ void TColumnEngineChanges::OnFinish(NColumnShard::TColumnShard& self, TChangesFi DoOnFinish(self, context); } -TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper) - : Txc(txc) - , BlobManagerDb(std::make_shared(txc.DB)) +TWriteIndexContext::TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrapper, TColumnEngineForLogs& engineLogs) + : DB(db) , DBWrapper(dbWrapper) + , EngineLogs(engineLogs) { } diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 0cdfd0cd6895..b8b4365a60e5 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -74,10 +74,10 @@ class TFinalizationContext: TNonCopyable { class TWriteIndexContext: TNonCopyable { public: - NTabletFlatExecutor::TTransactionContext& Txc; - std::shared_ptr BlobManagerDb; + NTable::TDatabase* DB; IDbWrapper& DBWrapper; - TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper); + TColumnEngineForLogs& EngineLogs; + TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrapper, TColumnEngineForLogs& engineLogs); }; class TChangesFinishContext { @@ -146,8 +146,8 @@ class TColumnEngineChanges { protected: virtual void DoDebugString(TStringOutput& out) const = 0; virtual void DoCompile(TFinalizationContext& context) = 0; - virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) = 0; - virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0; + virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) = 0; + virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) = 0; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0; virtual bool NeedConstruction() const { return true; @@ -217,8 +217,8 @@ class TColumnEngineChanges { virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0; virtual bool NeedWritePortion(const ui32 index) const = 0; - void WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context); - void WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context); + void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context); + void WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context); void Compile(TFinalizationContext& context) noexcept; diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp index a7b25e9198f7..c86be3c567e2 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp @@ -15,31 +15,39 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const { } } -void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { - self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size()); +void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) { THashSet pathIds; - for (auto&& p : PortionsToDrop) { - p.RemoveFromDatabase(context.DBWrapper); + if (self) { + for (auto&& p : PortionsToDrop) { + p.RemoveFromDatabase(context.DBWrapper); - auto removing = BlobsAction.GetRemoving(p); - for (auto&& r : p.Records) { - removing->DeclareRemove((TTabletId)self.TabletID(), r.BlobRange.BlobId); + auto removing = BlobsAction.GetRemoving(p); + for (auto&& r : p.Records) { + removing->DeclareRemove((TTabletId)self->TabletID(), r.BlobRange.BlobId); + } + pathIds.emplace(p.GetPathId()); + } + if (context.DB) { + for (auto&& p : pathIds) { + self->TablesManager.TryFinalizeDropPath(*context.DB, p); + } } - pathIds.emplace(p.GetPathId()); - self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum()); - } - for (auto&& p: pathIds) { - self.TablesManager.TryFinalizeDropPath(context.Txc, p); } } -void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& context) { +void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { for (auto& portionInfo : PortionsToDrop) { if (!context.EngineLogs.ErasePortion(portionInfo)) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString()); } } context.TriggerActivity = NeedRepeat ? NColumnShard::TBackgroundActivity::Cleanup() : NColumnShard::TBackgroundActivity::None(); + if (self) { + self->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size()); + for (auto&& p : PortionsToDrop) { + self->IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum()); + } + } } void TCleanupColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.h b/ydb/core/tx/columnshard/engines/changes/cleanup.h index 82c1fd3c56e3..24f37bee2959 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.h @@ -9,8 +9,8 @@ class TCleanupColumnEngineChanges: public TColumnEngineChanges { THashMap> BlobsToForget; THashMap>> StoragePortions; protected: - virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; - virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; + virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; + virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 9b33f77a2652..da1cb7f5eb2a 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -44,9 +44,11 @@ void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { GranuleMeta->OnCompactionStarted(); } -void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { +void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { TBase::DoWriteIndexOnComplete(self, context); - self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds()); + if (self) { + self->IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds()); + } } void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index 7186c33946dd..333a08af09b0 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -14,7 +14,7 @@ class TCompactColumnEngineChanges: public TChangesWithAppend { protected: std::shared_ptr GranuleMeta; - virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; + virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override; diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index b84f43dba6db..973d07952d58 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -246,11 +246,13 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc return TConclusionStatus::Success(); } -void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { +void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { TBase::DoWriteIndexOnComplete(self, context); - self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); - self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); - self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten); + if (self) { + self->IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); + self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); + self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten); + } } void TGeneralCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index 3131258770cd..f57ae68f4282 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NCompaction { class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { private: using TBase = TCompactColumnEngineChanges; - virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; + virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; std::map CheckPoints; void BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept; void BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 3eb8a0b40c39..4828ffeb6013 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -7,11 +7,13 @@ namespace NKikimr::NOlap { -void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { +void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) { TBase::DoWriteIndexOnExecute(self, context); - auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId); - for (const auto& insertedData : DataToIndex) { - self.InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing); + if (self) { + auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId); + for (const auto& insertedData : DataToIndex) { + self->InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing); + } } } @@ -26,17 +28,19 @@ void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { self.BackgroundController.StartIndexing(*this); } -void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { +void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { TBase::DoWriteIndexOnComplete(self, context); - for (const auto& insertedData : DataToIndex) { - self.InsertTable->EraseCommittedOnComplete(insertedData); - } - if (!DataToIndex.empty()) { - self.UpdateInsertTableCounters(); + if (self) { + for (const auto& insertedData : DataToIndex) { + self->InsertTable->EraseCommittedOnComplete(insertedData); + } + if (!DataToIndex.empty()) { + self->UpdateInsertTableCounters(); + } + self->IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten); + self->IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten); + self->IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds()); } - self.IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten); - self.IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten); - self.IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds()); } void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index fab0441e421e..619909baf4e0 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -14,8 +14,8 @@ class TInsertColumnEngineChanges: public TChangesWithAppend { const TIndexInfo& indexInfo, const TInsertedData& inserted) const; std::vector DataToIndex; protected: - virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; - virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; + virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; + virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 8f8933f04198..2b8ad23531c9 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -6,9 +6,9 @@ namespace NKikimr::NOlap { -void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { +void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) { { - auto g = self.MutableIndexAs().GranulesStorage->StartPackModification(); + auto g = context.EngineLogs.GranulesStorage->StartPackModification(); THashSet usedPortionIds; for (auto& [_, portionInfo] : PortionsToRemove) { Y_ABORT_UNLESS(!portionInfo.Empty()); @@ -25,56 +25,57 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, } for (auto& [_, portionInfo] : PortionsToRemove) { - self.MutableIndexAs().CleanupPortions[portionInfo.GetRemoveSnapshot()].emplace_back(portionInfo); + context.EngineLogs.CleanupPortions[portionInfo.GetRemoveSnapshot()].emplace_back(portionInfo); } - - for (auto& portionInfo : AppendedPortions) { - switch (portionInfo.GetPortionInfo().GetMeta().Produced) { - case NOlap::TPortionMeta::EProduced::UNSPECIFIED: - Y_ABORT_UNLESS(false); // unexpected - case NOlap::TPortionMeta::EProduced::INSERTED: - self.IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN); - break; - case NOlap::TPortionMeta::EProduced::COMPACTED: - self.IncCounter(NColumnShard::COUNTER_COMPACTION_PORTIONS_WRITTEN); - break; - case NOlap::TPortionMeta::EProduced::SPLIT_COMPACTED: - self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN); - break; - case NOlap::TPortionMeta::EProduced::EVICTED: - Y_ABORT("Unexpected evicted case"); - break; - case NOlap::TPortionMeta::EProduced::INACTIVE: - Y_ABORT("Unexpected inactive case"); - break; + if (self) { + for (auto& portionInfo : AppendedPortions) { + switch (portionInfo.GetPortionInfo().GetMeta().Produced) { + case NOlap::TPortionMeta::EProduced::UNSPECIFIED: + Y_ABORT_UNLESS(false); // unexpected + case NOlap::TPortionMeta::EProduced::INSERTED: + self->IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN); + break; + case NOlap::TPortionMeta::EProduced::COMPACTED: + self->IncCounter(NColumnShard::COUNTER_COMPACTION_PORTIONS_WRITTEN); + break; + case NOlap::TPortionMeta::EProduced::SPLIT_COMPACTED: + self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN); + break; + case NOlap::TPortionMeta::EProduced::EVICTED: + Y_ABORT("Unexpected evicted case"); + break; + case NOlap::TPortionMeta::EProduced::INACTIVE: + Y_ABORT("Unexpected inactive case"); + break; + } } - } - self.IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); + self->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); - THashSet blobsDeactivated; - for (auto& [_, portionInfo] : PortionsToRemove) { - for (auto& rec : portionInfo.Records) { - blobsDeactivated.insert(rec.BlobRange.BlobId); + THashSet blobsDeactivated; + for (auto& [_, portionInfo] : PortionsToRemove) { + for (auto& rec : portionInfo.Records) { + blobsDeactivated.insert(rec.BlobRange.BlobId); + } + self->IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo.RawBytesSum()); } - self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo.RawBytesSum()); - } - self.IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, blobsDeactivated.size()); - for (auto& blobId : blobsDeactivated) { - self.IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize()); + self->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, blobsDeactivated.size()); + for (auto& blobId : blobsDeactivated) { + self->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize()); + } } } -void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& /*context*/) { +void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* /*self*/, TWriteIndexCompleteContext& context) { { - auto g = self.MutableIndexAs().GranulesStorage->StartPackModification(); + auto g = context.EngineLogs.GranulesStorage->StartPackModification(); for (auto& [_, portionInfo] : PortionsToRemove) { - const TPortionInfo& oldInfo = self.MutableIndexAs().GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion()); - self.MutableIndexAs().UpsertPortion(portionInfo, &oldInfo); + const TPortionInfo& oldInfo = context.EngineLogs.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion()); + context.EngineLogs.UpsertPortion(portionInfo, &oldInfo); } for (auto& portionInfoWithBlobs : AppendedPortions) { auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); - self.MutableIndexAs().UpsertPortion(portionInfo); + context.EngineLogs.UpsertPortion(portionInfo); } } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 0eacd62db969..3f1f835d23bc 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -14,8 +14,8 @@ class TChangesWithAppend: public TColumnEngineChanges { TSplitSettings SplitSettings; TSaverContext SaverContext; virtual void DoCompile(TFinalizationContext& context) override; - virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; - virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; + virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; + virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; std::vector MakeAppendedPortions(const std::shared_ptr batch, const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index e93a0a57e3af..c493c8aeb0cf 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -10,6 +10,8 @@ #include #include #include +#include +#include namespace NKikimr { @@ -293,6 +295,12 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, AddIdsToBlobs(changes->AppendedPortions, blobs, step); const bool result = engine.ApplyChanges(db, changes, snap); + + NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); + changes->WriteIndexOnExecute(nullptr, contextExecute); + NColumnShard::TBackgroundActivity triggered; + NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine); + changes->WriteIndexOnComplete(nullptr, contextComplete); changes->AbortEmergency(); return result; } @@ -319,6 +327,11 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T // UNIT_ASSERT_VALUES_EQUAL(changes->GetTmpGranuleIds().size(), expected.NewGranules); const bool result = engine.ApplyChanges(db, changes, snap); + NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); + changes->WriteIndexOnExecute(nullptr, contextExecute); + NColumnShard::TBackgroundActivity triggered; + NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine); + changes->WriteIndexOnComplete(nullptr, contextComplete); if (blobsPool) { for (auto&& i : changes->AppendedPortions) { for (auto&& r : i.GetPortionInfo().Records) { @@ -342,6 +355,11 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u changes->StartEmergency(); const bool result = engine.ApplyChanges(db, changes, snap); + NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); + changes->WriteIndexOnExecute(nullptr, contextExecute); + NColumnShard::TBackgroundActivity triggered; + NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine); + changes->WriteIndexOnComplete(nullptr, contextComplete); changes->AbortEmergency(); return result; } @@ -355,6 +373,11 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, changes->StartEmergency(); const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,1)); + NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); + changes->WriteIndexOnExecute(nullptr, contextExecute); + NColumnShard::TBackgroundActivity triggered; + NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), triggered, engine); + changes->WriteIndexOnComplete(nullptr, contextComplete); changes->AbortEmergency(); return result; } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.cpp b/ydb/core/tx/columnshard/hooks/testing/controller.cpp index b557e4c430e6..216cb4485eab 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.cpp +++ b/ydb/core/tx/columnshard/hooks/testing/controller.cpp @@ -17,8 +17,8 @@ bool TController::DoOnAfterFilterAssembling(const std::shared_ptr g(Mutex); + Indexations.Inc(); if (SharingIds.empty()) { CheckInvariants(); } @@ -26,6 +26,7 @@ bool TController::DoOnWriteIndexComplete(const NOlap::TColumnEngineChanges& /*ch } bool TController::DoOnStartCompaction(std::shared_ptr& changes) { + TGuard g(Mutex); if (auto compaction = dynamic_pointer_cast(changes)) { Compactions.Inc(); } @@ -33,10 +34,10 @@ bool TController::DoOnStartCompaction(std::shared_ptr g(Mutex); for (auto d = action.GetBlobsToRemove().GetDirect().GetIterator(); d.IsValid(); ++d) { AFL_VERIFY(RemovedBlobIds[action.GetStorageId()][d.GetBlobId()].emplace(d.GetTabletId()).second); } -// TGuard g(Mutex); // if (SharingIds.empty()) { // CheckInvariants(); // } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 4f628419226f..87c9aa7c87c6 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -331,7 +331,7 @@ TTablesManager::TTablesManager(const std::shared_ptr& s { } -bool TTablesManager::TryFinalizeDropPath(NTabletFlatExecutor::TTransactionContext& txc, const ui64 pathId) { +bool TTablesManager::TryFinalizeDropPath(NTable::TDatabase& dbTable, const ui64 pathId) { auto itDrop = PathsToDrop.find(pathId); if (itDrop == PathsToDrop.end()) { return false; @@ -340,7 +340,7 @@ bool TTablesManager::TryFinalizeDropPath(NTabletFlatExecutor::TTransactionContex return false; } PathsToDrop.erase(itDrop); - NIceDb::TNiceDb db(txc.DB); + NIceDb::TNiceDb db(dbTable); NColumnShard::Schema::EraseTableInfo(db, pathId); const auto& table = Tables.find(pathId); Y_ABORT_UNLESS(table != Tables.end(), "No schema for path %lu", pathId); diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 28c84bc3209b..8349ccb9cd3b 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -140,7 +140,7 @@ class TTablesManager { public: TTablesManager(const std::shared_ptr& storagesManager, const ui64 tabletId); - bool TryFinalizeDropPath(NTabletFlatExecutor::TTransactionContext& txc, const ui64 pathId); + bool TryFinalizeDropPath(NTable::TDatabase& dbTable, const ui64 pathId); const TTtl& GetTtl() const { return Ttl;