Skip to content

Commit

Permalink
Add UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
Himshikha Gupta committed Aug 16, 2024
1 parent 379fe90 commit 64af94a
Show file tree
Hide file tree
Showing 6 changed files with 621 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import com.jcraft.jzlib.JZlib;
Expand All @@ -35,18 +37,19 @@
*/
public class ClusterStateChecksum implements ToXContentFragment, Writeable {

private static final String ROUTING_TABLE_CS = "routing_table";
private static final String NODES_CS = "discovery_nodes";
private static final String BLOCKS_CS = "blocks";
private static final String CUSTOMS_CS = "customs";
private static final String COORDINATION_MD_CS = "coordination_md";
private static final String SETTINGS_MD_CS = "settings_md";
private static final String TRANSIENT_SETTINGS_MD_CS = "transient_settings_md";
private static final String TEMPLATES_MD_CS = "templated_md";
private static final String CUSTOM_MD_CS = "customs_md";
private static final String HASHES_MD_CS = "hashes_md";
private static final String INDICES_CS = "indices_md";
static final String ROUTING_TABLE_CS = "routing_table";
static final String NODES_CS = "discovery_nodes";
static final String BLOCKS_CS = "blocks";
static final String CUSTOMS_CS = "customs";
static final String COORDINATION_MD_CS = "coordination_md";
static final String SETTINGS_MD_CS = "settings_md";
static final String TRANSIENT_SETTINGS_MD_CS = "transient_settings_md";
static final String TEMPLATES_MD_CS = "templated_md";
static final String CUSTOM_MD_CS = "customs_md";
static final String HASHES_MD_CS = "hashes_md";
static final String INDICES_CS = "indices_md";
private static final String CLUSTER_STATE_CS = "cluster_state";
private static final int CHECKSUM_SIZE = 8;
private static final Logger logger = LogManager.getLogger(ClusterStateChecksum.class);

long routingTableChecksum;
Expand Down Expand Up @@ -133,16 +136,20 @@ public ClusterStateChecksum(ClusterState clusterState) {
logger.error("Failed to create checksum for cluster state.", e);
throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e);
}
clusterStateChecksum = JZlib.crc32_combine(routingTableChecksum, nodesChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, blocksChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, clusterStateCustomsChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, coordinationMetadataChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, settingMetadataChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, transientSettingsMetadataChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, templatesMetadataChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, customMetadataMapChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, hashesOfConsistentSettingsChecksum, 8);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, indicesChecksum, 8);
createClusterStateChecksum();
}

private void createClusterStateChecksum() {
clusterStateChecksum = JZlib.crc32_combine(routingTableChecksum, nodesChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, blocksChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, clusterStateCustomsChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, coordinationMetadataChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, settingMetadataChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, transientSettingsMetadataChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, templatesMetadataChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, customMetadataMapChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, hashesOfConsistentSettingsChecksum, CHECKSUM_SIZE);
clusterStateChecksum = JZlib.crc32_combine(clusterStateChecksum, indicesChecksum, CHECKSUM_SIZE);
}

public static ClusterStateChecksum.Builder builder() {
Expand Down Expand Up @@ -324,6 +331,48 @@ public int hashCode() {
);
}

public List<String> getMismatchEntities(ClusterStateChecksum otherClusterStateChecksum) {
if (this.clusterStateChecksum == otherClusterStateChecksum.clusterStateChecksum) {
logger.info("No mismatch in checksums.");
return List.of();
}
List<String> mismatches = new ArrayList<>();
addIfMismatch(this.routingTableChecksum, otherClusterStateChecksum.routingTableChecksum, ROUTING_TABLE_CS, mismatches);
addIfMismatch(this.nodesChecksum, otherClusterStateChecksum.nodesChecksum, NODES_CS, mismatches);
addIfMismatch(this.blocksChecksum, otherClusterStateChecksum.blocksChecksum, BLOCKS_CS, mismatches);
addIfMismatch(this.clusterStateCustomsChecksum, otherClusterStateChecksum.clusterStateCustomsChecksum, CUSTOMS_CS, mismatches);
addIfMismatch(
this.coordinationMetadataChecksum,
otherClusterStateChecksum.coordinationMetadataChecksum,
COORDINATION_MD_CS,
mismatches
);
addIfMismatch(this.settingMetadataChecksum, otherClusterStateChecksum.settingMetadataChecksum, SETTINGS_MD_CS, mismatches);
addIfMismatch(
this.transientSettingsMetadataChecksum,
otherClusterStateChecksum.transientSettingsMetadataChecksum,
TRANSIENT_SETTINGS_MD_CS,
mismatches
);
addIfMismatch(this.templatesMetadataChecksum, otherClusterStateChecksum.templatesMetadataChecksum, TEMPLATES_MD_CS, mismatches);
addIfMismatch(this.customMetadataMapChecksum, otherClusterStateChecksum.customMetadataMapChecksum, CUSTOM_MD_CS, mismatches);
addIfMismatch(
this.hashesOfConsistentSettingsChecksum,
otherClusterStateChecksum.hashesOfConsistentSettingsChecksum,
HASHES_MD_CS,
mismatches
);
addIfMismatch(this.indicesChecksum, otherClusterStateChecksum.indicesChecksum, INDICES_CS, mismatches);

return mismatches;
}

