Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dq] Peephole integration + some transform helpers (YQL-17386) #572

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/library/yql/core/services/yql_transform_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ TTransformationPipeline& TTransformationPipeline::Add(IGraphTransformer& transfo
return *this;
}

TTransformationPipeline& TTransformationPipeline::Add(TTransformStage&& stage) {
Transformers_.push_back(std::move(stage));
return *this;
}

TTransformationPipeline& TTransformationPipeline::AddServiceTransformers(EYqlIssueCode issueCode) {
Transformers_.push_back(TTransformStage(CreateGcNodeTransformer(), "GC", issueCode));
return *this;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/services/yql_transform_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class TTransformationPipeline
EYqlIssueCode issueCode = TIssuesIds::DEFAULT_ERROR, const TString& issueMessage = {});
TTransformationPipeline& Add(IGraphTransformer& transformer, const TString& stageName,
EYqlIssueCode issueCode = TIssuesIds::DEFAULT_ERROR, const TString& issueMessage = {});
TTransformationPipeline& Add(TTransformStage&& stage);

TAutoPtr<IGraphTransformer> Build(bool useIssueScopes = true);
TAutoPtr<IGraphTransformer> BuildWithNoArgChecks(bool useIssueScopes = true);
Expand Down
39 changes: 36 additions & 3 deletions ydb/library/yql/core/yql_graph_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,30 +264,63 @@ class TNullTransformer final: public TSyncTransformerBase {
};

template <typename TFunctor>
class TFunctorTransformer final: public TSyncTransformerBase {
class TFunctorTransformer: public TSyncTransformerBase {
public:
TFunctorTransformer(TFunctor functor)
: Functor_(std::move(functor)) {}

TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
TStatus status = Functor_(input, output, ctx);
YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async);

return status;
}

void Rewind() final {
void Rewind() override {
}

private:
TFunctor Functor_;
};

template <typename TFunctor>
class TSinglePassFunctorTransformer final: public TFunctorTransformer<TFunctor> {
using TBase = TFunctorTransformer<TFunctor>;
public:
TSinglePassFunctorTransformer(TFunctor functor)
: TFunctorTransformer<TFunctor>(std::move(functor))
{}

IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
if (Pass_) {
output = input;
return IGraphTransformer::TStatus::Ok;
}
IGraphTransformer::TStatus status = TBase::DoTransform(input, output, ctx);
if (IGraphTransformer::TStatus::Ok == status.Level) {
Pass_ = true;
}
return status;
}

void Rewind() final {
Pass_ = false;
}

private:
bool Pass_ = false;
};

template <typename TFunctor>
THolder<IGraphTransformer> CreateFunctorTransformer(TFunctor functor) {
return MakeHolder<TFunctorTransformer<TFunctor>>(std::move(functor));
}

template <typename TFunctor>
THolder<IGraphTransformer> CreateSinglePassFunctorTransformer(TFunctor functor) {
return MakeHolder<TSinglePassFunctorTransformer<TFunctor>>(std::move(functor));
}

typedef std::function<IGraphTransformer::TStatus(const TExprNode::TPtr&, TExprNode::TPtr&, TExprContext&)> TAsyncTransformCallback;
typedef NThreading::TFuture<TAsyncTransformCallback> TAsyncTransformCallbackFuture;

Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/integration/yql_dq_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/core/yql_statistics.h>
#include <ydb/library/yql/dq/tasks/dq_tasks_graph.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
Expand Down Expand Up @@ -72,6 +73,8 @@ class IDqIntegration {
// Return true if node was handled
virtual bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
// This transformer will be called before DQ peephole transformations
virtual std::vector<TTransformStage> GetPeepholeTransforms(bool beforeDq, const THashMap<TString, TString>& params) = 0;
};

} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<T
return false;
}

