Skip to content

Commit

Permalink
Add cluster state checksum in manifest (opensearch-project#15218)
Browse files Browse the repository at this point in the history
* Add cluster state checksum in manifest for remote state and routing table publication

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
2 people authored and Himshikha Gupta committed Sep 4, 2024
1 parent 6194bbd commit 57168df
Show file tree
Hide file tree
Showing 38 changed files with 2,076 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ public final <K, V> void writeMapOfLists(final Map<K, List<V>> map, final Writer
* @param keyWriter The key writer
* @param valueWriter The value writer
*/
public final <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
public <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
writeVInt(map.size());
for (final Map.Entry<K, V> entry : map.entrySet()) {
keyWriter.write(this, entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.common.io.stream;

import java.io.IOException;

/**
* Provides a method for serialization which will give ordered stream, creating same byte array on every invocation.
* This should be invoked with a stream that provides ordered serialization.
*/
public interface VerifiableWriteable extends Writeable {

void writeVerifiableTo(StreamOutput out) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ private static class CompleteDiff<T extends Diffable<T>> implements Diff<T> {
this.part = part;
}

@Override
public String toString() {
return "CompleteDiff{" + "part=" + part + '}';
}

/**
* Creates simple diff without changes
*/
Expand Down
30 changes: 29 additions & 1 deletion server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ default boolean isPrivate() {

}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
public static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);

public static final String UNKNOWN_UUID = "_na_";

Expand Down Expand Up @@ -839,6 +839,34 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
minimumClusterManagerNodesOnPublishingClusterManager = after.minimumClusterManagerNodesOnPublishingClusterManager;
}

@Override
public String toString() {
return new StringBuilder().append("ClusterStateDiff{toVersion=")
.append(toVersion)
.append(", fromUuid='")
.append(fromUuid)
.append('\'')
.append(", toUuid='")
.append(toUuid)
.append('\'')
.append(", clusterName=")
.append(clusterName)
.append(", routingTable=")
.append(routingTable)
.append(", nodes=")
.append(nodes)
.append(", metadata=")
.append(metadata)
.append(", blocks=")
.append(blocks)
.append(", customs=")
.append(customs)
.append(", minimumClusterManagerNodesOnPublishingClusterManager=")
.append(minimumClusterManagerNodesOnPublishingClusterManager)
.append("}")
.toString();
}

ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
clusterName = new ClusterName(in);
fromUuid = in.readString();
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,18 @@ public Map<K, T> getUpserts() {
return upserts;
}

@Override
public String toString() {
return new StringBuilder().append("MapDiff{deletes=")
.append(deletes)
.append(", diffs=")
.append(diffs)
.append(", upserts=")
.append(upserts)
.append("}")
.toString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(deletes, (o, v) -> keySerializer.writeKey(v, o));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class ClusterBlock implements Writeable, ToXContentFragment {
public class ClusterBlock implements Writeable, ToXContentFragment, Comparable<ClusterBlock> {

private final int id;
@Nullable
Expand Down Expand Up @@ -217,7 +217,13 @@ public int hashCode() {
return Objects.hash(id, uuid);
}

@Override
public int compareTo(ClusterBlock block) {
return Integer.compare(block.id(), this.id());
}

public boolean isAllowReleaseResources() {
return allowReleaseResources;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.VerifiableWriteable;
import org.opensearch.core.rest.RestStatus;

import java.io.IOException;
Expand All @@ -62,7 +63,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> implements VerifiableWriteable {
public static final ClusterBlocks EMPTY_CLUSTER_BLOCK = new ClusterBlocks(emptySet(), Map.of());

private final Set<ClusterBlock> global;
Expand Down Expand Up @@ -303,6 +304,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(indicesBlocks, StreamOutput::writeString, (o, s) -> writeBlockSet(s, o));
}

@Override
public void writeVerifiableTo(StreamOutput out) throws IOException {
writeTo(out);
}

private static void writeBlockSet(Set<ClusterBlock> blocks, StreamOutput out) throws IOException {
out.writeCollection(blocks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.VerifiableWriteable;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ConstructingObjectParser;
import org.opensearch.core.xcontent.ToXContentFragment;
Expand All @@ -59,7 +60,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class CoordinationMetadata implements Writeable, ToXContentFragment {
public class CoordinationMetadata implements VerifiableWriteable, ToXContentFragment {

public static final CoordinationMetadata EMPTY_METADATA = builder().build();

Expand Down Expand Up @@ -149,6 +150,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(votingConfigExclusions);
}

@Override
public void writeVerifiableTo(StreamOutput out) throws IOException {
writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(TERM_PARSE_FIELD.getPreferredName(), term)
Expand Down Expand Up @@ -272,7 +278,7 @@ public CoordinationMetadata build() {
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class VotingConfigExclusion implements Writeable, ToXContentFragment {
public static class VotingConfigExclusion implements Writeable, ToXContentFragment, Comparable<VotingConfigExclusion> {
public static final String MISSING_VALUE_MARKER = "_absent_";
private final String nodeId;
private final String nodeName;
Expand Down Expand Up @@ -361,6 +367,10 @@ public String toString() {
return sb.toString();
}

@Override
public int compareTo(VotingConfigExclusion votingConfigExclusion) {
return votingConfigExclusion.getNodeId().compareTo(this.getNodeId());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.VerifiableWriteable;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -69,6 +70,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;

Expand All @@ -88,6 +90,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
Expand All @@ -103,7 +106,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragment {
public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragment, VerifiableWriteable {

public static final ClusterBlock INDEX_READ_ONLY_BLOCK = new ClusterBlock(
5,
Expand Down Expand Up @@ -1287,6 +1290,32 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

@Override
public void writeVerifiableTo(StreamOutput out) throws IOException {
out.writeString(index.getName()); // uuid will come as part of settings
out.writeLong(version);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
out.writeVLong(aliasesVersion);
out.writeInt(routingNumShards);
out.writeByte(state.id());
writeSettingsToStream(settings, out);
out.writeVLongArray(primaryTerms);
((BufferedChecksumStreamOutput)out).writeMapValues(mappings, (stream, val) -> val.writeTo(stream));
((BufferedChecksumStreamOutput)out).writeMapValues(aliases, (stream, val) -> val.writeTo(stream));
out.writeMap(customData, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
out.writeMap(
inSyncAllocationIds,
StreamOutput::writeVInt,
(stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream)
);
((BufferedChecksumStreamOutput)out).writeMapValues(rolloverInfos, (stream, val) -> val.writeTo(stream));
out.writeBoolean(isSystem);
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeOptionalWriteable(context);
}
}

public boolean isSystem() {
return isSystem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.VerifiableWriteable;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -67,7 +69,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class IndexTemplateMetadata extends AbstractDiffable<IndexTemplateMetadata> {
public class IndexTemplateMetadata extends AbstractDiffable<IndexTemplateMetadata> implements VerifiableWriteable {

private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IndexTemplateMetadata.class);

Expand Down Expand Up @@ -257,6 +259,17 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVInt(version);
}

@Override
public void writeVerifiableTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeInt(order);
out.writeStringCollection(patterns);
Settings.writeSettingsToStream(settings, out);
out.writeMap(mappings, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
((BufferedChecksumStreamOutput)out).writeMapValues(aliases, (stream, val) -> val.writeTo(stream));
out.writeOptionalVInt(version);
}

@Override
public String toString() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio

public static final String GLOBAL_STATE_FILE_PREFIX = "global-";

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
public static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);

private final String clusterUUID;
private final boolean clusterUUIDCommitted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.VerifiableWriteable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -27,7 +29,7 @@
* @opensearch.api
*/
@PublicApi(since = "2.15.0")
public class TemplatesMetadata extends AbstractDiffable<TemplatesMetadata> implements ToXContentFragment {
public class TemplatesMetadata extends AbstractDiffable<TemplatesMetadata> implements ToXContentFragment, VerifiableWriteable {
public static TemplatesMetadata EMPTY_METADATA = builder().build();
private final Map<String, IndexTemplateMetadata> templates;

Expand Down Expand Up @@ -65,6 +67,11 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

@Override
public void writeVerifiableTo(StreamOutput out) throws IOException {
((BufferedChecksumStreamOutput)out).writeMapValues(templates, (stream, value) -> value.writeVerifiableTo(stream));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -80,6 +87,11 @@ public int hashCode() {
return templates != null ? templates.hashCode() : 0;
}

@Override
public String toString() {
return "TemplatesMetadata{" + "templates=" + templates + '}';
}

/**
* Builder for the templates metadata
*
Expand Down
Loading

0 comments on commit 57168df

Please sign in to comment.