Skip to content

Commit

Permalink
Added more integration tests. Improved timestamp field validation logic.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed May 12, 2021
1 parent d9a3517 commit d9b2521
Show file tree
Hide file tree
Showing 6 changed files with 593 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.datastream;

import org.opensearch.common.collect.List;

import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.containsString;

public class DataStreamIndexTemplateIT extends DataStreamTestCase {

public void testCreateDataStreamIndexTemplate() throws Exception {
// Without the data stream metadata field mapper, data_stream would have been an unknown field in
// the index template and would have thrown an error.
createIndexTemplate(
"demo-template",
"{" +
"\"index_patterns\": [ \"logs-*\" ]," +
"\"data_stream\": { }" +
"}"
);

// Data stream index template with a custom timestamp field name.
createIndexTemplate(
"demo-template",
"{" +
"\"index_patterns\": [ \"logs-*\" ]," +
"\"data_stream\": {" +
"\"timestamp_field\": { \"name\": \"created_at\" }" +
"}" +
"}"
);
}

public void testDeleteIndexTemplate() throws Exception {
createDataStreamIndexTemplate("demo-template", List.of("logs-*"));
createDataStream("logs-demo");

// Index template deletion should fail if there is a data stream using it.
ExecutionException exception = expectThrows(ExecutionException.class, () -> deleteIndexTemplate("demo-template"));
assertThat(
exception.getMessage(),
containsString("unable to remove composable templates [demo-template] as they are in use by a data streams")
);

// Index template can be deleted when all matching data streams are also deleted first.
deleteDataStreams("logs-demo");
deleteIndexTemplate("demo-template");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.datastream;

import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.index.Index;

import java.util.Collections;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

public class DataStreamRolloverIT extends DataStreamTestCase {

public void testDataStreamRollover() throws Exception {
createDataStreamIndexTemplate("demo-template", Collections.singletonList("logs-*"));
createDataStream("logs-demo");

DataStream dataStream;
GetDataStreamAction.Response.DataStreamInfo dataStreamInfo;
GetDataStreamAction.Response response;

// Data stream before a rollover.
response = getDataStreams("logs-demo");
dataStreamInfo = response.getDataStreams().get(0);
assertThat(dataStreamInfo.getIndexTemplate(), equalTo("demo-template"));
dataStream = dataStreamInfo.getDataStream();
assertThat(dataStream.getGeneration(), equalTo(1L));
assertThat(dataStream.getIndices().size(), equalTo(1));
assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp")));
assertThat(
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()),
containsInAnyOrder(".ds-logs-demo-000001")
);

// Perform a rollover.
RolloverResponse rolloverResponse = rolloverDataStream("logs-demo");
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-demo-000001"));
assertThat(rolloverResponse.getNewIndex(), equalTo(".ds-logs-demo-000002"));

// Data stream after a rollover.
response = getDataStreams("logs-demo");
dataStreamInfo = response.getDataStreams().get(0);
assertThat(dataStreamInfo.getIndexTemplate(), equalTo("demo-template"));
dataStream = dataStreamInfo.getDataStream();
assertThat(dataStream.getGeneration(), equalTo(2L));
assertThat(dataStream.getIndices().size(), equalTo(2));
assertThat(dataStream.getTimeStampField(), equalTo(new DataStream.TimestampField("@timestamp")));
assertThat(
dataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()),
containsInAnyOrder(".ds-logs-demo-000001", ".ds-logs-demo-000002")
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.datastream;

import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.is;
import static org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import static org.opensearch.test.OpenSearchIntegTestCase.Scope;

@ClusterScope(scope = Scope.TEST, numDataNodes = 2)
public class DataStreamTestCase extends OpenSearchIntegTestCase {

public AcknowledgedResponse createDataStream(String name) throws Exception {
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name);
AcknowledgedResponse response = client().admin().indices().createDataStream(request).get();
assertThat(response.isAcknowledged(), is(true));
return response;
}

public AcknowledgedResponse deleteDataStreams(String... names) throws Exception {
DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(names);
AcknowledgedResponse response = client().admin().indices().deleteDataStream(request).get();
assertThat(response.isAcknowledged(), is(true));
return response;
}

public GetDataStreamAction.Response getDataStreams(String... names) throws Exception {
GetDataStreamAction.Request request = new GetDataStreamAction.Request(names);
return client().admin().indices().getDataStreams(request).get();
}

public List<String> getDataStreamsNames(String... names) throws Exception {
return getDataStreams(names)
.getDataStreams()
.stream()
.map(dsInfo -> dsInfo.getDataStream().getName())
.collect(Collectors.toList());
}

public DataStreamsStatsAction.Response getDataStreamsStats(String... names) throws Exception {
DataStreamsStatsAction.Request request = new DataStreamsStatsAction.Request();
request.indices(names);
return client().execute(DataStreamsStatsAction.INSTANCE, request).get();
}

public RolloverResponse rolloverDataStream(String name) throws Exception {
RolloverRequest request = new RolloverRequest(name, null);
RolloverResponse response = client().admin().indices().rolloverIndex(request).get();
assertThat(response.isAcknowledged(), is(true));
assertThat(response.isRolledOver(), is(true));
return response;
}

public AcknowledgedResponse createDataStreamIndexTemplate(String name, List<String> indexPatterns) throws Exception {
return createDataStreamIndexTemplate(name, indexPatterns, "@timestamp");
}

public AcknowledgedResponse createDataStreamIndexTemplate(String name,
List<String> indexPatterns,
String timestampFieldName) throws Exception {
ComposableIndexTemplate template = new ComposableIndexTemplate(
indexPatterns,
new Template(
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1).build(),
null,
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(new DataStream.TimestampField(timestampFieldName))
);

return createIndexTemplate(name, template);
}

public AcknowledgedResponse createIndexTemplate(String name, String jsonContent) throws Exception {
XContentParser parser = XContentHelper.createParser(
xContentRegistry(),
null,
new BytesArray(jsonContent),
XContentType.JSON
);

return createIndexTemplate(name, ComposableIndexTemplate.parse(parser));
}

private AcknowledgedResponse createIndexTemplate(String name, ComposableIndexTemplate template) throws Exception {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(name);
request.indexTemplate(template);
AcknowledgedResponse response = client().execute(PutComposableIndexTemplateAction.INSTANCE, request).get();
assertThat(response.isAcknowledged(), is(true));
return response;
}

public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception {
DeleteComposableIndexTemplateAction.Request request = new DeleteComposableIndexTemplateAction.Request(name);
AcknowledgedResponse response = client().execute(DeleteComposableIndexTemplateAction.INSTANCE, request).get();
assertThat(response.isAcknowledged(), is(true));
return response;
}

}
Loading

0 comments on commit d9b2521

Please sign in to comment.