From abd507c2cb3d3658b87df56dfa0b060da0fcab35 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 22 Jul 2024 14:10:16 +0000 Subject: [PATCH] Fixed requests starts for queue_size=0 --- .../actors/pool_handlers_acors.cpp | 8 ++--- .../kqp/workload_service/common/helpers.cpp | 4 +++ .../kqp/workload_service/common/helpers.h | 2 ++ .../common/kqp_workload_service_ut_common.cpp | 33 ++++++++++--------- .../ut/kqp_workload_service_ut.cpp | 30 +++++++++++++++++ 5 files changed, 57 insertions(+), 20 deletions(-) diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index 5faa30f86dc7..f0f11628a068 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -577,7 +577,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase= MAX_PENDING_REQUESTS || GetLocalSessionsCount() - GetLocalInFlight() > QueueSizeLimit + 1) { + if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) { ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId); return; } @@ -695,8 +695,8 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase QueueSizeLimit) { - RemoveBackRequests(PendingRequests, std::min(GlobalState.DelayedRequests + PendingRequests.size() - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) { + if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) { + RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) { ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId); }); FifoCounters.PendingRequestsCount->Set(PendingRequests.size()); @@ -847,7 +847,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase y) ? x - y : 0; +} + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index 85aff302d68f..edb489bc3fc6 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -99,4 +99,6 @@ NYql::TIssues GroupIssues(const NYql::TIssues& issues, const TString& message); void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& description, NResourcePool::TPoolSettings& poolConfig); +ui64 SaturationSub(ui64 x, ui64 y); + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 825ea33bbe2f..d6b6ea4e58b5 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -453,9 +453,10 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { auto subgroup = GetWorkloadManagerCounters(nodeIndex) ->GetSubgroup("pool", CanonizePath(TStringBuilder() << Settings_.DomainName_ << "/" << (poolId ? poolId : Settings_.PoolId_))); - CheckCommonCounters(subgroup); + const TString description = TStringBuilder() << "Node index: " << nodeIndex; + CheckCommonCounters(subgroup, description); if (checkTableCounters) { - CheckTableCounters(subgroup); + CheckTableCounters(subgroup, description); } } } @@ -497,21 +498,21 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { ->GetSubgroup("subsystem", "workload_manager"); } - static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup) { - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalInFly", false)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueError", true)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("CleanupError", true)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("Cancelled", true)->Val(), 0); + static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup, const TString& description) { + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("LocalInFly", false)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueError", true)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("CleanupError", true)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("Cancelled", true)->Val(), 0, description); - UNIT_ASSERT_GE(subgroup->GetCounter("ContinueOk", true)->Val(), 1); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOk", true)->Val(), subgroup->GetCounter("CleanupOk", true)->Val()); + UNIT_ASSERT_GE_C(subgroup->GetCounter("ContinueOk", true)->Val(), 1, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueOk", true)->Val(), subgroup->GetCounter("CleanupOk", true)->Val(), description); } - static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup) { - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0); - UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0); + static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup, const TString& description) { + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0, description); + UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0, description); const std::vector> tableQueries = { {"TCleanupTablesQuery", false}, @@ -524,9 +525,9 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { for (const auto& [operation, runExpected] : tableQueries) { auto operationSubgroup = subgroup->GetSubgroup("operation", operation); - UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << "Unexpected vaule for operation " << operation); + UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << description << ", unexpected vaule for operation " << operation); if (runExpected) { - UNIT_ASSERT_GE_C(operationSubgroup->GetCounter("FinishOk", true)->Val(), 1, TStringBuilder() << "Unexpected vaule for operation " << operation); + UNIT_ASSERT_GE_C(operationSubgroup->GetCounter("FinishOk", true)->Val(), 1, TStringBuilder() << description << ", unexpected vaule for operation " << operation); } } } diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 5456074990f4..a750be18bd79 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -123,6 +123,36 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult()); } + Y_UNIT_TEST(TestZeroQueueSizeManyQueries) { + const i32 inFlight = 10; + auto ydb = TYdbSetupSettings() + .ConcurrentQueryLimit(inFlight) + .QueueSize(0) + .QueryCancelAfter(FUTURE_WAIT_TIMEOUT * inFlight) + .Create(); + + auto settings = TQueryRunnerSettings().HangUpDuringExecution(true); + + std::vector asyncResults; + for (size_t i = 0; i < inFlight; ++i) { + asyncResults.emplace_back(ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings)); + } + + for (const auto& asyncResult : asyncResults) { + ydb->WaitQueryExecution(asyncResult); + } + + TSampleQueries::CheckOverloaded( + ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)), + ydb->GetSettings().PoolId_ + ); + + for (const auto& asyncResult : asyncResults) { + ydb->ContinueQueryExecution(asyncResult); + TSampleQueries::TSelect42::CheckResult(asyncResult.GetResult()); + } + } + Y_UNIT_TEST(TestQueryCancelAfterUnlimitedPool) { auto ydb = TYdbSetupSettings() .QueryCancelAfter(TDuration::Seconds(10))