From 9d1cd2747dc601ea8280beac1c48df1a232c353d Mon Sep 17 00:00:00 2001 From: Vitaly Stoyan Date: Sat, 27 Apr 2024 13:02:26 +0300 Subject: [PATCH] Initial support of replaying udf_resolver (#4166) --- ydb/library/yql/core/facade/ya.make | 1 + ydb/library/yql/core/facade/yql_facade.cpp | 4 + ydb/library/yql/core/facade/yql_facade.h | 2 +- .../yql/core/qplayer/udf_resolver/ya.make | 16 ++ .../udf_resolver/yql_qplayer_udf_resolver.cpp | 152 ++++++++++++++++++ .../udf_resolver/yql_qplayer_udf_resolver.h | 9 ++ ydb/library/yql/core/qplayer/ya.make | 1 + ydb/library/yql/core/ut/ya.make | 2 + ydb/library/yql/core/ut/yql_qplayer_ut.cpp | 19 ++- ydb/library/yql/tools/dqrun/dqrun.cpp | 26 +-- 10 files changed, 217 insertions(+), 15 deletions(-) create mode 100644 ydb/library/yql/core/qplayer/udf_resolver/ya.make create mode 100644 ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp create mode 100644 ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h diff --git a/ydb/library/yql/core/facade/ya.make b/ydb/library/yql/core/facade/ya.make index 8a09cfd941e9..0b4ec691f1b4 100644 --- a/ydb/library/yql/core/facade/ya.make +++ b/ydb/library/yql/core/facade/ya.make @@ -19,6 +19,7 @@ PEERDIR( ydb/library/yql/core/url_preprocessing/interface ydb/library/yql/core/credentials ydb/library/yql/core/qplayer/storage/interface + ydb/library/yql/core/qplayer/udf_resolver ydb/library/yql/sql ydb/library/yql/utils/log ydb/library/yql/core diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 23ec8486c66d..13f65d829817 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -316,7 +317,10 @@ TProgram::~TProgram() { void TProgram::SetQContext(const TQContext& qContext) { YQL_PROFILE_FUNC(TRACE); YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown); + YQL_ENSURE(!QContext_.CanRead() && 
!QContext_.CanWrite()); + YQL_ENSURE(qContext.CanRead() || qContext.CanWrite()); /* Added guards: SetQContext now requires that the stored context can neither read nor write yet and that the incoming one can do at least one of the two. */QContext_ = qContext; + UdfResolver_ = NCommon::WrapUdfResolverWithQContext(UdfResolver_, qContext); /* The program resolver is replaced by a wrapper built from the previous resolver and qContext. NOTE(review): UdfResolver_ is wrapped without a null check - confirm every caller installs a resolver before SetQContext. */} void TProgram::ConfigureYsonResultFormat(NYson::EYsonFormat format) { diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index e794f8052525..c47a4a6fa856 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -401,7 +401,7 @@ class TProgram: public TThrRefBase, private TNonCopyable TYqlOperationOptions OperationOptions_; TCredentials::TPtr Credentials_; const IUrlListerManagerPtr UrlListerManager_; - const IUdfResolver::TPtr UdfResolver_; + IUdfResolver::TPtr UdfResolver_; /* const removed: SetQContext assigns to this member. */const TUdfIndex::TPtr UdfIndex_; const TUdfIndexPackageSet::TPtr UdfIndexPackageSet_; const TFileStoragePtr FileStorage_; diff --git a/ydb/library/yql/core/qplayer/udf_resolver/ya.make b/ydb/library/yql/core/qplayer/udf_resolver/ya.make new file mode 100644 index 000000000000..1a4160496cf4 --- /dev/null +++ b/ydb/library/yql/core/qplayer/udf_resolver/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + yql_qplayer_udf_resolver.cpp +) + +PEERDIR( + ydb/library/yql/core/qplayer/storage/interface + ydb/library/yql/providers/common/schema/expr + ydb/library/yql/core + library/cpp/yson/node + contrib/libs/openssl +) + +END() + diff --git a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp new file mode 100644 index 000000000000..cccdec5781bb --- /dev/null +++ b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp @@ -0,0 +1,152 @@ +#include "yql_qplayer_udf_resolver.h" + +#include +#include +#include + +#include + +namespace NYql::NCommon { + +namespace { + /* Component name passed as the first element of every key given to the QContext reader and writer in this file. */+const TString UdfResolver_LoadMetadata = "UdfResolver_LoadMetadata"; + /* Returns the SHA-256 digest of str as a TString holding the SHA256_DIGEST_LENGTH raw digest bytes (no hex encoding is applied). NOTE(review): this uses the SHA256_Init, SHA256_Update, SHA256_Final sequence and a C-style cast; OpenSSL 3.0 documents that sequence as deprecated in favour of the one-shot SHA256() or the EVP interface - confirm against the vendored OpenSSL. */+TString MakeHash(const TString& str) { + SHA256_CTX sha; + SHA256_Init(&sha); + 
SHA256_Update(&sha, str.Data(), str.Size()); + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_Final(hash, &sha); + return TString((const char*)hash, sizeof(hash)); +} + /* IUdfResolver implementation that wraps another resolver together with a TQContext. When QContext_.CanRead() is true, LoadMetadata is served from the QContext reader and the other three methods throw yexception; otherwise calls go to Inner_, and successful LoadMetadata results are additionally stored through the QContext writer when QContext_.CanWrite() is true. */+class TResolver : public IUdfResolver { +public: + TResolver(IUdfResolver::TPtr inner, const TQContext& qContext) + : Inner_(inner) + , QContext_(qContext) + {} + + /* Throws yexception when the context can read (nothing is recorded for this call); otherwise forwards to Inner_. */TMaybe GetSystemModulePath(const TStringBuf& moduleName) const final { + if (QContext_.CanRead()) { + ythrow yexception() << "can't replay GetSystemModulePath"; + } + + return Inner_-> GetSystemModulePath(moduleName); + } + + /* Replay path: for each function, fetches the entry stored under MakeKey(f), throws yexception if it is absent, applies it with LoadValue, and returns true; imports and Inner_ are not used. Normal path: delegates to Inner_ and, when that returned true and the context can write, stores SaveValue(f) under MakeKey(f) for every function. Both Get and Put are waited on with GetValueSync(). */bool LoadMetadata(const TVector& imports, + const TVector& functions, TExprContext& ctx) const final { + if (QContext_.CanRead()) { + for (auto& f : functions) { + auto key = MakeKey(f); + auto res = QContext_.GetReader()->Get({UdfResolver_LoadMetadata, key}).GetValueSync(); + if (!res) { + ythrow yexception() << "Missing replay data"; + } + + LoadValue(f, res->Value, ctx); /* NOTE(review): LoadValue returns void and this branch always returns true, so a parse failure that does not throw would go unreported - confirm ParseTypeFromYson cannot return null here. */+ } + + return true; + } + + auto res = Inner_->LoadMetadata(imports, functions, ctx); + if (res && QContext_.CanWrite()) { + // calculate hash for each function and store it + for (const auto& f : functions) { + auto key = MakeKey(f); + auto value = SaveValue(f); + QContext_.GetWriter()->Put({UdfResolver_LoadMetadata, key}, value).GetValueSync(); + } + } + + return res; + } + + /* Throws yexception when the context can read; otherwise forwards to Inner_. */TResolveResult LoadRichMetadata(const TVector& imports) const final { + if (QContext_.CanRead()) { + ythrow yexception() << "can't replay LoadRichMetadata"; + } + + return Inner_->LoadRichMetadata(imports); + } + + /* Throws yexception when the context can read; otherwise forwards to Inner_. */bool ContainsModule(const TStringBuf& moduleName) const final { + if (QContext_.CanRead()) { + ythrow yexception() << "can't replay ContainsModule"; + } + + return Inner_->ContainsModule(moduleName); + } + +private: + /* Builds the storage key for f: a YSON map holding Name, plus TypeConfig and UserType when they are set, serialized as canonical binary YSON and passed through MakeHash. */TString MakeKey(const TFunction* f) const { + auto node = NYT::TNode() + ("Name", NYT::TNode(f->Name)); + if (f->TypeConfig) { + node("TypeConfig", NYT::TNode(f->TypeConfig)); + } + + if (f->UserType) { + node("UserType", 
TypeToYsonNode(f->UserType)); + } + + return MakeHash(NYT::NodeToCanonicalYsonString(node, NYT::NYson::EYsonFormat::Binary)); + } + + /* Serializes the resolved fields of f to binary YSON: CallableType always; NormalizedUserType and RunConfigType only when set and not of kind Void; SupportsBlocks and IsStrict only when true. */TString SaveValue(const TFunction* f) const { + auto node = NYT::TNode() + ("CallableType", TypeToYsonNode(f->CallableType)); + if (f->NormalizedUserType && f->NormalizedUserType->GetKind() != ETypeAnnotationKind::Void) { + node("NormalizedUserType", TypeToYsonNode(f->NormalizedUserType)); + } + + if (f->RunConfigType && f->RunConfigType->GetKind() != ETypeAnnotationKind::Void) { + node("RunConfigType", TypeToYsonNode(f->RunConfigType)); + } + + if (f->SupportsBlocks) { + node("SupportsBlocks", NYT::TNode(true)); + } + + if (f->IsStrict) { + node("IsStrict", NYT::TNode(true)); + } + + return NYT::NodeToYsonString(node,NYT::NYson::EYsonFormat::Binary); + } + + /* Counterpart of SaveValue: parses value as YSON, assigns CallableType, and assigns each optional field only when its key is present; fields whose key is absent keep whatever f already held. */void LoadValue(TFunction* f, const TString& value, TExprContext& ctx) const { + auto node = NYT::NodeFromYsonString(value); + f->CallableType = ParseTypeFromYson(node["CallableType"], ctx); + if (node.HasKey("NormalizedUserType")) { + f->NormalizedUserType = ParseTypeFromYson(node["NormalizedUserType"], ctx); + } + + if (node.HasKey("RunConfigType")) { + f->RunConfigType = ParseTypeFromYson(node["RunConfigType"], ctx); + } + + if (node.HasKey("SupportsBlocks")) { + f->SupportsBlocks = node["SupportsBlocks"].AsBool(); + } + + if (node.HasKey("IsStrict")) { + f->IsStrict = node["IsStrict"].AsBool(); + } + } + +private: + /* Inner_ is the wrapped resolver and QContext_ is a by-value copy of the context. NOTE(review): Inner_ is dereferenced on every non-replay path without a null check. */const IUdfResolver::TPtr Inner_; + const TQContext QContext_; +}; + +} + /* Factory: returns a newly allocated TResolver built from inner and qContext. */+IUdfResolver::TPtr WrapUdfResolverWithQContext(IUdfResolver::TPtr inner, const TQContext& qContext) { + return new TResolver(inner, qContext); +} + +} diff --git a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h new file mode 100644 index 000000000000..17790f2510dc --- /dev/null +++ b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h @@ -0,0 +1,9 @@ +#pragma once +#include +#include 
+ +namespace NYql::NCommon { + /* Returns a resolver that wraps inner: LoadMetadata results are stored through qContext when it can write and served from qContext when it can read. */+IUdfResolver::TPtr WrapUdfResolverWithQContext(IUdfResolver::TPtr inner, const TQContext& qContext); + +} diff --git a/ydb/library/yql/core/qplayer/ya.make b/ydb/library/yql/core/qplayer/ya.make index 25965070b632..d8708620effe 100644 --- a/ydb/library/yql/core/qplayer/ya.make +++ b/ydb/library/yql/core/qplayer/ya.make @@ -1,4 +1,5 @@ RECURSE( storage + udf_resolver ) diff --git a/ydb/library/yql/core/ut/ya.make b/ydb/library/yql/core/ut/ya.make index b786e7ae08e8..39813be01eb5 100644 --- a/ydb/library/yql/core/ut/ya.make +++ b/ydb/library/yql/core/ut/ya.make @@ -21,6 +21,7 @@ PEERDIR( ydb/library/yql/core/facade ydb/library/yql/core/services ydb/library/yql/core/qplayer/storage/memory + ydb/library/yql/providers/common/udf_resolve ydb/library/yql/public/udf ydb/library/yql/public/udf/service/exception_policy ydb/library/yql/core/type_ann @@ -35,6 +36,7 @@ PEERDIR( ydb/library/yql/minikql/comp_nodes/llvm14 ydb/library/yql/minikql/invoke_builtins/llvm14 ydb/library/yql/sql/pg + ydb/library/yql/udfs/common/string ) IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) diff --git a/ydb/library/yql/core/ut/yql_qplayer_ut.cpp b/ydb/library/yql/core/ut/yql_qplayer_ut.cpp index 9dfe1ad86f7a..c618f662685f 100644 --- a/ydb/library/yql/core/ut/yql_qplayer_ut.cpp +++ b/ydb/library/yql/core/ut/yql_qplayer_ut.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -11,14 +12,21 @@ using namespace NYql; -bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql) { +bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql, bool withUdfs) { /* New withUdfs flag: when true, the builtin registry is cloned, FillStaticModules is applied to the clone, and the clone replaces functionRegistry for the rest of the run. */auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry()); + if (withUdfs) { + auto cloned = functionRegistry->Clone(); + NKikimr::NMiniKQL::FillStaticModules(*cloned); + functionRegistry = cloned; + } + auto yqlNativeServices = NFile::TYtFileServices::Make(functionRegistry.Get(), {}, {}, ""); auto ytGateway 
= CreateYtFileGateway(yqlNativeServices); TVector dataProvidersInit; dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway)); TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut"); + factory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(functionRegistry.Get())); /* A resolver is now installed on the factory explicitly - presumably so that SetQContext below has a non-null resolver to wrap; confirm. */TProgramPtr program = factory.Create("-stdin-", sourceCode); program->SetQContext(qContext); @@ -48,10 +56,10 @@ bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql void CheckProgram(const TString& sql, bool isSql = true) { auto qStorage = MakeMemoryQStorage(); TQContext savingCtx(qStorage->MakeWriter("foo")); - UNIT_ASSERT(RunProgram(sql, savingCtx, isSql)); + UNIT_ASSERT(RunProgram(sql, savingCtx, isSql, true)); /* Capture run: withUdfs is true, so the registry includes the modules added by FillStaticModules. */savingCtx.GetWriter()->Commit().GetValueSync(); TQContext loadingCtx(qStorage->MakeReader("foo")); - UNIT_ASSERT(RunProgram("", loadingCtx, isSql)); + UNIT_ASSERT(RunProgram("", loadingCtx, isSql, false)); /* Replay run: empty source text and withUdfs false, so the registry is the plain builtin one - presumably to prove UDF metadata comes from the recorded data. */} Y_UNIT_TEST_SUITE(QPlayerTests) { @@ -97,4 +105,9 @@ Y_UNIT_TEST_SUITE(QPlayerTests) { auto s = "select 1"; CheckProgram(s); } + + Y_UNIT_TEST(Udf) { /* Runs a query that calls a String UDF through the capture-then-replay cycle of CheckProgram. */+ auto s = "select String::AsciiToUpper('a')"; + CheckProgram(s); + } } diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 9f5133e6b0ea..ed1f9f8a43e9 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -639,6 +639,14 @@ int RunMain(int argc, const char* argv[]) return 1; } + if (res.Has("replay")) { + qStorage = MakeFileQStorage(qStorageDir); + qContext = TQContext(qStorage->MakeReader(opId)); + } else if (res.Has("capture")) { + qStorage = MakeFileQStorage(qStorageDir); + qContext = TQContext(qStorage->MakeWriter(opId)); /* NOTE(review): the capture branch that this patch removes further down in RunMain executed Y_ENSURE(opId) first; this relocated copy does not - confirm opId is validated elsewhere. */+ } + if (res.Has("dq-host")) { dqHost = res.Get("dq-host"); } @@ -937,12 +945,15 @@ int RunMain(int argc, const char* argv[]) TProgramFactory progFactory(emulateYt, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "dqrun"); 
progFactory.AddUserDataTable(std::move(dataTable)); progFactory.SetModules(moduleResolver); + IUdfResolver::TPtr udfResolverImpl; if (udfResolver) { - progFactory.SetUdfResolver(NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage, - udfResolver, {}, {}, udfResolverFilterSyscalls, {})); + udfResolverImpl = NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage, + udfResolver, {}, {}, udfResolverFilterSyscalls, {}); } else { - progFactory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true)); + udfResolverImpl = NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true); } + + progFactory.SetUdfResolver(udfResolverImpl); progFactory.SetFileStorage(storage); progFactory.SetUrlPreprocessing(new TUrlPreprocessing(gatewaysConfig)); progFactory.SetGatewaysConfig(&gatewaysConfig); @@ -985,14 +996,7 @@ int RunMain(int argc, const char* argv[]) program->SetQueryName(progFile); } - if (res.Has("replay")) { - qStorage = MakeFileQStorage(qStorageDir); - qContext = TQContext(qStorage->MakeReader(opId)); - program->SetQContext(qContext); - } else if (res.Has("capture")) { - Y_ENSURE(opId); - qStorage = MakeFileQStorage(qStorageDir); - qContext = TQContext(qStorage->MakeWriter(opId)); + if (qStorage) { program->SetQContext(qContext); }