Skip to content

Commit

Permalink
Merge c728dc7 into 0f64a08
Browse files Browse the repository at this point in the history
  • Loading branch information
alexd65536 authored Jan 29, 2024
2 parents 0f64a08 + c728dc7 commit 28695f6
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 94 deletions.
4 changes: 4 additions & 0 deletions ydb/core/protos/statistics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ message TEvPropagateStatistics {
}
repeated TStatsEntry Entries = 2;
}

// SA -> nodes
message TEvStatisticsIsDisabled {
}
53 changes: 48 additions & 5 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ void TStatisticsAggregator::DefaultSignalTabletActive(const TActorContext& ctx)
Y_UNUSED(ctx);
}

void TStatisticsAggregator::SubscribeForConfigChanges(const TActorContext& ctx) {
ui32 configKind = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
ctx.Send(NConsole::MakeConfigsDispatcherID(ctx.SelfID.NodeId()),
new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({configKind}));
}

void TStatisticsAggregator::HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
SA_LOG_I("[" << TabletID() << "] Subscribed for config changes");
}

void TStatisticsAggregator::HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
const auto& record = ev->Get()->Record;
const auto& config = record.GetConfig();
if (config.HasFeatureFlags()) {
const auto& featureFlags = config.GetFeatureFlags();
EnableStatistics = featureFlags.GetEnableStatistics();
}
auto response = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationResponse>(record);
Send(ev->Sender, response.release(), 0, ev->Cookie);
}

void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev) {
auto pipeServerId = ev->Get()->ServerId;

Expand Down Expand Up @@ -97,6 +118,12 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) {
RequestedSchemeShards.insert(ssEntry.GetSchemeShardId());
}

if (!EnableStatistics) {
auto disabled = std::make_unique<TEvStatistics::TEvStatisticsIsDisabled>();
Send(NStat::MakeStatServiceID(nodeId), disabled.release());
return;
}

if (!IsPropagateInFlight) {
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
IsPropagateInFlight = true;
Expand Down Expand Up @@ -124,6 +151,12 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) {
<< ", node id = " << nodeId
<< ", schemeshard count = " << record.NeedSchemeShardsSize());

if (!EnableStatistics) {
auto disabled = std::make_unique<TEvStatistics::TEvStatisticsIsDisabled>();
Send(NStat::MakeStatServiceID(nodeId), disabled.release());
return;
}

std::vector<TSSId> ssIds;
ssIds.reserve(record.NeedSchemeShardsSize());
for (const auto& ssId : record.GetNeedSchemeShards()) {
Expand Down Expand Up @@ -151,6 +184,10 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectSchemeShard::TPtr& e
void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) {
SA_LOG_D("[" << TabletID() << "] EvFastPropagateCheck");

if (!EnableStatistics) {
return;
}

PropagateFastStatistics();

FastCheckInFlight = false;
Expand All @@ -162,7 +199,9 @@ void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) {
void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) {
SA_LOG_D("[" << TabletID() << "] EvPropagate");

PropagateStatistics();
if (EnableStatistics) {
PropagateStatistics();
}

Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
}
Expand All @@ -176,10 +215,10 @@ void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TS
for (const auto& ssId : ssIds) {
FastSchemeShards.insert(ssId);
}
if (!FastCheckInFlight) {
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
FastCheckInFlight = true;
}
}
if (!FastCheckInFlight) {
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
FastCheckInFlight = true;
}
}

Expand Down Expand Up @@ -247,6 +286,10 @@ void TStatisticsAggregator::PropagateFastStatistics() {
void TStatisticsAggregator::PropagateStatisticsImpl(
const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds)
{
if (nodeIds.empty() || ssIds.empty()) {
return;
}

TNodeId leadingNodeId = nodeIds[0];

for (size_t index = 0; index < ssIds.size(); ) {
Expand Down
15 changes: 13 additions & 2 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <ydb/core/statistics/common.h>
#include <ydb/core/statistics/events.h>

#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/cms/console/console.h>

#include <ydb/core/tablet_flat/tablet_flat_executed.h>

#include <random>
Expand Down Expand Up @@ -53,10 +56,14 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void OnActivateExecutor(const TActorContext& ctx) override;
void DefaultSignalTabletActive(const TActorContext& ctx) override;
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) override;
void SubscribeForConfigChanges(const TActorContext& ctx);

NTabletFlatExecutor::ITransaction* CreateTxInitSchema();
NTabletFlatExecutor::ITransaction* CreateTxInit();

void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr& ev);
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev);

