Skip to content

Commit

Permalink
Initial support of replaying udf_resolver (ydb-platform#4166)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn authored Apr 27, 2024
1 parent d8c3404 commit 9d1cd27
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 15 deletions.
1 change: 1 addition & 0 deletions ydb/library/yql/core/facade/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ PEERDIR(
ydb/library/yql/core/url_preprocessing/interface
ydb/library/yql/core/credentials
ydb/library/yql/core/qplayer/storage/interface
ydb/library/yql/core/qplayer/udf_resolver
ydb/library/yql/sql
ydb/library/yql/utils/log
ydb/library/yql/core
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/core/facade/yql_facade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/config/yql_setting.h>
#include <ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h>

#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/deprecated/split/split_iterator.h>
Expand Down Expand Up @@ -316,7 +317,10 @@ TProgram::~TProgram() {
// Attaches a QPlayer context (capture or replay) to this program and routes
// UDF resolution through it.
//
// Preconditions, enforced with YQL_ENSURE:
//  - SourceSyntax_ is still ESourceSyntax::Unknown;
//  - the currently stored QContext_ can neither read nor write;
//  - the supplied qContext can read or write.
void TProgram::SetQContext(const TQContext& qContext) {
YQL_PROFILE_FUNC(TRACE);
YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown);
// Reject a second attachment: the stored context must not be usable yet.
YQL_ENSURE(!QContext_.CanRead() && !QContext_.CanWrite());
// A context that can do neither would be a no-op, so it is rejected as well.
YQL_ENSURE(qContext.CanRead() || qContext.CanWrite());
QContext_ = qContext;
// Replace the resolver with a wrapper around the previous one that is bound to qContext.
UdfResolver_ = NCommon::WrapUdfResolverWithQContext(UdfResolver_, qContext);
}

void TProgram::ConfigureYsonResultFormat(NYson::EYsonFormat format) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/core/facade/yql_facade.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ class TProgram: public TThrRefBase, private TNonCopyable
TYqlOperationOptions OperationOptions_;
TCredentials::TPtr Credentials_;
const IUrlListerManagerPtr UrlListerManager_;
const IUdfResolver::TPtr UdfResolver_;
IUdfResolver::TPtr UdfResolver_;
const TUdfIndex::TPtr UdfIndex_;
const TUdfIndexPackageSet::TPtr UdfIndexPackageSet_;
const TFileStoragePtr FileStorage_;
Expand Down
16 changes: 16 additions & 0 deletions ydb/library/yql/core/qplayer/udf_resolver/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# IUdfResolver wrapper that stores/loads LoadMetadata results via QPlayer storage
# (yql_qplayer_udf_resolver.cpp). contrib/libs/openssl provides the SHA-256
# used to build storage keys.
LIBRARY()

SRCS(
yql_qplayer_udf_resolver.cpp
)

PEERDIR(
ydb/library/yql/core/qplayer/storage/interface
ydb/library/yql/providers/common/schema/expr
ydb/library/yql/core
library/cpp/yson/node
contrib/libs/openssl
)

END()

152 changes: 152 additions & 0 deletions ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#include "yql_qplayer_udf_resolver.h"

#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <library/cpp/yson/node/node_io.h>

#include <openssl/sha.h>

namespace NYql::NCommon {

namespace {

// Label part of the storage key under which per-function LoadMetadata results
// are written to and read from the query context.
const TString UdfResolver_LoadMetadata = "UdfResolver_LoadMetadata";

// Returns the raw SHA-256 digest of `str` as a SHA256_DIGEST_LENGTH-byte
// binary string (not hex-encoded).
TString MakeHash(const TString& str) {
    unsigned char digest[SHA256_DIGEST_LENGTH];
    // The one-shot SHA256() yields the same digest as the
    // SHA256_Init/SHA256_Update/SHA256_Final sequence, but is not part of the
    // low-level API deprecated in OpenSSL 3.0 and has no intermediate status
    // codes to forget to check.
    SHA256(reinterpret_cast<const unsigned char*>(str.Data()), str.Size(), digest);
    return TString(reinterpret_cast<const char*>(digest), sizeof(digest));
}

class TResolver : public IUdfResolver {
public:
TResolver(IUdfResolver::TPtr inner, const TQContext& qContext)
: Inner_(inner)
, QContext_(qContext)
{}

TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const final {
if (QContext_.CanRead()) {
ythrow yexception() << "can't replay GetSystemModulePath";
}

return Inner_-> GetSystemModulePath(moduleName);
}

bool LoadMetadata(const TVector<TImport*>& imports,
const TVector<TFunction*>& functions, TExprContext& ctx) const final {
if (QContext_.CanRead()) {
for (auto& f : functions) {
auto key = MakeKey(f);
auto res = QContext_.GetReader()->Get({UdfResolver_LoadMetadata, key}).GetValueSync();
if (!res) {
ythrow yexception() << "Missing replay data";
}

LoadValue(f, res->Value, ctx);
}

return true;
}

auto res = Inner_->LoadMetadata(imports, functions, ctx);
if (res && QContext_.CanWrite()) {
// calculate hash for each function and store it
for (const auto& f : functions) {
auto key = MakeKey(f);
auto value = SaveValue(f);
QContext_.GetWriter()->Put({UdfResolver_LoadMetadata, key}, value).GetValueSync();
}
}

return res;
}

TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const final {
if (QContext_.CanRead()) {
ythrow yexception() << "can't replay LoadRichMetadata";
}

return Inner_->LoadRichMetadata(imports);
}

bool ContainsModule(const TStringBuf& moduleName) const final {
if (QContext_.CanRead()) {
ythrow yexception() << "can't replay ContainsModule";
}

return Inner_->ContainsModule(moduleName);
}

private:
TString MakeKey(const TFunction* f) const {
auto node = NYT::TNode()
("Name", NYT::TNode(f->Name));
if (f->TypeConfig) {
node("TypeConfig", NYT::TNode(f->TypeConfig));
}

if (f->UserType) {
node("UserType", TypeToYsonNode(f->UserType));
}

return MakeHash(NYT::NodeToCanonicalYsonString(node, NYT::NYson::EYsonFormat::Binary));
}

TString SaveValue(const TFunction* f) const {
auto node = NYT::TNode()
("CallableType", TypeToYsonNode(f->CallableType));
if (f->NormalizedUserType && f->NormalizedUserType->GetKind() != ETypeAnnotationKind::Void) {
node("NormalizedUserType", TypeToYsonNode(f->NormalizedUserType));
}

if (f->RunConfigType && f->RunConfigType->GetKind() != ETypeAnnotationKind::Void) {
node("RunConfigType", TypeToYsonNode(f->RunConfigType));
}

if (f->SupportsBlocks) {
node("SupportsBlocks", NYT::TNode(true));
}

if (f->IsStrict) {
node("IsStrict", NYT::TNode(true));
}

return NYT::NodeToYsonString(node,NYT::NYson::EYsonFormat::Binary);
}

void LoadValue(TFunction* f, const TString& value, TExprContext& ctx) const {
auto node = NYT::NodeFromYsonString(value);
f->CallableType = ParseTypeFromYson(node["CallableType"], ctx);
if (node.HasKey("NormalizedUserType")) {
f->NormalizedUserType = ParseTypeFromYson(node["NormalizedUserType"], ctx);
}

if (node.HasKey("RunConfigType")) {
f->RunConfigType = ParseTypeFromYson(node["RunConfigType"], ctx);
}

if (node.HasKey("SupportsBlocks")) {
f->SupportsBlocks = node["SupportsBlocks"].AsBool();
}

if (node.HasKey("IsStrict")) {
f->IsStrict = node["IsStrict"].AsBool();
}
}

private:
const IUdfResolver::TPtr Inner_;
const TQContext QContext_;
};

}

// Creates the capture/replay decorator around `inner`, bound to `qContext`.
IUdfResolver::TPtr WrapUdfResolverWithQContext(IUdfResolver::TPtr inner, const TQContext& qContext) {
    IUdfResolver::TPtr wrapped = new TResolver(inner, qContext);
    return wrapped;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once
#include <ydb/library/yql/core/yql_udf_resolver.h>
#include <ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h>

namespace NYql::NCommon {

// Returns a resolver that wraps `inner` and is bound to `qContext`:
//  - when qContext can read, LoadMetadata() is served from stored data and the
//    other IUdfResolver methods throw;
//  - otherwise calls go to `inner`, and successful LoadMetadata() results are
//    stored when qContext can write.
IUdfResolver::TPtr WrapUdfResolverWithQContext(IUdfResolver::TPtr inner, const TQContext& qContext);

}
1 change: 1 addition & 0 deletions ydb/library/yql/core/qplayer/ya.make
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
RECURSE(
storage
udf_resolver
)

2 changes: 2 additions & 0 deletions ydb/library/yql/core/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ PEERDIR(
ydb/library/yql/core/facade
ydb/library/yql/core/services
ydb/library/yql/core/qplayer/storage/memory
ydb/library/yql/providers/common/udf_resolve
ydb/library/yql/public/udf
ydb/library/yql/public/udf/service/exception_policy
ydb/library/yql/core/type_ann
Expand All @@ -35,6 +36,7 @@ PEERDIR(
ydb/library/yql/minikql/comp_nodes/llvm14
ydb/library/yql/minikql/invoke_builtins/llvm14
ydb/library/yql/sql/pg
ydb/library/yql/udfs/common/string
)

IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
Expand Down
19 changes: 16 additions & 3 deletions ydb/library/yql/core/ut/yql_qplayer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,29 @@
#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_services.h>
#include <ydb/library/yql/core/facade/yql_facade.h>
#include <ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h>
#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>

#include <library/cpp/testing/unittest/registar.h>

#include <util/system/user.h>

using namespace NYql;

bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql) {
bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql, bool withUdfs) {
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry());
if (withUdfs) {
auto cloned = functionRegistry->Clone();
NKikimr::NMiniKQL::FillStaticModules(*cloned);
functionRegistry = cloned;
}

auto yqlNativeServices = NFile::TYtFileServices::Make(functionRegistry.Get(), {}, {}, "");
auto ytGateway = CreateYtFileGateway(yqlNativeServices);

TVector<TDataProviderInitializer> dataProvidersInit;
dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway));
TProgramFactory factory(true, functionRegistry.Get(), 0ULL, dataProvidersInit, "ut");
factory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(functionRegistry.Get()));

TProgramPtr program = factory.Create("-stdin-", sourceCode);
program->SetQContext(qContext);
Expand Down Expand Up @@ -48,10 +56,10 @@ bool RunProgram(const TString& sourceCode, const TQContext& qContext, bool isSql
void CheckProgram(const TString& sql, bool isSql = true) {
auto qStorage = MakeMemoryQStorage();
TQContext savingCtx(qStorage->MakeWriter("foo"));
UNIT_ASSERT(RunProgram(sql, savingCtx, isSql));
UNIT_ASSERT(RunProgram(sql, savingCtx, isSql, true));
savingCtx.GetWriter()->Commit().GetValueSync();
TQContext loadingCtx(qStorage->MakeReader("foo"));
UNIT_ASSERT(RunProgram("", loadingCtx, isSql));
UNIT_ASSERT(RunProgram("", loadingCtx, isSql, false));
}

Y_UNIT_TEST_SUITE(QPlayerTests) {
Expand Down Expand Up @@ -97,4 +105,9 @@ Y_UNIT_TEST_SUITE(QPlayerTests) {
auto s = "select 1";
CheckProgram(s);
}

Y_UNIT_TEST(Udf) {
    // Round-trips a query that calls a UDF from the String module.
    const auto query = "select String::AsciiToUpper('a')";
    CheckProgram(query);
}
}
26 changes: 15 additions & 11 deletions ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,14 @@ int RunMain(int argc, const char* argv[])
return 1;
}

if (res.Has("replay")) {
qStorage = MakeFileQStorage(qStorageDir);
qContext = TQContext(qStorage->MakeReader(opId));
} else if (res.Has("capture")) {
qStorage = MakeFileQStorage(qStorageDir);
qContext = TQContext(qStorage->MakeWriter(opId));
}

if (res.Has("dq-host")) {
dqHost = res.Get<TString>("dq-host");
}
Expand Down Expand Up @@ -937,12 +945,15 @@ int RunMain(int argc, const char* argv[])
TProgramFactory progFactory(emulateYt, funcRegistry.Get(), ctx.NextUniqueId, dataProvidersInit, "dqrun");
progFactory.AddUserDataTable(std::move(dataTable));
progFactory.SetModules(moduleResolver);
IUdfResolver::TPtr udfResolverImpl;
if (udfResolver) {
progFactory.SetUdfResolver(NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage,
udfResolver, {}, {}, udfResolverFilterSyscalls, {}));
udfResolverImpl = NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), storage,
udfResolver, {}, {}, udfResolverFilterSyscalls, {});
} else {
progFactory.SetUdfResolver(NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true));
udfResolverImpl = NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), storage, true);
}

progFactory.SetUdfResolver(udfResolverImpl);
progFactory.SetFileStorage(storage);
progFactory.SetUrlPreprocessing(new TUrlPreprocessing(gatewaysConfig));
progFactory.SetGatewaysConfig(&gatewaysConfig);
Expand Down Expand Up @@ -985,14 +996,7 @@ int RunMain(int argc, const char* argv[])
program->SetQueryName(progFile);
}

if (res.Has("replay")) {
qStorage = MakeFileQStorage(qStorageDir);
qContext = TQContext(qStorage->MakeReader(opId));
program->SetQContext(qContext);
} else if (res.Has("capture")) {
Y_ENSURE(opId);
qStorage = MakeFileQStorage(qStorageDir);
qContext = TQContext(qStorage->MakeWriter(opId));
if (qStorage) {
program->SetQContext(qContext);
}

Expand Down

0 comments on commit 9d1cd27

Please sign in to comment.