Skip to content

Commit

Permalink
signals for tablet initialization (ydb-platform#7281)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Sep 13, 2024
1 parent 205ca3d commit 89c38fa
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 17 deletions.
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
BackgroundSessionsManager->Start();
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
CSCounters.Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
}

void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
StartInstant = TMonotonic::Now();
CSCounters.Initialization.OnActivateExecutor(TMonotonic::Now() - CreateInstant);
const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId());
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "OnActivateExecutor");
Executor()->RegisterExternalTabletCounters(TabletCountersPtr.release());
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;

class TTxInit : public TTransactionBase<TColumnShard> {
private:
const TMonotonic StartInstant = TMonotonic::Now();

public:
TTxInit(TColumnShard* self)
: TBase(self)
Expand Down Expand Up @@ -250,13 +253,16 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}

void TTxInit::Complete(const TActorContext& ctx) {
Self->CSCounters.Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant);
Self->ProgressTxController->OnTabletInit();
Self->SwitchToWork(ctx);
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);
}

class TTxUpdateSchema : public TTransactionBase<TColumnShard> {
std::vector<NOlap::INormalizerTask::TPtr> NormalizerTasks;
const TMonotonic StartInstant = TMonotonic::Now();

public:
TTxUpdateSchema(TColumnShard* self)
: TBase(self)
Expand Down Expand Up @@ -295,6 +301,7 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {

void TTxUpdateSchema::Complete(const TActorContext& ctx) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
Self->CSCounters.Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
if (NormalizerTasks.empty()) {
AFL_VERIFY(Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
Self->Execute(new TTxInit(Self), ctx);
Expand Down Expand Up @@ -360,6 +367,9 @@ void TTxApplyNormalizer::Complete(const TActorContext& ctx) {

/// Create local database on tablet start if none
class TTxInitSchema : public TTransactionBase<TColumnShard> {
private:
const TMonotonic StartInstant = TMonotonic::Now();

public:
TTxInitSchema(TColumnShard* self)
: TBase(self)
Expand Down Expand Up @@ -422,6 +432,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
}

void TTxInitSchema::Complete(const TActorContext& ctx) {
Self->CSCounters.Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
Self->Execute(new TTxUpdateSchema(Self), ctx);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ class TColumnShard
using TSchemaPreset = TSchemaPreset;
using TTableInfo = TTableInfo;

const TMonotonic CreateInstant = TMonotonic::Now();
std::optional<TMonotonic> StartInstant;

struct TLongTxWriteInfo {
ui64 WriteId;
ui32 WritePartId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace NKikimr::NColumnShard {

TCSCounters::TCSCounters()
: TBase("CS")
{
, Initialization(*this) {
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");
TooEarlyBackgroundCount = TBase::GetDeriviative("TooEarlyBackground/Count");
SetupCompactionCount = TBase::GetDeriviative("SetupCompaction/Count");
Expand Down
53 changes: 53 additions & 0 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,55 @@ enum class EWriteFailReason {
Overload /* "overload" */
};

class TCSInitialization: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;

const NMonitoring::THistogramPtr HistogramTabletInitializationMs;
const NMonitoring::THistogramPtr HistogramTxInitDurationMs;
const NMonitoring::THistogramPtr HistogramTxUpdateSchemaDurationMs;
const NMonitoring::THistogramPtr HistogramTxInitSchemaDurationMs;
const NMonitoring::THistogramPtr HistogramActivateExecutorFromActivationDurationMs;
const NMonitoring::THistogramPtr HistogramSwitchToWorkFromActivationDurationMs;
const NMonitoring::THistogramPtr HistogramSwitchToWorkFromCreateDurationMs;

public:

void OnTxInitFinished(const TDuration d) const {
HistogramTxInitDurationMs->Collect(d.MilliSeconds());
}

void OnTxUpdateSchemaFinished(const TDuration d) const {
HistogramTxUpdateSchemaDurationMs->Collect(d.MilliSeconds());
}

void OnTxInitSchemaFinished(const TDuration d) const {
HistogramTxInitSchemaDurationMs->Collect(d.MilliSeconds());
}

void OnActivateExecutor(const TDuration fromCreate) const {
HistogramActivateExecutorFromActivationDurationMs->Collect(fromCreate.MilliSeconds());
}
void OnSwitchToWork(const TDuration fromStart, const TDuration fromCreate) const {
HistogramSwitchToWorkFromActivationDurationMs->Collect(fromStart.MilliSeconds());
HistogramSwitchToWorkFromCreateDurationMs->Collect(fromCreate.MilliSeconds());
}

TCSInitialization(TCommonCountersOwner& owner)
: TBase(owner, "stage", "initialization")
, HistogramTabletInitializationMs(TBase::GetHistogram("TabletInitializationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
, HistogramTxInitDurationMs(TBase::GetHistogram("TxInitDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
, HistogramTxUpdateSchemaDurationMs(TBase::GetHistogram("TxInitDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
, HistogramTxInitSchemaDurationMs(TBase::GetHistogram("TxInitSchemaDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
, HistogramActivateExecutorFromActivationDurationMs(
TBase::GetHistogram("ActivateExecutorFromActivationDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
, HistogramSwitchToWorkFromActivationDurationMs(
TBase::GetHistogram("SwitchToWorkFromActivationDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32)))
, HistogramSwitchToWorkFromCreateDurationMs(
TBase::GetHistogram("SwitchToWorkFromCreateDurationMs", NMonitoring::ExponentialHistogram(15, 2, 32))) {
}
};

class TCSCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
Expand Down Expand Up @@ -62,11 +111,15 @@ class TCSCounters: public TCommonCountersOwner {
NMonitoring::THistogramPtr HistogramSuccessWriteMiddle6PutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs;

NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount;
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;

public:
const TCSInitialization Initialization;

void OnStartWriteRequest() const {
WriteRequests->Add(1);
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/engines/predicate/range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info) const {
return false;
}
}

// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", info.IndexKeyStart().DebugString())("end", info.IndexKeyEnd().DebugString())(
// "from", PredicateFrom.DebugString())("to", PredicateTo.DebugString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ TConclusion<bool> TDeletionFilter::DoExecuteInplace(const std::shared_ptr<IDataS

TConclusion<bool> TShardingFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
NYDBTest::TControllers::GetColumnShardController()->OnSelectShardingFilter();
auto filter = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo()->GetFilter(
source->GetStageData().GetTable()->BuildTableVerified());
const auto& shardingInfo = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo();
auto filter = shardingInfo->GetFilter(source->GetStageData().GetTable()->BuildTableVerified());
source->MutableStageData().AddFilter(filter);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,17 @@ TConclusionStatus TScanHead::DetectSourcesFeatureInContextIntervalScan(const THa
}
const ui64 startMemory = optimizer.GetMemorySum();
if (!optimizer.Optimize(Context->ReduceMemoryIntervalLimit) && Context->RejectMemoryIntervalLimit < optimizer.GetMemorySum()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "next_internal_broken")
("reason", "a lot of memory need")("start", startMemory)
("reduce_limit", Context->ReduceMemoryIntervalLimit)
("reject_limit", Context->RejectMemoryIntervalLimit)
("need", optimizer.GetMemorySum())
("path_ids", JoinSeq(",", optimizer.GetPathIds()))
("details", IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD_SCAN) ? optimizer.DebugString() : "NEED_DEBUG_LEVEL");
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "next_internal_broken")("reason", "a lot of memory need")("start", startMemory)(
"reduce_limit", Context->ReduceMemoryIntervalLimit)("reject_limit", Context->RejectMemoryIntervalLimit)(
"need", optimizer.GetMemorySum())("path_ids", JoinSeq(",", optimizer.GetPathIds()))(
"details", IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD_SCAN) ? optimizer.DebugString() : "NEED_DEBUG_LEVEL");
Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryFailed(optimizer.GetMemorySum());
return TConclusionStatus::Fail("We need a lot of memory in time for interval scanner: " +
::ToString(optimizer.GetMemorySum()) + " path_ids: " + JoinSeq(",", optimizer.GetPathIds()) + ". We need wait compaction processing. Sorry.");
} else if (optimizer.GetMemorySum() < startMemory) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "memory_reduce_active")
("reason", "need reduce memory")("start", startMemory)
("reduce_limit", Context->ReduceMemoryIntervalLimit)
("reject_limit", Context->RejectMemoryIntervalLimit)
("need", optimizer.GetMemorySum())
("path_ids", JoinSeq(",", optimizer.GetPathIds()));
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "memory_reduce_active")("reason", "need reduce memory")("start", startMemory)(
"reduce_limit", Context->ReduceMemoryIntervalLimit)("reject_limit", Context->RejectMemoryIntervalLimit)(
"need", optimizer.GetMemorySum())("path_ids", JoinSeq(",", optimizer.GetPathIds()));
Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryReduced(startMemory - optimizer.GetMemorySum());
}
Context->GetCommonContext()->GetCounters().OnOptimizedIntervalMemoryRequired(optimizer.GetMemorySum());
Expand Down

0 comments on commit 89c38fa

Please sign in to comment.