From fb756a9ca5bcd30c435bde1bfb82f5b82278b6bb Mon Sep 17 00:00:00 2001
From: David Turner
Date: Mon, 12 Dec 2022 11:56:07 +0000
Subject: [PATCH] Include last-committed data in publication (#92259)

The cluster coordination consistency layer relies on a couple of fields within `Metadata` which record the last _committed_ values on each node. In contrast, the rest of the cluster state can only be changed at _accept_ time. In the past we would copy these fields over from the master on every publication, but since #90101 we don't copy anything at all if the `Metadata` is unchanged on the master. However, the master computes the diff against the last _committed_ state whereas the receiving nodes apply the diff to the last _accepted_ state, and this means if the master sends a no-op `Metadata` diff then the receiving node will revert its last-committed values to the ones included in the state it last accepted. With this commit we include the last-committed values alongside the cluster state diff so that they are always copied properly.
Closes #90158 --- docs/changelog/92259.yaml | 6 + .../PublicationTransportHandler.java | 31 +++- .../cluster/metadata/Metadata.java | 43 ++++- .../coordination/CoordinatorTests.java | 1 - .../PublicationTransportHandlerTests.java | 174 ++++++++++++++++++ 5 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 docs/changelog/92259.yaml diff --git a/docs/changelog/92259.yaml b/docs/changelog/92259.yaml new file mode 100644 index 0000000000000..826d63dedf7fc --- /dev/null +++ b/docs/changelog/92259.yaml @@ -0,0 +1,6 @@ +pr: 92259 +summary: Include last-committed data in publication +area: Cluster Coordination +type: bug +issues: + - 90158 diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 5dd308640cc90..11d64f1622415 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -86,6 +86,8 @@ public class PublicationTransportHandler { TransportRequestOptions.Type.STATE ); + public static final Version INCLUDES_LAST_COMMITTED_DATA_VERSION = Version.V_8_7_0; + private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker(); public PublicationTransportHandler( @@ -131,6 +133,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque // Close early to release resources used by the de-compression as early as possible try (StreamInput input = in) { incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); + assert input.read() == -1; } catch (Exception e) { logger.warn("unexpected error while deserializing an incoming cluster state", e); assert false : e; @@ -151,11 +154,30 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque ClusterState 
incomingState; try { final Diff diff; + final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION); + final boolean clusterUuidCommitted; + final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration; + // Close stream early to release resources used by the de-compression as early as possible try (StreamInput input = in) { diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode()); + if (includesLastCommittedData) { + clusterUuidCommitted = in.readBoolean(); + lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in); + } else { + clusterUuidCommitted = false; + lastCommittedConfiguration = null; + } + assert input.read() == -1; } incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException + if (includesLastCommittedData) { + final var adjustedMetadata = incomingState.metadata() + .withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration); + if (adjustedMetadata != incomingState.metadata()) { + incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build(); + } + } } catch (IncompatibleClusterStateVersionException e) { incompatibleClusterStateDiffReceivedCount.incrementAndGet(); throw e; @@ -239,7 +261,8 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS } } - private ReleasableBytesReference serializeDiffClusterState(long clusterStateVersion, Diff diff, DiscoveryNode node) { + private ReleasableBytesReference serializeDiffClusterState(ClusterState newState, Diff diff, DiscoveryNode node) { + final long clusterStateVersion = newState.version(); final Version nodeVersion = node.getVersion(); final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream(); boolean success = false; @@ -253,6 +276,10 @@ private ReleasableBytesReference serializeDiffClusterState(long clusterStateVers stream.setVersion(nodeVersion); stream.writeBoolean(false); 
diff.writeTo(stream); + if (nodeVersion.onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION)) { + stream.writeBoolean(newState.metadata().clusterUUIDCommitted()); + newState.getLastCommittedConfiguration().writeTo(stream); + } uncompressedBytes = stream.position(); } catch (IOException e) { throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node); @@ -316,7 +343,7 @@ void buildDiffAndSerializeStates() { } else { serializedDiffs.computeIfAbsent( node.getVersion(), - v -> serializeDiffClusterState(newState.version(), diffSupplier.getOrCompute(), node) + v -> serializeDiffClusterState(newState, diffSupplier.getOrCompute(), node) ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 7f450bf739230..f45a12b95da90 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.coordination.CoordinationMetadata; +import org.elasticsearch.cluster.coordination.PublicationTransportHandler; import org.elasticsearch.cluster.metadata.IndexAbstraction.ConcreteIndex; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Strings; @@ -442,6 +443,42 @@ public Metadata withCoordinationMetadata(CoordinationMetadata coordinationMetada ); } + public Metadata withLastCommittedValues( + boolean clusterUUIDCommitted, + CoordinationMetadata.VotingConfiguration lastCommittedConfiguration + ) { + if (clusterUUIDCommitted == this.clusterUUIDCommitted + && lastCommittedConfiguration.equals(this.coordinationMetadata.getLastCommittedConfiguration())) { + return this; + } + return new Metadata( + clusterUUID, + clusterUUIDCommitted, + version, + 
CoordinationMetadata.builder(coordinationMetadata).lastCommittedConfiguration(lastCommittedConfiguration).build(), + transientSettings, + persistentSettings, + settings, + hashesOfConsistentSettings, + totalNumberOfShards, + totalOpenIndexShards, + indices, + aliasedIndices, + templates, + customs, + allIndices, + visibleIndices, + allOpenIndices, + visibleOpenIndices, + allClosedIndices, + visibleClosedIndices, + indicesLookup, + mappingsByHash, + oldestIndexVersion, + reservedStateMetadata + ); + } + /** * Creates a copy of this instance updated with the given {@link IndexMetadata} that must only contain changes to primary terms * and in-sync allocation ids relative to the existing entries. This method is only used by @@ -1380,6 +1417,7 @@ public Map getMappingsByHash() { private static class MetadataDiff implements Diff { private static final Version NOOP_METADATA_DIFF_VERSION = Version.V_8_5_0; + private static final Version NOOP_METADATA_DIFF_SAFE_VERSION = PublicationTransportHandler.INCLUDES_LAST_COMMITTED_DATA_VERSION; private final long version; private final String clusterUUID; @@ -1466,12 +1504,15 @@ private MetadataDiff(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_VERSION)) { + if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_SAFE_VERSION)) { out.writeBoolean(empty); if (empty) { // noop diff return; } + } else if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_VERSION)) { + // noops are not safe with these versions, see #92259 + out.writeBoolean(false); } out.writeString(clusterUUID); out.writeBoolean(clusterUUIDCommitted); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index e24be3b79d5ab..e2dc48e6bd6ec 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -568,7 +568,6 @@ public void testUnresponsiveLeaderDetectedEventually() { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/90158") public void testUnhealthyLeaderIsReplaced() { final AtomicReference nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(HEALTHY, "healthy-info")); final int initialClusterSize = between(1, 3); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java index 52cc92f73c749..ccd34e74c610f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -30,6 +31,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.Task; @@ -42,17 +44,23 @@ import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.TestTransportChannel; import 
org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static org.elasticsearch.cluster.service.MasterService.STATE_UPDATE_ACTION_NAME; @@ -330,4 +338,170 @@ public void writeTo(StreamOutput out) throws IOException { } } + public void testIncludesLastCommittedFieldsInDiffSerialization() { + final var deterministicTaskQueue = new DeterministicTaskQueue(); + final var threadPool = deterministicTaskQueue.getThreadPool(); + + final var transportsByNode = new HashMap(); + final var transportHandlersByNode = new HashMap(); + final var transportServicesByNode = new HashMap(); + final var receivedStateRef = new AtomicReference(); + final var completed = new AtomicBoolean(); + + final var localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT); + final var otherNode = new DiscoveryNode( + "otherNode", + buildNewFakeTransportAddress(), + VersionUtils.randomCompatibleVersion(random(), Version.CURRENT) + ); + for (final var discoveryNode : List.of(localNode, otherNode)) { + final var transport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + @SuppressWarnings("unchecked") + final var context = (ResponseContext) getResponseHandlers().remove(requestId); + try { + transportsByNode.get(node) + .getRequestHandlers() + .getHandler(action) + .getHandler() + .messageReceived(request, new TestTransportChannel(new 
ActionListener<>() { + @Override + public void onResponse(TransportResponse transportResponse) { + context.handler().handleResponse(transportResponse); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("unexpected", e); + } + }), new Task(randomNonNegativeLong(), "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of())); + } catch (IncompatibleClusterStateVersionException e) { + context.handler().handleException(new RemoteTransportException("wrapped", e)); + } catch (Exception e) { + throw new AssertionError("unexpected", e); + } + } + }; + transportsByNode.put(discoveryNode, transport); + + final var transportService = transport.createTransportService( + Settings.EMPTY, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + ignored -> discoveryNode, + null, + Set.of() + ); + transportServicesByNode.put(discoveryNode, transportService); + + final var publicationTransportHandler = new PublicationTransportHandler( + transportService, + writableRegistry(), + publishRequest -> { + assertTrue(receivedStateRef.compareAndSet(null, publishRequest.getAcceptedState())); + return new PublishWithJoinResponse( + new PublishResponse(publishRequest.getAcceptedState().term(), publishRequest.getAcceptedState().version()), + Optional.empty() + ); + } + ); + transportHandlersByNode.put(discoveryNode, publicationTransportHandler); + } + + for (final var transportService : transportServicesByNode.values()) { + transportService.start(); + transportService.acceptIncomingRequests(); + } + + threadPool.getThreadContext().markAsSystemContext(); + + final var clusterState0 = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) + .metadata( + Metadata.builder() + .coordinationMetadata( + CoordinationMetadata.builder().lastAcceptedConfiguration(VotingConfiguration.of(localNode)).build() + ) + .generateClusterUuidIfNeeded() + ) + 
.build(); + + final ClusterState receivedState0; + var context0 = transportHandlersByNode.get(localNode) + .newPublicationContext( + new ClusterStatePublicationEvent( + new BatchSummary("test"), + clusterState0, + clusterState0, + new Task(randomNonNegativeLong(), "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()), + 0L, + 0L + ) + ); + try { + context0.sendPublishRequest( + otherNode, + new PublishRequest(clusterState0), + ActionListener.wrap(() -> assertTrue(completed.compareAndSet(false, true))) + ); + assertTrue(completed.getAndSet(false)); + receivedState0 = receivedStateRef.getAndSet(null); + assertEquals(clusterState0.stateUUID(), receivedState0.stateUUID()); + assertEquals(otherNode, receivedState0.nodes().getLocalNode()); + assertFalse(receivedState0.metadata().clusterUUIDCommitted()); + assertEquals(VotingConfiguration.of(), receivedState0.getLastCommittedConfiguration()); + final var receivedStateStats = transportHandlersByNode.get(otherNode).stats(); + assertEquals(0, receivedStateStats.getCompatibleClusterStateDiffReceivedCount()); + assertEquals(1, receivedStateStats.getIncompatibleClusterStateDiffReceivedCount()); + assertEquals(1, receivedStateStats.getFullClusterStateReceivedCount()); + } finally { + context0.decRef(); + } + + final var committedClusterState0 = ClusterState.builder(clusterState0) + .metadata(clusterState0.metadata().withLastCommittedValues(true, clusterState0.getLastAcceptedConfiguration())) + .build(); + assertEquals(clusterState0.stateUUID(), committedClusterState0.stateUUID()); + assertEquals(clusterState0.term(), committedClusterState0.term()); + assertEquals(clusterState0.version(), committedClusterState0.version()); + + final var clusterState1 = ClusterState.builder(committedClusterState0).incrementVersion().build(); + assertSame(committedClusterState0.metadata(), clusterState1.metadata()); + + var context1 = transportHandlersByNode.get(localNode) + .newPublicationContext( + new ClusterStatePublicationEvent( + new 
BatchSummary("test"), + committedClusterState0, + clusterState1, + new Task(randomNonNegativeLong(), "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()), + 0L, + 0L + ) + ); + try { + context1.sendPublishRequest( + otherNode, + new PublishRequest(clusterState1), + ActionListener.wrap(() -> assertTrue(completed.compareAndSet(false, true))) + ); + assertTrue(completed.getAndSet(false)); + var receivedState1 = receivedStateRef.getAndSet(null); + assertEquals(clusterState1.stateUUID(), receivedState1.stateUUID()); + assertEquals(otherNode, receivedState1.nodes().getLocalNode()); + assertSame(receivedState0.nodes(), receivedState1.nodes()); // it was a diff + assertTrue(receivedState1.metadata().clusterUUIDCommitted()); + assertEquals(VotingConfiguration.of(localNode), receivedState1.getLastCommittedConfiguration()); + final var receivedStateStats = transportHandlersByNode.get(otherNode).stats(); + assertEquals(1, receivedStateStats.getCompatibleClusterStateDiffReceivedCount()); + assertEquals(1, receivedStateStats.getIncompatibleClusterStateDiffReceivedCount()); + assertEquals(1, receivedStateStats.getFullClusterStateReceivedCount()); + } finally { + context1.decRef(); + } + + assertFalse(deterministicTaskQueue.hasRunnableTasks()); + assertFalse(deterministicTaskQueue.hasDeferredTasks()); + } }