Skip to content

Commit

Permalink
Merge 4a84819 into 89e6dec
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Aug 30, 2024
2 parents 89e6dec + 4a84819 commit 2e8a929
Show file tree
Hide file tree
Showing 132 changed files with 8,029 additions and 2,168 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,4 @@ ydb/tests/functional/tenants test_tenants.py.*
ydb/tests/functional/ydb_cli test_ydb_impex.py.TestImpex.test_big_dataset*
ydb/tests/tools/pq_read/test test_timeout.py.TestTimeout.test_timeout
ydb/tests/functional/rename [test_rename.py */10] chunk chunk
ydb/tests/functional/suite_tests test_postgres.py.TestPGSQL.test_sql_suite[plan-jointest/join2.test]
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@

#include <ydb/core/sys_view/processor/processor.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/statistics/stat_service.h>
#include <ydb/core/statistics/service/service.h>
#include <ydb/core/statistics/aggregator/aggregator.h>

#include <ydb/core/tablet/bootstrapper.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ PEERDIR(
ydb/core/scheme_types
ydb/core/security
ydb/core/security/ldap_auth_provider
ydb/core/statistics
ydb/core/statistics/aggregator
ydb/core/statistics/service
ydb/core/sys_view/processor
ydb/core/sys_view/service
ydb/core/tablet
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/protos/ssa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ message TProgram {
repeated uint64 HashValues = 1;
}

message TCountMinSketchChecker {
}

message TOlapIndexChecker {
optional uint32 IndexId = 1;
optional string ClassName = 2;
Expand All @@ -56,6 +59,7 @@ message TProgram {
oneof Implementation {
TBloomFilterChecker BloomFilter = 40;
TCompositeChecker Composite = 41;
TCountMinSketchChecker CountMinSketch = 42;
}
}

Expand Down
24 changes: 24 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "kqp_executer_impl.h"

#include <ydb/core/kqp/gateway/actors/scheme.h>
#include <ydb/core/kqp/gateway/actors/analyze_actor.h>
#include <ydb/core/kqp/gateway/local_rpc/helper.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
Expand Down Expand Up @@ -307,6 +308,29 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kAnalyzeTable: {
const auto& analyzeOperation = schemeOp.GetAnalyzeTable();

auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>();

TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()};
IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise);

auto actorSystem = TlsActivationContext->AsActorContext().ExecutorThread.ActorSystem;
RegisterWithSameMailbox(analyzeActor);

auto selfId = SelfId();
analyzePromise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) {
auto ev = MakeHolder<TEvPrivate::TEvResult>();
ev->Result = future.GetValue();

actorSystem->Send(selfId, ev.Release());
});

Become(&TKqpSchemeExecuter::ExecuteState);
return;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
248 changes: 248 additions & 0 deletions ydb/core/kqp/gateway/actors/analyze_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
#include "analyze_actor.h"

#include <ydb/core/base/path.h>
#include <ydb/core/util/ulid.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>


namespace NKikimr::NKqp {

enum {
FirstRoundCookie = 0,
SecondRoundCookie = 1,
};

using TNavigate = NSchemeCache::TSchemeCacheNavigate;

TString MakeOperationId() {
TULIDGenerator ulidGen;
return ulidGen.Next(TActivationContext::Now()).ToBinary();
}

TAnalyzeActor::TAnalyzeActor(TString tablePath, TVector<TString> columns, NThreading::TPromise<NYql::IKikimrGateway::TGenericResult> promise)
: TablePath(tablePath)
, Columns(columns)
, Promise(promise)
, OperationId(MakeOperationId())
{}

void TAnalyzeActor::Bootstrap() {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
auto navigate = std::make_unique<TNavigate>();
auto& entry = navigate->ResultSet.emplace_back();
entry.Path = SplitPath(TablePath);
entry.Operation = TNavigate::EOp::OpTable;
entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
navigate->Cookie = FirstRoundCookie;

Send(NKikimr::MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));

Become(&TAnalyzeActor::StateWork);
}

void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);

const auto& record = ev->Get()->Record;
const TString operationId = record.GetOperationId();
const auto status = record.GetStatus();

if (status != NKikimrStat::TEvAnalyzeResponse::STATUS_SUCCESS) {
ALOG_CRIT(NKikimrServices::KQP_GATEWAY,
"TAnalyzeActor, TEvAnalyzeResponse has status=" << status);
}

if (operationId != OperationId) {
ALOG_CRIT(NKikimrServices::KQP_GATEWAY,
"TAnalyzeActor, TEvAnalyzeResponse has operationId=" << operationId
<< " , but expected " << OperationId);
}

NYql::IKikimrGateway::TGenericResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));
this->Die(ctx);
}

void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
std::unique_ptr<TNavigate> navigate(ev->Get()->Request.Release());
Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);
auto& entry = navigate->ResultSet.front();

