Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-4101 KqpRun fixed health check for multi tenant mode #14427

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class TKqpResourceManager : public IKqpResourceManager {
, ExecutionUnitsLimit(config.GetComputeActorsCount())
, SpillingPercent(config.GetSpillingPercent())
, TotalMemoryResource(MakeIntrusive<TMemoryResource>(config.GetQueryMemoryLimit(), (double)100, config.GetSpillingPercent()))
, ResourceSnapshotState(std::make_shared<TResourceSnapshotState>())
{
SetConfigValues(config);
}
Expand Down Expand Up @@ -195,7 +196,6 @@ class TKqpResourceManager : public IKqpResourceManager {

void CreateResourceInfoExchanger(
const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) {
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
auto exchanger = CreateKqpResourceInfoExchangerActor(
Counters, ResourceSnapshotState, settings);
ResourceInfoExchanger = ActorSystem->Register(exchanger);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/workload_service/common/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {

private:
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TRetryPolicy::GetFixedIntervalPolicy(
return TRetryPolicy::GetExponentialBackoffPolicy(
[](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
, TDuration::MilliSeconds(100)
, TDuration::MilliSeconds(500)
, TDuration::Seconds(1)
, 100
)->CreateRetryState();
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/table_creator/table_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,11 @@ using TTableCreatorRetryPolicy = IRetryPolicy<bool>;
}

static TTableCreatorRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TTableCreatorRetryPolicy::GetFixedIntervalPolicy(
return TTableCreatorRetryPolicy::GetExponentialBackoffPolicy(
[](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;}
, TDuration::MilliSeconds(100)
, TDuration::MilliSeconds(300)
, TDuration::Seconds(1)
, 100
)->CreateRetryState();
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,8 @@ TableServiceConfig {
ResourceManager {
QueryMemoryLimit: 64424509440
}

WriteActorSettings {
MaxWriteAttempts: 1000
}
}
30 changes: 19 additions & 11 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct TExecutionOptions {
bool UseTemplates = false;

ui32 LoopCount = 1;
TDuration QueryDelay;
TDuration LoopDelay;
bool ContinueAfterFail = false;

Expand Down Expand Up @@ -100,11 +101,12 @@ struct TExecutionOptions {
};
}

TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const {
TRequestOptions GetScriptQueryOptions(size_t index, size_t loopId, size_t queryId, TInstant startTime) const {
Y_ABORT_UNLESS(index < ScriptQueries.size());

TString sql = ScriptQueries[index];
if (UseTemplates) {
SubstGlobal(sql, "${LOOP_ID}", ToString(loopId));
SubstGlobal(sql, "${QUERY_ID}", ToString(queryId));
}

Expand Down Expand Up @@ -270,12 +272,12 @@ struct TExecutionOptions {
};


void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) {
void RunArgumentQuery(size_t index, size_t loopId, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);

switch (executionOptions.GetExecutionCase(index)) {
case TExecutionOptions::EExecutionCase::GenericScript: {
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
}
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
Expand All @@ -292,21 +294,21 @@ void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TE
}

case TExecutionOptions::EExecutionCase::GenericQuery: {
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
}
break;
}

case TExecutionOptions::EExecutionCase::YqlScript: {
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
}
break;
}

case TExecutionOptions::EExecutionCase::AsyncQuery: {
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, queryId, startTime));
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime));
break;
}
}
Expand All @@ -327,24 +329,25 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, TKqpRunner& r
const size_t numberLoops = executionOptions.LoopCount;
for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) {
size_t id = queryId % numberQueries;
if (id == 0 && queryId > 0) {
Sleep(executionOptions.LoopDelay);
if (queryId > 0) {
Sleep(id == 0 ? executionOptions.LoopDelay : executionOptions.QueryDelay);
}

const TInstant startTime = TInstant::Now();
const size_t loopId = queryId / numberQueries;
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script";
if (numberQueries > 1) {
Cout << " " << id;
}
if (numberLoops != 1) {
Cout << ", loop " << queryId / numberQueries;
Cout << ", loop " << loopId;
}
Cout << "..." << colors.Default() << Endl;
}

try {
RunArgumentQuery(id, queryId, startTime, executionOptions, runner);
RunArgumentQuery(id, loopId, queryId, startTime, executionOptions, runner);
} catch (const yexception& exception) {
if (executionOptions.ContinueAfterFail) {
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
Expand Down Expand Up @@ -713,6 +716,11 @@ class TMain : public TMainBase {
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("query-delay", "Delay in milliseconds between queries starts")
.RequiredArgument("uint")
.DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.QueryDelay, &TDuration::MilliSeconds<ui64>);

options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails")
.NoArgument()
.SetFlag(&ExecutionOptions.ContinueAfterFail);
Expand Down Expand Up @@ -751,7 +759,7 @@ class TMain : public TMainBase {

options.AddLongOption('H', "health-check", TStringBuilder() << "Level of health check before start (max level " << static_cast<ui32>(TYdbSetupSettings::EHealthCheck::Max) - 1 << ")")
.RequiredArgument("uint")
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::NodesCount))
.DefaultValue(static_cast<ui8>(TYdbSetupSettings::EHealthCheck::FetchDatabase))
.StoreMappedResultT<ui8>(&RunnerOptions.YdbSettings.HealthCheckLevel, [](ui8 value) {
return static_cast<TYdbSetupSettings::EHealthCheck>(std::min(value, static_cast<ui8>(TYdbSetupSettings::EHealthCheck::Max)));
});
Expand Down
49 changes: 41 additions & 8 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>

#include <ydb/core/kqp/workload_service/actors/actors.h>

namespace NKqpRun {

Expand Down Expand Up @@ -254,6 +254,8 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
void Bootstrap() {
Become(&TResourcesWaiterActor::StateFunc);

Schedule(Settings_.HealthCheckTimeout, new NActors::TEvents::TEvWakeup());

HealthCheckStage_ = EHealthCheck::NodesCount;
DoHealthCheck();
}
Expand All @@ -264,17 +266,26 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
return;
}

if (TInstant::Now() - StartTime_ >= Settings_.HealthCheckTimeout) {
FailTimeout();
return;
}

switch (HealthCheckStage_) {
case TYdbSetupSettings::EHealthCheck::NodesCount:
case EHealthCheck::NodesCount:
CheckResourcesPublish();
break;

case TYdbSetupSettings::EHealthCheck::ScriptRequest:
case EHealthCheck::FetchDatabase:
FetchDatabase();
break;

case EHealthCheck::ScriptRequest:
StartScriptQuery();
break;

case TYdbSetupSettings::EHealthCheck::None:
case TYdbSetupSettings::EHealthCheck::Max:
case EHealthCheck::None:
case EHealthCheck::Max:
Finish();
break;
}
Expand All @@ -283,14 +294,25 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
const auto nodeCount = ev->Get()->NodeCount;
if (nodeCount == Settings_.ExpectedNodeCount) {
HealthCheckStage_ = EHealthCheck::ScriptRequest;
HealthCheckStage_ = EHealthCheck::FetchDatabase;
DoHealthCheck();
return;
}

Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true);
}

