Skip to content

Commit

Permalink
Works now?
Browse files Browse the repository at this point in the history
Had to disable some stuff
  • Loading branch information
nik9000 committed Dec 30, 2024
1 parent 146403a commit e81febf
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,9 @@ public OrdinalBytesRefBlock expand() {
public long ramBytesUsed() {
return ordinals.ramBytesUsed() + bytes.ramBytesUsed();
}

@Override
public String toString() {
return getClass().getSimpleName() + "[ordinals=" + ordinals + ", bytes=" + bytes + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void onSeqNoCompleted(long seqNo) {
notifyIfBlocked();
}
if (closed || failureCollector.hasFailure()) {
discardPages();
discardResults();
}
}

Expand All @@ -151,12 +151,12 @@ private void notifyIfBlocked() {
private void checkFailure() {
Exception e = failureCollector.getFailure();
if (e != null) {
discardPages();
discardResults();
throw ExceptionsHelper.convertToRuntime(e);
}
}

private void discardPages() {
private void discardResults() {
long nextCheckpoint;
while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
Result result = buffers.remove(nextCheckpoint);
Expand All @@ -171,7 +171,7 @@ private void discardPages() {
public final void close() {
finish();
closed = true;
discardPages();
discardResults();
doClose();
}

Expand Down Expand Up @@ -239,6 +239,7 @@ public final Operator.Status status() {
}

protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
// NOCOMMIT this is wrong - completedPages is the number of results, not pages.
return new Status(receivedPages, completedPages, totalTimeInMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ public class RightChunkedLeftJoin implements Releasable {
*/
private int next = 0;

RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) {
public RightChunkedLeftJoin(Page leftHand, int mergedElementCounts) {
this.leftHand = leftHand;
this.mergedElementCount = mergedElementCounts;
}

Page join(Page rightHand) {
public Page join(Page rightHand) {
IntVector positions = rightHand.<IntBlock>getBlock(0).asVector();
if (positions.getInt(0) < next - 1) {
throw new IllegalArgumentException("maximum overlap is one position");
Expand Down Expand Up @@ -209,7 +209,7 @@ Page join(Page rightHand) {
}
}

Optional<Page> noMoreRightHandPages() {
public Optional<Page> noMoreRightHandPages() {
if (next == leftHand.getPositionCount()) {
return Optional.empty();
}
Expand Down Expand Up @@ -237,6 +237,14 @@ Optional<Page> noMoreRightHandPages() {
}
}

/**
* Release this on <strong>any</strong> thread, rather than just the thread that built it.
*/
public void releaseOnAnyThread() {
leftHand.allowPassingToDifferentDriver();
leftHand.releaseBlocks();
}

@Override
public void close() {
Releasables.close(leftHand::releaseBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -80,34 +81,33 @@ public void testLookupIndex() throws IOException {
/**
* Tests when multiple results match.
*/
@AwaitsFix(bugUrl = "fixing real soon now")
public void testLookupIndexMultiResults() throws IOException {
runLookup(new UsingSingleLookupTable(new Object[] { "aa", new String[] { "bb", "ff" }, "cc", "dd" }));
runLookup(new UsingSingleLookupTable(new String[] { "aa", "bb", "bb", "dd" }));
}

interface PopulateIndices {
void populate(int docCount, List<String> expected) throws IOException;
}

class UsingSingleLookupTable implements PopulateIndices {
private final Object[] lookupData;
private final Map<String, List<Integer>> matches = new HashMap<>();
private final String[] lookupData;

UsingSingleLookupTable(Object[] lookupData) {
UsingSingleLookupTable(String[] lookupData) {
this.lookupData = lookupData;
for (int i = 0; i < lookupData.length; i++) {
matches.computeIfAbsent(lookupData[i], k -> new ArrayList<>()).add(i);
}
}

@Override
public void populate(int docCount, List<String> expected) throws IOException {
public void populate(int docCount, List<String> expected) {
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
docs.add(client().prepareIndex("source").setSource(Map.of("data", lookupData[i % lookupData.length])));
Object d = lookupData[i % lookupData.length];
if (d instanceof String s) {
expected.add(s + ":" + (i % lookupData.length));
} else if (d instanceof String[] ss) {
for (String s : ss) {
expected.add(s + ":" + (i % lookupData.length));
}
String data = lookupData[i % lookupData.length];
docs.add(client().prepareIndex("source").setSource(Map.of("data", data)));
for (Integer match: matches.get(data)) {
expected.add(data + ":" + match);
}
}
for (int i = 0; i < lookupData.length; i++) {
Expand Down Expand Up @@ -230,6 +230,7 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
List.of(reader.get(driverContext), lookup.get(driverContext)),
new PageConsumerOperator(page -> {
try {
System.err.println(docCount + " ADSFADFDAFADSF " + page);
BytesRefVector dataBlock = page.<BytesRefBlock>getBlock(1).asVector();
LongVector loadedBlock = page.<LongBlock>getBlock(2).asVector();
for (int p = 0; p < page.getPositionCount(); p++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.ProjectOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
Expand Down Expand Up @@ -137,6 +138,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
private final BigArrays bigArrays;
private final BlockFactory blockFactory;
private final LocalCircuitBreaker.SizeSettings localBreakerSettings;
private final boolean mergePages;

AbstractLookupService(
String actionName,
Expand All @@ -145,6 +147,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory,
boolean mergePages,
CheckedBiFunction<StreamInput, BlockFactory, T, IOException> readRequest
) {
this.actionName = actionName;
Expand All @@ -155,6 +158,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
this.bigArrays = bigArrays;
this.blockFactory = blockFactory;
this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
this.mergePages = mergePages;
transportService.registerRequestHandler(
actionName,
transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
Expand Down Expand Up @@ -323,31 +327,31 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
mergingTypes[i] = PlannerUtils.toElementType(request.extractFields.get(i).dataType());
}
final int[] mergingChannels = IntStream.range(0, request.extractFields.size()).map(i -> i + 2).toArray();
final MergePositionsOperator mergePositionsOperator;
final Operator finishPages;
final OrdinalBytesRefBlock ordinalsBytesRefBlock;
if (inputBlock instanceof BytesRefBlock bytesRefBlock && (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {
if (mergePages // TODO fix this faster branch with mergePages == false
&& inputBlock instanceof BytesRefBlock bytesRefBlock
&& (ordinalsBytesRefBlock = bytesRefBlock.asOrdinals()) != null) {

inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock();
var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock();
mergePositionsOperator = new MergePositionsOperator(
1,
mergingChannels,
mergingTypes,
selectedPositions,
driverContext.blockFactory()
);

finishPages = new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory());
} else {
try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
mergePositionsOperator = new MergePositionsOperator(
1,
mergingChannels,
mergingTypes,
selectedPositions,
driverContext.blockFactory()
);
if (mergePages) {
try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
finishPages = new MergePositionsOperator(
1,
mergingChannels,
mergingTypes,
selectedPositions,
driverContext.blockFactory()
);
}
} else {
finishPages = dropDocBlockOperator(request.extractFields);
}
}
releasables.add(mergePositionsOperator);
releasables.add(finishPages);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
Expand Down Expand Up @@ -377,7 +381,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
driverContext,
request::toString,
queryOperator,
List.of(extractFieldsOperator, mergePositionsOperator),
List.of(extractFieldsOperator, finishPages),
outputOperator,
Driver.DEFAULT_STATUS_INTERVAL,
Releasables.wrap(searchContext, localBreaker)
Expand Down Expand Up @@ -442,6 +446,16 @@ private static Operator extractFieldsOperator(
);
}

private Operator dropDocBlockOperator(List<NamedExpression> extractFields) {
// Drop just the first block, keeping the remaining
int end = extractFields.size() + 1;
List<Integer> projection = new ArrayList<>(end);
for (int i = 1; i <= end; i++) {
projection.add(i);
}
return new ProjectOperator(projection);
}

private Page createNullResponse(int positionCount, List<NamedExpression> extractFields) {
final Block[] blocks = new Block[extractFields.size()];
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,16 @@ public EnrichLookupService(
BigArrays bigArrays,
BlockFactory blockFactory
) {
super(LOOKUP_ACTION_NAME, clusterService, searchService, transportService, bigArrays, blockFactory, TransportRequest::readFrom);
super(
LOOKUP_ACTION_NAME,
clusterService,
searchService,
transportService,
bigArrays,
blockFactory,
true,
TransportRequest::readFrom
);
}

@Override
Expand Down
Loading

0 comments on commit e81febf

Please sign in to comment.