Skip to content

Commit

Permalink
ESQL: Make LOOKUP more left-joiny (elastic#119475)
Browse files Browse the repository at this point in the history
This makes `LOOKUP` return one row per match when a lookup finds multiple matches, instead of collapsing all of the matches into a single row of multivalued fields. This is the way SQL works so it's *probably* what folks will expect. Even if it isn't, it allows for more optimizations. To be clear, this change doesn't optimize anything by itself - it just changes the behavior. But there are optimizations you can do *later* that are transparent when we have *this* behavior, but not with the old behavior.

Example:
```
-  2  | [German, German, German] | [Austria, Germany, Switzerland]
+  2  | German                   | [Austria, Germany]
+  2  | German                   | Switzerland
+  2  | German                   | null
```

Relates: elastic#118781
  • Loading branch information
nik9000 committed Jan 15, 2025
1 parent e4cca58 commit f946a0e
Show file tree
Hide file tree
Showing 29 changed files with 866 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public void writeTo(StreamOutput out) throws IOException {

out.writeBoolean(supported);
}

/**
 * Renders this object as {@code NodeCapability{supported=<value>}}.
 *
 * @return the textual form, including the current {@code supported} flag
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("NodeCapability{supported=");
    text.append(supported);
    text.append('}');
    return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,9 @@ public OrdinalBytesRefBlock expand() {
public long ramBytesUsed() {
    // The total is simply what the ordinals and the bytes each report, added together.
    var ordinalsUsage = ordinals.ramBytesUsed();
    var bytesUsage = bytes.ramBytesUsed();
    return ordinalsUsage + bytesUsage;
}

/**
 * Renders this block as {@code <SimpleClassName>[ordinals=<ordinals>, bytes=<bytes>]}.
 *
 * @return the textual form built from the runtime class name and both components
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder(getClass().getSimpleName());
    text.append("[ordinals=").append(ordinals);
    text.append(", bytes=").append(bytes);
    text.append("]");
    return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import java.util.concurrent.atomic.LongAdder;

/**
* {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}.
* This operator acts as a client and operates on a per-page basis to reduce communication overhead.
* {@link AsyncOperator} performs an external computation specified in
* {@link #performAsync(Page, ActionListener)}. This operator acts as a client
* to reduce communication overhead and fetches a {@code Fetched} at a time.
* It's the responsibility of subclasses to transform that {@code Fetched} into
* output.
* @see #performAsync(Page, ActionListener)
*/
public abstract class AsyncOperator implements Operator {
public abstract class AsyncOperator<Fetched> implements Operator {

private volatile SubscribableListener<Void> blockedFuture;

private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
private final Map<Long, Fetched> buffers = ConcurrentCollections.newConcurrentMap();
private final FailureCollector failureCollector = new FailureCollector();
private final DriverContext driverContext;

Expand Down Expand Up @@ -83,7 +86,7 @@ public void addInput(Page input) {
driverContext.addAsyncAction();
boolean success = false;
try {
final ActionListener<Page> listener = ActionListener.wrap(output -> {
final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
buffers.put(seqNo, output);
onSeqNoCompleted(seqNo);
}, e -> {
Expand All @@ -104,18 +107,20 @@ public void addInput(Page input) {
}
}

private void releasePageOnAnyThread(Page page) {
protected static void releasePageOnAnyThread(Page page) {
page.allowPassingToDifferentDriver();
page.releaseBlocks();
}

/**
 * Releases a buffered result that is being discarded instead of being handed out.
 * This is invoked for every non-null result removed from the buffer while results
 * are discarded, which happens when the operator is closed or has collected a failure.
 * NOTE(review): as the name says, this is expected to be safe to call from a thread
 * other than the one that produced the result - confirm against implementations.
 *
 * @param result the buffered result to release; never {@code null}
 */
protected abstract void releaseFetchedOnAnyThread(Fetched result);

/**
* Performs an external computation and notify the listener when the result is ready.
*
* @param inputPage the input page
* @param listener the listener
*/
protected abstract void performAsync(Page inputPage, ActionListener<Page> listener);
protected abstract void performAsync(Page inputPage, ActionListener<Fetched> listener);

/**
 * Subclass hook invoked by {@link #close()} as its last step, after the operator
 * has been finished, marked closed and its buffered results have been discarded.
 */
protected abstract void doClose();

Expand All @@ -125,7 +130,7 @@ private void onSeqNoCompleted(long seqNo) {
notifyIfBlocked();
}
if (closed || failureCollector.hasFailure()) {
discardPages();
discardResults();
}
}

Expand All @@ -145,18 +150,18 @@ private void notifyIfBlocked() {
private void checkFailure() {
Exception e = failureCollector.getFailure();
if (e != null) {
discardPages();
discardResults();
throw ExceptionsHelper.convertToRuntime(e);
}
}

private void discardPages() {
private void discardResults() {
long nextCheckpoint;
while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
Page page = buffers.remove(nextCheckpoint);
Fetched result = buffers.remove(nextCheckpoint);
checkpoint.markSeqNoAsPersisted(nextCheckpoint);
if (page != null) {
releasePageOnAnyThread(page);
if (result != null) {
releaseFetchedOnAnyThread(result);
}
}
}
Expand All @@ -165,7 +170,7 @@ private void discardPages() {
public final void close() {
finish();
closed = true;
discardPages();
discardResults();
doClose();
}

Expand All @@ -184,15 +189,18 @@ public boolean isFinished() {
}
}

@Override
public Page getOutput() {
/**
* Get a {@link Fetched} from the buffer.
* @return a result if one is ready or {@code null} if none are available.
*/
public final Fetched fetchFromBuffer() {
checkFailure();
long persistedCheckpoint = checkpoint.getPersistedCheckpoint();
if (persistedCheckpoint < checkpoint.getProcessedCheckpoint()) {
persistedCheckpoint++;
Page page = buffers.remove(persistedCheckpoint);
Fetched result = buffers.remove(persistedCheckpoint);
checkpoint.markSeqNoAsPersisted(persistedCheckpoint);
return page;
return result;
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
* | l99 | null | null |
* }</pre>
*/
class RightChunkedLeftJoin implements Releasable {
public class RightChunkedLeftJoin implements Releasable {
private final Page leftHand;
private final int mergedElementCount;
/**
Expand All @@ -138,12 +138,12 @@ class RightChunkedLeftJoin implements Releasable {
*/
private int next = 0;

RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) {
public RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) {
this.leftHand = leftHand;
this.mergedElementCount = mergedElementCounts;
}

Page join(Page rightHand) {
public Page join(Page rightHand) {
IntVector positions = rightHand.<IntBlock>getBlock(0).asVector();
if (positions.getInt(0) < next - 1) {
throw new IllegalArgumentException("maximum overlap is one position");
Expand Down Expand Up @@ -209,7 +209,7 @@ Page join(Page rightHand) {
}
}

Optional<Page> noMoreRightHandPages() {
public Optional<Page> noMoreRightHandPages() {
if (next == leftHand.getPositionCount()) {
return Optional.empty();
}
Expand Down Expand Up @@ -237,6 +237,14 @@ Optional<Page> noMoreRightHandPages() {
}
}

/**
 * Releases the left hand page from <strong>any</strong> thread, not only
 * from the thread that built this join.
 */
public void releaseOnAnyThread() {
    final Page page = leftHand;
    // Order matters: allow the page to move to a different driver before releasing its blocks.
    page.allowPassingToDifferentDriver();
    page.releaseBlocks();
}

@Override
public void close() {
Releasables.close(leftHand::releaseBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,24 @@ protected Page createPage(int positionOffset, int length) {
}
};
int maxConcurrentRequests = randomIntBetween(1, 10);
AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) {
AsyncOperator<Page> asyncOperator = new AsyncOperator<Page>(driverContext, maxConcurrentRequests) {
final LookupService lookupService = new LookupService(threadPool, globalBlockFactory, dict, maxConcurrentRequests);

@Override
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
lookupService.lookupAsync(inputPage, listener);
}

@Override
public Page getOutput() {
// The fetched value already is a Page, so it becomes the output unchanged (null when nothing is ready).
return fetchFromBuffer();
}

@Override
protected void releaseFetchedOnAnyThread(Page page) {
// Fetched results are plain pages here, so the base class page helper does the release.
releasePageOnAnyThread(page);
}

@Override
public void doClose() {

Expand Down Expand Up @@ -159,7 +169,7 @@ public void doClose() {
Releasables.close(localBreaker);
}

class TestOp extends AsyncOperator {
class TestOp extends AsyncOperator<Page> {
Map<Page, ActionListener<Page>> handlers = new HashMap<>();

TestOp(DriverContext driverContext, int maxOutstandingRequests) {
Expand All @@ -171,6 +181,16 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
handlers.put(inputPage, listener);
}

@Override
public Page getOutput() {
// Pass the next buffered Page straight through; fetchFromBuffer() yields null when none is ready.
return fetchFromBuffer();
}

@Override
protected void releaseFetchedOnAnyThread(Page page) {
// A discarded result is just a Page for this operator; delegate to the shared page release helper.
releasePageOnAnyThread(page);
}

@Override
protected void doClose() {

Expand Down Expand Up @@ -233,7 +253,7 @@ public void testFailure() throws Exception {
);
int maxConcurrentRequests = randomIntBetween(1, 10);
AtomicBoolean failed = new AtomicBoolean();
AsyncOperator asyncOperator = new AsyncOperator(driverContext, maxConcurrentRequests) {
AsyncOperator<Page> asyncOperator = new AsyncOperator<Page>(driverContext, maxConcurrentRequests) {
@Override
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
ActionRunnable<Page> command = new ActionRunnable<>(listener) {
Expand All @@ -256,6 +276,16 @@ protected void doRun() {
}
}

@Override
public Page getOutput() {
// No transformation needed: the buffered Page (or null if none is available) is the output.
return fetchFromBuffer();
}

@Override
protected void releaseFetchedOnAnyThread(Page page) {
// Discarded results are pages, so release them through the base class helper.
releasePageOnAnyThread(page);
}

@Override
protected void doClose() {

Expand Down Expand Up @@ -285,7 +315,7 @@ public void testIsFinished() {
for (int i = 0; i < iters; i++) {
DriverContext driverContext = new DriverContext(blockFactory.bigArrays(), blockFactory);
CyclicBarrier barrier = new CyclicBarrier(2);
AsyncOperator asyncOperator = new AsyncOperator(driverContext, between(1, 10)) {
AsyncOperator<Page> asyncOperator = new AsyncOperator<Page>(driverContext, between(1, 10)) {
@Override
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
ActionRunnable<Page> command = new ActionRunnable<>(listener) {
Expand All @@ -302,6 +332,16 @@ protected void doRun() {
threadPool.executor(ESQL_TEST_EXECUTOR).execute(command);
}

@Override
public Page getOutput() {
// Output is whatever Page is next in the buffer, or null when nothing has completed yet.
return fetchFromBuffer();
}

@Override
protected void releaseFetchedOnAnyThread(Page page) {
// The fetched type is Page, so reuse the base class helper to release it.
releasePageOnAnyThread(page);
}

@Override
protected void doClose() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private static Page randomPage() {
return new Page(block.block());
}

static class SwitchContextOperator extends AsyncOperator {
static class SwitchContextOperator extends AsyncOperator<Page> {
private final ThreadPool threadPool;

SwitchContextOperator(DriverContext driverContext, ThreadPool threadPool) {
Expand All @@ -348,6 +348,16 @@ protected void performAsync(Page page, ActionListener<Page> listener) {
}), TimeValue.timeValueNanos(between(1, 1_000_000)), threadPool.executor("esql"));
}

@Override
public Page getOutput() {
// This operator buffers pages directly, so the buffered Page (or null) is returned as is.
return fetchFromBuffer();
}

@Override
protected void releaseFetchedOnAnyThread(Page page) {
// Buffered results are pages; hand discarded ones to the base class page release helper.
releasePageOnAnyThread(page);
}

@Override
protected void doClose() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private void assertTrailing(RightChunkedLeftJoin join, int size, int next) {

Object unwrapSingletonLists(Object o) {
if (o instanceof List<?> l && l.size() == 1) {
return l.getFirst();
return l.get(0);
}
return o;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ record Listen(long timestamp, String songId, double duration) {
public void testLookupJoinIndexAllowed() throws Exception {
assumeTrue(
"Requires LOOKUP JOIN capability",
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName()))
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName()))
);

Response resp = runESQLCommand(
Expand Down Expand Up @@ -587,7 +587,7 @@ public void testLookupJoinIndexAllowed() throws Exception {
public void testLookupJoinIndexForbidden() throws Exception {
assumeTrue(
"Requires LOOKUP JOIN capability",
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V10.capabilityName()))
EsqlSpecTestCase.hasCapabilities(adminClient(), List.of(EsqlCapabilities.Cap.JOIN_LOOKUP_V11.capabilityName()))
);

var resp = expectThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;

import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V10;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V11;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.ASYNC;

public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase {
Expand Down Expand Up @@ -96,7 +96,7 @@ protected boolean supportsInferenceTestService() {

@Override
protected boolean supportsIndexModeLookup() throws IOException {
return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName()));
return hasCapabilities(List.of(JOIN_LOOKUP_V11.capabilityName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V10;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V11;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
Expand Down Expand Up @@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V10.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V11.capabilityName()));
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testIndicesDontExist() throws IOException {
assertThat(e.getMessage(), containsString("index_not_found_exception"));
assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]")));

if (EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()) {
if (EsqlCapabilities.Cap.JOIN_LOOKUP_V11.isEnabled()) {
e = expectThrows(
ResponseException.class,
() -> runEsql(timestampFilter("gte", "2020-01-01").query(from("test1") + " | LOOKUP JOIN foo ON id1"))
Expand Down
Loading

0 comments on commit f946a0e

Please sign in to comment.