From e81febfa541e2311c84cc6bb9304cd88ff58f344 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 30 Dec 2024 16:36:01 -0500 Subject: [PATCH] Works now? Had to disable some stuff --- .../compute/data/OrdinalBytesRefBlock.java | 5 ++ .../compute/operator/AsyncOperator.java | 9 +- .../operator/lookup/RightChunkedLeftJoin.java | 14 +++- .../xpack/esql/action/LookupFromIndexIT.java | 27 +++--- .../esql/enrich/AbstractLookupService.java | 54 +++++++----- .../esql/enrich/EnrichLookupService.java | 11 ++- .../esql/enrich/LookupFromIndexOperator.java | 84 +++++++++++++++++-- .../esql/enrich/LookupFromIndexService.java | 11 ++- 8 files changed, 167 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java index 863f89827207e..87b33b3b0893d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java @@ -246,4 +246,9 @@ public OrdinalBytesRefBlock expand() { public long ramBytesUsed() { return ordinals.ramBytesUsed() + bytes.ramBytesUsed(); } + + @Override + public String toString() { + return getClass().getSimpleName() + "[ordinals=" + ordinals + ", bytes=" + bytes + "]"; + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java index 751910e9ed336..4beff90687c55 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java @@ -131,7 +131,7 @@ private void onSeqNoCompleted(long seqNo) { notifyIfBlocked(); } if (closed || failureCollector.hasFailure()) { - discardPages(); + discardResults(); } } @@ -151,12 +151,12 @@ private void notifyIfBlocked() { private void checkFailure() { Exception e = failureCollector.getFailure(); if (e != null) { - discardPages(); + discardResults(); throw ExceptionsHelper.convertToRuntime(e); } } - private void discardPages() { + private void discardResults() { long nextCheckpoint; while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) { Result result = buffers.remove(nextCheckpoint); @@ -171,7 +171,7 @@ private void discardPages() { public final void close() { finish(); closed = true; - discardPages(); + discardResults(); doClose(); } @@ -239,6 +239,7 @@ public final Operator.Status status() { } protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) { + // NOCOMMIT this is wrong - completedPages is the number of results, not pages. return new Status(receivedPages, completedPages, totalTimeInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java index 6f1f8a1f27bb9..2e2a0d383e6b4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoin.java @@ -138,12 +138,12 @@ public class RightChunkedLeftJoin implements Releasable { */ private int next = 0; - RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) { + public RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) { this.leftHand = leftHand; this.mergedElementCount = mergedElementCounts; } - Page join(Page rightHand) { + public Page join(Page rightHand) { IntVector positions = rightHand.getBlock(0).asVector(); if (positions.getInt(0) < next - 1) { throw new IllegalArgumentException("maximum overlap is one position"); @@ -209,7 +209,7 @@ Page join(Page rightHand) { } } - Optional noMoreRightHandPages() { + public Optional noMoreRightHandPages() { if (next == leftHand.getPositionCount()) { return Optional.empty(); } @@ -237,6 +237,14 @@ Optional noMoreRightHandPages() { } } + /** + * Release this on any thread, rather than just the thread that built it. + */ + public void releaseOnAnyThread() { + leftHand.allowPassingToDifferentDriver(); + leftHand.releaseBlocks(); + } + @Override public void close() { Releasables.close(leftHand::releaseBlocks); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index f31eabea9d616..f5dfc62df1160 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -80,9 +81,8 @@ public void testLookupIndex() throws IOException { /** * Tests when multiple results match. */ - @AwaitsFix(bugUrl = "fixing real soon now") public void testLookupIndexMultiResults() throws IOException { - runLookup(new UsingSingleLookupTable(new Object[] { "aa", new String[] { "bb", "ff" }, "cc", "dd" })); + runLookup(new UsingSingleLookupTable(new String[] { "aa", "bb", "bb", "dd" })); } interface PopulateIndices { @@ -90,24 +90,24 @@ interface PopulateIndices { } class UsingSingleLookupTable implements PopulateIndices { - private final Object[] lookupData; + private final Map> matches = new HashMap<>(); + private final String[] lookupData; - UsingSingleLookupTable(Object[] lookupData) { + UsingSingleLookupTable(String[] lookupData) { this.lookupData = lookupData; + for (int i = 0; i < lookupData.length; i++) { + matches.computeIfAbsent(lookupData[i], k -> new ArrayList<>()).add(i); + } } @Override - public void populate(int docCount, List expected) throws IOException { + public void populate(int docCount, List expected) { List docs = new ArrayList<>(); for (int i = 0; i < docCount; i++) { - docs.add(client().prepareIndex("source").setSource(Map.of("data", lookupData[i % lookupData.length]))); - Object d = lookupData[i % lookupData.length]; - if (d instanceof String s) { - expected.add(s + ":" + (i % lookupData.length)); - } else if (d instanceof String[] ss) { - for (String s : ss) { - expected.add(s + ":" + (i % lookupData.length)); - } + String data = lookupData[i % lookupData.length]; + docs.add(client().prepareIndex("source").setSource(Map.of("data", data))); + for (Integer match: matches.get(data)) { + expected.add(data + ":" + match); } } for (int i = 0; i < lookupData.length; i++) { @@ -230,6 +230,7 @@ private void runLookup(PopulateIndices populateIndices) throws IOException { List.of(reader.get(driverContext), lookup.get(driverContext)), new PageConsumerOperator(page -> { try { + System.err.println(docCount + " ADSFADFDAFADSF " + page); BytesRefVector dataBlock = page.getBlock(1).asVector(); LongVector loadedBlock = page.getBlock(2).asVector(); for (int p = 0; p < page.getPositionCount(); p++) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index 7d13498c73a71..d320a8d46120d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -39,6 +39,7 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OutputOperator; +import org.elasticsearch.compute.operator.ProjectOperator; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator; import org.elasticsearch.compute.operator.lookup.MergePositionsOperator; @@ -137,6 +138,7 @@ abstract class AbstractLookupService readRequest ) { this.actionName = actionName; @@ -155,6 +158,7 @@ abstract class AbstractLookupService mergingTypes[i] = PlannerUtils.toElementType(request.extractFields.get(i).dataType()); } final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray(); - final MergePositionsOperator mergePositionsOperator; + final Operator finishPages; final OrdinalBytesRefBlock ordinalsBytesRefBlock; - if (inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) { + if (mergePages // TODO fix this faster branch with mergePages == false + && inputBlock instanceof BytesRefBlock bytesRefBlock + && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) { + inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock(); var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock(); - mergePositionsOperator = new MergePositionsOperator( - 1, - mergingChannels, - mergingTypes, - selectedPositions, - driverContext.blockFactory() - ); - + finishPages = new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory()); } else { - try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) { - mergePositionsOperator = new MergePositionsOperator( - 1, - mergingChannels, - mergingTypes, - selectedPositions, - driverContext.blockFactory() - ); + if (mergePages) { + try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) { + finishPages = new MergePositionsOperator( + 1, + mergingChannels, + mergingTypes, + selectedPositions, + driverContext.blockFactory() + ); + } + } else { + finishPages = dropDocBlockOperator(request.extractFields); } } - releasables.add(mergePositionsOperator); + releasables.add(finishPages); SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext(); QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType); var warnings = Warnings.createWarnings( @@ -377,7 +381,7 @@ private void doLookup(T request, CancellableTask task, ActionListener driverContext, request::toString, queryOperator, - List.of(extractFieldsOperator, mergePositionsOperator), + List.of(extractFieldsOperator, finishPages), outputOperator, Driver.DEFAULT_STATUS_INTERVAL, Releasables.wrap(searchContext, localBreaker) @@ -442,6 +446,16 @@ private static Operator extractFieldsOperator( ); } + private Operator dropDocBlockOperator(List extractFields) { + // Drop just the first block, keeping the remaining + int end = extractFields.size() + 1; + List projection = new ArrayList<>(end); + for (int i = 1; i <= end; i++) { + projection.add(i); + } + return new ProjectOperator(projection); + } + private Page createNullResponse(int positionCount, List extractFields) { final Block[] blocks = new Block[extractFields.size()]; try { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 090c54d84f8b3..7bfc0867e139d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -53,7 +53,16 @@ public EnrichLookupService( BigArrays bigArrays, BlockFactory blockFactory ) { - super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom); + super( + LOOKUP_ACTION_NAME, + clusterService, + searchService, + transportService, + bigArrays, + blockFactory, + true, + TransportRequest::readFrom + ); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index bb803411ad914..4574d8d1b8e50 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -15,7 +16,11 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.AsyncOperator; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.IsBlockedResult; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; @@ -23,11 +28,13 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Optional; // TODO rename package -public final class LookupFromIndexOperator extends AsyncOperator { +public final class LookupFromIndexOperator extends AsyncOperator { public record Factory( String sessionId, CancellableTask parentTask, @@ -81,6 +88,10 @@ public Operator get(DriverContext driverContext) { private final List loadFields; private final Source source; private long totalTerms = 0L; + /** + * The ongoing join or {@code null} none is ongoing at the moment. + */ + private OngoingJoin ongoing = null; public LookupFromIndexOperator( String sessionId, @@ -108,7 +119,8 @@ public LookupFromIndexOperator( } @Override - protected void performAsync(Page inputPage, ActionListener listener) { + protected void performAsync(Page inputPage, ActionListener listener) { + System.err.println("AAAAAA input " + inputPage); final Block inputBlock = inputPage.getBlock(inputChannel); totalTerms += inputBlock.getTotalValueCount(); LookupFromIndexService.Request request = new LookupFromIndexService.Request( @@ -121,17 +133,48 @@ protected void performAsync(Page inputPage, ActionListener listener) { source ); // NOCOMMIT join pages with the operator dude - lookupService.lookupAsync(request, parentTask, listener.map(pages -> inputPage.appendPage(pages.getFirst()))); + lookupService.lookupAsync( + request, + parentTask, + listener.map(pages -> new OngoingJoin(new RightChunkedLeftJoin(inputPage, loadFields.size()), pages.iterator())) + ); } @Override public Page getOutput() { - return getResultFromBuffer(); + if (ongoing == null) { + System.err.println("AAAAAA starting"); + // No ongoing join, start a new one if we can. + ongoing = getResultFromBuffer(); + if (ongoing == null) { + // Buffer empty, wait for the next time we're called. + System.err.println("AAAAAA nothing"); + return null; + } + System.err.println("AAAAAA started"); + } + if (ongoing.itr.hasNext()) { + // There's more to do in the ongoing join. + Page right = ongoing.itr.next(); + System.err.println("AAAAAA joining " + right); + try { + return ongoing.join.join(right); + } finally { + right.releaseBlocks(); + } + } + // Current join is all done. Emit any trailing unmatched rows. + System.err.println("AAAAAA finishing"); + Optional remaining = ongoing.join.noMoreRightHandPages(); + remaining.ifPresent(page -> System.err.println("AAAAAA remaining " + page.getPositionCount())); + ongoing.close(); + ongoing = null; + return remaining.orElse(null); } @Override - protected void releaseResultOnAnyThread(Page page) { - releasePageOnAnyThread(page); + protected void releaseResultOnAnyThread(OngoingJoin ongoingJoin) { + ongoingJoin.releaseOnAnyThread(); } @Override @@ -149,14 +192,29 @@ public String toString() { + "]"; } + @Override + public boolean isFinished() { + return ongoing == null && super.isFinished(); + } + + @Override + public IsBlockedResult isBlocked() { + if (ongoing != null) { + return NOT_BLOCKED; + } + return super.isBlocked(); + } + @Override protected void doClose() { // TODO: Maybe create a sub-task as the parent task of all the lookup tasks // then cancel it when this operator terminates early (e.g., have enough result). + Releasables.close(ongoing); } @Override protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) { + // NOCOMMIT this is wrong - completedPages is the number of results, not pages. return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms); } @@ -215,4 +273,18 @@ public int hashCode() { return Objects.hash(super.hashCode(), totalTerms); } } + + protected record OngoingJoin(RightChunkedLeftJoin join, Iterator itr) implements Releasable { + @Override + public void close() { + Releasables.close(join, Releasables.wrap(() -> Iterators.map(itr, page -> page::releaseBlocks))); + } + + public void releaseOnAnyThread() { + Releasables.close( + join::releaseOnAnyThread, + Releasables.wrap(() -> Iterators.map(itr, page -> () -> releasePageOnAnyThread(page))) + ); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index b3fcb328fdd91..ae4af18207aa3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -52,7 +52,16 @@ public LookupFromIndexService( BigArrays bigArrays, BlockFactory blockFactory ) { - super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom); + super( + LOOKUP_ACTION_NAME, + clusterService, + searchService, + transportService, + bigArrays, + blockFactory, + false, + TransportRequest::readFrom + ); } @Override