Move FieldValueFetcher into server module
Moved FieldValueFetcher into the mappers package so
that it can be retrieved through the MappedFieldType,
and removed the instanceof check.
csoulios committed Sep 20, 2022
1 parent 6c1597e commit 69664b0
Showing 5 changed files with 85 additions and 97 deletions.
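
For orientation, a minimal sketch of what the change means for callers (illustrative, not part of the diff; the helper name fetchersFor and the field list are hypothetical). Fetchers are now obtained polymorphically through MappedFieldType#fieldValueFetcher, so a caller such as the rollup indexer no longer needs an instanceof check on the aggregate metric field type:

// Hypothetical helper, mirroring what the rollup indexer does further down in this diff.
static List<FieldValueFetcher> fetchersFor(SearchExecutionContext context, String... fields) {
    List<FieldValueFetcher> fetchers = new ArrayList<>();
    for (String field : fields) {
        // Every MappedFieldType can now produce its own FieldValueFetcher,
        // so no instanceof special-casing of concrete field types is needed.
        MappedFieldType fieldType = context.getFieldType(field);
        fetchers.add(fieldType.fieldValueFetcher(context));
    }
    return List.copyOf(fetchers);
}

The aggregate metric field type overrides fieldValueFetcher (third file below) to expose one FormattedDocValues per metric sub-field, which is presumably what the removed instanceof branch used to special-case.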
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.mapper;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.index.fielddata.FormattedDocValues;

import java.util.Map;

/**
* Interface for retrieving field values by reading field data
*/
public interface FieldValueFetcher {

/**
* Return the field data leaf reader for one or more fields.
* @return a map with the field name as a key and field data leaf reader as value
*/
Map<String, FormattedDocValues> getLeaves(LeafReaderContext context);
}
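
A hedged sketch of how the interface is meant to be consumed per segment: getLeaves is called once per leaf, and the returned FormattedDocValues are then advanced per document. Everything except the FieldValueFetcher and FormattedDocValues calls (advanceExact, docValueCount, nextValue, all of which throw IOException) is assumed for illustration:

for (LeafReaderContext leaf : indexReader.leaves()) {
    // Resolve the formatted doc values once per segment.
    Map<String, FormattedDocValues> docValuesByField = fetcher.getLeaves(leaf);
    for (int docId = 0; docId < leaf.reader().maxDoc(); docId++) {
        for (Map.Entry<String, FormattedDocValues> entry : docValuesByField.entrySet()) {
            FormattedDocValues docValues = entry.getValue();
            if (docValues.advanceExact(docId) == false) {
                continue; // this document has no value for the field
            }
            for (int i = 0; i < docValues.docValueCount(); i++) {
                Object value = docValues.nextValue(); // already formatted by the field's DocValueFormat
                consume(entry.getKey(), value);       // hypothetical sink
            }
        }
    }
}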
@@ -110,6 +110,14 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext
*/
public abstract ValueFetcher valueFetcher(SearchExecutionContext context, @Nullable String format);

/**
* Create a helper class that fetches field values by reading field data.
*/
public FieldValueFetcher fieldValueFetcher(SearchExecutionContext context) {
IndexFieldData<?> fieldData = context.getForField(this, MappedFieldType.FielddataOperation.SEARCH);
return ctx -> Map.of(name(), fieldData.load(ctx).getFormattedValues(docValueFormat(null, null)));
}

/** Returns the name of this type, as would be specified in mapping properties */
public abstract String typeName();

@@ -21,13 +21,15 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldValueFetcher;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperBuilderContext;
@@ -64,6 +66,7 @@
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -495,6 +498,18 @@ public ValueFetcher valueFetcher(SearchExecutionContext context, String format)
return SourceValueFetcher.identity(name(), context, format);
}

@Override
public FieldValueFetcher fieldValueFetcher(SearchExecutionContext searchExecutionContext) {
return ctx -> {
Map<String, FormattedDocValues> docValues = new LinkedHashMap<>();
for (NumberFieldMapper.NumberFieldType metricSubField : getMetricFields().values()) {
IndexFieldData<?> fieldData = searchExecutionContext.getForField(metricSubField, FielddataOperation.SEARCH);
docValues.put(metricSubField.name(), fieldData.load(ctx).getFormattedValues(metricSubField.docValueFormat(null, null)));
}
return Collections.unmodifiableMap(docValues);
};
}

/**
* If field is a time series metric field, returns its metric type
* @return the metric type or null
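
Compared side by side with the default implementation added to MappedFieldType above, the override changes what the fetcher's map contains. An illustrative sketch (field names are hypothetical, the <field>.<metric> sub-field naming is an assumption about the aggregate metric mapper, and context/leaf stand for a SearchExecutionContext and a LeafReaderContext):

// Default MappedFieldType implementation: a single entry keyed by the field name.
Map<String, FormattedDocValues> plain =
    context.getFieldType("temperature").fieldValueFetcher(context).getLeaves(leaf);
// keys: ["temperature"]

// aggregate_double_metric override: one entry per metric sub-field.
Map<String, FormattedDocValues> aggregate =
    context.getFieldType("latency").fieldValueFetcher(context).getLeaves(leaf);
// keys along the lines of: ["latency.min", "latency.max", "latency.sum", "latency.value_count"]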

This file was deleted.

@@ -32,6 +32,7 @@
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DocCountFieldMapper;
import org.elasticsearch.index.mapper.FieldValueFetcher;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
@@ -51,9 +52,11 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -89,7 +92,7 @@ class RollupShardIndexer {
private final String[] dimensionFields;
private final String[] metricFields;
private final String[] labelFields;
private final Map<String, FieldValueFetcher> fieldValueFetchers;
private final List<FieldValueFetcher> fieldValueFetchers;
private final AtomicLong numSent = new AtomicLong();
private final AtomicLong numIndexed = new AtomicLong();
private final AtomicLong numFailed = new AtomicLong();
@@ -126,7 +129,7 @@ class RollupShardIndexer {
this.timestampField = searchExecutionContext.getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH);
this.timestampFormat = timestampField.docValueFormat(null, null);
this.rounding = config.createRounding();
this.fieldValueFetchers = FieldValueFetcher.create(searchExecutionContext, ArrayUtils.concat(metricFields, labelFields));
this.fieldValueFetchers = getFieldValueFetchers(searchExecutionContext, ArrayUtils.concat(metricFields, labelFields));
toClose = null;
} finally {
IOUtils.closeWhileHandlingException(toClose);
@@ -168,6 +171,36 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException {
return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get());
}

/**
* Retrieve field value fetchers for a list of fields.
*/
private static List<FieldValueFetcher> getFieldValueFetchers(SearchExecutionContext context, String[] fields) {
List<FieldValueFetcher> fetchers = new ArrayList<>();
for (String field : fields) {
MappedFieldType fieldType = context.getFieldType(field);
assert fieldType != null : "Unknown field type for field: [" + field + "]";
fetchers.add(fieldType.fieldValueFetcher(context));
}
return Collections.unmodifiableList(fetchers);
}

private static Map<String, FormattedDocValues> docValuesFetchers(
LeafReaderContext ctx,
SearchExecutionContext searchContext,
List<FieldValueFetcher> fieldValueFetchers
) {
final Map<String, FormattedDocValues> docValuesFetchers = new LinkedHashMap<>(fieldValueFetchers.size());
for (FieldValueFetcher fetcher : fieldValueFetchers) {
// Only keep doc values for fields that actually exist in the index
for (Map.Entry<String, FormattedDocValues> e : fetcher.getLeaves(ctx).entrySet()) {
if (searchContext.fieldExistsInIndex(e.getKey())) {
docValuesFetchers.put(e.getKey(), e.getValue());
}
}
}
return Collections.unmodifiableMap(docValuesFetchers);
}

private BulkProcessor createBulkProcessor() {
final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
@@ -229,7 +262,7 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
final LeafReaderContext ctx = aggCtx.getLeafReaderContext();
final DocCountProvider docCountProvider = new DocCountProvider();
docCountProvider.setLeafReaderContext(ctx);
final Map<String, FormattedDocValues> docValuesFetchers = FieldValueFetcher.docValuesFetchers(ctx, fieldValueFetchers);
final Map<String, FormattedDocValues> docValuesFetchers = docValuesFetchers(ctx, searchExecutionContext, fieldValueFetchers);

return new LeafBucketCollector() {
@Override
