From 059ae3be0d964959a7c9c017ab809c1db34a5528 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jul 2024 12:24:12 +0000 Subject: [PATCH 1/6] Fixed resource pools acl validation --- .../workload_service/actors/scheme_actors.cpp | 108 +++++++++++++++--- .../kqp/workload_service/common/helpers.h | 11 +- .../workload_service/kqp_workload_service.cpp | 4 + .../ut/kqp_workload_service_actors_ut.cpp | 4 +- ydb/library/table_creator/table_creator.cpp | 4 +- ydb/tests/tools/kqprun/kqprun.cpp | 81 ++++++++++--- ydb/tests/tools/kqprun/src/common.h | 11 +- ydb/tests/tools/kqprun/src/kqp_runner.cpp | 57 +++++---- ydb/tests/tools/kqprun/src/kqp_runner.h | 11 +- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 79 +++++++------ ydb/tests/tools/kqprun/src/ydb_setup.h | 27 ++--- 11 files changed, 279 insertions(+), 118 deletions(-) diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 8b5dd686eefd..2e282169d205 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -1,11 +1,13 @@ #include "actors.h" #include +#include #include #include #include +#include #include #include @@ -255,38 +257,70 @@ class TPoolCreatorActor : public TSchemeActorBase { } void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) { - const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus(); - switch (ev->Get()->Status()) { + const auto& response = ev->Get()->Record; + const auto ssStatus = response.GetSchemeShardStatus(); + const auto status = ev->Get()->Status(); + switch (status) { case NTxProxy::TResultStatus::ExecComplete: case NTxProxy::TResultStatus::ExecAlready: if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) { Reply(Ydb::StatusIds::SUCCESS); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecError: - if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) { - ScheduleRetry(ssStatus, "Retry execution error", true); + if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) { + SubscribeOnTransactionOrRetry(status, response); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecInProgress: - ScheduleRetry(ssStatus, "Retry execution in progress error", true); + SubscribeOnTransactionOrRetry(status, response); return; case NTxProxy::TResultStatus::ProxyShardNotAvailable: - ScheduleRetry(ssStatus, "Retry shard unavailable error"); + ScheduleRetry(response, "Retry shard unavailable error"); return; default: - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus))); return; } } + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status == NKikimrProto::OK) { + LOG_T("Tablet to pipe successfully connected"); + return; + } + + PipeClientClosedByUs = true; + SchemePipeActorId = {}; + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + + ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status)); + } + + void HandleClientDestroyed() { + SchemePipeActorId = {}; + if (!PipeClientClosedByUs) { + ScheduleRetry("Tablet to pipe destroyed"); + } + PipeClientClosedByUs = false; + } + + void HandleNotifyTxCompletionResult() { + ScheduleRetry("Transaction completed, doublechecking"); + } + STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle) + hFunc(TEvTabletPipe::TEvClientConnected, Handle) + sFunc(TEvTabletPipe::TEvClientDestroyed, HandleClientDestroyed) + sFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleNotifyTxCompletionResult) + IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered) + default: StateFuncBase(ev); } @@ -301,13 +335,12 @@ class TPoolCreatorActor : public TSchemeActorBase { schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"})); schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool); schemeTx.SetInternal(true); - schemeTx.SetAllowAccessToPrivatePaths(true); BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool()); BuildModifyAclRequest(*schemeTx.MutableModifyACL()); if (UserToken) { - event->Record.SetUserToken(UserToken->GetSerializedToken()); + event->Record.SetUserToken(UserToken->SerializeAsString()); } Send(MakeTxProxyID(), std::move(event)); @@ -322,10 +355,42 @@ class TPoolCreatorActor : public TSchemeActorBase { } private: - void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) { - auto ssStatus = static_cast(status); - if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) { - Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus); + void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) { + const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId(); + if (txId == 0) { + ScheduleRetry(response, "Unable to subscribe to concurrent transaction"); + return; + } + + PipeClientClosedByUs = false; + SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId())); + + auto request = MakeHolder(); + request->Record.SetTxId(txId); + NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request)); + LOG_D("Subscribe on create pool tx: " << txId); + } + + void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) { + if (SchemePipeActorId){ + PipeClientClosedByUs = true; + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + } + + auto ssStatus = static_cast(response.GetSchemeShardStatus()); + if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus)); + } + } + + void ScheduleRetry(const TString& message, bool longDelay = false) { + if (SchemePipeActorId){ + PipeClientClosedByUs = true; + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + } + + if (!TBase::ScheduleRetry(message, longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message); } } @@ -358,11 +423,21 @@ class TPoolCreatorActor : public TSchemeActorBase { LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString()); } + if (SchemePipeActorId) { + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + } + Issues.AddIssues(std::move(issues)); Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues))); PassAway(); } + static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) { + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetIssues(), issues); + return GroupIssues(issues, message); + } + private: const TActorId ReplyActorId; const TString Database; @@ -370,6 +445,9 @@ class TPoolCreatorActor : public TSchemeActorBase { const TIntrusiveConstPtr UserToken; const NACLibProto::TDiffACL DiffAcl; NResourcePool::TPoolSettings PoolConfig; + + NActors::TActorId SchemePipeActorId; + bool PipeClientClosedByUs = false; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index edb489bc3fc6..e85afd7627f1 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -25,6 +25,7 @@ namespace NKikimr::NKqp::NWorkload { #define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_WORKLOAD_SERVICE, "[WorkloadService] " << LogPrefix() << stream) + template class TSchemeActorBase : public NActors::TActorBootstrapped { using TRetryPolicy = IRetryPolicy; @@ -62,21 +63,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped { virtual TString LogPrefix() const = 0; protected: - bool ScheduleRetry(const TString& message, bool longDelay = false) { + bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) { if (!RetryState) { RetryState = CreateRetryState(); } if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) { - Issues.AddIssue(message); + Issues.AddIssues(issues); this->Schedule(*delay, new TEvents::TEvWakeup()); - LOG_W("Scheduled retry for error: " << message); + LOG_W("Scheduled retry for error: " << issues.ToOneLineString()); return true; } return false; } + bool ScheduleRetry(const TString& message, bool longDelay = false) { + return ScheduleRetry({NYql::TIssue(message)}, longDelay); + } + private: static TRetryPolicy::IRetryState::TPtr CreateRetryState() { return TRetryPolicy::GetFixedIntervalPolicy( diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index bf067d74a990..6c411f8c6e75 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -491,6 +491,10 @@ class TKqpWorkloadService : public TActorBootstrapped { token->AddGroupSID(allAuthenticatedUsersSID); } + if (userToken && !userToken->GetSerializedToken().empty()) { + token->SaveSerializationInfo(); + } + return token; } diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 2c259f9f601a..d044e4982929 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr ydb, c auto runtime = ydb->GetRuntime(); const auto& edgeActor = runtime->AllocateEdgeActor(); - runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive(userSID, TVector{}), true)); + auto userToken = MakeIntrusive(userSID, TVector{}); + userToken->SaveSerializationInfo(); + runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true)); return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); } diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 38dc0a914cb2..769946ce258a 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -392,7 +392,9 @@ THolder BuildSchemeCacheNavigateRequest(cons auto request = MakeHolder(); auto databasePath = SplitPath(database); request->DatabaseName = CanonizePath(databasePath); - request->UserToken = userToken; + if (userToken && !userToken->GetSerializedToken().empty()) { + request->UserToken = userToken; + } for (const auto& pathComponents : pathsComponents) { auto& entry = request->ResultSet.emplace_back(); diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 3793c38ef937..1cedaca54534 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -13,6 +13,7 @@ #include +#include #include #include #include @@ -37,17 +38,23 @@ struct TExecutionOptions { bool ForgetExecution = false; std::vector ExecutionCases; - NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; + std::vector ScriptQueryActions; + std::vector TraceIds; + std::vector PoolIds; + std::vector UserSIDs; - const TString TraceId = "kqprun_" + CreateGuidAsString(); + const TString DefaultTraceId = "kqprun"; bool HasResults() const { - if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { + if (ScriptQueries.empty()) { return false; } - for (EExecutionCase executionCase : ExecutionCases) { - if (executionCase != EExecutionCase::AsyncQuery) { + for (size_t i = 0; i < ExecutionCases.size(); ++i) { + if (i < ScriptQueryActions.size() && ScriptQueryActions[i] != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { + continue; + } + if (ExecutionCases[i] != EExecutionCase::AsyncQuery) { return true; } } @@ -55,8 +62,37 @@ struct TExecutionOptions { } EExecutionCase GetExecutionCase(size_t index) const { - Y_ABORT_UNLESS(!ExecutionCases.empty()); - return ExecutionCases[std::min(index, ExecutionCases.size() - 1)]; + return GetValue(index, ExecutionCases, EExecutionCase::GenericScript); + } + + NKqpRun::TRequestOptions GetSchemeQueryOptions() const { + return { + .Query = SchemeQuery, + .Action = NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE, + .TraceId = DefaultTraceId, + .PoolId = "", + .UserSID = BUILTIN_ACL_ROOT + }; + } + + NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, TInstant startTime) const { + Y_ABORT_UNLESS(index < ScriptQueries.size()); + return { + .Query = ScriptQueries[index], + .Action = GetValue(index, ScriptQueryActions, NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE), + .TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(), + .PoolId = GetValue(index, PoolIds, TString()), + .UserSID = GetValue(index, UserSIDs, TString(BUILTIN_ACL_ROOT)) + }; + } + +private: + template + static TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { + if (values.empty()) { + return defaultValue; + } + return values[std::min(index, values.size() - 1)]; } }; @@ -66,7 +102,7 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp if (executionOptions.SchemeQuery) { Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing scheme query..." << colors.Default() << Endl; - if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) { + if (!runner.ExecuteSchemeQuery(executionOptions.GetSchemeQueryOptions())) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Scheme query execution failed"; } } @@ -79,9 +115,10 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp Sleep(executionOptions.LoopDelay); } + const TInstant startTime = TInstant::Now(); const auto executionCase = executionOptions.GetExecutionCase(id); if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) { - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script"; if (numberQueries > 1) { Cout << " " << id; } @@ -91,13 +128,11 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp Cout << "..." << colors.Default() << Endl; } - TInstant startTime = TInstant::Now(); switch (executionCase) { case TExecutionOptions::EExecutionCase::GenericScript: - if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } - Cout << colors.Cyan() << "Script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl; if (!runner.FetchScriptResults()) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch script results failed"; @@ -111,21 +146,19 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp break; case TExecutionOptions::EExecutionCase::GenericQuery: - if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } - Cout << colors.Cyan() << "Generic request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; break; case TExecutionOptions::EExecutionCase::YqlScript: - if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } - Cout << colors.Cyan() << "Yql script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; break; case TExecutionOptions::EExecutionCase::AsyncQuery: - runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId); + runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, startTime)); break; } } @@ -349,6 +382,9 @@ class TMain : public TMainClassArgs { RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; return traceOptType; }); + options.AddLongOption("trace-id", "Trace id for -p queries") + .RequiredArgument("id") + .EmplaceTo(&ExecutionOptions.TraceIds); options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") .RequiredArgument("file") @@ -423,7 +459,10 @@ class TMain : public TMainClassArgs { .RequiredArgument("script-action") .DefaultValue("execute") .Choices(scriptAction.GetChoices()) - .StoreMappedResultT(&ExecutionOptions.ScriptQueryAction, scriptAction); + .Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice)); + }); options.AddLongOption('F', "forget", "Forget script execution operation after fetching results") .NoArgument() @@ -438,9 +477,13 @@ class TMain : public TMainClassArgs { .DefaultValue(1000) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + options.AddLongOption('U', "user", "User SID for -p queries") + .RequiredArgument("user-SID") + .EmplaceTo(&ExecutionOptions.UserSIDs); + options.AddLongOption("pool", "Workload manager pool in which queries will be executed") .RequiredArgument("pool-id") - .StoreResult(&RunnerOptions.YdbSettings.DefaultPoolId); + .EmplaceTo(&ExecutionOptions.PoolIds); // Cluster settings diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index a16c1010f9f5..561f40e38293 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -16,7 +17,6 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; struct TYdbSetupSettings { ui32 NodeCount = 1; TString DomainName = "Root"; - TString DefaultPoolId; TDuration InitializationTimeout = TDuration::Seconds(10); bool MonitoringEnabled = false; @@ -61,4 +61,13 @@ struct TRunnerOptions { TYdbSetupSettings YdbSettings; }; + +struct TRequestOptions { + TString Query; + NKikimrKqp::EQueryAction Action; + TString TraceId; + TString PoolId; + TString UserSID; +}; + } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 399025dcee25..b75098331a48 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -101,11 +101,11 @@ class TKqpRunner::TImpl { , CoutColors_(NColorizer::AutoColors(Cout)) {} - bool ExecuteSchemeQuery(const TString& query, const TString& traceId) const { + bool ExecuteSchemeQuery(const TRequestOptions& query) const { StartSchemeTraceOpt(); TSchemeMeta meta; - TRequestResult status = YdbSetup_.SchemeQueryRequest(query, traceId, meta); + TRequestResult status = YdbSetup_.SchemeQueryRequest(query, meta); TYdbSetup::StopTraceOpt(); PrintSchemeQueryAst(meta.Ast); @@ -118,10 +118,10 @@ class TKqpRunner::TImpl { return true; } - bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) { + bool ExecuteScript(const TRequestOptions& script) { StartScriptTraceOpt(); - TRequestResult status = YdbSetup_.ScriptRequest(script, action, traceId, ExecutionOperation_); + TRequestResult status = YdbSetup_.ScriptRequest(script, ExecutionOperation_); if (!status.IsSuccess()) { Cerr << CerrColors_.Red() << "Failed to start script execution, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -131,30 +131,35 @@ class TKqpRunner::TImpl { return WaitScriptExecutionOperation(); } - bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, EQueryType queryType) { + bool ExecuteQuery(const TRequestOptions& query, EQueryType queryType) { StartScriptTraceOpt(); + StartTime_ = TInstant::Now(); + TString queryTypeStr; TQueryMeta meta; TRequestResult status; switch (queryType) { case EQueryType::ScriptQuery: - status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_, GetProgressCallback()); + queryTypeStr = "Generic"; + status = YdbSetup_.QueryRequest(query, meta, ResultSets_, GetProgressCallback()); break; case EQueryType::YqlScriptQuery: - status = YdbSetup_.YqlScriptRequest(query, action, traceId, meta, ResultSets_); + queryTypeStr = "Yql script"; + status = YdbSetup_.YqlScriptRequest(query, meta, ResultSets_); break; case EQueryType::AsyncQuery: - YdbSetup_.QueryRequestAsync(query, action, traceId); + YdbSetup_.QueryRequestAsync(query); return true; } TYdbSetup::StopTraceOpt(); PrintScriptAst(meta.Ast); - + PrintScriptProgress(ExecutionMeta_.Plan); PrintScriptPlan(meta.Plan); + PrintScriptFinish(meta, queryTypeStr); if (!status.IsSuccess()) { Cerr << CerrColors_.Red() << "Failed to execute query, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -220,6 +225,7 @@ class TKqpRunner::TImpl { private: bool WaitScriptExecutionOperation() { + StartTime_ = TInstant::Now(); ExecutionMeta_ = TExecutionMeta(); TDuration getOperationPeriod = TDuration::Seconds(1); @@ -245,8 +251,8 @@ class TKqpRunner::TImpl { } PrintScriptAst(ExecutionMeta_.Ast); - PrintScriptPlan(ExecutionMeta_.Plan); + PrintScriptFinish(ExecutionMeta_, "Script"); if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) { Cerr << CerrColors_.Red() << "Failed to execute script, invalid final status, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -377,6 +383,16 @@ class TKqpRunner::TImpl { } } + void PrintScriptFinish(const TQueryMeta& meta, const TString& queryType) const { + Cout << CoutColors_.Cyan() << queryType << " request finished."; + if (meta.TotalDuration) { + Cout << " Total duration: " << meta.TotalDuration; + } else { + Cout << " Estimated duration: " << TInstant::Now() - StartTime_; + } + Cout << CoutColors_.Default() << Endl; + } + private: TRunnerOptions Options_; @@ -388,6 +404,7 @@ class TKqpRunner::TImpl { TString ExecutionOperation_; TExecutionMeta ExecutionMeta_; std::vector ResultSets_; + TInstant StartTime_; }; @@ -397,24 +414,24 @@ TKqpRunner::TKqpRunner(const TRunnerOptions& options) : Impl_(new TImpl(options)) {} -bool TKqpRunner::ExecuteSchemeQuery(const TString& query, const TString& traceId) const { - return Impl_->ExecuteSchemeQuery(query, traceId); +bool TKqpRunner::ExecuteSchemeQuery(const TRequestOptions& query) const { + return Impl_->ExecuteSchemeQuery(query); } -bool TKqpRunner::ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const { - return Impl_->ExecuteScript(script, action, traceId); +bool TKqpRunner::ExecuteScript(const TRequestOptions& script) const { + return Impl_->ExecuteScript(script); } -bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery); +bool TKqpRunner::ExecuteQuery(const TRequestOptions& query) const { + return Impl_->ExecuteQuery(query, TImpl::EQueryType::ScriptQuery); } -bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); +bool TKqpRunner::ExecuteYqlScript(const TRequestOptions& query) const { + return Impl_->ExecuteQuery(query, TImpl::EQueryType::YqlScriptQuery); } -void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::AsyncQuery); +void TKqpRunner::ExecuteQueryAsync(const TRequestOptions& query) const { + Impl_->ExecuteQuery(query, TImpl::EQueryType::AsyncQuery); } void TKqpRunner::WaitAsyncQueries() const { diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index 3687a7cbda06..83c707bbd989 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -2,7 +2,6 @@ #include "common.h" -#include namespace NKqpRun { @@ -10,15 +9,15 @@ class TKqpRunner { public: explicit TKqpRunner(const TRunnerOptions& options); - bool ExecuteSchemeQuery(const TString& query, const TString& traceId) const; + bool ExecuteSchemeQuery(const TRequestOptions& query) const; - bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteScript(const TRequestOptions& script) const; - bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteQuery(const TRequestOptions& query) const; - bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteYqlScript(const TRequestOptions& query) const; - void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void ExecuteQueryAsync(const TRequestOptions& query) const; void WaitAsyncQueries() const; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index b40738e806d0..9026a0fb40c0 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -60,6 +60,14 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred TString YqlToken_; }; +void FillQueryMeta(TQueryMeta& meta, const NKikimrKqp::TQueryResponse& response) { + meta.Ast = response.GetQueryAst(); + if (const auto& plan = response.GetQueryPlan()) { + meta.Plan = plan; + } + meta.TotalDuration = TDuration::MicroSeconds(response.GetQueryStats().GetDurationUs()); +} + } // anonymous namespace @@ -207,31 +215,31 @@ class TYdbSetup::TImpl { } } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TString& query, const TString& traceId) const { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TRequestOptions& query) const { auto event = MakeHolder(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_DDL, NKikimrKqp::QUERY_ACTION_EXECUTE, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_DDL, event->Record); return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const { + NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TRequestOptions& script) const { auto event = MakeHolder(); - FillScriptRequest(script, action, traceId, event->Record); + FillScriptRequest(script, event->Record); return RunKqpProxyRequest(std::move(event)); } - TQueryResponse QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { - auto request = GetQueryRequest(query, action, traceId); + TQueryResponse QueryRequest(const TRequestOptions& query, TProgressCallback progressCallback) const { + auto request = GetQueryRequest(query); auto promise = NThreading::NewPromise(); GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); return promise.GetFuture().GetValueSync(); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TRequestOptions& query) const { auto event = MakeHolder(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, action, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, event->Record); return RunKqpProxyRequest(std::move(event)); } @@ -264,12 +272,12 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + void QueryRequestAsync(const TRequestOptions& query) { if (!AsyncQueryRunnerActorId_) { AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit)); } - auto request = GetQueryRequest(query, action, traceId); + auto request = GetQueryRequest(query); auto startPromise = NThreading::NewPromise(); GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); @@ -316,32 +324,32 @@ class TYdbSetup::TImpl { } private: - void FillQueryRequest(const TString& query, NKikimrKqp::EQueryType type, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - event.SetTraceId(traceId); - event.SetUserToken(NACLib::TUserToken(Settings_.YqlToken, BUILTIN_ACL_ROOT, {}).SerializeAsString()); + void FillQueryRequest(const TRequestOptions& query, NKikimrKqp::EQueryType type, NKikimrKqp::TEvQueryRequest& event) const { + event.SetTraceId(query.TraceId); + event.SetUserToken(NACLib::TUserToken(Settings_.YqlToken, query.UserSID, {}).SerializeAsString()); auto request = event.MutableRequest(); - request->SetQuery(query); + request->SetQuery(query.Query); request->SetType(type); - request->SetAction(action); + request->SetAction(query.Action); request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); request->SetDatabase(Settings_.DomainName); - request->SetPoolId(Settings_.DefaultPoolId); + request->SetPoolId(query.PoolId); } - void FillScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, action, traceId, event); + void FillScriptRequest(const TRequestOptions& script, NKikimrKqp::TEvQueryRequest& event) const { + FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event); auto request = event.MutableRequest(); - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) { + if (script.Action == NKikimrKqp::QUERY_ACTION_EXECUTE) { request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); request->MutableTxControl()->set_commit_tx(true); } } - TQueryRequest GetQueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + TQueryRequest GetQueryRequest(const TRequestOptions& query) const { auto event = std::make_unique(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, event->Record); if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); @@ -399,8 +407,8 @@ TYdbSetup::TYdbSetup(const TYdbSetupSettings& settings) : Impl_(new TImpl(settings)) {} -TRequestResult TYdbSetup::SchemeQueryRequest(const TString& query, const TString& traceId, TSchemeMeta& meta) const { - auto schemeQueryOperationResponse = Impl_->SchemeQueryRequest(query, traceId)->Get()->Record.GetRef(); +TRequestResult TYdbSetup::SchemeQueryRequest(const TRequestOptions& query, TSchemeMeta& meta) const { + auto schemeQueryOperationResponse = Impl_->SchemeQueryRequest(query)->Get()->Record.GetRef(); const auto& responseRecord = schemeQueryOperationResponse.GetResponse(); meta.Ast = responseRecord.GetQueryAst(); @@ -408,38 +416,34 @@ TRequestResult TYdbSetup::SchemeQueryRequest(const TString& query, const TString return TRequestResult(schemeQueryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); } -TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const { - auto scriptExecutionOperation = Impl_->ScriptRequest(script, action, traceId); +TRequestResult TYdbSetup::ScriptRequest(const TRequestOptions& script, TString& operation) const { + auto scriptExecutionOperation = Impl_->ScriptRequest(script); operation = scriptExecutionOperation->Get()->OperationId; return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); } -TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { +TRequestResult TYdbSetup::QueryRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { resultSets.clear(); - TQueryResponse queryResponse = Impl_->QueryRequest(query, action, traceId, progressCallback); + TQueryResponse queryResponse = Impl_->QueryRequest(query, progressCallback); const auto& queryOperationResponse = queryResponse.Response->Get()->Record.GetRef(); const auto& responseRecord = queryOperationResponse.GetResponse(); resultSets = std::move(queryResponse.ResultSets); - meta.Ast = responseRecord.GetQueryAst(); - if (const auto& plan = responseRecord.GetQueryPlan()) { - meta.Plan = plan; - } + FillQueryMeta(meta, responseRecord); return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); } -TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { +TRequestResult TYdbSetup::YqlScriptRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets) const { resultSets.clear(); - auto yqlQueryOperationResponse = Impl_->YqlScriptRequest(query, action, traceId)->Get()->Record.GetRef(); + auto yqlQueryOperationResponse = Impl_->YqlScriptRequest(query)->Get()->Record.GetRef(); const auto& responseRecord = yqlQueryOperationResponse.GetResponse(); - meta.Ast = responseRecord.GetQueryAst(); - meta.Plan = responseRecord.GetQueryPlan(); + FillQueryMeta(meta, responseRecord); resultSets.reserve(responseRecord.results_size()); for (const auto& result : responseRecord.results()) { @@ -466,6 +470,7 @@ TRequestResult TYdbSetup::GetScriptExecutionOperationRequest(const TString& oper if (deserializedMeta.exec_stats().query_plan() != "{}") { meta.Plan = deserializedMeta.exec_stats().query_plan(); } + meta.TotalDuration = TDuration::MicroSeconds(deserializedMeta.exec_stats().total_duration_us()); } return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); @@ -485,8 +490,8 @@ TRequestResult TYdbSetup::ForgetScriptExecutionOperationRequest(const TString& o return TRequestResult(forgetScriptExecutionOperationResponse->Get()->Status, forgetScriptExecutionOperationResponse->Get()->Issues); } -void TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - Impl_->QueryRequestAsync(query, action, traceId); +void TYdbSetup::QueryRequestAsync(const TRequestOptions& query) const { + Impl_->QueryRequestAsync(query); } void TYdbSetup::WaitAsyncQueries() const { diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index 017ab6e18ede..67ca5c26c7b5 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -3,7 +3,6 @@ #include "common.h" #include "actors.h" -#include #include @@ -14,20 +13,18 @@ struct TSchemeMeta { }; -struct TExecutionMeta { - bool Ready = false; - NYdb::NQuery::EExecStatus ExecutionStatus = NYdb::NQuery::EExecStatus::Unspecified; - - i32 ResultSetsCount = 0; - +struct TQueryMeta { TString Ast; TString Plan; + TDuration TotalDuration; }; -struct TQueryMeta { - TString Ast; - TString Plan; +struct TExecutionMeta : public TQueryMeta { + bool Ready = false; + NYdb::NQuery::EExecStatus ExecutionStatus = NYdb::NQuery::EExecStatus::Unspecified; + + i32 ResultSetsCount = 0; }; @@ -51,13 +48,13 @@ class TYdbSetup { public: explicit TYdbSetup(const TYdbSetupSettings& settings); - TRequestResult SchemeQueryRequest(const TString& query, const TString& traceId, TSchemeMeta& meta) const; + TRequestResult SchemeQueryRequest(const TRequestOptions& query, TSchemeMeta& meta) const; - TRequestResult ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const; + TRequestResult ScriptRequest(const TRequestOptions& script, TString& operation) const; - TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; + TRequestResult QueryRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; - TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; + TRequestResult YqlScriptRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets) const; TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const; @@ -65,7 +62,7 @@ class TYdbSetup { TRequestResult ForgetScriptExecutionOperationRequest(const TString& operation) const; - void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void QueryRequestAsync(const TRequestOptions& query) const; void WaitAsyncQueries() const; From 8c7f1a0c94ad44b54add0e133a31ae8acf4a1b7c Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jul 2024 13:02:21 +0000 Subject: [PATCH 2/6] Fixed pipe client closing --- .../workload_service/actors/scheme_actors.cpp | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 2e282169d205..5676c7182df4 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -118,7 +118,7 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) + TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) : ReplyActorId(replyActorId) , Database(database) , PoolId(poolId) @@ -294,19 +294,16 @@ class TPoolCreatorActor : public TSchemeActorBase { return; } - PipeClientClosedByUs = true; - SchemePipeActorId = {}; - NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); - + ClosePipeClient(); ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status)); } - void HandleClientDestroyed() { - SchemePipeActorId = {}; - if (!PipeClientClosedByUs) { + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { + const TActorId clientId = ev->Get()->ClientId; + if (!ClosedSchemePipeActors.contains(clientId)) { + ClosePipeClient(); ScheduleRetry("Tablet to pipe destroyed"); } - PipeClientClosedByUs = false; } void HandleNotifyTxCompletionResult() { @@ -317,7 +314,7 @@ class TPoolCreatorActor : public TSchemeActorBase { switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle) hFunc(TEvTabletPipe::TEvClientConnected, Handle) - sFunc(TEvTabletPipe::TEvClientDestroyed, HandleClientDestroyed) + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle) sFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleNotifyTxCompletionResult) IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered) @@ -358,11 +355,10 @@ class TPoolCreatorActor : public TSchemeActorBase { void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) { const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId(); if (txId == 0) { - ScheduleRetry(response, "Unable to subscribe to concurrent transaction"); + ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true); return; } - PipeClientClosedByUs = false; SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId())); auto request = MakeHolder(); @@ -371,11 +367,16 @@ class TPoolCreatorActor : public TSchemeActorBase { LOG_D("Subscribe on create pool tx: " << txId); } - void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) { - if (SchemePipeActorId){ - PipeClientClosedByUs = true; + void ClosePipeClient() { + if (SchemePipeActorId) { + ClosedSchemePipeActors.insert(SchemePipeActorId); + SchemePipeActorId = {}; NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); } + } + + void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) { + ClosePipeClient(); auto ssStatus = static_cast(response.GetSchemeShardStatus()); if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) { @@ -384,11 +385,7 @@ class TPoolCreatorActor : public TSchemeActorBase { } void ScheduleRetry(const TString& message, bool longDelay = false) { - if (SchemePipeActorId){ - PipeClientClosedByUs = true; - NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); - } - + ClosePipeClient(); if (!TBase::ScheduleRetry(message, longDelay)) { Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message); } @@ -423,9 +420,7 @@ class TPoolCreatorActor : public TSchemeActorBase { LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString()); } - if (SchemePipeActorId) { - NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); - } + ClosePipeClient(); Issues.AddIssues(std::move(issues)); Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues))); @@ -446,8 +441,8 @@ class TPoolCreatorActor : public TSchemeActorBase { const NACLibProto::TDiffACL DiffAcl; NResourcePool::TPoolSettings PoolConfig; - NActors::TActorId SchemePipeActorId; - bool PipeClientClosedByUs = false; + std::unordered_set ClosedSchemePipeActors; + TActorId SchemePipeActorId; }; } // anonymous namespace From 432b1c7cc244de1d051e3701d0063049a720fbe8 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jul 2024 13:05:10 +0000 Subject: [PATCH 3/6] Fixed log entry for TEvNotifyTxCompletionResult --- ydb/core/kqp/workload_service/actors/scheme_actors.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 5676c7182df4..862a8b3cb2fc 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -306,8 +306,8 @@ class TPoolCreatorActor : public TSchemeActorBase { } } - void HandleNotifyTxCompletionResult() { - ScheduleRetry("Transaction completed, doublechecking"); + void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { + ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking"); } STFUNC(StateFunc) { @@ -315,7 +315,7 @@ class TPoolCreatorActor : public TSchemeActorBase { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle) hFunc(TEvTabletPipe::TEvClientConnected, Handle) hFunc(TEvTabletPipe::TEvClientDestroyed, Handle) - sFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleNotifyTxCompletionResult) + hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle) IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered) default: From d58adb2eff2100a26a00386b9193dd7bfafce530 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jul 2024 13:07:56 +0000 Subject: [PATCH 4/6] Fixed SchemePipeActorId cleanup --- ydb/core/kqp/workload_service/actors/scheme_actors.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 862a8b3cb2fc..d464785a0baf 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -370,8 +370,8 @@ class TPoolCreatorActor : public TSchemeActorBase { void ClosePipeClient() { if (SchemePipeActorId) { ClosedSchemePipeActors.insert(SchemePipeActorId); - SchemePipeActorId = {}; NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + SchemePipeActorId = {}; } } From 13b048ed0dac7e6d55bcca9eed9f50daa33d10b4 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jul 2024 13:33:37 +0000 Subject: [PATCH 5/6] Fixed kqprun HasResults flag --- ydb/core/kqp/workload_service/common/helpers.h | 1 - ydb/tests/tools/kqprun/kqprun.cpp | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index e85afd7627f1..163b2d765ed1 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -25,7 +25,6 @@ namespace NKikimr::NKqp::NWorkload { #define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_WORKLOAD_SERVICE, "[WorkloadService] " << LogPrefix() << stream) - template class TSchemeActorBase : public NActors::TActorBootstrapped { using TRetryPolicy = IRetryPolicy; diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 1cedaca54534..fe30413d12d5 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -51,7 +51,7 @@ struct TExecutionOptions { } for (size_t i = 0; i < ExecutionCases.size(); ++i) { - if (i < ScriptQueryActions.size() && ScriptQueryActions[i] != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { + if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { continue; } if (ExecutionCases[i] != EExecutionCase::AsyncQuery) { @@ -65,6 +65,10 @@ struct TExecutionOptions { return GetValue(index, ExecutionCases, EExecutionCase::GenericScript); } + NKikimrKqp::EQueryAction GetScriptQueryAction(size_t index) const { + return GetValue(index, ScriptQueryActions, NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE); + } + NKqpRun::TRequestOptions GetSchemeQueryOptions() const { return { .Query = SchemeQuery, @@ -79,7 +83,7 @@ struct TExecutionOptions { Y_ABORT_UNLESS(index < ScriptQueries.size()); return { .Query = ScriptQueries[index], - .Action = GetValue(index, ScriptQueryActions, NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE), + .Action = GetScriptQueryAction(index), .TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(), .PoolId = GetValue(index, PoolIds, TString()), .UserSID = GetValue(index, UserSIDs, TString(BUILTIN_ACL_ROOT)) From 3572906d0807010ae630f189c6f1430cf81022b2 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jul 2024 16:46:16 +0000 Subject: [PATCH 6/6] Removed token changes --- .../workload_service/actors/scheme_actors.cpp | 8 +++++- .../workload_service/kqp_workload_service.cpp | 26 ------------------- .../ut/kqp_workload_service_actors_ut.cpp | 3 ++- 3 files changed, 9 insertions(+), 28 deletions(-) diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index d464785a0baf..55b2cd3085d7 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -66,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped { for (const TString& usedSid : AppData()->AdministrationAllowedSIDs) { diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid); } - diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData()->AllAuthenticatedUsers); + + auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema; + for (const auto& userSID : AppData()->DefaultUserSIDs) { + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID); + } + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers); + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); auto token = MakeIntrusive(BUILTIN_ACL_METADATA, TVector{}); Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 6c411f8c6e75..66a6aaaaf64b 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -132,9 +132,6 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - // Add AllAuthenticatedUsers group SID into user token - ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken); - LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database)); Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless)); @@ -475,29 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } - static TIntrusivePtr GetUserToken(TIntrusiveConstPtr userToken) { - auto token = MakeIntrusive(userToken ? userToken->GetUserSID() : NACLib::TSID(), TVector{}); - - bool hasAllAuthenticatedUsersSID = false; - const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers; - if (userToken) { - for (const auto& groupSID : userToken->GetGroupSIDs()) { - token->AddGroupSID(groupSID); - hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID; - } - } - - if (!hasAllAuthenticatedUsersSID) { - token->AddGroupSID(allAuthenticatedUsersSID); - } - - if (userToken && !userToken->GetSerializedToken().empty()) { - token->SaveSerializationInfo(); - } - - return token; - } - TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index d044e4982929..271d7accbbfd 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -110,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { // Check default pool access TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID))); - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(""))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT))); } Y_UNIT_TEST(TestDefaultPoolAdminPermissions) {