Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3689 added kqp proxy database cache #9644

46 changes: 46 additions & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ struct TEvKqp {
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
TEvScriptRequest() = default;

const TString& GetDatabase() const {
return Record.GetRequest().GetDatabase();
}

const TString& GetDatabaseId() const {
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
Record.MutableRequest()->SetDatabaseId(databaseId);
}

mutable NKikimrKqp::TEvQueryRequest Record;
TDuration ForgetAfter;
TDuration ResultsTtl;
Expand Down Expand Up @@ -161,6 +173,40 @@ struct TEvKqp {
return issues;
}
};

struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
, Database(database)
, Issues(std::move(issues))
{}

TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
: Status(Ydb::StatusIds::SUCCESS)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, Issues({})
{}

Ydb::StatusIds::StatusCode Status;
TString Database;
TString DatabaseId;
bool Serverless = false;
NYql::TIssues Issues;
};

struct TEvDelayedRequestError : public TEventLocal<TEvDelayedRequestError, TKqpEvents::EvDelayedRequestError> {
TEvDelayedRequestError(THolder<IEventHandle> requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: RequestEvent(std::move(requestEvent))
, Status(status)
, Issues(std::move(issues))
{}

THolder<IEventHandle> RequestEvent;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};
};

} // namespace NKikimr::NKqp
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return PoolConfig;
}

const TString& GetDatabaseId() const {
return DatabaseId ? DatabaseId : Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

mutable NKikimrKqp::TEvQueryRequest Record;

private:
Expand All @@ -363,6 +371,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
TActorId RequestActorId;
TString Database;
TString DatabaseId;
TString SessionId;
TString YqlText;
TString QueryId;
Expand Down
52 changes: 34 additions & 18 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
FS_ROLLBACK,
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
template <typename TEv, ui32 TEventType>
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
TEventWithDatabaseId(const TString& database)
: Database(database)
, OperationId(id)
{}

const TString& GetDatabase() const {
return Database;
}

const TString& GetDatabaseId() const {
return DatabaseId;
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

const TString Database;
TString DatabaseId;
};

struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{}

const NOperationId::TOperationId OperationId;
};

Expand All @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
NYql::TIssues Issues;
};

struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down Expand Up @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
TMaybe<google::protobuf::Any> Metadata;
};

struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
: Database(database)
: TEventWithDatabaseId(database)
, PageSize(pageSize)
, PageToken(pageToken)
{}

TString Database;
ui64 PageSize;
TString PageToken;
};
Expand Down Expand Up @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
};

struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/scheme/scheme_pathid.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/event_local.h>
Expand Down Expand Up @@ -90,14 +91,20 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues)
: Status(status)
, Database(database)
, Serverless(serverless)
, PathId(pathId)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const TString Database;
const bool Serverless;
const TPathId PathId;
const NYql::TIssues Issues;
};

} // NKikimr::NKqp::NWorkload
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ PEERDIR(
ydb/core/kqp/common/shutdown
ydb/core/kqp/common/compilation
ydb/core/resource_pools
ydb/core/scheme

ydb/library/yql/dq/actors
ydb/public/api/protos
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

if (!DatabaseId.empty()) {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct TKqpEvents {
EvListSessionsRequest,
EvListSessionsResponse,
EvListProxyNodesRequest,
EvListProxyNodesResponse
EvListProxyNodesResponse,
EvUpdateDatabaseInfo,
EvDelayedRequestError
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
Expand Down Expand Up @@ -174,8 +176,8 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
EvFetchDatabaseResponse,
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <ydb/core/base/path.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/kqp/workload_service/actors/actors.h>
#include <ydb/core/kqp/workload_service/common/events.h>
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/resource_pools/resource_pool_classifier_settings.h>

Expand All @@ -20,6 +19,31 @@ using namespace NResourcePool;
using namespace NWorkload;


struct TEvPrivate {
// Event ids
enum EEv : ui32 {
EvRanksCheckerResponse = EventSpaceBegin(TEvents::ES_PRIVATE),

EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");

struct TEvRanksCheckerResponse : public TEventLocal<TEvRanksCheckerResponse, EvRanksCheckerResponse> {
TEvRanksCheckerResponse(Ydb::StatusIds::StatusCode status, i64 maxRank, ui64 numberClassifiers, NYql::TIssues issues)
: Status(status)
, MaxRank(maxRank)
, NumberClassifiers(numberClassifiers)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const i64 MaxRank;
const ui64 NumberClassifiers;
const NYql::TIssues Issues;
};
};

class TRanksCheckerActor : public NKikimr::TQueryBase {
using TBase = NKikimr::TQueryBase;

Expand Down Expand Up @@ -177,7 +201,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou
TryFinish();
}

void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
void Handle(TEvFetchDatabaseResponse::TPtr& ev) {
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
FailAndPassAway("Database check failed", ev->Get()->Status, ev->Get()->Issues);
return;
Expand Down Expand Up @@ -223,7 +247,7 @@ class TResourcePoolClassifierPreparationActor : public TActorBootstrapped<TResou

STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvRanksCheckerResponse, Handle);
hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvFetchDatabaseResponse, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(NConsole::TEvConfigsDispatcher::TEvGetConfigResponse, Handle);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle)
Expand Down
Loading
Loading