diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 8b5dd686eefd..55b2cd3085d7 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -1,11 +1,13 @@ #include "actors.h" #include +#include #include #include #include +#include #include #include @@ -64,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped { for (const TString& usedSid : AppData()->AdministrationAllowedSIDs) { diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid); } - diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData()->AllAuthenticatedUsers); + + auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema; + for (const auto& userSID : AppData()->DefaultUserSIDs) { + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID); + } + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers); + diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); auto token = MakeIntrusive(BUILTIN_ACL_METADATA, TVector{}); Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl)); @@ -116,7 +124,7 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) + TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) : ReplyActorId(replyActorId) , Database(database) , PoolId(poolId) @@ -255,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase { } void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) { - const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus(); - switch (ev->Get()->Status()) { + const auto& response = ev->Get()->Record; + const auto ssStatus = response.GetSchemeShardStatus(); + const auto status = ev->Get()->Status(); + switch (status) { case NTxProxy::TResultStatus::ExecComplete: case NTxProxy::TResultStatus::ExecAlready: if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) { Reply(Ydb::StatusIds::SUCCESS); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecError: - if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) { - ScheduleRetry(ssStatus, "Retry execution error", true); + if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) { + SubscribeOnTransactionOrRetry(status, response); } else { - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast(ssStatus))); } return; case NTxProxy::TResultStatus::ExecInProgress: - ScheduleRetry(ssStatus, "Retry execution in progress error", true); + SubscribeOnTransactionOrRetry(status, response); return; case NTxProxy::TResultStatus::ProxyShardNotAvailable: - ScheduleRetry(ssStatus, "Retry shard unavailable error"); + ScheduleRetry(response, "Retry shard unavailable error"); return; default: - Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus)); + Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast(ssStatus))); return; } } + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + if (ev->Get()->Status == NKikimrProto::OK) { + LOG_T("Tablet to pipe successfully connected"); + return; + } + + ClosePipeClient(); + ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status)); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { + const TActorId clientId = ev->Get()->ClientId; + if (!ClosedSchemePipeActors.contains(clientId)) { + ClosePipeClient(); + ScheduleRetry("Tablet to pipe destroyed"); + } + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) { + ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking"); + } + STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle) + hFunc(TEvTabletPipe::TEvClientConnected, Handle) + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle) + hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle) + IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered) + default: StateFuncBase(ev); } @@ -301,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase { schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"})); schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool); schemeTx.SetInternal(true); - schemeTx.SetAllowAccessToPrivatePaths(true); BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool()); BuildModifyAclRequest(*schemeTx.MutableModifyACL()); if (UserToken) { - event->Record.SetUserToken(UserToken->GetSerializedToken()); + event->Record.SetUserToken(UserToken->SerializeAsString()); } Send(MakeTxProxyID(), std::move(event)); @@ -322,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase { } private: - void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) { - auto ssStatus = static_cast(status); - if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) { - Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus); + void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) { + const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId(); + if (txId == 0) { + ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true); + return; + } + + SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId())); + + auto request = MakeHolder(); + request->Record.SetTxId(txId); + NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request)); + LOG_D("Subscribe on create pool tx: " << txId); + } + + void ClosePipeClient() { + if (SchemePipeActorId) { + ClosedSchemePipeActors.insert(SchemePipeActorId); + NTabletPipe::CloseClient(SelfId(), SchemePipeActorId); + SchemePipeActorId = {}; + } + } + + void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) { + ClosePipeClient(); + + auto ssStatus = static_cast(response.GetSchemeShardStatus()); + if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus)); + } + } + + void ScheduleRetry(const TString& message, bool longDelay = false) { + ClosePipeClient(); + if (!TBase::ScheduleRetry(message, longDelay)) { + Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message); } } @@ -358,11 +426,19 @@ class TPoolCreatorActor : public TSchemeActorBase { LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString()); } + ClosePipeClient(); + Issues.AddIssues(std::move(issues)); Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues))); PassAway(); } + static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) { + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetIssues(), issues); + return GroupIssues(issues, message); + } + private: const TActorId ReplyActorId; const TString Database; @@ -370,6 +446,9 @@ class TPoolCreatorActor : public TSchemeActorBase { const TIntrusiveConstPtr UserToken; const NACLibProto::TDiffACL DiffAcl; NResourcePool::TPoolSettings PoolConfig; + + std::unordered_set ClosedSchemePipeActors; + TActorId SchemePipeActorId; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/common/helpers.h b/ydb/core/kqp/workload_service/common/helpers.h index edb489bc3fc6..163b2d765ed1 100644 --- a/ydb/core/kqp/workload_service/common/helpers.h +++ b/ydb/core/kqp/workload_service/common/helpers.h @@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped { virtual TString LogPrefix() const = 0; protected: - bool ScheduleRetry(const TString& message, bool longDelay = false) { + bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) { if (!RetryState) { RetryState = CreateRetryState(); } if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) { - Issues.AddIssue(message); + Issues.AddIssues(issues); this->Schedule(*delay, new TEvents::TEvWakeup()); - LOG_W("Scheduled retry for error: " << message); + LOG_W("Scheduled retry for error: " << issues.ToOneLineString()); return true; } return false; } + bool ScheduleRetry(const TString& message, bool longDelay = false) { + return ScheduleRetry({NYql::TIssue(message)}, longDelay); + } + private: static TRetryPolicy::IRetryState::TPtr CreateRetryState() { return TRetryPolicy::GetFixedIntervalPolicy( diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index bf067d74a990..66a6aaaaf64b 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -132,9 +132,6 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - // Add AllAuthenticatedUsers group SID into user token - ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken); - LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database)); Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless)); @@ -475,25 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } - static TIntrusivePtr GetUserToken(TIntrusiveConstPtr userToken) { - auto token = MakeIntrusive(userToken ? userToken->GetUserSID() : NACLib::TSID(), TVector{}); - - bool hasAllAuthenticatedUsersSID = false; - const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers; - if (userToken) { - for (const auto& groupSID : userToken->GetGroupSIDs()) { - token->AddGroupSID(groupSID); - hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID; - } - } - - if (!hasAllAuthenticatedUsersSID) { - token->AddGroupSID(allAuthenticatedUsersSID); - } - - return token; - } - TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 2c259f9f601a..271d7accbbfd 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr ydb, c auto runtime = ydb->GetRuntime(); const auto& edgeActor = runtime->AllocateEdgeActor(); - runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive(userSID, TVector{}), true)); + auto userToken = MakeIntrusive(userSID, TVector{}); + userToken->SaveSerializationInfo(); + runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true)); return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); } @@ -108,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { // Check default pool access TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID))); - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(""))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers))); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT))); } Y_UNIT_TEST(TestDefaultPoolAdminPermissions) { diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 38dc0a914cb2..769946ce258a 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -392,7 +392,9 @@ THolder BuildSchemeCacheNavigateRequest(cons auto request = MakeHolder(); auto databasePath = SplitPath(database); request->DatabaseName = CanonizePath(databasePath); - request->UserToken = userToken; + if (userToken && !userToken->GetSerializedToken().empty()) { + request->UserToken = userToken; + } for (const auto& pathComponents : pathsComponents) { auto& entry = request->ResultSet.emplace_back(); diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 3793c38ef937..fe30413d12d5 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -13,6 +13,7 @@ #include +#include #include #include #include @@ -37,17 +38,23 @@ struct TExecutionOptions { bool ForgetExecution = false; std::vector ExecutionCases; - NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; + std::vector ScriptQueryActions; + std::vector TraceIds; + std::vector PoolIds; + std::vector UserSIDs; - const TString TraceId = "kqprun_" + CreateGuidAsString(); + const TString DefaultTraceId = "kqprun"; bool HasResults() const { - if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { + if (ScriptQueries.empty()) { return false; } - for (EExecutionCase executionCase : ExecutionCases) { - if (executionCase != EExecutionCase::AsyncQuery) { + for (size_t i = 0; i < ExecutionCases.size(); ++i) { + if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) { + continue; + } + if (ExecutionCases[i] != EExecutionCase::AsyncQuery) { return true; } } @@ -55,8 +62,41 @@ struct TExecutionOptions { } EExecutionCase GetExecutionCase(size_t index) const { - Y_ABORT_UNLESS(!ExecutionCases.empty()); - return ExecutionCases[std::min(index, ExecutionCases.size() - 1)]; + return GetValue(index, ExecutionCases, EExecutionCase::GenericScript); + } + + NKikimrKqp::EQueryAction GetScriptQueryAction(size_t index) const { + return GetValue(index, ScriptQueryActions, NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE); + } + + NKqpRun::TRequestOptions GetSchemeQueryOptions() const { + return { + .Query = SchemeQuery, + .Action = NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE, + .TraceId = DefaultTraceId, + .PoolId = "", + .UserSID = BUILTIN_ACL_ROOT + }; + } + + NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, TInstant startTime) const { + Y_ABORT_UNLESS(index < ScriptQueries.size()); + return { + .Query = ScriptQueries[index], + .Action = GetScriptQueryAction(index), + .TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(), + .PoolId = GetValue(index, PoolIds, TString()), + .UserSID = GetValue(index, UserSIDs, TString(BUILTIN_ACL_ROOT)) + }; + } + +private: + template + static TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { + if (values.empty()) { + return defaultValue; + } + return values[std::min(index, values.size() - 1)]; } }; @@ -66,7 +106,7 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp if (executionOptions.SchemeQuery) { Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing scheme query..." << colors.Default() << Endl; - if (!runner.ExecuteSchemeQuery(executionOptions.SchemeQuery, executionOptions.TraceId)) { + if (!runner.ExecuteSchemeQuery(executionOptions.GetSchemeQueryOptions())) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Scheme query execution failed"; } } @@ -79,9 +119,10 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp Sleep(executionOptions.LoopDelay); } + const TInstant startTime = TInstant::Now(); const auto executionCase = executionOptions.GetExecutionCase(id); if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) { - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script"; if (numberQueries > 1) { Cout << " " << id; } @@ -91,13 +132,11 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp Cout << "..." << colors.Default() << Endl; } - TInstant startTime = TInstant::Now(); switch (executionCase) { case TExecutionOptions::EExecutionCase::GenericScript: - if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } - Cout << colors.Cyan() << "Script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl; if (!runner.FetchScriptResults()) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch script results failed"; @@ -111,21 +150,19 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp break; case TExecutionOptions::EExecutionCase::GenericQuery: - if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } - Cout << colors.Cyan() << "Generic request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; break; case TExecutionOptions::EExecutionCase::YqlScript: - if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { + if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, startTime))) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } - Cout << colors.Cyan() << "Yql script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl; break; case TExecutionOptions::EExecutionCase::AsyncQuery: - runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId); + runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, startTime)); break; } } @@ -349,6 +386,9 @@ class TMain : public TMainClassArgs { RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; return traceOptType; }); + options.AddLongOption("trace-id", "Trace id for -p queries") + .RequiredArgument("id") + .EmplaceTo(&ExecutionOptions.TraceIds); options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") .RequiredArgument("file") @@ -423,7 +463,10 @@ class TMain : public TMainClassArgs { .RequiredArgument("script-action") .DefaultValue("execute") .Choices(scriptAction.GetChoices()) - .StoreMappedResultT(&ExecutionOptions.ScriptQueryAction, scriptAction); + .Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice)); + }); options.AddLongOption('F', "forget", "Forget script execution operation after fetching results") .NoArgument() @@ -438,9 +481,13 @@ class TMain : public TMainClassArgs { .DefaultValue(1000) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + options.AddLongOption('U', "user", "User SID for -p queries") + .RequiredArgument("user-SID") + .EmplaceTo(&ExecutionOptions.UserSIDs); + options.AddLongOption("pool", "Workload manager pool in which queries will be executed") .RequiredArgument("pool-id") - .StoreResult(&RunnerOptions.YdbSettings.DefaultPoolId); + .EmplaceTo(&ExecutionOptions.PoolIds); // Cluster settings diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index a16c1010f9f5..561f40e38293 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -16,7 +17,6 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; struct TYdbSetupSettings { ui32 NodeCount = 1; TString DomainName = "Root"; - TString DefaultPoolId; TDuration InitializationTimeout = TDuration::Seconds(10); bool MonitoringEnabled = false; @@ -61,4 +61,13 @@ struct TRunnerOptions { TYdbSetupSettings YdbSettings; }; + +struct TRequestOptions { + TString Query; + NKikimrKqp::EQueryAction Action; + TString TraceId; + TString PoolId; + TString UserSID; +}; + } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 399025dcee25..b75098331a48 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -101,11 +101,11 @@ class TKqpRunner::TImpl { , CoutColors_(NColorizer::AutoColors(Cout)) {} - bool ExecuteSchemeQuery(const TString& query, const TString& traceId) const { + bool ExecuteSchemeQuery(const TRequestOptions& query) const { StartSchemeTraceOpt(); TSchemeMeta meta; - TRequestResult status = YdbSetup_.SchemeQueryRequest(query, traceId, meta); + TRequestResult status = YdbSetup_.SchemeQueryRequest(query, meta); TYdbSetup::StopTraceOpt(); PrintSchemeQueryAst(meta.Ast); @@ -118,10 +118,10 @@ class TKqpRunner::TImpl { return true; } - bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) { + bool ExecuteScript(const TRequestOptions& script) { StartScriptTraceOpt(); - TRequestResult status = YdbSetup_.ScriptRequest(script, action, traceId, ExecutionOperation_); + TRequestResult status = YdbSetup_.ScriptRequest(script, ExecutionOperation_); if (!status.IsSuccess()) { Cerr << CerrColors_.Red() << "Failed to start script execution, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -131,30 +131,35 @@ class TKqpRunner::TImpl { return WaitScriptExecutionOperation(); } - bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, EQueryType queryType) { + bool ExecuteQuery(const TRequestOptions& query, EQueryType queryType) { StartScriptTraceOpt(); + StartTime_ = TInstant::Now(); + TString queryTypeStr; TQueryMeta meta; TRequestResult status; switch (queryType) { case EQueryType::ScriptQuery: - status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_, GetProgressCallback()); + queryTypeStr = "Generic"; + status = YdbSetup_.QueryRequest(query, meta, ResultSets_, GetProgressCallback()); break; case EQueryType::YqlScriptQuery: - status = YdbSetup_.YqlScriptRequest(query, action, traceId, meta, ResultSets_); + queryTypeStr = "Yql script"; + status = YdbSetup_.YqlScriptRequest(query, meta, ResultSets_); break; case EQueryType::AsyncQuery: - YdbSetup_.QueryRequestAsync(query, action, traceId); + YdbSetup_.QueryRequestAsync(query); return true; } TYdbSetup::StopTraceOpt(); PrintScriptAst(meta.Ast); - + PrintScriptProgress(ExecutionMeta_.Plan); PrintScriptPlan(meta.Plan); + PrintScriptFinish(meta, queryTypeStr); if (!status.IsSuccess()) { Cerr << CerrColors_.Red() << "Failed to execute query, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -220,6 +225,7 @@ class TKqpRunner::TImpl { private: bool WaitScriptExecutionOperation() { + StartTime_ = TInstant::Now(); ExecutionMeta_ = TExecutionMeta(); TDuration getOperationPeriod = TDuration::Seconds(1); @@ -245,8 +251,8 @@ class TKqpRunner::TImpl { } PrintScriptAst(ExecutionMeta_.Ast); - PrintScriptPlan(ExecutionMeta_.Plan); + PrintScriptFinish(ExecutionMeta_, "Script"); if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) { Cerr << CerrColors_.Red() << "Failed to execute script, invalid final status, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -377,6 +383,16 @@ class TKqpRunner::TImpl { } } + void PrintScriptFinish(const TQueryMeta& meta, const TString& queryType) const { + Cout << CoutColors_.Cyan() << queryType << " request finished."; + if (meta.TotalDuration) { + Cout << " Total duration: " << meta.TotalDuration; + } else { + Cout << " Estimated duration: " << TInstant::Now() - StartTime_; + } + Cout << CoutColors_.Default() << Endl; + } + private: TRunnerOptions Options_; @@ -388,6 +404,7 @@ class TKqpRunner::TImpl { TString ExecutionOperation_; TExecutionMeta ExecutionMeta_; std::vector ResultSets_; + TInstant StartTime_; }; @@ -397,24 +414,24 @@ TKqpRunner::TKqpRunner(const TRunnerOptions& options) : Impl_(new TImpl(options)) {} -bool TKqpRunner::ExecuteSchemeQuery(const TString& query, const TString& traceId) const { - return Impl_->ExecuteSchemeQuery(query, traceId); +bool TKqpRunner::ExecuteSchemeQuery(const TRequestOptions& query) const { + return Impl_->ExecuteSchemeQuery(query); } -bool TKqpRunner::ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const { - return Impl_->ExecuteScript(script, action, traceId); +bool TKqpRunner::ExecuteScript(const TRequestOptions& script) const { + return Impl_->ExecuteScript(script); } -bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery); +bool TKqpRunner::ExecuteQuery(const TRequestOptions& query) const { + return Impl_->ExecuteQuery(query, TImpl::EQueryType::ScriptQuery); } -bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); +bool TKqpRunner::ExecuteYqlScript(const TRequestOptions& query) const { + return Impl_->ExecuteQuery(query, TImpl::EQueryType::YqlScriptQuery); } -void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::AsyncQuery); +void TKqpRunner::ExecuteQueryAsync(const TRequestOptions& query) const { + Impl_->ExecuteQuery(query, TImpl::EQueryType::AsyncQuery); } void TKqpRunner::WaitAsyncQueries() const { diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index 3687a7cbda06..83c707bbd989 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -2,7 +2,6 @@ #include "common.h" -#include namespace NKqpRun { @@ -10,15 +9,15 @@ class TKqpRunner { public: explicit TKqpRunner(const TRunnerOptions& options); - bool ExecuteSchemeQuery(const TString& query, const TString& traceId) const; + bool ExecuteSchemeQuery(const TRequestOptions& query) const; - bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteScript(const TRequestOptions& script) const; - bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteQuery(const TRequestOptions& query) const; - bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteYqlScript(const TRequestOptions& query) const; - void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void ExecuteQueryAsync(const TRequestOptions& query) const; void WaitAsyncQueries() const; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index b40738e806d0..9026a0fb40c0 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -60,6 +60,14 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred TString YqlToken_; }; +void FillQueryMeta(TQueryMeta& meta, const NKikimrKqp::TQueryResponse& response) { + meta.Ast = response.GetQueryAst(); + if (const auto& plan = response.GetQueryPlan()) { + meta.Plan = plan; + } + meta.TotalDuration = TDuration::MicroSeconds(response.GetQueryStats().GetDurationUs()); +} + } // anonymous namespace @@ -207,31 +215,31 @@ class TYdbSetup::TImpl { } } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TString& query, const TString& traceId) const { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TRequestOptions& query) const { auto event = MakeHolder(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_DDL, NKikimrKqp::QUERY_ACTION_EXECUTE, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_DDL, event->Record); return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const { + NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TRequestOptions& script) const { auto event = MakeHolder(); - FillScriptRequest(script, action, traceId, event->Record); + FillScriptRequest(script, event->Record); return RunKqpProxyRequest(std::move(event)); } - TQueryResponse QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { - auto request = GetQueryRequest(query, action, traceId); + TQueryResponse QueryRequest(const TRequestOptions& query, TProgressCallback progressCallback) const { + auto request = GetQueryRequest(query); auto promise = NThreading::NewPromise(); GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); return promise.GetFuture().GetValueSync(); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TRequestOptions& query) const { auto event = MakeHolder(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, action, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, event->Record); return RunKqpProxyRequest(std::move(event)); } @@ -264,12 +272,12 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + void QueryRequestAsync(const TRequestOptions& query) { if (!AsyncQueryRunnerActorId_) { AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit)); } - auto request = GetQueryRequest(query, action, traceId); + auto request = GetQueryRequest(query); auto startPromise = NThreading::NewPromise(); GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); @@ -316,32 +324,32 @@ class TYdbSetup::TImpl { } private: - void FillQueryRequest(const TString& query, NKikimrKqp::EQueryType type, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - event.SetTraceId(traceId); - event.SetUserToken(NACLib::TUserToken(Settings_.YqlToken, BUILTIN_ACL_ROOT, {}).SerializeAsString()); + void FillQueryRequest(const TRequestOptions& query, NKikimrKqp::EQueryType type, NKikimrKqp::TEvQueryRequest& event) const { + event.SetTraceId(query.TraceId); + event.SetUserToken(NACLib::TUserToken(Settings_.YqlToken, query.UserSID, {}).SerializeAsString()); auto request = event.MutableRequest(); - request->SetQuery(query); + request->SetQuery(query.Query); request->SetType(type); - request->SetAction(action); + request->SetAction(query.Action); request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL); request->SetDatabase(Settings_.DomainName); - request->SetPoolId(Settings_.DefaultPoolId); + request->SetPoolId(query.PoolId); } - void FillScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, NKikimrKqp::TEvQueryRequest& event) const { - FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, action, traceId, event); + void FillScriptRequest(const TRequestOptions& script, NKikimrKqp::TEvQueryRequest& event) const { + FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event); auto request = event.MutableRequest(); - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) { + if (script.Action == NKikimrKqp::QUERY_ACTION_EXECUTE) { request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); request->MutableTxControl()->set_commit_tx(true); } } - TQueryRequest GetQueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + TQueryRequest GetQueryRequest(const TRequestOptions& query) const { auto event = std::make_unique(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, event->Record); if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); @@ -399,8 +407,8 @@ TYdbSetup::TYdbSetup(const TYdbSetupSettings& settings) : Impl_(new TImpl(settings)) {} -TRequestResult TYdbSetup::SchemeQueryRequest(const TString& query, const TString& traceId, TSchemeMeta& meta) const { - auto schemeQueryOperationResponse = Impl_->SchemeQueryRequest(query, traceId)->Get()->Record.GetRef(); +TRequestResult TYdbSetup::SchemeQueryRequest(const TRequestOptions& query, TSchemeMeta& meta) const { + auto schemeQueryOperationResponse = Impl_->SchemeQueryRequest(query)->Get()->Record.GetRef(); const auto& responseRecord = schemeQueryOperationResponse.GetResponse(); meta.Ast = responseRecord.GetQueryAst(); @@ -408,38 +416,34 @@ TRequestResult TYdbSetup::SchemeQueryRequest(const TString& query, const TString return TRequestResult(schemeQueryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); } -TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const { - auto scriptExecutionOperation = Impl_->ScriptRequest(script, action, traceId); +TRequestResult TYdbSetup::ScriptRequest(const TRequestOptions& script, TString& operation) const { + auto scriptExecutionOperation = Impl_->ScriptRequest(script); operation = scriptExecutionOperation->Get()->OperationId; return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); } -TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { +TRequestResult TYdbSetup::QueryRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { resultSets.clear(); - TQueryResponse queryResponse = Impl_->QueryRequest(query, action, traceId, progressCallback); + TQueryResponse queryResponse = Impl_->QueryRequest(query, progressCallback); const auto& queryOperationResponse = queryResponse.Response->Get()->Record.GetRef(); const auto& responseRecord = queryOperationResponse.GetResponse(); resultSets = std::move(queryResponse.ResultSets); - meta.Ast = responseRecord.GetQueryAst(); - if (const auto& plan = responseRecord.GetQueryPlan()) { - meta.Plan = plan; - } + FillQueryMeta(meta, responseRecord); return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); } -TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { +TRequestResult TYdbSetup::YqlScriptRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets) const { resultSets.clear(); - auto yqlQueryOperationResponse = Impl_->YqlScriptRequest(query, action, traceId)->Get()->Record.GetRef(); + auto yqlQueryOperationResponse = Impl_->YqlScriptRequest(query)->Get()->Record.GetRef(); const auto& responseRecord = yqlQueryOperationResponse.GetResponse(); - meta.Ast = responseRecord.GetQueryAst(); - meta.Plan = responseRecord.GetQueryPlan(); + FillQueryMeta(meta, responseRecord); resultSets.reserve(responseRecord.results_size()); for (const auto& result : responseRecord.results()) { @@ -466,6 +470,7 @@ TRequestResult TYdbSetup::GetScriptExecutionOperationRequest(const TString& oper if (deserializedMeta.exec_stats().query_plan() != "{}") { meta.Plan = deserializedMeta.exec_stats().query_plan(); } + meta.TotalDuration = TDuration::MicroSeconds(deserializedMeta.exec_stats().total_duration_us()); } return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); @@ -485,8 +490,8 @@ TRequestResult TYdbSetup::ForgetScriptExecutionOperationRequest(const TString& o return TRequestResult(forgetScriptExecutionOperationResponse->Get()->Status, forgetScriptExecutionOperationResponse->Get()->Issues); } -void TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { - Impl_->QueryRequestAsync(query, action, traceId); +void TYdbSetup::QueryRequestAsync(const TRequestOptions& query) const { + Impl_->QueryRequestAsync(query); } void TYdbSetup::WaitAsyncQueries() const { diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index 017ab6e18ede..67ca5c26c7b5 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -3,7 +3,6 @@ #include "common.h" #include "actors.h" -#include #include @@ -14,20 +13,18 @@ struct TSchemeMeta { }; -struct TExecutionMeta { - bool Ready = false; - NYdb::NQuery::EExecStatus ExecutionStatus = NYdb::NQuery::EExecStatus::Unspecified; - - i32 ResultSetsCount = 0; - +struct TQueryMeta { TString Ast; TString Plan; + TDuration TotalDuration; }; -struct TQueryMeta { - TString Ast; - TString Plan; +struct TExecutionMeta : public TQueryMeta { + bool Ready = false; + NYdb::NQuery::EExecStatus ExecutionStatus = NYdb::NQuery::EExecStatus::Unspecified; + + i32 ResultSetsCount = 0; }; @@ -51,13 +48,13 @@ class TYdbSetup { public: explicit TYdbSetup(const TYdbSetupSettings& settings); - TRequestResult SchemeQueryRequest(const TString& query, const TString& traceId, TSchemeMeta& meta) const; + TRequestResult SchemeQueryRequest(const TRequestOptions& query, TSchemeMeta& meta) const; - TRequestResult ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const; + TRequestResult ScriptRequest(const TRequestOptions& script, TString& operation) const; - TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; + TRequestResult QueryRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; - TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; + TRequestResult YqlScriptRequest(const TRequestOptions& query, TQueryMeta& meta, std::vector& resultSets) const; TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const; @@ -65,7 +62,7 @@ class TYdbSetup { TRequestResult ForgetScriptExecutionOperationRequest(const TString& operation) const; - void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void QueryRequestAsync(const TRequestOptions& query) const; void WaitAsyncQueries() const;