if (entry.Status != TNavigate::EStatus::Ok) {
NYql::EYqlIssueCode error;
switch (entry.Status) {
case TNavigate::EStatus::PathErrorUnknown:
case TNavigate::EStatus::RootUnknown:
case TNavigate::EStatus::PathNotTable:
case TNavigate::EStatus::TableCreationNotComplete:
error = NYql::TIssuesIds::KIKIMR_SCHEME_ERROR;
case TNavigate::EStatus::LookupError:
case TNavigate::EStatus::RedirectLookupError:
error = NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
default:
error = NYql::TIssuesIds::DEFAULT_ERROR;
}
Promise.SetValue(
NYql::NCommon::ResultFromIssues<NYql::IKikimrGateway::TGenericResult>(
error,
TStringBuilder() << "Can't get statistics aggregator ID. " << entry.Status,
{}
)
);
this->Die(ctx);
return;
}

if (navigate->Cookie == SecondRoundCookie) {
if (entry.DomainInfo->Params.HasStatisticsAggregator()) {
SendStatisticsAggregatorAnalyze(entry, ctx);
} else {
Promise.SetValue(
NYql::NCommon::ResultFromIssues<NYql::IKikimrGateway::TGenericResult>(
NYql::TIssuesIds::DEFAULT_ERROR,
TStringBuilder() << "Can't get statistics aggregator ID.", {}
)
);
}

this->Die(ctx);
return;
}

PathId = entry.TableId.PathId;

auto& domainInfo = entry.DomainInfo;

auto navigateDomainKey = [this] (TPathId domainKey) {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
auto navigate = std::make_unique<TNavigate>();
auto& entry = navigate->ResultSet.emplace_back();
entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId);
entry.Operation = TNavigate::EOp::OpPath;
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
entry.RedirectRequired = false;
navigate->Cookie = SecondRoundCookie;

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
};

if (!domainInfo->IsServerless()) {
if (domainInfo->Params.HasStatisticsAggregator()) {
SendStatisticsAggregatorAnalyze(entry, ctx);
return;
}

navigateDomainKey(domainInfo->DomainKey);
} else {
navigateDomainKey(domainInfo->ResourcesDomainKey);
}
}

TDuration TAnalyzeActor::CalcBackoffTime() {
ui32 backoffSlots = 1 << RetryCount;
TDuration maxDuration = RetryInterval * backoffSlots;

double uncertaintyRatio = std::max(std::min(UncertainRatio, 1.0), 0.0);
double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0;

double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier);
durationMs = std::max(std::min(durationMs, MaxBackoffDurationMs), 0.0);
return TDuration::MilliSeconds(durationMs);
}

void TAnalyzeActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev, ctx);

if (RetryCount >= MaxRetryCount) {
Promise.SetValue(
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
YqlIssue(
{}, NYql::TIssuesIds::UNEXPECTED,
TStringBuilder() << "Can't establish connection with the Statistics Aggregator!"
)
)
);
this->Die(ctx);
return;
}

++RetryCount;
Schedule(CalcBackoffTime(), new TEvAnalyzePrivate::TEvAnalyzeRetry());
}

void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ev, ctx);

auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
analyzeRequest->Record = Request.Record;
Send(
MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(analyzeRequest.release(), StatisticsAggregatorId.value(), true),
IEventHandle::FlagTrackDelivery
);
}

void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) {
Y_ABORT_UNLESS(entry.DomainInfo->Params.HasStatisticsAggregator());

StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator();

auto& record = Request.Record;
record.SetOperationId(OperationId);
auto table = record.AddTables();

PathIdFromPathId(PathId, table->MutablePathId());


THashMap<TString, ui32> tagByColumnName;
for (const auto& [_, tableInfo]: entry.Columns) {
tagByColumnName[TString(tableInfo.Name)] = tableInfo.Id;
}

for (const auto& columnName: Columns) {
if (!tagByColumnName.contains(columnName)){
Promise.SetValue(
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
YqlIssue(
{}, NYql::TIssuesIds::UNEXPECTED,
TStringBuilder() << "No such column: " << columnName << " in the " << TablePath
)
)
);
this->Die(ctx);
return;
}

*table->MutableColumnTags()->Add() = tagByColumnName[columnName];
}

auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
analyzeRequest->Record = Request.Record;
Send(
MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(analyzeRequest.release(), entry.DomainInfo->Params.GetStatisticsAggregator(), true),
IEventHandle::FlagTrackDelivery
);
}

void TAnalyzeActor::HandleUnexpectedEvent(ui32 typeRewrite) {
ALOG_CRIT(
NKikimrServices::KQP_GATEWAY,
"TAnalyzeActor, unexpected event, request type: " << typeRewrite;
);

Promise.SetValue(
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
YqlIssue(
{}, NYql::TIssuesIds::UNEXPECTED,
TStringBuilder() << "Unexpected event: " << typeRewrite
)
)
);

this->PassAway();
}

}// end of NKikimr::NKqp
Loading

0 comments on commit 2e8a929

Please sign in to comment.