Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Do not deduce mappings in latest transform function #66523

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ setup:
- match: { generated_dest_index.mappings.properties.airline.type: "keyword" }
- match: { generated_dest_index.mappings.properties.by-hour.type: "date" }
- match: { generated_dest_index.mappings.properties.avg_response.type: "double" }
- match: { generated_dest_index.mappings.properties.time.properties.max.type: "date" }
- match: { generated_dest_index.mappings.properties.time.properties.min.type: "date" }
- match: { generated_dest_index.mappings.properties.time\.max.type: "date" }
- match: { generated_dest_index.mappings.properties.time\.min.type: "date" }

- do:
ingest.put_pipeline:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import java.util.Map;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

Expand Down Expand Up @@ -133,13 +136,9 @@ public void testLatest() throws Exception {
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
restClient.indices().refresh(new RefreshRequest(destIndexName), RequestOptions.DEFAULT);
// Verify destination index mappings
GetMappingsResponse sourceIndexMapping =
restClient.indices().getMapping(new GetMappingsRequest().indices(SOURCE_INDEX_NAME), RequestOptions.DEFAULT);
GetMappingsResponse destIndexMapping =
restClient.indices().getMapping(new GetMappingsRequest().indices(destIndexName), RequestOptions.DEFAULT);
assertThat(
destIndexMapping.mappings().get(destIndexName).sourceAsMap().get("properties"),
is(equalTo(sourceIndexMapping.mappings().get(SOURCE_INDEX_NAME).sourceAsMap().get("properties"))));
assertThat(destIndexMapping.mappings().get(destIndexName).sourceAsMap(), allOf(hasKey("_meta"), hasKey("properties")));
// Verify destination index contents
SearchResponse searchResponse =
restClient.search(new SearchRequest(destIndexName).source(new SearchSourceBuilder().size(1000)), RequestOptions.DEFAULT);
Expand All @@ -166,27 +165,10 @@ public void testLatestPreview() throws Exception {
PreviewTransformResponse previewResponse =
restClient.transform().previewTransform(new PreviewTransformRequest(transformConfig), RequestOptions.DEFAULT);
// Verify preview mappings
GetMappingsResponse sourceIndexMapping =
restClient.indices().getMapping(new GetMappingsRequest().indices(SOURCE_INDEX_NAME), RequestOptions.DEFAULT);
assertThat(
// Mappings we get from preview sometimes contain redundant { "type": "object" } entries.
// We clear them here to be able to compare with the GetMappingsAction output.
clearDefaultObjectType(previewResponse.getMappings().get("properties")),
is(equalTo(sourceIndexMapping.mappings().get(SOURCE_INDEX_NAME).sourceAsMap().get("properties"))));
assertThat(previewResponse.getMappings(), allOf(hasKey("_meta"), hasEntry("properties", emptyMap())));
// Verify preview contents
assertThat(previewResponse.getDocs(), hasSize(NUM_USERS + 1));
assertThat(previewResponse.getDocs(), containsInAnyOrder(EXPECTED_DEST_INDEX_ROWS));
}
}