void Handle(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) {
const auto status = ev->Get()->Status;
if (status == Ydb::StatusIds::SUCCESS) {
HealthCheckStage_ = EHealthCheck::ScriptRequest;
DoHealthCheck();
return;
}

Retry(TStringBuilder() << "failed to fetch database with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString(), true);
}

void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
const auto status = ev->Get()->Status;
if (status == Ydb::StatusIds::SUCCESS) {
Expand All @@ -304,6 +326,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
STRICT_STFUNC(StateFunc,
sFunc(NActors::TEvents::TEvWakeup, DoHealthCheck);
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
hFunc(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse, Handle);
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
)

Expand All @@ -324,6 +347,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
});
}

void FetchDatabase() {
Register(NKikimr::NKqp::NWorkload::CreateDatabaseFetcherActor(SelfId(), Settings_.Database));
}

void StartScriptQuery() {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString());
Expand All @@ -344,11 +371,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite

if (auto delay = RetryState_->GetNextRetryDelay(shortRetry)) {
if (Settings_.VerboseLevel >= EVerbose::InitLogs) {
Cout << CoutColors_.Cyan() << "Retry in " << *delay << " " << message << CoutColors_.Default() << Endl;
const TString str = TStringBuilder() << CoutColors_.Cyan() << "Retry for database '" << Settings_.Database << "' in " << *delay << " " << message << CoutColors_.Default();
Cout << str << Endl;
}
Schedule(*delay, new NActors::TEvents::TEvWakeup());
} else {
Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded, use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
FailTimeout();
}
}

Expand All @@ -357,6 +385,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
PassAway();
}

