Skip to content

Commit

Permalink
YQ-3445 added feature flag for resource pools on sls (#6808)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 22, 2024
1 parent ce7c29c commit 48bdda4
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 25 deletions.
21 changes: 13 additions & 8 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,19 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::DoModify(const NYql::TO
TResourcePoolManager::TAsyncStatus TResourcePoolManager::CreateResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
NKqpProto::TKqpSchemeOperation schemeOperation;
PrepareCreateResourcePool(schemeOperation, settings, context);
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId);
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kCreateResourcePool);
}

TResourcePoolManager::TAsyncStatus TResourcePoolManager::AlterResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
NKqpProto::TKqpSchemeOperation schemeOperation;
PrepareAlterResourcePool(schemeOperation, settings, context);
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId);
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kAlterResourcePool);
}

TResourcePoolManager::TAsyncStatus TResourcePoolManager::DropResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
NKqpProto::TKqpSchemeOperation schemeOperation;
PrepareDropResourcePool(schemeOperation, settings, context);
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId);
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kDropResourcePool);
}

//// Deferred modification
Expand Down Expand Up @@ -271,11 +271,11 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecutePrepared(const N
try {
switch (schemeOperation.GetOperationCase()) {
case NKqpProto::TKqpSchemeOperation::kCreateResourcePool:
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId);
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
case NKqpProto::TKqpSchemeOperation::kAlterResourcePool:
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId);
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
case NKqpProto::TKqpSchemeOperation::kDropResourcePool:
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId);
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
default:
return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast<i32>(schemeOperation.GetOperationCase())));
}
Expand All @@ -294,8 +294,13 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ChainFeatures(TAsyncSta
});
}

TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const {
auto validationFuture = CheckFeatureFlag(context, nodeId);
TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const {
TAsyncStatus validationFuture = NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Success());
if (operationCase != NKqpProto::TKqpSchemeOperation::kDropResourcePool) {
validationFuture = ChainFeatures(validationFuture, [context, nodeId] {
return CheckFeatureFlag(context, nodeId);
});
}
return ChainFeatures(validationFuture, [schemeTx, context] {
return SendSchemeRequest(schemeTx, context);
});
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TResourcePoolManager : public NMetadata::NModifications::IOperationsManage
void PrepareDropResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const;

TAsyncStatus ChainFeatures(TAsyncStatus lastFeature, std::function<TAsyncStatus()> callback) const;
TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const;
TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const;
};

} // namespace NKikimr::NKqp
14 changes: 10 additions & 4 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6178,11 +6178,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto checkDisabled = [&session](const TString& query) {
auto checkQuery = [&session](const TString& query, EStatus status, const TString& error) {
Cerr << "Check query:\n" << query << "\n";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::UNSUPPORTED);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), error);
};

auto checkDisabled = [checkQuery](const TString& query) {
checkQuery(query, EStatus::UNSUPPORTED, "Resource pools are disabled. Please contact your system administrator to enable it");
};

// CREATE RESOURCE POOL
Expand All @@ -6201,7 +6205,9 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
)");

// DROP RESOURCE POOL
checkDisabled("DROP RESOURCE POOL MyResourcePool;");
checkQuery("DROP RESOURCE POOL MyResourcePool;",
EStatus::SCHEME_ERROR,
"Path does not exist");
}

Y_UNIT_TEST(ResourcePoolsValidation) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/workload_service/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ namespace NKikimr::NKqp::NWorkload {
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

// Fetch pool and create default pool if needed
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);

// Fetch and create pool in scheme shard
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Cpu load fetcher actor
Expand Down
23 changes: 16 additions & 7 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ using namespace NActors;

class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
public:
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists)
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless)
: Event(std::move(event))
, EnableOnServerless(enableOnServerless)
{
if (!Event->Get()->PoolId) {
Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID;
Expand All @@ -36,7 +37,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

void StartPoolFetchRequest() const {
LOG_D("Start pool fetching");
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken));
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless));
}

