Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3689 added kqp proxy database cache #9644

42 changes: 42 additions & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ struct TEvKqp {
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
TEvScriptRequest() = default;

const TString& GetDatabase() const {
return Record.GetRequest().GetDatabase();
}

const TString& GetDatabaseId() const {
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
Record.MutableRequest()->SetDatabaseId(databaseId);
}

mutable NKikimrKqp::TEvQueryRequest Record;
TDuration ForgetAfter;
TDuration ResultsTtl;
Expand Down Expand Up @@ -161,6 +173,36 @@ struct TEvKqp {
return issues;
}
};

struct TEvSubscribeOnDatabase : public TEventLocal<TEvSubscribeOnDatabase, TKqpEvents::EvSubscribeOnDatabase> {
explicit TEvSubscribeOnDatabase(const TString& database)
: Database(database)
{}

TString Database;
};

struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
, Database(database)
, Issues(std::move(issues))
{}

TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
: Status(Ydb::StatusIds::SUCCESS)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, Issues({})
{}

Ydb::StatusIds::StatusCode Status;
TString Database;
TString DatabaseId;
bool Serverless = false;
NYql::TIssues Issues;
};
};

} // namespace NKikimr::NKqp
12 changes: 12 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,17 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return PoolConfig;
}

const TString& GetDatabaseId() const {
if (DatabaseId) {
return DatabaseId;
}
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

mutable NKikimrKqp::TEvQueryRequest Record;

private:
Expand All @@ -363,6 +374,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
TActorId RequestActorId;
TString Database;
TString DatabaseId;
TString SessionId;
TString YqlText;
TString QueryId;
Expand Down
52 changes: 34 additions & 18 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
FS_ROLLBACK,
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
template <typename TEv, ui32 TEventType>
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
TEventWithDatabaseId(const TString& database)
: Database(database)
, OperationId(id)
{}

const TString& GetDatabase() const {
return Database;
}

const TString& GetDatabaseId() const {
return DatabaseId;
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

const TString Database;
TString DatabaseId;
};

struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{}

const NOperationId::TOperationId OperationId;
};

Expand All @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
NYql::TIssues Issues;
};

struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down Expand Up @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
TMaybe<google::protobuf::Any> Metadata;
};

struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
: Database(database)
: TEventWithDatabaseId(database)
, PageSize(pageSize)
, PageToken(pageToken)
{}

TString Database;
ui64 PageSize;
TString PageToken;
};
Expand Down Expand Up @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
};

struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down
10 changes: 0 additions & 10 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,4 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
, Serverless(serverless)
{}

const TString Database;
const bool Serverless;
};

} // NKikimr::NKqp::NWorkload
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

if (!DatabaseId.empty()) {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct TKqpEvents {
EvListSessionsRequest,
EvListSessionsResponse,
EvListProxyNodesRequest,
EvListProxyNodesResponse
EvListProxyNodesResponse,
EvSubscribeOnDatabase,
EvUpdateDatabaseInfo
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
Expand Down Expand Up @@ -174,7 +176,6 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
};
};
Expand Down
150 changes: 150 additions & 0 deletions ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include "kqp_proxy_service_impl.h"

#include <ydb/core/kqp/workload_service/actors/actors.h>
#include <ydb/core/kqp/workload_service/common/events.h>

#include <ydb/core/tx/scheme_cache/scheme_cache.h>


namespace NKikimr::NKqp {

namespace {

class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
using TBase = TActor<TDatabaseSubscriberActor>;

struct TDatabaseState {
bool FetchRequestIsRunning = false;
TPathId WatchPathId;

TString DatabaseId;
bool Serverless = false;
std::unordered_set<TActorId> Subscribers;
};

public:
TDatabaseSubscriberActor()
: TBase(&TDatabaseSubscriberActor::StateFunc)
{}

void Handle(TEvKqp::TEvSubscribeOnDatabase::TPtr& ev) {
const TString& database = CanonizePath(ev->Get()->Database);
auto& databaseState = Subscriptions[database];

if (databaseState.DatabaseId) {
SendSubscriberInfo(database, ev->Sender, databaseState, Ydb::StatusIds::SUCCESS);
} else if (!databaseState.FetchRequestIsRunning) {
Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database));
databaseState.FetchRequestIsRunning = true;
}

databaseState.Subscribers.insert(ev->Sender);
}

void Handle(NWorkload::TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
const TString& database = CanonizePath(ev->Get()->Database);
auto& databaseState = Subscriptions[database];

UpdateDatabaseState(databaseState, database, ev->Get()->PathId, ev->Get()->Serverless);
UpdateSubscribersInfo(database, databaseState, ev->Get()->Status, ev->Get()->Issues);

databaseState.FetchRequestIsRunning = false;
databaseState.WatchPathId = ev->Get()->PathId;

if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
WatchKey++;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(databaseState.WatchPathId, WatchKey));
WatchDatabases.insert({WatchKey, database});
}
}

void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) {
auto it = WatchDatabases.find(ev->Get()->Key);
if (it == WatchDatabases.end()) {
return;
}

const auto& result = ev->Get()->Result;
if (!result || result->GetStatus() != NKikimrScheme::StatusSuccess) {
return;
}

if (result->GetPathDescription().HasDomainDescription()) {
NSchemeCache::TDomainInfo description(result->GetPathDescription().GetDomainDescription());

auto& databaseState = Subscriptions[it->second];
UpdateDatabaseState(databaseState, it->second, description.DomainKey, description.IsServerless());
UpdateSubscribersInfo(it->second, databaseState, Ydb::StatusIds::SUCCESS);
}
}

void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) {
auto it = WatchDatabases.find(ev->Get()->Key);
if (it == WatchDatabases.end()) {
return;
}

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(ev->Get()->Key));

auto databaseStateIt = Subscriptions.find(it->second);
if (databaseStateIt != Subscriptions.end()) {
UpdateSubscribersInfo(it->second, databaseStateIt->second, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}});
Subscriptions.erase(databaseStateIt);
}

WatchDatabases.erase(it);
}

void HandlePoison() {
if (!WatchDatabases.empty()) {
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0));
}

TBase::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvKqp::TEvSubscribeOnDatabase, Handle);
hFunc(NWorkload::TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
sFunc(TEvents::TEvPoison, HandlePoison);
)

private:
void UpdateDatabaseState(TDatabaseState& databaseState, const TString& database, TPathId pathId, bool serverless) {
databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << database;
databaseState.Serverless = serverless;
}

void UpdateSubscribersInfo(const TString& database, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
for (const auto& subscriber : databaseState.Subscribers) {
SendSubscriberInfo(database, subscriber, databaseState, status, issues);
}
}

void SendSubscriberInfo(const TString& database, TActorId subscriber, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) {
Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless));
} else {
NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database " << database);
for (const auto& issue : issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, {rootIssue}));
}
}

private:
std::unordered_map<TString, TDatabaseState> Subscriptions;
std::unordered_map<ui32, TString> WatchDatabases;
ui32 WatchKey = 0;
};

} // anonymous namespace

void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) {
SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor());
}

} // namespace NKikimr::NKqp
Loading
Loading