Skip to content

Commit

Permalink
using publish cluster-state as fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Aug 23, 2024
1 parent 92463f4 commit 0edeb40
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,12 @@ public void handleResponse(GetTermVersionResponse response) {
if (isLatestClusterStatePresentOnLocalNode) {
onLatestLocalState.accept(clusterState);
} else {
onStaleLocalState.accept(clusterManagerNode, clusterState);
ClusterState publishState = clusterService.publishState();
if (publishState != null && response.matches(publishState)) {
onLatestLocalState.accept(publishState);
} else {
onStaleLocalState.accept(clusterManagerNode, clusterState);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ && getCurrentTerm() == ZEN1_BWC_TERM
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);

final ClusterState publishState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(publishState) : publishState;
clusterApplier.onPublishClusterState(publishRequest.toString(), () -> publishState);

if (sourceNode.equals(getLocalNode())) {
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface ClusterApplier {
*/
void setInitialState(ClusterState initialState);

void onPublishClusterState(String source, Supplier<ClusterState> clusterStateSupplier);

/**
* Method to invoke when a new cluster state is available to be applied
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
private final AtomicReference<ClusterState> publishState; // last published state

private final AtomicReference<ClusterState> state; // last applied state

Expand All @@ -139,6 +140,7 @@ public ClusterApplierService(
) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.publishState = new AtomicReference<>();
this.state = new AtomicReference<>();
this.nodeName = nodeName;

Expand Down Expand Up @@ -232,6 +234,10 @@ public ClusterState state() {
return clusterState;
}

public ClusterState publishState() {
return publishState.get();
}

/**
* Returns true if the appliedClusterState is not null
*/
Expand Down Expand Up @@ -367,6 +373,14 @@ public ThreadPool threadPool() {
return threadPool;
}

@Override
public void onPublishClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier) {
ClusterState nextState = clusterStateSupplier.get();
if (nextState != null) {
publishState.set(nextState);
}
}

@Override
public void onNewClusterState(
final String source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public ClusterState state() {
return clusterApplierService.state();
}

public ClusterState publishState() {
return clusterApplierService.publishState();
}

/**
* Adds a high priority applier of updated cluster states.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void setInitialState(ClusterState initialState) {

}

@Override
public void onPublishClusterState(String source, Supplier<ClusterState> clusterStateSupplier) {

}

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
Expand Down

0 comments on commit 0edeb40

Please sign in to comment.