Skip to content

Commit

Permalink
Include last-committed data in publication
Browse files Browse the repository at this point in the history
The cluster coordination consistency layer relies on a couple of fields
within `Metadata` which record the last _committed_ values on each node.
In contrast, the rest of the cluster state can only be changed at
_accept_ time.

In the past we would copy these fields over from the master on every
publication, but since #90101 we don't copy anything at all if the
`Metadata` is unchanged on the master. However, the master computes the
diff against the last _committed_ state whereas the receiving nodes
apply the diff to the last _accepted_ state, and this means if the
master sends a no-op `Metadata` diff then the receiving node will revert
its last-committed values to the ones included in the state it last
accepted.

With this commit we include the last-committed values alongside the
cluster state diff so that they are always copied properly.
  • Loading branch information
DaveCTurner committed Dec 9, 2022
1 parent a1c852a commit 173d526
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,27 +556,18 @@ public interface PersistedState extends Closeable {
* marked as committed.
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
: "received cluster state with empty cluster uuid: " + lastAcceptedState;
if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
&& lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
if (metadataBuilder == null) {
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
}
metadataBuilder.clusterUUIDCommitted(true);
final var lastAcceptedState = getLastAcceptedState();
final var hasClusterUuid = lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false;
assert hasClusterUuid : "received cluster state with empty cluster uuid: " + lastAcceptedState;

if (hasClusterUuid && lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
logger.info("cluster UUID set to [{}]", lastAcceptedState.metadata().clusterUUID());
}
if (metadataBuilder != null) {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build());

final var adjustedMetadata = lastAcceptedState.metadata()
.withLastCommittedValues(hasClusterUuid, lastAcceptedState.getLastAcceptedConfiguration());
if (adjustedMetadata != lastAcceptedState.metadata()) {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metadata(adjustedMetadata).build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class PublicationTransportHandler {
TransportRequestOptions.Type.STATE
);

private static final Version INCLUDES_LAST_COMMITTED_DATA_VERSION = Version.V_8_7_0;

private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker();

public PublicationTransportHandler(
Expand Down Expand Up @@ -118,6 +120,7 @@ public PublishClusterStateStats stats() {

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION);
StreamInput in = request.bytes().streamInput();
try {
if (compressor != null) {
Expand Down Expand Up @@ -151,11 +154,28 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
ClusterState incomingState;
try {
final Diff<ClusterState> diff;
final boolean clusterUuidCommitted;
final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration;

// Close stream early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
if (includesLastCommittedData) {
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
} else {
clusterUuidCommitted = false;
lastCommittedConfiguration = null;
}
}
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
if (includesLastCommittedData) {
final var adjustedMetadata = incomingState.metadata()
.withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
Expand Down Expand Up @@ -239,7 +259,8 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
}
}

private ReleasableBytesReference serializeDiffClusterState(long clusterStateVersion, Diff<ClusterState> diff, DiscoveryNode node) {
private ReleasableBytesReference serializeDiffClusterState(ClusterState newState, Diff<ClusterState> diff, DiscoveryNode node) {
final long clusterStateVersion = newState.version();
final Version nodeVersion = node.getVersion();
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
boolean success = false;
Expand All @@ -253,6 +274,10 @@ private ReleasableBytesReference serializeDiffClusterState(long clusterStateVers
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
if (nodeVersion.onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION)) {
stream.writeBoolean(newState.metadata().clusterUUIDCommitted());
newState.getLastCommittedConfiguration().writeTo(stream);
}
uncompressedBytes = stream.position();
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
Expand Down Expand Up @@ -316,7 +341,7 @@ void buildDiffAndSerializeStates() {
} else {
serializedDiffs.computeIfAbsent(
node.getVersion(),
v -> serializeDiffClusterState(newState.version(), diffSupplier.getOrCompute(), node)
v -> serializeDiffClusterState(newState, diffSupplier.getOrCompute(), node)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,42 @@ public Metadata withCoordinationMetadata(CoordinationMetadata coordinationMetada
);
}

public Metadata withLastCommittedValues(
boolean clusterUUIDCommitted,
CoordinationMetadata.VotingConfiguration lastCommittedConfiguration
) {
if (clusterUUIDCommitted == this.clusterUUIDCommitted
&& lastCommittedConfiguration.equals(this.coordinationMetadata.getLastCommittedConfiguration())) {
return this;
}
return new Metadata(
clusterUUID,
clusterUUIDCommitted,
version,
CoordinationMetadata.builder(coordinationMetadata).lastCommittedConfiguration(lastCommittedConfiguration).build(),
transientSettings,
persistentSettings,
settings,
hashesOfConsistentSettings,
totalNumberOfShards,
totalOpenIndexShards,
indices,
aliasedIndices,
templates,
customs,
allIndices,
visibleIndices,
allOpenIndices,
visibleOpenIndices,
allClosedIndices,
visibleClosedIndices,
indicesLookup,
mappingsByHash,
oldestIndexVersion,
reservedStateMetadata
);
}

/**
* Creates a copy of this instance updated with the given {@link IndexMetadata} that must only contain changes to primary terms
* and in-sync allocation ids relative to the existing entries. This method is only used by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ public void testUnresponsiveLeaderDetectedEventually() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/90158")
public void testUnhealthyLeaderIsReplaced() {
final AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(HEALTHY, "healthy-info"));
final int initialClusterSize = between(1, 3);
Expand Down

0 comments on commit 173d526

Please sign in to comment.