Skip to content

Commit

Permalink
improve replay execution (#6211)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jul 2, 2024
1 parent 9b5182d commit 4c547a3
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
18 changes: 16 additions & 2 deletions ydb/tools/query_replay_yt/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>

Expand Down Expand Up @@ -45,6 +46,7 @@ class TQueryReplayMapper
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;

NYql::IHTTPGateway::TPtr HttpGateway;
TVector<TString> UdfFiles;
ui32 ActorSystemThreadsCount = 5;

Expand Down Expand Up @@ -72,6 +74,8 @@ class TQueryReplayMapper
return "write_columns_mismatch";
case TQueryReplayEvents::UncategorizedPlanMismatch:
return "uncategorized_plan_mismatch";
case TQueryReplayEvents::MissingTableMetadata:
return "missing_table_metadata";
default:
return "unspecified";
}
Expand Down Expand Up @@ -107,6 +111,7 @@ class TQueryReplayMapper
ActorSystem->Start();
ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr));
ModuleResolverState = MakeIntrusive<NKikimr::NKqp::TModuleResolverState>();
HttpGateway = NYql::IHTTPGateway::Make();
Y_ABORT_UNLESS(GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver));
}

Expand All @@ -120,7 +125,16 @@ class TQueryReplayMapper
json.InsertValue(key, NJson::TJsonValue(child.AsString()));
}

auto compileActorId = ActorSystem->Register(CreateQueryCompiler(ModuleResolverState, FunctionRegistry.Get()));
TString queryType = row["query_type"].AsString();
if (queryType == "QUERY_TYPE_AST_SCAN") {
continue;
}

if (queryType == "QUERY_TYPE_SQL_GENERIC_SCRIPT") {
continue;
}

auto compileActorId = ActorSystem->Register(CreateQueryCompiler(ModuleResolverState, FunctionRegistry.Get(), HttpGateway));

