diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index 81ff6304f44e..928570b22d22 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -179,7 +179,7 @@ class TColumnPortionsAccumulator { return; } Result->AddWaitingTask(); - std::shared_ptr request = std::make_shared(); + std::shared_ptr request = std::make_shared("STATISTICS_FLUSH"); for (auto&& i : Portions) { request->AddPortion(i); } @@ -227,7 +227,7 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, columnTagsRequested = std::set(allColumnIds.begin(), allColumnIds.end()); } - NOlap::TDataAccessorsRequest request; + NOlap::TDataAccessorsRequest request("STATISTICS"); std::shared_ptr resultAccumulator = std::make_shared(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response)); auto versionedIndex = std::make_shared(index.GetVersionedIndex()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index b893094b86d2..9ab856a69516 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1405,12 +1405,15 @@ class TTxAskPortionChunks: public TTransactionBase { std::shared_ptr FetchCallback; THashMap> PortionsByPath; std::vector FetchedAccessors; + const TString Consumer; public: TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr& fetchCallback, - THashMap&& portions) + THashMap&& portions, const TString& consumer) : TBase(self) - , FetchCallback(fetchCallback) { + , FetchCallback(fetchCallback) + , Consumer(consumer) + { for (auto&& i : portions) { PortionsByPath[i.second->GetPathId()].emplace_back(i.second); } @@ -1421,8 +1424,9 @@ class TTxAskPortionChunks: public TTransactionBase { TBlobGroupSelector selector(Self->Info()); bool reask = false; + NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("consumer", Consumer)("event", "TTxAskPortionChunks::Execute"); for (auto&& i : PortionsByPath) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("size", i.second.size())("path_id", i.first); for (auto&& p : i.second) { if (!p->GetSchema(Self->GetIndexAs().GetVersionedIndex())->GetIndexesCount()) { continue; @@ -1440,7 +1444,8 @@ class TTxAskPortionChunks: public TTransactionBase { } for (auto&& i : PortionsByPath) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "processing")("size", i.second.size())( + "path_id", i.first); while (i.second.size()) { auto p = i.second.back(); TPortionConstructorV2 constructor(p); @@ -1470,11 +1475,11 @@ class TTxAskPortionChunks: public TTransactionBase { FetchedAccessors.emplace_back(std::move(constructor)); i.second.pop_back(); } - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())( + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished")("size", i.second.size())( "path_id", i.first); } - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished"); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished"); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared(FetchCallback, std::move(FetchedAccessors))); return true; } @@ -1486,7 +1491,7 @@ class TTxAskPortionChunks: public TTransactionBase { }; void TColumnShard::Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& /*ctx*/) { - Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions()))); + Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions()), ev->Get()->GetConsumer())); } void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp index 3c1b4d579fa7..97400ff1865e 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp @@ -6,9 +6,9 @@ namespace NKikimr::NOlap::NDataAccessorControl { THashMap IGranuleDataAccessor::AskData( - const std::vector& portions, const std::shared_ptr& callback) { + const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { AFL_VERIFY(portions.size()); - return DoAskData(portions, callback); + return DoAskData(portions, callback, consumer); } void TActorAccessorsCallback::OnAccessorsFetched(std::vector&& accessors) { diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h index 650778fa25e3..f34f867e7a0d 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h @@ -25,7 +25,7 @@ class IGranuleDataAccessor { const ui64 PathId; virtual THashMap DoAskData( - const std::vector& portions, const std::shared_ptr& callback) = 0; + const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) = 0; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) = 0; public: @@ -39,7 +39,8 @@ class IGranuleDataAccessor { : PathId(pathId) { } - THashMap AskData(const std::vector& portions, const std::shared_ptr& callback); + THashMap AskData( + const std::vector& portions, const std::shared_ptr& callback, const TString& consumer); void ModifyPortions(const std::vector& add, const std::vector& remove) { return DoModifyPortions(add, remove); } diff --git a/ydb/core/tx/columnshard/data_accessor/events.h b/ydb/core/tx/columnshard/data_accessor/events.h index 0d841ab42ddc..5f9c48ee9332 100644 --- a/ydb/core/tx/columnshard/data_accessor/events.h +++ b/ydb/core/tx/columnshard/data_accessor/events.h @@ -80,12 +80,14 @@ class TEvAskTabletDataAccessors: public NActors::TEventLocal; YDB_ACCESSOR_DEF(TPortions, Portions); YDB_READONLY_DEF(std::shared_ptr, Callback); + YDB_READONLY_DEF(TString, Consumer); public: - explicit TEvAskTabletDataAccessors( - const THashMap& portions, const std::shared_ptr& callback) + explicit TEvAskTabletDataAccessors(const THashMap& portions, + const std::shared_ptr& callback, const TString& consumer) : Portions(portions) - , Callback(callback) { + , Callback(callback) + , Consumer(consumer) { } }; diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp index 69f0ce4b6aa9..42a5558dc17a 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp @@ -3,7 +3,7 @@ namespace NKikimr::NOlap::NDataAccessorControl::NInMem { THashMap TCollector::DoAskData( - const std::vector& portions, const std::shared_ptr& /*callback*/) { + const std::vector& portions, const std::shared_ptr& /*callback*/, const TString& /*consumer*/) { THashMap accessors; for (auto&& i : portions) { auto it = Accessors.find(i->GetPortionId()); diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h index 8cdf6bfa9efd..ead6b25ac23e 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h @@ -7,7 +7,7 @@ class TCollector: public IGranuleDataAccessor { using TBase = IGranuleDataAccessor; THashMap Accessors; virtual THashMap DoAskData( - const std::vector& portions, const std::shared_ptr& callback) override; + const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) override; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp index 31917ea3c628..08be63308d6a 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB { THashMap TCollector::DoAskData( - const std::vector& portions, const std::shared_ptr& callback) { + const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { THashMap accessors; THashMap portionsToDirectAsk; for (auto&& p : portions) { @@ -17,7 +17,7 @@ THashMap TCollector::DoAskData( } if (portionsToDirectAsk.size()) { NActors::TActivationContext::Send( - TabletActorId, std::make_unique(portionsToDirectAsk, callback)); + TabletActorId, std::make_unique(portionsToDirectAsk, callback, consumer)); } return accessors; } diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h index 40517f9e1dc8..d52ca722ff49 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h @@ -17,8 +17,8 @@ class TCollector: public IGranuleDataAccessor { TLRUCache AccessorsCache; using TBase = IGranuleDataAccessor; - virtual THashMap DoAskData( - const std::vector& portions, const std::shared_ptr& callback) override; + virtual THashMap DoAskData(const std::vector& portions, + const std::shared_ptr& callback, const TString& consumer) override; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; public: diff --git a/ydb/core/tx/columnshard/data_accessor/manager.h b/ydb/core/tx/columnshard/data_accessor/manager.h index b538406925c1..83e9155cea03 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/manager.h @@ -115,7 +115,7 @@ class TLocalManager: public IDataAccessorsManager { if (portionsAsk.empty()) { continue; } - auto accessors = it->second->AskData(portionsAsk, AccessorCallback); + auto accessors = it->second->AskData(portionsAsk, AccessorCallback, request->GetConsumer()); for (auto&& p : portionsAsk) { auto itAccessor = accessors.find(p->GetPortionId()); if (itAccessor == accessors.end()) { diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index cf7012bbfc88..31ab85cd6578 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -169,6 +169,7 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter PortionIds; THashMap PathIdStatus; THashSet PathIds; @@ -208,7 +209,11 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCountersecond.OnError(errorMessage); diff --git a/ydb/core/tx/columnshard/data_locks/locks/composite.h b/ydb/core/tx/columnshard/data_locks/locks/composite.h index 548ba22f73f8..819239bb5d38 100644 --- a/ydb/core/tx/columnshard/data_locks/locks/composite.h +++ b/ydb/core/tx/columnshard/data_locks/locks/composite.h @@ -35,6 +35,20 @@ class TCompositeLock: public ILock { return Locks.empty(); } public: + static std::shared_ptr Build(const TString& lockName, const std::initializer_list>& locks) { + std::vector> locksUseful; + for (auto&& i : locks) { + if (i && !i->IsEmpty()) { + locksUseful.emplace_back(i); + } + } + if (locksUseful.size() == 1) { + return locksUseful.front(); + } else { + return std::make_shared(lockName, locksUseful); + } + } + TCompositeLock(const TString& lockName, const std::vector>& locks, const ELockCategory category = NDataLocks::ELockCategory::Any, const bool readOnly = false) : TBase(lockName, category, readOnly) diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 812d05caa4ae..c8712f2cd983 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -242,7 +242,7 @@ class TColumnEngineChanges { return DoBuildDataLock(); } - std::shared_ptr PortionsToAccess = std::make_shared(); + std::shared_ptr PortionsToAccess = std::make_shared(TaskIdentifier); virtual void OnDataAccessorsInitialized(const TDataAccessorsInitializationContext& context) = 0; public: diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp index 6f5edbe87ef3..f094c0af7f06 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp @@ -22,6 +22,19 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC if (!self) { return; } + THashSet usedPortionIds; + auto schemaPtr = context.EngineLogs.GetVersionedIndex().GetLastSchema(); + for (auto&& i : PortionsToRemove) { + Y_ABORT_UNLESS(!i->HasRemoveSnapshot()); + AFL_VERIFY(usedPortionIds.emplace(i->GetPortionId()).second)("portion_info", i->DebugString(true)); + const auto pred = [&](TPortionInfo& portionCopy) { + portionCopy.SetRemoveSnapshot(context.Snapshot); + }; + context.EngineLogs.GetGranuleVerified(i->GetPathId()) + .ModifyPortionOnExecute( + context.DBWrapper, GetPortionDataAccessor(i->GetPortionId()), pred, schemaPtr->GetIndexInfo().GetPKFirstColumnId()); + } + THashMap> blobIdsByStorage; for (auto&& [_, p] : FetchedDataAccessors->GetPortions()) { p.RemoveFromDatabase(context.DBWrapper); @@ -37,12 +50,32 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC } void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { + { + auto g = context.EngineLogs.GranulesStorage->GetStats()->StartPackModification(); + for (auto&& i : PortionsToRemove) { + Y_ABORT_UNLESS(!i->HasRemoveSnapshot()); + const auto pred = [&](const std::shared_ptr& portion) { + portion->SetRemoveSnapshot(context.Snapshot); + }; + context.EngineLogs.ModifyPortionOnComplete(i, pred); + context.EngineLogs.AddCleanupPortion(i); + } + } for (auto& portionInfo : PortionsToDrop) { if (!context.EngineLogs.ErasePortion(*portionInfo)) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo->DebugString()); } } if (self) { + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); + for (auto& portionInfo : PortionsToRemove) { + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, portionInfo->GetBlobIdsCount()); + for (auto& blobId : portionInfo->GetBlobIds()) { + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize()); + } + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo->GetTotalRawBytes()); + } + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size()); for (auto&& p : PortionsToDrop) { self->Counters.GetTabletCounters()->OnDropPortionEvent(p->GetTotalRawBytes(), p->GetTotalBlobBytes(), p->GetRecordsCount()); diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h index 9e36a64c4407..4133c979cde4 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h @@ -9,6 +9,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges, using TBase = TColumnEngineChanges; THashMap>> StoragePortions; std::vector PortionsToDrop; + std::vector PortionsToRemove; THashSet TablesToDrop; protected: @@ -37,16 +38,13 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges, return NDataLocks::ELockCategory::Cleanup; } virtual std::shared_ptr DoBuildDataLock() const override { - auto portionsLock = std::make_shared( - TypeString() + "::PORTIONS::" + GetTaskIdentifier(), PortionsToDrop, NDataLocks::ELockCategory::Cleanup); - if (TablesToDrop.size()) { - auto tablesLock = std::make_shared( - TypeString() + "::TABLES::" + GetTaskIdentifier(), TablesToDrop, NDataLocks::ELockCategory::Tables); - return std::shared_ptr( - new NDataLocks::TCompositeLock(TypeString() + "::COMPOSITE::" + GetTaskIdentifier(), { portionsLock, tablesLock })); - } else { - return portionsLock; - } + auto portionsDropLock = std::make_shared( + TypeString() + "::PORTIONS_DROP::" + GetTaskIdentifier(), PortionsToDrop, NDataLocks::ELockCategory::Cleanup); + auto portionsRemoveLock = std::make_shared( + TypeString() + "::PORTIONS_REMOVE::" + GetTaskIdentifier(), PortionsToRemove, NDataLocks::ELockCategory::Compaction); + auto tablesLock = std::make_shared( + TypeString() + "::TABLES::" + GetTaskIdentifier(), TablesToDrop, NDataLocks::ELockCategory::Tables); + return NDataLocks::TCompositeLock::Build(TypeString() + "::COMPOSITE::" + GetTaskIdentifier(), {portionsDropLock, portionsRemoveLock, tablesLock}); } public: @@ -68,6 +66,10 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges, PortionsToAccess->AddPortion(portion); } + void AddPortionToRemove(const TPortionInfo::TConstPtr& portion) { + PortionsToRemove.emplace_back(portion); + } + virtual ui32 GetWritePortionsCount() const override { return 0; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 7ebc69d5904c..d25abc4941fd 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -355,7 +355,7 @@ std::shared_ptr TColumnEngineForLogs::Start limitExceeded = true; break; } - changes->AddPortionToDrop(info); + changes->AddPortionToRemove(info); ++portionsFromDrop; } changes->AddTableToDrop(pathId); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index c586ea83ff69..03235267db8e 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -237,7 +237,7 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptrHasPortionAccessor()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString()); - std::shared_ptr request = std::make_shared(); + std::shared_ptr request = std::make_shared("PLAIN::" + step.GetName()); request->AddPortion(Portion); request->RegisterSubscriber(std::make_shared(step, sourcePtr)); GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData(request); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index cecf792636c8..9ce0b5bd0ae0 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -229,7 +229,7 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptrHasPortionAccessor()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString()); - std::shared_ptr request = std::make_shared(); + std::shared_ptr request = std::make_shared("SIMPLE::" + step.GetName()); request->AddPortion(Portion); request->SetColumnIds(GetContext()->GetAllUsageColumns()->GetColumnIds()); request->RegisterSubscriber(std::make_shared(step, sourcePtr)); diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp index 5634f37fcbfa..ff8d33e4d345 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp @@ -177,7 +177,7 @@ TConclusionStatus TStatsIterator::Start() { for (auto&& i : IndexGranules) { GroupGuards.emplace_back(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(ReadMetadata->GetTxId(), 1)); for (auto&& p : i.GetPortions()) { - std::shared_ptr request = std::make_shared(); + std::shared_ptr request = std::make_shared("SYS_VIEW::CHUNKS"); request->AddPortion(p); auto allocation = std::make_shared(request, p->PredictMetadataMemorySize(columnsCount), Context); request->RegisterSubscriber(allocation); diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp index 1c41b8a34c58..ab0e0990f9f8 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp @@ -269,7 +269,7 @@ std::vector TTieringActualizer::BuildMetadataRequests( std::shared_ptr currentRequest; for (auto&& i : NewPortionIds) { if (!currentRequest) { - currentRequest = std::make_shared(); + currentRequest = std::make_shared("TIERING_ACTUALIZER"); } auto it = portions.find(i); AFL_VERIFY(it != portions.end()); diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h index 6f98d778f662..e5ead2a7ab47 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h @@ -217,11 +217,11 @@ class TGranuleMeta: TNonCopyable { NTabletFlatExecutor::TTransactionContext& txc, const TInsertWriteId insertWriteId, const TSnapshot& snapshot) const; void CommitPortionOnComplete(const TInsertWriteId insertWriteId, IColumnEngine& engine); - void AbortPortionOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TInsertWriteId insertWriteId) const { + void AbortPortionOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TInsertWriteId insertWriteId, const TSnapshot ssRemove) const { auto it = InsertedPortions.find(insertWriteId); AFL_VERIFY(it != InsertedPortions.end()); - it->second->SetCommitSnapshot(TSnapshot(1, 1)); - it->second->SetRemoveSnapshot(TSnapshot(1, 2)); + it->second->SetCommitSnapshot(ssRemove); + it->second->SetRemoveSnapshot(ssRemove); TDbWrapper wrapper(txc.DB, nullptr); it->second->SaveMetaToDatabase(wrapper); } @@ -301,7 +301,7 @@ class TGranuleMeta: TNonCopyable { OnAfterChangePortion(i.second, &g, true); } if (MetadataMemoryManager->NeedPrefetch() && Portions.size()) { - auto request = std::make_shared(); + auto request = std::make_shared("PREFETCH_GRANULE::" + ::ToString(PathId)); for (auto&& p : Portions) { request->AddPortion(p.second); } diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index df6ba684af3e..203def09fb6b 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -138,7 +138,7 @@ void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::T owner.InsertTable->Abort(dbTable, writeIds); } else { for (auto&& i : InsertWriteIds) { - owner.MutableIndexAs().MutableGranuleVerified(PathId).AbortPortionOnExecute(txc, i); + owner.MutableIndexAs().MutableGranuleVerified(PathId).AbortPortionOnExecute(txc, i, owner.GetCurrentSnapshotForInternalModification()); } } }