diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index f1f5b365bfa6..9274684de4bf 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -163,6 +163,7 @@ class TKqpResourceManager : public IKqpResourceManager { , ExecutionUnitsLimit(config.GetComputeActorsCount()) , SpillingPercent(config.GetSpillingPercent()) , TotalMemoryResource(MakeIntrusive(config.GetQueryMemoryLimit(), (double)100, config.GetSpillingPercent())) + , ResourceSnapshotState(std::make_shared()) { SetConfigValues(config); } @@ -195,7 +196,6 @@ class TKqpResourceManager : public IKqpResourceManager { void CreateResourceInfoExchanger( const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) { - ResourceSnapshotState = std::make_shared(); auto exchanger = CreateKqpResourceInfoExchangerActor( Counters, ResourceSnapshotState, settings); ResourceInfoExchanger = ActorSystem->Register(exchanger); diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index 58b9be80ee1a..8ceef39a5531 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -83,10 +83,11 @@ class TSchemeActorBase : public NActors::TActorBootstrapped { private: static TRetryPolicy::IRetryState::TPtr CreateRetryState() { - return TRetryPolicy::GetFixedIntervalPolicy( + return TRetryPolicy::GetExponentialBackoffPolicy( [](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;} , TDuration::MilliSeconds(100) , TDuration::MilliSeconds(500) + , TDuration::Seconds(1) , 100 )->CreateRetryState(); } diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 9cf498b48c5c..b920a497a840 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -380,10 +380,11 @@ using TTableCreatorRetryPolicy = IRetryPolicy; } static TTableCreatorRetryPolicy::IRetryState::TPtr CreateRetryState() { - return TTableCreatorRetryPolicy::GetFixedIntervalPolicy( + return TTableCreatorRetryPolicy::GetExponentialBackoffPolicy( [](bool longDelay){return longDelay ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;} , TDuration::MilliSeconds(100) , TDuration::MilliSeconds(300) + , TDuration::Seconds(1) , 100 )->CreateRetryState(); } diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index d5a5e8f21cdf..4c69371ab8fc 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -191,4 +191,8 @@ TableServiceConfig { ResourceManager { QueryMemoryLimit: 64424509440 } + + WriteActorSettings { + MaxWriteAttempts: 1000 + } } diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 4505610ce417..b736157d4ac1 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -44,6 +44,7 @@ struct TExecutionOptions { bool UseTemplates = false; ui32 LoopCount = 1; + TDuration QueryDelay; TDuration LoopDelay; bool ContinueAfterFail = false; @@ -100,11 +101,12 @@ struct TExecutionOptions { }; } - TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const { + TRequestOptions GetScriptQueryOptions(size_t index, size_t loopId, size_t queryId, TInstant startTime) const { Y_ABORT_UNLESS(index < ScriptQueries.size()); TString sql = ScriptQueries[index]; if (UseTemplates) { + SubstGlobal(sql, "${LOOP_ID}", ToString(loopId)); SubstGlobal(sql, "${QUERY_ID}", ToString(queryId)); } @@ -270,12 +272,12 @@ struct TExecutionOptions { }; -void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) { +void RunArgumentQuery(size_t index, size_t loopId, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); switch (executionOptions.GetExecutionCase(index)) { case TExecutionOptions::EExecutionCase::GenericScript: { - if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) { + if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl; @@ -292,21 +294,21 @@ void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TE } case TExecutionOptions::EExecutionCase::GenericQuery: { - if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) { + if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } break; } case TExecutionOptions::EExecutionCase::YqlScript: { - if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) { + if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } break; } case TExecutionOptions::EExecutionCase::AsyncQuery: { - runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, queryId, startTime)); + runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, loopId, queryId, startTime)); break; } } @@ -327,24 +329,25 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, TKqpRunner& r const size_t numberLoops = executionOptions.LoopCount; for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) { size_t id = queryId % numberQueries; - if (id == 0 && queryId > 0) { - Sleep(executionOptions.LoopDelay); + if (queryId > 0) { + Sleep(id == 0 ? executionOptions.LoopDelay : executionOptions.QueryDelay); } const TInstant startTime = TInstant::Now(); + const size_t loopId = queryId / numberQueries; if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) { Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script"; if (numberQueries > 1) { Cout << " " << id; } if (numberLoops != 1) { - Cout << ", loop " << queryId / numberQueries; + Cout << ", loop " << loopId; } Cout << "..." << colors.Default() << Endl; } try { - RunArgumentQuery(id, queryId, startTime, executionOptions, runner); + RunArgumentQuery(id, loopId, queryId, startTime, executionOptions, runner); } catch (const yexception& exception) { if (executionOptions.ContinueAfterFail) { Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl; @@ -713,6 +716,11 @@ class TMain : public TMainBase { .DefaultValue(0) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + options.AddLongOption("query-delay", "Delay in milliseconds between queries starts") + .RequiredArgument("uint") + .DefaultValue(0) + .StoreMappedResultT(&ExecutionOptions.QueryDelay, &TDuration::MilliSeconds); + options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails") .NoArgument() .SetFlag(&ExecutionOptions.ContinueAfterFail); @@ -751,7 +759,7 @@ class TMain : public TMainBase { options.AddLongOption('H', "health-check", TStringBuilder() << "Level of health check before start (max level " << static_cast(TYdbSetupSettings::EHealthCheck::Max) - 1 << ")") .RequiredArgument("uint") - .DefaultValue(static_cast(TYdbSetupSettings::EHealthCheck::NodesCount)) + .DefaultValue(static_cast(TYdbSetupSettings::EHealthCheck::FetchDatabase)) .StoreMappedResultT(&RunnerOptions.YdbSettings.HealthCheckLevel, [](ui8 value) { return static_cast(std::min(value, static_cast(TYdbSetupSettings::EHealthCheck::Max))); }); diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index c548e0025660..ab1ae17974ec 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -4,7 +4,7 @@ #include #include - +#include namespace NKqpRun { @@ -254,6 +254,8 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped= Settings_.HealthCheckTimeout) { + FailTimeout(); + return; + } + switch (HealthCheckStage_) { - case TYdbSetupSettings::EHealthCheck::NodesCount: + case EHealthCheck::NodesCount: CheckResourcesPublish(); break; - case TYdbSetupSettings::EHealthCheck::ScriptRequest: + case EHealthCheck::FetchDatabase: + FetchDatabase(); + break; + + case EHealthCheck::ScriptRequest: StartScriptQuery(); break; - case TYdbSetupSettings::EHealthCheck::None: - case TYdbSetupSettings::EHealthCheck::Max: + case EHealthCheck::None: + case EHealthCheck::Max: Finish(); break; } @@ -283,7 +294,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrappedGet()->NodeCount; if (nodeCount == Settings_.ExpectedNodeCount) { - HealthCheckStage_ = EHealthCheck::ScriptRequest; + HealthCheckStage_ = EHealthCheck::FetchDatabase; DoHealthCheck(); return; } @@ -291,6 +302,17 @@ class TResourcesWaiterActor : public NActors::TActorBootstrappedGet()->Status; + if (status == Ydb::StatusIds::SUCCESS) { + HealthCheckStage_ = EHealthCheck::ScriptRequest; + DoHealthCheck(); + return; + } + + Retry(TStringBuilder() << "failed to fetch database with status " << status << ", reason:\n" << CoutColors_.Default() << ev->Get()->Issues.ToString(), true); + } + void Handle(NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) { const auto status = ev->Get()->Status; if (status == Ydb::StatusIds::SUCCESS) { @@ -304,6 +326,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped(); event->Record.SetUserToken(NACLib::TUserToken("", BUILTIN_ACL_ROOT, {}).SerializeAsString()); @@ -344,11 +371,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrappedGetNextRetryDelay(shortRetry)) { if (Settings_.VerboseLevel >= EVerbose::InitLogs) { - Cout << CoutColors_.Cyan() << "Retry in " << *delay << " " << message << CoutColors_.Default() << Endl; + const TString str = TStringBuilder() << CoutColors_.Cyan() << "Retry for database '" << Settings_.Database << "' in " << *delay << " " << message << CoutColors_.Default(); + Cout << str << Endl; } Schedule(*delay, new NActors::TEvents::TEvWakeup()); } else { - Fail(TStringBuilder() << "Health check timeout " << Settings_.HealthCheckTimeout << " exceeded, use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast(EVerbose::InitLogs)); + FailTimeout(); } } @@ -357,6 +385,10 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped(EVerbose::InitLogs)); + } + void Fail(const TString& error) { Promise_.SetException(error); PassAway(); @@ -368,6 +400,7 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped CreateLogBackend() const { @@ -399,19 +400,36 @@ class TYdbSetup::TImpl { NYql::NLog::InitLogger(NActors::CreateNullBackend()); } - void WaitResourcesPublishing() const { - auto promise = NThreading::NewPromise(); + NThreading::TFuture RunHealthCheck(const TString& database) const { + EHealthCheck level = Settings_.HealthCheckLevel; + i32 nodesCount = Settings_.NodeCount; + if (database != Settings_.DomainName) { + nodesCount = Tenants_->Size(database); + } else if (StorageMeta_.TenantsSize() > 0) { + level = std::min(level, EHealthCheck::NodesCount); + } + const TWaitResourcesSettings settings = { - .ExpectedNodeCount = static_cast(Settings_.NodeCount), - .HealthCheckLevel = Settings_.HealthCheckLevel, + .ExpectedNodeCount = nodesCount, + .HealthCheckLevel = level, .HealthCheckTimeout = Settings_.HealthCheckTimeout, .VerboseLevel = Settings_.VerboseLevel, - .Database = NKikimr::CanonizePath(Settings_.DomainName) + .Database = NKikimr::CanonizePath(database) }; - GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), 0, GetRuntime()->GetAppData().SystemPoolId); + const auto promise = NThreading::NewPromise(); + GetRuntime()->Register(CreateResourcesWaiterActor(promise, settings), GetNodeIndexForDatabase(database), GetRuntime()->GetAppData().SystemPoolId); + + return promise.GetFuture(); + } + + void WaitResourcesPublishing() const { + std::vector> futures(1, RunHealthCheck(Settings_.DomainName)); + for (const auto& [tenantName, _] : StorageMeta_.GetTenants()) { + futures.emplace_back(RunHealthCheck(GetTenantPath(tenantName))); + } try { - promise.GetFuture().GetValue(2 * Settings_.HealthCheckTimeout); + NThreading::WaitAll(futures).GetValue(2 * Settings_.HealthCheckTimeout); } catch (...) { ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage(); } @@ -628,7 +646,11 @@ class TYdbSetup::TImpl { } TString GetDatabasePath(const TString& database) const { - return NKikimr::CanonizePath(database ? database : GetDefaultDatabase()); + const TString& result = NKikimr::CanonizePath(database ? database : GetDefaultDatabase()); + if (StorageMeta_.TenantsSize() > 0 && result == NKikimr::CanonizePath(Settings_.DomainName)) { + ythrow yexception() << "Cannot use root domain '" << result << "' as request database then created additional tenants"; + } + return result; } ui32 GetNodeIndexForDatabase(const TString& path) const { @@ -653,7 +675,7 @@ class TYdbSetup::TImpl { ythrow yexception() << "Can not choose default database, there is more than one tenants, please use `-D `"; } if (StorageMeta_.TenantsSize() == 1) { - return StorageMeta_.GetTenants().begin()->first; + return GetTenantPath(StorageMeta_.GetTenants().begin()->first); } return Settings_.DomainName; }