Skip to content

Commit

Permalink
Split ttl tasks by rw address of storages read/write (ydb-platform#2644)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Mar 12, 2024
1 parent 6173ab9 commit 7a4c215
Show file tree
Hide file tree
Showing 50 changed files with 1,255 additions and 482 deletions.
5 changes: 0 additions & 5 deletions ydb/core/tx/columnshard/background_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@

namespace NKikimr::NColumnShard {

/// Marks the TTL activity as started.
/// The Y_ABORT_UNLESS check requires that TTL is not already marked as started.
void TBackgroundController::StartTtl()
{
    Y_ABORT_UNLESS(!TtlStarted);
    TtlStarted = true;
}

bool TBackgroundController::StartCompaction(const NOlap::TPlanCompactionInfo& info) {
Y_ABORT_UNLESS(ActiveCompactionInfo.emplace(info.GetPathId(), info).second);
return true;
Expand Down
10 changes: 0 additions & 10 deletions ydb/core/tx/columnshard/background_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class TBackgroundController {
TCurrentCompaction ActiveCompactionInfo;

bool ActiveCleanup = false;
bool TtlStarted = false;
YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero());
public:
THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const;
Expand Down Expand Up @@ -90,15 +89,6 @@ class TBackgroundController {
/// Returns the current value of the ActiveCleanup flag.
bool IsCleanupActive() const
{
    return ActiveCleanup;
}

void StartTtl();
/// Marks the TTL activity as finished.
/// The Y_ABORT_UNLESS check requires that TTL is currently marked as started.
void FinishTtl()
{
    Y_ABORT_UNLESS(TtlStarted);
    TtlStarted = false;
}
/// Returns the current value of the TtlStarted flag.
bool IsTtlActive() const
{
    return TtlStarted;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Y_ABORT_UNLESS(Self->TablesManager.HasPrimaryIndex());
txc.DB.NoMoreReadsForTx();

ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", *changes);
ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", changes->DebugString());
if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
Y_ABORT_UNLESS(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot);
Y_ABORT_UNLESS(Ev->Get()->IndexInfo->GetLastSchema()->GetSnapshot() <= snapshot);

TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
Expand Down Expand Up @@ -51,7 +51,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID()));
CompleteReady = true;
auto changes = Ev->Get()->IndexChanges;
ACFL_DEBUG("event", "TTxWriteIndex::Complete")("change_type", changes->TypeString())("details", *changes);
ACFL_DEBUG("event", "TTxWriteIndex::Complete")("change_type", changes->TypeString())("details", changes->DebugString());

const ui64 blobsWritten = changes->GetBlobsAction().GetWritingBlobsCount();
const ui64 bytesWritten = changes->GetBlobsAction().GetWritingTotalSize();
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard__stats_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayB
NArrow::Append<arrow::StringType>(*builders[9], blobIdString);
NArrow::Append<arrow::UInt64Type>(*builders[10], r->BlobRange.Offset);
NArrow::Append<arrow::UInt64Type>(*builders[11], r->BlobRange.Size);
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.HasRemoveSnapshot() || ReadMetadata->GetRequestSnapshot() < portion.GetRemoveSnapshot());
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.IsRemovedFor(ReadMetadata->GetRequestSnapshot()));

const auto tierName = portionSchema->GetIndexInfo().GetEntityStorageId(r->GetColumnId(), portion.GetMeta().GetTierName());
std::string strTierName(tierName.data(), tierName.size());
Expand Down Expand Up @@ -114,7 +114,7 @@ void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayB
NArrow::Append<arrow::StringType>(*builders[9], blobIdString);
NArrow::Append<arrow::UInt64Type>(*builders[10], r->GetBlobRange().Offset);
NArrow::Append<arrow::UInt64Type>(*builders[11], r->GetBlobRange().Size);
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.HasRemoveSnapshot() || ReadMetadata->GetRequestSnapshot() < portion.GetRemoveSnapshot());
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.IsRemovedFor(ReadMetadata->GetRequestSnapshot()));
const auto tierName = portionSchema->GetIndexInfo().GetEntityStorageId(r->GetIndexId(), portion.GetMeta().GetTierName());
std::string strTierName(tierName.data(), tierName.size());
NArrow::Append<arrow::StringType>(*builders[13], strTierName);
Expand Down
56 changes: 27 additions & 29 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ class TChangesTask: public NConveyor::ITask {
virtual bool DoExecute() override {
NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId));
{
NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters);
NOlap::TConstructionContext context(*TxEvent->IndexInfo, Counters);
Y_ABORT_UNLESS(TxEvent->IndexChanges->ConstructBlobs(context).Ok());
if (!TxEvent->IndexChanges->GetWritePortionsCount()) {
TxEvent->SetPutStatus(NKikimrProto::OK);
Expand Down Expand Up @@ -601,7 +601,8 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
}
virtual bool DoOnError(const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)("status", status.GetErrorMessage())("status_code", status.GetStatus());
AFL_VERIFY(false)("blob_id", range)("status", status.GetStatus());
AFL_VERIFY(false)("blob_id", range)("status", status.GetStatus())("error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())("task_id", TxEvent->IndexChanges->GetTaskIdentifier())
("debug", TxEvent->IndexChanges->DebugString());
TxEvent->SetPutStatus(NKikimrProto::ERROR);
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
return false;
Expand Down Expand Up @@ -631,9 +632,9 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
Y_ABORT_UNLESS(indexChanges);

auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing);