std::vector<TTransformStage> TDqIntegrationBase::GetPeepholeTransforms(bool, const THashMap<TString, TString>&) {
return {};
}

} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TDqIntegrationBase: public IDqIntegration {
void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override;
bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
std::vector<TTransformStage> GetPeepholeTransforms(bool beforeDq, const THashMap<TString, TString>& params) override;

protected:
bool CanBlockReadTypes(const TStructExprType* node);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,31 @@ struct TPublicIds {

struct TDqsPipelineConfigurator : public IPipelineConfigurator {
public:
TDqsPipelineConfigurator(const TDqStatePtr& state)
TDqsPipelineConfigurator(const TDqStatePtr& state, const THashMap<TString, TString>& providerParams)
: State_(state)
, ProviderParams_(providerParams)
{
for (const auto& ds: State_->TypeCtx->DataSources) {
if (const auto dq = ds->GetDqIntegration()) {
UniqIntegrations_.emplace(dq);
}
}
for (const auto& ds: State_->TypeCtx->DataSinks) {
if (const auto dq = ds->GetDqIntegration()) {
UniqIntegrations_.emplace(dq);
}
}
}
private:
void AfterCreate(TTransformationPipeline*) const final {}

void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
std::for_each(UniqIntegrations_.cbegin(), UniqIntegrations_.cend(), [&](const auto dqInt) {
for (auto& stage: dqInt->GetPeepholeTransforms(true, ProviderParams_)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass pipeline to dqInt directly

pipeline->Add(std::move(stage));
}
});

pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes");
if (State_->Settings->UseBlockReader.Get().GetOrElse(false)) {
pipeline->Add(NDqs::CreateDqsRewritePhyBlockReadOnDqIntegrationTransformer(*pipeline->GetTypeAnnotationContext()), "ReplaceWideReadsWithBlock");
Expand All @@ -255,10 +272,18 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}

void AfterOptimize(TTransformationPipeline*) const final {}
void AfterOptimize(TTransformationPipeline* pipeline) const final {
std::for_each(UniqIntegrations_.cbegin(), UniqIntegrations_.cend(), [&](const auto dqInt) {
for (auto& stage: dqInt->GetPeepholeTransforms(false, ProviderParams_)) {
pipeline->Add(std::move(stage));
}
});
}

private:
TDqStatePtr State_;
THashMap<TString, TString> ProviderParams_;
std::unordered_set<IDqIntegration*> UniqIntegrations_;
};

TExprNode::TPtr DqMarkBlockStage(const TDqPhyStage& stage, TExprContext& ctx) {
Expand Down Expand Up @@ -792,9 +817,16 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
try {
auto result = TMaybeNode<TResult>(input).Cast();

THashMap<TString, TString> resSettings;
for (auto s: result.Settings()) {
if (auto val = s.Value().Maybe<TCoAtom>()) {
resSettings.emplace(s.Name().Value(), val.Cast().Value());
}
}

auto precomputes = FindIndependentPrecomputes(result.Input().Ptr());
if (!precomputes.empty()) {
auto status = HandlePrecomputes(precomputes, ctx);
auto status = HandlePrecomputes(precomputes, ctx, resSettings);
if (status.Level != TStatus::Ok) {
if (status == TStatus::Async) {
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
Expand All @@ -813,10 +845,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
settings->_AllResultsBytesLimit = 64_MB;
}

int level;
TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
{
auto block = MeasureBlock("PeepHole");
if (const auto status = PeepHole(resInput, resInput, ctx, resSettings); status.Level != TStatus::Ok) {
return SyncStatus(status);
}
}

THashMap<TString, TString> secureParams;
NCommon::FillSecureParams(result.Input().Ptr(), *State->TypeCtx, secureParams);
NCommon::FillSecureParams(resInput, *State->TypeCtx, secureParams);

auto graphParams = GatherGraphParams(result.Input().Ptr());
auto graphParams = GatherGraphParams(resInput);
bool hasGraphParams = !graphParams.empty();

TString type;
Expand Down Expand Up @@ -847,15 +888,6 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
settings->EnableFullResultWrite = enableFullResultWrite;
}

int level;
TExprNode::TPtr resInput = WrapLambdaBody(level, result.Input().Ptr(), ctx);
{
auto block = MeasureBlock("PeepHole");
if (const auto status = PeepHole(resInput, resInput, ctx); status.Level != TStatus::Ok) {
return SyncStatus(status);
}
}

TString lambda;
bool untrustedUdfFlag;
TUploadList uploadList;
Expand Down Expand Up @@ -1111,6 +1143,13 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
TInstant startTime = TInstant::Now();
auto pull = TPull(input);

THashMap<TString, TString> pullSettings;
for (auto s: pull.Settings()) {
if (auto val = s.Value().Maybe<TCoAtom>()) {
pullSettings.emplace(s.Name().Value(), val.Cast().Value());
}
}

YQL_ENSURE(!TMaybeNode<TDqQuery>(pull.Input().Ptr()) || State->Settings->EnableComputeActor.Get().GetOrElse(false),
"DqQuery is not supported with worker actor");

Expand All @@ -1120,7 +1159,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr());
if (!precomputes.empty()) {
auto status = HandlePrecomputes(precomputes, ctx);
auto status = HandlePrecomputes(precomputes, ctx, pullSettings);
if (status.Level != TStatus::Ok) {
if (status == TStatus::Async) {
return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
Expand All @@ -1145,7 +1184,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn());
optimizedInput->CopyConstraints(pull.Input().Ref());

auto status = PeepHole(optimizedInput, optimizedInput, ctx);
auto status = PeepHole(optimizedInput, optimizedInput, ctx, pullSettings);
if (status.Level != TStatus::Ok) {
return SyncStatus(status);
}
Expand Down Expand Up @@ -1593,7 +1632,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
});
}

IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx) {
IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx, const THashMap<TString, TString>& providerParams) {

IDataProvider::TFillSettings fillSettings;
fillSettings.AllResultsBytesLimit.Clear();
Expand All @@ -1620,7 +1659,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters

auto optimizedInput = input;
optimizedInput->SetState(TExprNode::EState::ConstrComplete);
auto status = PeepHole(optimizedInput, optimizedInput, ctx);
auto status = PeepHole(optimizedInput, optimizedInput, ctx, providerParams);
if (status.Level != TStatus::Ok) {
return combinedStatus.Combine(status);
}
Expand Down Expand Up @@ -1840,8 +1879,8 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
return combinedStatus;
}

IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const {
TDqsPipelineConfigurator peepholeConfig(State);
IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx, const THashMap<TString, TString>& providerParams) const {
TDqsPipelineConfigurator peepholeConfig(State, providerParams);
TDqsFinalPipelineConfigurator finalPeepholeConfg;
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &peepholeConfig;
Expand Down