Skip to content

Commit

Permalink
Fix release
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Jan 2, 2025
1 parent 85c3609 commit d8a4760
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,21 @@ FROM employees
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| WHERE emp_no > 10090 AND emp_no < 10096
| SORT emp_no
| EVAL country = MV_SORT(country)
| SORT emp_no, country
| KEEP emp_no, language_code, language_name, country
;

emp_no:integer | language_code:integer | language_name:keyword | country:keyword
10091 | 1 | [English, English, English] | [Canada, United Kingdom, United States of America]
10092 | 2 | [German, German, German] | [Austria, Germany, Switzerland]
10093 | 3 | null | null
10094 | 4 | Quenya | null
10095 | 5 | null | Atlantis
emp_no:integer | language_code:integer | language_name:keyword | country:text
10091 | 1 | English | Canada
10091 | 1 | null | United Kingdom
10091 | 1 | English | United States of America
10091 | 1 | English | null
10092 | 2 | German | [Germany, Austria]
10092 | 2 | German | Switzerland
10092 | 2 | German | null
10093 | 3 | null | null
10094 | 4 | Quenya | null
10095 | 5 | null | Atlantis
;

nonUniqueRightKeyOnTheCoordinator
Expand All @@ -161,12 +165,17 @@ FROM employees
| KEEP emp_no, language_code, language_name, country
;

emp_no:integer | language_code:integer | language_name:keyword | country:keyword
10001 | 1 | [English, English, English] | [Canada, United Kingdom, United States of America]
10002 | 2 | [German, German, German] | [Austria, Germany, Switzerland]
10003 | 3 | null | null
10004 | 4 | Quenya | null
10005 | 5 | null | Atlantis
emp_no:integer | language_code:integer | language_name:keyword | country:keyword
10001 | 1 | English | Canada
10001 | 1 | English | null
10001 | 1 | null | United Kingdom
10001 | 1 | English | United States of America
10002 | 2 | German | [Austria, Germany]
10002 | 2 | German | Switzerland
10002 | 2 | German | null
10003 | 3 | null | null
10004 | 4 | Quenya | null
10005 | 5 | null | Atlantis
;

nonUniqueRightKeyFromRow
Expand All @@ -178,8 +187,10 @@ ROW language_code = 2
| EVAL country = MV_SORT(country)
;

language_code:integer | language_name:keyword | country:keyword
2 | [German, German, German] | [Austria, Germany, Switzerland]
language_code:integer | language_name:keyword | country:keyword
2 | German | [Austria, Germany]
2 | German | Switzerland
2 | German | null
;

repeatedIndexOnFrom
Expand Down Expand Up @@ -406,8 +417,13 @@ FROM employees
;

emp_no:integer | language_code:integer | language_name:keyword
10001 | 1 | [English, English, English]
10002 | 2 | [German, German, German]
10001 | 1 | English
10001 | 1 | English
10001 | 1 | English
10001 | 1 | null
10002 | 2 | German
10002 | 2 | German
10002 | 2 | German
10003 | null | null
;

Expand All @@ -418,15 +434,16 @@ FROM employees
| WHERE 10003 < emp_no AND emp_no < 10008
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| SORT emp_no
| SORT emp_no, language_name
| KEEP emp_no, language_code, language_name
;

emp_no:integer | language_code:integer | language_name:keyword
10004 | 4 | Quenya
10005 | 5 | null
10006 | 6 | Mv-Lang
10007 | 7 | [Mv-Lang, Mv-Lang2]
10007 | 7 | Mv-Lang
10007 | 7 | Mv-Lang2
;

mvJoinKeyFromRow
Expand All @@ -438,8 +455,11 @@ ROW language_code = [4, 5, 6, 7]
| KEEP language_code, language_name, country
;

