Skip to content

Commit

Permalink
[7.x] Utility methods to add and remove backing indices from data str…
Browse files Browse the repository at this point in the history
…eams (elastic#77977)
  • Loading branch information
danhermann authored Sep 17, 2021
1 parent fbd1330 commit cc6d9e4
Show file tree
Hide file tree
Showing 2 changed files with 301 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.PointValues;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -36,6 +37,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

Expand Down Expand Up @@ -185,8 +187,20 @@ public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid, Vers
*
* @param index the backing index to remove
* @return new {@code DataStream} instance with the remaining backing indices
* @throws IllegalArgumentException if {@code index} is not a backing index or is the current write index of the data stream
*/
public DataStream removeBackingIndex(Index index) {
int backingIndexPosition = indices.indexOf(index);

if (backingIndexPosition == -1) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
index.getName(), name));
}
if (generation == (backingIndexPosition + 1)) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because " +
"it is the write index", index.getName(), name));
}

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
Expand All @@ -207,7 +221,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
List<Index> backingIndices = new ArrayList<>(indices);
int backingIndexPosition = backingIndices.indexOf(existingBackingIndex);
if (backingIndexPosition == -1) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] ",
throw new IllegalArgumentException(String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]",
existingBackingIndex.getName(), name));
}
if (generation == (backingIndexPosition + 1)) {
Expand All @@ -218,6 +232,53 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system);
}

/**
* Adds the specified index as a backing index and returns a new {@code DataStream} instance with the new combination
* of backing indices.
*
* @param index index to add to the data stream
* @return new {@code DataStream} instance with the added backing index
* @throws IllegalArgumentException if {@code index} is ineligible to be a backing index for the data stream
*/
public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
// validate that index is not part of another data stream
final IndexAbstraction.DataStream parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
if (parentDataStream != null) {
if (parentDataStream.getDataStream().equals(this)) {
return this;
} else {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
index.getName(),
getName(),
parentDataStream.getName()
)
);
}
}

// ensure that no aliases reference index
IndexMetadata im = clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex();
if (im.getAliases().size() > 0) {
throw new IllegalArgumentException(
String.format(Locale.ROOT,
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
index.getName(),
getName(),
Strings.collectionToCommaDelimitedString(
im.getAliases().stream().map(Map.Entry::getKey).sorted().collect(Collectors.toList())
)
)
);
}

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(0, index);
assert backingIndices.size() == indices.size() + 1;
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated, system);
}

public DataStream promoteDataStream() {
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -16,6 +17,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -116,6 +118,243 @@ public void testRemoveBackingIndex() {
}
}

public void testRemoveBackingIndexThatDoesNotExist() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);

final Index indexToRemove = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> original.removeBackingIndex(indexToRemove)
);
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"index [%s] is not part of data stream [%s]",
indexToRemove.getName(),
dataStreamName)
)
);
}

public void testRemoveBackingWriteIndex() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> original.removeBackingIndex(indices.get(numBackingIndices - 1))
);
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"cannot remove backing index [%s] of data stream [%s] because it is the write index",
indices.get(numBackingIndices - 1).getName(),
dataStreamName
)
)
);
}

public void testAddBackingIndex() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long epochMillis = System.currentTimeMillis();

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
builder.put(original);
Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
builder.put(
IndexMetadata
.builder(indexToAdd.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build(),
false
);

DataStream updated = original.addBackingIndex(builder.build(), indexToAdd);
assertThat(updated.getName(), equalTo(original.getName()));
assertThat(updated.getGeneration(), equalTo(original.getGeneration() + 1));
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
assertThat(updated.getIndices().size(), equalTo(numBackingIndices + 1));
for (int k = 1; k <= numBackingIndices; k++) {
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k - 1)));
}
assertThat(updated.getIndices().get(0), equalTo(indexToAdd));
}

public void testAddBackingIndexThatIsPartOfAnotherDataStream() {
int numBackingIndices = randomIntBetween(2, 32);
final String dsName1 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String dsName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

final long epochMillis = System.currentTimeMillis();

List<Index> indices1 = new ArrayList<>(numBackingIndices);
List<Index> indices2 = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices1.add(new Index(DataStream.getDefaultBackingIndexName(dsName1, k, epochMillis), UUIDs.randomBase64UUID(random())));
indices2.add(new Index(DataStream.getDefaultBackingIndexName(dsName2, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices1.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
im = IndexMetadata.builder(indices2.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream ds1 = new DataStream(dsName1, createTimestampField("@timestamp"), indices1);
DataStream ds2 = new DataStream(dsName2, createTimestampField("@timestamp"), indices2);
builder.put(ds1);
builder.put(ds2);

Index indexToAdd = randomFrom(indices2.toArray(Index.EMPTY_ARRAY));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ds1.addBackingIndex(builder.build(), indexToAdd));
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"cannot add index [%s] to data stream [%s] because it is already a backing index on data stream [%s]",
indexToAdd.getName(),
ds1.getName(),
ds2.getName()
)
)
);
}

public void testAddExistingBackingIndex() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long epochMillis = System.currentTimeMillis();

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
builder.put(original);
Index indexToAdd = randomFrom(indices.toArray(Index.EMPTY_ARRAY));

DataStream updated = original.addBackingIndex(builder.build(), indexToAdd);
assertThat(updated.getName(), equalTo(original.getName()));
assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
assertThat(updated.getIndices().size(), equalTo(numBackingIndices));
for (int k = 0; k < numBackingIndices; k++) {
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k)));
}
}

public void testAddBackingIndexWithAliases() {
int numBackingIndices = randomIntBetween(2, 32);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long epochMillis = System.currentTimeMillis();

List<Index> indices = new ArrayList<>(numBackingIndices);
for (int k = 1; k <= numBackingIndices; k++) {
indices.add(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, k, epochMillis), UUIDs.randomBase64UUID(random())));
}

Metadata.Builder builder = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(indices.get(k - 1).getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
builder.put(im, false);
}
DataStream original = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices);
builder.put(original);

Index indexToAdd = new Index(randomAlphaOfLength(4), UUIDs.randomBase64UUID(random()));
IndexMetadata.Builder b = IndexMetadata
.builder(indexToAdd.getName())
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
final int numAliases = randomIntBetween(1, 3);
final String[] aliasNames = new String[numAliases];
for (int k = 0; k < numAliases; k++) {
aliasNames[k] = randomAlphaOfLength(6);
b.putAlias(AliasMetadata.builder(aliasNames[k]));
}
builder.put(b.build(), false);
Arrays.sort(aliasNames);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> original.addBackingIndex(builder.build(), indexToAdd)
);
assertThat(
e.getMessage(),
equalTo(
String.format(
Locale.ROOT,
"cannot add index [%s] to data stream [%s] until its alias(es) [%s] are removed",
indexToAdd.getName(),
original.getName(),
Strings.arrayToCommaDelimitedString(aliasNames)
)
)
);
}

public void testDefaultBackingIndexName() {
// this test does little more than flag that changing the default naming convention for backing indices
// will also require changing a lot of hard-coded values in REST tests and docs
Expand Down

0 comments on commit cc6d9e4

Please sign in to comment.