private void addIfMismatch(long checksum, long otherChecksum, String entityName, List<String> mismatches) {
if (checksum != otherChecksum) {
mismatches.add(entityName);
}
}

public static class Builder {
long routingTableChecksum;
long nodesChecksum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ public RemoteClusterStateService(
this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout);
this.checksumValidationEnabled = clusterSettings.get(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING, this::setChecksumValidationEnabled);
clusterSettings.addSettingsUpdateConsumer(
REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING,
this::setChecksumValidationEnabled
);

this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -891,7 +894,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus
uploadedMetadataResults,
previousManifest.getPreviousClusterUUID(),
previousManifest.getDiffManifest(),
checksumValidationEnabled ? previousManifest.getClusterStateChecksum(): null,
checksumValidationEnabled ? previousManifest.getClusterStateChecksum() : null,
true
);
if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) {
Expand Down Expand Up @@ -1330,7 +1333,7 @@ public ClusterState getClusterStateForManifest(
boolean includeEphemeral
) throws IOException {
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
ClusterState clusterState = readClusterStateInParallel(
ClusterState clusterState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
Expand All @@ -1350,7 +1353,7 @@ public ClusterState getClusterStateForManifest(
includeEphemeral
);

if (checksumValidationEnabled && manifest.getClusterStateChecksum()!=null) {
if (checksumValidationEnabled && manifest.getClusterStateChecksum() != null) {
validateClusterStateFromChecksum(manifest.getClusterStateChecksum(), clusterState);
}
return clusterState;
Expand Down Expand Up @@ -1466,21 +1469,25 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

if (checksumValidationEnabled && manifest.getClusterStateChecksum()!=null) {
if (checksumValidationEnabled && manifest.getClusterStateChecksum() != null) {
validateClusterStateFromChecksum(manifest.getClusterStateChecksum(), clusterState);
}
return clusterState;
}

private void validateClusterStateFromChecksum(ClusterStateChecksum clusterStateChecksum, ClusterState clusterState) {
void validateClusterStateFromChecksum(ClusterStateChecksum clusterStateChecksum, ClusterState clusterState) {
ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState);
if (!newClusterStateChecksum.equals(clusterStateChecksum)) {
List<String> failedValidation = newClusterStateChecksum.getMismatchEntities(clusterStateChecksum);
logger.error(
"Cluster state checksums do not match. Checksum from manifest {}, checksum from created cluster state {}",
clusterStateChecksum,
newClusterStateChecksum
() -> new ParameterizedMessage(
"Cluster state checksums do not match. Checksum from manifest {}, checksum from created cluster state {}. Entities failing validation {}",
clusterStateChecksum,
newClusterStateChecksum,
failedValidation
)
);
throw new IllegalStateException("Cluster state checksums do not match.");
throw new IllegalStateException("Cluster state checksums do not match. Validation failed for " + failedValidation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -129,6 +130,7 @@ RemoteClusterStateManifestInfo uploadManifest(
.checksum(clusterStateChecksum);
final ClusterMetadataManifest manifest = manifestBuilder.build();
logger.trace("uploading manifest [{}]", manifest);
logger.trace(() -> new ParameterizedMessage("[{}] uploading manifest", manifest));
String manifestFileName = writeMetadataManifest(clusterState.metadata().clusterUUID(), manifest);
return new RemoteClusterStateManifestInfo(manifest, manifestFileName);
}
Expand Down
Loading

0 comments on commit 64af94a

Please sign in to comment.