Skip to content

Commit

Permalink
Merge e712c91 into cba7316
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 5, 2024
2 parents cba7316 + e712c91 commit db15c1c
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 4 deletions.
14 changes: 14 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6643,6 +6643,20 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Failed to parse property concurrent_query_limit:");

result = session.ExecuteSchemeQuery(TStringBuilder() << R"(
CREATE RESOURCE POOL MyResourcePool WITH (
CONCURRENT_QUERY_LIMIT=)" << NResourcePool::POOL_MAX_CONCURRENT_QUERY_LIMIT + 1 << R"(
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Invalid resource pool configuration, concurrent_query_limit is " << NResourcePool::POOL_MAX_CONCURRENT_QUERY_LIMIT + 1 << ", that exceeds limit in " << NResourcePool::POOL_MAX_CONCURRENT_QUERY_LIMIT);

result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL MyResourcePool WITH (
QUEUE_SIZE=1
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Invalid resource pool configuration, queue_size unsupported without concurrent_query_limit or database_load_cpu_threshold");
}

Y_UNIT_TEST(CreateResourcePool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {
// Check alter access
TSampleQueries::CheckSuccess(ydb->ExecuteQuery(TStringBuilder() << R"(
ALTER RESOURCE POOL )" << NResourcePool::DEFAULT_POOL_ID << R"( SET (
QUEUE_SIZE=1
QUERY_MEMORY_LIMIT_PERCENT_PER_NODE=1
);
)", settings));

Expand Down Expand Up @@ -205,7 +205,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceSubscriptions) {

ydb->ExecuteSchemeQuery(TStringBuilder() << R"(
ALTER RESOURCE POOL )" << ydb->GetSettings().PoolId_ << R"( SET (
QUEUE_SIZE=42
CONCURRENT_QUERY_LIMIT=42
);
)");

Expand All @@ -214,7 +214,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceSubscriptions) {

const auto& config = response->Get()->Config;
UNIT_ASSERT_C(config, "Pool config not found");
UNIT_ASSERT_VALUES_EQUAL(config->QueueSize, 42);
UNIT_ASSERT_VALUES_EQUAL(config->ConcurrentQueryLimit, 42);
}

Y_UNIT_TEST(TestResourcePoolSubscriptionAfterAclChange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) {
Y_UNIT_TEST(TestTablesIsNotCreatingForUnlimitedPool) {
auto ydb = TYdbSetupSettings()
.ConcurrentQueryLimit(-1)
.QueueSize(10)
.QueryMemoryLimitPercentPerNode(50)
.Create();

TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query));
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/resource_pools/resource_pool_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,13 @@ std::unordered_map<TString, TPoolSettings::TProperty> TPoolSettings::GetProperti
return properties;
}

void TPoolSettings::Validate() const {
if (ConcurrentQueryLimit > POOL_MAX_CONCURRENT_QUERY_LIMIT) {
throw yexception() << "Invalid resource pool configuration, concurrent_query_limit is " << ConcurrentQueryLimit << ", that exceeds limit in " << POOL_MAX_CONCURRENT_QUERY_LIMIT;
}
if (QueueSize != -1 && ConcurrentQueryLimit == -1 && DatabaseLoadCpuThreshold < 0.0) {
throw yexception() << "Invalid resource pool configuration, queue_size unsupported without concurrent_query_limit or database_load_cpu_threshold";
}
}

} // namespace NKikimr::NResourcePool
3 changes: 3 additions & 0 deletions ydb/core/resource_pools/resource_pool_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace NKikimr::NResourcePool {

inline constexpr char DEFAULT_POOL_ID[] = "default";

inline constexpr i64 POOL_MAX_CONCURRENT_QUERY_LIMIT = 1000;

struct TPoolSettings : public TSettingsBase {
typedef double TPercent;

Expand All @@ -29,6 +31,7 @@ struct TPoolSettings : public TSettingsBase {

bool operator==(const TPoolSettings& other) const = default;
std::unordered_map<TString, TProperty> GetPropertiesMap(bool restricted = false);
void Validate() const;

i32 ConcurrentQueryLimit = -1; // -1 = disabled
i32 QueueSize = -1; // -1 = disabled
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/resource_pools/resource_pool_settings_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ Y_UNIT_TEST_SUITE(ResourcePoolTest) {
UNIT_ASSERT_VALUES_EQUAL(std::visit(extractor, propertiesMap["query_cancel_after_seconds"]), "15");
UNIT_ASSERT_VALUES_EQUAL(std::visit(extractor, propertiesMap["query_memory_limit_percent_per_node"]), "0.5");
}

Y_UNIT_TEST(SettingsValidation) {
{ // Max concurrent query limit validation
TPoolSettings settings;
settings.ConcurrentQueryLimit = POOL_MAX_CONCURRENT_QUERY_LIMIT + 1;
UNIT_ASSERT_EXCEPTION_CONTAINS(settings.Validate(), yexception, TStringBuilder() << "Invalid resource pool configuration, concurrent_query_limit is " << settings.ConcurrentQueryLimit << ", that exceeds limit in " << POOL_MAX_CONCURRENT_QUERY_LIMIT);
}

{ // Unused queue size validation

TPoolSettings settings;
settings.QueueSize = 1;
UNIT_ASSERT_EXCEPTION_CONTAINS(settings.Validate(), yexception, TStringBuilder() << "Invalid resource pool configuration, queue_size unsupported without concurrent_query_limit or database_load_cpu_threshold");
}
}
}

} // namespace NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class TAlterResourcePool : public TSubOperation {
Y_ABORT_UNLESS(oldResourcePoolInfo);
const TResourcePoolInfo::TPtr resourcePoolInfo = NResourcePool::ModifyResourcePool(resourcePoolDescription, oldResourcePoolInfo);
Y_ABORT_UNLESS(resourcePoolInfo);
RETURN_RESULT_UNLESS(NResourcePool::IsResourcePoolInfoValid(result, resourcePoolInfo));

result->SetPathId(dstPath.Base()->PathId.LocalPathId);
const TPathElement::TPtr resourcePool = ReplaceResourcePoolPathElement(dstPath);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "schemeshard__operation_common_resource_pool.h"
#include "schemeshard_impl.h"

#include <ydb/core/resource_pools/resource_pool_settings.h>


namespace NKikimr::NSchemeShard::NResourcePool {

Expand Down Expand Up @@ -90,6 +92,23 @@ bool IsDescriptionValid(const THolder<TProposeResponse>& result, const NKikimrSc
return true;
}

bool IsResourcePoolInfoValid(const THolder<TProposeResponse>& result, const TResourcePoolInfo::TPtr& info) {
try {
const auto& properties = info->Properties.GetProperties();
NKikimr::NResourcePool::TPoolSettings settings;
for (auto [name, property] : settings.GetPropertiesMap()) {
if (const auto it = properties.find(name); it != properties.end()) {
std::visit(NKikimr::NResourcePool::TPoolSettings::TParser{it->second}, property);
}
}
settings.Validate();
} catch (...) {
result->SetError(NKikimrScheme::StatusSchemeError, CurrentExceptionMessage());
return false;
}
return true;
}

TTxState& CreateTransaction(const TOperationId& operationId, const TOperationContext& context, const TPathId& resourcePoolPathId, TTxState::ETxType txType) {
Y_ABORT_UNLESS(!context.SS->FindTx(operationId));
TTxState& txState = context.SS->CreateTx(operationId, txType, resourcePoolPathId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ bool IsApplyIfChecksPassed(const TTxTransaction& transaction, const THolder<TPro

bool IsDescriptionValid(const THolder<TProposeResponse>& result, const NKikimrSchemeOp::TResourcePoolDescription& description);

bool IsResourcePoolInfoValid(const THolder<TProposeResponse>& result, const TResourcePoolInfo::TPtr& info);

TTxState& CreateTransaction(const TOperationId& operationId, const TOperationContext& context, const TPathId& resourcePoolPathId, TTxState::ETxType txType);

void RegisterParentPathDependencies(const TOperationId& operationId, const TOperationContext& context, const TPath& parentPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class TCreateResourcePool : public TSubOperation {

const TResourcePoolInfo::TPtr resourcePoolInfo = NResourcePool::CreateResourcePool(resourcePoolDescription, 1);
Y_ABORT_UNLESS(resourcePoolInfo);
RETURN_RESULT_UNLESS(NResourcePool::IsResourcePoolInfoValid(result, resourcePoolInfo));

AddPathInSchemeShard(result, dstPath, owner);
const TPathElement::TPtr resourcePool = CreateResourcePoolPathElement(dstPath);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ PEERDIR(
ydb/core/persqueue/events
ydb/core/persqueue/writer
ydb/core/protos
ydb/core/resource_pools
ydb/core/scheme
ydb/core/statistics
ydb/core/sys_view/partition_stats
Expand Down

0 comments on commit db15c1c

Please sign in to comment.