void Handle(TEvStatistics::TEvConfigureAggregator::TPtr& ev);
void Handle(TEvStatistics::TEvSchemeShardStats::TPtr& ev);
void Handle(TEvPrivate::TEvPropagate::TPtr& ev);
Expand All @@ -76,11 +83,13 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);

STFUNC(StateInit) {
StateInitImpl(ev,SelfId());
StateInitImpl(ev, SelfId());
}

STFUNC(StateWork) {
switch(ev->GetTypeRewrite()) {
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleConfig)
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleConfig)
hFunc(TEvStatistics::TEvConfigureAggregator, Handle);
hFunc(TEvStatistics::TEvSchemeShardStats, Handle);
hFunc(TEvPrivate::TEvPropagate, Handle);
Expand All @@ -103,10 +112,12 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

std::mt19937_64 RandomGenerator;

bool EnableStatistics = false;

static constexpr size_t StatsOptimizeFirstNodesCount = 3; // optimize first nodes - fast propagation
static constexpr size_t StatsSizeLimitBytes = 2 << 20; // limit for stats size in one message

TDuration PropagateInterval = TDuration::Minutes(3);
TDuration PropagateInterval;
bool IsPropagateInFlight = false; // is slow propagation started

std::unordered_map<TSSId, TString> BaseStats; // schemeshard id -> serialized stats for all paths
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "aggregator_impl.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>

namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxInit : public TTxBase {
Expand Down Expand Up @@ -82,6 +85,10 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Complete");

Self->SignalTabletActive(ctx);

Self->EnableStatistics = AppData(ctx)->FeatureFlags.GetEnableStatistics();
Self->SubscribeForConfigChanges(ctx);

Self->Become(&TThis::StateWork);
}
};
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/statistics/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct TEvStatistics {
EvConnectNode,
EvRequestStats,
EvPropagateStatistics,
EvStatisticsIsDisabled,

EvEnd
};
Expand Down Expand Up @@ -108,6 +109,12 @@ struct TEvStatistics {
NKikimrStat::TEvPropagateStatistics,
EvPropagateStatistics>
{};

struct TEvStatisticsIsDisabled : public TEventPB<
TEvStatisticsIsDisabled,
NKikimrStat::TEvStatisticsIsDisabled,
EvStatisticsIsDisabled>
{};
};

} // NStat
Expand Down
76 changes: 46 additions & 30 deletions ydb/core/statistics/stat_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
hFunc(TEvStatistics::TEvPropagateStatistics, Handle);
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvStatistics::TEvStatisticsIsDisabled, Handle);
cFunc(TEvents::TEvPoison::EventType, PassAway);
default:
LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
Expand All @@ -51,20 +52,21 @@ class TStatService : public TActorBootstrapped<TStatService> {
}

private:
bool IsSAUnavailable() {
return ResolveSAStage == RSA_FINISHED && StatisticsAggregatorId == 0;
}

void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
"Subscribed for config changes");
"Subscribed for config changes on node " << SelfId().NodeId());
}

void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
const auto& record = ev->Get()->Record;
const auto& featureFlags = record.GetConfig().GetFeatureFlags();
EnableStatistics = featureFlags.GetEnableStatistics();

const auto& config = record.GetConfig();
if (config.HasFeatureFlags()) {
const auto& featureFlags = config.GetFeatureFlags();
EnableStatistics = featureFlags.GetEnableStatistics();
if (!EnableStatistics) {
ReplyAllFailed();
}
}
auto response = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationResponse>(record);
Send(ev->Sender, response.release(), 0, ev->Cookie);
}
Expand All @@ -77,7 +79,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
request.EvCookie = ev->Cookie;
request.StatRequests.swap(ev->Get()->StatRequests);

