Skip to content

Commit

Permalink
Fixed resource pools acl validation
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 23, 2024
1 parent 1091a58 commit 059ae3b
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 118 deletions.
108 changes: 93 additions & 15 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "actors.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipe.h>

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/workload_service/common/events.h>
#include <ydb/core/kqp/workload_service/common/helpers.h>

#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>

#include <ydb/library/table_creator/table_creator.h>
Expand Down Expand Up @@ -255,38 +257,70 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
}

void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus();
switch (ev->Get()->Status()) {
const auto& response = ev->Get()->Record;
const auto ssStatus = response.GetSchemeShardStatus();
const auto status = ev->Get()->Status();
switch (status) {
case NTxProxy::TResultStatus::ExecComplete:
case NTxProxy::TResultStatus::ExecAlready:
if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
Reply(Ydb::StatusIds::SUCCESS);
} else {
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
}
return;
case NTxProxy::TResultStatus::ExecError:
if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) {
ScheduleRetry(ssStatus, "Retry execution error", true);
if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
SubscribeOnTransactionOrRetry(status, response);
} else {
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
}
return;
case NTxProxy::TResultStatus::ExecInProgress:
ScheduleRetry(ssStatus, "Retry execution in progress error", true);
SubscribeOnTransactionOrRetry(status, response);
return;
case NTxProxy::TResultStatus::ProxyShardNotAvailable:
ScheduleRetry(ssStatus, "Retry shard unavailable error");
ScheduleRetry(response, "Retry shard unavailable error");
return;
default:
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
return;
}
}

void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
if (ev->Get()->Status == NKikimrProto::OK) {
LOG_T("Tablet to pipe successfully connected");
return;
}

PipeClientClosedByUs = true;
SchemePipeActorId = {};
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);

ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
}

void HandleClientDestroyed() {
SchemePipeActorId = {};
if (!PipeClientClosedByUs) {
ScheduleRetry("Tablet to pipe destroyed");
}
PipeClientClosedByUs = false;
}

void HandleNotifyTxCompletionResult() {
ScheduleRetry("Transaction completed, doublechecking");
}

STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle)
hFunc(TEvTabletPipe::TEvClientConnected, Handle)
sFunc(TEvTabletPipe::TEvClientDestroyed, HandleClientDestroyed)
sFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleNotifyTxCompletionResult)
IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)

default:
StateFuncBase(ev);
}
Expand All @@ -301,13 +335,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"}));
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);
schemeTx.SetInternal(true);
schemeTx.SetAllowAccessToPrivatePaths(true);

BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool());
BuildModifyAclRequest(*schemeTx.MutableModifyACL());

if (UserToken) {
event->Record.SetUserToken(UserToken->GetSerializedToken());
event->Record.SetUserToken(UserToken->SerializeAsString());
}

Send(MakeTxProxyID(), std::move(event));
Expand All @@ -322,10 +355,42 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
}

private:
void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) {
auto ssStatus = static_cast<NKikimrScheme::EStatus>(status);
if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus);
void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId();
if (txId == 0) {
ScheduleRetry(response, "Unable to subscribe to concurrent transaction");
return;
}

PipeClientClosedByUs = false;
SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId()));

auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
request->Record.SetTxId(txId);
NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
LOG_D("Subscribe on create pool tx: " << txId);
}

void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
if (SchemePipeActorId){
PipeClientClosedByUs = true;
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
}

auto ssStatus = static_cast<NKikimrScheme::EStatus>(response.GetSchemeShardStatus());
if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus));
}
}

void ScheduleRetry(const TString& message, bool longDelay = false) {
if (SchemePipeActorId){
PipeClientClosedByUs = true;
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
}

if (!TBase::ScheduleRetry(message, longDelay)) {
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message);
}
}

Expand Down Expand Up @@ -358,18 +423,31 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString());
}

if (SchemePipeActorId) {
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
}

Issues.AddIssues(std::move(issues));
Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues)));
PassAway();
}

static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) {
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetIssues(), issues);
return GroupIssues(issues, message);
}

private:
const TActorId ReplyActorId;
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const NACLibProto::TDiffACL DiffAcl;
NResourcePool::TPoolSettings PoolConfig;

NActors::TActorId SchemePipeActorId;
bool PipeClientClosedByUs = false;
};

} // anonymous namespace
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/kqp/workload_service/common/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace NKikimr::NKqp::NWorkload {
#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_WORKLOAD_SERVICE, "[WorkloadService] " << LogPrefix() << stream)



template <typename TDerived>
class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
using TRetryPolicy = IRetryPolicy<bool>;
Expand Down Expand Up @@ -62,21 +63,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
virtual TString LogPrefix() const = 0;

protected:
bool ScheduleRetry(const TString& message, bool longDelay = false) {
bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) {
if (!RetryState) {
RetryState = CreateRetryState();
}

if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) {
Issues.AddIssue(message);
Issues.AddIssues(issues);
this->Schedule(*delay, new TEvents::TEvWakeup());
LOG_W("Scheduled retry for error: " << message);
LOG_W("Scheduled retry for error: " << issues.ToOneLineString());
return true;
}

return false;
}

bool ScheduleRetry(const TString& message, bool longDelay = false) {
return ScheduleRetry({NYql::TIssue(message)}, longDelay);
}

private:
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
return TRetryPolicy::GetFixedIntervalPolicy(
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
token->AddGroupSID(allAuthenticatedUsersSID);
}

if (userToken && !userToken->GetSerializedToken().empty()) {
token->SaveSerializationInfo();
}

return token;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
auto runtime = ydb->GetRuntime();
const auto& edgeActor = runtime->AllocateEdgeActor();

runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
auto userToken = MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{});
userToken->SaveSerializationInfo();
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true));
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/library/table_creator/table_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(cons
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
auto databasePath = SplitPath(database);
request->DatabaseName = CanonizePath(databasePath);
request->UserToken = userToken;
if (userToken && !userToken->GetSerializedToken().empty()) {
request->UserToken = userToken;
}

for (const auto& pathComponents : pathsComponents) {
auto& entry = request->ResultSet.emplace_back();
Expand Down
Loading

0 comments on commit 059ae3b

Please sign in to comment.