From 22b8da5c7800eda2b8bccb376010518c3468c22b Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> Date: Fri, 20 Sep 2024 21:11:55 +0300 Subject: [PATCH] YQ-3684 fixed error duplicate session id (#9583) --- .../kqp/session_actor/kqp_session_actor.cpp | 2 + .../actors/pool_handlers_acors.cpp | 7 +-- ydb/core/kqp/workload_service/common/events.h | 15 +++++- .../workload_service/kqp_workload_service.cpp | 47 ++++++++++++++++--- .../kqp_workload_service_impl.h | 26 ++++++++++ 5 files changed, 86 insertions(+), 11 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1fc69f5d0b7e..a3caeeb1279e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -255,6 +255,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->UserToken ), IEventHandle::FlagTrackDelivery); + QueryState->PoolHandlerActor = MakeKqpWorkloadServiceId(SelfId().NodeId()); Become(&TKqpSessionActor::ExecuteState); } @@ -2418,6 +2419,7 @@ class TKqpSessionActor : public TActorBootstrapped { hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); hFunc(TEvents::TEvUndelivered, HandleNoop); hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop); + hFunc(TEvKqpExecuter::TEvStreamData, HandleNoop); hFunc(NWorkload::TEvContinueRequest, HandleNoop); // always come from WorkerActor diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index 2ff6e6912abe..77512a4d529f 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -161,6 +161,7 @@ class TPoolHandlerActorBase : public TActor { } SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers); + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(Database, PoolId)); Counters.OnCleanup(ResetCountersOnStrop); @@ -184,16 +185,16 @@ class TPoolHandlerActorBase : public TActor { } void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { - this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId)); - auto event = std::move(ev->Get()->Event); + const TString& sessionId = event->Get()->SessionId; + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId, sessionId)); + const TActorId& workerActorId = event->Sender; if (!InFlightLimit) { this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::PRECONDITION_FAILED, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Resource pool " << PoolId << " was disabled due to zero concurrent query limit")})); return; } - const TString& sessionId = event->Get()->SessionId; if (LocalSessions.contains(sessionId)) { this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)})); return; diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index 48643582d7f0..df821a4c26d6 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -29,6 +29,7 @@ struct TEvPrivate { EvFinishRequestInPool, EvResignPoolHandler, EvStopPoolHandler, + EvStopPoolHandlerResponse, EvCancelRequest, EvUpdatePoolSubscription, @@ -128,13 +129,15 @@ struct TEvPrivate { }; struct TEvPlaceRequestIntoPoolResponse : public NActors::TEventLocal { - TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId) + TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId, const TString& sessionId) : Database(database) , PoolId(poolId) + , SessionId(sessionId) {} const TString Database; const TString PoolId; + const TString SessionId; }; struct TEvFinishRequestInPool : public NActors::TEventLocal { @@ -173,6 +176,16 @@ struct TEvPrivate { const bool ResetCounters; }; + struct TEvStopPoolHandlerResponse : public NActors::TEventLocal { + TEvStopPoolHandlerResponse(const TString& database, const TString& poolId) + : Database(database) + , PoolId(poolId) + {} + + const TString Database; + const TString PoolId; + }; + struct TEvCancelRequest : public NActors::TEventLocal { explicit TEvCancelRequest(const TString& sessionId) : SessionId(sessionId) diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 24fdd21e3d63..26653cda3e3e 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -169,6 +169,13 @@ class TKqpWorkloadService : public TActorBootstrapped { void Handle(TEvCleanupRequest::TPtr& ev) { const TString& database = ev->Get()->Database; const TString& poolId = ev->Get()->PoolId; + const TString& sessionId = ev->Get()->SessionId; + if (GetOrCreateDatabaseState(database)->PendingSessionIds.contains(sessionId)) { + LOG_D("Finished request with worker actor " << ev->Sender << ", wait for place request, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId); + GetOrCreateDatabaseState(database)->PendingCancelRequests[sessionId].emplace_back(std::move(ev)); + return; + } + auto poolState = GetPoolState(database, poolId); if (!poolState) { ReplyCleanupError(ev->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << poolId << " not found"); @@ -176,7 +183,7 @@ class TKqpWorkloadService : public TActorBootstrapped { } LOG_D("Finished request with worker actor " << ev->Sender << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId); - Send(ev->Forward(poolState->PoolHandler)); + poolState->DoCleanupRequest(std::move(ev)); } void Handle(TEvents::TEvWakeup::TPtr& ev) { @@ -220,6 +227,7 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvPrivate::TEvTablesCreationFinished, Handle); hFunc(TEvPrivate::TEvCpuLoadResponse, Handle); hFunc(TEvPrivate::TEvResignPoolHandler, Handle); + hFunc(TEvPrivate::TEvStopPoolHandlerResponse, Handle); ) private: @@ -245,12 +253,16 @@ class TKqpWorkloadService : public TActorBootstrapped { void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { const auto& event = ev->Get()->Event; const TString& database = event->Get()->Database; + auto databaseState = GetOrCreateDatabaseState(database); if (ev->Get()->DefaultPoolCreated) { - GetOrCreateDatabaseState(database)->HasDefaultPool = true; + databaseState->HasDefaultPool = true; } const TString& poolId = event->Get()->PoolId; if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + databaseState->RemovePendingSession(event->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) { + ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found"); + }); ReplyContinueError(event->Sender, ev->Get()->Status, ev->Get()->Issues); return; } @@ -265,9 +277,19 @@ class TKqpWorkloadService : public TActorBootstrapped { void Handle(TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) { const TString& database = ev->Get()->Database; const TString& poolId = ev->Get()->PoolId; - LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId); + const TString& sessionId = ev->Get()->SessionId; + LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << sessionId); - if (auto poolState = GetPoolState(database, poolId)) { + auto poolState = GetPoolState(database, poolId); + GetOrCreateDatabaseState(database)->RemovePendingSession(sessionId, [this, poolState](TEvCleanupRequest::TPtr event) { + if (poolState) { + poolState->DoCleanupRequest(std::move(event)); + } else { + ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found"); + } + }); + + if (poolState) { poolState->PlaceRequestRunning = false; poolState->UpdateHandler(); poolState->StartPlaceRequest(); @@ -388,6 +410,17 @@ class TKqpWorkloadService : public TActorBootstrapped { } } + void Handle(TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) { + const TString& database = ev->Get()->Database; + const TString& poolId = ev->Get()->PoolId; + LOG_T("Got stop pool handler response, Database: " << database << ", PoolId: " << poolId); + + Counters.ActivePools->Dec(); + if (auto poolState = GetPoolState(database, poolId)) { + poolState->PreviousPoolHandlers.erase(ev->Sender); + } + } + private: void InitializeWorkloadService() { if (ServiceInitialized) { @@ -441,7 +474,7 @@ class TKqpWorkloadService : public TActorBootstrapped { std::vector poolsToDelete; poolsToDelete.reserve(PoolIdToState.size()); for (const auto& [poolKey, poolState] : PoolIdToState) { - if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) { + if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION && poolState.PendingRequests.empty()) { CpuQuotaManager->CleanupHandler(poolState.PoolHandler); Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true)); poolsToDelete.emplace_back(poolKey); @@ -449,7 +482,6 @@ class TKqpWorkloadService : public TActorBootstrapped { } for (const auto& poolKey : poolsToDelete) { PoolIdToState.erase(poolKey); - Counters.ActivePools->Dec(); } if (!PoolIdToState.empty()) { @@ -512,7 +544,8 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } - TDatabaseState* GetOrCreateDatabaseState(const TString& database) { + TDatabaseState* GetOrCreateDatabaseState(TString database) { + database = CanonizePath(database); auto databaseIt = DatabaseToState.find(database); if (databaseIt != DatabaseToState.end()) { return &databaseIt->second; diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 9ae115235a25..813f97b6e107 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -19,6 +19,8 @@ struct TDatabaseState { bool& EnabledResourcePoolsOnServerless; std::vector PendingRequersts = {}; + std::unordered_set PendingSessionIds = {}; + std::unordered_map> PendingCancelRequests = {}; std::unordered_map> PendingSubscriptions = {}; bool HasDefaultPool = false; bool Serverless = false; @@ -38,6 +40,7 @@ struct TDatabaseState { void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) { TString database = ev->Get()->Database; + PendingSessionIds.emplace(ev->Get()->SessionId); PendingRequersts.emplace_back(std::move(ev)); if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) { @@ -83,6 +86,14 @@ struct TDatabaseState { StartPendingRequests(); } + void RemovePendingSession(const TString& sessionId, std::function callback) { + for (auto& event : PendingCancelRequests[sessionId]) { + callback(std::move(event)); + } + PendingCancelRequests.erase(sessionId); + PendingSessionIds.erase(sessionId); + } + private: void StartPendingRequests() { if (!EnabledResourcePoolsOnServerless && Serverless) { @@ -98,6 +109,9 @@ struct TDatabaseState { void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { for (const auto& ev : PendingRequersts) { + RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) { + ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")})); + }); ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues)); } PendingRequersts.clear(); @@ -112,6 +126,7 @@ struct TPoolState { bool WaitingInitialization = false; bool PlaceRequestRunning = false; std::optional NewPoolHandler = std::nullopt; + std::unordered_set PreviousPoolHandlers = {}; ui64 InFlightRequests = 0; TInstant LastUpdateTime = TInstant::Now(); @@ -122,6 +137,7 @@ struct TPoolState { } ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false)); + PreviousPoolHandlers.insert(PoolHandler); PoolHandler = *NewPoolHandler; NewPoolHandler = std::nullopt; InFlightRequests = 0; @@ -143,6 +159,16 @@ struct TPoolState { InFlightRequests--; LastUpdateTime = TInstant::Now(); } + + void DoCleanupRequest(TEvCleanupRequest::TPtr event) { + for (const auto& poolHandler : PreviousPoolHandlers) { + ActorContext.Send(poolHandler, new TEvCleanupRequest( + event->Get()->Database, event->Get()->SessionId, event->Get()->PoolId, + event->Get()->Duration, event->Get()->CpuConsumed + )); + } + ActorContext.Send(event->Forward(PoolHandler)); + } }; struct TCpuQuotaManagerState {