Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3459 fix resource pools permissions #6989

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 96 additions & 17 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "actors.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipe.h>

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/workload_service/common/events.h>
#include <ydb/core/kqp/workload_service/common/helpers.h>

#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

#include <ydb/library/table_creator/table_creator.h>
Expand Down Expand Up @@ -64,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
for (const TString& usedSid : AppData()->AdministrationAllowedSIDs) {
diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid);
}
diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData()->AllAuthenticatedUsers);

auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema;
for (const auto& userSID : AppData()->DefaultUserSIDs) {
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID);
}
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers);
diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT);

auto token = MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{});
Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl));
Expand Down Expand Up @@ -116,7 +124,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
public:
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
: ReplyActorId(replyActorId)
, Database(database)
, PoolId(poolId)
Expand Down Expand Up @@ -255,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
}

void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus();
switch (ev->Get()->Status()) {
const auto& response = ev->Get()->Record;
const auto ssStatus = response.GetSchemeShardStatus();
const auto status = ev->Get()->Status();
switch (status) {
case NTxProxy::TResultStatus::ExecComplete:
case NTxProxy::TResultStatus::ExecAlready:
if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
Reply(Ydb::StatusIds::SUCCESS);
} else {
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
}
return;
case NTxProxy::TResultStatus::ExecError:
if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) {
ScheduleRetry(ssStatus, "Retry execution error", true);
if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
SubscribeOnTransactionOrRetry(status, response);
} else {
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
}
return;
case NTxProxy::TResultStatus::ExecInProgress:
ScheduleRetry(ssStatus, "Retry execution in progress error", true);
SubscribeOnTransactionOrRetry(status, response);
return;
case NTxProxy::TResultStatus::ProxyShardNotAvailable:
ScheduleRetry(ssStatus, "Retry shard unavailable error");
ScheduleRetry(response, "Retry shard unavailable error");
return;
default:
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
return;
}
}

void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
if (ev->Get()->Status == NKikimrProto::OK) {
LOG_T("Tablet to pipe successfully connected");
return;
}

ClosePipeClient();
ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
}

void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
const TActorId clientId = ev->Get()->ClientId;
if (!ClosedSchemePipeActors.contains(clientId)) {
ClosePipeClient();
ScheduleRetry("Tablet to pipe destroyed");
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking");
}

STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle)
hFunc(TEvTabletPipe::TEvClientConnected, Handle)
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle)
IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)

default:
StateFuncBase(ev);
}
Expand All @@ -301,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);
schemeTx.SetInternal(true);
schemeTx.SetAllowAccessToPrivatePaths(true);

BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool());
BuildModifyAclRequest(*schemeTx.MutableModifyACL());

if (UserToken) {
event->Record.SetUserToken(UserToken->GetSerializedToken());
event->Record.SetUserToken(UserToken->SerializeAsString());
}

Send(MakeTxProxyID(), std::move(event));
Expand All @@ -322,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
}

private:
void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) {
auto ssStatus = static_cast<NKikimrScheme::EStatus>(status);
if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus);
void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId();
if (txId == 0) {
ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true);
return;
}

SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId()));

auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
request->Record.SetTxId(txId);
NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
LOG_D("Subscribe on create pool tx: " << txId);
}

void ClosePipeClient() {
if (SchemePipeActorId) {
ClosedSchemePipeActors.insert(SchemePipeActorId);
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
SchemePipeActorId = {};
}
}

void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
ClosePipeClient();

auto ssStatus = static_cast<NKikimrScheme::EStatus>(response.GetSchemeShardStatus());
if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus));
}
}

void ScheduleRetry(const TString& message, bool longDelay = false) {
ClosePipeClient();
if (!TBase::ScheduleRetry(message, longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message);
}
}

Expand Down Expand Up @@ -358,18 +426,29 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString());
}

ClosePipeClient();

Issues.AddIssues(std::move(issues));
Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues)));
PassAway();
}

static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) {
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetIssues(), issues);
return GroupIssues(issues, message);
}

private:
const TActorId ReplyActorId;
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const NACLibProto::TDiffACL DiffAcl;
NResourcePool::TPoolSettings PoolConfig;

std::unordered_set<TActorId> ClosedSchemePipeActors;
TActorId SchemePipeActorId;
};

} // anonymous namespace
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/workload_service/common/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
virtual TString LogPrefix() const = 0;

protected:
bool ScheduleRetry(const TString& message, bool longDelay = false) {
bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) {
if (!RetryState) {
RetryState = CreateRetryState();
}

if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) {
Issues.AddIssue(message);
Issues.AddIssues(issues);
this->Schedule(*delay, new TEvents::TEvWakeup());
LOG_W("Scheduled retry for error: " << message);
LOG_W("Scheduled retry for error: " << issues.ToOneLineString());
return true;
}

return false;
}

bool ScheduleRetry(const TString& message, bool longDelay = false) {
return ScheduleRetry({NYql::TIssue(message)}, longDelay);
}

private:
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TRetryPolicy::GetFixedIntervalPolicy(
Expand Down
22 changes: 0 additions & 22 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return;
}

// Add AllAuthenticatedUsers group SID into user token
ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken);

LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
Expand Down Expand Up @@ -475,25 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
}

static TIntrusivePtr<NACLib::TUserToken> GetUserToken(TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
auto token = MakeIntrusive<NACLib::TUserToken>(userToken ? userToken->GetUserSID() : NACLib::TSID(), TVector<NACLib::TSID>{});

bool hasAllAuthenticatedUsersSID = false;
const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers;
if (userToken) {
for (const auto& groupSID : userToken->GetGroupSIDs()) {
token->AddGroupSID(groupSID);
hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID;
}
}

if (!hasAllAuthenticatedUsersSID) {
token->AddGroupSID(allAuthenticatedUsersSID);
}

return token;
}

TPoolState* GetPoolState(const TString& database, const TString& poolId) {
return GetPoolState(GetPoolKey(database, poolId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
auto runtime = ydb->GetRuntime();
const auto& edgeActor = runtime->AllocateEdgeActor();

runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
auto userToken = MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{});
userToken->SaveSerializationInfo();
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true));
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
}

Expand Down Expand Up @@ -108,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {

// Check default pool access
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID)));
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID("")));
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers)));
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT)));
}

Y_UNIT_TEST(TestDefaultPoolAdminPermissions) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/table_creator/table_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(cons
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
auto databasePath = SplitPath(database);
request->DatabaseName = CanonizePath(databasePath);
request->UserToken = userToken;
if (userToken && !userToken->GetSerializedToken().empty()) {
request->UserToken = userToken;
}

for (const auto& pathComponents : pathsComponents) {
auto& entry = request->ResultSet.emplace_back();
Expand Down
Loading
Loading