diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 8b5dd686eefd..55b2cd3085d7 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -1,11 +1,13 @@ #include "actors.h" #include +#include #include #include #include +#include #include #include @@ -64,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped { for (const TString& usedSid : AppData()->AdministrationAllowedSIDs) { diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid); } - diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData()->AllAuthenticatedUsers); + + auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema; + for (const auto& userSID : AppData()->DefaultUserSIDs) { + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID); + } + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers); + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); auto token = MakeIntrusive(BUILTIN_ACL_METADATA, TVector{}); Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); @@ -116,7 +124,7 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) + TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) : ReplyActorId(replyActorId) , Database(database) , PoolId(poolId) @@ -255,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase { } void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) { - const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus(); - switch (ev->Get()->Status()) { + const auto& response = ev->Get()->Record; + const auto ssStatus = response.GetSchemeShardStatus(); + const auto status = ev->Get()->Status(); + switch (status) { case NTxProxy::TResultStatus::ExecComplete: case NTxProxy::TResultStatus::ExecAlready: if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) { Reply(Ydb::StatusIds::SUCCESS); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecError: - if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) { - ScheduleRetry(ssStatus, "Retry execution error", true); + if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) { + SubscribeOnTransactionOrRetry(status, response); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecInProgress: - ScheduleRetry(ssStatus, "Retry execution in progress error", true); + SubscribeOnTransactionOrRetry(status, response); return; case NTxProxy::TResultStatus::ProxyShardNotAvailable: - ScheduleRetry(ssStatus, "Retry shard unavailable error"); + ScheduleRetry(response, "Retry shard unavailable error"); return; default: - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus))); return; } } + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status == NKikimrProto::OK) { + LOG_T("Tablet to pipe successfully connected"); + return; + } + + ClosePipeClient(); + ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status)); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { + const TActorId clientId = ev->Get()->ClientId; + if (!ClosedSchemePipeActors.contains(clientId)) { + ClosePipeClient(); + ScheduleRetry("Tablet to pipe destroyed"); + } + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { + ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking"); + } + STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle) + hFunc(TEvTabletPipe::TEvClientConnected, Handle) + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle) + hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle) + IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered) + default: StateFuncBase(ev); } @@ -301,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase { schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"})); schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool); schemeTx.SetInternal(true); - schemeTx.SetAllowAccessToPrivatePaths(true); BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool()); BuildModifyAclRequest(*schemeTx.MutableModifyACL()); if (UserToken) { - event->Record.SetUserToken(UserToken->GetSerializedToken()); + event->Record.SetUserToken(UserToken->SerializeAsString()); } Send(MakeTxProxyID(), std::move(event)); @@ -322,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase { } private: - void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) { - auto ssStatus = static_cast(status); - if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) { - Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus); + void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) { + const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId(); + if (txId == 0) { + ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true); + return; + } + + SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId())); + + auto request = MakeHolder(); + request->Record.SetTxId(txId); + NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request)); + LOG_D("Subscribe on create pool tx: " << txId); + } + + void ClosePipeClient() { + if (SchemePipeActorId) { + ClosedSchemePipeActors.insert(SchemePipeActorId); + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + SchemePipeActorId = {}; + } + } + + void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) { + ClosePipeClient(); + + auto ssStatus = static_cast(response.GetSchemeShardStatus()); + if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus)); + } + } + + void ScheduleRetry(const TString& message, bool longDelay = false) { + ClosePipeClient(); + if (!TBase::ScheduleRetry(message, longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message); } } @@ -358,11 +426,19 @@ class TPoolCreatorActor : public TSchemeActorBase { LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString()); } + ClosePipeClient(); + Issues.AddIssues(std::move(issues)); Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues))); PassAway(); } + static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) { + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetIssues(), issues); + return GroupIssues(issues, message); + } + private: const TActorId ReplyActorId; const TString Database; @@ -370,6 +446,9 @@ class TPoolCreatorActor : public TSchemeActorBase { const TIntrusiveConstPtr UserToken; const NACLibProto::TDiffACL DiffAcl; NResourcePool::TPoolSettings PoolConfig; + + std::unordered_set ClosedSchemePipeActors; + TActorId SchemePipeActorId; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index edb489bc3fc6..163b2d765ed1 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped { virtual TString LogPrefix() const = 0; protected: - bool ScheduleRetry(const TString& message, bool longDelay = false) { + bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) { if (!RetryState) { RetryState = CreateRetryState(); } if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) { - Issues.AddIssue(message); + Issues.AddIssues(issues); this->Schedule(*delay, new TEvents::TEvWakeup()); - LOG_W("Scheduled retry for error: " << message); + LOG_W("Scheduled retry for error: " << issues.ToOneLineString()); return true; } return false; } + bool ScheduleRetry(const TString& message, bool longDelay = false) { + return ScheduleRetry({NYql::TIssue(message)}, longDelay); + } + private: static TRetryPolicy::IRetryState::TPtr CreateRetryState() { return TRetryPolicy::GetFixedIntervalPolicy( diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index bf067d74a990..66a6aaaaf64b 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -132,9 +132,6 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - // Add AllAuthenticatedUsers group SID into user token - ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken); - LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database)); Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless)); @@ -475,25 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } - static TIntrusivePtr GetUserToken(TIntrusiveConstPtr userToken) { - auto token = MakeIntrusive(userToken ? userToken->GetUserSID() : NACLib::TSID(), TVector{}); - - bool hasAllAuthenticatedUsersSID = false; - const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers; - if (userToken) { - for (const auto& groupSID : userToken->GetGroupSIDs()) { - token->AddGroupSID(groupSID); - hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID; - } - } - - if (!hasAllAuthenticatedUsersSID) { - token->AddGroupSID(allAuthenticatedUsersSID); - } - - return token; - } - TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 2c259f9f601a..271d7accbbfd 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr ydb, c auto runtime = ydb->GetRuntime(); const auto& edgeActor = runtime->AllocateEdgeActor(); - runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive(userSID, TVector{}), true)); + auto userToken = MakeIntrusive(userSID, TVector{}); + userToken->SaveSerializationInfo(); + runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true)); return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); } @@ -108,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { // Check default pool access TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID))); - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(""))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT))); } Y_UNIT_TEST(TestDefaultPoolAdminPermissions) { diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 38dc0a914cb2..769946ce258a 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -392,7 +392,9 @@ THolder BuildSchemeCacheNavigateRequest(cons auto request = MakeHolder(); auto databasePath = SplitPath(database); request->DatabaseName = CanonizePath(databasePath); - request->UserToken = userToken; + if (userToken && !userToken->GetSerializedToken().empty()) { + request->UserToken = userToken; + } for (const auto& pathComponents : pathsComponents) { auto& entry = request->ResultSet.emplace_back();