auto future = ActorSystem->Ask<TQueryReplayEvents::TEvCompileResponse>(
compileActorId,
Expand All @@ -132,7 +146,7 @@ class TQueryReplayMapper

TString failReason = GetFailReason(status);

if (failReason == "unspecified") {
if (failReason == "unspecified" || status == TQueryReplayEvents::MissingTableMetadata) {
continue;
}

Expand Down
36 changes: 29 additions & 7 deletions ydb/tools/query_replay_yt/query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ struct TMetadataInfoHolder {
class TStaticTableMetadataLoader: public NYql::IKikimrGateway::IKqpTableMetadataLoader, public NYql::IDbSchemeResolver {
TActorSystem* ActorSystem;
std::shared_ptr<TMetadataInfoHolder> TableMetadata;
bool IsMissingTableMetadata = false;

public:
TStaticTableMetadataLoader(TActorSystem* actorSystem, std::shared_ptr<TMetadataInfoHolder>& tableMetadata)
Expand All @@ -152,6 +153,10 @@ class TStaticTableMetadataLoader: public NYql::IKikimrGateway::IKqpTableMetadata
Y_UNUSED(database);
Y_UNUSED(userToken);
auto ptr = TableMetadata->find(table);
if (ptr == TableMetadata->end()) {
IsMissingTableMetadata = true;
}

Y_ENSURE(ptr != TableMetadata->end());

NYql::IKikimrGateway::TTableMetadataResult result;
Expand Down Expand Up @@ -199,6 +204,10 @@ class TStaticTableMetadataLoader: public NYql::IKikimrGateway::IKqpTableMetadata
return results;
}

// Returns true once a table lookup on this loader has failed to find the
// requested table in TableMetadata (the flag is raised right before the
// Y_ENSURE that rejects the lookup). TReplayCompileActor reads this to report
// MissingTableMetadata instead of a generic compile error.
bool HasMissingTableMetadata() const {
return IsMissingTableMetadata;
}

// Overridden resolver hook: runs Resolve() inline on the caller's thread and
// wraps the outcome with NThreading::MakeFuture before returning it.
NThreading::TFuture<TTableResults> ResolveTables(const TVector<TTable>& tables) override {
    auto resolved = Resolve(tables);
    return NThreading::MakeFuture(std::move(resolved));
}
Expand All @@ -215,13 +224,17 @@ class TStaticTableMetadataLoader: public NYql::IKikimrGateway::IKqpTableMetadata

class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
public:
TReplayCompileActor(TIntrusivePtr<TModuleResolverState> moduleResolverState, const NMiniKQL::IFunctionRegistry* functionRegistry)
// Builds a replay compile actor.
//   moduleResolverState - shared module resolver state kept by the actor.
//   functionRegistry    - raw pointer to the function registry; stored as-is,
//                         the actor does not take ownership of it.
//   httpGateway         - HTTP gateway handed in by the caller and later reused
//                         for the federated query setup.
// Both smart-pointer arguments are taken by value and moved into the members,
// so construction costs no extra reference-count round-trip.
TReplayCompileActor(TIntrusivePtr<TModuleResolverState> moduleResolverState, const NMiniKQL::IFunctionRegistry* functionRegistry,
    NYql::IHTTPGateway::TPtr httpGateway)
    : ModuleResolverState(std::move(moduleResolverState))
    , KqpSettings()
    , Config(MakeIntrusive<TKikimrConfiguration>())
    , FunctionRegistry(functionRegistry)
    , HttpGateway(std::move(httpGateway))
{
    // Configuration switches that are turned on unconditionally for every
    // actor instance, right at construction time.
    Config->EnableKqpScanQueryStreamLookup = true;
    Config->PredicateExtract20 = true;
    Config->EnablePreparedDdl = true;
}

void Bootstrap() {
Expand Down Expand Up @@ -495,7 +508,13 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
Y_UNUSED(queryPlan);
if (status != Ydb::StatusIds::SUCCESS) {
ev->Success = false;
ev->Status = status == Ydb::StatusIds::TIMEOUT ? TQueryReplayEvents::CompileTimeout : TQueryReplayEvents::CompileError;
if (MetadataLoader->HasMissingTableMetadata()) {
ev->Status = TQueryReplayEvents::MissingTableMetadata;
} else if (status == Ydb::StatusIds::TIMEOUT) {
ev->Status = TQueryReplayEvents::CompileTimeout;
} else {
ev->Status = TQueryReplayEvents::CompileError;
}
ev->Message = issues.ToString();
Cerr << "Failed to compile query: " << ev->Message << Endl;
WriteJsonData("-repro.txt", ReplayDetails);
Expand Down Expand Up @@ -594,16 +613,17 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
Config->_KqpYqlSyntaxVersion = syntax;
Config->FreezeDefaults();

std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = make_shared<TStaticTableMetadataLoader>(TlsActivationContext->ActorSystem(), TableMetadata);
MetadataLoader = make_shared<TStaticTableMetadataLoader>(TlsActivationContext->ActorSystem(), TableMetadata);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = MetadataLoader;

auto c = MakeIntrusive<NMonitoring::TDynamicCounters>();
auto counters = MakeIntrusive<TKqpRequestCounters>();
counters->Counters = new TKqpCounters(c);
counters->TxProxyMon = new NTxProxy::TTxProxyMon(c);

Gateway = CreateKikimrIcGateway(Query->Cluster, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, Query->Database, std::move(loader),
Gateway = CreateKikimrIcGateway(Query->Cluster, queryType, Query->Database, std::move(loader),
TlsActivationContext->ExecutorThread.ActorSystem, SelfId().NodeId(), counters);
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr});
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({HttpGateway, nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr});
KqpHost = CreateKqpHost(Gateway, Query->Cluster, Query->Database, Config, ModuleResolverState->ModuleResolver,
federatedQuerySetup, nullptr, GUCSettings, Nothing(), FunctionRegistry, false);

Expand Down Expand Up @@ -652,10 +672,12 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
std::shared_ptr<TMetadataInfoHolder> TableMetadata;
TActorId MiniKQLCompileService;
NJson::TJsonValue ReplayDetails;
std::shared_ptr<TStaticTableMetadataLoader> MetadataLoader;
NYql::IHTTPGateway::TPtr HttpGateway;
};

IActor* CreateQueryCompiler(TIntrusivePtr<TModuleResolverState> moduleResolverState,
const NMiniKQL::IFunctionRegistry* functionRegistry)
const NMiniKQL::IFunctionRegistry* functionRegistry, NYql::IHTTPGateway::TPtr httpGateway)
{
return new TReplayCompileActor(moduleResolverState, functionRegistry);
return new TReplayCompileActor(moduleResolverState, functionRegistry, httpGateway);
}
7 changes: 6 additions & 1 deletion ydb/tools/query_replay_yt/query_replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ struct TQueryReplayConfig {
void ParseConfig(int argc, const char** argv);
};

// Forward declaration only: the visible part of this header refers to
// NYql::IHTTPGateway solely through std::shared_ptr in the CreateQueryCompiler
// declaration, so the complete type is not required here.
namespace NYql {
class IHTTPGateway;
}

using namespace NActors;

THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 threads, ui32 pools = 1);
Expand All @@ -46,6 +50,7 @@ struct TQueryReplayEvents {
ExtraWriting,
WriteColumnsMismatch,
UncategorizedPlanMismatch,
MissingTableMetadata,
Unspecified,
};

Expand Down Expand Up @@ -73,4 +78,4 @@ struct TQueryReplayEvents {
};

NActors::IActor* CreateQueryCompiler(TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> moduleResolverState,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry);
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, std::shared_ptr<NYql::IHTTPGateway> httpGateway);

0 comments on commit 4c547a3

Please sign in to comment.