Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include last-committed data in publication #92259

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/92259.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92259
summary: Include last-committed data in publication
area: Cluster Coordination
type: bug
issues:
- 90158
Original file line number Diff line number Diff line change
Expand Up @@ -556,27 +556,18 @@ public interface PersistedState extends Closeable {
* marked as committed.
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code is cleaner, but I failed to spot the important difference, is it just the logging you wanted out of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this does anything different, there's just no need to use a Metadata.Builder here any more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. In that case, let us revert this and move this to a follow-up, where we can also assume that assertions are held (i.e., use true instead of hasClusterUuid later in the code).

Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
: "received cluster state with empty cluster uuid: " + lastAcceptedState;
if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
&& lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
if (metadataBuilder == null) {
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
}
metadataBuilder.clusterUUIDCommitted(true);
final var lastAcceptedState = getLastAcceptedState();
final var hasClusterUuid = lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false;
assert hasClusterUuid : "received cluster state with empty cluster uuid: " + lastAcceptedState;

if (hasClusterUuid && lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels odd to protect the logging by something we assert on above?

Suggested change
if (hasClusterUuid && lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
if (lastAcceptedState.metadata().clusterUUIDCommitted() == false) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ehh I just kept the same logic as before but I don't think I've ever seen this assertion trip. I'd be ok to drop it.

logger.info("cluster UUID set to [{}]", lastAcceptedState.metadata().clusterUUID());
}
if (metadataBuilder != null) {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build());

final var adjustedMetadata = lastAcceptedState.metadata()
.withLastCommittedValues(hasClusterUuid, lastAcceptedState.getLastAcceptedConfiguration());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass in hasClusterUuid || lastAcceptedState.metadata().clusterUUIDCommitted() here? I mean, we are outside assertions already, so could also pass in true. But assuming assertions have some case we are trying to protect against, passing in the original committed value seems safer and also in line with original code.

if (adjustedMetadata != lastAcceptedState.metadata()) {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metadata(adjustedMetadata).build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class PublicationTransportHandler {
TransportRequestOptions.Type.STATE
);

public static final Version INCLUDES_LAST_COMMITTED_DATA_VERSION = Version.V_8_7_0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaveCTurner should this be 8.6.0 since it's intended to be backported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll adjust that while backporting. If I set this to 8.6.0 now then the BwC test suite would fail on this PR, but it's important for those tests to pass because they tell us whether there are any other more subtle BwC problems here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Joining the dots here:


private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker();

public PublicationTransportHandler(
Expand Down Expand Up @@ -131,6 +133,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
// Close early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
incomingState = ClusterState.readFrom(input, transportService.getLocalNode());
assert input.read() == -1;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
assert false : e;
Expand All @@ -151,11 +154,30 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
ClusterState incomingState;
try {
final Diff<ClusterState> diff;
final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION);
final boolean clusterUuidCommitted;
final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration;

// Close stream early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
if (includesLastCommittedData) {
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
} else {
clusterUuidCommitted = false;
lastCommittedConfiguration = null;
}
assert input.read() == -1;
}
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
if (includesLastCommittedData) {
final var adjustedMetadata = incomingState.metadata()
.withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
Expand Down Expand Up @@ -239,7 +261,8 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
}
}

private ReleasableBytesReference serializeDiffClusterState(long clusterStateVersion, Diff<ClusterState> diff, DiscoveryNode node) {
private ReleasableBytesReference serializeDiffClusterState(ClusterState newState, Diff<ClusterState> diff, DiscoveryNode node) {
final long clusterStateVersion = newState.version();
final Version nodeVersion = node.getVersion();
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
boolean success = false;
Expand All @@ -253,6 +276,10 @@ private ReleasableBytesReference serializeDiffClusterState(long clusterStateVers
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
if (nodeVersion.onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION)) {
stream.writeBoolean(newState.metadata().clusterUUIDCommitted());
newState.getLastCommittedConfiguration().writeTo(stream);
}
uncompressedBytes = stream.position();
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
Expand Down Expand Up @@ -316,7 +343,7 @@ void buildDiffAndSerializeStates() {
} else {
serializedDiffs.computeIfAbsent(
node.getVersion(),
v -> serializeDiffClusterState(newState.version(), diffSupplier.getOrCompute(), node)
v -> serializeDiffClusterState(newState, diffSupplier.getOrCompute(), node)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
import org.elasticsearch.cluster.metadata.IndexAbstraction.ConcreteIndex;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -442,6 +443,42 @@ public Metadata withCoordinationMetadata(CoordinationMetadata coordinationMetada
);
}

public Metadata withLastCommittedValues(
boolean clusterUUIDCommitted,
CoordinationMetadata.VotingConfiguration lastCommittedConfiguration
) {
if (clusterUUIDCommitted == this.clusterUUIDCommitted
&& lastCommittedConfiguration.equals(this.coordinationMetadata.getLastCommittedConfiguration())) {
return this;
}
return new Metadata(
clusterUUID,
clusterUUIDCommitted,
version,
CoordinationMetadata.builder(coordinationMetadata).lastCommittedConfiguration(lastCommittedConfiguration).build(),
transientSettings,
persistentSettings,
settings,
hashesOfConsistentSettings,
totalNumberOfShards,
totalOpenIndexShards,
indices,
aliasedIndices,
templates,
customs,
allIndices,
visibleIndices,
allOpenIndices,
visibleOpenIndices,
allClosedIndices,
visibleClosedIndices,
indicesLookup,
mappingsByHash,
oldestIndexVersion,
reservedStateMetadata
);
}

/**
* Creates a copy of this instance updated with the given {@link IndexMetadata} that must only contain changes to primary terms
* and in-sync allocation ids relative to the existing entries. This method is only used by
Expand Down Expand Up @@ -1380,6 +1417,7 @@ public Map<String, MappingMetadata> getMappingsByHash() {
private static class MetadataDiff implements Diff<Metadata> {

private static final Version NOOP_METADATA_DIFF_VERSION = Version.V_8_5_0;
private static final Version NOOP_METADATA_DIFF_SAFE_VERSION = PublicationTransportHandler.INCLUDES_LAST_COMMITTED_DATA_VERSION;

private final long version;
private final String clusterUUID;
Expand Down Expand Up @@ -1466,12 +1504,15 @@ private MetadataDiff(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_VERSION)) {
if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_SAFE_VERSION)) {
out.writeBoolean(empty);
if (empty) {
// noop diff
return;
}
} else if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_VERSION)) {
// noops are not safe with these versions, see #92259
out.writeBoolean(false);
}
out.writeString(clusterUUID);
out.writeBoolean(clusterUUIDCommitted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ public void testUnresponsiveLeaderDetectedEventually() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/90158")
public void testUnhealthyLeaderIsReplaced() {
final AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(HEALTHY, "healthy-info"));
final int initialClusterSize = between(1, 3);
Expand Down
Loading