language_code:integer | language_name:keyword | country:keyword
[4, 5, 6, 7] | [Mv-Lang, Mv-Lang2, Quenya] | [Atlantis, Mv-Land, Mv-Land2]
language_code:integer | language_name:keyword | country:keyword
[4, 5, 6, 7] | Quenya | null
[4, 5, 6, 7] | null | Atlantis
[4, 5, 6, 7] | Mv-Lang | Mv-Land
[4, 5, 6, 7] | Mv-Lang2 | Mv-Land2
;

mvJoinKeyFromRowExpanded
Expand All @@ -448,16 +468,17 @@ required_capability: join_lookup_v10
ROW language_code = [4, 5, 6, 7, 8]
| MV_EXPAND language_code
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| EVAL language_name = MV_SORT(language_name), country = MV_SORT(country)
| KEEP language_code, language_name, country
| SORT language_code, language_name, country
;

language_code:integer | language_name:keyword | country:keyword
4 | Quenya | null
5 | null | Atlantis
6 | Mv-Lang | Mv-Land
7 | [Mv-Lang, Mv-Lang2] | [Mv-Land, Mv-Land2]
8 | Mv-Lang2 | Mv-Land2
language_code:integer | language_name:keyword | country:text
4 | Quenya | null
5 | null | Atlantis
6 | Mv-Lang | Mv-Land
7 | Mv-Lang | Mv-Land
7 | Mv-Lang2 | Mv-Land2
8 | Mv-Lang2 | Mv-Land2
;

