Skip to content

Commit

Permalink
Fixed health check for multi tenant mode
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Feb 11, 2025
1 parent c166bec commit 345d224
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 34 deletions.
12 changes: 8 additions & 4 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,10 @@ class TKqpResourceManager : public IKqpResourceManager {
TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
TVector<NKikimrKqp::TKqpNodeResources> resources;
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
if (ResourceSnapshotState) {
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
}
if (infos != nullptr) {
resources = *infos;
Expand All @@ -414,8 +416,10 @@ class TKqpResourceManager : public IKqpResourceManager {
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_AS_D("Schedule Snapshot request");
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
if (ResourceSnapshotState) {
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
}
TVector<NKikimrKqp::TKqpNodeResources> resources;
if (infos != nullptr) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/workload_service/common/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {

private:
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TRetryPolicy::GetFixedIntervalPolicy(
return TRetryPolicy::GetExponentialBackoffPolicy(
[](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
, TDuration::MilliSeconds(100)
, TDuration::MilliSeconds(500)
, TDuration::Seconds(1)
, 100
)->CreateRetryState();
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/table_creator/table_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,11 @@ using TTableCreatorRetryPolicy = IRetryPolicy<bool>;
}

static TTableCreatorRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TTableCreatorRetryPolicy::GetFixedIntervalPolicy(
return TTableCreatorRetryPolicy::GetExponentialBackoffPolicy(
[](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
, TDuration::MilliSeconds(100)
, TDuration::MilliSeconds(300)
, TDuration::Seconds(1)
, 100
)->CreateRetryState();
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,8 @@ TableServiceConfig {
ResourceManager {
QueryMemoryLimit: 64424509440
}

WriteActorSettings {
MaxWriteAttempts: 1000
}
}
30 changes: 19 additions & 11 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct TExecutionOptions {
bool UseTemplates = false;

ui32 LoopCount = 1;
TDuration QueryDelay;
TDuration LoopDelay;
bool ContinueAfterFail = false;

Expand Down Expand Up @@ -100,11 +101,12 @@ struct TExecutionOptions {
};
}

TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const {
TRequestOptions GetScriptQueryOptions(size_t index, size_t loopId, size_t queryId, TInstant startTime) const {
Y_ABORT_UNLESS(index < ScriptQueries.size());

TString sql = ScriptQueries[index];
if (UseTemplates) {
SubstGlobal(sql, "${LOOP_ID}", ToString(loopId));
SubstGlobal(sql, "${QUERY_ID}", ToString(queryId));
}

Expand Down Expand Up @@ -270,12 +272,12 @@ struct TExecutionOptions {
};


void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) {
void RunArgumentQuery(size_t index, size_t loopId, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

switch (executionOptions.GetExecutionCase(index)) {
case TExecutionOptions::EExecutionCase::GenericScript: {
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
}
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
Expand All @@ -292,21 +294,21 @@ void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TE
}

case TExecutionOptions::EExecutionCase::GenericQuery: {
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
}
break;
}

case TExecutionOptions::EExecutionCase::YqlScript: {
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
}
break;
}

case TExecutionOptions::EExecutionCase::AsyncQuery: {
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, queryId, startTime));
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime));
break;
}
}
Expand All @@ -327,24 +329,25 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, TKqpRunner& r
const size_t numberLoops = executionOptions.LoopCount;
for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) {
size_t id = queryId % numberQueries;
if (id == 0 && queryId > 0) {
Sleep(executionOptions.LoopDelay);
if (queryId > 0) {
Sleep(id == 0 ? executionOptions.LoopDelay : executionOptions.QueryDelay);
}

const TInstant startTime = TInstant::Now();
const size_t loopId = queryId / numberQueries;
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script";
if (numberQueries > 1) {
Cout << " " << id;
}
if (numberLoops != 1) {
Cout << ", loop " << queryId / numberQueries;
Cout << ", loop " << loopId;
}
Cout << "..." << colors.Default() << Endl;
}

try {
RunArgumentQuery(id, queryId, startTime, executionOptions, runner);
RunArgumentQuery(id, loopId, queryId, startTime, executionOptions, runner);
} catch (const yexception& exception) {
if (executionOptions.ContinueAfterFail) {
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
Expand Down Expand Up @@ -713,6 +716,11 @@ class TMain : public TMainBase {
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("query-delay", "Delay in milliseconds between queries starts")
.RequiredArgument("uint")
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.QueryDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails")
.NoArgument()
.SetFlag(&ExecutionOptions.ContinueAfterFail);
Expand Down Expand Up @@ -751,7 +759,7 @@ class TMain : public TMainBase {

options.AddLongOption('H', "health-check", TStringBuilder() << "Level of health check before start (max level " << static_cast<ui32>(TYdbSetupSettings::EHealthCheck::Max) - 1 << ")")
.RequiredArgument("uint")
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::NodesCount))
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::FetchDatabase))
.StoreMappedResultT<ui8>(&RunnerOptions.YdbSettings.HealthCheckLevel, [](ui8 value) {
return static_cast<TYdbSetupSettings::EHealthCheck>(std::min(value, static_cast<ui8>(TYdbSetupSettings::EHealthCheck::Max)));
});
Expand Down
49 changes: 41 additions & 8 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>

#include <ydb/core/kqp/workload_service/actors/actors.h>

namespace NKqpRun {

Expand Down Expand Up @@ -254,6 +254,8 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
void Bootstrap() {
Become(&TResourcesWaiterActor::StateFunc);

Schedule(Settings_.HealthCheckTimeout, new NActors::TEvents::TEvWakeup());

HealthCheckStage_ = EHealthCheck::NodesCount;
DoHealthCheck();
}
Expand All @@ -264,17 +266,26 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
return;
}

if (TInstant::Now() - StartTime_ >= Settings_.HealthCheckTimeout) {
FailTimeout();
return;
}

switch (HealthCheckStage_) {
case TYdbSetupSettings::EHealthCheck::NodesCount:
case EHealthCheck::NodesCount:
CheckResourcesPublish();
break;

case TYdbSetupSettings::EHealthCheck::ScriptRequest:
case EHealthCheck::FetchDatabase:
FetchDatabase();
break;

case EHealthCheck::ScriptRequest:
StartScriptQuery();
break;

case TYdbSetupSettings::EHealthCheck::None:
case TYdbSetupSettings::EHealthCheck::Max:
case EHealthCheck::None:
case EHealthCheck::Max:
Finish();
break;
}
Expand All @@ -283,14 +294,25 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
const auto nodeCount = ev->Get()->NodeCount;
if (nodeCount == Settings_.ExpectedNodeCount) {
HealthCheckStage_ = EHealthCheck::ScriptRequest;
HealthCheckStage_ = EHealthCheck::FetchDatabase;
DoHealthCheck();
return;
}

Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true);
}

// Handles the reply of the database fetcher started by FetchDatabase().
// A non-SUCCESS status schedules a short retry with the reported issues;
// SUCCESS advances the health check to the ScriptRequest stage.
void Handle(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) {
    const auto& response = *ev->Get();

    if (response.Status != Ydb::StatusIds::SUCCESS) {
        // Colors are reset before the issues so they are printed in the default terminal color.
        Retry(TStringBuilder() << "failed to fetch database with status " << response.Status << ", reason:\n" << CoutColors_.Default() << response.Issues.ToString(), true);
        return;
    }

    HealthCheckStage_ = EHealthCheck::ScriptRequest;
    DoHealthCheck();
}

void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
const auto status = ev->Get()->Status;
if (status == Ydb::StatusIds::SUCCESS) {
Expand All @@ -304,6 +326,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
STRICT_STFUNC(StateFunc,
sFunc(NActors::TEvents::TEvWakeup, DoHealthCheck);
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
hFunc(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse, Handle);
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
)

Expand All @@ -324,6 +347,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
});
}