const TString externalTaskId = indexChanges->GetTaskIdentifier();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)
Expand Down Expand Up @@ -712,8 +713,8 @@ void TColumnShard::SetupCompaction() {

indexChanges->Start(*this);

auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction);
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterCompaction);
const TString externalTaskId = indexChanges->GetTaskIdentifier();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);

Expand All @@ -731,37 +732,34 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
return false;
}
CSCounters.OnSetupTtl();
if (BackgroundController.IsTtlActive()) {
ACFL_DEBUG("background", "ttl")("skip_reason", "in_progress");
return false;
}
THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
for (auto&& i : eviction) {
ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString());
}

auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
const ui64 memoryUsageLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetTieringsMemoryLimit() : ((ui64)512 * 1024 * 1024);
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);
std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);

if (!indexChanges) {
if (indexChanges.empty()) {
ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
return false;
}
const TString externalTaskId = indexChanges->GetTaskIdentifier();
const bool needWrites = indexChanges->NeedConstruction();
ACFL_DEBUG("background", "ttl")("need_writes", needWrites);

indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(TabletID(), indexChanges->TypeString());
if (needWrites) {
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
} else {
ev->SetPutStatus(NKikimrProto::OK);
ActorContext().Send(SelfId(), std::move(ev));
for (auto&& i : indexChanges) {
const TString externalTaskId = i->GetTaskIdentifier();
const bool needWrites = i->NeedConstruction();
ACFL_DEBUG("background", "ttl")("need_writes", needWrites);
i->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, i, false);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(TabletID(), i->TypeString());
if (needWrites) {
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
} else {
ev->SetPutStatus(NKikimrProto::OK);
ActorContext().Send(SelfId(), std::move(ev));
}
}
return true;
}
Expand All @@ -782,8 +780,8 @@ void TColumnShard::SetupCleanup() {
}

ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString());
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false);
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write