void FailTimeout() {
Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded for database '" << Settings_.Database << "', use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast<ui32>(EVerbose::InitLogs));
}

void Fail(const TString& error) {
Promise_.SetException(error);
PassAway();
Expand All @@ -368,6 +400,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite

private:
const TWaitResourcesSettings Settings_;
const TInstant StartTime_ = TInstant::Now();
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
const IRetryPolicy::TPtr RetryPolicy_;
IRetryPolicy::IRetryState::TPtr RetryState_ = nullptr;
Expand Down
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct TYdbSetupSettings : public NKikimrRun::TServerSettings {
enum class EHealthCheck {
None,
NodesCount,
FetchDatabase,
ScriptRequest,
Max
};
Expand Down
1 change: 1 addition & 0 deletions ydb/tests/tools/kqprun/src/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
)

PEERDIR(
ydb/core/kqp/workload_service/actors
ydb/core/testlib

ydb/tests/tools/kqprun/runlib
Expand Down
40 changes: 31 additions & 9 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ void FillQueryMeta(TQueryMeta& meta, const NKikimrKqp::TQueryResponse& response)

class TYdbSetup::TImpl {
using EVerbose = TYdbSetupSettings::EVerbose;
using EHealthCheck = TYdbSetupSettings::EHealthCheck;

private:
TAutoPtr<TLogBackend> CreateLogBackend() const {
Expand Down Expand Up @@ -399,19 +400,36 @@ class TYdbSetup::TImpl {
NYql::NLog::InitLogger(NActors::CreateNullBackend());
}

void WaitResourcesPublishing() const {
auto promise = NThreading::NewPromise();
NThreading::TFuture<void> RunHealthCheck(const TString& database) const {
EHealthCheck level = Settings_.HealthCheckLevel;
i32 nodesCount = Settings_.NodeCount;
if (database != Settings_.DomainName) {
nodesCount = Tenants_->Size(database);
} else if (StorageMeta_.TenantsSize() > 0) {
level = std::min(level, EHealthCheck::NodesCount);
}

const TWaitResourcesSettings settings = {
.ExpectedNodeCount = static_cast<i32>(Settings_.NodeCount),
.HealthCheckLevel = Settings_.HealthCheckLevel,
.ExpectedNodeCount = nodesCount,
.HealthCheckLevel = level,
.HealthCheckTimeout = Settings_.HealthCheckTimeout,
.VerboseLevel = Settings_.VerboseLevel,
.Database = NKikimr::CanonizePath(Settings_.DomainName)
.Database = NKikimr::CanonizePath(database)
};
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId);
const auto promise = NThreading::NewPromise();
GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), GetNodeIndexForDatabase(database), GetRuntime()->GetAppData().SystemPoolId);

return promise.GetFuture();
}

void WaitResourcesPublishing() const {
std::vector<NThreading::TFuture<void>> futures(1, RunHealthCheck(Settings_.DomainName));
for (const auto& [tenantName, _] : StorageMeta_.GetTenants()) {
futures.emplace_back(RunHealthCheck(GetTenantPath(tenantName)));
}

try {
promise.GetFuture().GetValue(2 * Settings_.HealthCheckTimeout);
NThreading::WaitAll(futures).GetValue(2 * Settings_.HealthCheckTimeout);
} catch (...) {
ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage();
}
Expand Down Expand Up @@ -628,7 +646,11 @@ class TYdbSetup::TImpl {
}

TString GetDatabasePath(const TString& database) const {
return NKikimr::CanonizePath(database ? database : GetDefaultDatabase());
const TString& result = NKikimr::CanonizePath(database ? database : GetDefaultDatabase());
if (StorageMeta_.TenantsSize() > 0 && result == NKikimr::CanonizePath(Settings_.DomainName)) {
ythrow yexception() << "Cannot use root domain '" << result << "' as request database then created additional tenants";
}
return result;
}

ui32 GetNodeIndexForDatabase(const TString& path) const {
Expand All @@ -653,7 +675,7 @@ class TYdbSetup::TImpl {
ythrow yexception() << "Can not choose default database, there is more than one tenants, please use `-D <database name>`";
}
if (StorageMeta_.TenantsSize() == 1) {
return StorageMeta_.GetTenants().begin()->first;
return GetTenantPath(StorageMeta_.GetTenants().begin()->first);
}
return Settings_.DomainName;
}
Expand Down
Loading