private static Object clearDefaultObjectType(Object obj) {
if (obj instanceof Map == false) {
return obj;
}
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) obj;
return map.entrySet().stream()
.filter(entry -> (entry.getKey().equals("type") && entry.getValue().equals("object")) == false)
.collect(toMap(entry -> entry.getKey(), entry -> clearDefaultObjectType(entry.getValue())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
Expand All @@ -39,12 +38,15 @@ public class LatestContinuousIT extends ContinuousTestCase {

private static final String NAME = "continuous-latest-test";

private static final String EVENT_FIELD = "event";
private static final String TIMESTAMP_FIELD = "timestamp";

private static final String MISSING_BUCKET_KEY = "~~NULL~~"; // ensure that this key is last after sorting

public LatestContinuousIT() {}
private final String eventField;
private final String timestampField;

public LatestContinuousIT() {
eventField = randomFrom(TERMS_FIELDS);
timestampField = randomFrom(TIMESTAMP_FIELDS);
}

@Override
public TransformConfig createConfig() {
Expand All @@ -55,8 +57,8 @@ public TransformConfig createConfig() {
.setDest(new DestConfig(NAME, INGEST_PIPELINE))
.setLatestConfig(
LatestConfig.builder()
.setUniqueKey(EVENT_FIELD)
.setSort(TIMESTAMP_FIELD)
.setUniqueKey(eventField)
.setSort(timestampField)
.build());
addCommonBuilderParameters(transformConfigBuilder);
return transformConfigBuilder.build();
Expand All @@ -79,17 +81,20 @@ public void testIteration(int iteration, Set<String> modifiedEvents) throws IOEx
.aggregation(
new TermsAggregationBuilder("by_event")
.size(1000)
.field(EVENT_FIELD)
.field(eventField)
.missing(MISSING_BUCKET_KEY)
.order(BucketOrder.key(true))
.subAggregation(AggregationBuilders.max("max_timestamp").field(TIMESTAMP_FIELD))));
.subAggregation(AggregationBuilders.max("max_timestamp").field(timestampField))));
SearchResponse searchResponseSource = search(searchRequestSource);

SearchRequest searchRequestDest =
new SearchRequest(NAME)
.allowPartialSearchResults(false)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.source(new SearchSourceBuilder().size(1000).sort(EVENT_FIELD));
// In destination index we don't have access to runtime fields from source index, let's use what we have i.e.: event.keyword
// and assume the sorting order will be the same (it is true as the runtime field applies "toUpperCase()" which preserves
// sorting order)
.source(new SearchSourceBuilder().size(1000).sort("event.keyword"));
SearchResponse searchResponseDest = search(searchRequestDest);

List<? extends Bucket> buckets = ((Terms) searchResponseSource.getAggregations().get("by_event")).getBuckets();
Expand All @@ -110,12 +115,13 @@ public void testIteration(int iteration, Set<String> modifiedEvents) throws IOEx
Bucket bucket = sourceIterator.next();
SearchHit searchHit = destIterator.next();
Map<String, Object> source = searchHit.getSourceAsMap();
String eventFieldValue = (String) XContentMapValues.extractValue(EVENT_FIELD, source);
String timestampFieldValue = (String) XContentMapValues.extractValue(TIMESTAMP_FIELD, source);

String transformBucketKey =
Optional.ofNullable((String) XContentMapValues.extractValue(EVENT_FIELD, source))
.orElse(MISSING_BUCKET_KEY);
String eventFieldValue = (String) XContentMapValues.extractValue("event", source);
String timestampFieldValue = (String) XContentMapValues.extractValue("timestamp", source);
String transformBucketKey = eventFieldValue != null
// The bucket key in source can be either an ordinary field or a runtime field. When it is runtime field, simulate its
// script ("toUpperCase()") here.
? "event".equals(eventField) ? eventFieldValue : eventFieldValue.toUpperCase()
: MISSING_BUCKET_KEY;

// Verify that the results from the aggregation and the results from the transform are the same.
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,24 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Map.Entry.comparingByKey;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toMap;

public final class TransformIndex {
private static final Logger logger = LogManager.getLogger(TransformIndex.class);

/**
* The list of object types used in the mappings.
* We include {@code null} as an alternative for "object", which is the default.
*/
private static final Set<String> OBJECT_TYPES =
new HashSet<>(Arrays.asList(null, ObjectMapper.CONTENT_TYPE, ObjectMapper.NESTED_CONTENT_TYPE));
private static final String PROPERTIES = "properties";
private static final String FIELDS = "fields";
private static final String META = "_meta";

private TransformIndex() {}
Expand Down Expand Up @@ -150,27 +138,7 @@ private static Settings createSettings() {
* @param mappings A Map of the form {"fieldName": "fieldType"}
*/
static Map<String, Object> createMappingsFromStringMap(Map<String, String> mappings) {
List<Map.Entry<String, String>> sortedMappingsEntries = new ArrayList<>(mappings.entrySet());
// We sort the entry list to make sure that for each (parent, parent.child) pair, parent entry will be processed before child entry.
sortedMappingsEntries.sort(comparingByKey());
Map<String, Object> fieldMappings = new HashMap<>();
for (Map.Entry<String, String> entry : sortedMappingsEntries) {
String[] parts = Strings.tokenizeToStringArray(entry.getKey(), ".");
String type = entry.getValue();
Map<String, Object> current = fieldMappings;
current = diveInto(current, parts[0]);
for (int j = 1; j < parts.length; ++j) {
// Here we decide whether a dot ('.') means inner object or a multi-field.
current = diveInto(current, OBJECT_TYPES.contains(current.get("type")) ? PROPERTIES : FIELDS);
current = diveInto(current, parts[j]);
}
current.put("type", type);
}
return fieldMappings;
}

@SuppressWarnings("unchecked")
private static Map<String, Object> diveInto(Map<String, Object> map, String key) {
return (Map<String, Object>) map.computeIfAbsent(key, k -> new HashMap<>());
return mappings.entrySet().stream()
.collect(toMap(e -> e.getKey(), e -> singletonMap("type", e.getValue())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -47,6 +45,7 @@
import java.util.Map;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;

public class Latest implements Function {
Expand Down Expand Up @@ -187,20 +186,7 @@ public void validateConfig(ActionListener<Boolean> listener) {

@Override
public void deduceMappings(Client client, SourceConfig sourceConfig, ActionListener<Map<String, String>> listener) {
FieldCapabilitiesRequest fieldCapabilitiesRequest =
new FieldCapabilitiesRequest().indices(sourceConfig.getIndex())
.fields("*")
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
client.execute(
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
ActionListener.wrap(
response -> {
listener.onResponse(
DocumentConversionUtils.removeInternalFields(DocumentConversionUtils.extractFieldMappings(response)));
},
listener::onFailure)
);
listener.onResponse(emptyMap());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,8 @@ public void testCreateMappingsFromStringMap() {
put("a.b", "keyword");
}}),
is(equalTo(new HashMap<>() {{
put("a", new HashMap<>() {{
put("type", "long");
put("fields", singletonMap("b", singletonMap("type", "keyword")));
}});
put("a", singletonMap("type", "long"));
put("a.b", singletonMap("type", "keyword"));
}}))
);
assertThat(
Expand All @@ -108,17 +106,9 @@ public void testCreateMappingsFromStringMap() {
put("a.b.c", "keyword");
}}),
is(equalTo(new HashMap<>() {{
put("a", new HashMap<>() {{
put("type", "long");
put("fields", new HashMap<>() {{
put("b", new HashMap<>() {{
put("type", "text");
put("fields", new HashMap<>() {{
put("c", singletonMap("type", "keyword"));
}});
}});
}});
}});
put("a", singletonMap("type", "long"));
put("a.b", singletonMap("type", "text"));
put("a.b.c", singletonMap("type", "keyword"));
}}))
);
assertThat(
Expand All @@ -133,38 +123,14 @@ public void testCreateMappingsFromStringMap() {
put("f.g.h.i", "text");
}}),
is(equalTo(new HashMap<>() {{
put("a", new HashMap<>() {{
put("type", "object");
put("properties", new HashMap<>() {{
put("b", new HashMap<>() {{
put("type", "long");
}});
}});
}});
put("c", new HashMap<>() {{
put("type", "nested");
put("properties", new HashMap<>() {{
put("d", new HashMap<>() {{
put("type", "boolean");
}});
}});
}});
put("f", new HashMap<>() {{
put("type", "object");
put("properties", new HashMap<>() {{
put("g", new HashMap<>() {{
put("type", "object");
put("properties", new HashMap<>() {{
put("h", new HashMap<>() {{
put("type", "text");
put("fields", new HashMap<>() {{
put("i", singletonMap("type", "text"));
}});
}});
}});
}});
}});
}});
put("a", singletonMap("type", "object"));
put("a.b", singletonMap("type", "long"));
put("c", singletonMap("type", "nested"));
put("c.d", singletonMap("type", "boolean"));
put("f", singletonMap("type", "object"));
put("f.g", singletonMap("type", "object"));
put("f.g.h", singletonMap("type", "text"));
put("f.g.h.i", singletonMap("type", "text"));
}}))
);
}
Expand Down