Skip to content

Commit

Permalink
Merge branch '2.x' into 2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
  • Loading branch information
vinaykpud authored Oct 28, 2024
2 parents 81eee96 + 9358ac4 commit fe937e5
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 164 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
- Add support for renaming aliases during snapshot restore ([#16292](https://github.com/opensearch-project/OpenSearch/pull/16292))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
Expand All @@ -38,7 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `peter-evans/create-pull-request` from 6 to 7 ([#15863](https://github.com/opensearch-project/OpenSearch/pull/15863))
- Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862))
- Bump `com.microsoft.azure:msal4j` from 1.17.0 to 1.17.2 ([#15945](https://github.com/opensearch-project/OpenSearch/pull/15945), [#16406](https://github.com/opensearch-project/OpenSearch/pull/16406))
- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.10 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307))
- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.12 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307), [#16503](https://github.com/opensearch-project/OpenSearch/pull/16503))
- Update protobuf from 3.25.4 to 3.25.5 ([#16011](https://github.com/opensearch-project/OpenSearch/pull/16011))
- Bump `org.roaringbitmap:RoaringBitmap` from 1.2.1 to 1.3.0 ([#16040](https://github.com/opensearch-project/OpenSearch/pull/16040))
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.40 to 9.41.1 ([#16038](https://github.com/opensearch-project/OpenSearch/pull/16038))
Expand Down Expand Up @@ -99,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Enhance rejection mechanism in workload management ([#16417](https://github.com/opensearch-project/OpenSearch/pull/16417))
- [Workload Management] Fixing Create/Update QueryGroup TransportActions to execute from non-cluster manager nodes ([16422](https://github.com/opensearch-project/OpenSearch/pull/16422))
- Fix flaky test in `testApproximateRangeWithSizeOverDefault` by adjusting totalHits assertion logic ([#16434](https://github.com/opensearch-project/OpenSearch/pull/16434#pullrequestreview-2386999409))
- Revert changes to upload remote state manifest using minimum codec version([#16403](https://github.com/opensearch-project/OpenSearch/pull/16403))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ public PublicationContext newPublicationContext(
}

private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
final String summary;
if (logger.isTraceEnabled()) {
summary = taskInputs.taskSummaryGenerator.apply(true);
} else {
summary = taskInputs.taskSummaryGenerator.apply(false);
}

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
return;
}

if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
logger.trace("executing cluster state update for [{}]", summary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
logger.debug("executing cluster state update for [{}]", summary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);
logExecutionTime(computationTime, "compute cluster state update", summary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
Expand All @@ -337,25 +341,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
shortSummary,
summary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
Expand All @@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(shortSummary, publicationStartTime, newClusterState, e);
handleException(summary, publicationStartTime, newClusterState, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,37 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (toExecute.isEmpty() == false) {
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
final List<BatchedTask> sampleTasks = toExecute.stream()
.limit(Math.min(1000, toExecute.size()))
.collect(Collectors.toList());
return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks));
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
return getSummary(updateTask, toExecute);
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
private String getSummary(final BatchedTask updateTask, final List<BatchedTask> toExecute) {
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}

private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) {
return "Tasks batched with key: "
+ batchingKey.toString().split("\\$")[0]
+ ", count:"
+ taskCount
+ " and sample tasks: "
+ sampleTasks;
}

/**
Expand Down
14 changes: 4 additions & 10 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -754,12 +754,8 @@ public void setLastAcceptedState(ClusterState clusterState) {
}
try {
final RemoteClusterStateManifestInfo manifestDetails;
// Decide the codec version
int codecVersion = ClusterMetadataManifest.getCodecForVersion(clusterState.nodes().getMinNodeVersion());
assert codecVersion >= 0 : codecVersion;
logger.info("codec version is {}", codecVersion);

if (shouldWriteFullClusterState(clusterState, codecVersion)) {
if (shouldWriteFullClusterState(clusterState)) {
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
Expand All @@ -776,7 +772,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
clusterState.metadata().clusterUUID()
);
}
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, codecVersion);
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
Expand Down Expand Up @@ -821,13 +817,11 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) {
assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion;
private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term())
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT
|| lastAcceptedManifest.getCodecVersion() != codecVersion) {
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,7 @@ public RemoteClusterStateService(
* @return A manifest object which contains the details of uploaded entity metadata.
*/
@Nullable
public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID, int codecVersion)
throws IOException {
public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException {
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
Expand Down Expand Up @@ -342,8 +341,7 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
? new ClusterStateChecksum(clusterState, threadpool)
: null,
false,
codecVersion
false
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
Expand Down Expand Up @@ -551,8 +549,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
? new ClusterStateChecksum(clusterState, threadpool)
: null,
false,
previousManifest.getCodecVersion()
false
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
Expand Down Expand Up @@ -1024,8 +1021,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
? new ClusterStateChecksum(clusterState, threadpool)
: null,
true,
previousManifest.getCodecVersion()
true
);
if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) {
remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifestDetails.getClusterMetadataManifest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ RemoteClusterStateManifestInfo uploadManifest(
String previousClusterUUID,
ClusterStateDiffManifest clusterDiffManifest,
ClusterStateChecksum clusterStateChecksum,
boolean committed,
int codecVersion
boolean committed
) {
synchronized (this) {
ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder();
Expand All @@ -112,7 +111,7 @@ RemoteClusterStateManifestInfo uploadManifest(
.opensearchVersion(Version.CURRENT)
.nodeId(nodeId)
.committed(committed)
.codecVersion(codecVersion)
.codecVersion(ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION)
.indices(uploadedMetadataResult.uploadedIndexMetadata)
.previousClusterUUID(previousClusterUUID)
.clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
Expand Down Expand Up @@ -962,7 +961,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep
.previousClusterUUID(randomAlphaOfLength(10))
.clusterUUIDCommitted(true)
.build();
when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)).thenReturn(
when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(
new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")
);

Expand All @@ -975,8 +974,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep

final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, remoteStateSettings());
coordinationState.handlePrePublish(clusterState);
Mockito.verify(remoteClusterStateService, Mockito.times(1))
.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID);
assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState));

when(remoteClusterStateService.markLastStateAsCommitted(any(), any(), eq(false))).thenReturn(
Expand Down
Loading

0 comments on commit fe937e5

Please sign in to comment.