Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3445 added feature flag for resource pools on sls #6808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,19 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::DoModify(const NYql::TO
TResourcePoolManager::TAsyncStatus TResourcePoolManager::CreateResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
NKqpProto::TKqpSchemeOperation schemeOperation;
PrepareCreateResourcePool(schemeOperation, settings, context);
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId);
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kCreateResourcePool);
}

TResourcePoolManager::TAsyncStatus TResourcePoolManager::AlterResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
NKqpProto::TKqpSchemeOperation schemeOperation;
PrepareAlterResourcePool(schemeOperation, settings, context);
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId);
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kAlterResourcePool);
}

TResourcePoolManager::TAsyncStatus TResourcePoolManager::DropResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
NKqpProto::TKqpSchemeOperation schemeOperation;
PrepareDropResourcePool(schemeOperation, settings, context);
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId);
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kDropResourcePool);
}

//// Deferred modification
Expand Down Expand Up @@ -271,11 +271,11 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecutePrepared(const N
try {
switch (schemeOperation.GetOperationCase()) {
case NKqpProto::TKqpSchemeOperation::kCreateResourcePool:
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId);
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
case NKqpProto::TKqpSchemeOperation::kAlterResourcePool:
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId);
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
case NKqpProto::TKqpSchemeOperation::kDropResourcePool:
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId);
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
default:
return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast<i32>(schemeOperation.GetOperationCase())));
}
Expand All @@ -294,8 +294,13 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ChainFeatures(TAsyncSta
});
}

TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const {
auto validationFuture = CheckFeatureFlag(context, nodeId);
TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const {
TAsyncStatus validationFuture = NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Success());
if (operationCase != NKqpProto::TKqpSchemeOperation::kDropResourcePool) {
validationFuture = ChainFeatures(validationFuture, [context, nodeId] {
return CheckFeatureFlag(context, nodeId);
});
}
return ChainFeatures(validationFuture, [schemeTx, context] {
return SendSchemeRequest(schemeTx, context);
});
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TResourcePoolManager : public NMetadata::NModifications::IOperationsManage
void PrepareDropResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const;

TAsyncStatus ChainFeatures(TAsyncStatus lastFeature, std::function<TAsyncStatus()> callback) const;
TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const;
TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const;
};

} // namespace NKikimr::NKqp
14 changes: 10 additions & 4 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6142,11 +6142,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto checkDisabled = [&session](const TString& query) {
auto checkQuery = [&session](const TString& query, EStatus status, const TString& error) {
Cerr << "Check query:\n" << query << "\n";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::UNSUPPORTED);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), error);
};

auto checkDisabled = [checkQuery](const TString& query) {
checkQuery(query, EStatus::UNSUPPORTED, "Resource pools are disabled. Please contact your system administrator to enable it");
};

// CREATE RESOURCE POOL
Expand All @@ -6165,7 +6169,9 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
)");

// DROP RESOURCE POOL
checkDisabled("DROP RESOURCE POOL MyResourcePool;");
checkQuery("DROP RESOURCE POOL MyResourcePool;",
EStatus::SCHEME_ERROR,
"Path does not exist");
}

Y_UNIT_TEST(ResourcePoolsValidation) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/workload_service/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ namespace NKikimr::NKqp::NWorkload {
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

// Fetch pool and create default pool if needed
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);

// Fetch and create pool in scheme shard
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Cpu load fetcher actor
Expand Down
23 changes: 16 additions & 7 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ using namespace NActors;

class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
public:
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists)
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless)
: Event(std::move(event))
, EnableOnServerless(enableOnServerless)
{
if (!Event->Get()->PoolId) {
Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID;
Expand All @@ -36,7 +37,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

void StartPoolFetchRequest() const {
LOG_D("Start pool fetching");
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken));
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless));
}

void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) {
Expand Down Expand Up @@ -107,18 +108,20 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

private:
TEvPlaceRequestIntoPool::TPtr Event;
const bool EnableOnServerless;
bool CanCreatePool = false;
bool DefaultPoolCreated = false;
};


class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
public:
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
: ReplyActorId(replyActorId)
, Database(database)
, PoolId(poolId)
, UserToken(userToken)
, EnableOnServerless(enableOnServerless)
{}

void DoBootstrap() {
Expand All @@ -133,6 +136,11 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
}

const auto& result = results[0];
if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) {
Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return;
}

switch (result.Status) {
case EStatus::Unknown:
case EStatus::PathNotTable:
Expand Down Expand Up @@ -222,6 +230,7 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const bool EnableOnServerless;

NResourcePool::TPoolSettings PoolConfig;
NKikimrProto::TPathID PathId;
Expand Down Expand Up @@ -365,12 +374,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {

} // anonymous namespace

IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) {
return new TPoolResolverActor(std::move(event), defaultPoolExists);
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) {
return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless);
}

IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken);
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless);
}

IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));

EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
if (EnabledResourcePools) {
InitializeWorkloadService();
}
Expand All @@ -84,6 +85,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
const auto& event = ev->Get()->Record;

EnabledResourcePools = event.GetConfig().GetFeatureFlags().GetEnableResourcePools();
EnabledResourcePoolsOnServerless = event.GetConfig().GetFeatureFlags().GetEnableResourcePoolsOnServerless();
if (EnabledResourcePools) {
LOG_I("Resource pools was enanbled");
InitializeWorkloadService();
Expand Down Expand Up @@ -135,7 +137,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
}

void Handle(TEvCleanupRequest::TPtr& ev) {
Expand Down Expand Up @@ -520,6 +522,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
NMonitoring::TDynamicCounterPtr Counters;

bool EnabledResourcePools = false;
bool EnabledResourcePoolsOnServerless = false;
bool ServiceInitialized = false;
bool IdleChecksStarted = false;
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
auto runtime = ydb->GetRuntime();
const auto& edgeActor = runtime->AllocateEdgeActor();

runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{})));
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ message TFeatureFlags {
optional bool EnableExternalSourceSchemaInference = 126 [default = false];
optional bool EnableDbMetadataCache = 127 [default = false];
optional bool EnableTableDatetime64 = 128 [default = true];
optional bool EnableResourcePools = 129 [default = false];
optional bool EnableResourcePools = 129 [default = false];
optional bool EnableColumnStatistics = 130 [default = false];
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ class TAlterResourcePool : public TSubOperation {
static_cast<ui64>(OperationId.GetTxId()),
static_cast<ui64>(context.SS->SelfTabletId()));

if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) {
if (!context.SS->EnableResourcePoolsOnServerless) {
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return result;
}
}

const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ class TCreateResourcePool : public TSubOperation {
static_cast<ui64>(OperationId.GetTxId()),
static_cast<ui64>(context.SS->SelfTabletId()));

if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) {
if (!context.SS->EnableResourcePoolsOnServerless) {
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return result;
}
}

const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7010,6 +7010,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu
EnableTempTables = featureFlags.GetEnableTempTables();
EnableReplaceIfExistsForExternalEntities = featureFlags.GetEnableReplaceIfExistsForExternalEntities();
EnableTableDatetime64 = featureFlags.GetEnableTableDatetime64();
EnableResourcePoolsOnServerless = featureFlags.GetEnableResourcePoolsOnServerless();
}

void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ class TSchemeShard
bool EnableReplaceIfExistsForExternalEntities = false;
bool EnableTempTables = false;
bool EnableTableDatetime64 = false;
bool EnableResourcePoolsOnServerless = false;

TShardDeleter ShardDeleter;

Expand Down
Loading