###########################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ private void hasPrivilege(ActionListener<Void> outListener) {
private void doLookup(T request, CancellableTask task, ActionListener<List<Page>> listener) {
Block inputBlock = request.inputPage.getBlock(0);
if (inputBlock.areAllValuesNull()) {
listener.onResponse(List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields)));
List<Page> nullResponse = mergePages ?
List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields))
: List.of();
listener.onResponse(nullResponse);
return;
}
final List<Releasable> releasables = new ArrayList<>(6);
Expand Down Expand Up @@ -393,7 +396,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
var threadContext = transportService.getThreadPool().getThreadContext();
Driver.start(threadContext, executor, driver, Driver.DEFAULT_MAX_ITERATIONS, listener.map(ignored -> {
List<Page> out = collectedPages;
if (out.isEmpty()) {
if (mergePages && out.isEmpty()) {
out = List.of(createNullResponse(request.inputPage.getPositionCount(), request.extractFields));
}
return out;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* {@link LookupFromIndexService} performs lookup against a Lookup index for
Expand Down Expand Up @@ -194,15 +195,15 @@ protected String extraDescription() {
}
}

private static class LookupResponse extends AbstractLookupService.LookupResponse {
protected static class LookupResponse extends AbstractLookupService.LookupResponse {
private List<Page> pages;

private LookupResponse(List<Page> pages, BlockFactory blockFactory) {
LookupResponse(List<Page> pages, BlockFactory blockFactory) {
super(blockFactory);
this.pages = pages;
}

private LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
super(blockFactory);
try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
this.pages = bsi.readCollectionAsList(Page::new);
Expand All @@ -212,7 +213,7 @@ private LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOExcep
@Override
public void writeTo(StreamOutput out) throws IOException {
long bytes = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response");
blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize lookup response");
reservedBytes += bytes;
out.writeCollection(pages);
}
Expand All @@ -224,11 +225,36 @@ protected List<Page> takePages() {
return p;
}

List<Page> pages() {
return pages;
}

@Override
protected void innerRelease() {
if (pages != null) {
Releasables.closeExpectNoException(() -> Iterators.map(pages.iterator(), page -> (Releasable) page::releaseBlocks));
Releasables.closeExpectNoException(
Releasables.wrap(Iterators.map(pages.iterator(), page -> page::releaseBlocks))
);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
LookupResponse that = (LookupResponse) o;
return Objects.equals(pages, that.pages);
}

@Override
public int hashCode() {
return Objects.hashCode(pages);
}

@Override
public String toString() {
return "LookupResponse{pages=" + pages + '}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.esql.TestBlockFactory;
import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

public class LookupFromIndexServiceResponseTests extends AbstractWireSerializingTestCase<LookupFromIndexService.LookupResponse> {
private final List<CircuitBreaker> breakers = new ArrayList<>();

LookupFromIndexService.LookupResponse createTestInstance(BlockFactory blockFactory) {
return new LookupFromIndexService.LookupResponse(randomList(0, 10, () -> randomPage(blockFactory)), blockFactory);
}

Page randomPage(BlockFactory blockFactory) {
try (IntVector.Builder builder = blockFactory.newIntVectorFixedBuilder(3)) {
builder.appendInt(1);
builder.appendInt(2);
builder.appendInt(3);
return new Page(builder.build().asBlock());
}
}

@Override
protected LookupFromIndexService.LookupResponse createTestInstance() {
// Can't use a real block factory for the basic serialization tests because they don't release.
return createTestInstance(TestBlockFactory.getNonBreakingInstance());
}

@Override
protected Writeable.Reader<LookupFromIndexService.LookupResponse> instanceReader() {
return in -> new LookupFromIndexService.LookupResponse(in, TestBlockFactory.getNonBreakingInstance());
}

@Override
protected LookupFromIndexService.LookupResponse mutateInstance(LookupFromIndexService.LookupResponse instance) throws IOException {
assertThat(instance.blockFactory, sameInstance(TestBlockFactory.getNonBreakingInstance()));
List<Page> pages = new ArrayList<>(instance.pages().size());
pages.addAll(instance.pages());
pages.add(randomPage(TestBlockFactory.getNonBreakingInstance()));
return new LookupFromIndexService.LookupResponse(pages, instance.blockFactory);
}

@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(List.of(IntBlock.ENTRY));
}

public void testWithBreaker() throws IOException {
BlockFactory origFactory = blockFactory();
BlockFactory copyFactory = blockFactory();
LookupFromIndexService.LookupResponse orig = createTestInstance(origFactory);
try {
LookupFromIndexService.LookupResponse copy = copyInstance(
orig,
getNamedWriteableRegistry(),
(out, v) -> v.writeTo(out),
in -> new LookupFromIndexService.LookupResponse(in, copyFactory),
TransportVersion.current()
);
try {
assertThat(copy, equalTo(orig));
} finally {
copy.decRef();
}
assertThat(copyFactory.breaker().getUsed(), equalTo(0L));
} finally {
orig.decRef();
}
assertThat(origFactory.breaker().getUsed(), equalTo(0L));
}

/**
* Tests that we reserve any memory other than that in the {@link Page}s we
* hold, and calling {@link LookupFromIndexService.LookupResponse#takePages}
* gives us those pages. If we then close those pages, we should have 0
* reserved memory.
*/
public void testTakePages() {
BlockFactory factory = blockFactory();
LookupFromIndexService.LookupResponse orig = createTestInstance(factory);
try {
if (orig.pages().isEmpty()) {
assertThat(factory.breaker().getUsed(), equalTo(0L));
return;
}
List<Page> pages = orig.takePages();
Releasables.closeExpectNoException(
Releasables.wrap(Iterators.map(pages.iterator(), page -> page::releaseBlocks))
);
assertThat(factory.breaker().getUsed(), equalTo(0L));
assertThat(orig.takePages(), nullValue());
} finally {
orig.decRef();
}
assertThat(factory.breaker().getUsed(), equalTo(0L));
}


private BlockFactory blockFactory() {
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofMb(4 /* more than we need*/))
.withCircuitBreaking();
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
breakers.add(breaker);
return new BlockFactory(breaker, bigArrays);
}

@After
public void allBreakersEmpty() throws Exception {
// first check that all big arrays are released, which can affect breakers
MockBigArrays.ensureAllArraysAreReleased();

for (CircuitBreaker breaker : breakers) {
assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L));
}
}
}

0 comments on commit d8a4760

Please sign in to comment.