Concurrent thread access to shared doc values #99007

Merged
docs/changelog/99007.yaml (5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 99007
summary: Cardinality nested in time series doc values bug
area: "Aggregations"
type: bug
issues: []
@@ -0,0 +1,229 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.aggregations.bucket;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.aggregations.AggregationIntegTestCase;
import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries;
import org.elasticsearch.aggregations.bucket.timeseries.TimeSeriesAggregationBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalCardinality;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.hamcrest.Matchers;
import org.junit.Before;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

public class TimeSeriesNestedAggregationsIT extends AggregationIntegTestCase {
    private static int numberOfDimensions;
    private static int numberOfDocuments;

    private static final String FOO_DIM_VALUE = "foo".repeat(10);
    private static final String BAR_DIM_VALUE = "bar".repeat(11);
    private static final String BAZ_DIM_VALUE = "baz".repeat(12);

    @Before
    public void setup() throws Exception {
        numberOfDimensions = randomIntBetween(10, 20);
        final XContentBuilder mapping = timeSeriesIndexMapping();
        long startMillis = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
        long endMillis = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-31T00:00:00Z");
        numberOfDocuments = randomIntBetween(100, 200);
        final Iterator<Long> timestamps = getTimestamps(startMillis, endMillis, numberOfDocuments);
        // NOTE: also use the last (changing) dimension to make sure documents are not all indexed in the same shard.
        final String[] routingDimensions = new String[] { "dim_000000", formatDim(numberOfDimensions - 1) };
        assertTrue(prepareTimeSeriesIndex(mapping, startMillis, endMillis, routingDimensions).isAcknowledged());
        logger.info("Dimensions: " + numberOfDimensions + " docs: " + numberOfDocuments + " start: " + startMillis + " end: " + endMillis);

        final BulkRequestBuilder bulkIndexRequest = client().prepareBulk();
        for (int docId = 0; docId < numberOfDocuments; docId++) {
            final XContentBuilder document = timeSeriesDocument(FOO_DIM_VALUE, BAR_DIM_VALUE, BAZ_DIM_VALUE, docId, timestamps::next);
            bulkIndexRequest.add(client().prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(document));
        }

        final BulkResponse bulkIndexResponse = bulkIndexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
        assertFalse(bulkIndexResponse.hasFailures());
        assertEquals(RestStatus.OK.getStatus(), client().admin().indices().prepareFlush("index").get().getStatus().getStatus());
    }

    private static XContentBuilder timeSeriesDocument(
        final String fooDimValue,
        final String barDimValue,
        final String bazDimValue,
        int docId,
        final Supplier<Long> timestampSupplier
    ) throws IOException {
        final XContentBuilder docSource = XContentFactory.jsonBuilder();
        docSource.startObject();
        // NOTE: we assign dimensions in such a way that all of them except the last one share the same value.
        // This way we end up with just two time series (and two distinct tsids), and the last dimension identifies
        // which time series a document belongs to.
        for (int dimId = 0; dimId < numberOfDimensions - 1; dimId++) {
            docSource.field(formatDim(dimId), fooDimValue);
        }
        docSource.field(formatDim(numberOfDimensions - 1), docId % 2 == 0 ? barDimValue : bazDimValue);
        docSource.field("counter_metric", docId + 1);
        docSource.field("gauge_metric", randomDoubleBetween(1000.0, 2000.0, true));
        docSource.field("@timestamp", timestampSupplier.get());
        docSource.endObject();

        return docSource;
    }

    private CreateIndexResponse prepareTimeSeriesIndex(
        final XContentBuilder mapping,
        long startMillis,
        long endMillis,
        final String[] routingDimensions
    ) {
        return prepareCreate("index").setSettings(
            Settings.builder()
                .put("mode", "time_series")
                .put("routing_path", String.join(",", routingDimensions))
                .put("index.number_of_shards", randomIntBetween(1, 3))
                .put("index.number_of_replicas", randomIntBetween(1, 3))
                .put("time_series.start_time", startMillis)
                .put("time_series.end_time", endMillis)
                .put(MapperService.INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING.getKey(), numberOfDimensions + 1)
                .put(MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING.getKey(), 4192)
                .build()
        ).setMapping(mapping).get();
    }

    private static Iterator<Long> getTimestamps(long startMillis, long endMillis, int numberOfDocs) {
        final Set<Long> timestamps = new TreeSet<>();
        while (timestamps.size() < numberOfDocs) {
            timestamps.add(randomLongBetween(startMillis, endMillis));
        }
        return timestamps.iterator();
    }

    private static XContentBuilder timeSeriesIndexMapping() throws IOException {
        final XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        builder.startObject("properties");
        for (int i = 0; i < numberOfDimensions; i++) {
            builder.startObject(formatDim(i));
            builder.field("type", "keyword");
            builder.field("time_series_dimension", true);
            builder.endObject();
        }
        builder.startObject("counter_metric");
        builder.field("type", "double");
        builder.field("time_series_metric", "counter");
        builder.endObject();
        builder.startObject("gauge_metric");
        builder.field("type", "double");
        builder.field("time_series_metric", "gauge");
        builder.endObject();
        builder.endObject(); // properties
        builder.endObject();
        return builder;
    }

    private static String formatDim(int dimId) {
        return String.format(Locale.ROOT, "dim_%06d", dimId);
    }

    public void testTimeSeriesAggregation() {
        final TimeSeriesAggregationBuilder timeSeries = new TimeSeriesAggregationBuilder("ts");
        final SearchResponse aggregationResponse = client().prepareSearch("index").addAggregation(timeSeries).setSize(0).get();
        final InternalTimeSeries ts = (InternalTimeSeries) aggregationResponse.getAggregations().asList().get(0);
        assertTimeSeriesAggregation(ts);
    }

    public void testSumByTsid() {
        final TimeSeriesAggregationBuilder timeSeries = new TimeSeriesAggregationBuilder("ts").subAggregation(
            new SumAggregationBuilder("sum").field("gauge_metric")
        );
        final SearchResponse searchResponse = client().prepareSearch("index").setQuery(new MatchAllQueryBuilder()).get();
        assertNotEquals(numberOfDocuments, searchResponse.getHits().getHits().length);
        final SearchResponse aggregationResponse = client().prepareSearch("index").addAggregation(timeSeries).setSize(0).get();
        final InternalTimeSeries ts = (InternalTimeSeries) aggregationResponse.getAggregations().asList().get(0);
        assertTimeSeriesAggregation(ts);
    }

    public void testTermsByTsid() {
        final TimeSeriesAggregationBuilder timeSeries = new TimeSeriesAggregationBuilder("ts").subAggregation(
            new TermsAggregationBuilder("terms").field("dim_0")
        );
        final SearchResponse aggregationResponse = client().prepareSearch("index").addAggregation(timeSeries).setSize(0).get();
        final InternalTimeSeries ts = (InternalTimeSeries) aggregationResponse.getAggregations().asList().get(0);
        assertTimeSeriesAggregation(ts);
    }

    public void testDateHistogramByTsid() {
        final TimeSeriesAggregationBuilder timeSeries = new TimeSeriesAggregationBuilder("ts").subAggregation(
            new DateHistogramAggregationBuilder("date_histogram").field("@timestamp").calendarInterval(DateHistogramInterval.HOUR)
        );
        final SearchResponse aggregationResponse = client().prepareSearch("index").addAggregation(timeSeries).setSize(0).get();
        final InternalTimeSeries ts = (InternalTimeSeries) aggregationResponse.getAggregations().asList().get(0);
        assertTimeSeriesAggregation(ts);
    }

    public void testCardinalityByTsid() {
        final TimeSeriesAggregationBuilder timeSeries = new TimeSeriesAggregationBuilder("ts").subAggregation(
            new CardinalityAggregationBuilder("dim_n_cardinality").field(formatDim(numberOfDimensions - 1))
        );
        final SearchResponse aggregationResponse = client().prepareSearch("index").addAggregation(timeSeries).setSize(0).get();
        final InternalTimeSeries ts = (InternalTimeSeries) aggregationResponse.getAggregations().asList().get(0);
        assertTimeSeriesAggregation(ts);
        ts.getBuckets().forEach(bucket -> { assertCardinality(bucket.getAggregations().get("dim_n_cardinality"), 1); });
    }

    private static void assertTimeSeriesAggregation(final InternalTimeSeries timeSeriesAggregation) {
        final List<Map<String, Object>> dimensions = timeSeriesAggregation.getBuckets()
            .stream()
            .map(InternalTimeSeries.InternalBucket::getKey)
            .toList();
        // NOTE: only two time series expected as a result of having just two distinct values for the last dimension
        assertEquals(2, dimensions.size());

        final Map<String, Object> firstTimeSeries = dimensions.get(0);
        final Map<String, Object> secondTimeSeries = dimensions.get(1);

        assertTsid(firstTimeSeries);
        assertTsid(secondTimeSeries);
    }

    private static void assertTsid(final Map<String, Object> timeSeries) {
        timeSeries.entrySet().stream().sorted(Map.Entry.comparingByKey()).limit(numberOfDimensions - 2).forEach(entry -> {
            assertThat(entry.getValue().toString(), Matchers.equalTo(FOO_DIM_VALUE));
        });
        timeSeries.entrySet().stream().sorted(Map.Entry.comparingByKey()).skip(numberOfDimensions - 1).forEach(entry -> {
            assertThat(entry.getValue().toString(), Matchers.oneOf(BAR_DIM_VALUE, BAZ_DIM_VALUE));
        });
    }

    private static void assertCardinality(final InternalCardinality cardinalityAggregation, int expectedCardinality) {
        assertEquals(expectedCardinality, cardinalityAggregation.getValue());
    }
}
@@ -72,7 +72,6 @@ private static void executeInSortOrder(SearchContext context, BucketCollector collector) {
        searcher.setProfiler(context);
        try {
            searcher.search(context.rewrittenQuery(), collector);
-           collector.postCollection();
        } catch (IOException e) {
            throw new AggregationExecutionException("Could not perform time series aggregation", e);
        }
@@ -66,8 +66,8 @@ public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.SingleValue {
    // Build at post-collection phase
    @Nullable
    private HyperLogLogPlusPlusSparse counts;
-   private SortedSetDocValues values;
    private ObjectArray<BitArray> visitedOrds;
+   private SortedSetDocValues values;

    public GlobalOrdCardinalityAggregator(
        String name,
@@ -211,6 +211,7 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx,
        if (maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING || numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
            dynamicPruningAttempts++;
            return new LeafBucketCollector() {
+               final SortedSetDocValues docValues = values;

                final BitArray bits;
                final CompetitiveIterator competitiveIterator;
@@ -234,8 +235,8 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx,

                @Override
                public void collect(int doc, long bucketOrd) throws IOException {
-                   if (values.advanceExact(doc)) {
-                       for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) {
+                   if (docValues.advanceExact(doc)) {
+                       for (long ord = docValues.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = docValues.nextOrd()) {
                            if (bits.getAndSet(ord) == false) {
                                competitiveIterator.onVisitedOrdinal(ord);
                            }
@@ -267,6 +268,8 @@ public CompetitiveIterator competitiveIterator() {

        bruteForce++;
        return new LeafBucketCollector() {
+           final SortedSetDocValues docValues = values;
+
            @Override
            public void collect(int doc, long bucketOrd) throws IOException {
                visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1);
@@ -275,8 +278,8 @@ public void collect(int doc, long bucketOrd) throws IOException {
                    bits = new BitArray(maxOrd, bigArrays);
                    visitedOrds.set(bucketOrd, bits);
                }
-               if (values.advanceExact(doc)) {
-                   for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) {
+               if (docValues.advanceExact(doc)) {
+                   for (long ord = docValues.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = docValues.nextOrd()) {
                        bits.set((int) ord);
                    }
                }
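The hunks above all apply the same pattern: each leaf collector in GlobalOrdCardinalityAggregator captures the shared values field into a local final docValues reference when it is created and only reads that local copy while collecting. Presumably the shared field is reassigned whenever a leaf collector is created for another segment, so a collector still running on a different thread could otherwise iterate doc values belonging to the wrong segment — the "concurrent thread access to shared doc values" the PR title refers to. Below is a minimal, self-contained sketch of the idea; PerSegmentValues, SharedStateAggregator and the Runnable collectors are hypothetical stand-ins for illustration, not Elasticsearch classes.

// Illustrative only: a per-leaf collector must not keep reading a shared mutable
// field that may be reassigned for another segment; it captures the reference it
// was created with, mirroring the "final SortedSetDocValues docValues = values"
// lines added above. PerSegmentValues and SharedStateAggregator are hypothetical.
class PerSegmentValues {
    private final String segmentName;

    PerSegmentValues(String segmentName) {
        this.segmentName = segmentName;
    }

    String valueFor(int doc) {
        return segmentName + "#" + doc;
    }
}

class SharedStateAggregator {
    // Shared, mutable field: updated every time a leaf collector is created.
    private PerSegmentValues values;

    // Unsafe: the returned collector reads the shared field at collection time,
    // so creating a collector for another segment changes what it sees.
    Runnable unsafeLeafCollector(PerSegmentValues segmentValues, int doc) {
        values = segmentValues;
        return () -> System.out.println("unsafe: " + values.valueFor(doc));
    }

    // Safe (the pattern applied by this PR): capture the field into a local final
    // reference when the leaf collector is created and only read the local copy.
    Runnable safeLeafCollector(PerSegmentValues segmentValues, int doc) {
        values = segmentValues;
        final PerSegmentValues docValues = values;
        return () -> System.out.println("safe:   " + docValues.valueFor(doc));
    }

    public static void main(String[] args) {
        SharedStateAggregator aggregator = new SharedStateAggregator();
        Runnable unsafe = aggregator.unsafeLeafCollector(new PerSegmentValues("segment-0"), 7);
        Runnable safe = aggregator.safeLeafCollector(new PerSegmentValues("segment-0"), 7);
        // Collectors for a second segment are created (concurrently, in the real code)
        // and reassign the shared field before the first collectors finish.
        aggregator.unsafeLeafCollector(new PerSegmentValues("segment-1"), 3);
        aggregator.safeLeafCollector(new PerSegmentValues("segment-1"), 3);
        unsafe.run(); // now resolves doc 7 against segment-1: the wrong segment
        safe.run();   // still resolves doc 7 against segment-0, the segment it was built for
    }
}

The unsafe variant reads stale shared state as soon as another leaf collector is created, while the safe variant stays bound to the doc values it was constructed with.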
@@ -95,11 +95,13 @@ public void search(Query query, BucketCollector bucketCollector) throws IOException {
        Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1);
        if (searcher.getExecutor() == null) {
            search(bucketCollector, weight);
+           bucketCollector.postCollection();
            return;
        }
        // offload to the search worker thread pool whenever possible. It will be null only when search.worker_threads_enabled is false
        RunnableFuture<Void> task = new FutureTask<>(() -> {
            search(bucketCollector, weight);
+           bucketCollector.postCollection();
            return null;
        });
        searcher.getExecutor().execute(task);
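The other half of the change moves postCollection() into the time-series searcher's search method itself: in the hunk above it is called either directly (no executor) or from within the offloaded task, so it always runs on the thread that actually performed the collection, and the callers shown in the other hunks (the executeInSortOrder helper, the searchAndReduce test helper, and the downsample shard indexer) drop their own calls accordingly. A rough sketch of that calling convention follows; OffloadingSearcher and this small Collector interface are hypothetical placeholders, not the Elasticsearch API, and the task.get() wait is an assumption of the sketch rather than something shown in the hunk.

// Illustrative only: postCollection() runs on whichever thread performed the
// collection (directly, or inside the offloaded task), instead of being left
// to the caller on a different thread.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

interface Collector {
    void collect(int doc);

    void postCollection();
}

class OffloadingSearcher {
    private final ExecutorService executor; // null means "collect on the calling thread"

    OffloadingSearcher(ExecutorService executor) {
        this.executor = executor;
    }

    void search(Collector collector) throws ExecutionException, InterruptedException {
        if (executor == null) {
            doCollect(collector);
            collector.postCollection(); // same thread that collected
            return;
        }
        RunnableFuture<Void> task = new FutureTask<>(() -> {
            doCollect(collector);
            collector.postCollection(); // still the collecting thread, inside the task
            return null;
        });
        executor.execute(task);
        task.get(); // wait for the offloaded collection to complete (sketch assumption)
    }

    private void doCollect(Collector collector) {
        for (int doc = 0; doc < 3; doc++) {
            collector.collect(doc);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            new OffloadingSearcher(pool).search(new Collector() {
                @Override
                public void collect(int doc) {
                    System.out.println("collect doc " + doc + " on " + Thread.currentThread().getName());
                }

                @Override
                public void postCollection() {
                    System.out.println("postCollection on " + Thread.currentThread().getName());
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}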
@@ -586,8 +586,8 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
                } else {
                    Weight weight = subSearcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f);
                    subSearcher.search(weight, a.asCollector());
-                   a.postCollection();
                }
+               a.postCollection();
                assertEquals(shouldBeCached, context.isCacheable());
                internalAggs.add(a.buildTopLevel());
            } finally {
@@ -612,7 +612,6 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
                root.preCollection();
                aggregators.add(root);
                new TimeSeriesIndexSearcher(searcher, List.of()).search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
-               root.postCollection();
            } else {
                CollectorManager<Collector, Void> collectorManager = new CollectorManager<>() {
                    @Override
@@ -157,7 +157,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOException {
            TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor);
            bucketCollector.preCollection();
            timeSeriesSearcher.search(initialStateQuery, bucketCollector);
-           bucketCollector.postCollection();
        }

        logger.info(