From 34a26dd332227aa4537d135c19c5a711044ffcfb Mon Sep 17 00:00:00 2001 From: Roman Udovichenko Date: Thu, 28 Nov 2024 11:52:07 +0300 Subject: [PATCH] [yt provider] Don't run operation input if sections have nodes to calculate (#11947) --- .../yt/provider/yql_yt_datasink_exec.cpp | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 0d986df6f7d3..1cddefb54581 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -94,7 +94,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { TYtMerge::CallableName(), TYtMapReduce::CallableName(), }, - RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), + RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleOutputOp) ); AddHandler( @@ -105,7 +105,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleOutputOp) ); - AddHandler({TYtReduce::CallableName()}, RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandleReduce)); + AddHandler({TYtReduce::CallableName()}, RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleReduce)); AddHandler({TYtOutput::CallableName()}, RequireFirst(), Pass()); AddHandler({TYtPublish::CallableName()}, RequireAllOf({TYtPublish::idx_World, TYtPublish::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandlePublish)); AddHandler({TYtDropTable::CallableName()}, RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleDrop)); @@ -124,6 +124,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { TExecTransformerBase::Rewind(); } + static TExecTransformerBase::TPrerequisite RequireForTransientOp() { + return [] (const TExprNode::TPtr& input) { + auto 
status = RequireChild(*input, TYtTransientOpBase::idx_World); + // We have to run input only if it has no settings to calculate. + // Otherwise, we first wait for the world to complete. + // Then node execution begins, which runs the settings calculation. + // Only after that does input execution start. + // See YQL-19303 + if (!HasNodesToCalculate(input->ChildPtr(TYtTransientOpBase::idx_Input))) { + status = status.Combine(RequireChild(*input, TYtTransientOpBase::idx_Input)); + } + return status; + }; + } + private: static void PushHybridStats(const TYtState::TPtr& state, TStringBuf statName, TStringBuf opName, const TStringBuf& folderName = "") { with_lock(state->StatisticsMutex) { @@ -190,6 +205,10 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { return CalculateNodes(State_, input, cluster, needCalc, ctx); } + if (auto opInput = op.Maybe().Input()) { + YQL_ENSURE(opInput.Ref().GetState() == TExprNode::EState::ExecutionComplete); + } + auto outSection = op.Output(); size_t outWithoutName = 0;