void FetchDatabase() {
Register(NKikimr::NKqp::NWorkload::CreateDatabaseFetcherActor(SelfId(), Settings_.Database));
}

void StartScriptQuery() {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString());
Expand All @@ -344,11 +371,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite

if (auto delay = RetryState_->GetNextRetryDelay(shortRetry)) {
if (Settings_.VerboseLevel >= EVerbose::InitLogs) {
Cout << CoutColors_.Cyan() << "Retry in " << *delay << " " << message << CoutColors_.Default() << Endl;
const TString str = TStringBuilder() << CoutColors_.Cyan() << "Retry for database '" << Settings_.Database << "' in " << *delay << " " << message << CoutColors_.Default();
Cout << str << Endl;
}
Schedule(*delay, new NActors::TEvents::TEvWakeup());
} else {
Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded, use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
FailTimeout();
}
}

Expand All @@ -357,6 +385,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
PassAway();
}

void FailTimeout() {
Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded for database '" << Settings_.Database << "', use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
}

void Fail(const TString& error) {
Promise_.SetException(error);
PassAway();
Expand All @@ -368,6 +400,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite

private:
const TWaitResourcesSettings Settings_;
const TInstant StartTime_ = TInstant::Now();
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
const IRetryPolicy::TPtr RetryPolicy_;
IRetryPolicy::IRetryState::TPtr RetryState_ = nullptr;
Expand Down
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct TYdbSetupSettings : public NKikimrRun::TServerSettings {
// Health-check level requested before start, also used as the current stage
// of the resources waiter actor.
// Enumerator order is significant: values are clamped/compared numerically
// (std::min against Max and NodesCount), and stages run in declaration order
// NodesCount -> FetchDatabase -> ScriptRequest.
enum class EHealthCheck {
// No check stage; the waiter's stage switch finishes for this value.
None,
// Wait until the reported node count equals the expected node count.
NodesCount,
// Fetch the database through the workload-service database fetcher actor.
FetchDatabase,
// Send a script request and wait for its response.
ScriptRequest,
// Sentinel upper bound used to clamp the --health-check option; not a real stage.
Max
};
Expand Down
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
)

PEERDIR(
ydb/core/kqp/workload_service/actors
ydb/core/testlib

ydb/tests/tools/kqprun/runlib
Expand Down
40 changes: 31 additions & 9 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ void FillQueryMeta(TQueryMeta& meta, const NKikimrKqp::TQueryResponse& response)

class TYdbSetup::TImpl {
using EVerbose = TYdbSetupSettings::EVerbose;
using EHealthCheck = TYdbSetupSettings::EHealthCheck;

private:
TAutoPtr<TLogBackend> CreateLogBackend() const {
Expand Down Expand Up @@ -399,19 +400,36 @@ class TYdbSetup::TImpl {
NYql::NLog::InitLogger(NActors::CreateNullBackend());
}

void WaitResourcesPublishing() const {
auto promise = NThreading::NewPromise();
NThreading::TFuture<void> RunHealthCheck(const TString& database) const {
EHealthCheck level = Settings_.HealthCheckLevel;
i32 nodesCount = Settings_.NodeCount;
if (database != Settings_.DomainName) {
nodesCount = Tenants_->Size(database);
} else if (StorageMeta_.TenantsSize() > 0) {
level = std::min(level, EHealthCheck::NodesCount);
}

const TWaitResourcesSettings settings = {
.ExpectedNodeCount = static_cast<i32>(Settings_.NodeCount),
.HealthCheckLevel = Settings_.HealthCheckLevel,
.ExpectedNodeCount = nodesCount,
.HealthCheckLevel = level,
.HealthCheckTimeout = Settings_.HealthCheckTimeout,
.VerboseLevel = Settings_.VerboseLevel,
.Database = NKikimr::CanonizePath(Settings_.DomainName)
.Database = NKikimr::CanonizePath(database)
};
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId);
const auto promise = NThreading::NewPromise();
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), GetNodeIndexForDatabase(database), GetRuntime()->GetAppData().SystemPoolId);

return promise.GetFuture();
}

void WaitResourcesPublishing() const {
std::vector<NThreading::TFuture<void>> futures(1, RunHealthCheck(Settings_.DomainName));
for (const auto& [tenantName, _] : StorageMeta_.GetTenants()) {
futures.emplace_back(RunHealthCheck(GetTenantPath(tenantName)));
}

try {
promise.GetFuture().GetValue(2 * Settings_.HealthCheckTimeout);
NThreading::WaitAll(futures).GetValue(2 * Settings_.HealthCheckTimeout);
} catch (...) {
ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage();
}
Expand Down Expand Up @@ -628,7 +646,11 @@ class TYdbSetup::TImpl {
}

TString GetDatabasePath(const TString& database) const {
return NKikimr::CanonizePath(database ? database : GetDefaultDatabase());
const TString& result = NKikimr::CanonizePath(database ? database : GetDefaultDatabase());
if (StorageMeta_.TenantsSize() > 0 && result == NKikimr::CanonizePath(Settings_.DomainName)) {
ythrow yexception() << "Cannot use root domain '" << result << "' as request database then created additional tenants";
}
return result;
}

ui32 GetNodeIndexForDatabase(const TString& path) const {
Expand All @@ -653,7 +675,7 @@ class TYdbSetup::TImpl {
ythrow yexception() << "Can not choose default database, there is more than one tenants, please use `-D <database name>`";
}
if (StorageMeta_.TenantsSize() == 1) {
return StorageMeta_.GetTenants().begin()->first;
return GetTenantPath(StorageMeta_.GetTenants().begin()->first);
}
return Settings_.DomainName;
}
Expand Down

0 comments on commit 345d224

Please sign in to comment.