
YQ-3345 support load cpu threshold #6790

183 changes: 58 additions & 125 deletions ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp
@@ -1,6 +1,8 @@
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/control_plane_storage/util.h>

#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>

#include <ydb/library/services/services.pb.h>

#include <ydb/library/security/ydb_credentials_provider_factory.h>
@@ -24,17 +26,9 @@ namespace NFq {
class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
struct TCounters {
::NMonitoring::TDynamicCounterPtr Counters;
struct TCommonMetrics {
::NMonitoring::TDynamicCounters::TCounterPtr Ok;
::NMonitoring::TDynamicCounters::TCounterPtr Error;
::NMonitoring::THistogramPtr LatencyMs;
};

TCommonMetrics CpuLoadRequest;
::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr AvailableLoadPercentage;
::NMonitoring::TDynamicCounterPtr SubComponent;

::NMonitoring::THistogramPtr CpuLoadRequestLatencyMs;
::NMonitoring::TDynamicCounters::TCounterPtr TargetLoadPercentage;
::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
@@ -48,21 +42,11 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
private:
void Register() {
::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring");
auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
RegisterCommonMetrics(CpuLoadRequest, subComponent);
InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false);
AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false);
QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false);
AvailableLoadPercentage = subComponent->GetCounter("AvailableLoadPercentage", false);
TargetLoadPercentage = subComponent->GetCounter("TargetLoadPercentage", false);
PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false);
PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true);
}

void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
metrics.Ok = subComponent->GetCounter("Ok", true);
metrics.Error = subComponent->GetCounter("Error", true);
metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
SubComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
CpuLoadRequestLatencyMs = SubComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
TargetLoadPercentage = SubComponent->GetCounter("TargetLoadPercentage", false);
PendingQueueSize = SubComponent->GetCounter("PendingQueueSize", false);
PendingQueueOverload = SubComponent->GetCounter("PendingQueueOverload", true);
}

static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
@@ -75,15 +59,19 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
TComputeDatabaseMonitoringActor(const TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters)
: MonitoringClientActorId(monitoringClientActorId)
, Counters(counters)
, MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)))
, AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)))
, MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100) / 100.0)
, DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1)
, PendingQueueSize(config.GetPendingQueueSize())
, Strict(config.GetStrict())
, CpuNumber(config.GetCpuNumber())
, CpuQuotaManager(
GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)),
std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)),
TDuration::Zero(),
config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1,
config.GetStrict(),
config.GetCpuNumber(),
Counters.SubComponent
)
{
*Counters.AvailableLoadPercentage = 100;
*Counters.TargetLoadPercentage = static_cast<ui64>(MaxClusterLoad * 100);
}
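
Note: the constructor folds the old per-field config parsing into TCpuQuotaManager's arguments. A minimal standalone sketch of the same clamping rules (the struct and helper names here are hypothetical, not part of the PR):

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical flat stand-in for NFq::NConfig::TLoadControlConfig.
struct TLoadControlConfigView {
    uint64_t MonitoringRequestDelayMs = 0;   // 0 means "not set"
    uint64_t AverageLoadIntervalMs = 0;      // 0 means "not set"
    uint32_t MaxClusterLoadPercentage = 0;
    uint32_t DefaultQueryLoadPercentage = 0; // 0 means "not set"
};

struct TDerivedSettings {
    uint64_t MonitoringRequestDelayMs;       // default 1 s
    uint64_t AverageLoadIntervalMs;          // default 10 s, floor 1 s
    double MaxClusterLoad;                   // fraction in [0, 1]
    double DefaultQueryLoad;                 // fraction, default 0.1
};

// Mirrors the clamping in the constructor above: unset durations fall back
// to 1 s / 10 s, percentages are capped at 100 and turned into fractions,
// and an unset default query load becomes 10 %.
TDerivedSettings Derive(const TLoadControlConfigView& cfg) {
    return {
        cfg.MonitoringRequestDelayMs ? cfg.MonitoringRequestDelayMs : 1000,
        std::max<uint64_t>(cfg.AverageLoadIntervalMs ? cfg.AverageLoadIntervalMs : 10000, 1000),
        std::min<uint32_t>(cfg.MaxClusterLoadPercentage, 100) / 100.0,
        cfg.DefaultQueryLoadPercentage
            ? std::min<uint32_t>(cfg.DefaultQueryLoadPercentage, 100) / 100.0
            : 0.1,
    };
}
```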

@@ -105,54 +93,29 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
)

void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(InstantLoad, AverageLoad);
if (!Ready) {
auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(CpuQuotaManager.GetInstantLoad(), CpuQuotaManager.GetAverageLoad());
if (!CpuQuotaManager.CheckLoadIsOutdated()) {
response->Issues.AddIssue("CPU Load is unavailable");
}
Send(ev->Sender, response.release(), 0, ev->Cookie);
}

void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();

auto now = TInstant::Now();
if (!response.Issues) {
auto delta = now - LastCpuLoad;
LastCpuLoad = now;

if (response.CpuNumber) {
CpuNumber = response.CpuNumber;
}

InstantLoad = response.InstantLoad;
// exponential moving average
if (!Ready || delta >= AverageLoadInterval) {
AverageLoad = InstantLoad;
QuotedLoad = InstantLoad;
} else {
auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
}
Ready = true;
Counters.CpuLoadRequest.Ok->Inc();
*Counters.InstantLoadPercentage = static_cast<ui64>(InstantLoad * 100);
*Counters.AverageLoadPercentage = static_cast<ui64>(AverageLoad * 100);
CheckPendingQueue();
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
} else {
if (response.Issues) {
LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString());
Counters.CpuLoadRequest.Error->Inc();
CheckLoadIsOutdated();
}
Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds());
Counters.CpuLoadRequestLatencyMs->Collect((TInstant::Now() - StartCpuLoad).MilliSeconds());

CpuQuotaManager.UpdateCpuLoad(response.InstantLoad, response.CpuNumber, !response.Issues);
CheckPendingQueue();

// TODO: make load pulling reactive
// 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
// 2. Active pulling when busy

if (MonitoringRequestDelay) {
Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup());
if (auto delay = CpuQuotaManager.GetMonitoringRequestDelay()) {
Schedule(delay, new NActors::TEvents::TEvWakeup());
} else {
SendCpuLoadRequest();
}
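
The averaging that used to live inline here (deleted above) is a time-weighted exponential moving average, now maintained inside TCpuQuotaManager::UpdateCpuLoad. A self-contained sketch of that update rule, assuming the manager keeps the deleted semantics:

```cpp
// Time-weighted exponential moving average, as in the deleted code above:
// the longer the gap since the previous sample (relative to the averaging
// window), the more the new sample counts; a gap longer than the window
// resets the average outright.
struct TLoadAverager {
    double WindowSeconds;        // AverageLoadInterval
    double AverageLoad = 0.0;
    bool Ready = false;

    void Update(double instantLoad, double secondsSinceLastSample) {
        if (!Ready || secondsSinceLastSample >= WindowSeconds) {
            AverageLoad = instantLoad;
        } else {
            const double ratio = secondsSinceLastSample / WindowSeconds;
            AverageLoad = (1 - ratio) * AverageLoad + ratio * instantLoad;
        }
        Ready = true;
    }
};

// With a 10 s window, a sample arriving 1 s after the previous one gets a
// 10 % weight: average 0.30, instant 0.80 -> 0.9 * 0.30 + 0.1 * 0.80 = 0.35.
```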
@@ -164,48 +127,24 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
if (request.Quota > 1.0) {
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie);
} else {
if (!request.Quota) {
request.Quota = DefaultQueryLoad;
}
CheckLoadIsOutdated();
if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
if (PendingQueue.size() >= PendingQueueSize) {
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{
NYql::TIssue{TStringBuilder{}
<< "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
<< "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
}}), 0, ev->Cookie);
auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
CheckPendingQueue();
if (response.Status == NYdb::EStatus::OVERLOADED && PendingQueue.size() < PendingQueueSize) {
PendingQueue.push(ev);
Counters.PendingQueueSize->Inc();
} else {
if (response.Status == NYdb::EStatus::OVERLOADED) {
Counters.PendingQueueOverload->Inc();
} else {
PendingQueue.push(ev);
Counters.PendingQueueSize->Inc();
}
} else {
QuotedLoad += request.Quota;
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
}
}
}
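
With quota accounting delegated to RequestCpuQuota, the actor's only remaining decision is what to do with an OVERLOADED verdict: park the request if the pending queue has room, otherwise reject it and bump PendingQueueOverload. A condensed sketch of that branch (generic types and hypothetical names, not the PR's API):

```cpp
#include <cstddef>
#include <queue>
#include <utility>

enum class EStatus { Success, Overloaded };

struct TQuotaVerdict {
    EStatus Status;
    double CurrentLoad;
};

// Park an OVERLOADED request while there is room; otherwise answer at once
// (a full queue means immediate rejection and a PendingQueueOverload tick).
template <typename TRequest, typename TReply>
void RouteQuotaVerdict(const TQuotaVerdict& verdict, TRequest request,
                       std::queue<TRequest>& pendingQueue, std::size_t maxQueueSize,
                       TReply reply) {
    if (verdict.Status == EStatus::Overloaded && pendingQueue.size() < maxQueueSize) {
        pendingQueue.push(std::move(request));  // retried from CheckPendingQueue()
    } else {
        reply(std::move(request), verdict);
    }
}
```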

void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
if (CpuNumber) {
auto& request = *ev.Get()->Get();
if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0) {
auto load = (request.CpuSecondsConsumed * 1000 / request.Duration.MilliSeconds()) / CpuNumber;
auto quota = request.Quota ? request.Quota : DefaultQueryLoad;
if (quota > load) {
auto adjustment = (quota - load) / 2;
if (QuotedLoad > adjustment) {
QuotedLoad -= adjustment;
} else {
QuotedLoad = 0.0;
}
CheckPendingQueue();
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
}
}
}
auto& request = *ev.Get()->Get();
CpuQuotaManager.AdjustCpuQuota(request.Quota, request.Duration, request.CpuSecondsConsumed);
CheckPendingQueue();
}
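
AdjustCpuQuota replaces the deleted inline math above: actual load is CPU-seconds over wall-clock time divided by core count, and half of the over-reservation is released. A worked sketch, assuming the manager keeps that formula:

```cpp
// Half of the unused share of a query's quota is handed back, following the
// deleted inline logic above (assumed unchanged inside TCpuQuotaManager).
double ReleasedQuota(double quota, double cpuSecondsConsumed,
                     double durationSeconds, unsigned cpuNumber) {
    const double load = (cpuSecondsConsumed / durationSeconds) / cpuNumber;
    return quota > load ? (quota - load) / 2 : 0.0;
}

// Example: quota 0.2, 0.5 CPU-seconds over 2 s on 8 cores ->
// load = (0.5 / 2) / 8 = 0.03125, released = (0.2 - 0.03125) / 2 = 0.084375.
```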

void SendCpuLoadRequest() {
@@ -215,57 +154,51 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp

void CheckLoadIsOutdated() {
// TODO: support timeout to decline quota after request pending time is over, not load info
if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
Ready = false;
QuotedLoad = 0.0;
if (Strict) {
while (PendingQueue.size()) {
auto& ev = PendingQueue.front();
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
PendingQueue.pop();
Counters.PendingQueueSize->Dec();
}
if (Strict && !CpuQuotaManager.CheckLoadIsOutdated()) {
while (PendingQueue.size()) {
auto& ev = PendingQueue.front();
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
PendingQueue.pop();
Counters.PendingQueueSize->Dec();
}
}
}

void CheckPendingQueue() {
CheckLoadIsOutdated();

auto now = TInstant::Now();
while (QuotedLoad < MaxClusterLoad && PendingQueue.size()) {
while (PendingQueue.size()) {
auto& ev = PendingQueue.front();
auto& request = *ev.Get()->Get();
if (request.Deadline && now >= request.Deadline) {
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::CANCELLED, NYql::TIssues{
NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie);
} else {
QuotedLoad += request.Quota;
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
if (response.Status == NYdb::EStatus::OVERLOADED) {
break;
}

Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
}

PendingQueue.pop();
Counters.PendingQueueSize->Dec();
}
}
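
The drain loop stops at the first OVERLOADED verdict and cancels entries whose deadline expired while queued; schematically (hypothetical callables stand in for the actor sends):

```cpp
#include <queue>

struct TPendingRequest {
    double Quota;
    long DeadlineMs;  // 0 means "no deadline"
};

// Grant from the front until the quota manager refuses again; entries whose
// deadline passed while queued are cancelled, matching CheckPendingQueue().
template <typename TTryGrant, typename TCancel, typename TGrant>
void DrainPending(std::queue<TPendingRequest>& queue, long nowMs,
                  TTryGrant tryGrant, TCancel cancel, TGrant grant) {
    while (!queue.empty()) {
        TPendingRequest& front = queue.front();
        if (front.DeadlineMs != 0 && nowMs >= front.DeadlineMs) {
            cancel(front);                    // NYdb::EStatus::CANCELLED
        } else if (!tryGrant(front.Quota)) {  // still overloaded: keep waiting
            break;
        } else {
            grant(front);
        }
        queue.pop();
    }
}
```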

private:
TInstant StartCpuLoad;
TInstant LastCpuLoad;
TActorId MonitoringClientActorId;
TCounters Counters;

double InstantLoad = 0.0;
double AverageLoad = 0.0;
double QuotedLoad = 0.0;
bool Ready = false;

const TDuration MonitoringRequestDelay;
const TDuration AverageLoadInterval;
const double MaxClusterLoad;
const double DefaultQueryLoad;
const ui32 PendingQueueSize;
const bool Strict;
ui32 CpuNumber = 0;

NKikimr::NKqp::NWorkload::TCpuQuotaManager CpuQuotaManager;
TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;

TInstant StartCpuLoad;
};

std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/control_plane/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
ydb/core/fq/libs/compute/ydb/synchronization_service
ydb/core/fq/libs/control_plane_storage/proto
ydb/core/fq/libs/quota_manager/proto
ydb/core/kqp/workload_service/common
ydb/core/protos
ydb/library/db_pool/protos
ydb/library/yql/public/issue
6 changes: 5 additions & 1 deletion ydb/core/kqp/common/events/workload_service.h
@@ -41,15 +41,19 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
};

struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId)
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: Database(database)
, SessionId(sessionId)
, PoolId(poolId)
, Duration(duration)
, CpuConsumed(cpuConsumed)
{}

const TString Database;
const TString SessionId;
const TString PoolId;
const TDuration Duration;
const TDuration CpuConsumed;
};

struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqpWorkloadServiceEvents::EvCleanupResponse> {
10 changes: 8 additions & 2 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
@@ -138,8 +138,14 @@ void FillResourcePoolDescription(NKikimrSchemeOp::TResourcePoolDescription& reso
}

if (settings.GetObjectId() == NResourcePool::DEFAULT_POOL_ID) {
if (properties.contains("concurrent_query_limit")) {
ythrow yexception() << "Can not change property concurrent_query_limit for default pool";
std::vector<TString> forbiddenProperties = {
"concurrent_query_limit",
"database_load_cpu_threshold"
};
for (const TString& property : forbiddenProperties) {
if (properties.contains(property)) {
ythrow yexception() << "Can not change property " << property << " for default pool";
}
}
}
}
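
The guard generalizes the old single-property check to a list. The same pattern in a minimal standalone form (std containers stand in for the YDB types):

```cpp
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Reject edits to properties that must stay fixed on the default pool,
// mirroring the loop in FillResourcePoolDescription() above.
void ValidateDefaultPoolProperties(const std::map<std::string, std::string>& properties) {
    const std::vector<std::string> forbidden = {
        "concurrent_query_limit",
        "database_load_cpu_threshold",
    };
    for (const auto& property : forbidden) {
        if (properties.count(property) != 0) {
            throw std::runtime_error("Can not change property " + property + " for default pool");
        }
    }
}
```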
9 changes: 8 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -2069,8 +2069,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
CleanupCtx->Final = isFinal;
CleanupCtx->IsWaitingForWorkloadServiceCleanup = true;

const auto& stats = QueryState->QueryStats;
auto event = std::make_unique<NWorkload::TEvCleanupRequest>(
QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId,
TDuration::MicroSeconds(stats.DurationUs), TDuration::MicroSeconds(stats.WorkerCpuTimeUs)
);

auto forwardId = MakeKqpWorkloadServiceId(SelfId().NodeId());
Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), new NWorkload::TEvCleanupRequest(QueryState->Database, SessionId, QueryState->UserRequestContext->PoolId), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
Send(new IEventHandle(*QueryState->PoolHandlerActor, SelfId(), event.release(), IEventHandle::FlagForwardOnNondelivery, 0, &forwardId));
QueryState->PoolHandlerActor = Nothing();
}

3 changes: 3 additions & 0 deletions ydb/core/kqp/workload_service/actors/actors.h
@@ -15,4 +15,7 @@ NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bo
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Cpu load fetcher actor
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);

} // NKikimr::NKqp::NWorkload
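
A possible call site for the new fetcher, based only on the signature above — the request/response protocol it speaks is not part of this diff, so treat the surrounding details as assumptions:

```cpp
#include <ydb/core/kqp/workload_service/actors/actors.h>
#include <ydb/library/actors/core/actor.h>

// Sketch: spawn the fetcher from inside an actor and let it reply to us.
// Which events it consumes and emits is an assumption; only the factory
// signature is visible in this diff.
void RequestClusterCpuLoad(const NActors::TActorContext& ctx) {
    ctx.Register(NKikimr::NKqp::NWorkload::CreateCpuLoadFetcherActor(ctx.SelfID));
}
```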