if (!EnableStatistics || IsSAUnavailable()) {
if (!EnableStatistics) {
ReplyFailed(requestId, true);
return;
}
Expand Down Expand Up @@ -106,12 +108,12 @@ class TStatService : public TActorBootstrapped<TStatService> {
auto& entry = navigate->ResultSet.back();
if (entry.Status != TNavigate::EStatus::Ok) {
StatisticsAggregatorId = 0;
} else {
} else if (entry.DomainInfo->Params.HasStatisticsAggregator()) {
StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator();
}
ResolveSAStage = RSA_FINISHED;
ResolveSAStage = StatisticsAggregatorId ? RSA_FINISHED : RSA_INITIAL;

if (StatisticsAggregatorId != 0) {
if (StatisticsAggregatorId) {
ConnectToSA();
SyncNode();
} else {
Expand All @@ -127,15 +129,15 @@ class TStatService : public TActorBootstrapped<TStatService> {
}
auto& request = itRequest->second;

if (!EnableStatistics || IsSAUnavailable()) {
if (!EnableStatistics) {
ReplyFailed(requestId, true);
return;
}

std::unordered_set<ui64> ssIds;
bool isServerless = false;
ui64 aggregatorId = 0;
TPathId resourcesDomainKey;
TPathId domainKey, resourcesDomainKey;
for (const auto& entry : navigate->ResultSet) {
if (entry.Status != TNavigate::EStatus::Ok) {
continue;
Expand All @@ -144,6 +146,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
ssIds.insert(domainInfo->ExtractSchemeShard());
aggregatorId = domainInfo->Params.GetStatisticsAggregator();
isServerless = domainInfo->IsServerless();
domainKey = domainInfo->DomainKey;
resourcesDomainKey = domainInfo->ResourcesDomainKey;
}
if (ssIds.size() != 1) {
Expand All @@ -157,22 +160,31 @@ class TStatService : public TActorBootstrapped<TStatService> {
return;
}

auto navigateDomainKey = [this] (TPathId domainKey) {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
auto navigate = std::make_unique<TNavigate>();
auto& entry = navigate->ResultSet.emplace_back();
entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId);
entry.Operation = TNavigate::EOp::OpPath;
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
entry.RedirectRequired = false;
navigate->Cookie = ResolveSACookie;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
ResolveSAStage = RSA_IN_FLIGHT;
};

switch (ResolveSAStage) {
case RSA_NOT_RUN:
case RSA_INITIAL:
if (!isServerless) {
StatisticsAggregatorId = aggregatorId;
ResolveSAStage = RSA_FINISHED;
if (aggregatorId) {
StatisticsAggregatorId = aggregatorId;
ResolveSAStage = RSA_FINISHED;
} else {
navigateDomainKey(domainKey);
return;
}
} else {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
auto navigate = std::make_unique<TNavigate>();
auto& entry = navigate->ResultSet.emplace_back();
entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId);
entry.Operation = TNavigate::EOp::OpPath;
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
entry.RedirectRequired = false;
navigate->Cookie = ResolveSACookie;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
ResolveSAStage = RSA_IN_FLIGHT;
navigateDomainKey(resourcesDomainKey);
return;
}
break;
Expand All @@ -182,7 +194,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
break;
}

if (IsSAUnavailable()) {
if (!StatisticsAggregatorId) {
ReplyFailed(requestId, true);
return;
}
Expand Down Expand Up @@ -303,6 +315,10 @@ class TStatService : public TActorBootstrapped<TStatService> {
SyncNode();
}

void Handle(TEvStatistics::TEvStatisticsIsDisabled::TPtr&) {
ReplyAllFailed();
}

void ConnectToSA() {
if (SAPipeClientId || !StatisticsAggregatorId) {
return;
Expand Down Expand Up @@ -465,11 +481,11 @@ class TStatService : public TActorBootstrapped<TStatService> {

static const ui64 ResolveSACookie = std::numeric_limits<ui64>::max();
enum EResolveSAStage {
RSA_NOT_RUN,
RSA_INITIAL,
RSA_IN_FLIGHT,
RSA_FINISHED
};
EResolveSAStage ResolveSAStage = RSA_NOT_RUN;
EResolveSAStage ResolveSAStage = RSA_INITIAL;
};

THolder<IActor> CreateStatService() {
Expand Down
Loading

0 comments on commit 28695f6

Please sign in to comment.