changes->Start(*this);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,18 @@ struct TEvPrivate {

/// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction.
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
NOlap::TVersionedIndex IndexInfo;
std::shared_ptr<NOlap::TVersionedIndex> IndexInfo;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
bool GranuleCompaction{false};
TUsage ResourceUsage;
bool CacheData{false};
TDuration Duration;
TBlobPutResult::TPtr PutResult;

TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo,
TEvWriteIndex(const std::shared_ptr<NOlap::TVersionedIndex>& indexInfo,
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges,
bool cacheData)
: IndexInfo(std::move(indexInfo))
: IndexInfo(indexInfo)
, IndexChanges(indexChanges)
, CacheData(cacheData)
{
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/tx/columnshard/counters/engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,21 @@ TEngineLogsCounters::TEngineLogsCounters()

PortionToDropCount = TBase::GetDeriviative("Ttl/PortionToDrop/Count");
PortionToDropBytes = TBase::GetDeriviative("Ttl/PortionToDrop/Bytes");
PortionToDropLag = TBase::GetHistogram("Ttl/PortionToDrop/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
SkipDeleteWithProcessMemory = TBase::GetHistogram("Ttl/PortionToDrop/Skip/ProcessMemory/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
SkipDeleteWithTxLimit = TBase::GetHistogram("Ttl/PortionToDrop/Skip/TxLimit/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));

PortionToEvictCount = TBase::GetDeriviative("Ttl/PortionToEvict/Count");
PortionToEvictBytes = TBase::GetDeriviative("Ttl/PortionToEvict/Bytes");
PortionToEvictLag = TBase::GetHistogram("Ttl/PortionToEvict/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
SkipEvictionWithProcessMemory = TBase::GetHistogram("Ttl/PortionToEvict/Skip/ProcessMemory/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));
SkipEvictionWithTxLimit = TBase::GetHistogram("Ttl/PortionToEvict/Skip/TxLimit/Lag/Duration", NMonitoring::ExponentialHistogram(18, 2, 5));

ActualizationTaskSizeRemove = TBase::GetHistogram("Actualization/RemoveTasks/Size", NMonitoring::ExponentialHistogram(18, 2));
ActualizationTaskSizeEvict = TBase::GetHistogram("Actualization/EvictTasks/Size", NMonitoring::ExponentialHistogram(18, 2));

ActualizationSkipRWProgressCount = TBase::GetDeriviative("Actualization/Skip/RWProgress/Count");
ActualizationSkipTooFreshPortion = TBase::GetHistogram("Actualization//Skip/TooFresh/Duration", NMonitoring::LinearHistogram(12, 0, 360));

PortionNoTtlColumnCount = TBase::GetDeriviative("Ttl/PortionNoTtlColumn/Count");
PortionNoTtlColumnBytes = TBase::GetDeriviative("Ttl/PortionNoTtlColumn/Bytes");
Expand All @@ -52,6 +64,16 @@ TEngineLogsCounters::TEngineLogsCounters()
ChunkUsageForTTLCount = TBase::GetDeriviative("Ttl/ChunkUsageForTTLCount/Count");
}

/// Records the size of an actualization task in the matching histogram.
///
/// A task must be homogeneous and non-empty: exactly one of the two counts is non-zero.
/// @param evictCount  number of evictions in the task (collected into ActualizationTaskSizeEvict)
/// @param removeCount number of removals in the task (collected into ActualizationTaskSizeRemove)
void TEngineLogsCounters::OnActualizationTask(const ui32 evictCount, const ui32 removeCount) const {
    // Use logical checks instead of arithmetic: a ui32 product (e.g. 65536 * 65536) or sum
    // can wrap around to 0 and make the verification pass or fail for the wrong reason.
    AFL_VERIFY(!evictCount || !removeCount)("evict", evictCount)("remove", removeCount);
    AFL_VERIFY(evictCount || removeCount);
    if (evictCount) {
        ActualizationTaskSizeEvict->Collect(evictCount);
    } else {
        ActualizationTaskSizeRemove->Collect(removeCount);
    }
}

void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
Expand Down
45 changes: 43 additions & 2 deletions ydb/core/tx/columnshard/counters/engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,21 @@ class TEngineLogsCounters: public TCommonCountersOwner {
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr PortionToDropCount;
NMonitoring::TDynamicCounters::TCounterPtr PortionToDropBytes;
NMonitoring::THistogramPtr PortionToDropLag;
NMonitoring::THistogramPtr SkipDeleteWithProcessMemory;
NMonitoring::THistogramPtr SkipDeleteWithTxLimit;

NMonitoring::TDynamicCounters::TCounterPtr PortionToEvictCount;
NMonitoring::TDynamicCounters::TCounterPtr PortionToEvictBytes;
NMonitoring::THistogramPtr PortionToEvictLag;
NMonitoring::THistogramPtr SkipEvictionWithProcessMemory;
NMonitoring::THistogramPtr SkipEvictionWithTxLimit;

NMonitoring::THistogramPtr ActualizationTaskSizeRemove;
NMonitoring::THistogramPtr ActualizationTaskSizeEvict;

NMonitoring::TDynamicCounters::TCounterPtr ActualizationSkipRWProgressCount;
NMonitoring::THistogramPtr ActualizationSkipTooFreshPortion;

NMonitoring::TDynamicCounters::TCounterPtr PortionNoTtlColumnCount;
NMonitoring::TDynamicCounters::TCounterPtr PortionNoTtlColumnBytes;
Expand All @@ -253,6 +265,7 @@ class TEngineLogsCounters: public TCommonCountersOwner {
std::vector<std::shared_ptr<TIncrementalHistogram>> BlobSizeDistribution;
std::vector<std::shared_ptr<TIncrementalHistogram>> PortionSizeDistribution;
std::vector<std::shared_ptr<TIncrementalHistogram>> PortionRecordsDistribution;

public:

class TPortionsInfoGuard {
Expand Down Expand Up @@ -282,6 +295,8 @@ class TEngineLogsCounters: public TCommonCountersOwner {

};

void OnActualizationTask(const ui32 evictCount, const ui32 removeCount) const;

/// Builds a TPortionsInfoGuard from the blob-size, portion-size and portion-records
/// distribution histograms held by this counters object.
TPortionsInfoGuard BuildPortionBlobsGuard() const
{
    return TPortionsInfoGuard(
        BlobSizeDistribution,
        PortionSizeDistribution,
        PortionRecordsDistribution);
}
Expand All @@ -290,14 +305,40 @@ class TEngineLogsCounters: public TCommonCountersOwner {
return GranuleDataAgent.RegisterClient();
}

void OnPortionToEvict(const ui64 size) const {
/// Adds one to the ActualizationSkipRWProgressCount counter.
void OnActualizationSkipRWProgress() const
{
    ActualizationSkipRWProgressCount->Add(1);
}

/// Collects dWait.Seconds() into the ActualizationSkipTooFreshPortion histogram.
void OnActualizationSkipTooFreshPortion(const TDuration dWait) const
{
    const auto waitSeconds = dWait.Seconds();
    ActualizationSkipTooFreshPortion->Collect(waitSeconds);
}

/// Collects lag.Seconds() into the SkipDeleteWithProcessMemory histogram.
void OnSkipDeleteWithProcessMemory(const TDuration lag) const
{
    const auto lagSeconds = lag.Seconds();
    SkipDeleteWithProcessMemory->Collect(lagSeconds);
}

/// Collects lag.Seconds() into the SkipDeleteWithTxLimit histogram.
void OnSkipDeleteWithTxLimit(const TDuration lag) const
{
    const auto lagSeconds = lag.Seconds();
    SkipDeleteWithTxLimit->Collect(lagSeconds);
}

/// Collects lag.Seconds() into the SkipEvictionWithProcessMemory histogram.
void OnSkipEvictionWithProcessMemory(const TDuration lag) const
{
    const auto lagSeconds = lag.Seconds();
    SkipEvictionWithProcessMemory->Collect(lagSeconds);
}

/// Collects lag.Seconds() into the SkipEvictionWithTxLimit histogram.
void OnSkipEvictionWithTxLimit(const TDuration lag) const
{
    const auto lagSeconds = lag.Seconds();
    SkipEvictionWithTxLimit->Collect(lagSeconds);
}

/// Accounts one portion selected for eviction: adds 1 to PortionToEvictCount,
/// adds `size` to PortionToEvictBytes and collects lag.Seconds() into PortionToEvictLag.
void OnPortionToEvict(const ui64 size, const TDuration lag) const
{
    const auto lagSeconds = lag.Seconds();
    PortionToEvictCount->Add(1);
    PortionToEvictBytes->Add(size);
    PortionToEvictLag->Collect(lagSeconds);
}

void OnPortionToDrop(const ui64 size) const {
/// Accounts one portion selected for dropping: adds 1 to PortionToDropCount,
/// adds `size` to PortionToDropBytes and collects lag.Seconds() into PortionToDropLag.
void OnPortionToDrop(const ui64 size, const TDuration lag) const
{
    const auto lagSeconds = lag.Seconds();
    PortionToDropCount->Add(1);
    PortionToDropBytes->Add(size);
    PortionToDropLag->Collect(lagSeconds);
}

void OnPortionNoTtlColumn(const ui64 size) const {
Expand Down
Loading

0 comments on commit 7a4c215

Please sign in to comment.