From d9b2521ecd5b063f48a5953954c6e95d38f15dd6 Mon Sep 17 00:00:00 2001 From: Ketan Verma Date: Tue, 11 May 2021 20:38:28 +0530 Subject: [PATCH] Added more integration tests. Improved timestamp field validation logic. Signed-off-by: Ketan Verma --- .../datastream/DataStreamIndexTemplateIT.java | 58 ++++++ .../datastream/DataStreamRolloverIT.java | 63 ++++++ .../datastream/DataStreamTestCase.java | 127 ++++++++++++ .../indices/datastream/DataStreamUsageIT.java | 182 ++++++++++++++++++ .../index/mapper/DataStreamFieldMapper.java | 13 +- .../mapper/DataStreamFieldMapperTests.java | 156 +++++++++++++++ 6 files changed, 593 insertions(+), 6 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamIndexTemplateIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamRolloverIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamUsageIT.java create mode 100644 server/src/test/java/org/opensearch/index/mapper/DataStreamFieldMapperTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamIndexTemplateIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamIndexTemplateIT.java new file mode 100644 index 0000000000000..2ab1093b78d83 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamIndexTemplateIT.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.datastream; + +import org.opensearch.common.collect.List; + +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.containsString; + +public class DataStreamIndexTemplateIT extends DataStreamTestCase { + + public void testCreateDataStreamIndexTemplate() throws Exception { + // Without the data stream metadata field mapper, data_stream would have been an unknown field in + // the index template and would have thrown an error. + createIndexTemplate( + "demo-template", + "{" + + "\"index_patterns\": [ \"logs-*\" ]," + + "\"data_stream\": { }" + + "}" + ); + + // Data stream index template with a custom timestamp field name. + createIndexTemplate( + "demo-template", + "{" + + "\"index_patterns\": [ \"logs-*\" ]," + + "\"data_stream\": {" + + "\"timestamp_field\": { \"name\": \"created_at\" }" + + "}" + + "}" + ); + } + + public void testDeleteIndexTemplate() throws Exception { + createDataStreamIndexTemplate("demo-template", List.of("logs-*")); + createDataStream("logs-demo"); + + // Index template deletion should fail if there is a data stream using it. + ExecutionException exception = expectThrows(ExecutionException.class, () -> deleteIndexTemplate("demo-template")); + assertThat( + exception.getMessage(), + containsString("unable to remove composable templates [demo-template] as they are in use by a data streams") + ); + + // Index template can be deleted when all matching data streams are also deleted first. + deleteDataStreams("logs-demo"); + deleteIndexTemplate("demo-template"); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamRolloverIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamRolloverIT.java new file mode 100644 index 0000000000000..bac16dd4b7cc0 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamRolloverIT.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.datastream; + +import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.cluster.metadata.DataStream; +import org.opensearch.index.Index; + +import java.util.Collections; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +public class DataStreamRolloverIT extends DataStreamTestCase { + + public void testDataStreamRollover() throws Exception { + createDataStreamIndexTemplate("demo-template", Collections.singletonList("logs-*")); + createDataStream("logs-demo"); + + DataStream dataStream; + GetDataStreamAction.Response.DataStreamInfo dataStreamInfo; + GetDataStreamAction.Response response; + + // Data stream before a rollover. + response = getDataStreams("logs-demo"); + dataStreamInfo = response.getDataStreams().get(0); + assertThat(dataStreamInfo.getIndexTemplate(), equalTo("demo-template")); + dataStream = dataStreamInfo.getDataStream(); + assertThat(dataStream.getGeneration(), equalTo(1L)); + assertThat(dataStream.getIndices().size(), equalTo(1)); + assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp"))); + assertThat( + dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()), + containsInAnyOrder(".ds-logs-demo-000001") + ); + + // Perform a rollover. + RolloverResponse rolloverResponse = rolloverDataStream("logs-demo"); + assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-demo-000001")); + assertThat(rolloverResponse.getNewIndex(), equalTo(".ds-logs-demo-000002")); + + // Data stream after a rollover. + response = getDataStreams("logs-demo"); + dataStreamInfo = response.getDataStreams().get(0); + assertThat(dataStreamInfo.getIndexTemplate(), equalTo("demo-template")); + dataStream = dataStreamInfo.getDataStream(); + assertThat(dataStream.getGeneration(), equalTo(2L)); + assertThat(dataStream.getIndices().size(), equalTo(2)); + assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp"))); + assertThat( + dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()), + containsInAnyOrder(".ds-logs-demo-000001", ".ds-logs-demo-000002") + ); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java new file mode 100644 index 0000000000000..b083db1e6b7f4 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamTestCase.java @@ -0,0 +1,127 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.datastream; + +import org.opensearch.action.admin.indices.rollover.RolloverRequest; +import org.opensearch.action.admin.indices.rollover.RolloverResponse; +import org.opensearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction; +import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.cluster.metadata.ComposableIndexTemplate; +import org.opensearch.cluster.metadata.DataStream; +import org.opensearch.cluster.metadata.Template; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.is; +import static org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import static org.opensearch.test.OpenSearchIntegTestCase.Scope; + +@ClusterScope(scope = Scope.TEST, numDataNodes = 2) +public class DataStreamTestCase extends OpenSearchIntegTestCase { + + public AcknowledgedResponse createDataStream(String name) throws Exception { + CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name); + AcknowledgedResponse response = client().admin().indices().createDataStream(request).get(); + assertThat(response.isAcknowledged(), is(true)); + return response; + } + + public AcknowledgedResponse deleteDataStreams(String... names) throws Exception { + DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(names); + AcknowledgedResponse response = client().admin().indices().deleteDataStream(request).get(); + assertThat(response.isAcknowledged(), is(true)); + return response; + } + + public GetDataStreamAction.Response getDataStreams(String... names) throws Exception { + GetDataStreamAction.Request request = new GetDataStreamAction.Request(names); + return client().admin().indices().getDataStreams(request).get(); + } + + public List getDataStreamsNames(String... names) throws Exception { + return getDataStreams(names) + .getDataStreams() + .stream() + .map(dsInfo -> dsInfo.getDataStream().getName()) + .collect(Collectors.toList()); + } + + public DataStreamsStatsAction.Response getDataStreamsStats(String... names) throws Exception { + DataStreamsStatsAction.Request request = new DataStreamsStatsAction.Request(); + request.indices(names); + return client().execute(DataStreamsStatsAction.INSTANCE, request).get(); + } + + public RolloverResponse rolloverDataStream(String name) throws Exception { + RolloverRequest request = new RolloverRequest(name, null); + RolloverResponse response = client().admin().indices().rolloverIndex(request).get(); + assertThat(response.isAcknowledged(), is(true)); + assertThat(response.isRolledOver(), is(true)); + return response; + } + + public AcknowledgedResponse createDataStreamIndexTemplate(String name, List indexPatterns) throws Exception { + return createDataStreamIndexTemplate(name, indexPatterns, "@timestamp"); + } + + public AcknowledgedResponse createDataStreamIndexTemplate(String name, + List indexPatterns, + String timestampFieldName) throws Exception { + ComposableIndexTemplate template = new ComposableIndexTemplate( + indexPatterns, + new Template( + Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1).build(), + null, + null + ), + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(new DataStream.TimestampField(timestampFieldName)) + ); + + return createIndexTemplate(name, template); + } + + public AcknowledgedResponse createIndexTemplate(String name, String jsonContent) throws Exception { + XContentParser parser = XContentHelper.createParser( + xContentRegistry(), + null, + new BytesArray(jsonContent), + XContentType.JSON + ); + + return createIndexTemplate(name, ComposableIndexTemplate.parse(parser)); + } + + private AcknowledgedResponse createIndexTemplate(String name, ComposableIndexTemplate template) throws Exception { + PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(name); + request.indexTemplate(template); + AcknowledgedResponse response = client().execute(PutComposableIndexTemplateAction.INSTANCE, request).get(); + assertThat(response.isAcknowledged(), is(true)); + return response; + } + + public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception { + DeleteComposableIndexTemplateAction.Request request = new DeleteComposableIndexTemplateAction.Request(name); + AcknowledgedResponse response = client().execute(DeleteComposableIndexTemplateAction.INSTANCE, request).get(); + assertThat(response.isAcknowledged(), is(true)); + return response; + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamUsageIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamUsageIT.java new file mode 100644 index 0000000000000..164444eb1f2de --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/datastream/DataStreamUsageIT.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.datastream; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.admin.indices.datastream.DataStreamsStatsAction.DataStreamStats; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.DataStream; +import org.opensearch.common.collect.List; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestStatus; + +import java.util.Arrays; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class DataStreamUsageIT extends DataStreamTestCase { + + public void testDataStreamCrudAPIs() throws Exception { + // Data stream creation without a matching index template should fail. + ExecutionException exception = expectThrows( + ExecutionException.class, + () -> createDataStream("test-data-stream") + ); + assertThat(exception.getMessage(), containsString("no matching index template found for data stream")); + + // Create an index template for data streams. + createDataStreamIndexTemplate("data-stream-template", List.of("logs-*", "metrics-*", "events")); + + // Create multiple data streams matching the above index pattern. + createDataStream("logs-dev"); + createDataStream("logs-prod"); + createDataStream("metrics-prod"); + createDataStream("events"); + ensureGreen(); + + // Get all data streams. + assertThat(getDataStreamsNames(), containsInAnyOrder("logs-dev", "logs-prod", "metrics-prod", "events")); + assertThat(getDataStreamsNames("*"), containsInAnyOrder("logs-dev", "logs-prod", "metrics-prod", "events")); + + // Get data streams with and without wildcards. + assertThat(getDataStreamsNames("logs-*", "events"), containsInAnyOrder("logs-dev", "logs-prod", "events")); + + // Get data stream by name. + GetDataStreamAction.Response response = getDataStreams("logs-prod"); + assertThat(response.getDataStreams().size(), equalTo(1)); + DataStream dataStream = response.getDataStreams().get(0).getDataStream(); + assertThat(dataStream.getName(), equalTo("logs-prod")); + assertThat(dataStream.getIndices().size(), equalTo(1)); + assertThat(dataStream.getGeneration(), equalTo(1L)); + assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp"))); + + // Get data stream stats. + DataStreamsStatsAction.Response stats = getDataStreamsStats("*"); + assertThat(stats.getTotalShards(), equalTo(16)); // 4 data streams, 1 backing index per stream, 2 shards, 1 replica + assertThat(stats.getSuccessfulShards(), equalTo(16)); + assertThat(stats.getBackingIndices(), equalTo(4)); + assertThat(stats.getTotalStoreSize().getBytes(), greaterThan(0L)); + assertThat(stats.getDataStreams().length, equalTo(4)); + assertThat( + Arrays.stream(stats.getDataStreams()).map(DataStreamStats::getDataStream).collect(Collectors.toList()), + containsInAnyOrder("logs-dev", "logs-prod", "metrics-prod", "events") + ); + + // Delete multiple data streams at once; with and without wildcards. + deleteDataStreams("logs-*", "events"); + deleteDataStreams("metrics-prod"); + assertThat(getDataStreamsNames("*").size(), equalTo(0)); + } + + public void testDataStreamIndexDocumentsDefaultTimestampField() throws Exception { + assertDataStreamIndexDocuments("@timestamp"); + } + + public void testDataStreamIndexDocumentsCustomTimestampField() throws Exception { + assertDataStreamIndexDocuments("timestamp_" + randomAlphaOfLength(5)); + } + + public void assertDataStreamIndexDocuments(String timestampFieldName) throws Exception { + createDataStreamIndexTemplate("demo-template", List.of("logs-*"), timestampFieldName); + createDataStream("logs-demo"); + + Exception exception; + + // Only op_type=create requests should be allowed. + exception = expectThrows(Exception.class, () -> index( + new IndexRequest("logs-demo") + .id("doc-1") + .source("{}", XContentType.JSON) + )); + assertThat( + exception.getMessage(), + containsString("only write ops with an op_type of create are allowed in data streams") + ); + + // Documents must contain a valid timestamp field. + exception = expectThrows(Exception.class, () -> index( + new IndexRequest("logs-demo") + .id("doc-1") + .source("{}", XContentType.JSON) + .opType(DocWriteRequest.OpType.CREATE) + )); + assertThat( + exception.getMessage(), + containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type") + ); + + // The timestamp field cannot have multiple values. + exception = expectThrows(Exception.class, () -> index( + new IndexRequest("logs-demo") + .id("doc-1") + .opType(DocWriteRequest.OpType.CREATE) + .source( + XContentFactory + .jsonBuilder() + .startObject() + .array(timestampFieldName, "2020-12-06T11:04:05.000Z", "2020-12-07T11:04:05.000Z") + .field("message", "User registration successful") + .endObject() + ) + )); + assertThat( + exception.getMessage(), + containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type") + ); + + // Successful case. + IndexResponse response = index( + new IndexRequest("logs-demo") + .id("doc-1") + .opType(DocWriteRequest.OpType.CREATE) + .source( + XContentFactory + .jsonBuilder() + .startObject() + .field(timestampFieldName, "2020-12-06T11:04:05.000Z") + .field("message", "User registration successful") + .endObject() + ) + ); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getId(), equalTo("doc-1")); + assertThat(response.getIndex(), equalTo(".ds-logs-demo-000001")); + + // Perform a rollover and ingest more documents. + rolloverDataStream("logs-demo"); + response = index( + new IndexRequest("logs-demo") + .id("doc-2") + .opType(DocWriteRequest.OpType.CREATE) + .source( + XContentFactory + .jsonBuilder() + .startObject() + .field(timestampFieldName, "2020-12-06T11:04:05.000Z") + .field("message", "User registration successful") + .endObject() + ) + ); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getId(), equalTo("doc-2")); + assertThat(response.getIndex(), equalTo(".ds-logs-demo-000002")); + } + + private IndexResponse index(IndexRequest request) throws Exception { + return client().index(request).get(); + } + +} diff --git a/server/src/main/java/org/opensearch/index/mapper/DataStreamFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DataStreamFieldMapper.java index 0730bfa3dd3e7..5a5e5db19190e 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DataStreamFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/DataStreamFieldMapper.java @@ -8,8 +8,7 @@ package org.opensearch.index.mapper; -import org.apache.lucene.document.LongPoint; -import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexableField; import org.apache.lucene.search.Query; import org.opensearch.cluster.metadata.DataStream.TimestampField; @@ -138,10 +137,12 @@ public void postParse(ParseContext context) throws IOException { Document document = context.doc(); IndexableField[] fields = document.getFields(timestampField.getName()); - // Documents should contain exactly one value for the timestamp field. - if ((fields.length == 2 && - fields[0] instanceof LongPoint && - fields[1] instanceof SortedNumericDocValuesField) == false) { + // Documents must contain exactly one value for the timestamp field. + long numTimestampValues = Arrays.stream(fields) + .filter(field -> field.fieldType().docValuesType() == DocValuesType.SORTED_NUMERIC) + .count(); + + if (numTimestampValues != 1) { throw new IllegalArgumentException( "documents must contain a single-valued timestamp field '" + timestampField.getName() + "' of date type" ); diff --git a/server/src/test/java/org/opensearch/index/mapper/DataStreamFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/DataStreamFieldMapperTests.java new file mode 100644 index 0000000000000..4f0d088b788e5 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/mapper/DataStreamFieldMapperTests.java @@ -0,0 +1,156 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.mapper; + +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.compress.CompressedXContent; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class DataStreamFieldMapperTests extends OpenSearchSingleNodeTestCase { + + public void testDefaultTimestampField() throws Exception { + String mapping = Strings.toString(XContentFactory + .jsonBuilder() + .startObject() + .startObject("_doc") + .startObject("_data_stream_timestamp") + .field("enabled", true) + .endObject() + .endObject() + .endObject()); + + assertDataStreamFieldMapper(mapping, "@timestamp"); + } + + public void testCustomTimestampField() throws Exception { + String timestampFieldName = "timestamp_" + randomAlphaOfLength(5); + + String mapping = Strings.toString(XContentFactory + .jsonBuilder() + .startObject() + .startObject("_doc") + .startObject("_data_stream_timestamp") + .field("enabled", true) + .startObject("timestamp_field") + .field("name", timestampFieldName) + .endObject() + .endObject() + .endObject() + .endObject()); + + assertDataStreamFieldMapper(mapping, timestampFieldName); + } + + public void testDeeplyNestedCustomTimestampField() throws Exception { + String mapping = Strings.toString(XContentFactory + .jsonBuilder() + .startObject() + .startObject("_doc") + .startObject("_data_stream_timestamp") + .field("enabled", true) + .startObject("timestamp_field") + .field("name", "event.meta.created_at") + .endObject() + .endObject() + .endObject() + .endObject()); + + DocumentMapper mapper = createIndex("test") + .mapperService() + .merge("_doc", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE); + + ParsedDocument doc = mapper.parse(new SourceToParse("test", "_doc", "1", BytesReference.bytes( + XContentFactory + .jsonBuilder() + .startObject() + .startObject("event") + .startObject("meta") + .field("created_at", "2020-12-06T11:04:05.000Z") + .endObject() + .endObject() + .endObject() + ), XContentType.JSON)); + assertThat(doc.rootDoc().getFields("event.meta.created_at").length, equalTo(2)); + + MapperException exception = expectThrows(MapperException.class, () -> { + mapper.parse(new SourceToParse("test", "_doc", "3", BytesReference.bytes( + XContentFactory + .jsonBuilder() + .startObject() + .startObject("event") + .startObject("meta") + .array("created_at", "2020-12-06T11:04:05.000Z", "2020-12-07T11:04:05.000Z") + .endObject() + .endObject() + .endObject() + ), XContentType.JSON)); + }); + assertThat( + exception.getCause().getMessage(), + containsString("documents must contain a single-valued timestamp field 'event.meta.created_at' of date type") + ); + } + + private void assertDataStreamFieldMapper(String mapping, String timestampFieldName) throws Exception { + DocumentMapper mapper = createIndex("test") + .mapperService() + .merge("_doc", new CompressedXContent(mapping), MapperService.MergeReason.MAPPING_UPDATE); + + // Success case - document has timestamp field correctly populated. + ParsedDocument doc = mapper.parse(new SourceToParse("test", "_doc", "1", BytesReference.bytes( + XContentFactory + .jsonBuilder() + .startObject() + .field(timestampFieldName, "2020-12-06T11:04:05.000Z") + .endObject() + ), XContentType.JSON)); + + // A valid timestamp field will be parsed as LongPoint and SortedNumericDocValuesField. + assertThat(doc.rootDoc().getFields(timestampFieldName).length, equalTo(2)); + + MapperException exception; + + // Failure case - document doesn't have a valid timestamp field. + exception = expectThrows(MapperException.class, () -> { + mapper.parse(new SourceToParse("test", "_doc", "2", BytesReference.bytes( + XContentFactory + .jsonBuilder() + .startObject() + .field("invalid-field-name", "2020-12-06T11:04:05.000Z") + .endObject() + ), XContentType.JSON)); + }); + assertThat( + exception.getCause().getMessage(), + containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type") + ); + + // Failure case - document contains multiple values for the timestamp field. + exception = expectThrows(MapperException.class, () -> { + mapper.parse(new SourceToParse("test", "_doc", "3", BytesReference.bytes( + XContentFactory + .jsonBuilder() + .startObject() + .array(timestampFieldName, "2020-12-06T11:04:05.000Z", "2020-12-07T11:04:05.000Z") + .endObject() + ), XContentType.JSON)); + }); + assertThat( + exception.getCause().getMessage(), + containsString("documents must contain a single-valued timestamp field '" + timestampFieldName + "' of date type") + ); + } + +}