Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API for adding and removing indices from a data stream #79279

Merged
merged 15 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"indices.modify_data_stream":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Creates a data stream"
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept": [ "application/json"]
},
"url":{
"paths":[
{
"path":"/_data_stream/_modify",
"methods":["POST"]
}
]
},
"params":{
},
"body":{
"description":"The data stream modifications",
"required":true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public DataStream removeBackingIndex(Index index) {
index.getName(), name
));
}
if (generation == (backingIndexPosition + 1)) {
if (indices.size() == (backingIndexPosition + 1)) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"cannot remove backing index [%s] of data stream [%s] because it is the write index",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,73 +9,173 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.function.Supplier;

import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Operations on data streams
* Operations on data streams. Currently supports adding and removing backing indices.
*/
public abstract class DataStreamAction {
private final String dataStream;
public class DataStreamAction implements Writeable, ToXContentObject {

private static final ParseField DATA_STREAM = new ParseField("data_stream");
private static final ParseField INDEX = new ParseField("index");

private static final ParseField ADD_BACKING_INDEX = new ParseField("add_backing_index");
private static final ParseField REMOVE_BACKING_INDEX = new ParseField("remove_backing_index");

public enum Type {
ADD_BACKING_INDEX((byte) 0, DataStreamAction.ADD_BACKING_INDEX),
REMOVE_BACKING_INDEX((byte) 1, DataStreamAction.REMOVE_BACKING_INDEX);

private final byte value;
private final String fieldName;

Type(byte value, ParseField field) {
this.value = value;
this.fieldName = field.getPreferredName();
}

public byte value() {
return value;
}

public static Type fromValue(byte value) {
switch (value) {
case 0: return ADD_BACKING_INDEX;
case 1: return REMOVE_BACKING_INDEX;
default: throw new IllegalArgumentException("no data stream action type for [" + value + "]");
}
}
}

private final Type type;
private String dataStream;
private String index;
Comment on lines +65 to +66
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is not really big enough to be a builder class, I think we should make these final (it's not too hard for a user to pass them in the constructor)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had all the members as final but I believe they have to be mutable so the parser is able to construct them here.


public static DataStreamAction addBackingIndex(String dataStream, String index) {
return new DataStreamAction.AddBackingIndex(dataStream, index);
return new DataStreamAction(Type.ADD_BACKING_INDEX, dataStream, index);
}

public static DataStreamAction removeBackingIndex(String dataStream, String index) {
return new DataStreamAction.RemoveBackingIndex(dataStream, index);
return new DataStreamAction(Type.REMOVE_BACKING_INDEX, dataStream, index);
}

DataStreamAction(StreamInput in) throws IOException {
this.type = Type.fromValue(in.readByte());
this.dataStream = in.readString();
this.index = in.readString();
}

private DataStreamAction(String dataStream) {
private DataStreamAction(Type type, String dataStream, String index) {
if (false == Strings.hasText(dataStream)) {
throw new IllegalArgumentException("[data_stream] is required");
}
if (false == Strings.hasText(index)) {
throw new IllegalArgumentException("[index] is required");
}
this.type = type;
this.dataStream = dataStream;
this.index = index;
}

DataStreamAction(Type type) {
this.type = type;
}

/**
* Data stream on which the operation should act
*/
public String getDataStream() {
return dataStream;
}

public static class AddBackingIndex extends DataStreamAction {

private final String index;
public void setDataStream(String datastream) {
this.dataStream = datastream;
}

private AddBackingIndex(String dataStream, String index) {
super(dataStream);
public String getIndex() {
return index;
}

if (false == Strings.hasText(index)) {
throw new IllegalArgumentException("[index] is required");
}
public void setIndex(String index) {
this.index = index;
}

this.index = index;
}
public Type getType() {
return type;
}

public String getIndex() {
return index;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startObject(type.fieldName);
builder.field(DATA_STREAM.getPreferredName(), dataStream);
builder.field(INDEX.getPreferredName(), index);
builder.endObject();
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(type.value());
out.writeString(dataStream);
out.writeString(index);
}

public static class RemoveBackingIndex extends DataStreamAction {
public static DataStreamAction fromXContent(XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}

private final String index;
private static final ObjectParser<DataStreamAction, Void> ADD_BACKING_INDEX_PARSER = parser(
ADD_BACKING_INDEX.getPreferredName(),
() -> new DataStreamAction(Type.ADD_BACKING_INDEX)
);
private static final ObjectParser<DataStreamAction, Void> REMOVE_BACKING_INDEX_PARSER = parser(
REMOVE_BACKING_INDEX.getPreferredName(),
() -> new DataStreamAction(Type.REMOVE_BACKING_INDEX)
);
static {
ADD_BACKING_INDEX_PARSER.declareField(DataStreamAction::setDataStream, XContentParser::text, DATA_STREAM,
ObjectParser.ValueType.STRING);
ADD_BACKING_INDEX_PARSER.declareField(DataStreamAction::setIndex, XContentParser::text, INDEX, ObjectParser.ValueType.STRING);
REMOVE_BACKING_INDEX_PARSER.declareField(DataStreamAction::setDataStream, XContentParser::text, DATA_STREAM,
ObjectParser.ValueType.STRING);
REMOVE_BACKING_INDEX_PARSER.declareField(DataStreamAction::setIndex, XContentParser::text, INDEX, ObjectParser.ValueType.STRING);
}

private RemoveBackingIndex(String dataStream, String index) {
super(dataStream);
private static ObjectParser<DataStreamAction, Void> parser(String name, Supplier<DataStreamAction> supplier) {
ObjectParser<DataStreamAction, Void> parser = new ObjectParser<>(name, supplier);
return parser;
}

if (false == Strings.hasText(index)) {
throw new IllegalArgumentException("[index] is required");
public static final ConstructingObjectParser<DataStreamAction, Void> PARSER = new ConstructingObjectParser<>(
"data_stream_action", a -> {
// Take the first action and error if there is more than one action
DataStreamAction action = null;
for (Object o : a) {
if (o != null) {
if (action == null) {
action = (DataStreamAction) o;
} else {
throw new IllegalArgumentException("too many data stream operations declared on operation entry");
}
}

this.index = index;
}

public String getIndex() {
return index;
}

return action;
});
static {
PARSER.declareObject(optionalConstructorArg(), ADD_BACKING_INDEX_PARSER, ADD_BACKING_INDEX);
PARSER.declareObject(optionalConstructorArg(), REMOVE_BACKING_INDEX_PARSER, REMOVE_BACKING_INDEX);
}

}
Loading