Skip to content

Commit

Permalink
Add remote download stats
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Aug 19, 2024
1 parent 3db2525 commit 52aaa2f
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public interface PersistedState extends Closeable {
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getStats();
PersistedStateStats getUploadStats();

/**
* Marks the last accepted cluster state as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,9 +896,13 @@ public DiscoveryStats stats() {
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
if (persistedStateRegistry.getPersistedState(stateType) != null) {
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
}
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getDownloadStats());
}
}
});
clusterStateStats.setPersistenceStats(stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

/**
 * This implementation does not provide any stats.
 *
 * @return always {@code null}
 */
@Override
public PersistedStateStats getStats() {
public PersistedStateStats getUploadStats() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public PublishClusterStateStats stats() {
);
}

/**
 * Exposes the download stats held by the remote cluster state service.
 *
 * @return whatever {@code remoteClusterStateService.getDownloadStats()} returns
 */
public PersistedStateStats getDownloadStats() {
    final PersistedStateStats downloadStats = remoteClusterStateService.getDownloadStats();
    return downloadStats;
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -229,69 +233,75 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.debug(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
} catch (Exception e) {
remoteClusterStateService.readMetadataFailed();
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

/**
 * This implementation does not provide any stats yet.
 *
 * @return always {@code null}
 */
@Override
public PersistedStateStats getStats() {
public PersistedStateStats getUploadStats() {
// Note: These stats are not published yet, will come in future
return null;
}
Expand Down Expand Up @@ -740,8 +740,8 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}

/**
 * Returns stats obtained from {@code remoteClusterStateService}.
 *
 * @return the stats object supplied by the remote cluster state service
 */
@Override
public PersistedStateStats getStats() {
return remoteClusterStateService.getStats();
public PersistedStateStats getUploadStats() {
return remoteClusterStateService.getStats().getUploadStats();
}

private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -256,8 +257,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
remoteStateStats.stateUploadSucceeded();
remoteStateStats.stateUploadTook(durationMillis);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
"writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
Expand Down Expand Up @@ -447,8 +448,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
remoteStateStats.stateUploadSucceeded();
remoteStateStats.stateUploadTook(durationMillis);
ParameterizedMessage clusterStateUploadTimeMessage = new ParameterizedMessage(
CLUSTER_STATE_UPLOAD_TIME_LOG_STRING,
manifestDetails.getClusterMetadataManifest().getStateVersion(),
Expand Down Expand Up @@ -1311,8 +1312,10 @@ public ClusterState getClusterStateForManifest(
String localNodeId,
boolean includeEphemeral
) throws IOException {
final ClusterState clusterState;
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
return readClusterStateInParallel(
clusterState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
Expand All @@ -1332,7 +1335,7 @@ public ClusterState getClusterStateForManifest(
includeEphemeral
);
} else {
ClusterState clusterState = readClusterStateInParallel(
ClusterState state = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
Expand All @@ -1353,15 +1356,20 @@ public ClusterState getClusterStateForManifest(
false
);
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
mb.indices(clusterState.metadata().indices());
return ClusterState.builder(clusterState).metadata(mb).build();
mb.indices(state.metadata().indices());
clusterState = ClusterState.builder(state).metadata(mb).build();
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDownloadSucceeded();
remoteStateStats.stateDownloadTook(durationMillis);

return clusterState;
}

public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
throws IOException {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
ClusterStateDiffManifest diff = manifest.getDiffManifest();
List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
Expand Down Expand Up @@ -1437,11 +1445,17 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
indexRoutingTables.remove(indexName);
}

return clusterStateBuilder.stateUUID(manifest.getStateUUID())
ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDownloadSucceeded();
remoteStateStats.stateDownloadTook(durationMillis);

return clusterState;
}

/**
Expand Down Expand Up @@ -1637,10 +1651,22 @@ void setRemoteClusterStateAttributesManager(RemoteClusterStateAttributesManager
}

/**
 * Records a failure on the stats object returned by {@link #getStats()}.
 */
public void writeMetadataFailed() {
getStats().stateFailed();
getStats().stateUploadFailed();
}

/**
 * Gives access to the stats holder used by this service.
 *
 * @return the {@code remoteStateStats} instance
 */
public RemotePersistenceStats getStats() {
    return this.remoteStateStats;
}

/**
 * Records a download failure on the stats object returned by {@link #getStats()}.
 */
public void readMetadataFailed() {
    final RemotePersistenceStats persistenceStats = getStats();
    persistenceStats.stateDownloadFailed();
}

/**
 * Exposes the upload portion of the remote state stats.
 *
 * @return whatever {@code remoteStateStats.getUploadStats()} returns
 */
public PersistedStateStats getUploadStats() {
    final PersistedStateStats uploadStats = remoteStateStats.getUploadStats();
    return uploadStats;
}

/**
 * Exposes the download portion of the remote state stats.
 *
 * @return whatever {@code remoteStateStats.getDownloadStats()} returns
 */
public PersistedStateStats getDownloadStats() {
    final PersistedStateStats downloadStats = remoteStateStats.getDownloadStats();
    return downloadStats;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.cluster.coordination.PersistedStateStats;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Stats for remote state downloads. Extends {@link PersistedStateStats} with a counter of download timeouts.
 */
public class RemoteDownloadStats extends PersistedStateStats {
    /** Key under which the timeout counter is added to the extended fields. */
    static final String TIMEOUT_COUNT = "timeout_count";
    /** Stats name passed to the {@link PersistedStateStats} constructor. */
    static final String REMOTE_DOWNLOAD = "remote_download";

    // final: the same AtomicLong instance is handed to the superclass in the constructor, so the reference
    // must never be swapped out afterwards or the two would silently diverge.
    private final AtomicLong timeoutCount = new AtomicLong(0);

    /**
     * Creates the stats object named {@link #REMOTE_DOWNLOAD} and adds the timeout counter to its
     * extended fields under {@link #TIMEOUT_COUNT}.
     */
    public RemoteDownloadStats() {
        super(REMOTE_DOWNLOAD);
        addToExtendedFields(TIMEOUT_COUNT, timeoutCount);
    }

    /**
     * Increments the download timeout counter by one.
     */
    public void stateDownloadTimeout() {
        timeoutCount.incrementAndGet();
    }

    /**
     * @return the current value of the download timeout counter
     */
    public long getStateDownloadTimeout() {
        return timeoutCount.get();
    }
}
Loading

0 comments on commit 52aaa2f

Please sign in to comment.