From fe27bca429b57c78ae91690860e0ee5b0857426b Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 14:05:41 +0000 Subject: [PATCH 01/15] Added kqp proxy database cache --- ydb/core/kqp/common/events/events.h | 42 +++++ ydb/core/kqp/common/events/query.h | 12 ++ .../kqp/common/events/script_executions.h | 52 ++++--- ydb/core/kqp/common/events/workload_service.h | 10 -- ydb/core/kqp/common/kqp_event_impl.cpp | 4 + ydb/core/kqp/common/simple/kqp_event_ids.h | 5 +- .../kqp_proxy_databases_cache.cpp | 146 ++++++++++++++++++ .../kqp/proxy_service/kqp_proxy_service.cpp | 40 +++-- .../proxy_service/kqp_proxy_service_impl.h | 72 +++++++++ ydb/core/kqp/proxy_service/ya.make | 1 + .../workload_service/actors/scheme_actors.cpp | 4 +- ydb/core/kqp/workload_service/common/events.h | 4 +- .../kqp_workload_service_impl.h | 4 - ydb/core/protos/kqp.proto | 1 + 14 files changed, 349 insertions(+), 48 deletions(-) create mode 100644 ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 7e700123c42c..ab88138b306b 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -108,6 +108,18 @@ struct TEvKqp { struct TEvScriptRequest : public TEventLocal { TEvScriptRequest() = default; + const TString& GetDatabase() const { + return Record.GetRequest().GetDatabase(); + } + + const TString& GetDatabaseId() const { + return Record.GetRequest().GetDatabaseId(); + } + + void SetDatabaseId(const TString& databaseId) { + Record.MutableRequest()->SetDatabaseId(databaseId); + } + mutable NKikimrKqp::TEvQueryRequest Record; TDuration ForgetAfter; TDuration ResultsTtl; @@ -161,6 +173,36 @@ struct TEvKqp { return issues; } }; + + struct TEvSubscribeOnDatabase : public TEventLocal { + explicit TEvSubscribeOnDatabase(const TString& database) + : Database(database) + {} + + TString Database; + }; + + struct TEvUpdateDatabaseInfo : public TEventLocal { + TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Database(database) + , Issues(std::move(issues)) + {} + + TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless) + : Status(Ydb::StatusIds::SUCCESS) + , Database(database) + , DatabaseId(databaseId) + , Serverless(serverless) + , Issues({}) + {} + + Ydb::StatusIds::StatusCode Status; + TString Database; + TString DatabaseId; + bool Serverless = false; + NYql::TIssues Issues; + }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 18818e958182..e2fed5ba6fe9 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -351,6 +351,17 @@ struct TEvQueryRequest: public NActors::TEventLocal Token_; TActorId RequestActorId; TString Database; + TString DatabaseId; TString SessionId; TString YqlText; TString QueryId; diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index f6bf6d424101..62e2d767c744 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 { FS_ROLLBACK, }; -struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal { - TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) +template +struct TEventWithDatabaseId : public NActors::TEventLocal { + TEventWithDatabaseId(const TString& database) : Database(database) - , OperationId(id) {} + const TString& GetDatabase() const { + return Database; + } + + const TString& GetDatabaseId() const { + return DatabaseId; + } + + void SetDatabaseId(const TString& databaseId) { + DatabaseId = databaseId; + } + const TString Database; + TString DatabaseId; +}; + +struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId { + TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) + , OperationId(id) + {} + const NOperationId::TOperationId OperationId; }; @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal { - explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) - : Database(database) +struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId { + TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) , OperationId(id) - { - } + {} - TString Database; NOperationId::TOperationId OperationId; }; @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal Metadata; }; -struct TEvListScriptExecutionOperations : public NActors::TEventLocal { +struct TEvListScriptExecutionOperations : public TEventWithDatabaseId { TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken) - : Database(database) + : TEventWithDatabaseId(database) , PageSize(pageSize) , PageToken(pageToken) {} - TString Database; ui64 PageSize; TString PageToken; }; @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB { }; -struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal { - explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) - : Database(database) +struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId { + TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : TEventWithDatabaseId(database) , OperationId(id) - { - } + {} - TString Database; NOperationId::TOperationId OperationId; }; diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index c1d36a957a76..674be7854510 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -90,14 +90,4 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal SecurityObject; }; -struct TEvUpdateDatabaseInfo : public NActors::TEventLocal { - TEvUpdateDatabaseInfo(const TString& database, bool serverless) - : Database(database) - , Serverless(serverless) - {} - - const TString Database; - const bool Serverless; -}; - } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index cadd44a1c89e..ee4e834e6c88 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetPoolId(PoolId); } + if (!DatabaseId.empty()) { + Record.MutableRequest()->SetDatabaseId(DatabaseId); + } + Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index c927c0b11568..8f187ca69d7a 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -43,7 +43,9 @@ struct TKqpEvents { EvListSessionsRequest, EvListSessionsResponse, EvListProxyNodesRequest, - EvListProxyNodesResponse + EvListProxyNodesResponse, + EvSubscribeOnDatabase, + EvUpdateDatabaseInfo }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); @@ -174,7 +176,6 @@ struct TKqpWorkloadServiceEvents { EvCleanupRequest, EvCleanupResponse, EvUpdatePoolInfo, - EvUpdateDatabaseInfo, EvSubscribeOnPoolChanges, }; }; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp new file mode 100644 index 000000000000..9f54f4365ed3 --- /dev/null +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -0,0 +1,146 @@ +#include "kqp_proxy_service_impl.h" + +#include +#include + +#include + + +namespace NKikimr::NKqp { + +namespace { + +class TDatabaseSubscriberActor : public TActor { + using TBase = TActor; + + struct TDatabaseState { + bool FetchRequestIsRunning = false; + TPathId WatchPathId; + + TString DatabaseId; + bool Serverless = false; + std::unordered_set Subscribers; + }; + +public: + TDatabaseSubscriberActor() + : TBase(&TDatabaseSubscriberActor::StateFunc) + {} + + void Handle(TEvKqp::TEvSubscribeOnDatabase::TPtr& ev) { + const TString& database = CanonizePath(ev->Get()->Database); + auto& databaseState = Subscriptions[database]; + + if (databaseState.DatabaseId) { + SendSubscriberInfo(database, ev->Sender, databaseState, Ydb::StatusIds::SUCCESS); + } else if (!databaseState.FetchRequestIsRunning) { + Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database)); + databaseState.FetchRequestIsRunning = true; + } + + databaseState.Subscribers.insert(ev->Sender); + } + + void Handle(NWorkload::TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + const TString& database = CanonizePath(ev->Get()->Database); + auto& databaseState = Subscriptions[database]; + + UpdateDatabaseState(databaseState, database, ev->Get()->PathId, ev->Get()->Serverless); + UpdateSubscribersInfo(database, databaseState, ev->Get()->Status, ev->Get()->Issues); + + databaseState.FetchRequestIsRunning = false; + databaseState.WatchPathId = ev->Get()->PathId; + + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + WatchKey++; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(databaseState.WatchPathId, WatchKey)); + WatchDatabases.insert({WatchKey, database}); + } + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) { + auto it = WatchDatabases.find(ev->Get()->Key); + if (it == WatchDatabases.end()) { + return; + } + + const auto& result = ev->Get()->Result; + if (!result || result->GetStatus() != NKikimrScheme::StatusSuccess) { + return; + } + + if (result->GetPathDescription().HasDomainDescription()) { + NSchemeCache::TDomainInfo description(result->GetPathDescription().GetDomainDescription()); + + auto& databaseState = Subscriptions[it->second]; + UpdateDatabaseState(databaseState, it->second, description.DomainKey, description.IsServerless()); + UpdateSubscribersInfo(it->second, databaseState, Ydb::StatusIds::SUCCESS); + } + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { + auto it = WatchDatabases.find(ev->Get()->Key); + if (it == WatchDatabases.end()) { + return; + } + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(ev->Get()->Key)); + + auto databaseStateIt = Subscriptions.find(it->second); + if (databaseStateIt != Subscriptions.end()) { + UpdateSubscribersInfo(it->second, databaseStateIt->second, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); + Subscriptions.erase(databaseStateIt); + } + + WatchDatabases.erase(it); + } + + void HandlePoison() { + if (!WatchDatabases.empty()) { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); + } + + TBase::PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvKqp::TEvSubscribeOnDatabase, Handle); + hFunc(NWorkload::TEvPrivate::TEvFetchDatabaseResponse, Handle); + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + sFunc(TEvents::TEvPoison, HandlePoison); + ) + +private: + void UpdateDatabaseState(TDatabaseState& databaseState, const TString& database, TPathId pathId, bool serverless) { + databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << database; + databaseState.Serverless = serverless; + } + + void UpdateSubscribersInfo(const TString& database, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + for (const auto& subscriber : databaseState.Subscribers) { + SendSubscriberInfo(database, subscriber, databaseState, status, issues); + } + } + + void SendSubscriberInfo(const TString& database, TActorId subscriber, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { + Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless)); + } else { + Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, std::move(issues))); + } + } + +private: + std::unordered_map Subscriptions; + std::unordered_map WatchDatabases; + ui32 WatchKey = 0; +}; + +} // anonymous namespace + +void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) { + SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor()); +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index c6793fc65208..7e6ec315071d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -486,6 +486,7 @@ class TKqpProxyService : public TActorBootstrapped { }); ResourcePoolsCache.UnsubscribeFromResourcePoolClassifiers(ActorContext()); + DatabasesCache.StopSubscriberActor(ActorContext()); return TActor::PassAway(); } @@ -635,6 +636,16 @@ class TKqpProxyService : public TActorBootstrapped { } void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + const auto errorHandler = [this, sender = ev->Sender](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ + auto response = std::make_unique(); + response->Record.GetRef().SetYdbStatus(status); + NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); + Send(sender, std::move(response)); + }; + if (!DatabasesCache.SetDatabaseIdOrDeffer(ev, errorHandler, ActorContext())) { + return; + } + const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); @@ -1359,7 +1370,7 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); hFunc(NWorkload::TEvUpdatePoolInfo, Handle); - hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", @@ -1636,6 +1647,13 @@ class TKqpProxyService : public TActorBootstrapped { return false; } + const auto errorHandler = [this, sender = ev->Sender](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ + Send(sender, new TResponse(status, std::move(issues))); + }; + if (!DatabasesCache.SetDatabaseIdOrDeffer(ev, errorHandler, ActorContext())) { + return false; + } + switch (ScriptExecutionsCreationStatus) { case EScriptExecutionsCreationStatus::NotStarted: ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending; @@ -1645,9 +1663,7 @@ class TKqpProxyService : public TActorBootstrapped { if (DelayedEventsQueue.size() < 10000) { DelayedEventsQueue.push_back({ .Event = std::move(ev), - .ResponseBuilder = [](Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { - return new TResponse(status, std::move(issues)); - } + .ErrorHandler = errorHandler }); } else { NYql::TIssues issues; @@ -1677,7 +1693,7 @@ class TKqpProxyService : public TActorBootstrapped { if (ev->Get()->Success) { Send(std::move(delayedEvent.Event)); } else { - Send(delayedEvent.Event->Sender, delayedEvent.ResponseBuilder(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue})); + delayedEvent.ErrorHandler(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue}); } DelayedEventsQueue.pop_front(); } @@ -1806,8 +1822,11 @@ class TKqpProxyService : public TActorBootstrapped { ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); } - void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) { - ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + } + DatabasesCache.UpdateDatabaseInfo(ev, ActorContext()); } void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { @@ -1867,16 +1886,13 @@ class TKqpProxyService : public TActorBootstrapped { Pending, Finished, }; - struct TDelayedEvent { - THolder Event; - std::function ResponseBuilder; - }; EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted; - std::deque DelayedEventsQueue; + std::deque DelayedEventsQueue; bool IsLookupByRmScheduled = false; TActorId KqpTempTablesAgentActor; TResourcePoolsCache ResourcePoolsCache; + TDatabasesCache DatabasesCache; }; } // namespace diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index b55c8e67d2e5..acc17f1141bd 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -642,4 +642,76 @@ class TResourcePoolsCache { bool SubscribedOnResourcePoolClassifiers = false; }; +class TDatabasesCache { +public: + struct TDelayedEvent { + THolder Event; + std::function ErrorHandler; + }; + +private: + struct TDatabaseInfo { + TString DatabaseId; // string "::" + std::vector DelayedEvents; + }; + +public: + template + bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { + if (!event->Get()->GetDatabaseId().empty()) { + return true; + } + + const auto& database = event->Get()->GetDatabase(); + auto& databaseInfo = DatabasesCache[database]; + if (databaseInfo.DatabaseId) { + event->Get()->SetDatabaseId(databaseInfo.DatabaseId); + return true; + } + + if (!SubscriberActor) { + CreateDatabaseSubscriberActor(actorContext); + } + + actorContext.Send(SubscriberActor, new TEvKqp::TEvSubscribeOnDatabase(database)); + databaseInfo.DelayedEvents.push_back({ + .Event = std::move(event), + .ErrorHandler = errorHandler + }); + return false; + } + + void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { + auto it = DatabasesCache.find(event->Get()->Database); + it->second.DatabaseId = event->Get()->DatabaseId; + + bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; + for (auto& delayedEvent : it->second.DelayedEvents) { + if (success) { + actorContext.Send(std::move(delayedEvent.Event)); + } else { + delayedEvent.ErrorHandler(event->Get()->Status, event->Get()->Issues); + } + } + it->second.DelayedEvents.clear(); + + if (!success) { + DatabasesCache.erase(it); + } + } + + void StopSubscriberActor(TActorContext actorContext) const { + if (SubscriberActor) { + actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); + } + } + +private: + void CreateDatabaseSubscriberActor(TActorContext actorContext); + +private: + std::unordered_map DatabasesCache; + TActorId SubscriberActor; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make index e1c2b9e1b76a..8a143789e701 100644 --- a/ydb/core/kqp/proxy_service/ya.make +++ b/ydb/core/kqp/proxy_service/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( kqp_proxy_service.cpp + kqp_proxy_databases_cache.cpp kqp_proxy_peer_stats_calculator.cpp kqp_script_executions.cpp kqp_session_info.cpp diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 781d9f4a6eca..9c9d400c9c37 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -489,6 +489,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } if (result.DomainInfo) { Serverless = result.DomainInfo->IsServerless(); + PathId = result.DomainInfo->DomainKey; } Reply(Ydb::StatusIds::SUCCESS); return; @@ -537,7 +538,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues))); + Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, PathId, std::move(Issues))); PassAway(); } @@ -560,6 +561,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { const NACLib::EAccessRights CheckAccess; bool Serverless = false; + TPathId PathId; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index df821a4c26d6..3af9abc8b4c1 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -95,16 +95,18 @@ struct TEvPrivate { }; struct TEvFetchDatabaseResponse : public NActors::TEventLocal { - TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues) + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) : Status(status) , Database(database) , Serverless(serverless) + , PathId(pathId) , Issues(std::move(issues)) {} const Ydb::StatusIds::StatusCode Status; const TString Database; const bool Serverless; + const TPathId PathId; const NYql::TIssues Issues; }; diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 813f97b6e107..6e849eb07e86 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -77,10 +77,6 @@ struct TDatabaseState { return; } - if (Serverless != ev->Get()->Serverless) { - ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless)); - } - LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index c9f4cc7442d0..a4c6906783b5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -117,6 +117,7 @@ message TQueryRequest { optional string UserSID = 33; optional uint64 OutputChunkMaxSize = 34; optional string PoolId = 35; + optional string DatabaseId = 36; } message TKqpPathIdProto { From 2a4923ca0b1028fcc74292c6c2c07cd3b71932c9 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 15:52:24 +0000 Subject: [PATCH 02/15] Fixed unit tests --- ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp | 6 +++++- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 2 ++ ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 8 ++++++-- ydb/core/kqp/proxy_service/ut/ya.make | 1 + .../kqp/workload_service/ut/kqp_workload_service_ut.cpp | 4 ++-- ydb/tests/tools/kqprun/configuration/app_config.conf | 1 + 6 files changed, 17 insertions(+), 5 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index 9f54f4365ed3..d3062695db3d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -127,7 +127,11 @@ class TDatabaseSubscriberActor : public TActor { if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless)); } else { - Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, std::move(issues))); + NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database" << database); + for (const auto& issue : issues) { + rootIssue.AddSubIssue(MakeIntrusive(issue)); + } + Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, {rootIssue})); } } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 7e6ec315071d..11da07090688 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -646,6 +646,8 @@ class TKqpProxyService : public TActorBootstrapped { return; } + Cerr << "------------------------------- DatabaseId: " << ev->Get()->GetDatabaseId() << "\n"; + const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index acc17f1141bd..8b039f07a1ba 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -662,7 +662,7 @@ class TDatabasesCache { return true; } - const auto& database = event->Get()->GetDatabase(); + const auto& database = CanonizePath(event->Get()->GetDatabase()); auto& databaseInfo = DatabasesCache[database]; if (databaseInfo.DatabaseId) { event->Get()->SetDatabaseId(databaseInfo.DatabaseId); @@ -682,7 +682,11 @@ class TDatabasesCache { } void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { - auto it = DatabasesCache.find(event->Get()->Database); + const auto& database = event->Get()->Database; + auto it = DatabasesCache.find(database); + if (it == DatabasesCache.end()) { + it = DatabasesCache.insert({database, TDatabaseInfo{}}).first; + } it->second.DatabaseId = event->Get()->DatabaseId; bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; diff --git a/ydb/core/kqp/proxy_service/ut/ya.make b/ydb/core/kqp/proxy_service/ut/ya.make index 730f59a3fcae..fc6d9e7c89cb 100644 --- a/ydb/core/kqp/proxy_service/ut/ya.make +++ b/ydb/core/kqp/proxy_service/ut/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/core/kqp/run_script_actor ydb/core/kqp/proxy_service ydb/core/kqp/ut/common + ydb/core/kqp/workload_service/ut/common ydb/library/yql/sql/pg_dummy ydb/public/sdk/cpp/client/ydb_query ydb/public/sdk/cpp/client/ydb_driver diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index cad50ad143da..39618cce2b00 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -635,7 +635,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForFail(TIntrusivePtr ydb, const TQueryRunnerSettings& settings, const TString& poolId) { - ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); @@ -644,7 +644,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { } void WaitForSuccess(TIntrusivePtr ydb, const TQueryRunnerSettings& settings) { - ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier success", [ydb, settings](TString& errorString) { + ydb->WaitFor(TDuration::Seconds(10), "Resource pool classifier success", [ydb, settings](TString& errorString) { auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); errorString = result.GetIssues().ToOneLineString(); diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 1c9c56cebd20..1b48b9cb2e3b 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -53,6 +53,7 @@ FeatureFlags { EnableScriptExecutionOperations: true EnableExternalSourceSchemaInference: true EnableTempTables: true + EnableResourcePools: false } KQPConfig { From 3d1fe9c5c207aef9d912fe1f8fc5bf4862142e0b Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 16:05:18 +0000 Subject: [PATCH 03/15] Fixed empty database path --- ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 8b039f07a1ba..e1737f396a37 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -658,11 +658,11 @@ class TDatabasesCache { public: template bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { - if (!event->Get()->GetDatabaseId().empty()) { + const auto& database = CanonizePath(event->Get()->GetDatabase()); + if (!event->Get()->GetDatabaseId().empty() || database.empty()) { return true; } - const auto& database = CanonizePath(event->Get()->GetDatabase()); auto& databaseInfo = DatabasesCache[database]; if (databaseInfo.DatabaseId) { event->Get()->SetDatabaseId(databaseInfo.DatabaseId); From daed4e5927220686220a20970143abb59ab79a2f Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 18:31:19 +0000 Subject: [PATCH 04/15] Added unit test --- .../kqp/proxy_service/kqp_proxy_service.cpp | 2 - ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 86 +++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 11da07090688..7e6ec315071d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -646,8 +646,6 @@ class TKqpProxyService : public TActorBootstrapped { return; } - Cerr << "------------------------------- DatabaseId: " << ev->Get()->GetDatabaseId() << "\n"; - const TString& database = ev->Get()->GetDatabase(); const TString& traceId = ev->Get()->GetTraceId(); const auto queryType = ev->Get()->GetType(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index 883ec7d9198e..d61d643e0588 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -65,6 +67,63 @@ TString CreateSession(TTestActorRuntime* runtime, const TActorId& kqpProxy, cons return sessionId; } +class TDatabaseCacheTestActor : public TActorBootstrapped { +public: + TDatabaseCacheTestActor(const TString& database, const TString& expectedDatabaseId, bool fromCache, TDatabasesCache& cache, NThreading::TPromise promise) + : Database(database) + , ExpectedDatabaseId(expectedDatabaseId) + , Cache(cache) + , Promise(promise) + , FromCache(fromCache) + {} + + void Bootstrap() { + Become(&TDatabaseCacheTestActor::StateFunc); + + auto event = MakeHolder(); + event->Record.MutableRequest()->SetDatabase(Database); + Send(SelfId(), event.Release()); + } + + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + Cache.UpdateDatabaseInfo(ev, ActorContext()); + } + + void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { + auto success = Cache.SetDatabaseIdOrDeffer(ev, [this](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ + UNIT_ASSERT_C(false, TStringBuilder() << "Unexpected fail, " << GetErrorString() << ", status: " << status << ", reason: " << issues.ToOneLineString()); + }, ActorContext()); + + if (FromCache) { + UNIT_ASSERT_C(success, TStringBuilder() << "Expected database id from cache, " << GetErrorString()); + UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->GetDatabaseId(), ExpectedDatabaseId, GetErrorString()); + Promise.SetValue(); + PassAway(); + } else { + UNIT_ASSERT_C(!success, TStringBuilder() << "Unexpected database id from cache, " << GetErrorString()); + FromCache = true; + } + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvQueryRequest, Handle); + ) + +private: + TString GetErrorString() const { + return TStringBuilder() << "database: " << Database << ", from cache: " << FromCache << "\n"; + } + +private: + const TString Database; + const TString ExpectedDatabaseId; + TDatabasesCache& Cache; + NThreading::TPromise Promise; + + bool FromCache = false; +}; + } Y_UNIT_TEST_SUITE(KqpProxy) { @@ -542,5 +601,32 @@ Y_UNIT_TEST_SUITE(KqpProxy) { UNIT_ASSERT(allDoneOk); } + + Y_UNIT_TEST(DatabasesCacheForServerless) { + auto ydb = NWorkload::TYdbSetupSettings() + .CreateSampleTenants(true) + .Create(); + + auto& runtime = *ydb->GetRuntime(); + TDatabasesCache cache; + + auto checkCache = [&](const TString& database, const TString& expectedDatabaseId, bool fromCache) { + auto promise = NThreading::NewPromise(); + runtime.Register(new TDatabaseCacheTestActor(database, expectedDatabaseId, fromCache, cache, promise)); + promise.GetFuture().GetValueSync(); + }; + + const auto& dedicatedTennant = ydb->GetSettings().GetDedicatedTenantName(); + checkCache(dedicatedTennant, dedicatedTennant, false); + checkCache(dedicatedTennant, dedicatedTennant, true); + + const auto& sharedTennant = ydb->GetSettings().GetSharedTenantName(); + checkCache(sharedTennant, sharedTennant, false); + checkCache(sharedTennant, sharedTennant, true); + + const auto& serverlessTennant = ydb->GetSettings().GetServerlessTenantName(); + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, false); + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, true); + } } // namspace NKqp } // namespace NKikimr From 66734c8d9e36c5bdaf33feb403a809d2145ecd00 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 23 Sep 2024 18:42:45 +0000 Subject: [PATCH 05/15] Removed kqprun chenges --- ydb/tests/tools/kqprun/configuration/app_config.conf | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 1b48b9cb2e3b..1c9c56cebd20 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -53,7 +53,6 @@ FeatureFlags { EnableScriptExecutionOperations: true EnableExternalSourceSchemaInference: true EnableTempTables: true - EnableResourcePools: false } KQPConfig { From 0a8071c536b9768bed0533a145058f63b8ca38f2 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 24 Sep 2024 07:59:32 +0000 Subject: [PATCH 06/15] Fixed typos --- ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp | 2 +- ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 2 +- ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index d3062695db3d..e6f2fa403d7b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -127,7 +127,7 @@ class TDatabaseSubscriberActor : public TActor { if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless)); } else { - NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database" << database); + NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database " << database); for (const auto& issue : issues) { rootIssue.AddSubIssue(MakeIntrusive(issue)); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index e1737f396a37..10c6ccbeb12b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -656,7 +656,7 @@ class TDatabasesCache { }; public: - template + template bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { const auto& database = CanonizePath(event->Get()->GetDatabase()); if (!event->Get()->GetDatabaseId().empty() || database.empty()) { diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index d61d643e0588..ebaad532350f 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -90,7 +90,7 @@ class TDatabaseCacheTestActor : public TActorBootstrapped Date: Tue, 24 Sep 2024 14:46:14 +0000 Subject: [PATCH 07/15] Removed unused events --- ydb/core/kqp/common/events/events.h | 30 -------- ydb/core/kqp/common/events/workload_service.h | 27 +++++++ ydb/core/kqp/common/events/ya.make | 1 + ydb/core/kqp/common/simple/kqp_event_ids.h | 4 +- .../resource_pool_classifier/checker.cpp | 30 +++++++- .../kqp_proxy_databases_cache.cpp | 73 ++++++++++++++++++- .../proxy_service/kqp_proxy_service_impl.h | 50 ++++--------- .../workload_service/actors/scheme_actors.cpp | 2 +- ydb/core/kqp/workload_service/common/events.h | 34 --------- .../workload_service/kqp_workload_service.cpp | 4 +- .../kqp_workload_service_impl.h | 6 +- 11 files changed, 149 insertions(+), 112 deletions(-) diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index ab88138b306b..1cef6b5b54ea 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -173,36 +173,6 @@ struct TEvKqp { return issues; } }; - - struct TEvSubscribeOnDatabase : public TEventLocal { - explicit TEvSubscribeOnDatabase(const TString& database) - : Database(database) - {} - - TString Database; - }; - - struct TEvUpdateDatabaseInfo : public TEventLocal { - TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) - : Status(status) - , Database(database) - , Issues(std::move(issues)) - {} - - TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless) - : Status(Ydb::StatusIds::SUCCESS) - , Database(database) - , DatabaseId(databaseId) - , Serverless(serverless) - , Issues({}) - {} - - Ydb::StatusIds::StatusCode Status; - TString Database; - TString DatabaseId; - bool Serverless = false; - NYql::TIssues Issues; - }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index 674be7854510..899e542eb56f 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -90,4 +91,30 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal SecurityObject; }; +struct TEvUpdateDatabaseInfo : public NActors::TEventLocal { + TEvUpdateDatabaseInfo(const TString& database, bool serverless) + : Database(database) + , Serverless(serverless) + {} + + const TString Database; + const bool Serverless; +}; + +struct TEvFetchDatabaseResponse : public NActors::TEventLocal { + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) + : Status(status) + , Database(database) + , Serverless(serverless) + , PathId(pathId) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status; + const TString Database; + const bool Serverless; + const TPathId PathId; + const NYql::TIssues Issues; +}; + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/common/events/ya.make b/ydb/core/kqp/common/events/ya.make index 9729e9518eed..a442cb07ea7c 100644 --- a/ydb/core/kqp/common/events/ya.make +++ b/ydb/core/kqp/common/events/ya.make @@ -15,6 +15,7 @@ PEERDIR( ydb/core/kqp/common/shutdown ydb/core/kqp/common/compilation ydb/core/resource_pools + ydb/core/scheme ydb/library/yql/dq/actors ydb/public/api/protos diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 8f187ca69d7a..ef759894f5f2 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -44,8 +44,6 @@ struct TKqpEvents { EvListSessionsResponse, EvListProxyNodesRequest, EvListProxyNodesResponse, - EvSubscribeOnDatabase, - EvUpdateDatabaseInfo }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); @@ -177,6 +175,8 @@ struct TKqpWorkloadServiceEvents { EvCleanupResponse, EvUpdatePoolInfo, EvSubscribeOnPoolChanges, + EvUpdateDatabaseInfo, + EvFetchDatabaseResponse, }; }; diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp index 5b6bae22b411..497b6e0a41ea 100644 --- a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp +++ b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -20,6 +19,31 @@ using namespace NResourcePool; using namespace NWorkload; +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvRanksCheckerResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvRanksCheckerResponse : public TEventLocal { + TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues) + : Status(status) + , MaxRank(maxRank) + , NumberClassifiers(numberClassifiers) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status; + const i64 MaxRank; + const ui64 NumberClassifiers; + const NYql::TIssues Issues; + }; +}; + class TRanksCheckerActor : public NKikimr::TQueryBase { using TBase = NKikimr::TQueryBase; @@ -177,7 +201,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrappedGet()->Status != Ydb::StatusIds::SUCCESS) { FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues); return; @@ -223,7 +247,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped { + explicit TEvSubscribeOnDatabase(const TString& database) + : Database(database) + {} + + TString Database; + }; + + struct TEvPingDatabaseSubscription : public TEventLocal { + explicit TEvPingDatabaseSubscription(const TString& database) + : Database(database) + {} + + TString Database; + }; +}; + class TDatabaseSubscriberActor : public TActor { using TBase = TActor; @@ -27,7 +56,7 @@ class TDatabaseSubscriberActor : public TActor { : TBase(&TDatabaseSubscriberActor::StateFunc) {} - void Handle(TEvKqp::TEvSubscribeOnDatabase::TPtr& ev) { + void Handle(TEvPrivate::TEvSubscribeOnDatabase::TPtr& ev) { const TString& database = CanonizePath(ev->Get()->Database); auto& databaseState = Subscriptions[database]; @@ -41,7 +70,7 @@ class TDatabaseSubscriberActor : public TActor { databaseState.Subscribers.insert(ev->Sender); } - void Handle(NWorkload::TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + void Handle(NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { const TString& database = CanonizePath(ev->Get()->Database); auto& databaseState = Subscriptions[database]; @@ -104,8 +133,8 @@ class TDatabaseSubscriberActor : public TActor { } STRICT_STFUNC(StateFunc, - hFunc(TEvKqp::TEvSubscribeOnDatabase, Handle); - hFunc(NWorkload::TEvPrivate::TEvFetchDatabaseResponse, Handle); + hFunc(TEvPrivate::TEvSubscribeOnDatabase, Handle); + hFunc(NWorkload::TEvFetchDatabaseResponse, Handle); hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); sFunc(TEvents::TEvPoison, HandlePoison); @@ -143,6 +172,42 @@ class TDatabaseSubscriberActor : public TActor { } // anonymous namespace +const TString& TDatabasesCache::GetTenantName() { + if (!TenantName) { + TenantName = CanonizePath(AppData()->TenantName); + } + return TenantName; +} + +void TDatabasesCache::UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { + const auto& database = event->Get()->Database; + auto it = DatabasesCache.find(database); + if (it == DatabasesCache.end()) { + it = DatabasesCache.insert({database, TDatabaseInfo{}}).first; + } + it->second.DatabaseId = event->Get()->DatabaseId; + + bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; + for (auto& delayedEvent : it->second.DelayedEvents) { + if (success) { + actorContext.Send(std::move(delayedEvent.Event)); + } else { + delayedEvent.ErrorHandler(event->Get()->Status, event->Get()->Issues); + } + } + it->second.DelayedEvents.clear(); + + if (!success) { + DatabasesCache.erase(it); + } +} + +void TDatabasesCache::StopSubscriberActor(TActorContext actorContext) const { + if (SubscriberActor) { + actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); + } +} + void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) { SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor()); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 10c6ccbeb12b..384ae6983150 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -658,64 +658,44 @@ class TDatabasesCache { public: template bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { + if (!event->Get()->GetDatabaseId().empty()) { + return true; + } + const auto& database = CanonizePath(event->Get()->GetDatabase()); - if (!event->Get()->GetDatabaseId().empty() || database.empty()) { + if (database.empty() || database == GetTenantName()) { + event->Get()->SetDatabaseId(GetTenantName()); return true; } auto& databaseInfo = DatabasesCache[database]; if (databaseInfo.DatabaseId) { + PingDatabaseSubscription(database, actorContext); event->Get()->SetDatabaseId(databaseInfo.DatabaseId); return true; } - if (!SubscriberActor) { - CreateDatabaseSubscriberActor(actorContext); - } - - actorContext.Send(SubscriberActor, new TEvKqp::TEvSubscribeOnDatabase(database)); + SubscribeOnDatabase(database, actorContext); databaseInfo.DelayedEvents.push_back({ .Event = std::move(event), .ErrorHandler = errorHandler }); - return false; - } - - void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { - const auto& database = event->Get()->Database; - auto it = DatabasesCache.find(database); - if (it == DatabasesCache.end()) { - it = DatabasesCache.insert({database, TDatabaseInfo{}}).first; - } - it->second.DatabaseId = event->Get()->DatabaseId; - bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; - for (auto& delayedEvent : it->second.DelayedEvents) { - if (success) { - actorContext.Send(std::move(delayedEvent.Event)); - } else { - delayedEvent.ErrorHandler(event->Get()->Status, event->Get()->Issues); - } - } - it->second.DelayedEvents.clear(); - - if (!success) { - DatabasesCache.erase(it); - } + return false; } - void StopSubscriberActor(TActorContext actorContext) const { - if (SubscriberActor) { - actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); - } - } + void UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext); + void StopSubscriberActor(TActorContext actorContext) const; private: - void CreateDatabaseSubscriberActor(TActorContext actorContext); + const TString& GetTenantName(); + void SubscribeOnDatabase(const TString& database, TActorContext actorContext); + void PingDatabaseSubscription(const TString& database, TActorContext actorContext) const; private: std::unordered_map DatabasesCache; TActorId SubscriberActor; + TString TenantName; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 9c9d400c9c37..e7cb25220418 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -538,7 +538,7 @@ class TDatabaseFetcherActor : public TSchemeActorBase { } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, PathId, std::move(Issues))); + Send(ReplyActorId, new TEvFetchDatabaseResponse(status, Database, Serverless, PathId, std::move(Issues))); PassAway(); } diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index 3af9abc8b4c1..57e332ac66b3 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -22,7 +22,6 @@ struct TEvPrivate { EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvResolvePoolResponse, EvFetchPoolResponse, - EvFetchDatabaseResponse, EvCreatePoolResponse, EvPrepareTablesRequest, EvPlaceRequestIntoPoolResponse, @@ -47,8 +46,6 @@ struct TEvPrivate { EvStartRequestResponse, EvCleanupRequestsResponse, - EvRanksCheckerResponse, - EvEnd }; @@ -94,22 +91,6 @@ struct TEvPrivate { const NYql::TIssues Issues; }; - struct TEvFetchDatabaseResponse : public NActors::TEventLocal { - TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) - : Status(status) - , Database(database) - , Serverless(serverless) - , PathId(pathId) - , Issues(std::move(issues)) - {} - - const Ydb::StatusIds::StatusCode Status; - const TString Database; - const bool Serverless; - const TPathId PathId; - const NYql::TIssues Issues; - }; - struct TEvCreatePoolResponse : public NActors::TEventLocal { TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Status(status) @@ -336,21 +317,6 @@ struct TEvPrivate { const std::vector SesssionIds; const NYql::TIssues Issues; }; - - // Resource pool classifier events - struct TEvRanksCheckerResponse : public TEventLocal { - TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues) - : Status(status) - , MaxRank(maxRank) - , NumberClassifiers(numberClassifiers) - , Issues(std::move(issues)) - {} - - const Ydb::StatusIds::StatusCode Status; - const i64 MaxRank; - const ui64 NumberClassifiers; - const NYql::TIssues Issues; - }; }; } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 26653cda3e3e..9bcfe49f4a4e 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -214,7 +214,7 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvCleanupRequest, Handle); hFunc(TEvents::TEvWakeup, Handle); - hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle); + hFunc(TEvFetchDatabaseResponse, Handle); hFunc(TEvPrivate::TEvFetchPoolResponse, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); @@ -231,7 +231,7 @@ class TKqpWorkloadService : public TActorBootstrapped { ) private: - void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + void Handle(TEvFetchDatabaseResponse::TPtr& ev) { GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev); } diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 6e849eb07e86..f16c33336b7a 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -70,13 +70,17 @@ struct TDatabaseState { subscribers.clear(); } - void UpdateDatabaseInfo(const TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + void UpdateDatabaseInfo(const TEvFetchDatabaseResponse::TPtr& ev) { DatabaseUnsupported = ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED; if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { ReplyContinueError(ev->Get()->Status, GroupIssues(ev->Get()->Issues, "Failed to fetch database info")); return; } + if (Serverless != ev->Get()->Serverless) { + ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless)); + } + LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); From f82aa6a5cfaf8649037b75d8038729b30af865d7 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 24 Sep 2024 14:46:18 +0000 Subject: [PATCH 08/15] Fixed event in kqp proxy --- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 7e6ec315071d..fc0018ee465e 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1370,7 +1370,7 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); hFunc(NWorkload::TEvUpdatePoolInfo, Handle); - hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); + hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", @@ -1822,11 +1822,8 @@ class TKqpProxyService : public TActorBootstrapped { ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); } - void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { - if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { - ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); - } - DatabasesCache.UpdateDatabaseInfo(ev, ActorContext()); + void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) { + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); } void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { From 96e2943b831c40aa22854a15548d92cf9989c0f8 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 24 Sep 2024 14:48:06 +0000 Subject: [PATCH 09/15] Fixed database id fetching --- ydb/core/kqp/common/events/query.h | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index e2fed5ba6fe9..960927332005 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -352,10 +352,7 @@ struct TEvQueryRequest: public NActors::TEventLocal Date: Tue, 24 Sep 2024 14:50:18 +0000 Subject: [PATCH 10/15] Removed changes in event ids --- ydb/core/kqp/common/simple/kqp_event_ids.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index ef759894f5f2..2e04aca262ff 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -43,7 +43,7 @@ struct TKqpEvents { EvListSessionsRequest, EvListSessionsResponse, EvListProxyNodesRequest, - EvListProxyNodesResponse, + EvListProxyNodesResponse }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); @@ -174,8 +174,8 @@ struct TKqpWorkloadServiceEvents { EvCleanupRequest, EvCleanupResponse, EvUpdatePoolInfo, - EvSubscribeOnPoolChanges, EvUpdateDatabaseInfo, + EvSubscribeOnPoolChanges, EvFetchDatabaseResponse, }; }; From 3d2ef9ccf8d12029a35f409e274522b615e04538 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 24 Sep 2024 17:08:55 +0000 Subject: [PATCH 11/15] Added lru cache --- ydb/core/kqp/common/events/events.h | 22 ++ ydb/core/kqp/common/events/workload_service.h | 10 - ydb/core/kqp/common/simple/kqp_event_ids.h | 3 +- .../kqp_proxy_databases_cache.cpp | 204 +++++++++++------- .../kqp/proxy_service/kqp_proxy_service.cpp | 9 +- .../proxy_service/kqp_proxy_service_impl.h | 3 + ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 67 ++++-- .../kqp_workload_service_impl.h | 4 - 8 files changed, 201 insertions(+), 121 deletions(-) diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 1cef6b5b54ea..9dc7aac1b334 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -173,6 +173,28 @@ struct TEvKqp { return issues; } }; + + struct TEvUpdateDatabaseInfo : public TEventLocal { + TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Database(database) + , Issues(std::move(issues)) + {} + + TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless) + : Status(Ydb::StatusIds::SUCCESS) + , Database(database) + , DatabaseId(databaseId) + , Serverless(serverless) + , Issues({}) + {} + + Ydb::StatusIds::StatusCode Status; + TString Database; + TString DatabaseId; + bool Serverless = false; + NYql::TIssues Issues; + }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index 899e542eb56f..9781b951e2d5 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -91,16 +91,6 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal SecurityObject; }; -struct TEvUpdateDatabaseInfo : public NActors::TEventLocal { - TEvUpdateDatabaseInfo(const TString& database, bool serverless) - : Database(database) - , Serverless(serverless) - {} - - const TString Database; - const bool Serverless; -}; - struct TEvFetchDatabaseResponse : public NActors::TEventLocal { TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues) : Status(status) diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 2e04aca262ff..986f3ddf2a7c 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -43,7 +43,8 @@ struct TKqpEvents { EvListSessionsRequest, EvListSessionsResponse, EvListProxyNodesRequest, - EvListProxyNodesResponse + EvListProxyNodesResponse, + EvUpdateDatabaseInfo }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index eed1cc071ef1..95fd3eb22793 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -14,164 +14,201 @@ namespace { struct TEvPrivate { // Event ids enum EEv : ui32 { - EvSubscribeOnDatabase = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvSubscribeOnDatabase = EventSpaceBegin(TEvents::ES_PRIVATE), EvPingDatabaseSubscription, EvEnd }; - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); struct TEvSubscribeOnDatabase : public TEventLocal { explicit TEvSubscribeOnDatabase(const TString& database) : Database(database) {} - TString Database; + const TString Database; }; - struct TEvPingDatabaseSubscription : public TEventLocal { + struct TEvPingDatabaseSubscription : public TEventLocal { explicit TEvPingDatabaseSubscription(const TString& database) : Database(database) {} - TString Database; + const TString Database; }; }; class TDatabaseSubscriberActor : public TActor { - using TBase = TActor; - struct TDatabaseState { - bool FetchRequestIsRunning = false; - TPathId WatchPathId; - - TString DatabaseId; + TString Database; + TString DatabaseId = ""; bool Serverless = false; - std::unordered_set Subscribers; + + bool FetchRequestIsRunning = true; + TInstant LastUpdateTime = TInstant::Now(); + ui32 WatchKey = 0; }; + using TBase = TActor; + using TDatabaseStatePtr = typename std::list::iterator; + public: - TDatabaseSubscriberActor() + TDatabaseSubscriberActor(TDuration idleTimeout) : TBase(&TDatabaseSubscriberActor::StateFunc) + , IdleTimeout(idleTimeout) {} + void Registered(TActorSystem* sys, const TActorId& owner) { + TBase::Registered(sys, owner); + Owner = owner; + } + void Handle(TEvPrivate::TEvSubscribeOnDatabase::TPtr& ev) { - const TString& database = CanonizePath(ev->Get()->Database); - auto& databaseState = Subscriptions[database]; + const TString& database = ev->Get()->Database; + const auto it = DatabasePathToState.find(database); - if (databaseState.DatabaseId) { - SendSubscriberInfo(database, ev->Sender, databaseState, Ydb::StatusIds::SUCCESS); - } else if (!databaseState.FetchRequestIsRunning) { + if (it == DatabasePathToState.end()) { + DatabaseStates.emplace_front(TDatabaseState{.Database = database}); + DatabasePathToState.insert({database, DatabaseStates.begin()}); Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database)); - databaseState.FetchRequestIsRunning = true; + StartIdleCheck(); + return; } - databaseState.Subscribers.insert(ev->Sender); - } - - void Handle(NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { - const TString& database = CanonizePath(ev->Get()->Database); - auto& databaseState = Subscriptions[database]; - - UpdateDatabaseState(databaseState, database, ev->Get()->PathId, ev->Get()->Serverless); - UpdateSubscribersInfo(database, databaseState, ev->Get()->Status, ev->Get()->Issues); - - databaseState.FetchRequestIsRunning = false; - databaseState.WatchPathId = ev->Get()->PathId; - - if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { - WatchKey++; - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(databaseState.WatchPathId, WatchKey)); - WatchDatabases.insert({WatchKey, database}); + const auto databaseState = it->second; + if (databaseState->DatabaseId) { + SendSubscriberInfo(*databaseState, Ydb::StatusIds::SUCCESS); } } - void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) { - auto it = WatchDatabases.find(ev->Get()->Key); - if (it == WatchDatabases.end()) { + void Handle(TEvPrivate::TEvPingDatabaseSubscription::TPtr& ev) { + const auto it = DatabasePathToState.find(ev->Get()->Database); + if (it == DatabasePathToState.end()) { return; } - const auto& result = ev->Get()->Result; - if (!result || result->GetStatus() != NKikimrScheme::StatusSuccess) { + TDatabaseState databaseState = *it->second; + databaseState.LastUpdateTime = TInstant::Now(); + + DatabaseStates.erase(it->second); + DatabaseStates.emplace_front(databaseState); + it->second = DatabaseStates.begin(); + } + + void Handle(NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { + const auto it = DatabasePathToState.find(ev->Get()->Database); + if (it == DatabasePathToState.end()) { return; } - if (result->GetPathDescription().HasDomainDescription()) { - NSchemeCache::TDomainInfo description(result->GetPathDescription().GetDomainDescription()); + const auto databaseState = it->second; + databaseState->FetchRequestIsRunning = false; + UpdateDatabaseState(*databaseState, ev->Get()->PathId, ev->Get()->Serverless); + SendSubscriberInfo(*databaseState, ev->Get()->Status, ev->Get()->Issues); - auto& databaseState = Subscriptions[it->second]; - UpdateDatabaseState(databaseState, it->second, description.DomainKey, description.IsServerless()); - UpdateSubscribersInfo(it->second, databaseState, Ydb::StatusIds::SUCCESS); + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + FreeWatchKey++; + databaseState->WatchKey = FreeWatchKey; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(ev->Get()->PathId, FreeWatchKey)); } } void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { - auto it = WatchDatabases.find(ev->Get()->Key); - if (it == WatchDatabases.end()) { + const auto it = DatabasePathToState.find(ev->Get()->Path); + if (it == DatabasePathToState.end()) { return; } - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(ev->Get()->Key)); + const auto databaseState = it->second; + UnsubscribeFromSchemeCache(*databaseState); + SendSubscriberInfo(*databaseState, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); + DatabasePathToState.erase(it); + DatabaseStates.erase(databaseState); + } - auto databaseStateIt = Subscriptions.find(it->second); - if (databaseStateIt != Subscriptions.end()) { - UpdateSubscribersInfo(it->second, databaseStateIt->second, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); - Subscriptions.erase(databaseStateIt); + void HandlePoison() { + for (auto& databaseState : DatabaseStates) { + UnsubscribeFromSchemeCache(databaseState); } - WatchDatabases.erase(it); + TBase::PassAway(); } - void HandlePoison() { - if (!WatchDatabases.empty()) { - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); + void HandleWakeup() { + IdleCheckStarted = false; + const auto minimalTime = TInstant::Now() - IdleTimeout; + while (!DatabaseStates.empty() && DatabaseStates.back().LastUpdateTime <= minimalTime) { + UnsubscribeFromSchemeCache(DatabaseStates.back()); + SendSubscriberInfo(DatabaseStates.back(), Ydb::StatusIds::ABORTED, {NYql::TIssue{"Database subscription was dropped by idle timeout"}}); + DatabasePathToState.erase(DatabaseStates.back().Database); + DatabaseStates.pop_back(); } - TBase::PassAway(); + if (!DatabaseStates.empty()) { + StartIdleCheck(); + } } STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvSubscribeOnDatabase, Handle); + hFunc(TEvPrivate::TEvPingDatabaseSubscription, Handle); hFunc(NWorkload::TEvFetchDatabaseResponse, Handle); - hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); - hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); sFunc(TEvents::TEvPoison, HandlePoison); + sFunc(TEvents::TEvWakeup, HandleWakeup); + + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated); ) private: - void UpdateDatabaseState(TDatabaseState& databaseState, const TString& database, TPathId pathId, bool serverless) { - databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << database; + static void UpdateDatabaseState(TDatabaseState& databaseState, TPathId pathId, bool serverless) { + databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << databaseState.Database; databaseState.Serverless = serverless; } - void UpdateSubscribersInfo(const TString& database, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { - for (const auto& subscriber : databaseState.Subscribers) { - SendSubscriberInfo(database, subscriber, databaseState, status, issues); + void UnsubscribeFromSchemeCache(TDatabaseState& databaseState) const { + if (databaseState.WatchKey) { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(databaseState.WatchKey)); + databaseState.WatchKey = 0; } } - void SendSubscriberInfo(const TString& database, TActorId subscriber, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + void SendSubscriberInfo(TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { - Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless)); + Send(Owner, new TEvKqp::TEvUpdateDatabaseInfo(databaseState.Database, databaseState.DatabaseId, databaseState.Serverless)); } else { - NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database " << database); + NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database " << databaseState.Database); for (const auto& issue : issues) { rootIssue.AddSubIssue(MakeIntrusive(issue)); } - Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, {rootIssue})); + Send(Owner, new TEvKqp::TEvUpdateDatabaseInfo(databaseState.Database, status, {rootIssue})); + } + } + + void StartIdleCheck() { + if (!IdleCheckStarted) { + IdleCheckStarted = true; + Schedule(IdleTimeout, new TEvents::TEvWakeup()); } } private: - std::unordered_map Subscriptions; - std::unordered_map WatchDatabases; - ui32 WatchKey = 0; + const TDuration IdleTimeout; + TActorId Owner; + bool IdleCheckStarted = false; + + std::unordered_map DatabasePathToState; + std::list DatabaseStates; + ui32 FreeWatchKey = 0; }; } // anonymous namespace +TDatabasesCache::TDatabasesCache(TDuration idleTimeout) + : IdleTimeout(idleTimeout) +{} + const TString& TDatabasesCache::GetTenantName() { if (!TenantName) { TenantName = CanonizePath(AppData()->TenantName); @@ -182,12 +219,10 @@ const TString& TDatabasesCache::GetTenantName() { void TDatabasesCache::UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { const auto& database = event->Get()->Database; auto it = DatabasesCache.find(database); - if (it == DatabasesCache.end()) { - it = DatabasesCache.insert({database, TDatabaseInfo{}}).first; - } + Y_ABORT_UNLESS(it != DatabasesCache.end()); it->second.DatabaseId = event->Get()->DatabaseId; - bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; + const bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; for (auto& delayedEvent : it->second.DelayedEvents) { if (success) { actorContext.Send(std::move(delayedEvent.Event)); @@ -202,14 +237,23 @@ void TDatabasesCache::UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev } } -void TDatabasesCache::StopSubscriberActor(TActorContext actorContext) const { +void TDatabasesCache::SubscribeOnDatabase(const TString& database, TActorContext actorContext) { + if (!SubscriberActor) { + SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor(IdleTimeout)); + } + actorContext.Send(SubscriberActor, new TEvPrivate::TEvSubscribeOnDatabase(database)); +} + +void TDatabasesCache::PingDatabaseSubscription(const TString& database, TActorContext actorContext) const { if (SubscriberActor) { - actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); + actorContext.Send(SubscriberActor, new TEvPrivate::TEvPingDatabaseSubscription(database)); } } -void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) { - SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor()); +void TDatabasesCache::StopSubscriberActor(TActorContext actorContext) const { + if (SubscriberActor) { + actorContext.Send(SubscriberActor, new TEvents::TEvPoison()); + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index fc0018ee465e..7e6ec315071d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1370,7 +1370,7 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); hFunc(NWorkload::TEvUpdatePoolInfo, Handle); - hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle); + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", @@ -1822,8 +1822,11 @@ class TKqpProxyService : public TActorBootstrapped { ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); } - void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) { - ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + } + DatabasesCache.UpdateDatabaseInfo(ev, ActorContext()); } void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 384ae6983150..a05d88928950 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -656,6 +656,8 @@ class TDatabasesCache { }; public: + TDatabasesCache(TDuration idleTimeout = TDuration::Seconds(60)); + template bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { if (!event->Get()->GetDatabaseId().empty()) { @@ -693,6 +695,7 @@ class TDatabasesCache { void PingDatabaseSubscription(const TString& database, TActorContext actorContext) const; private: + const TDuration IdleTimeout; std::unordered_map DatabasesCache; TActorId SubscriberActor; TString TenantName; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index ebaad532350f..cb21bc06af07 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -69,12 +69,12 @@ TString CreateSession(TTestActorRuntime* runtime, const TActorId& kqpProxy, cons class TDatabaseCacheTestActor : public TActorBootstrapped { public: - TDatabaseCacheTestActor(const TString& database, const TString& expectedDatabaseId, bool fromCache, TDatabasesCache& cache, NThreading::TPromise promise) - : Database(database) + TDatabaseCacheTestActor(const TString& database, const TString& expectedDatabaseId, TDuration idleTimeout, NThreading::TPromise promise) + : IdleTimeout(idleTimeout) + , Database(database) , ExpectedDatabaseId(expectedDatabaseId) - , Cache(cache) + , Cache(idleTimeout) , Promise(promise) - , FromCache(fromCache) {} void Bootstrap() { @@ -83,45 +83,68 @@ class TDatabaseCacheTestActor : public TActorBootstrapped(); event->Record.MutableRequest()->SetDatabase(Database); Send(SelfId(), event.Release()); + + Schedule(3 * IdleTimeout, new TEvents::TEvWakeup()); } void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { - Cache.UpdateDatabaseInfo(ev, ActorContext()); + if (!CacheUpdated) { + UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, Ydb::StatusIds::SUCCESS, TStringBuilder() << GetErrorString() << ev->Get()->Issues.ToString()); + Cache.UpdateDatabaseInfo(ev, ActorContext()); + CacheUpdated = true; + } else { + UNIT_ASSERT_VALUES_EQUAL_C(ev->Get()->Status, Ydb::StatusIds::ABORTED, TStringBuilder() << GetErrorString() << ev->Get()->Issues.ToString()); + UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->Issues.ToString(), "Database subscription was dropped by idle timeout", GetErrorString()); + Finish(); + } } void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { auto success = Cache.SetDatabaseIdOrDeffer(ev, [this](Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { - UNIT_ASSERT_C(false, TStringBuilder() << "Unexpected fail, " << GetErrorString() << ", status: " << status << ", reason: " << issues.ToOneLineString()); + UNIT_ASSERT_C(false, TStringBuilder() << "Unexpected fail, status: " << status << ", " << GetErrorString() << issues.ToString()); }, ActorContext()); - if (FromCache) { + bool dedicated = Database == ExpectedDatabaseId; + if (CacheUpdated || dedicated) { UNIT_ASSERT_C(success, TStringBuilder() << "Expected database id from cache, " << GetErrorString()); UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->GetDatabaseId(), ExpectedDatabaseId, GetErrorString()); - Promise.SetValue(); - PassAway(); + if (dedicated) { + Finish(); + } } else { UNIT_ASSERT_C(!success, TStringBuilder() << "Unexpected database id from cache, " << GetErrorString()); - FromCache = true; } } + void HandleWakeup() { + UNIT_ASSERT_C(false, TStringBuilder() << "Test cache timeout, " << GetErrorString()); + Finish(); + } + STRICT_STFUNC(StateFunc, hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); hFunc(TEvKqp::TEvQueryRequest, Handle); + sFunc(TEvents::TEvWakeup, HandleWakeup); ) private: TString GetErrorString() const { - return TStringBuilder() << "database: " << Database << ", from cache: " << FromCache << "\n"; + return TStringBuilder() << "cache updated: " << CacheUpdated << ", database: " << Database << "\n"; + } + + void Finish() { + Promise.SetValue(); + PassAway(); } private: + const TDuration IdleTimeout; const TString Database; const TString ExpectedDatabaseId; - TDatabasesCache& Cache; + TDatabasesCache Cache; NThreading::TPromise Promise; - bool FromCache = false; + bool CacheUpdated = false; }; } @@ -606,27 +629,25 @@ Y_UNIT_TEST_SUITE(KqpProxy) { auto ydb = NWorkload::TYdbSetupSettings() .CreateSampleTenants(true) .Create(); - + auto& runtime = *ydb->GetRuntime(); - TDatabasesCache cache; + TDuration idleTimeout = TDuration::Seconds(5); - auto checkCache = [&](const TString& database, const TString& expectedDatabaseId, bool fromCache) { + auto checkCache = [&](const TString& database, const TString& expectedDatabaseId, ui32 nodeIndex) { auto promise = NThreading::NewPromise(); - runtime.Register(new TDatabaseCacheTestActor(database, expectedDatabaseId, fromCache, cache, promise)); + runtime.Register(new TDatabaseCacheTestActor(database, expectedDatabaseId, idleTimeout, promise), nodeIndex); promise.GetFuture().GetValueSync(); }; const auto& dedicatedTennant = ydb->GetSettings().GetDedicatedTenantName(); - checkCache(dedicatedTennant, dedicatedTennant, false); - checkCache(dedicatedTennant, dedicatedTennant, true); + checkCache(dedicatedTennant, dedicatedTennant, 2); const auto& sharedTennant = ydb->GetSettings().GetSharedTenantName(); - checkCache(sharedTennant, sharedTennant, false); - checkCache(sharedTennant, sharedTennant, true); + checkCache(sharedTennant, sharedTennant, 1); const auto& serverlessTennant = ydb->GetSettings().GetServerlessTenantName(); - checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, false); - checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, true); + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, 1); } + } // namspace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index f16c33336b7a..e9e292d81dfc 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -77,10 +77,6 @@ struct TDatabaseState { return; } - if (Serverless != ev->Get()->Serverless) { - ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless)); - } - LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); From 2e06e0593a9d0e91d454227926eba69b2089366a Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 24 Sep 2024 17:17:54 +0000 Subject: [PATCH 12/15] Fixed issues --- ydb/core/kqp/common/simple/kqp_event_ids.h | 1 - .../gateway/behaviour/resource_pool_classifier/checker.cpp | 4 ++-- ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 986f3ddf2a7c..5d6d0009fc8f 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -175,7 +175,6 @@ struct TKqpWorkloadServiceEvents { EvCleanupRequest, EvCleanupResponse, EvUpdatePoolInfo, - EvUpdateDatabaseInfo, EvSubscribeOnPoolChanges, EvFetchDatabaseResponse, }; diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp index 497b6e0a41ea..7200999c7ca1 100644 --- a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp +++ b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/checker.cpp @@ -22,12 +22,12 @@ using namespace NWorkload; struct TEvPrivate { // Event ids enum EEv : ui32 { - EvRanksCheckerResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvRanksCheckerResponse = EventSpaceBegin(TEvents::ES_PRIVATE), EvEnd }; - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); struct TEvRanksCheckerResponse : public TEventLocal { TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index 95fd3eb22793..aa7019876648 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -1,7 +1,6 @@ #include "kqp_proxy_service_impl.h" #include -#include #include From 5ae39b98cbb9ef1cb28a23ed89c4509cf5c82715 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 24 Sep 2024 17:26:22 +0000 Subject: [PATCH 13/15] Removed Y_ABORT_UNLESS --- ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index aa7019876648..afba7bb8e807 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -216,9 +216,10 @@ const TString& TDatabasesCache::GetTenantName() { } void TDatabasesCache::UpdateDatabaseInfo(TEvKqp::TEvUpdateDatabaseInfo::TPtr& event, TActorContext actorContext) { - const auto& database = event->Get()->Database; - auto it = DatabasesCache.find(database); - Y_ABORT_UNLESS(it != DatabasesCache.end()); + auto it = DatabasesCache.find(event->Get()->Database); + if (it == DatabasesCache.end()) { + return; + } it->second.DatabaseId = event->Get()->DatabaseId; const bool success = event->Get()->Status == Ydb::StatusIds::SUCCESS; From 55189fc81791677c5bdf062a624e3178974aa2c3 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 25 Sep 2024 09:19:14 +0000 Subject: [PATCH 14/15] Fixed typo --- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 4 ++-- ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 2 +- ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 7e6ec315071d..096122981b3b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -642,7 +642,7 @@ class TKqpProxyService : public TActorBootstrapped { NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); Send(sender, std::move(response)); }; - if (!DatabasesCache.SetDatabaseIdOrDeffer(ev, errorHandler, ActorContext())) { + if (!DatabasesCache.SetDatabaseIdOrDefer(ev, errorHandler, ActorContext())) { return; } @@ -1650,7 +1650,7 @@ class TKqpProxyService : public TActorBootstrapped { const auto errorHandler = [this, sender = ev->Sender](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ Send(sender, new TResponse(status, std::move(issues))); }; - if (!DatabasesCache.SetDatabaseIdOrDeffer(ev, errorHandler, ActorContext())) { + if (!DatabasesCache.SetDatabaseIdOrDefer(ev, errorHandler, ActorContext())) { return false; } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index a05d88928950..3c608023c83c 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -659,7 +659,7 @@ class TDatabasesCache { TDatabasesCache(TDuration idleTimeout = TDuration::Seconds(60)); template - bool SetDatabaseIdOrDeffer(TEvent& event, std::function errorHandler, TActorContext actorContext) { + bool SetDatabaseIdOrDefer(TEvent& event, std::function errorHandler, TActorContext actorContext) { if (!event->Get()->GetDatabaseId().empty()) { return true; } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index cb21bc06af07..6149f43692c1 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -100,7 +100,7 @@ class TDatabaseCacheTestActor : public TActorBootstrapped Date: Wed, 25 Sep 2024 10:49:54 +0000 Subject: [PATCH 15/15] Fixed lru cache --- .../kqp_proxy_databases_cache.cpp | 80 ++++++++----------- 1 file changed, 35 insertions(+), 45 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp index afba7bb8e807..ca729fbbedae 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp @@ -50,12 +50,12 @@ class TDatabaseSubscriberActor : public TActor { }; using TBase = TActor; - using TDatabaseStatePtr = typename std::list::iterator; public: TDatabaseSubscriberActor(TDuration idleTimeout) : TBase(&TDatabaseSubscriberActor::StateFunc) , IdleTimeout(idleTimeout) + , DatabaseStates(std::numeric_limits::max()) {} void Registered(TActorSystem* sys, const TActorId& owner) { @@ -65,86 +65,76 @@ class TDatabaseSubscriberActor : public TActor { void Handle(TEvPrivate::TEvSubscribeOnDatabase::TPtr& ev) { const TString& database = ev->Get()->Database; - const auto it = DatabasePathToState.find(database); + auto databaseStateIt = DatabaseStates.Find(database); - if (it == DatabasePathToState.end()) { - DatabaseStates.emplace_front(TDatabaseState{.Database = database}); - DatabasePathToState.insert({database, DatabaseStates.begin()}); + if (databaseStateIt == DatabaseStates.End()) { + DatabaseStates.Insert({database, TDatabaseState{.Database = database}}); Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database)); StartIdleCheck(); return; } - const auto databaseState = it->second; - if (databaseState->DatabaseId) { - SendSubscriberInfo(*databaseState, Ydb::StatusIds::SUCCESS); + databaseStateIt->LastUpdateTime = TInstant::Now(); + if (databaseStateIt->DatabaseId) { + SendSubscriberInfo(*databaseStateIt, Ydb::StatusIds::SUCCESS); } } void Handle(TEvPrivate::TEvPingDatabaseSubscription::TPtr& ev) { - const auto it = DatabasePathToState.find(ev->Get()->Database); - if (it == DatabasePathToState.end()) { - return; + auto databaseStateIt = DatabaseStates.Find(ev->Get()->Database); + if (databaseStateIt != DatabaseStates.End()) { + databaseStateIt->LastUpdateTime = TInstant::Now(); } - - TDatabaseState databaseState = *it->second; - databaseState.LastUpdateTime = TInstant::Now(); - - DatabaseStates.erase(it->second); - DatabaseStates.emplace_front(databaseState); - it->second = DatabaseStates.begin(); } void Handle(NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { - const auto it = DatabasePathToState.find(ev->Get()->Database); - if (it == DatabasePathToState.end()) { + auto databaseStateIt = DatabaseStates.Find(ev->Get()->Database); + if (databaseStateIt == DatabaseStates.End()) { return; } - const auto databaseState = it->second; - databaseState->FetchRequestIsRunning = false; - UpdateDatabaseState(*databaseState, ev->Get()->PathId, ev->Get()->Serverless); - SendSubscriberInfo(*databaseState, ev->Get()->Status, ev->Get()->Issues); + databaseStateIt->FetchRequestIsRunning = false; + UpdateDatabaseState(*databaseStateIt, ev->Get()->PathId, ev->Get()->Serverless); + SendSubscriberInfo(*databaseStateIt, ev->Get()->Status, ev->Get()->Issues); if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { FreeWatchKey++; - databaseState->WatchKey = FreeWatchKey; + databaseStateIt->WatchKey = FreeWatchKey; Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(ev->Get()->PathId, FreeWatchKey)); } } void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { - const auto it = DatabasePathToState.find(ev->Get()->Path); - if (it == DatabasePathToState.end()) { + auto databaseStateIt = DatabaseStates.Find(ev->Get()->Path); + if (databaseStateIt == DatabaseStates.End()) { return; } - const auto databaseState = it->second; - UnsubscribeFromSchemeCache(*databaseState); - SendSubscriberInfo(*databaseState, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); - DatabasePathToState.erase(it); - DatabaseStates.erase(databaseState); + UnsubscribeFromSchemeCache(*databaseStateIt); + SendSubscriberInfo(*databaseStateIt, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}}); + DatabaseStates.Erase(databaseStateIt); } void HandlePoison() { - for (auto& databaseState : DatabaseStates) { - UnsubscribeFromSchemeCache(databaseState); - } - + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); TBase::PassAway(); } void HandleWakeup() { IdleCheckStarted = false; const auto minimalTime = TInstant::Now() - IdleTimeout; - while (!DatabaseStates.empty() && DatabaseStates.back().LastUpdateTime <= minimalTime) { - UnsubscribeFromSchemeCache(DatabaseStates.back()); - SendSubscriberInfo(DatabaseStates.back(), Ydb::StatusIds::ABORTED, {NYql::TIssue{"Database subscription was dropped by idle timeout"}}); - DatabasePathToState.erase(DatabaseStates.back().Database); - DatabaseStates.pop_back(); + while (!DatabaseStates.Empty()) { + auto oldestIt = DatabaseStates.FindOldest(); + if (oldestIt->LastUpdateTime > minimalTime) { + break; + } + + UnsubscribeFromSchemeCache(*oldestIt); + SendSubscriberInfo(*oldestIt, Ydb::StatusIds::ABORTED, {NYql::TIssue{"Database subscription was dropped by idle timeout"}}); + DatabaseStates.Erase(oldestIt); } - if (!DatabaseStates.empty()) { + if (!DatabaseStates.Empty()) { StartIdleCheck(); } } @@ -162,6 +152,7 @@ class TDatabaseSubscriberActor : public TActor { private: static void UpdateDatabaseState(TDatabaseState& databaseState, TPathId pathId, bool serverless) { + databaseState.LastUpdateTime = TInstant::Now(); databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << databaseState.Database; databaseState.Serverless = serverless; } @@ -173,7 +164,7 @@ class TDatabaseSubscriberActor : public TActor { } } - void SendSubscriberInfo(TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + void SendSubscriberInfo(const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) { Send(Owner, new TEvKqp::TEvUpdateDatabaseInfo(databaseState.Database, databaseState.DatabaseId, databaseState.Serverless)); } else { @@ -197,8 +188,7 @@ class TDatabaseSubscriberActor : public TActor { TActorId Owner; bool IdleCheckStarted = false; - std::unordered_map DatabasePathToState; - std::list DatabaseStates; + TLRUCache DatabaseStates; ui32 FreeWatchKey = 0; };