void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) {
Expand Down Expand Up @@ -107,18 +108,20 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

private:
TEvPlaceRequestIntoPool::TPtr Event;
const bool EnableOnServerless;
bool CanCreatePool = false;
bool DefaultPoolCreated = false;
};


class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
public:
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
: ReplyActorId(replyActorId)
, Database(database)
, PoolId(poolId)
, UserToken(userToken)
, EnableOnServerless(enableOnServerless)
{}

void DoBootstrap() {
Expand All @@ -133,6 +136,11 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
}

const auto& result = results[0];
if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) {
Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return;
}

switch (result.Status) {
case EStatus::Unknown:
case EStatus::PathNotTable:
Expand Down Expand Up @@ -222,6 +230,7 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const bool EnableOnServerless;

NResourcePool::TPoolSettings PoolConfig;
NKikimrProto::TPathID PathId;
Expand Down Expand Up @@ -365,12 +374,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {

} // anonymous namespace

IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) {
return new TPoolResolverActor(std::move(event), defaultPoolExists);
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) {
return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless);
}

IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken);
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless);
}

IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));

EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
if (EnabledResourcePools) {
InitializeWorkloadService();
}
Expand All @@ -84,6 +85,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
const auto& event = ev->Get()->Record;

EnabledResourcePools = event.GetConfig().GetFeatureFlags().GetEnableResourcePools();
EnabledResourcePoolsOnServerless = event.GetConfig().GetFeatureFlags().GetEnableResourcePoolsOnServerless();
if (EnabledResourcePools) {
LOG_I("Resource pools was enanbled");
InitializeWorkloadService();
Expand Down Expand Up @@ -135,7 +137,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
}

void Handle(TEvCleanupRequest::TPtr& ev) {
Expand Down Expand Up @@ -520,6 +522,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
NMonitoring::TDynamicCounterPtr Counters;

bool EnabledResourcePools = false;
bool EnabledResourcePoolsOnServerless = false;
bool ServiceInitialized = false;
bool IdleChecksStarted = false;
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
auto runtime = ydb->GetRuntime();
const auto& edgeActor = runtime->AllocateEdgeActor();

runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{})));
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ message TFeatureFlags {
optional bool EnableExternalSourceSchemaInference = 126 [default = false];
optional bool EnableDbMetadataCache = 127 [default = false];
optional bool EnableTableDatetime64 = 128 [default = true];
optional bool EnableResourcePools = 129 [default = false];
optional bool EnableResourcePools = 129 [default = false];
optional bool EnableColumnStatistics = 130 [default = false];
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ class TAlterResourcePool : public TSubOperation {
static_cast<ui64>(OperationId.GetTxId()),
static_cast<ui64>(context.SS->SelfTabletId()));

if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) {
if (!context.SS->EnableResourcePoolsOnServerless) {
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return result;
}
}

const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ class TCreateResourcePool : public TSubOperation {
static_cast<ui64>(OperationId.GetTxId()),
static_cast<ui64>(context.SS->SelfTabletId()));

if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) {
if (!context.SS->EnableResourcePoolsOnServerless) {
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return result;
}
}

const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7012,6 +7012,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu
EnableTempTables = featureFlags.GetEnableTempTables();
EnableReplaceIfExistsForExternalEntities = featureFlags.GetEnableReplaceIfExistsForExternalEntities();
EnableTableDatetime64 = featureFlags.GetEnableTableDatetime64();
EnableResourcePoolsOnServerless = featureFlags.GetEnableResourcePoolsOnServerless();
}

void TSchemeShard::ConfigureStatsBatching(const NKikimrConfig::TSchemeShardConfig& config, const TActorContext& ctx) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ class TSchemeShard
bool EnableReplaceIfExistsForExternalEntities = false;
bool EnableTempTables = false;
bool EnableTableDatetime64 = false;
bool EnableResourcePoolsOnServerless = false;

TShardDeleter ShardDeleter;

Expand Down

0 comments on commit 48bdda4

Please sign in to comment.