From 175cbd0a0ce6a8542abfeb7f77457e97cad42587 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Wed, 4 Sep 2024 16:17:16 +0530 Subject: [PATCH] [Backport 2.17] Add cluster state checksum in manifest #15218 (#15658) * Add cluster state checksum in manifest (#15218) Signed-off-by: Himshikha Gupta Co-authored-by: Bukhtawar Khan --- CHANGELOG.md | 1 + .../core/common/io/stream/StreamOutput.java | 2 +- .../remote/RemoteRoutingTableServiceIT.java | 1 + .../remote/RemoteStatePublicationIT.java | 1 + .../opensearch/cluster/AbstractDiffable.java | 5 + .../org/opensearch/cluster/ClusterState.java | 30 +- .../org/opensearch/cluster/DiffableUtils.java | 12 + .../cluster/block/ClusterBlock.java | 8 +- .../cluster/block/ClusterBlocks.java | 5 + .../coordination/CoordinationMetadata.java | 11 +- .../cluster/metadata/IndexMetadata.java | 27 + .../metadata/IndexTemplateMetadata.java | 11 + .../opensearch/cluster/metadata/Metadata.java | 2 +- .../cluster/metadata/TemplatesMetadata.java | 10 + .../cluster/node/DiscoveryNode.java | 26 +- .../cluster/node/DiscoveryNodes.java | 51 +- .../cluster/routing/IndexRoutingTable.java | 14 + .../routing/IndexShardRoutingTable.java | 24 + .../cluster/routing/RoutingTable.java | 11 + .../common/settings/ClusterSettings.java | 1 + .../remote/ClusterMetadataManifest.java | 62 ++- .../gateway/remote/ClusterStateChecksum.java | 485 ++++++++++++++++++ .../remote/RemoteClusterStateService.java | 190 +++++++ .../gateway/remote/RemoteManifestManager.java | 6 +- .../model/RemoteClusterMetadataManifest.java | 1 + .../BufferedChecksumStreamOutput.java | 81 +++ .../cluster/block/ClusterBlocksTests.java | 55 ++ .../CoordinationMetadataTests.java | 31 ++ .../cluster/metadata/IndexMetadataTests.java | 81 +++ .../metadata/IndexTemplateMetadataTests.java | 47 ++ .../cluster/node/DiscoveryNodeTests.java | 39 ++ .../cluster/node/DiscoveryNodesTests.java | 26 + .../routing/IndexShardRoutingTableTests.java | 22 + .../cluster/routing/RoutingTableTests.java | 1 + .../remote/ClusterMetadataManifestTests.java | 123 +++-- .../remote/ClusterStateChecksumTests.java | 222 ++++++++ .../RemoteClusterStateServiceTests.java | 359 ++++++++++++- 37 files changed, 2032 insertions(+), 52 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java create mode 100644 server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a38bc27082b71..00ace6b96a7a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) - Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010)) - Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363)) +- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java index 1d0876d79a549..cac8ddc8f94e3 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java @@ -633,7 +633,7 @@ public final void writeMapOfLists(final Map> map, final Writer * @param keyWriter The key writer * @param valueWriter The value writer */ - public final void writeMap(final Map map, final Writer keyWriter, final Writer valueWriter) throws IOException { + public void writeMap(final Map map, final Writer keyWriter, final Writer valueWriter) throws IOException { writeVInt(map.size()); for (final Map.Entry entry : map.entrySet()) { keyWriter.write(this, entry.getKey()); diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java index 50826d0f92939..0161ecf9205f3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java @@ -67,6 +67,7 @@ protected Settings nodeSettings(int nodeOrdinal) { ) .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO) .put(REMOTE_PUBLICATION_EXPERIMENTAL, true) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index f237a81942e1a..3a3e023db6446 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -90,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName) .put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE) .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true) .build(); } diff --git a/server/src/main/java/org/opensearch/cluster/AbstractDiffable.java b/server/src/main/java/org/opensearch/cluster/AbstractDiffable.java index 74af3472433ba..770a0c171e084 100644 --- a/server/src/main/java/org/opensearch/cluster/AbstractDiffable.java +++ b/server/src/main/java/org/opensearch/cluster/AbstractDiffable.java @@ -83,6 +83,11 @@ private static class CompleteDiff> implements Diff { this.part = part; } + @Override + public String toString() { + return "CompleteDiff{" + "part=" + part + '}'; + } + /** * Creates simple diff without changes */ diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index fa3979882f5cb..1e4fd2dfffe0f 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -156,7 +156,7 @@ default boolean isPrivate() { } - private static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); + public static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); public static final String UNKNOWN_UUID = "_na_"; @@ -839,6 +839,34 @@ private static class ClusterStateDiff implements Diff { minimumClusterManagerNodesOnPublishingClusterManager = after.minimumClusterManagerNodesOnPublishingClusterManager; } + @Override + public String toString() { + return new StringBuilder().append("ClusterStateDiff{toVersion=") + .append(toVersion) + .append(", fromUuid='") + .append(fromUuid) + .append('\'') + .append(", toUuid='") + .append(toUuid) + .append('\'') + .append(", clusterName=") + .append(clusterName) + .append(", routingTable=") + .append(routingTable) + .append(", nodes=") + .append(nodes) + .append(", metadata=") + .append(metadata) + .append(", blocks=") + .append(blocks) + .append(", customs=") + .append(customs) + .append(", minimumClusterManagerNodesOnPublishingClusterManager=") + .append(minimumClusterManagerNodesOnPublishingClusterManager) + .append("}") + .toString(); + } + ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException { clusterName = new ClusterName(in); fromUuid = in.readString(); diff --git a/server/src/main/java/org/opensearch/cluster/DiffableUtils.java b/server/src/main/java/org/opensearch/cluster/DiffableUtils.java index d21cd354bf659..4b5dcaa52cc50 100644 --- a/server/src/main/java/org/opensearch/cluster/DiffableUtils.java +++ b/server/src/main/java/org/opensearch/cluster/DiffableUtils.java @@ -271,6 +271,18 @@ public Map getUpserts() { return upserts; } + @Override + public String toString() { + return new StringBuilder().append("MapDiff{deletes=") + .append(deletes) + .append(", diffs=") + .append(diffs) + .append(", upserts=") + .append(upserts) + .append("}") + .toString(); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(deletes, (o, v) -> keySerializer.writeKey(v, o)); diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlock.java index 5fa897c0b1185..7c0a7a2a6b837 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlock.java @@ -52,7 +52,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class ClusterBlock implements Writeable, ToXContentFragment { +public class ClusterBlock implements Writeable, ToXContentFragment, Comparable { private final int id; @Nullable @@ -217,7 +217,13 @@ public int hashCode() { return Objects.hash(id, uuid); } + @Override + public int compareTo(ClusterBlock block) { + return Integer.compare(block.id(), this.id()); + } + public boolean isAllowReleaseResources() { return allowReleaseResources; } + } diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java index 02a20b7681ba7..c2c9e2e2bd0d2 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java @@ -42,6 +42,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.util.Collections; @@ -303,6 +304,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(indicesBlocks, StreamOutput::writeString, (o, s) -> writeBlockSet(s, o)); } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + writeTo(out); + } + private static void writeBlockSet(Set blocks, StreamOutput out) throws IOException { out.writeCollection(blocks); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationMetadata.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationMetadata.java index 1447cc2fbe772..df2550de102f9 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationMetadata.java @@ -42,6 +42,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.util.Arrays; @@ -149,6 +150,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(votingConfigExclusions); } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + writeTo(out); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder.field(TERM_PARSE_FIELD.getPreferredName(), term) @@ -272,7 +277,7 @@ public CoordinationMetadata build() { * @opensearch.api */ @PublicApi(since = "1.0.0") - public static class VotingConfigExclusion implements Writeable, ToXContentFragment { + public static class VotingConfigExclusion implements Writeable, ToXContentFragment, Comparable { public static final String MISSING_VALUE_MARKER = "_absent_"; private final String nodeId; private final String nodeName; @@ -361,6 +366,10 @@ public String toString() { return sb.toString(); } + @Override + public int compareTo(VotingConfigExclusion votingConfigExclusion) { + return votingConfigExclusion.getNodeId().compareTo(this.getNodeId()); + } } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index ede4e3384f034..e0444ee670011 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -69,6 +69,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.common.ReplicationType; @@ -88,6 +89,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeSet; import java.util.function.Function; import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM; @@ -1287,6 +1289,31 @@ public void writeTo(StreamOutput out) throws IOException { } } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + out.writeString(index.getName()); // uuid will come as part of settings + out.writeLong(version); + out.writeVLong(mappingVersion); + out.writeVLong(settingsVersion); + out.writeVLong(aliasesVersion); + out.writeInt(routingNumShards); + out.writeByte(state.id()); + writeSettingsToStream(settings, out); + out.writeVLongArray(primaryTerms); + out.writeMapValues(mappings, (stream, val) -> val.writeTo(stream)); + out.writeMapValues(aliases, (stream, val) -> val.writeTo(stream)); + out.writeMap(customData, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); + out.writeMap( + inSyncAllocationIds, + StreamOutput::writeVInt, + (stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream) + ); + out.writeMapValues(rolloverInfos, (stream, val) -> val.writeTo(stream)); + out.writeBoolean(isSystem); + if (out.getVersion().onOrAfter(Version.V_2_17_0)) { + out.writeOptionalWriteable(context); + } + } + public boolean isSystem() { return isSystem; } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexTemplateMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexTemplateMetadata.java index 3d532208bcfe2..1539abcfb2e8a 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexTemplateMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexTemplateMetadata.java @@ -50,6 +50,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.io.UncheckedIOException; @@ -257,6 +258,16 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalVInt(version); } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + out.writeString(name); + out.writeInt(order); + out.writeStringCollection(patterns); + Settings.writeSettingsToStream(settings, out); + out.writeMap(mappings, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); + out.writeMapValues(aliases, (stream, val) -> val.writeTo(stream)); + out.writeOptionalVInt(version); + } + @Override public String toString() { try { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 42739c4670ebc..0f6165e18f678 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -254,7 +254,7 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio public static final String GLOBAL_STATE_FILE_PREFIX = "global-"; - private static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); + public static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); private final String clusterUUID; private final boolean clusterUUIDCommitted; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java index d31a51628eac4..c337ba72de9fa 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/TemplatesMetadata.java @@ -14,6 +14,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.util.Collections; @@ -65,6 +66,10 @@ public void writeTo(StreamOutput out) throws IOException { } } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + out.writeMapValues(templates, (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream)); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -80,6 +85,11 @@ public int hashCode() { return templates != null ? templates.hashCode() : 0; } + @Override + public String toString() { + return "TemplatesMetadata{" + "templates=" + templates + '}'; + } + /** * Builder for the templates metadata * diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 1694fc23f511e..e71570b36552b 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.node.Node; import java.io.IOException; @@ -397,12 +398,7 @@ public void writeToWithAttribute(StreamOutput out) throws IOException { } public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException { - out.writeString(nodeName); - out.writeString(nodeId); - out.writeString(ephemeralId); - out.writeString(hostName); - out.writeString(hostAddress); - address.writeTo(out); + writeNodeDetails(out); if (includeAllAttributes) { out.writeVInt(attributes.size()); for (Map.Entry entry : attributes.entrySet()) { @@ -412,7 +408,25 @@ public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws I } else { out.writeVInt(0); } + writeRolesAndVersion(out); + } + + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + writeNodeDetails(out); + out.writeMap(attributes, StreamOutput::writeString, StreamOutput::writeString); + writeRolesAndVersion(out); + } + + private void writeNodeDetails(StreamOutput out) throws IOException { + out.writeString(nodeName); + out.writeString(nodeId); + out.writeString(ephemeralId); + out.writeString(hostName); + out.writeString(hostAddress); + address.writeTo(out); + } + private void writeRolesAndVersion(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(LegacyESVersion.V_7_3_0)) { out.writeVInt(roles.size()); for (final DiscoveryNodeRole role : roles) { diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java index 33605ec6330cf..35e95fafc3a27 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.util.ArrayList; @@ -709,16 +710,58 @@ public void writeToWithAttribute(StreamOutput out) throws IOException { } private void writeToUtil(final Writer writer, StreamOutput out) throws IOException { + writeClusterManager(out); + out.writeVInt(nodes.size()); + for (DiscoveryNode node : this) { + writer.write(out, node); + } + } + + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + writeClusterManager(out); + out.writeMapValues(nodes, (stream, val) -> val.writeVerifiableTo((BufferedChecksumStreamOutput) stream)); + } + + private void writeClusterManager(StreamOutput out) throws IOException { if (clusterManagerNodeId == null) { out.writeBoolean(false); } else { out.writeBoolean(true); out.writeString(clusterManagerNodeId); } - out.writeVInt(nodes.size()); - for (DiscoveryNode node : this) { - writer.write(out, node); - } + } + + @Override + public int hashCode() { + return Objects.hash( + nodes, + dataNodes, + clusterManagerNodes, + ingestNodes, + clusterManagerNodeId, + localNodeId, + minNonClientNodeVersion, + maxNonClientNodeVersion, + maxNodeVersion, + minNodeVersion + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DiscoveryNodes that = (DiscoveryNodes) o; + return Objects.equals(nodes, that.nodes) + && Objects.equals(dataNodes, that.dataNodes) + && Objects.equals(clusterManagerNodes, that.clusterManagerNodes) + && Objects.equals(ingestNodes, that.ingestNodes) + && Objects.equals(clusterManagerNodeId, that.clusterManagerNodeId) + && Objects.equals(localNodeId, that.localNodeId) + && Objects.equals(minNonClientNodeVersion, that.minNonClientNodeVersion) + && Objects.equals(maxNonClientNodeVersion, that.maxNonClientNodeVersion) + && Objects.equals(maxNodeVersion, that.maxNodeVersion) + && Objects.equals(minNodeVersion, that.minNodeVersion); } public static DiscoveryNodes readFrom(StreamInput in, DiscoveryNode localNode) throws IOException { diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 80c9d42cdb617..b72acc3b46bbb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -49,6 +49,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.util.ArrayList; @@ -353,6 +354,11 @@ public int hashCode() { return result; } + @Override + public String toString() { + return "IndexRoutingTable{" + "shards=" + shards + ", index=" + index + '}'; + } + public static IndexRoutingTable readFrom(StreamInput in) throws IOException { Index index = new Index(in); Builder builder = new Builder(index); @@ -378,6 +384,14 @@ public void writeTo(StreamOutput out) throws IOException { } } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + index.writeTo(out); + out.writeMapValues( + shards, + (stream, value) -> IndexShardRoutingTable.Builder.writeVerifiableTo(value, (BufferedChecksumStreamOutput) stream) + ); + } + public static Builder builder(Index index) { return new Builder(index); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index f782cfc1b060a..57a8e8026a8e5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.cluster.AbstractDiffable; import org.opensearch.cluster.Diff; import org.opensearch.cluster.node.DiscoveryNode; @@ -47,6 +48,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.node.ResponseCollectorService; import java.io.IOException; @@ -62,6 +64,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -1169,6 +1172,27 @@ public static void writeToThin(IndexShardRoutingTable indexShard, StreamOutput o } } + public static void writeVerifiableTo(IndexShardRoutingTable indexShard, BufferedChecksumStreamOutput out) throws IOException { + out.writeVInt(indexShard.shardId.id()); + out.writeVInt(indexShard.shards.size()); + // Order allocated shards by allocationId + AtomicInteger assignedShardCount = new AtomicInteger(); + indexShard.shards.stream() + .filter(shardRouting -> shardRouting.allocationId() != null) + .sorted(Comparator.comparing(o -> o.allocationId().getId())) + .forEach(shardRouting -> { + try { + assignedShardCount.getAndIncrement(); + shardRouting.writeToThin(out); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Failed to write shard {}. Exception {}", indexShard, e)); + throw new RuntimeException("Failed to write IndexShardRoutingTable", e); + } + }); + // is primary assigned + out.writeBoolean(indexShard.primaryShard().allocationId() != null); + out.writeVInt(indexShard.shards.size() - assignedShardCount.get()); + } } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 544d4f9326fba..7dece5b4924ed 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -48,6 +48,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.shard.ShardNotFoundException; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import java.io.IOException; import java.util.ArrayList; @@ -407,6 +408,11 @@ public void writeTo(StreamOutput out) throws IOException { } } + public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { + out.writeLong(version); + out.writeMapValues(indicesRouting, (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream)); + } + private static class RoutingTableDiff implements Diff, StringKeyDiffProvider { private final long version; @@ -426,6 +432,11 @@ private static class RoutingTableDiff implements Diff, StringKeyDi indicesRouting = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DIFF_VALUE_READER); } + @Override + public String toString() { + return "RoutingTableDiff{" + "version=" + version + ", indicesRouting=" + indicesRouting + '}'; + } + @Override public RoutingTable apply(RoutingTable part) { return new RoutingTable(version, indicesRouting.apply(part.indicesRouting)); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1ee1d8c3dfa28..72891d16b3667 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -742,6 +742,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, + RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 8025940b35997..405e5cd784196 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -45,7 +45,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { // also we introduce index routing-metadata, diff and other attributes as part of manifest // required for state publication public static final int CODEC_V3 = 3; // In Codec V3, we have introduced new diff field in diff-manifest's routing_table_diff - public static final int CODEC_V4 = 4; // In Codec V4, we have removed upserts and delete field for routing table in diff manifest + public static final int CODEC_V4 = 4; // In Codec V4, we have removed upserts and delete field for routing table in diff manifest and + // added checksum of cluster state. public static final int[] CODEC_VERSIONS = { CODEC_V0, CODEC_V1, CODEC_V2, CODEC_V3, CODEC_V4 }; private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); @@ -75,6 +76,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { ); private static final ParseField UPLOADED_CLUSTER_STATE_CUSTOM_METADATA = new ParseField("uploaded_cluster_state_custom_metadata"); private static final ParseField DIFF_MANIFEST = new ParseField("diff_manifest"); + private static final ParseField CHECKSUM = new ParseField("checksum"); private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) { return ClusterMetadataManifest.builder() @@ -117,7 +119,7 @@ private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields } private static ClusterMetadataManifest.Builder manifestV4Builder(Object[] fields) { - return manifestV3Builder(fields); + return manifestV3Builder(fields).checksum(checksum(fields)); } private static long term(Object[] fields) { @@ -222,6 +224,10 @@ private static ClusterStateDiffManifest diffManifest(Object[] fields) { return (ClusterStateDiffManifest) fields[23]; } + private static ClusterStateChecksum checksum(Object[] fields) { + return (ClusterStateChecksum) fields[24]; + } + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> manifestV0Builder(fields).build() @@ -248,6 +254,7 @@ private static ClusterStateDiffManifest diffManifest(Object[] fields) { ); private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V4; + public static final int MANIFEST_CURRENT_CODEC_VERSION = CODEC_V4; private static final Map VERSION_TO_CODEC_MAPPING; @@ -360,6 +367,13 @@ private static void declareParser(ConstructingObjectParser= CODEC_V4) { + parser.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ClusterStateChecksum.fromXContent(p), + CHECKSUM + ); + } } private final int codecVersion; @@ -387,6 +401,7 @@ private static void declareParser(ConstructingObjectParser uploadedClusterStateCustomMap; private final ClusterStateDiffManifest diffManifest; + private ClusterStateChecksum clusterStateChecksum; public List getIndices() { return indices; @@ -495,6 +510,10 @@ public List getIndicesRouting() { return indicesRouting; } + public ClusterStateChecksum getClusterStateChecksum() { + return clusterStateChecksum; + } + public ClusterMetadataManifest( long clusterTerm, long version, @@ -520,7 +539,8 @@ public ClusterMetadataManifest( UploadedMetadataAttribute uploadedTransientSettingsMetadata, UploadedMetadataAttribute uploadedHashesOfConsistentSettings, Map uploadedClusterStateCustomMap, - ClusterStateDiffManifest diffManifest + ClusterStateDiffManifest diffManifest, + ClusterStateChecksum clusterStateChecksum ) { this.clusterTerm = clusterTerm; this.stateVersion = version; @@ -551,6 +571,7 @@ public ClusterMetadataManifest( this.uploadedClusterStateCustomMap = Collections.unmodifiableMap( uploadedClusterStateCustomMap != null ? uploadedClusterStateCustomMap : new HashMap<>() ); + this.clusterStateChecksum = clusterStateChecksum; } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -564,6 +585,7 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); + clusterStateChecksum = null; if (in.getVersion().onOrAfter(Version.V_2_15_0)) { this.codecVersion = in.readInt(); this.uploadedCoordinationMetadata = new UploadedMetadataAttribute(in); @@ -626,6 +648,9 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.uploadedHashesOfConsistentSettings = null; this.uploadedClusterStateCustomMap = null; } + if (in.getVersion().onOrAfter(Version.V_2_17_0) && in.readBoolean()) { + clusterStateChecksum = new ClusterStateChecksum(in); + } } public static Builder builder() { @@ -723,6 +748,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); } + if (onOrAfterCodecVersion(CODEC_V4)) { + if (getClusterStateChecksum() != null) { + builder.startObject(CHECKSUM.getPreferredName()); + getClusterStateChecksum().toXContent(builder, params); + builder.endObject(); + } + } return builder; } @@ -782,6 +814,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); } + if (out.getVersion().onOrAfter(Version.V_2_17_0)) { + if (clusterStateChecksum != null) { + out.writeBoolean(true); + clusterStateChecksum.writeTo(out); + } else { + out.writeBoolean(false); + } + } } @Override @@ -817,7 +857,8 @@ public boolean equals(Object o) { && Objects.equals(uploadedTransientSettingsMetadata, that.uploadedTransientSettingsMetadata) && Objects.equals(uploadedHashesOfConsistentSettings, that.uploadedHashesOfConsistentSettings) && Objects.equals(uploadedClusterStateCustomMap, that.uploadedClusterStateCustomMap) - && Objects.equals(diffManifest, that.diffManifest); + && Objects.equals(diffManifest, that.diffManifest) + && Objects.equals(clusterStateChecksum, that.clusterStateChecksum); } @Override @@ -847,7 +888,8 @@ public int hashCode() { uploadedTransientSettingsMetadata, uploadedHashesOfConsistentSettings, uploadedClusterStateCustomMap, - diffManifest + diffManifest, + clusterStateChecksum ); } @@ -912,6 +954,7 @@ public static class Builder { private UploadedMetadataAttribute hashesOfConsistentSettings; private Map clusterStateCustomMetadataMap; private ClusterStateDiffManifest diffManifest; + private ClusterStateChecksum checksum; public Builder indices(List indices) { this.indices = indices; @@ -1051,6 +1094,11 @@ public Builder diffManifest(ClusterStateDiffManifest diffManifest) { return this; } + public Builder checksum(ClusterStateChecksum checksum) { + this.checksum = checksum; + return this; + } + public Builder() { indices = new ArrayList<>(); customMetadataMap = new HashMap<>(); @@ -1083,6 +1131,7 @@ public Builder(ClusterMetadataManifest manifest) { this.diffManifest = manifest.diffManifest; this.hashesOfConsistentSettings = manifest.uploadedHashesOfConsistentSettings; this.clusterStateCustomMetadataMap = manifest.uploadedClusterStateCustomMap; + this.checksum = manifest.clusterStateChecksum; } public ClusterMetadataManifest build() { @@ -1111,7 +1160,8 @@ public ClusterMetadataManifest build() { transientSettingsMetadata, hashesOfConsistentSettings, clusterStateCustomMetadataMap, - diffManifest + diffManifest, + checksum ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java new file mode 100644 index 0000000000000..fb0eb35f4066b --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -0,0 +1,485 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.DiffableStringMap; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParseException; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import com.jcraft.jzlib.JZlib; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Stores checksum for all components in cluster state. This will be used to ensure cluster state is same across all nodes in the cluster. + */ +public class ClusterStateChecksum implements ToXContentFragment, Writeable { + + static final String ROUTING_TABLE_CS = "routing_table"; + static final String NODES_CS = "discovery_nodes"; + static final String BLOCKS_CS = "blocks"; + static final String CUSTOMS_CS = "customs"; + static final String COORDINATION_MD_CS = "coordination_md"; + static final String SETTINGS_MD_CS = "settings_md"; + static final String TRANSIENT_SETTINGS_MD_CS = "transient_settings_md"; + static final String TEMPLATES_MD_CS = "templates_md"; + static final String CUSTOM_MD_CS = "customs_md"; + static final String HASHES_MD_CS = "hashes_md"; + static final String INDICES_CS = "indices_md"; + private static final String CLUSTER_STATE_CS = "cluster_state"; + private static final int CHECKSUM_SIZE = 8; + private static final Logger logger = LogManager.getLogger(ClusterStateChecksum.class); + + long routingTableChecksum; + long nodesChecksum; + long blocksChecksum; + long clusterStateCustomsChecksum; + long coordinationMetadataChecksum; + long settingMetadataChecksum; + long transientSettingsMetadataChecksum; + long templatesMetadataChecksum; + long customMetadataMapChecksum; + long hashesOfConsistentSettingsChecksum; + long indicesChecksum; + long clusterStateChecksum; + + public ClusterStateChecksum(ClusterState clusterState) { + try ( + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) + ) { + clusterState.routingTable().writeVerifiableTo(checksumOut); + routingTableChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + clusterState.nodes().writeVerifiableTo(checksumOut); + nodesChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + clusterState.coordinationMetadata().writeVerifiableTo(checksumOut); + coordinationMetadataChecksum = checksumOut.getChecksum(); + + // Settings create sortedMap by default, so no explicit sorting required here. + checksumOut.reset(); + Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), checksumOut); + settingMetadataChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), checksumOut); + transientSettingsMetadataChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + clusterState.metadata().templatesMetadata().writeVerifiableTo(checksumOut); + templatesMetadataChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + checksumOut.writeStringCollection(clusterState.metadata().customs().keySet()); + customMetadataMapChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(checksumOut); + hashesOfConsistentSettingsChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + checksumOut.writeMapValues( + clusterState.metadata().indices(), + (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream) + ); + indicesChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + clusterState.blocks().writeVerifiableTo(checksumOut); + blocksChecksum = checksumOut.getChecksum(); + + checksumOut.reset(); + checksumOut.writeStringCollection(clusterState.customs().keySet()); + clusterStateCustomsChecksum = checksumOut.getChecksum(); + } catch (IOException e) { + logger.error("Failed to create checksum for cluster state.", e); + throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); + } + createClusterStateChecksum(); + } + + private void createClusterStateChecksum() { + clusterStateChecksum = JZlib.crc32_combine(routingTableChecksum, nodesChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, blocksChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, clusterStateCustomsChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, coordinationMetadataChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, settingMetadataChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, transientSettingsMetadataChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, templatesMetadataChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, customMetadataMapChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, hashesOfConsistentSettingsChecksum, CHECKSUM_SIZE); + clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, indicesChecksum, CHECKSUM_SIZE); + } + + public static ClusterStateChecksum.Builder builder() { + return new ClusterStateChecksum.Builder(); + } + + public ClusterStateChecksum( + long routingTableChecksum, + long nodesChecksum, + long blocksChecksum, + long clusterStateCustomsChecksum, + long coordinationMetadataChecksum, + long settingMetadataChecksum, + long transientSettingsMetadataChecksum, + long templatesMetadataChecksum, + long customMetadataMapChecksum, + long hashesOfConsistentSettingsChecksum, + long indicesChecksum, + long clusterStateChecksum + ) { + this.routingTableChecksum = routingTableChecksum; + this.nodesChecksum = nodesChecksum; + this.blocksChecksum = blocksChecksum; + this.clusterStateCustomsChecksum = clusterStateCustomsChecksum; + this.coordinationMetadataChecksum = coordinationMetadataChecksum; + this.settingMetadataChecksum = settingMetadataChecksum; + this.transientSettingsMetadataChecksum = transientSettingsMetadataChecksum; + this.templatesMetadataChecksum = templatesMetadataChecksum; + this.customMetadataMapChecksum = customMetadataMapChecksum; + this.hashesOfConsistentSettingsChecksum = hashesOfConsistentSettingsChecksum; + this.indicesChecksum = indicesChecksum; + this.clusterStateChecksum = clusterStateChecksum; + } + + public ClusterStateChecksum(StreamInput in) throws IOException { + routingTableChecksum = in.readLong(); + nodesChecksum = in.readLong(); + blocksChecksum = in.readLong(); + clusterStateCustomsChecksum = in.readLong(); + coordinationMetadataChecksum = in.readLong(); + settingMetadataChecksum = in.readLong(); + transientSettingsMetadataChecksum = in.readLong(); + templatesMetadataChecksum = in.readLong(); + customMetadataMapChecksum = in.readLong(); + hashesOfConsistentSettingsChecksum = in.readLong(); + indicesChecksum = in.readLong(); + clusterStateChecksum = in.readLong(); + } + + public static ClusterStateChecksum fromXContent(XContentParser parser) throws IOException { + ClusterStateChecksum.Builder builder = new ClusterStateChecksum.Builder(); + if (parser.currentToken() == null) { // fresh parser? move to next token + parser.nextToken(); + } + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + parser.nextToken(); + } + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); + XContentParser.Token token; + String currentFieldName = parser.currentName(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (parser.currentToken() == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + switch (currentFieldName) { + case ROUTING_TABLE_CS: + builder.routingTableChecksum(parser.longValue()); + break; + case NODES_CS: + builder.nodesChecksum(parser.longValue()); + break; + case BLOCKS_CS: + builder.blocksChecksum(parser.longValue()); + break; + case CUSTOMS_CS: + builder.clusterStateCustomsChecksum(parser.longValue()); + break; + case COORDINATION_MD_CS: + builder.coordinationMetadataChecksum(parser.longValue()); + break; + case SETTINGS_MD_CS: + builder.settingMetadataChecksum(parser.longValue()); + break; + case TRANSIENT_SETTINGS_MD_CS: + builder.transientSettingsMetadataChecksum(parser.longValue()); + break; + case TEMPLATES_MD_CS: + builder.templatesMetadataChecksum(parser.longValue()); + break; + case CUSTOM_MD_CS: + builder.customMetadataMapChecksum(parser.longValue()); + break; + case HASHES_MD_CS: + builder.hashesOfConsistentSettingsChecksum(parser.longValue()); + break; + case INDICES_CS: + builder.indicesChecksum(parser.longValue()); + break; + case CLUSTER_STATE_CS: + builder.clusterStateChecksum(parser.longValue()); + break; + default: + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } else { + throw new XContentParseException("Unexpected token [" + token + "]"); + } + } + return builder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(routingTableChecksum); + out.writeLong(nodesChecksum); + out.writeLong(blocksChecksum); + out.writeLong(clusterStateCustomsChecksum); + out.writeLong(coordinationMetadataChecksum); + out.writeLong(settingMetadataChecksum); + out.writeLong(transientSettingsMetadataChecksum); + out.writeLong(templatesMetadataChecksum); + out.writeLong(customMetadataMapChecksum); + out.writeLong(hashesOfConsistentSettingsChecksum); + out.writeLong(indicesChecksum); + out.writeLong(clusterStateChecksum); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(ROUTING_TABLE_CS, routingTableChecksum); + builder.field(NODES_CS, nodesChecksum); + builder.field(BLOCKS_CS, blocksChecksum); + builder.field(CUSTOMS_CS, clusterStateCustomsChecksum); + builder.field(COORDINATION_MD_CS, coordinationMetadataChecksum); + builder.field(SETTINGS_MD_CS, settingMetadataChecksum); + builder.field(TRANSIENT_SETTINGS_MD_CS, transientSettingsMetadataChecksum); + builder.field(TEMPLATES_MD_CS, templatesMetadataChecksum); + builder.field(CUSTOM_MD_CS, customMetadataMapChecksum); + builder.field(HASHES_MD_CS, hashesOfConsistentSettingsChecksum); + builder.field(INDICES_CS, indicesChecksum); + builder.field(CLUSTER_STATE_CS, clusterStateChecksum); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterStateChecksum that = (ClusterStateChecksum) o; + return routingTableChecksum == that.routingTableChecksum + && nodesChecksum == that.nodesChecksum + && blocksChecksum == that.blocksChecksum + && clusterStateCustomsChecksum == that.clusterStateCustomsChecksum + && coordinationMetadataChecksum == that.coordinationMetadataChecksum + && settingMetadataChecksum == that.settingMetadataChecksum + && transientSettingsMetadataChecksum == that.transientSettingsMetadataChecksum + && templatesMetadataChecksum == that.templatesMetadataChecksum + && customMetadataMapChecksum == that.customMetadataMapChecksum + && hashesOfConsistentSettingsChecksum == that.hashesOfConsistentSettingsChecksum + && indicesChecksum == that.indicesChecksum + && clusterStateChecksum == that.clusterStateChecksum; + } + + @Override + public int hashCode() { + return Objects.hash( + routingTableChecksum, + nodesChecksum, + blocksChecksum, + clusterStateCustomsChecksum, + coordinationMetadataChecksum, + settingMetadataChecksum, + transientSettingsMetadataChecksum, + templatesMetadataChecksum, + customMetadataMapChecksum, + hashesOfConsistentSettingsChecksum, + indicesChecksum, + clusterStateChecksum + ); + } + + @Override + public String toString() { + return "ClusterStateChecksum{" + + "routingTableChecksum=" + + routingTableChecksum + + ", nodesChecksum=" + + nodesChecksum + + ", blocksChecksum=" + + blocksChecksum + + ", clusterStateCustomsChecksum=" + + clusterStateCustomsChecksum + + ", coordinationMetadataChecksum=" + + coordinationMetadataChecksum + + ", settingMetadataChecksum=" + + settingMetadataChecksum + + ", transientSettingsMetadataChecksum=" + + transientSettingsMetadataChecksum + + ", templatesMetadataChecksum=" + + templatesMetadataChecksum + + ", customMetadataMapChecksum=" + + customMetadataMapChecksum + + ", hashesOfConsistentSettingsChecksum=" + + hashesOfConsistentSettingsChecksum + + ", indicesChecksum=" + + indicesChecksum + + ", clusterStateChecksum=" + + clusterStateChecksum + + '}'; + } + + public List getMismatchEntities(ClusterStateChecksum otherClusterStateChecksum) { + if (this.clusterStateChecksum == otherClusterStateChecksum.clusterStateChecksum) { + logger.debug("No mismatch in checksums."); + return List.of(); + } + List mismatches = new ArrayList<>(); + addIfMismatch(this.routingTableChecksum, otherClusterStateChecksum.routingTableChecksum, ROUTING_TABLE_CS, mismatches); + addIfMismatch(this.nodesChecksum, otherClusterStateChecksum.nodesChecksum, NODES_CS, mismatches); + addIfMismatch(this.blocksChecksum, otherClusterStateChecksum.blocksChecksum, BLOCKS_CS, mismatches); + addIfMismatch(this.clusterStateCustomsChecksum, otherClusterStateChecksum.clusterStateCustomsChecksum, CUSTOMS_CS, mismatches); + addIfMismatch( + this.coordinationMetadataChecksum, + otherClusterStateChecksum.coordinationMetadataChecksum, + COORDINATION_MD_CS, + mismatches + ); + addIfMismatch(this.settingMetadataChecksum, otherClusterStateChecksum.settingMetadataChecksum, SETTINGS_MD_CS, mismatches); + addIfMismatch( + this.transientSettingsMetadataChecksum, + otherClusterStateChecksum.transientSettingsMetadataChecksum, + TRANSIENT_SETTINGS_MD_CS, + mismatches + ); + addIfMismatch(this.templatesMetadataChecksum, otherClusterStateChecksum.templatesMetadataChecksum, TEMPLATES_MD_CS, mismatches); + addIfMismatch(this.customMetadataMapChecksum, otherClusterStateChecksum.customMetadataMapChecksum, CUSTOM_MD_CS, mismatches); + addIfMismatch( + this.hashesOfConsistentSettingsChecksum, + otherClusterStateChecksum.hashesOfConsistentSettingsChecksum, + HASHES_MD_CS, + mismatches + ); + addIfMismatch(this.indicesChecksum, otherClusterStateChecksum.indicesChecksum, INDICES_CS, mismatches); + + return mismatches; + } + + private void addIfMismatch(long checksum, long otherChecksum, String entityName, List mismatches) { + if (checksum != otherChecksum) { + mismatches.add(entityName); + } + } + + /** + * Builder for ClusterStateChecksum + */ + public static class Builder { + long routingTableChecksum; + long nodesChecksum; + long blocksChecksum; + long clusterStateCustomsChecksum; + long coordinationMetadataChecksum; + long settingMetadataChecksum; + long transientSettingsMetadataChecksum; + long templatesMetadataChecksum; + long customMetadataMapChecksum; + long hashesOfConsistentSettingsChecksum; + long indicesChecksum; + long clusterStateChecksum; + + public Builder routingTableChecksum(long routingTableChecksum) { + this.routingTableChecksum = routingTableChecksum; + return this; + } + + public Builder nodesChecksum(long nodesChecksum) { + this.nodesChecksum = nodesChecksum; + return this; + } + + public Builder blocksChecksum(long blocksChecksum) { + this.blocksChecksum = blocksChecksum; + return this; + } + + public Builder clusterStateCustomsChecksum(long clusterStateCustomsChecksum) { + this.clusterStateCustomsChecksum = clusterStateCustomsChecksum; + return this; + } + + public Builder coordinationMetadataChecksum(long coordinationMetadataChecksum) { + this.coordinationMetadataChecksum = coordinationMetadataChecksum; + return this; + } + + public Builder settingMetadataChecksum(long settingMetadataChecksum) { + this.settingMetadataChecksum = settingMetadataChecksum; + return this; + } + + public Builder transientSettingsMetadataChecksum(long transientSettingsMetadataChecksum) { + this.transientSettingsMetadataChecksum = transientSettingsMetadataChecksum; + return this; + } + + public Builder templatesMetadataChecksum(long templatesMetadataChecksum) { + this.templatesMetadataChecksum = templatesMetadataChecksum; + return this; + } + + public Builder customMetadataMapChecksum(long customMetadataMapChecksum) { + this.customMetadataMapChecksum = customMetadataMapChecksum; + return this; + } + + public Builder hashesOfConsistentSettingsChecksum(long hashesOfConsistentSettingsChecksum) { + this.hashesOfConsistentSettingsChecksum = hashesOfConsistentSettingsChecksum; + return this; + } + + public Builder indicesChecksum(long indicesChecksum) { + this.indicesChecksum = indicesChecksum; + return this; + } + + public Builder clusterStateChecksum(long clusterStateChecksum) { + this.clusterStateChecksum = clusterStateChecksum; + return this; + } + + public ClusterStateChecksum build() { + return new ClusterStateChecksum( + routingTableChecksum, + nodesChecksum, + blocksChecksum, + clusterStateCustomsChecksum, + coordinationMetadataChecksum, + settingMetadataChecksum, + transientSettingsMetadataChecksum, + templatesMetadataChecksum, + customMetadataMapChecksum, + hashesOfConsistentSettingsChecksum, + indicesChecksum, + clusterStateChecksum + ); + } + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 098a36c7d42c4..88dda3953a5f9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -90,6 +90,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; +import static org.opensearch.cluster.ClusterState.CUSTOM_VALUE_SERIALIZER; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; @@ -140,6 +141,13 @@ public class RemoteClusterStateService implements Closeable { Setting.Property.NodeScope ); + public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.state.checksum_validation.enabled", + false, + Property.Dynamic, + Property.NodeScope + ); + private TimeValue remoteStateReadTimeout; private final String nodeId; private final Supplier repositoriesService; @@ -151,6 +159,7 @@ public class RemoteClusterStateService implements Closeable { private BlobStoreTransferService blobStoreTransferService; private RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; + private boolean checksumValidationEnabled; private final RemotePersistenceStats remoteStateStats; private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; @@ -197,6 +206,12 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); + this.checksumValidationEnabled = clusterSettings.get(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING); + clusterSettings.addSettingsUpdateConsumer( + REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING, + this::setChecksumValidationEnabled + ); + this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; @@ -257,6 +272,7 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, + checksumValidationEnabled ? new ClusterStateChecksum(clusterState) : null, false, codecVersion ); @@ -456,6 +472,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, + checksumValidationEnabled ? new ClusterStateChecksum(clusterState) : null, false, previousManifest.getCodecVersion() ); @@ -900,6 +917,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), + checksumValidationEnabled ? previousManifest.getClusterStateChecksum() : null, true, previousManifest.getCodecVersion() ); @@ -985,6 +1003,10 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } + private void setChecksumValidationEnabled(Boolean checksumValidationEnabled) { + this.checksumValidationEnabled = checksumValidationEnabled; + } + // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; @@ -1353,6 +1375,10 @@ public ClusterState getClusterStateForManifest( false, includeEphemeral ); + + if (includeEphemeral && checksumValidationEnabled && manifest.getClusterStateChecksum() != null) { + validateClusterStateFromChecksum(manifest, clusterState, clusterName, localNodeId, true); + } } else { ClusterState state = readClusterStateInParallel( ClusterState.builder(new ClusterName(clusterName)).build(), @@ -1472,6 +1498,9 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)) .build(); + if (checksumValidationEnabled && manifest.getClusterStateChecksum() != null) { + validateClusterStateFromChecksum(manifest, clusterState, previousState.getClusterName().value(), localNodeId, false); + } final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateDiffDownloadSucceeded(); remoteStateStats.stateDiffDownloadTook(durationMillis); @@ -1479,6 +1508,167 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C return clusterState; } + void validateClusterStateFromChecksum( + ClusterMetadataManifest manifest, + ClusterState clusterState, + String clusterName, + String localNodeId, + boolean isFullStateDownload + ) { + ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState); + List failedValidation = newClusterStateChecksum.getMismatchEntities(manifest.getClusterStateChecksum()); + if (!failedValidation.isEmpty()) { + logger.error( + () -> new ParameterizedMessage( + "Cluster state checksums do not match. Checksum from manifest {}, checksum from created cluster state {}. Entities failing validation {}", + manifest.getClusterStateChecksum(), + newClusterStateChecksum, + failedValidation + ) + ); + if (isFullStateDownload) { + throw new IllegalStateException( + "Cluster state checksums do not match during full state read. Validation failed for " + failedValidation + ); + } + // download full cluster state and match against state created for the failing entities + ClusterState fullClusterState = readClusterStateInParallel( + ClusterState.builder(new ClusterName(clusterName)).build(), + manifest, + manifest.getClusterUUID(), + localNodeId, + manifest.getIndices(), + manifest.getCustomMetadataMap(), + manifest.getCoordinationMetadata() != null, + manifest.getSettingsMetadata() != null, + manifest.getTransientSettingsMetadata() != null, + manifest.getTemplatesMetadata() != null, + manifest.getDiscoveryNodesMetadata() != null, + manifest.getClusterBlocksMetadata() != null, + manifest.getIndicesRouting(), + manifest.getHashesOfConsistentSettings() != null, + manifest.getClusterStateCustomMap(), + false, + true + ); + for (String failedEntity : failedValidation) { + switch (failedEntity) { + case ClusterStateChecksum.ROUTING_TABLE_CS: + Diff routingTableDiff = fullClusterState.routingTable().diff(clusterState.routingTable()); + logger.error(() -> new ParameterizedMessage("Failing Diff in routing table {}", routingTableDiff)); + break; + case ClusterStateChecksum.NODES_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in discovery nodes {}", + fullClusterState.nodes().diff(clusterState.nodes()) + ) + ); + break; + case ClusterStateChecksum.BLOCKS_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in cluster blocks {}", + fullClusterState.blocks().diff(clusterState.blocks()) + ) + ); + break; + case ClusterStateChecksum.CUSTOMS_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in cluster state customs {}", + DiffableUtils.diff( + clusterState.customs(), + fullClusterState.customs(), + DiffableUtils.getStringKeySerializer(), + CUSTOM_VALUE_SERIALIZER + ) + ) + ); + break; + case ClusterStateChecksum.COORDINATION_MD_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in coordination md. current md {}, full state md {}", + clusterState.metadata().coordinationMetadata(), + fullClusterState.metadata().coordinationMetadata() + ) + ); + break; + case ClusterStateChecksum.TRANSIENT_SETTINGS_MD_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in transient settings md. current md {}, full state md {}", + clusterState.metadata().transientSettings(), + fullClusterState.metadata().transientSettings() + ) + ); + + break; + case ClusterStateChecksum.SETTINGS_MD_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in settings md. current md {}, full state md {}", + clusterState.metadata().settings(), + fullClusterState.metadata().settings() + ) + ); + + break; + case ClusterStateChecksum.HASHES_MD_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in hashes md {}", + ((DiffableStringMap) fullClusterState.metadata().hashesOfConsistentSettings()).diff( + (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings() + ) + ) + ); + break; + case ClusterStateChecksum.TEMPLATES_MD_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in templates md{}", + fullClusterState.metadata().templatesMetadata().diff(clusterState.metadata().templatesMetadata()) + ) + ); + break; + case ClusterStateChecksum.CUSTOM_MD_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in customs md {}", + DiffableUtils.diff( + clusterState.metadata().customs(), + fullClusterState.metadata().customs(), + DiffableUtils.getStringKeySerializer(), + Metadata.CUSTOM_VALUE_SERIALIZER + ) + ) + ); + break; + case ClusterStateChecksum.INDICES_CS: + logger.error( + () -> new ParameterizedMessage( + "Failing Diff in index md {}", + DiffableUtils.diff( + clusterState.metadata().indices(), + fullClusterState.metadata().indices(), + DiffableUtils.getStringKeySerializer() + ) + ) + ); + break; + default: + logger.error(() -> new ParameterizedMessage("Unknown failed entity {}", failedEntity)); + break; + } + } + throw new IllegalStateException( + "Cluster state checksums do not match during diff read. Validation failed for " + failedValidation + ); + } + } + /** * Fetch the previous cluster UUIDs from remote state store and return the most recent valid cluster UUID * diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 67b1b77447bed..47c847b5dc32a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; @@ -98,6 +99,7 @@ RemoteClusterStateManifestInfo uploadManifest( RemoteClusterStateUtils.UploadedMetadataResults uploadedMetadataResult, String previousClusterUUID, ClusterStateDiffManifest clusterDiffManifest, + ClusterStateChecksum clusterStateChecksum, boolean committed, int codecVersion ) { @@ -126,8 +128,10 @@ RemoteClusterStateManifestInfo uploadManifest( .metadataVersion(clusterState.metadata().version()) .transientSettingsMetadata(uploadedMetadataResult.uploadedTransientSettingsMetadata) .clusterStateCustomMetadataMap(uploadedMetadataResult.uploadedClusterStateCustomMetadataMap) - .hashesOfConsistentSettings(uploadedMetadataResult.uploadedHashesOfConsistentSettings); + .hashesOfConsistentSettings(uploadedMetadataResult.uploadedHashesOfConsistentSettings) + .checksum(clusterStateChecksum); final ClusterMetadataManifest manifest = manifestBuilder.build(); + logger.trace(() -> new ParameterizedMessage("[{}] uploading manifest", manifest)); String manifestFileName = writeMetadataManifest(clusterState.metadata().clusterUUID(), manifest); return new RemoteClusterStateManifestInfo(manifest, manifestFileName); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java index 9460e240e1706..999beaa4e865d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -35,6 +35,7 @@ public class RemoteClusterMetadataManifest extends AbstractClusterMetadataWritea public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final String COMMITTED = "C"; public static final String PUBLISHED = "P"; diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java b/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java index 9e96664c79cc5..9d52cf18d47a3 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java +++ b/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java @@ -33,10 +33,20 @@ package org.opensearch.index.translog; import org.apache.lucene.store.BufferedChecksum; +import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -91,4 +101,75 @@ public void reset() throws IOException { public void resetDigest() { digest.reset(); } + + @Override + public void writeMap(@Nullable Map map) throws IOException { + Map newMap = new TreeMap<>(map); + writeGenericValue(newMap); + } + + @Override + public void writeMap(Map map, final Writeable.Writer keyWriter, final Writeable.Writer valueWriter) + throws IOException { + writeVInt(map.size()); + map.keySet().stream().sorted().forEachOrdered(key -> { + try { + keyWriter.write(this, key); + valueWriter.write(this, map.get(key)); + } catch (IOException e) { + throw new RuntimeException("Failed to write map values.", e); + } + }); + } + + public void writeMapValues(Map map, final Writeable.Writer valueWriter) throws IOException { + writeVInt(map.size()); + map.keySet().stream().sorted().forEachOrdered(key -> { + try { + valueWriter.write(this, map.get(key)); + } catch (IOException e) { + throw new RuntimeException("Failed to write map values.", e); + } + }); + } + + @Override + public void writeStringArray(String[] array) throws IOException { + String[] copyArray = Arrays.copyOf(array, array.length); + Arrays.sort(copyArray); + super.writeStringArray(copyArray); + } + + @Override + public void writeVLongArray(long[] values) throws IOException { + long[] copyValues = Arrays.copyOf(values, values.length); + Arrays.sort(copyValues); + super.writeVLongArray(copyValues); + } + + @Override + public void writeCollection(final Collection collection) throws IOException { + List sortedList = collection.stream().sorted().collect(Collectors.toList()); + super.writeCollection(sortedList, (o, v) -> v.writeTo(o)); + } + + @Override + public void writeStringCollection(final Collection collection) throws IOException { + List listCollection = new ArrayList<>(collection); + Collections.sort(listCollection); + writeCollection(listCollection, StreamOutput::writeString); + } + + @Override + public void writeOptionalStringCollection(final Collection collection) throws IOException { + if (collection != null) { + List listCollection = new ArrayList<>(collection); + Collections.sort(listCollection); + writeBoolean(true); + writeCollection(listCollection, StreamOutput::writeString); + } else { + writeBoolean(false); + } + } + } diff --git a/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java b/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java new file mode 100644 index 0000000000000..e17cc7c24ffbd --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/block/ClusterBlocksTests.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.block; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import static org.opensearch.cluster.block.ClusterBlockTests.randomClusterBlock; + +public class ClusterBlocksTests extends OpenSearchTestCase { + + public void testWriteVerifiableTo() throws Exception { + ClusterBlock clusterBlock1 = randomClusterBlock(); + ClusterBlock clusterBlock2 = randomClusterBlock(); + ClusterBlock clusterBlock3 = randomClusterBlock(); + + ClusterBlocks clusterBlocks = ClusterBlocks.builder() + .addGlobalBlock(clusterBlock1) + .addGlobalBlock(clusterBlock2) + .addGlobalBlock(clusterBlock3) + .addIndexBlock("index-1", clusterBlock1) + .addIndexBlock("index-2", clusterBlock2) + .build(); + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + clusterBlocks.writeVerifiableTo(checksumOut); + StreamInput in = out.bytes().streamInput(); + ClusterBlocks result = ClusterBlocks.readFrom(in); + + assertEquals(clusterBlocks.global().size(), result.global().size()); + assertEquals(clusterBlocks.global(), result.global()); + assertEquals(clusterBlocks.indices().size(), result.indices().size()); + assertEquals(clusterBlocks.indices(), result.indices()); + + ClusterBlocks clusterBlocks2 = ClusterBlocks.builder() + .addGlobalBlock(clusterBlock3) + .addGlobalBlock(clusterBlock1) + .addGlobalBlock(clusterBlock2) + .addIndexBlock("index-2", clusterBlock2) + .addIndexBlock("index-1", clusterBlock1) + .build(); + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + clusterBlocks2.writeVerifiableTo(checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationMetadataTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationMetadataTests.java index 290479941aaa9..50ced86c41566 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationMetadataTests.java @@ -33,21 +33,27 @@ import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Set; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -231,4 +237,29 @@ public void testXContent() throws IOException { assertThat(originalMeta, equalTo(fromXContentMeta)); } } + + public void testWriteVerifiableTo() throws IOException { + VotingConfiguration votingConfiguration = randomVotingConfig(); + Set votingTombstones = randomVotingTombstones(); + CoordinationMetadata meta1 = new CoordinationMetadata(1, votingConfiguration, votingConfiguration, votingTombstones); + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + meta1.writeVerifiableTo(checksumOut); + StreamInput in = out.bytes().streamInput(); + CoordinationMetadata result = new CoordinationMetadata(in); + + assertEquals(meta1, result); + + VotingConfiguration votingConfiguration2 = new VotingConfiguration( + (Set) votingConfiguration.getNodeIds().stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new)) + ); + Set votingTombstones2 = votingTombstones.stream() + .sorted(Comparator.comparing(VotingConfigExclusion::getNodeId)) + .collect(Collectors.toCollection(LinkedHashSet::new)); + CoordinationMetadata meta2 = new CoordinationMetadata(1, votingConfiguration2, votingConfiguration2, votingTombstones2); + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + meta2.writeVerifiableTo(checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IndexMetadataTests.java index 3b0a2fed52c2d..698ace4105d75 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IndexMetadataTests.java @@ -52,6 +52,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.indices.IndicesModule; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -173,6 +174,86 @@ public void testIndexMetadataSerialization() throws IOException { } } + public void testWriteVerifiableTo() throws IOException { + int numberOfReplicas = randomIntBetween(0, 10); + final boolean system = randomBoolean(); + Map customMap = new HashMap<>(); + customMap.put(randomAlphaOfLength(5), randomAlphaOfLength(10)); + customMap.put(randomAlphaOfLength(10), randomAlphaOfLength(15)); + + RolloverInfo info1 = new RolloverInfo( + randomAlphaOfLength(5), + Arrays.asList( + new MaxAgeCondition(TimeValue.timeValueMillis(randomNonNegativeLong())), + new MaxSizeCondition(new ByteSizeValue(randomNonNegativeLong())), + new MaxDocsCondition(randomNonNegativeLong()) + ), + randomNonNegativeLong() + ); + RolloverInfo info2 = new RolloverInfo( + randomAlphaOfLength(5), + Arrays.asList( + new MaxAgeCondition(TimeValue.timeValueMillis(randomNonNegativeLong())), + new MaxSizeCondition(new ByteSizeValue(randomNonNegativeLong())), + new MaxDocsCondition(randomNonNegativeLong()) + ), + randomNonNegativeLong() + ); + + IndexMetadata metadata1 = IndexMetadata.builder("foo") + .settings( + Settings.builder() + .put("index.version.created", 1) + .put("index.number_of_shards", 4) + .put("index.number_of_replicas", numberOfReplicas) + .build() + ) + .creationDate(randomLong()) + .primaryTerm(0, 2) + .primaryTerm(1, 3) + .setRoutingNumShards(32) + .system(system) + .putCustom("my_custom", customMap) + .putCustom("my_custom2", customMap) + .putAlias(AliasMetadata.builder("alias-1").routing("routing-1").build()) + .putAlias(AliasMetadata.builder("alias-2").routing("routing-2").build()) + .putRolloverInfo(info1) + .putRolloverInfo(info2) + .putInSyncAllocationIds(0, Set.of("1", "2", "3")) + .build(); + + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + metadata1.writeVerifiableTo(checksumOut); + + IndexMetadata metadata2 = IndexMetadata.builder(metadata1.getIndex().getName()) + .settings( + Settings.builder() + .put("index.number_of_replicas", numberOfReplicas) + .put("index.number_of_shards", 4) + .put("index.version.created", 1) + .build() + ) + .creationDate(metadata1.getCreationDate()) + .primaryTerm(1, 3) + .primaryTerm(0, 2) + .setRoutingNumShards(32) + .system(system) + .putCustom("my_custom2", customMap) + .putCustom("my_custom", customMap) + .putAlias(AliasMetadata.builder("alias-2").routing("routing-2").build()) + .putAlias(AliasMetadata.builder("alias-1").routing("routing-1").build()) + .putRolloverInfo(info2) + .putRolloverInfo(info1) + .putInSyncAllocationIds(0, Set.of("3", "1", "2")) + .build(); + + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + metadata2.writeVerifiableTo(checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } + public void testGetRoutingFactor() { Integer numShard = randomFrom(1, 2, 4, 8, 16); int routingFactor = IndexMetadata.getRoutingFactor(32, numShard); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IndexTemplateMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IndexTemplateMetadataTests.java index 0ea2834cc3024..993cefb050ff7 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IndexTemplateMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IndexTemplateMetadataTests.java @@ -31,17 +31,20 @@ package org.opensearch.cluster.metadata; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -232,6 +235,50 @@ public void testFromToXContent() throws Exception { } } + public void testWriteVerifiableTo() throws Exception { + String templateName = randomUnicodeOfCodepointLengthBetween(1, 10); + IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(templateName); + templateBuilder.patterns(Arrays.asList("pattern-1", "pattern-2")); + int numAlias = between(2, 5); + for (int i = 0; i < numAlias; i++) { + AliasMetadata.Builder alias = AliasMetadata.builder(randomRealisticUnicodeOfLengthBetween(1, 100)); + alias.indexRouting(randomRealisticUnicodeOfLengthBetween(1, 100)); + alias.searchRouting(randomRealisticUnicodeOfLengthBetween(1, 100)); + templateBuilder.putAlias(alias); + } + templateBuilder.settings(Settings.builder().put("index.setting-1", randomLong())); + templateBuilder.settings(Settings.builder().put("index.setting-2", randomTimeValue())); + templateBuilder.order(randomInt()); + templateBuilder.version(between(0, 100)); + templateBuilder.putMapping("doc", "{\"doc\":{\"properties\":{\"type\":\"text\"}}}"); + + IndexTemplateMetadata template = templateBuilder.build(); + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + template.writeVerifiableTo(checksumOut); + StreamInput in = out.bytes().streamInput(); + IndexTemplateMetadata result = IndexTemplateMetadata.readFrom(in); + assertEquals(result, template); + + IndexTemplateMetadata.Builder templateBuilder2 = IndexTemplateMetadata.builder(templateName); + templateBuilder2.patterns(Arrays.asList("pattern-2", "pattern-1")); + template.getAliases() + .entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .forEachOrdered(entry -> templateBuilder2.putAlias(entry.getValue())); + templateBuilder2.settings(template.settings()); + templateBuilder2.order(template.order()); + templateBuilder2.version(template.version()); + templateBuilder2.putMapping("doc", template.mappings()); + + IndexTemplateMetadata template2 = templateBuilder.build(); + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + template2.writeVerifiableTo(checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } + public void testDeprecationWarningsOnMultipleMappings() throws IOException { IndexTemplateMetadata.Builder builder = IndexTemplateMetadata.builder("my-template"); builder.patterns(Arrays.asList("a", "b")); diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java index 8f149e32ec6f5..9f406101203a0 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java @@ -39,6 +39,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.test.NodeRoles; import org.opensearch.test.OpenSearchTestCase; @@ -47,6 +48,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -129,6 +131,43 @@ public void testDiscoveryNodeSerializationKeepsHost() throws Exception { assertEquals(transportAddress.getPort(), serialized.getAddress().getPort()); } + public void testWriteVerifiableTo() throws Exception { + InetAddress inetAddress = InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1 }); + TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535)); + final Set roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)); + Map attributes = new HashMap<>(); + attributes.put("att-1", "test-repo"); + attributes.put("att-2", "test-repo"); + DiscoveryNode node = new DiscoveryNode("name1", "id1", transportAddress, attributes, roles, Version.CURRENT); + + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + node.writeVerifiableTo(checksumOut); + StreamInput in = out.bytes().streamInput(); + DiscoveryNode result = new DiscoveryNode(in); + assertEquals(result, node); + + Map attributes2 = new HashMap<>(); + attributes2.put("att-2", "test-repo"); + attributes2.put("att-1", "test-repo"); + + DiscoveryNode node2 = new DiscoveryNode( + node.getName(), + node.getId(), + node.getEphemeralId(), + node.getHostName(), + node.getHostAddress(), + transportAddress, + attributes2, + roles.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new)), + Version.CURRENT + ); + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + node2.writeVerifiableTo(checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } + public void testDiscoveryNodeRoleWithOldVersion() throws Exception { InetAddress inetAddress = InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1 }); TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535)); diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java index ede64da1b3c1f..19c5464b6d3b7 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java @@ -36,10 +36,14 @@ import org.opensearch.LegacyESVersion; import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.Setting; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -328,6 +332,28 @@ public void testDeprecatedMasterNodeFilter() { assertThat(discoveryNodes.resolveNodes("master:false", "_all"), arrayContainingInAnyOrder(allNodes)); } + public void testWriteVerifiableTo() throws IOException { + final DiscoveryNodes discoveryNodes = buildDiscoveryNodes(); + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + discoveryNodes.writeVerifiableTo(checksumOut); + StreamInput in = out.bytes().streamInput(); + DiscoveryNodes result = DiscoveryNodes.readFrom(in, discoveryNodes.getLocalNode()); + assertEquals(result, discoveryNodes); + + final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder() + .clusterManagerNodeId(discoveryNodes.getClusterManagerNodeId()); + discoveryNodes.getNodes() + .entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .forEachOrdered(entry -> discoveryNodesBuilder.add(entry.getValue())); + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + discoveryNodesBuilder.build().writeVerifiableTo(checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } + private static AtomicInteger idGenerator = new AtomicInteger(); private static List randomNodes(final int numNodes) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java index 6bfe60980adf3..340e0d9bdf520 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java @@ -32,10 +32,13 @@ package org.opensearch.cluster.routing; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.translog.BufferedChecksumStreamOutput; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -77,6 +80,25 @@ public void testEquals() { assertEquals(table4, table5); } + public void testWriteVerifiableTo() throws IOException { + Index index = new Index("a", "b"); + ShardId shardId = new ShardId(index, 1); + ShardRouting shard1 = TestShardRouting.newShardRouting(shardId, "node-1", true, ShardRoutingState.STARTED); + ShardRouting shard2 = TestShardRouting.newShardRouting(shardId, "node-2", false, ShardRoutingState.STARTED); + ShardRouting shard3 = TestShardRouting.newShardRouting(shardId, null, false, ShardRoutingState.UNASSIGNED); + + IndexShardRoutingTable table1 = new IndexShardRoutingTable(shardId, Arrays.asList(shard1, shard2, shard3)); + BytesStreamOutput out = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out); + IndexShardRoutingTable.Builder.writeVerifiableTo(table1, checksumOut); + + IndexShardRoutingTable table2 = new IndexShardRoutingTable(shardId, Arrays.asList(shard3, shard1, shard2)); + BytesStreamOutput out2 = new BytesStreamOutput(); + BufferedChecksumStreamOutput checksumOut2 = new BufferedChecksumStreamOutput(out2); + IndexShardRoutingTable.Builder.writeVerifiableTo(table2, checksumOut2); + assertEquals(checksumOut.getChecksum(), checksumOut2.getChecksum()); + } + public void testShardsMatchingPredicate() { ShardId shardId = new ShardId(new Index("a", UUID.randomUUID().toString()), 0); ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node-1", true, ShardRoutingState.STARTED); diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index 97283f561d6d4..d8a67c09e442c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -714,4 +714,5 @@ public static IndexMetadata updateActiveAllocations(IndexRoutingTable indexRouti } return imdBuilder.build(); } + } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 6a191d002b971..3f9aa1245cab3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -9,15 +9,24 @@ package org.opensearch.gateway.remote; import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexGraveyard; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.StringKeyDiffProvider; +import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; @@ -31,6 +40,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -164,7 +174,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { .opensearchVersion(Version.CURRENT) .nodeId("B10RX1f5RJenMQvYccCgSQ") .committed(true) - .codecVersion(CODEC_V3) + .codecVersion(ClusterMetadataManifest.CODEC_V4) .indices(randomUploadedIndexMetadataList()) .previousClusterUUID("yfObdx8KSMKKrXf8UyHhM") .clusterUUIDCommitted(true) @@ -204,6 +214,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "indicesRoutingDiffPath" ) ) + .checksum(new ClusterStateChecksum(createClusterState())) .build(); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -485,6 +496,22 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { } ); } + { + // Mutate checksum + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + initialManifest, + orig -> OpenSearchTestCase.copyWriteable( + orig, + new NamedWriteableRegistry(Collections.emptyList()), + ClusterMetadataManifest::new + ), + manifest -> { + ClusterMetadataManifest.Builder builder = ClusterMetadataManifest.builder(manifest); + builder.checksum(null); + return builder.build(); + } + ); + } } public void testClusterMetadataManifestXContentV2() throws IOException { @@ -616,15 +643,11 @@ public void testClusterMetadataManifestXContentV3() throws IOException { } } - public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOException { + public void testClusterMetadataManifestXContentV4() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); - UploadedIndexMetadata uploadedIndexRoutingMetadata = new UploadedIndexMetadata( - "test-index", - "test-uuid", - "routing-path", - INDEX_ROUTING_METADATA_PREFIX - ); + final StringKeyDiffProvider routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class); + ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState()); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) .stateVersion(1L) @@ -633,7 +656,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc .opensearchVersion(Version.CURRENT) .nodeId("test-node-id") .committed(false) - .codecVersion(CODEC_V2) + .codecVersion(ClusterMetadataManifest.CODEC_V4) .indices(Collections.singletonList(uploadedIndexMetadata)) .previousClusterUUID("prev-cluster-uuid") .clusterUUIDCommitted(true) @@ -658,7 +681,23 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc ) ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) ) - .indicesRouting(Collections.singletonList(uploadedIndexRoutingMetadata)) + .routingTableVersion(1L) + .indicesRouting(Collections.singletonList(uploadedIndexMetadata)) + .discoveryNodesMetadata(uploadedMetadataAttribute) + .clusterBlocksMetadata(uploadedMetadataAttribute) + .transientSettingsMetadata(uploadedMetadataAttribute) + .hashesOfConsistentSettings(uploadedMetadataAttribute) + .clusterStateCustomMetadataMap(Collections.emptyMap()) + .diffManifest( + new ClusterStateDiffManifest( + RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), + ClusterState.EMPTY_STATE, + CODEC_V4, + routingTableIncrementalDiff, + uploadedMetadataAttribute.getUploadedFilename() + ) + ) + .checksum(checksum) .build(); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -671,10 +710,15 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc } } - public void testClusterMetadataManifestXContentV4() throws IOException { + public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); - final StringKeyDiffProvider routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class); + UploadedIndexMetadata uploadedIndexRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "test-uuid", + "routing-path", + INDEX_ROUTING_METADATA_PREFIX + ); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) .stateVersion(1L) @@ -683,7 +727,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException { .opensearchVersion(Version.CURRENT) .nodeId("test-node-id") .committed(false) - .codecVersion(ClusterMetadataManifest.CODEC_V4) + .codecVersion(CODEC_V2) .indices(Collections.singletonList(uploadedIndexMetadata)) .previousClusterUUID("prev-cluster-uuid") .clusterUUIDCommitted(true) @@ -708,22 +752,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException { ) ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) ) - .routingTableVersion(1L) - .indicesRouting(Collections.singletonList(uploadedIndexMetadata)) - .discoveryNodesMetadata(uploadedMetadataAttribute) - .clusterBlocksMetadata(uploadedMetadataAttribute) - .transientSettingsMetadata(uploadedMetadataAttribute) - .hashesOfConsistentSettings(uploadedMetadataAttribute) - .clusterStateCustomMetadataMap(Collections.emptyMap()) - .diffManifest( - new ClusterStateDiffManifest( - RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), - ClusterState.EMPTY_STATE, - CODEC_V4, - routingTableIncrementalDiff, - uploadedMetadataAttribute.getUploadedFilename() - ) - ) + .indicesRouting(Collections.singletonList(uploadedIndexRoutingMetadata)) .build(); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -815,4 +844,40 @@ private UploadedIndexMetadata randomlyChangingUploadedIndexMetadata(UploadedInde return uploadedIndexMetadata; } + static ClusterState createClusterState() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final Settings settings = Settings.builder().put("mock-settings", true).build(); + final TemplatesMetadata templatesMetadata = TemplatesMetadata.builder() + .put(IndexTemplateMetadata.builder("template1").settings(idxSettings).patterns(List.of("test*")).build()) + .build(); + final RemoteClusterStateTestUtils.CustomMetadata1 customMetadata = new RemoteClusterStateTestUtils.CustomMetadata1( + "custom-metadata-1" + ); + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder() + .version(randomNonNegativeLong()) + .put(indexMetadata, true) + .clusterUUID("cluster-uuid") + .coordinationMetadata(coordinationMetadata) + .persistentSettings(settings) + .templates(templatesMetadata) + .hashesOfConsistentSettings(Map.of("key1", "value1", "key2", "value2")) + .putCustom(customMetadata.getWriteableName(), customMetadata) + .build() + ) + .routingTable(RoutingTable.builder().addAsNew(indexMetadata).version(1L).build()) + .build(); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java new file mode 100644 index 0000000000000..0203e56dd2d5c --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java @@ -0,0 +1,222 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.index.Index; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +public class ClusterStateChecksumTests extends OpenSearchTestCase { + + public void testClusterStateChecksumEmptyClusterState() { + ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE); + assertNotNull(checksum); + } + + public void testClusterStateChecksum() { + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + assertNotNull(checksum); + assertTrue(checksum.routingTableChecksum != 0); + assertTrue(checksum.nodesChecksum != 0); + assertTrue(checksum.blocksChecksum != 0); + assertTrue(checksum.clusterStateCustomsChecksum != 0); + assertTrue(checksum.coordinationMetadataChecksum != 0); + assertTrue(checksum.settingMetadataChecksum != 0); + assertTrue(checksum.transientSettingsMetadataChecksum != 0); + assertTrue(checksum.templatesMetadataChecksum != 0); + assertTrue(checksum.customMetadataMapChecksum != 0); + assertTrue(checksum.hashesOfConsistentSettingsChecksum != 0); + assertTrue(checksum.indicesChecksum != 0); + assertTrue(checksum.clusterStateChecksum != 0); + } + + public void testClusterStateMatchChecksum() { + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState()); + assertNotNull(checksum); + assertNotNull(newChecksum); + assertEquals(checksum.routingTableChecksum, newChecksum.routingTableChecksum); + assertEquals(checksum.nodesChecksum, newChecksum.nodesChecksum); + assertEquals(checksum.blocksChecksum, newChecksum.blocksChecksum); + assertEquals(checksum.clusterStateCustomsChecksum, newChecksum.clusterStateCustomsChecksum); + assertEquals(checksum.coordinationMetadataChecksum, newChecksum.coordinationMetadataChecksum); + assertEquals(checksum.settingMetadataChecksum, newChecksum.settingMetadataChecksum); + assertEquals(checksum.transientSettingsMetadataChecksum, newChecksum.transientSettingsMetadataChecksum); + assertEquals(checksum.templatesMetadataChecksum, newChecksum.templatesMetadataChecksum); + assertEquals(checksum.customMetadataMapChecksum, newChecksum.customMetadataMapChecksum); + assertEquals(checksum.hashesOfConsistentSettingsChecksum, newChecksum.hashesOfConsistentSettingsChecksum); + assertEquals(checksum.indicesChecksum, newChecksum.indicesChecksum); + assertEquals(checksum.clusterStateChecksum, newChecksum.clusterStateChecksum); + } + + public void testXContentConversion() throws IOException { + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + checksum.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterStateChecksum parsedChecksum = ClusterStateChecksum.fromXContent(parser); + assertEquals(checksum, parsedChecksum); + } + } + + public void testSerialization() throws IOException { + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + BytesStreamOutput output = new BytesStreamOutput(); + checksum.writeTo(output); + + try (StreamInput in = output.bytes().streamInput()) { + ClusterStateChecksum deserializedChecksum = new ClusterStateChecksum(in); + assertEquals(checksum, deserializedChecksum); + } + } + + public void testGetMismatchEntities() { + ClusterState clsState1 = generateClusterState(); + ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1); + assertTrue(checksum.getMismatchEntities(checksum).isEmpty()); + + ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1); + assertTrue(checksum.getMismatchEntities(checksum2).isEmpty()); + + ClusterState clsState2 = ClusterState.builder(ClusterName.DEFAULT) + .routingTable(RoutingTable.builder().build()) + .nodes(DiscoveryNodes.builder().build()) + .blocks(ClusterBlocks.builder().build()) + .customs(Map.of()) + .metadata(Metadata.EMPTY_METADATA) + .build(); + ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2); + List mismatches = checksum.getMismatchEntities(checksum3); + assertFalse(mismatches.isEmpty()); + assertEquals(11, mismatches.size()); + assertEquals(ClusterStateChecksum.ROUTING_TABLE_CS, mismatches.get(0)); + assertEquals(ClusterStateChecksum.NODES_CS, mismatches.get(1)); + assertEquals(ClusterStateChecksum.BLOCKS_CS, mismatches.get(2)); + assertEquals(ClusterStateChecksum.CUSTOMS_CS, mismatches.get(3)); + assertEquals(ClusterStateChecksum.COORDINATION_MD_CS, mismatches.get(4)); + assertEquals(ClusterStateChecksum.SETTINGS_MD_CS, mismatches.get(5)); + assertEquals(ClusterStateChecksum.TRANSIENT_SETTINGS_MD_CS, mismatches.get(6)); + assertEquals(ClusterStateChecksum.TEMPLATES_MD_CS, mismatches.get(7)); + assertEquals(ClusterStateChecksum.CUSTOM_MD_CS, mismatches.get(8)); + assertEquals(ClusterStateChecksum.HASHES_MD_CS, mismatches.get(9)); + assertEquals(ClusterStateChecksum.INDICES_CS, mismatches.get(10)); + } + + public void testGetMismatchEntitiesUnorderedInput() { + ClusterState state1 = generateClusterState(); + DiscoveryNode node1 = DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node1"); + DiscoveryNode node2 = DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9201), "node2"); + DiscoveryNode node3 = DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9202), "node3"); + + DiscoveryNodes nodes1 = DiscoveryNodes.builder().clusterManagerNodeId("test-node").add(node1).add(node2).add(node3).build(); + DiscoveryNodes nodes2 = DiscoveryNodes.builder().clusterManagerNodeId("test-node").add(node2).add(node3).build(); + nodes2 = nodes2.newNode(node1); + ClusterState state2 = ClusterState.builder(state1).nodes(nodes1).build(); + ClusterState state3 = ClusterState.builder(state1).nodes(nodes2).build(); + + ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3); + assertEquals(checksum2, checksum1); + } + + private ClusterState generateClusterState() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final Index index2 = new Index("test-index2", "index-uuid2"); + final Settings idxSettings2 = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID()) + .put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true) + .build(); + final IndexMetadata indexMetadata2 = new IndexMetadata.Builder(index2.getName()).settings(idxSettings2) + .numberOfShards(3) + .numberOfReplicas(2) + .build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final Settings settings = Settings.builder().put("mock-settings", true).build(); + final TemplatesMetadata templatesMetadata = TemplatesMetadata.builder() + .put(IndexTemplateMetadata.builder("template1").settings(idxSettings).patterns(List.of("test*")).build()) + .build(); + final RemoteClusterStateTestUtils.CustomMetadata1 customMetadata1 = new RemoteClusterStateTestUtils.CustomMetadata1( + "custom-metadata-1" + ); + RemoteClusterStateTestUtils.TestClusterStateCustom1 clusterStateCustom1 = new RemoteClusterStateTestUtils.TestClusterStateCustom1( + "custom-1" + ); + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder() + .version(1L) + .put(indexMetadata, true) + .clusterUUID("cluster-uuid") + .coordinationMetadata(coordinationMetadata) + .persistentSettings(settings) + .transientSettings(settings) + .templates(templatesMetadata) + .hashesOfConsistentSettings(Map.of("key1", "value1", "key2", "value2")) + .putCustom(customMetadata1.getWriteableName(), customMetadata1) + .indices(Map.of(indexMetadata.getIndex().getName(), indexMetadata, indexMetadata2.getIndex().getName(), indexMetadata2)) + .build() + ) + .nodes(DiscoveryNodes.builder().clusterManagerNodeId("test-node").build()) + .blocks( + ClusterBlocks.builder() + .addBlocks(indexMetadata) + .addGlobalBlock(new ClusterBlock(1, "block", true, true, true, RestStatus.ACCEPTED, EnumSet.of(ClusterBlockLevel.READ))) + .addGlobalBlock( + new ClusterBlock(2, "block-name", false, true, true, RestStatus.OK, EnumSet.of(ClusterBlockLevel.WRITE)) + ) + .build() + ) + .customs(Map.of(clusterStateCustom1.getWriteableName(), clusterStateCustom1)) + .routingTable(RoutingTable.builder().addAsNew(indexMetadata).addAsNew(indexMetadata2).version(1L).build()) + .build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4d301c1e6a4f4..e26a9ed7434b2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -166,6 +166,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -2910,6 +2911,359 @@ private void initializeRoutingTable() { ); } + private void initializeWithChecksumEnabled() { + Settings newSettings = Settings.builder() + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true) + .build(); + clusterSettings.applySettings(newSettings); + + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + newSettings, + clusterService, + () -> 0L, + threadPool, + List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() + ); + } + + public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws IOException { + initializeWithChecksumEnabled(); + mockBlobStoreObjects(); + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1L) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .checksum(new ClusterStateChecksum(clusterState)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterStateChecksum(), is(expectedManifest.getClusterStateChecksum())); + } + + public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() throws IOException { + initializeWithChecksumEnabled(); + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .indices(Collections.emptyList()) + .checksum(new ClusterStateChecksum(clusterState)) + .build(); + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ).getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .checksum(new ClusterStateChecksum(clusterState)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterStateChecksum(), is(expectedManifest.getClusterStateChecksum())); + } + + public void testGetClusterStateForManifestWithChecksumValidationEnabledWithNullChecksum() throws IOException { + initializeWithChecksumEnabled(); + ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().build(); + mockBlobStoreObjects(); + remoteClusterStateService.start(); + RemoteReadResult mockedResult = mock(RemoteReadResult.class); + RemoteIndexMetadataManager mockedIndexManager = mock(RemoteIndexMetadataManager.class); + RemoteGlobalMetadataManager mockedGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); + RemoteClusterStateAttributesManager mockedClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); + remoteClusterStateService.setRemoteIndexMetadataManager(mockedIndexManager); + remoteClusterStateService.setRemoteGlobalMetadataManager(mockedGlobalMetadataManager); + remoteClusterStateService.setRemoteClusterStateAttributesManager(mockedClusterStateAttributeManager); + ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( + LatchedActionListener.class + ); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedIndexManager).readAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedGlobalMetadataManager).readAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedClusterStateAttributeManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); + when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); + RemoteClusterStateService mockService = spy(remoteClusterStateService); + mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); + verify(mockService, times(1)).readClusterStateInParallel( + any(), + eq(manifest), + eq(manifest.getClusterUUID()), + eq(NODE_ID), + eq(manifest.getIndices()), + eq(manifest.getCustomMetadataMap()), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(manifest.getIndicesRouting()), + eq(true), + eq(manifest.getClusterStateCustomMap()), + eq(false), + eq(true) + ); + verify(mockService, times(0)).validateClusterStateFromChecksum( + any(ClusterMetadataManifest.class), + any(ClusterState.class), + anyString(), + anyString(), + anyBoolean() + ); + } + + public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws IOException { + initializeWithChecksumEnabled(); + ClusterState clusterState = generateClusterStateWithAllAttributes().build(); + ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( + new ClusterStateChecksum(clusterState) + ).build(); + remoteClusterStateService.start(); + RemoteClusterStateService mockService = spy(remoteClusterStateService); + doReturn(clusterState).when(mockService) + .readClusterStateInParallel( + any(), + eq(manifest), + eq(manifest.getClusterUUID()), + eq(NODE_ID), + eq(manifest.getIndices()), + eq(manifest.getCustomMetadataMap()), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(manifest.getIndicesRouting()), + eq(true), + eq(manifest.getClusterStateCustomMap()), + eq(false), + eq(true) + ); + mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); + verify(mockService, times(1)).validateClusterStateFromChecksum(manifest, clusterState, ClusterName.DEFAULT.value(), NODE_ID, true); + } + + public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMismatch() throws IOException { + initializeWithChecksumEnabled(); + ClusterState clusterState = generateClusterStateWithAllAttributes().build(); + ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( + new ClusterStateChecksum(clusterState) + ).build(); + remoteClusterStateService.start(); + RemoteClusterStateService mockService = spy(remoteClusterStateService); + ClusterState clusterStateWrong = ClusterState.builder(clusterState).routingTable(RoutingTable.EMPTY_ROUTING_TABLE).build(); + doReturn(clusterStateWrong).when(mockService) + .readClusterStateInParallel( + any(), + eq(manifest), + eq(manifest.getClusterUUID()), + eq(NODE_ID), + eq(manifest.getIndices()), + eq(manifest.getCustomMetadataMap()), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(manifest.getIndicesRouting()), + eq(true), + eq(manifest.getClusterStateCustomMap()), + eq(false), + eq(true) + ); + expectThrows( + IllegalStateException.class, + () -> mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true) + ); + verify(mockService, times(1)).validateClusterStateFromChecksum( + manifest, + clusterStateWrong, + ClusterName.DEFAULT.value(), + NODE_ID, + true + ); + } + + public void testGetClusterStateUsingDiffWithChecksum() throws IOException { + initializeWithChecksumEnabled(); + ClusterState clusterState = generateClusterStateWithAllAttributes().build(); + ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( + new ClusterStateChecksum(clusterState) + ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); + + remoteClusterStateService.start(); + RemoteClusterStateService mockService = spy(remoteClusterStateService); + + doReturn(clusterState).when(mockService) + .readClusterStateInParallel( + any(), + eq(manifest), + eq(manifest.getClusterUUID()), + eq(NODE_ID), + eq(emptyList()), + eq(emptyMap()), + anyBoolean(), + anyBoolean(), + anyBoolean(), + anyBoolean(), + anyBoolean(), + anyBoolean(), + eq(emptyList()), + anyBoolean(), + eq(emptyMap()), + anyBoolean(), + anyBoolean() + ); + mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID); + + verify(mockService, times(1)).validateClusterStateFromChecksum( + eq(manifest), + any(ClusterState.class), + eq(ClusterName.DEFAULT.value()), + eq(NODE_ID), + eq(false) + ); + } + + public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOException { + initializeWithChecksumEnabled(); + ClusterState clusterState = generateClusterStateWithAllAttributes().build(); + ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( + new ClusterStateChecksum(clusterState) + ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); + + remoteClusterStateService.start(); + RemoteClusterStateService mockService = spy(remoteClusterStateService); + ClusterState clusterStateWrong = ClusterState.builder(clusterState).routingTable(RoutingTable.EMPTY_ROUTING_TABLE).build(); + doReturn(clusterStateWrong).when(mockService) + .readClusterStateInParallel( + any(), + eq(manifest), + eq(manifest.getClusterUUID()), + eq(NODE_ID), + eq(emptyList()), + eq(emptyMap()), + anyBoolean(), + anyBoolean(), + anyBoolean(), + anyBoolean(), + anyBoolean(), + anyBoolean(), + eq(emptyList()), + anyBoolean(), + eq(emptyMap()), + anyBoolean(), + anyBoolean() + ); + doReturn(clusterState).when(mockService) + .readClusterStateInParallel( + any(), + eq(manifest), + eq(manifest.getClusterUUID()), + eq(NODE_ID), + eq(manifest.getIndices()), + eq(manifest.getCustomMetadataMap()), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(true), + eq(manifest.getIndicesRouting()), + eq(true), + eq(manifest.getClusterStateCustomMap()), + eq(false), + eq(true) + ); + + expectThrows(IllegalStateException.class, () -> mockService.getClusterStateUsingDiff(manifest, clusterState, NODE_ID)); + verify(mockService, times(1)).validateClusterStateFromChecksum( + eq(manifest), + any(ClusterState.class), + eq(ClusterName.DEFAULT.value()), + eq(NODE_ID), + eq(false) + ); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false, Collections.emptyMap()); } @@ -3459,6 +3813,8 @@ static ClusterMetadataManifest.Builder generateClusterMetadataManifestWithAllAtt return ClusterMetadataManifest.builder() .codecVersion(CODEC_V2) .clusterUUID("cluster-uuid") + .stateVersion(1L) + .stateUUID("state-uuid") .indices(List.of(new UploadedIndexMetadata("test-index", "test-index-uuid", "test-index-file__2"))) .customMetadataMap( Map.of( @@ -3484,7 +3840,8 @@ static ClusterMetadataManifest.Builder generateClusterMetadataManifestWithAllAtt "custom_2", new UploadedMetadataAttribute("custom_2", "test-cluster-state-custom2-file__1") ) - ); + ) + .routingTableVersion(1L); } public static DiscoveryNodes nodesWithLocalNodeClusterManager() {