Skip to content

Commit

Permalink
[yt provider] Don't run operation input if sections have nodes to cal…
Browse files Browse the repository at this point in the history
…culate (#11947)
  • Loading branch information
rvu1024 authored Nov 28, 2024
1 parent bf6d055 commit 34a26dd
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
TYtMerge::CallableName(),
TYtMapReduce::CallableName(),
},
RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}),
RequireForTransientOp(),
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
);
AddHandler(
Expand All @@ -105,7 +105,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
RequireFirst(),
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
);
AddHandler({TYtReduce::CallableName()}, RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
AddHandler({TYtReduce::CallableName()}, RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
AddHandler({TYtOutput::CallableName()}, RequireFirst(), Pass());
AddHandler({TYtPublish::CallableName()}, RequireAllOf({TYtPublish::idx_World, TYtPublish::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandlePublish));
AddHandler({TYtDropTable::CallableName()}, RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleDrop));
Expand All @@ -124,6 +124,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
TExecTransformerBase::Rewind();
}

static TExecTransformerBase::TPrerequisite RequireForTransientOp() {
return [] (const TExprNode::TPtr& input) {
auto status = RequireChild(*input, TYtTransientOpBase::idx_World);
// We have to run input only if it has no settings to calculate.
// Otherwise, we first of all wait world completion.
// Then begins node execution, which run settings calculation.
// And after that, starts input execution
// See YQL-19303
if (!HasNodesToCalculate(input->ChildPtr(TYtTransientOpBase::idx_Input))) {
status = status.Combine(RequireChild(*input, TYtTransientOpBase::idx_Input));
}
return status;
};
}

private:
static void PushHybridStats(const TYtState::TPtr& state, TStringBuf statName, TStringBuf opName, const TStringBuf& folderName = "") {
with_lock(state->StatisticsMutex) {
Expand Down Expand Up @@ -190,6 +205,10 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
return CalculateNodes(State_, input, cluster, needCalc, ctx);
}

if (auto opInput = op.Maybe<TYtTransientOpBase>().Input()) {
YQL_ENSURE(opInput.Ref().GetState() == TExprNode::EState::ExecutionComplete);
}

auto outSection = op.Output();

size_t outWithoutName = 0;
Expand Down

0 comments on commit 34a26dd

Please sign in to comment.