From fe27bca429b57c78ae91690860e0ee5b0857426b Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 14:05:41 +0000 Subject: [PATCH 1/5] Added kqp proxy database cache --- ydb/core/kqp/common/events/events.h | 42 +++++ ydb/core/kqp/common/events/query.h | 12 ++ .../kqp/common/events/script_executions.h | 52 ++++--- ydb/core/kqp/common/events/workload_service.h | 10 -- ydb/core/kqp/common/kqp_event_impl.cpp | 4 + ydb/core/kqp/common/simple/kqp_event_ids.h | 5 +- .../kqp_proxy_databases_cache.cpp | 146 ++++++++++++++++++ .../kqp/proxy_service/kqp_proxy_service.cpp | 40 +++-- .../proxy_service/kqp_proxy_service_impl.h | 72 +++++++++ ydb/core/kqp/proxy_service/ya.make | 1 + .../workload_service/actors/scheme_actors.cpp | 4 +- ydb/core/kqp/workload_service/common/events.h | 4 +- .../kqp_workload_service_impl.h | 4 - ydb/core/protos/kqp.proto | 1 + 14 files changed, 349 insertions(+), 48 deletions(-) create mode 100644 ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 7e700123c42c..ab88138b306b 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -108,6 +108,18 @@ struct TEvKqp { struct TEvScriptRequest : public TEventLocal { TEvScriptRequest() = default; + const TString& GetDatabase() const { + return Record.GetRequest().GetDatabase(); + } + + const TString& GetDatabaseId() const { + return Record.GetRequest().GetDatabaseId(); + } + + void SetDatabaseId(const TString& databaseId) { + Record.MutableRequest()->SetDatabaseId(databaseId); + } + mutable NKikimrKqp::TEvQueryRequest Record; TDuration ForgetAfter; TDuration ResultsTtl; @@ -161,6 +173,36 @@ struct TEvKqp { return issues; } }; + + struct TEvSubscribeOnDatabase : public TEventLocal { + explicit TEvSubscribeOnDatabase(const TString& database) + : Database(database) + {} + + TString Database; + }; + + struct TEvUpdateDatabaseInfo : public TEventLocal { + TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Database(database) + , Issues(std::move(issues)) + {} + + TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless) + : Status(Ydb::StatusIds::SUCCESS) + , Database(database) + , DatabaseId(databaseId) + , Serverless(serverless) + , Issues({}) + {} + + Ydb::StatusIds::StatusCode Status; + TString Database; + TString DatabaseId; + bool Serverless = false; + NYql::TIssues Issues; + }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 18818e958182..e2fed5ba6fe9 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -351,6 +351,17 @@ struct TEvQueryRequest: public NActors::TEventLocal Token_; TActorId RequestActorId; TString Database; + TString DatabaseId; TString SessionId; TString YqlText; TString QueryId; diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index f6bf6d424101..62e2d767c744 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 { FS_ROLLBACK, }; -struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal { - TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) +template +struct TEventWithDatabaseId : public NActors::TEventLocal { + TEventWithDatabaseId(const TString& database) : Database(database) - , OperationId(id) {} + const TString& GetDatabase() const { + return Database; + } + + const TString& GetDatabaseId() const { + return DatabaseId; + } + + void SetDatabaseId(const TString& databaseId) { + DatabaseId = databaseId; + } + const TString Database; + TString DatabaseId; +}; + +struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId { + TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) + , OperationId(id) + {} + const NOperationId::TOperationId OperationId; }; @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal { - explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) - : Database(database) +struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId { + TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) , OperationId(id) - { - } + {} - TString Database; NOperationId::TOperationId OperationId; }; @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal Metadata; }; -struct TEvListScriptExecutionOperations : public NActors::TEventLocal { +struct TEvListScriptExecutionOperations : public TEventWithDatabaseId { TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken) - : Database(database) + : TEventWithDatabaseId(database) , PageSize(pageSize) , PageToken(pageToken) {} - TString Database; ui64 PageSize; TString PageToken; }; @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB { }; -struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal { - explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) - : Database(database) +struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId { + TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) , OperationId(id) - { - } + {} - TString Database; NOperationId::TOperationId OperationId; }; diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index c1d36a957a76..674be7854510 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -90,14 +90,4 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal SecurityObject; }; -struct TEvUpdateDatabaseInfo : public NActors::TEventLocal { - TEvUpdateDatabaseInfo(const TString& database, bool serverless) - : Database(database) - , Serverless(serverless) - {} - - const TString Database; - const bool Serverless; -}; - } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index cadd44a1c89e..ee4e834e6c88 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetPoolId(PoolId); } + if (!DatabaseId.empty()) { + Record.MutableRequest()->SetDatabaseId(DatabaseId); + } + Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index c927c0b11568..8f187ca69d7a 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -43,7 +43,9 @@ struct TKqpEvents { EvListSessionsRequest, EvListSessionsResponse, EvListProxyNodesRequest, - EvListProxyNodesResponse + EvListProxyNodesResponse, + EvSubscribeOnDatabase, + EvUpdateDatabaseInfo }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); @@ -174,7 +176,6 @@ struct TKqpWorkloadServiceEvents { EvCleanupRequest, EvCleanupResponse, EvUpdatePoolInfo, - EvUpdateDatabaseInfo, EvSubscribeOnPoolChanges, }; }; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp new file mode 100644 index 000000000000..9f54f4365ed3 --- /dev/null +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -0,0 +1,146 @@ +#include "kqp_proxy_service_impl.h" + +#include +#include + +#include + + +namespace NKikimr::NKqp { + +namespace { + +class TDatabaseSubscriberActor : public TActor { + using TBase = TActor; + + struct TDatabaseState { + bool FetchRequestIsRunning = false; + TPathId WatchPathId; + + TString DatabaseId; + bool Serverless = false; + std::unordered_set Subscribers; + }; + +public: + TDatabaseSubscriberActor() + : TBase(&TDatabaseSubscriberActor::StateFunc) + {} + + void Handle(TEvKqp::TEvSubscribeOnDatabase::TPtr& ev) { + const TString& database = CanonizePath(ev->Get()->Database); + auto& databaseState = Subscriptions[database]; + + if (databaseState.DatabaseId) { + SendSubscriberInfo(database, ev->Sender, databaseState, Ydb::StatusIds::SUCCESS); + } else if (!databaseState.FetchRequestIsRunning) { + Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database)); + databaseState.FetchRequestIsRunning = true; + } + + databaseState.Subscribers.insert(ev->Sender); + } + + void Handle(NWorkload::TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + const TString& database = CanonizePath(ev->Get()->Database); + auto& databaseState = Subscriptions[database]; + + UpdateDatabaseState(databaseState, database, ev->Get()->PathId, ev->Get()->Serverless); + UpdateSubscribersInfo(database, databaseState, ev->Get()->Status, ev->Get()->Issues); + + databaseState.FetchRequestIsRunning = false; + databaseState.WatchPathId = ev->Get()->PathId; + + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + WatchKey++; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(databaseState.WatchPathId, WatchKey)); + WatchDatabases.insert({WatchKey, database}); + } + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) { + auto it = WatchDatabases.find(ev->Get()->Key); + if (it == WatchDatabases.end()) { + return; + } + + const auto& result = ev->Get()->Result; + if (!result || result->GetStatus() != NKikimrScheme::StatusSuccess) { + return; + } + + if (result->GetPathDescription().HasDomainDescription()) { + NSchemeCache::TDomainInfo description(result->GetPathDescription().GetDomainDescription()); + + auto& databaseState = Subscriptions[it->second]; + UpdateDatabaseState(databaseState, it->second, description.DomainKey, description.IsServerless()); + UpdateSubscribersInfo(it->second, databaseState, Ydb::StatusIds::SUCCESS); + } + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { + auto it = WatchDatabases.find(ev->Get()->Key); + if (it == WatchDatabases.end()) { + return; + } + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(ev->Get()->Key)); + + auto databaseStateIt = Subscriptions.find(it->second); + if (databaseStateIt != Subscriptions.end()) { + UpdateSubscribersInfo(it->second, databaseStateIt->second, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); + Subscriptions.erase(databaseStateIt); + } + + WatchDatabases.erase(it); + } + + void HandlePoison() { + if (!WatchDatabases.empty()) { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); + } + + TBase::PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvKqp::TEvSubscribeOnDatabase, Handle); + hFunc(NWorkload::TEvPrivate::TEvFetchDatabaseResponse, Handle); + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + sFunc(TEvents::TEvPoison, HandlePoison); + ) + +private: + void UpdateDatabaseState(TDatabaseState& databaseState, const TString& database, TPathId pathId, bool serverless) { + databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << database; + databaseState.Serverless = serverless; + } + + void UpdateSubscribersInfo(const TString& database, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + for (const auto& subscriber : databaseState.Subscribers) { + SendSubscriberInfo(database, subscriber, databaseState, status, issues); + } + } + + void SendSubscriberInfo(const TString& database, TActorId subscriber, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { + Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless)); + } else { + Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, std::move(issues))); + } + } + +private: + std::unordered_map Subscriptions; + std::unordered_map WatchDatabases; + ui32 WatchKey = 0; +}; + +} // anonymous namespace + +void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) { + SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor()); +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index c6793fc65208..7e6ec315071d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -486,6 +486,7 @@ class TKqpProxyService : public TActorBootstrapped { }); ResourcePoolsCache.UnsubscribeFromResourcePoolClassifiers(ActorContext()); + DatabasesCache.StopSubscriberActor(ActorContext()); return TActor::PassAway(); } @@ -635,6 +636,16 @@ class TKqpProxyService : public TActorBootstrapped { } void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + const auto errorHandler = [this, sender = ev->Sender](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ + auto response = std::make_unique(); + response->Record.GetRef().SetYdbStatus(status); + NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); + Send(sender, std::move(response)); + }; + if (!DatabasesCache.SetDatabaseIdOrDeffer(ev, errorHandler, ActorContext())) { + return; + } + const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); @@ -1359,7 +1370,7 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); hFunc(NWorkload::TEvUpdatePoolInfo, Handle); - hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", @@ -1636,6 +1647,13 @@ class TKqpProxyService : public TActorBootstrapped { return false; } + const auto errorHandler = [this, sender = ev->Sender](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ + Send(sender, new TResponse(status, std::move(issues))); + }; + if (!DatabasesCache.SetDatabaseIdOrDeffer(ev, errorHandler, ActorContext())) { + return false; + } + switch (ScriptExecutionsCreationStatus) { case EScriptExecutionsCreationStatus::NotStarted: ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending; @@ -1645,9 +1663,7 @@ class TKqpProxyService : public TActorBootstrapped { if (DelayedEventsQueue.size() < 10000) { DelayedEventsQueue.push_back({ .Event = std::move(ev), - .ResponseBuilder = [](Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { - return new TResponse(status, std::move(issues)); - } + .ErrorHandler = errorHandler }); } else { NYql::TIssues issues; @@ -1677,7 +1693,7 @@ class TKqpProxyService : public TActorBootstrapped { if (ev->Get()->Success) { Send(std::move(delayedEvent.Event)); } else { - Send(delayedEvent.Event->Sender, delayedEvent.ResponseBuilder(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue})); + delayedEvent.ErrorHandler(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue}); } DelayedEventsQueue.pop_front(); } @@ -1806,8 +1822,11 @@ class TKqpProxyService : public TActorBootstrapped { ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); } - void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) { - ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + } + DatabasesCache.UpdateDatabaseInfo(ev, ActorContext()); } void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { @@ -1867,16 +1886,13 @@ class TKqpProxyService : public TActorBootstrapped { Pending, Finished, }; - struct TDelayedEvent { - THolder Event; - std::function ResponseBuilder; - }; EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted; - std::deque DelayedEventsQueue; + std::deque DelayedEventsQueue; bool IsLookupByRmScheduled = false; TActorId KqpTempTablesAgentActor; TResourcePoolsCache ResourcePoolsCache; + TDatabasesCache DatabasesCache; }; } // namespace diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index b55c8e67d2e5..acc17f1141bd 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -642,4 +642,76 @@ class TResourcePoolsCache { bool SubscribedOnResourcePoolClassifiers = false; }; +class TDatabasesCache { +public: + struct TDelayedEvent { + THolder Event; + std::function ErrorHandler; + }; + +private: + struct TDatabaseInfo { + TString DatabaseId; // string "::" + std::vector DelayedEvents; + }; + +public: + template + bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { + if (!event->Get()->GetDatabaseId().empty()) { + return true; + } + + const auto& database = event->Get()->GetDatabase(); + auto& databaseInfo = DatabasesCache[database]; + if (databaseInfo.DatabaseId) { + event->Get()->SetDatabaseId(databaseInfo.DatabaseId); + return true; + } + + if (!SubscriberActor) { + CreateDatabaseSubscriberActor(actorContext); + } + + actorContext.Send(SubscriberActor, new TEvKqp::TEvSubscribeOnDatabase(database)); + databaseInfo.DelayedEvents.push_back({ + .Event = std::move(event), + .ErrorHandler = errorHandler + }); + return false; + } + + void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { + auto it = DatabasesCache.find(event->Get()->Database); + it->second.DatabaseId = event->Get()->DatabaseId; + + bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; + for (auto& delayedEvent : it->second.DelayedEvents) { + if (success) { + actorContext.Send(std::move(delayedEvent.Event)); + } else { + delayedEvent.ErrorHandler(event->Get()->Status, event->Get()->Issues); + } + } + it->second.DelayedEvents.clear(); + + if (!success) { + DatabasesCache.erase(it); + } + } + + void StopSubscriberActor(TActorContext actorContext) const { + if (SubscriberActor) { + actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); + } + } + +private: + void CreateDatabaseSubscriberActor(TActorContext actorContext); + +private: + std::unordered_map DatabasesCache; + TActorId SubscriberActor; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make index e1c2b9e1b76a..8a143789e701 100644 --- a/ydb/core/kqp/proxy_service/ya.make +++ b/ydb/core/kqp/proxy_service/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( kqp_proxy_service.cpp + kqp_proxy_databases_cache.cpp kqp_proxy_peer_stats_calculator.cpp kqp_script_executions.cpp kqp_session_info.cpp diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 781d9f4a6eca..9c9d400c9c37 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -489,6 +489,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } if (result.DomainInfo) { Serverless = result.DomainInfo->IsServerless(); + PathId = result.DomainInfo->DomainKey; } Reply(Ydb::StatusIds::SUCCESS); return; @@ -537,7 +538,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues))); + Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, PathId, std::move(Issues))); PassAway(); } @@ -560,6 +561,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { const NACLib::EAccessRights CheckAccess; bool Serverless = false; + TPathId PathId; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index df821a4c26d6..3af9abc8b4c1 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -95,16 +95,18 @@ struct TEvPrivate { }; struct TEvFetchDatabaseResponse : public NActors::TEventLocal { - TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues) + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) : Status(status) , Database(database) , Serverless(serverless) + , PathId(pathId) , Issues(std::move(issues)) {} const Ydb::StatusIds::StatusCode Status; const TString Database; const bool Serverless; + const TPathId PathId; const NYql::TIssues Issues; }; diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 813f97b6e107..6e849eb07e86 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -77,10 +77,6 @@ struct TDatabaseState { return; } - if (Serverless != ev->Get()->Serverless) { - ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless)); - } - LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index c9f4cc7442d0..a4c6906783b5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -117,6 +117,7 @@ message TQueryRequest { optional string UserSID = 33; optional uint64 OutputChunkMaxSize = 34; optional string PoolId = 35; + optional string DatabaseId = 36; } message TKqpPathIdProto { From 2a4923ca0b1028fcc74292c6c2c07cd3b71932c9 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 15:52:24 +0000 Subject: [PATCH 2/5] Fixed unit tests --- ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp | 6 +++++- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 2 ++ ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 8 ++++++-- ydb/core/kqp/proxy_service/ut/ya.make | 1 + .../kqp/workload_service/ut/kqp_workload_service_ut.cpp | 4 ++-- ydb/tests/tools/kqprun/configuration/app_config.conf | 1 + 6 files changed, 17 insertions(+), 5 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index 9f54f4365ed3..d3062695db3d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -127,7 +127,11 @@ class TDatabaseSubscriberActor : public TActor { if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless)); } else { - Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, std::move(issues))); + NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database" << database); + for (const auto& issue : issues) { + rootIssue.AddSubIssue(MakeIntrusive(issue)); + } + Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, {rootIssue})); } } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 7e6ec315071d..11da07090688 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -646,6 +646,8 @@ class TKqpProxyService : public TActorBootstrapped { return; } + Cerr << "------------------------------- DatabaseId: " << ev->Get()->GetDatabaseId() << "\n"; + const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index acc17f1141bd..8b039f07a1ba 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -662,7 +662,7 @@ class TDatabasesCache { return true; } - const auto& database = event->Get()->GetDatabase(); + const auto& database = CanonizePath(event->Get()->GetDatabase()); auto& databaseInfo = DatabasesCache[database]; if (databaseInfo.DatabaseId) { event->Get()->SetDatabaseId(databaseInfo.DatabaseId); @@ -682,7 +682,11 @@ class TDatabasesCache { } void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { - auto it = DatabasesCache.find(event->Get()->Database); + const auto& database = event->Get()->Database; + auto it = DatabasesCache.find(database); + if (it == DatabasesCache.end()) { + it = DatabasesCache.insert({database, TDatabaseInfo{}}).first; + } it->second.DatabaseId = event->Get()->DatabaseId; bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; diff --git a/ydb/core/kqp/proxy_service/ut/ya.make b/ydb/core/kqp/proxy_service/ut/ya.make index 730f59a3fcae..fc6d9e7c89cb 100644 --- a/ydb/core/kqp/proxy_service/ut/ya.make +++ b/ydb/core/kqp/proxy_service/ut/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/core/kqp/run_script_actor ydb/core/kqp/proxy_service ydb/core/kqp/ut/common + ydb/core/kqp/workload_service/ut/common ydb/library/yql/sql/pg_dummy ydb/public/sdk/cpp/client/ydb_query ydb/public/sdk/cpp/client/ydb_driver diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index cad50ad143da..39618cce2b00 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -635,7 +635,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForFail(TIntrusivePtr ydb, const TQueryRunnerSettings& settings, const TString& poolId) { - ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); @@ -644,7 +644,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForSuccess(TIntrusivePtr ydb, const TQueryRunnerSettings& settings) { - ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier success", [ydb, settings](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier success", [ydb, settings](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 1c9c56cebd20..1b48b9cb2e3b 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -53,6 +53,7 @@ FeatureFlags { EnableScriptExecutionOperations: true EnableExternalSourceSchemaInference: true EnableTempTables: true + EnableResourcePools: false } KQPConfig { From 3d1fe9c5c207aef9d912fe1f8fc5bf4862142e0b Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 16:05:18 +0000 Subject: [PATCH 3/5] Fixed empty database path --- ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 8b039f07a1ba..e1737f396a37 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -658,11 +658,11 @@ class TDatabasesCache { public: template bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { - if (!event->Get()->GetDatabaseId().empty()) { + const auto& database = CanonizePath(event->Get()->GetDatabase()); + if (!event->Get()->GetDatabaseId().empty() || database.empty()) { return true; } - const auto& database = CanonizePath(event->Get()->GetDatabase()); auto& databaseInfo = DatabasesCache[database]; if (databaseInfo.DatabaseId) { event->Get()->SetDatabaseId(databaseInfo.DatabaseId); From daed4e5927220686220a20970143abb59ab79a2f Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 18:31:19 +0000 Subject: [PATCH 4/5] Added unit test --- .../kqp/proxy_service/kqp_proxy_service.cpp | 2 - ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 86 +++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 11da07090688..7e6ec315071d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -646,8 +646,6 @@ class TKqpProxyService : public TActorBootstrapped { return; } - Cerr << "------------------------------- DatabaseId: " << ev->Get()->GetDatabaseId() << "\n"; - const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index 883ec7d9198e..d61d643e0588 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -65,6 +67,63 @@ TString CreateSession(TTestActorRuntime* runtime, const TActorId& kqpProxy, cons return sessionId; } +class TDatabaseCacheTestActor : public TActorBootstrapped { +public: + TDatabaseCacheTestActor(const TString& database, const TString& expectedDatabaseId, bool fromCache, TDatabasesCache& cache, NThreading::TPromise promise) + : Database(database) + , ExpectedDatabaseId(expectedDatabaseId) + , Cache(cache) + , Promise(promise) + , FromCache(fromCache) + {} + + void Bootstrap() { + Become(&TDatabaseCacheTestActor::StateFunc); + + auto event = MakeHolder(); + event->Record.MutableRequest()->SetDatabase(Database); + Send(SelfId(), event.Release()); + } + + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + Cache.UpdateDatabaseInfo(ev, ActorContext()); + } + + void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + auto success = Cache.SetDatabaseIdOrDeffer(ev, [this](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ + UNIT_ASSERT_C(false, TStringBuilder() << "Unexpected fail, " << GetErrorString() << ", status: " << status << ", reason: " << issues.ToOneLineString()); + }, ActorContext()); + + if (FromCache) { + UNIT_ASSERT_C(success, TStringBuilder() << "Expected database id from cache, " << GetErrorString()); + UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->GetDatabaseId(), ExpectedDatabaseId, GetErrorString()); + Promise.SetValue(); + PassAway(); + } else { + UNIT_ASSERT_C(!success, TStringBuilder() << "Unexpected database id from cache, " << GetErrorString()); + FromCache = true; + } + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); + ) + +private: + TString GetErrorString() const { + return TStringBuilder() << "database: " << Database << ", from cache: " << FromCache << "\n"; + } + +private: + const TString Database; + const TString ExpectedDatabaseId; + TDatabasesCache& Cache; + NThreading::TPromise Promise; + + bool FromCache = false; +}; + } Y_UNIT_TEST_SUITE(KqpProxy) { @@ -542,5 +601,32 @@ Y_UNIT_TEST_SUITE(KqpProxy) { UNIT_ASSERT(allDoneOk); } + + Y_UNIT_TEST(DatabasesCacheForServerless) { + auto ydb = NWorkload::TYdbSetupSettings() + .CreateSampleTenants(true) + .Create(); + + auto& runtime = *ydb->GetRuntime(); + TDatabasesCache cache; + + auto checkCache = [&](const TString& database, const TString& expectedDatabaseId, bool fromCache) { + auto promise = NThreading::NewPromise(); + runtime.Register(new TDatabaseCacheTestActor(database, expectedDatabaseId, fromCache, cache, promise)); + promise.GetFuture().GetValueSync(); + }; + + const auto& dedicatedTennant = ydb->GetSettings().GetDedicatedTenantName(); + checkCache(dedicatedTennant, dedicatedTennant, false); + checkCache(dedicatedTennant, dedicatedTennant, true); + + const auto& sharedTennant = ydb->GetSettings().GetSharedTenantName(); + checkCache(sharedTennant, sharedTennant, false); + checkCache(sharedTennant, sharedTennant, true); + + const auto& serverlessTennant = ydb->GetSettings().GetServerlessTenantName(); + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, false); + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, true); + } } // namspace NKqp } // namespace NKikimr From 66734c8d9e36c5bdaf33feb403a809d2145ecd00 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 18:42:45 +0000 Subject: [PATCH 5/5] Removed kqprun chenges --- ydb/tests/tools/kqprun/configuration/app_config.conf | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 1b48b9cb2e3b..1c9c56cebd20 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -53,7 +53,6 @@ FeatureFlags { EnableScriptExecutionOperations: true EnableExternalSourceSchemaInference: true EnableTempTables: true - EnableResourcePools: false } KQPConfig {