
Eager speculative execution for final LIMIT stages #18862

Merged · 21 commits · Sep 20, 2023

Conversation

losipiuk (Member):

It is quite common to use

  SELECT .... LIMIT N

queries for data exploration.
Currently, when run in FTE mode, such queries require completion of all (or almost all)
tasks which read source data, even though most of the time the final answer
could be obtained much sooner.

This commit enables the EAGER_SPECULATIVE task execution mode for stages
which have a FINAL LIMIT operator. This allows final results to be returned
to the user much faster (assuming the exchange plugin in use supports
concurrent reads and writes).

Release notes

(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

@cla-bot added the cla-signed label on Aug 30, 2023
@losipiuk marked this pull request as ready for review on September 4, 2023 11:37
Comment on lines -1205 to -1241
if (nonSpeculativeTasksWaitingForNode >= maxTasksWaitingForNode) {
break;
Member:

In "Keep executionClass explicit in SchedulingQueue and PrioritizedSchedu…", where did it go to?

Not sure if this commit is supposed to be a pure syntactical refactor or something more?

Member Author:

It changes the internal model of SchedulingQueue a bit. No behavioral changes. Added some comments in the commit message.

@@ -1412,7 +1412,7 @@ private void loadMoreTaskDescriptorsIfNecessary()
{
boolean schedulingQueueIsFull = schedulingQueue.getTaskCount(STANDARD) >= maxTasksWaitingForExecution;
for (StageExecution stageExecution : stageExecutions.values()) {
-            if (!schedulingQueueIsFull || stageExecution.hasOpenTaskRunning()) {
+            if (!schedulingQueueIsFull || stageExecution.hasOpenTaskRunning() || stageExecution.isEager()) {
Member:

Should this be here or in some previous commit? (currently it's in "Eagerly enumerate splits for eager stages")

Member Author:

This one (surprisingly) is actually in the right commit :P

@losipiuk force-pushed the lo/specul-limit branch 2 times, most recently from 93b749a to 1439ea3 on September 5, 2023 12:28

while (!schedulingQueue.isEmpty()) {
PrioritizedScheduledTask scheduledTask;
if (schedulingQueue.getTaskCount(STANDARD) > 0) {

if (schedulingQueue.getTaskCount(EAGER_SPECULATIVE) > 0 && eagerSpeculativeTasksWaitingForNode < maxTasksWaitingForNode) {
Member:

Should maxTasksWaitingForNode account for both standard and eagerSpeculative?

Member Author:

I think it does not matter much; it is mostly about how much state we want to keep. The waiting tasks are not really utilizing resources.
The way I handle EAGER_SPECULATIVE matches the logic we had so far for STANDARD and SPECULATIVE, which are accounted for separately.
Having separate lists may be better in a way: if an EAGER_SPECULATIVE task requires fewer resources than a STANDARD task, we still have a chance to schedule it even if bigger tasks are waiting.
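
To make the per-class accounting concrete, here is a minimal sketch of the idea; the class name, method names, and the single shared cap are illustrative assumptions, not the actual Trino scheduler code:

    import java.util.EnumMap;
    import java.util.Map;

    // Each execution class is counted separately, so a small EAGER_SPECULATIVE
    // task can still start waiting for a node while STANDARD tasks queue up.
    class WaitingForNodeLimiter
    {
        enum ExecutionClass { STANDARD, SPECULATIVE, EAGER_SPECULATIVE }

        private final int maxTasksWaitingForNode; // cap applied per class
        private final Map<ExecutionClass, Integer> waiting = new EnumMap<>(ExecutionClass.class);

        WaitingForNodeLimiter(int maxTasksWaitingForNode)
        {
            this.maxTasksWaitingForNode = maxTasksWaitingForNode;
        }

        boolean tryStartWaiting(ExecutionClass executionClass)
        {
            int current = waiting.getOrDefault(executionClass, 0);
            if (current >= maxTasksWaitingForNode) {
                return false; // enough tasks of this class are already waiting
            }
            waiting.put(executionClass, current + 1);
            return true;
        }

        void nodeAcquired(ExecutionClass executionClass)
        {
            waiting.merge(executionClass, -1, Integer::sum);
        }
    }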

@@ -139,6 +142,7 @@ public BinPackingNodeAllocatorService(
this.memoryRequirementIncreaseOnWorkerCrashEnabled = memoryRequirementIncreaseOnWorkerCrashEnabled;
this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
this.eagerSpeculativeTasksNodeMemoryOvercommit = eagerSpeculativeTasksNodeMemoryOvercommit;
Member:

We are able to overcommit because we can always kill them if there is not enough memory? Can we do the same for speculative tasks as well?

Member Author:

We can. We could even do it for STANDARD tasks; those will also be killed if we run out of memory. But if we wanted more STANDARD/SPECULATIVE tasks running together, we could just lower the memory estimate for those, which would be more straightforward.
The starting memory estimate for tasks should be adjusted to the level of concurrency we want; that is why we picked 5GB, which should match a ~100GB worker node fine.

Overcommit for EAGER tasks is important because we believe those are "special": typically they do not take many resources and have huge potential for finishing the query early. That is why we still want to run them, bumping task concurrency a bit, even if the cluster is fully booked on resources.
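
As a rough illustration of how such an overcommit could enter a bin-packing capacity check; the field names and sizes below are assumptions, not the actual BinPackingNodeAllocatorService logic:

    // Illustrative only: eager tasks see a slightly larger effective node
    // capacity, so one can still be placed on a fully booked worker.
    class NodeCapacityCheck
    {
        private final long nodeMemory = 100L << 30;      // e.g. a ~100GB worker pool
        private final long eagerOvercommit = 10L << 30;  // extra headroom for eager tasks

        boolean fits(long reservedMemory, long taskMemoryEstimate, boolean eagerSpeculative)
        {
            long effectiveCapacity = eagerSpeculative ? nodeMemory + eagerOvercommit : nodeMemory;
            return reservedMemory + taskMemoryEstimate <= effectiveCapacity;
        }
    }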

private static boolean hasSmallFinalLimitNode(SubPlan subPlan)
{
return PlanNodeSearcher.searchFrom(subPlan.getFragment().getRoot())
.where(node -> node instanceof LimitNode limitNode && !limitNode.isPartial() && limitNode.getCount() < 1_000_000)
Member:

For my knowledge, what does it mean if a limit node is "partial"?

Member Author:

Non-final. LIMIT is planned as

       LIMIT_FINAL (single)
                 |
       LIMIT_PARTIAL (distributed)

There is just a single task executing LIMIT_FINAL, while there are many LIMIT_PARTIAL nodes. Each LIMIT_PARTIAL outputs up to the limit number of rows, and LIMIT_FINAL does the final selection.

for (int i = 0; i < outputPartitionsCount; ++i) {
estimateBuilder.add(0);
}
return Optional.of(new OutputDataSizeEstimateResult(estimateBuilder.build(), OutputDataSizeEstimateStatus.ESTIMATED_FOR_EAGER_PARENT));
Member:

I don't quite understand here. Is this simply a placeholder full of 0s?

}

queue.addOrUpdate(prioritizedTask.task(), prioritizedTask.priority());
queues.values().forEach(queue -> queue.remove(prioritizedTask.task()));
Contributor:

What's the complexity of queue.remove?

Member Author:

It is a removal from a HashMap plus a removal from a TreeSet, so O(log N).

Contributor:

Oh, right. It's our custom priority queue. I was afraid it might involve a linear scan.
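
For reference, a minimal sketch of an indexed priority queue of this shape: a HashMap gives O(1) lookup of a task's current entry and a TreeSet keeps entries ordered by priority, so remove and addOrUpdate are O(log n) rather than a linear scan. This is illustrative, not the actual Trino implementation:

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeSet;

    class IndexedPriorityQueue<T>
    {
        private record Entry<T>(T value, long priority, long seq) {}

        private final Map<T, Entry<T>> index = new HashMap<>();
        private final TreeSet<Entry<T>> ordered = new TreeSet<>(
                Comparator.comparingLong((Entry<T> e) -> e.priority())
                        .thenComparingLong(Entry::seq));
        private long nextSeq; // tie-breaker so equal priorities never collide

        void addOrUpdate(T value, long priority)
        {
            remove(value); // drop any existing entry first: O(log n)
            Entry<T> entry = new Entry<>(value, priority, nextSeq++);
            index.put(value, entry);
            ordered.add(entry);
        }

        void remove(T value)
        {
            Entry<T> entry = index.remove(value); // O(1)
            if (entry != null) {
                ordered.remove(entry); // O(log n)
            }
        }

        T poll()
        {
            Entry<T> first = ordered.pollFirst();
            if (first == null) {
                return null;
            }
            index.remove(first.value());
            return first.value();
        }
    }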

int outputPartitionsCount = sinkPartitioningScheme.getPartitionCount();
ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(outputPartitionsCount);
for (int i = 0; i < outputPartitionsCount; ++i) {
estimateBuilder.add(0);
Contributor:

This is supposed to ensure that only a single "eager" task is created? (With the current logic it should always be a single one anyway, as we are doing this only for simple LIMIT queries, right?)

Member Author:

Yeah, this is the current assumption: that a single task will be enough. If we wanted more tasks, we would need more elaborate estimate guessing here, but to be honest we do not have much information at hand, given that EAGER tasks are (at least for now) meant to handle cases when the query has just barely started.
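
To make the zero-estimate behavior concrete, here is a minimal sketch of size-based partition-to-task packing, assuming FTE groups output partitions into tasks up to a target size; the method and target are illustrative, not Trino's actual code:

    import java.util.ArrayList;
    import java.util.List;

    class PartitionPacking
    {
        // With an all-zero size estimate no partition ever exceeds the target,
        // so every output partition lands in a single downstream task.
        static List<List<Integer>> packPartitionsIntoTasks(long[] partitionSizes, long targetTaskSize)
        {
            List<List<Integer>> tasks = new ArrayList<>();
            List<Integer> current = new ArrayList<>();
            long currentSize = 0;
            for (int partition = 0; partition < partitionSizes.length; partition++) {
                if (!current.isEmpty() && currentSize + partitionSizes[partition] > targetTaskSize) {
                    tasks.add(current);
                    current = new ArrayList<>();
                    currentSize = 0;
                }
                current.add(partition);
                currentSize += partitionSizes[partition];
            }
            if (!current.isEmpty()) {
                tasks.add(current);
            }
            return tasks;
        }
    }

Called with all-zero sizes, e.g. packPartitionsIntoTasks(new long[] {0, 0, 0, 0}, 1024), the sketch returns a single task containing all partitions, matching the single eager task discussed above.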


private static boolean hasSmallFinalLimitNode(SubPlan subPlan)
{
return PlanNodeSearcher.searchFrom(subPlan.getFragment().getRoot())
Contributor:

Is it possible for a plan to be more complex than a single LIMIT node? (For example Final Aggregation -> Limit.)

What do you think about having an extra condition that the stage distribution is "SINGLE"?

Member Author:

Is it possible for a plan to be more complex than a single LIMIT node? (For example Final Aggregation -> Limit.)

I think it is possible, and it would still be caught by this predicate, as we are just looking for a final Limit node in the fragment, not assuming it is the root.

What do you think about having an extra condition that the stage distribution is "SINGLE"?

As part of the predicate or more like an assertion? I think we should always get "SINGLE" distribution for a fragment with a final Limit node, right?

Contributor:

As part of the predicate or more like an assertion? I think we should always get "SINGLE" distribution for a fragment with a final Limit node, right?

That is my understanding as well, unless I'm missing some very non-obvious corner case.

I wonder if, purely for "documentation" purposes, it would be better to have an explicit check for "SINGLE"?

Member Author (losipiuk, Sep 11, 2023):

Changed method to:

        private static boolean hasSmallFinalLimitNode(SubPlan subPlan)
        {
            if (!subPlan.getFragment().getPartitioning().isSingleNode()) {
                // Final LIMIT should always have SINGLE distribution
                return false;
            }
            return PlanNodeSearcher.searchFrom(subPlan.getFragment().getRoot())
                    .where(node -> node instanceof LimitNode limitNode && !limitNode.isPartial() && limitNode.getCount() < 1_000_000)
                    .matches();
        }

@losipiuk force-pushed the lo/specul-limit branch 3 times, most recently from d58369a to b5cc2ce on September 18, 2023 09:39
EAGER_SPECULATIVE is used for tasks from stages that have some upstream stages still running, but which we want to schedule with high priority.
Such tasks will be scheduled even if the resources could otherwise be used to schedule STANDARD tasks.
EAGER_SPECULATIVE tasks are used to implement early termination of queries, when it
is probable that we do not need to run the upstream stages in full to produce the final query result.
EAGER_SPECULATIVE tasks will not prevent STANDARD tasks from being scheduled, and will still be picked
to be killed if needed when a worker runs out of memory; this is needed to prevent deadlocks.

Reorder fields so their order matches the order of constructor parameters.

While most FTE-related session properties were already hidden, some
were left visible. This was not intentional. Those session properties are
low-level toggles to be used for tweaking engine mechanics and will be removed
eventually.
@losipiuk force-pushed the lo/specul-limit branch 2 times, most recently from 39c47d3 to 8cd1491 on September 19, 2023 21:25
Only close a source exchange if the source stage writing to it is already done.
It could be that closeSourceExchanges was called because the downstream stage
already finished while some upstream stages are still running.
E.g. this may happen in the case of early limit termination.
It is quite common to use
  SELECT .... LIMIT N
queries for data exploration.
Currently, when run in FTE mode, such queries require completion of all (or almost all)
tasks which read source data, even though most of the time the final answer
could be obtained much sooner.

This commit enables the EAGER_SPECULATIVE task execution mode for stages
which have a FINAL LIMIT operator. This allows final results to be returned
to the user much faster (assuming the exchange plugin in use supports
concurrent reads and writes).
Extend the Exchange SPI so the engine can tell an exchange that it should deliver
source handles as soon as any are available, even if from a throughput
perspective it would make more sense to wait a bit and deliver a bigger
batch. This makes it possible to swiftly process stages which may
short-circuit query execution (like a top-level LIMIT).
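
A rough sketch of what such an extension point could look like; the enum and method names below are assumptions for illustration, not necessarily the actual Trino SPI:

    // Illustrative sketch only; the real Trino SPI names may differ.
    public interface Exchange
            extends AutoCloseable
    {
        enum SourceHandlesDeliveryMode
        {
            STANDARD, // batch source handles for throughput (default)
            EAGER,    // deliver source handles as soon as any are available
        }

        // ... existing exchange methods elided ...

        // Called by the engine when a consuming stage may short-circuit the
        // query (e.g. a final LIMIT), so waiting to accumulate bigger batches
        // of handles would only delay the result.
        void setSourceHandlesDeliveryMode(SourceHandlesDeliveryMode deliveryMode);
    }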