Skip to content

Commit

Permalink
NOCOMMITS
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Jan 2, 2025
1 parent 4de82c1 commit 4900c75
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public final Operator.Status status() {
}

protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
// NOCOMMIT this is wrong - completedPages is the number of results, not pages.
return new Status(receivedPages, completedPages, totalTimeInMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public Operator get(DriverContext driverContext) {
private final List<NamedExpression> loadFields;
private final Source source;
private long totalTerms = 0L;
/**
* Total number of pages emitted by this {@link Operator}.
*/
private long emittedPages = 0L;
/**
* The ongoing join or {@code null} none is ongoing at the moment.
*/
Expand Down Expand Up @@ -131,7 +135,6 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
loadFields,
source
);
// NOCOMMIT join pages with the operator dude
lookupService.lookupAsync(
request,
parentTask,
Expand All @@ -152,6 +155,7 @@ public Page getOutput() {
if (ongoing.itr.hasNext()) {
// There's more to do in the ongoing join.
Page right = ongoing.itr.next();
emittedPages++;
try {
return ongoing.join.join(right);
} finally {
Expand All @@ -162,7 +166,11 @@ public Page getOutput() {
Optional<Page> remaining = ongoing.join.noMoreRightHandPages();
ongoing.close();
ongoing = null;
return remaining.orElse(null);
if (remaining.isEmpty()) {
return null;
}
emittedPages++;
return remaining.get();
}

@Override
Expand Down Expand Up @@ -207,8 +215,7 @@ protected void doClose() {

@Override
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
// NOCOMMIT this is wrong - completedPages is the number of results, not pages.
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
}

public static class Status extends AsyncOperator.Status {
Expand All @@ -218,34 +225,50 @@ public static class Status extends AsyncOperator.Status {
Status::new
);

final long totalTerms;
private final long totalTerms;
/**
* Total number of pages emitted by this {@link Operator}.
*/
private final long emittedPages;

Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) {
Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms, long emittedPages) {
super(receivedPages, completedPages, totalTimeInMillis);
this.totalTerms = totalTerms;
this.emittedPages = emittedPages;
}

Status(StreamInput in) throws IOException {
super(in);
this.totalTerms = in.readVLong();
this.emittedPages = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(totalTerms);
out.writeVLong(emittedPages);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

public long emittedPages() {
return emittedPages;
}

public long totalTerms() {
return totalTerms;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder);
builder.field("total_terms", totalTerms);
super.innerToXContent(builder);
builder.field("emitted_pages", emittedPages());
builder.field("total_terms", totalTerms());
return builder.endObject();
}

Expand All @@ -258,7 +281,7 @@ public boolean equals(Object o) {
return false;
}
Status status = (Status) o;
return totalTerms == status.totalTerms;
return totalTerms == status.totalTerms && emittedPages == status.emittedPages;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class LookupFromIndexOperatorStatusTests extends AbstractWireSerializingTestCase<LookupFromIndexOperator.Status> {
@Override
protected Writeable.Reader<LookupFromIndexOperator.Status> instanceReader() {
return LookupFromIndexOperator.Status::new;
}

@Override
protected LookupFromIndexOperator.Status createTestInstance() {
return new LookupFromIndexOperator.Status(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomLongBetween(1, TimeValue.timeValueHours(1).millis()),
randomNonNegativeLong()
);
}

@Override
protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator.Status in) throws IOException {
long receivedPages = in.receivedPages();
long completedPages = in.completedPages();
long totalTimeInMillis = in.totalTimeInMillis();
long totalTerms = in.totalTerms();
long emittedPages = in.emittedPages();
switch (randomIntBetween(0, 4)) {
case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong);
case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong);
case 2 -> totalTimeInMillis = randomValueOtherThan(totalTimeInMillis, ESTestCase::randomNonNegativeLong);
case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong);
case 4 -> emittedPages = randomValueOtherThan(emittedPages, ESTestCase::randomNonNegativeLong);
default -> throw new UnsupportedOperationException();
}
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
}

public void testToXContent() {
var status = new EnrichLookupOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120);
String json = Strings.toString(status, true, true);
assertThat(json, equalTo("""
{
"received_pages" : 100,
"completed_pages" : 50,
"total_time_in_millis" : 10000,
"total_time" : "10s",
"total_terms" : 120
}"""));
}
}

0 comments on commit 4900c75

Please sign in to comment.